Files
ietf-draft-analyzer/workspace/act/act/ledger.py
Christian Nennemann 2506b6325a
Some checks failed
CI / test (3.11) (push) Failing after 1m37s
CI / test (3.12) (push) Failing after 57s
feat: add draft data, gap analysis report, and workspace config
2026-04-06 18:47:15 +02:00

153 lines
4.9 KiB
Python

"""ACT in-memory append-only audit ledger.
Provides an in-memory reference implementation of the audit ledger
interface. Enforces append-only semantics and hash-chain integrity.
Reference: ACT §10 (Audit Ledger Interface).
"""
from __future__ import annotations
import hashlib
import json
from typing import Any
from .errors import ACTLedgerImmutabilityError
from .token import ACTRecord
class ACTLedger:
    """In-memory append-only audit ledger for ACT execution records.

    Records are stored in insertion order with monotonically increasing
    sequence numbers. A hash chain provides integrity verification.

    Reference: ACT §10.

    This is a reference implementation suitable for testing. Production
    deployments should use a persistent backend implementing the same
    interface.
    """

    def __init__(self) -> None:
        """Create an empty ledger."""
        # Each entry is (sequence number, record, jti at insertion time).
        self._records: list[tuple[int, ACTRecord, str]] = []
        # jti → index into ``_records`` for efficient lookup
        self._jti_index: dict[str, int] = {}
        # wid → list of indices into ``_records`` for workflow queries
        self._wid_index: dict[str | None, list[int]] = {}
        # Next sequence number to hand out.
        self._seq_counter: int = 0
        # Hash chain: entry i covers record i and chain entry i - 1.
        self._chain_hashes: list[bytes] = []

    def append(self, act_record: ACTRecord) -> int:
        """Append an execution record to the ledger.

        The record hash is computed before any ledger state is touched,
        so an append that fails while hashing leaves the ledger unchanged
        and does not consume a sequence number.

        Reference: ACT §10, requirement 1 (append-only), requirement 2
        (ordering).

        Args:
            act_record: The record to store. Its ``jti``, ``wid`` and
                ``to_claims()`` are read.

        Returns:
            The sequence number assigned to the record.

        Raises:
            ACTLedgerImmutabilityError: If a record with the same jti
                already exists.
        """
        jti = act_record.jti
        if jti in self._jti_index:
            raise ACTLedgerImmutabilityError(
                f"Record with jti {jti!r} already exists in ledger"
            )

        # Everything that can raise happens before the commit section.
        wid = act_record.wid
        seq = self._seq_counter
        record_hash = self._hash_record(act_record, seq)
        if self._chain_hashes:
            chained = hashlib.sha256(
                self._chain_hashes[-1] + record_hash
            ).digest()
        else:
            # The first entry has no predecessor to chain to.
            chained = record_hash

        # Commit. The wid bucket lookup goes first because it is the one
        # remaining step that can fail (an unhashable wid).
        wid_bucket = self._wid_index.setdefault(wid, [])
        idx = len(self._records)
        self._seq_counter = seq + 1
        self._chain_hashes.append(chained)
        self._records.append((seq, act_record, jti))
        self._jti_index[jti] = idx
        wid_bucket.append(idx)
        return seq

    def get(self, jti: str) -> ACTRecord | None:
        """Retrieve a record by jti.

        Reference: ACT §10, requirement 3 (lookup).

        Returns:
            The stored record, or ``None`` if no record has that jti.
        """
        idx = self._jti_index.get(jti)
        if idx is None:
            return None
        return self._records[idx][1]

    def list(self, wid: str | None = None) -> list[ACTRecord]:
        """List records in insertion order, optionally filtered by wid.

        If ``wid`` is ``None``, all records are returned; records whose
        own wid is ``None`` therefore cannot be selected on their own.
        If ``wid`` is a string, only records with that wid are returned
        (an empty list if there are none).

        Reference: ACT §10.
        """
        if wid is None:
            return [record for _seq, record, _jti in self._records]
        return [self._records[i][1] for i in self._wid_index.get(wid, ())]

    def verify_integrity(self) -> bool:
        """Verify the hash chain integrity of the ledger.

        Recomputes the hash chain from the stored records and compares it
        with the stored chain hashes.

        Reference: ACT §10, requirement 4 (integrity).

        Returns:
            ``True`` if there is exactly one chain hash per record and
            every recomputed hash matches; ``False`` otherwise.
        """
        # A missing or surplus chain entry means the two lists have
        # diverged, even if every overlapping entry still matches.
        if len(self._chain_hashes) != len(self._records):
            return False

        prev_hash: bytes | None = None
        for (seq, record, _jti), stored in zip(
            self._records, self._chain_hashes
        ):
            record_hash = self._hash_record(record, seq)
            if prev_hash is None:
                expected = record_hash
            else:
                expected = hashlib.sha256(prev_hash + record_hash).digest()
            if stored != expected:
                return False
            prev_hash = expected
        return True

    def __len__(self) -> int:
        """Return the number of records in the ledger."""
        return len(self._records)

    def _hash_record(self, record: ACTRecord, seq: int) -> bytes:
        """Compute a deterministic hash of a record for chain integrity.

        The record's claims plus a ``"_seq"`` entry are serialised as
        compact JSON with sorted keys and hashed with SHA-256.
        """
        # Build a fresh dict so the mapping returned by ``to_claims()`` is
        # never mutated. The sequence number is included for ordering
        # integrity.
        claims = {**record.to_claims(), "_seq": seq}
        canonical = json.dumps(claims, sort_keys=True, separators=(",", ":"))
        return hashlib.sha256(canonical.encode("utf-8")).digest()

    def _immutable_guard(self) -> None:
        """Internal method — not callable externally.

        The ledger has no update/delete methods by design.
        This exists to make the intent explicit.

        Raises:
            ACTLedgerImmutabilityError: Always.
        """
        raise ACTLedgerImmutabilityError(
            "Ledger records cannot be modified or deleted"
        )