feat: add CelestialBody ingestion layer to A2A handoff server#73
feat: add CelestialBody ingestion layer to A2A handoff server#73Igor Holt (igor-holt) wants to merge 1 commit into
Conversation
Deploying with
|
| Status | Name | Latest Commit | Updated (UTC) |
|---|---|---|---|
| ❌ Deployment failed View logs |
yennefer | c22db0a | Apr 10 2026, 12:36 AM |
There was a problem hiding this comment.
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: c22db0ab2a
ℹ️ About Codex in GitHub
Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you
- Open a pull request for review
- Mark a draft as ready
- Comment "Codex (@codex) review".
If Codex has suggestions, it will comment; otherwise it will react with 👍.
Codex can also answer questions or update the PR. Try commenting "Codex (@codex) address that feedback".
| seismic_test={}, | ||
| ) | ||
| payload = body.to_dict() | ||
| payload["seismic_test"] = _run_seismic_test(payload) |
There was a problem hiding this comment.
Compute seismic citation from the returned payload
The citation is generated before seismic_test is populated (payload["seismic_test"] is still {} at hash time), and then the payload is mutated with the final seismic block. As a result, consumers cannot validate the returned object by hashing the response body: the advertised deterministic citation will always disagree with the final payload content, breaking downstream verification workflows.
Useful? React with 👍 / 👎.
| mass = _compute_mass(file_count=file_count, loc=loc) | ||
| gravity = _compute_gravity(mass, atmosphere, language_counts) | ||
| orbital_state = { | ||
| "phase": metadata.get("phase", "stable"), |
There was a problem hiding this comment.
Validate metadata type before field access
ingest_repository assumes metadata is a mapping and calls .get(...), but the new /api/a2a/ingest endpoint accepts arbitrary JSON values for metadata. If a client sends a non-object (for example a string or array), this raises AttributeError and escapes as a 500 because the route only catches IngestionError; invalid client input should be rejected as 400 instead of crashing the request.
Useful? React with 👍 / 👎.
There was a problem hiding this comment.
Code Review
This pull request introduces a new API endpoint for repository ingestion and adds utility functions to process repository metadata into CelestialBody objects. My review highlights a critical path traversal vulnerability in the API endpoint, as user-provided paths are not validated. Additionally, I have suggested optimizing the line counting logic to avoid loading entire files into memory, simplifying the redundant hash calculation in the seismic test, and refactoring the filesystem traversal to reduce I/O overhead.
| def ingest_celestial_body(): | ||
| """Convert a raw repository path into a CelestialBody payload.""" | ||
| data = request.json or {} | ||
| repo_path = data.get("repo_path") |
There was a problem hiding this comment.
The repo_path is taken directly from user input and used to access the filesystem without validation. This presents a significant security risk as it allows for Path Traversal attacks. An attacker could provide paths like /etc/passwd or other sensitive directories, which the server would then attempt to process and potentially expose through the ingestion metadata. You should validate that the provided path is within a designated, safe base directory.
| continue | ||
| try: | ||
| if file.suffix.lower() in CODE_EXTENSIONS: | ||
| total += len(file.read_text(encoding="utf-8", errors="ignore").splitlines()) |
There was a problem hiding this comment.
Using file.read_text().splitlines() loads the entire content of every source file into memory at once. For repositories containing very large files, this can lead to excessive memory consumption or Out-Of-Memory (OOM) errors. It is more efficient to iterate over the file object to count lines.
| total += len(file.read_text(encoding="utf-8", errors="ignore").splitlines()) | |
| with file.open("r", encoding="utf-8", errors="ignore") as f: | |
| total += sum(1 for _ in f) |
| digest_a = hashlib.sha256(canonical.encode()).hexdigest() | ||
| digest_b = hashlib.sha256(canonical.encode()).hexdigest() | ||
| invariant = digest_a == digest_b |
There was a problem hiding this comment.
Calculating the same SHA-256 hash twice (digest_a and digest_b) from the same input and then comparing them is redundant. This does not provide a meaningful "invariance" check or stress test. If the intent is to provide a deterministic citation for the payload, a single calculation is sufficient.
| digest_a = hashlib.sha256(canonical.encode()).hexdigest() | |
| digest_b = hashlib.sha256(canonical.encode()).hexdigest() | |
| invariant = digest_a == digest_b | |
| digest_a = hashlib.sha256(canonical.encode()).hexdigest() | |
| invariant = True |
| language_counts = _infer_language_counts(root) | ||
| file_count = sum(language_counts.values()) | ||
| loc = _line_count(root) |
There was a problem hiding this comment.
The repository is traversed twice: once in _infer_language_counts and again in _line_count. This doubles the I/O overhead, which can be significant for large repositories. Consider refactoring these into a single pass over the filesystem that collects both language statistics and line counts simultaneously.
There was a problem hiding this comment.
Pull request overview
Note
Copilot was unable to run its full agentic suite in this review.
Adds a repository “ingestion” pipeline that infers heuristic metadata and exposes it via a new A2A API endpoint for downstream handoff workflows.
Changes:
- Introduces
CelestialBody+ingest_repository()to derive language/LOC signals, heuristics, and a deterministic “seismic test” hash. - Adds
POST /api/a2a/ingestto call ingestion and return acelestial_bodypayload (or 400 on invalid input). - Adds pytest coverage for successful ingestion and missing-path behavior.
Reviewed changes
Copilot reviewed 3 out of 3 changed files in this pull request and generated 10 comments.
| File | Description |
|---|---|
| genesis-q-mem/celestial_ingestion.py | Implements CelestialBody model + repository ingestion heuristics and seismic test output. |
| genesis-q-mem/a2a_handoff_server.py | Adds a new API endpoint that validates input and returns ingestion results. |
| genesis-q-mem/tests/test_celestial_ingestion.py | Adds unit tests for ingestion success and error handling. |
| class CelestialBody: | ||
| body_id: str | ||
| name: str | ||
| source_path: str |
There was a problem hiding this comment.
The ingestion payload includes source_path as an absolute resolved server filesystem path. Since this payload is returned by the API, it can unintentionally disclose server directory structure. Consider omitting source_path from the returned payload, or returning only a sanitized identifier (e.g., repo basename) or a path relative to a configured allowed base directory.
| body = CelestialBody( | ||
| body_id=body_id, | ||
| name=body_name, | ||
| source_path=str(root), |
There was a problem hiding this comment.
The ingestion payload includes source_path as an absolute resolved server filesystem path. Since this payload is returned by the API, it can unintentionally disclose server directory structure. Consider omitting source_path from the returned payload, or returning only a sanitized identifier (e.g., repo basename) or a path relative to a configured allowed base directory.
| @app.route('/api/a2a/ingest', methods=['POST']) | ||
| def ingest_celestial_body(): | ||
| """Convert a raw repository path into a CelestialBody payload.""" | ||
| data = request.json or {} | ||
| repo_path = data.get("repo_path") |
There was a problem hiding this comment.
This endpoint accepts an arbitrary repo_path and passes it to ingest_repository, which walks the filesystem and reads files (README + code files). Without an allowlist/sandbox, a client can point this at any readable directory on the server, causing unintended data exposure. Recommend restricting ingestion to a configured base directory (and verifying the resolved path stays within it), or requiring a pre-registered repo identifier instead of a raw filesystem path.
| try: | ||
| body = ingest_repository(repo_path, metadata) |
There was a problem hiding this comment.
This endpoint accepts an arbitrary repo_path and passes it to ingest_repository, which walks the filesystem and reads files (README + code files). Without an allowlist/sandbox, a client can point this at any readable directory on the server, causing unintended data exposure. Recommend restricting ingestion to a configured base directory (and verifying the resolved path stays within it), or requiring a pre-registered repo identifier instead of a raw filesystem path.
| data = request.json or {} | ||
| repo_path = data.get("repo_path") | ||
| metadata = data.get("metadata", {}) |
There was a problem hiding this comment.
metadata is assumed to be a dict but isn’t validated. If a client sends a non-object JSON value (e.g., string/list/null) for metadata, ingest_repository will later call metadata.get(...) and throw AttributeError, returning a 500 instead of a 400. Add input validation here (ensure metadata is a dict, otherwise return 400) and/or defensive validation inside ingest_repository.
|
|
||
| if not repo_path: | ||
| return jsonify({"error": "repo_path is required"}), 400 | ||
|
|
There was a problem hiding this comment.
metadata is assumed to be a dict but isn’t validated. If a client sends a non-object JSON value (e.g., string/list/null) for metadata, ingest_repository will later call metadata.get(...) and throw AttributeError, returning a 500 instead of a 400. Add input validation here (ensure metadata is a dict, otherwise return 400) and/or defensive validation inside ingest_repository.
| if not isinstance(metadata, dict): | |
| return jsonify({"error": "metadata must be an object"}), 400 |
| @app.route('/api/a2a/ingest', methods=['POST']) | ||
| def ingest_celestial_body(): | ||
| """Convert a raw repository path into a CelestialBody payload.""" | ||
| data = request.json or {} |
There was a problem hiding this comment.
Using request.json can raise a BadRequest exception for invalid JSON with an application/json content-type, which would bypass this handler and return a default error. Prefer request.get_json(silent=True) or {} so you can consistently return a structured 400 response for invalid bodies.
| def _run_seismic_test(payload: dict[str, Any]) -> dict[str, Any]: | ||
| canonical = json.dumps(payload, sort_keys=True, separators=(",", ":")) | ||
| digest_a = hashlib.sha256(canonical.encode()).hexdigest() | ||
| digest_b = hashlib.sha256(canonical.encode()).hexdigest() |
There was a problem hiding this comment.
The seismic test hashes the same canonical string twice, so invariant will always be true unless there’s an internal hashing failure. This is redundant work and doesn’t add validation value. Consider computing a single digest and basing the test on that, or (if the intent is to detect non-determinism) re-canonicalize from independently re-materialized data rather than re-hashing the same bytes.
| digest_b = hashlib.sha256(canonical.encode()).hexdigest() | |
| rematerialized = json.loads(canonical) | |
| canonical_rematerialized = json.dumps(rematerialized, sort_keys=True, separators=(",", ":")) | |
| digest_b = hashlib.sha256(canonical_rematerialized.encode()).hexdigest() |
| import sys | ||
| from pathlib import Path | ||
|
|
||
| sys.path.insert(0, str(Path(__file__).resolve().parents[1])) | ||
|
|
||
| from celestial_ingestion import IngestionError, ingest_repository |
There was a problem hiding this comment.
Modifying sys.path in the test file is a brittle workaround and can cause import-order issues as the test suite grows. Prefer configuring imports via packaging (e.g., making genesis-q-mem importable) or pytest configuration (e.g., pythonpath/PYTHONPATH in a pytest.ini/conftest.py) so tests don’t need to mutate global interpreter state.
| import sys | |
| from pathlib import Path | |
| sys.path.insert(0, str(Path(__file__).resolve().parents[1])) | |
| from celestial_ingestion import IngestionError, ingest_repository | |
| import importlib.util | |
| from pathlib import Path | |
| _MODULE_PATH = Path(__file__).resolve().parents[1] / "celestial_ingestion.py" | |
| _SPEC = importlib.util.spec_from_file_location("celestial_ingestion", _MODULE_PATH) | |
| _MODULE = importlib.util.module_from_spec(_SPEC) | |
| assert _SPEC is not None and _SPEC.loader is not None | |
| _SPEC.loader.exec_module(_MODULE) | |
| IngestionError = _MODULE.IngestionError | |
| ingest_repository = _MODULE.ingest_repository |
| def test_ingest_repository_rejects_missing_path(): | ||
| try: | ||
| ingest_repository("/tmp/does-not-exist-123") | ||
| assert False, "Expected IngestionError" | ||
| except IngestionError: | ||
| assert True |
There was a problem hiding this comment.
This test is likely to be flaky because it hardcodes a path under /tmp that could exist on some systems. Also, the try/except + assert True/False pattern is harder to read and can hide unexpected exceptions. Prefer using a tmp-based guaranteed-missing path (e.g., from tmp_path) and pytest.raises(IngestionError).
Motivation
CelestialBodyJSON object for downstream Yennefer/hand-off workflows.Description
genesis-q-mem/celestial_ingestion.pycontaining aCelestialBodydataclass and ingestion pipeline that infers language counts, LOC, summary,mass,atmosphere,gravity,orbital_state, and aseismic_test(schema check + deterministic hash citation).POST /api/a2a/ingesttogenesis-q-mem/a2a_handoff_server.pywhich validates input (repo_path), callsingest_repository, and returns thecelestial_bodypayload or a 400 error on invalid input.ingest_repositoryandIngestionErrorinto the A2A server and kept all existing endpoints unchanged.genesis-q-mem/tests/test_celestial_ingestion.pyto verify successful ingestion and missing-path error behavior.Testing
cd genesis-q-mem && pytest -q tests/test_celestial_ingestion.pyand the final run reported2 passed(tests cover successful ingestion and missing-path rejection).sys.path), after which automated tests passed.8200and only the single new endpointPOST /api/a2a/ingestwas added.Codex Task