Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 39 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class

# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
*.egg-info/
.installed.cfg
*.egg

# Virtual environments
venv/
ENV/
env/
.venv

# IDEs
.vscode/
.idea/
*.swp
*.swo
*~

# OS
.DS_Store
Thumbs.db
3 changes: 2 additions & 1 deletion src/a2a_ingest/__init__.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
"""A2A ingestion and interaction layer for moltbook.com."""

from .constellation import ConstellationStore
from .constellation import AnchorStatus, ConstellationStore
from .gatekeeper import IngestionGatekeeper
from .schemas import AgentManifest, CelestialBody
from .verification import InMemoryKnowledgeGraph

__all__ = [
"AgentManifest",
"AnchorStatus",
"CelestialBody",
"ConstellationStore",
"IngestionGatekeeper",
Expand Down
115 changes: 101 additions & 14 deletions src/a2a_ingest/constellation.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,28 +3,59 @@
from __future__ import annotations

from dataclasses import dataclass
from enum import Enum
import hashlib
import json
from typing import Any, Dict, Optional

from .schemas import AgentCapabilities, CelestialBody, MemoryState


CORRUPTION_MARKER = "[CORRUPTED/MUTATED]"
class AnchorStatus(str, Enum):
"""Status of an anchored body in the Constellation."""
ANCHOR_OK = "ANCHOR_OK"
CORRUPTED = "[CORRUPTED/MUTATED]"


@dataclass(frozen=True)
class ConstellationRecord:
"""Record retrieved from the Constellation with integrity verification status."""
integrity_hash: str
encrypted_blob: Dict[str, Any]
record_data: Dict[str, Any]
signature: str
public_key: Optional[str]
status: str
status: AnchorStatus
recalculated_hash: Optional[str] = None


class ConstellationStore:
"""In-memory DHT-like store for anchoring Celestial Body manifests."""
"""In-memory DHT-like store for anchoring Celestial Body manifests.

Thread Safety:
This store is NOT thread-safe. Access from multiple threads requires
external synchronization (e.g., threading.Lock).

Hash Collisions:
If two different bodies produce the same integrity hash, the second
anchor_body call will overwrite the first. While SHA-256 collisions
are extremely unlikely, callers should be aware of this behavior.

Signature Verification:
This store accepts and stores signatures but does NOT perform
cryptographic validation. Signature verification is the caller's
responsibility before calling anchor_body.

Integrity Hash:
The integrity hash covers mission-critical fields only:
- atmosphere: mission, constraints, interfaces
- capabilities: interfaces, skills, compute_profile
- memory_state: continuity_hash, summaries, attachments

The following fields are intentionally excluded from integrity checks
as they are metadata that may change without affecting the agent's
core functionality:
- body_id, display_name, mass, gravity, trust
"""

def __init__(self) -> None:
self._dht: Dict[str, Dict[str, Any]] = {}
Expand All @@ -35,37 +66,63 @@ def anchor_body(
signature: str,
public_key: Optional[str] = None,
) -> str:
"""Anchor a CelestialBody to the Constellation.

Args:
body: The CelestialBody to anchor
signature: Cryptographic signature (not validated by this method)
public_key: Optional public key associated with the signature

Returns:
The integrity hash that can be used to retrieve the body
"""
integrity_hash = self.compute_integrity_hash(body)
encrypted_blob = {
record_data = {
"body": body.as_dict(),
"signature": signature,
"public_key": public_key,
}
self._dht[integrity_hash] = encrypted_blob
self._dht[integrity_hash] = record_data
return integrity_hash

def retrieve_body(self, integrity_hash: str) -> Optional[ConstellationRecord]:
encrypted_blob = self._dht.get(integrity_hash)
if not encrypted_blob:
"""Retrieve an anchored body and verify its integrity.

Args:
integrity_hash: The hash returned by anchor_body

Returns:
ConstellationRecord with verification status, or None if not found
"""
record_data = self._dht.get(integrity_hash)
if not record_data:
return None
body = self._body_from_payload(encrypted_blob["body"])
body = self._body_from_payload(record_data["body"])
recalculated_hash = self.compute_integrity_hash(body)
status = "ANCHOR_OK"
status = AnchorStatus.ANCHOR_OK
if recalculated_hash != integrity_hash:
status = CORRUPTION_MARKER
status = AnchorStatus.CORRUPTED
return ConstellationRecord(
integrity_hash=integrity_hash,
encrypted_blob=encrypted_blob,
signature=encrypted_blob["signature"],
public_key=encrypted_blob.get("public_key"),
record_data=record_data,
signature=record_data["signature"],
public_key=record_data.get("public_key"),
status=status,
recalculated_hash=recalculated_hash,
)

def compute_integrity_hash(self, body: CelestialBody) -> str:
"""Compute SHA-256 integrity hash over mission-critical fields.

The hash includes:
- atmosphere: mission, constraints, interfaces
- capabilities: interfaces, skills, compute_profile
- memory_state: continuity_hash, summaries, attachments
"""
intent_payload = {
"mission": body.atmosphere.get("mission"),
"constraints": list(body.atmosphere.get("constraints", [])),
"interfaces": list(body.atmosphere.get("interfaces", [])),
}
capabilities_payload = {
"interfaces": list(body.capabilities.interfaces),
Expand All @@ -89,8 +146,38 @@ def compute_integrity_hash(self, body: CelestialBody) -> str:
return hashlib.sha256(normalized.encode("utf-8")).hexdigest()

def _body_from_payload(self, payload: Dict[str, Any]) -> CelestialBody:
"""Reconstruct a CelestialBody from a stored payload.

Args:
payload: Dictionary representation of a CelestialBody

Returns:
Reconstructed CelestialBody instance

Raises:
KeyError: If required fields are missing from the payload
TypeError: If field types are incorrect
"""
# Validate required top-level fields
required_fields = [
"body_id", "display_name", "mass", "atmosphere",
"gravity", "memory_state", "capabilities"
]
missing = [f for f in required_fields if f not in payload]
if missing:
raise KeyError(f"Missing required fields in payload: {missing}")

memory = payload["memory_state"]
capabilities = payload["capabilities"]

# Validate nested required fields
if "continuity_hash" not in memory:
raise KeyError("Missing 'continuity_hash' in memory_state")
Comment thread
igor-holt marked this conversation as resolved.
if "interfaces" not in capabilities:
raise KeyError("Missing 'interfaces' in capabilities")
if "skills" not in capabilities:
raise KeyError("Missing 'skills' in capabilities")
Comment on lines +176 to +179
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Return corrupted status for malformed capabilities

When an anchored record is mutated so that capabilities.interfaces or skills is missing (for example through the mutable record_data returned by retrieve_body), these new checks raise KeyError before retrieve_body can recalculate the hash. This regresses the store's corruption-detection behavior: the previous reconstruction defaulted missing capability lists and returned a CORRUPTED record, while now a corrupted anchor can crash callers that expect a ConstellationRecord with a status.

Useful? React with 👍 / 👎.


return CelestialBody(
body_id=payload["body_id"],
display_name=payload["display_name"],
Expand Down
40 changes: 31 additions & 9 deletions src/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,38 @@ def anchor_manifest(
signature: str,
public_key: str,
constellation: ConstellationStore,
knowledge_graph: InMemoryKnowledgeGraph,
Comment thread
igor-holt marked this conversation as resolved.
) -> str:
"""Transform a manifest into a Celestial Body and anchor it to the Constellation."""
gatekeeper = IngestionGatekeeper(knowledge_graph=InMemoryKnowledgeGraph())
manifest = gatekeeper.load_manifest(payload)
celestial_body = gatekeeper.transform_to_celestial_body(manifest)
return constellation.anchor_body(
body=celestial_body,
signature=signature,
public_key=public_key,
)
"""Transform a manifest into a Celestial Body and anchor it to the Constellation.

Args:
payload: Raw agent manifest payload
signature: Cryptographic signature (not validated by this function)
public_key: Public key associated with the signature
constellation: ConstellationStore instance for anchoring
knowledge_graph: InMemoryKnowledgeGraph instance for verification

Returns:
The integrity hash of the anchored body

Raises:
KeyError: If required fields are missing from the payload
ValueError: If the manifest is malformed or invalid
TypeError: If field types are incorrect
"""
try:
gatekeeper = IngestionGatekeeper(knowledge_graph=knowledge_graph)
manifest = gatekeeper.load_manifest(payload)
celestial_body = gatekeeper.transform_to_celestial_body(manifest)
return constellation.anchor_body(
body=celestial_body,
signature=signature,
public_key=public_key,
)
except KeyError as e:
raise KeyError(f"Missing required field in manifest: {e}") from e
except (ValueError, TypeError) as e:
raise ValueError(f"Invalid manifest format: {e}") from e
Comment thread
igor-holt marked this conversation as resolved.


__all__ = ["anchor_manifest"]