This repository was archived by the owner on Jun 3, 2026. It is now read-only.

Add v2 S3 original storage and hybrid search #229

Open
ishaanxgupta wants to merge 4 commits into main from amazon_s3
Conversation

@ishaanxgupta
Member

Summary

  • Add v2-only original preservation to S3 with deterministic object keys and original_chunk vector IDs
  • Run original preservation in parallel with v2 memory extraction, including bounded batch/concurrency controls
  • Add v2-only /v2/memory/hybrid-search returning extracted memory hits plus indexed original chunks
  • Add configurable env/settings knobs and tests for preservation + hybrid search
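
The deterministic object keys and original_chunk vector IDs mentioned in the summary could be derived along these lines. This is a minimal sketch, not the code in src/storage/original.py: the function names, the `originals/` prefix, and the use of SHA-256 are all assumptions made for illustration.

```python
import hashlib


def original_object_key(user_id: str, content: bytes, prefix: str = "originals") -> str:
    """Build an object key from the user and a hash of the content (hypothetical layout)."""
    digest = hashlib.sha256(content).hexdigest()
    return f"{prefix}/{user_id}/{digest}"


def original_chunk_id(object_key: str, chunk_index: int) -> str:
    """Build a stable vector ID for one chunk of a stored original (hypothetical format)."""
    raw = f"{object_key}#{chunk_index}".encode("utf-8")
    return "original_chunk:" + hashlib.sha256(raw).hexdigest()[:32]
```

Because both values depend only on their inputs, a retried upload or re-index would overwrite the same object and vector rather than create duplicates.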

Testing

  • .venv/bin/python -m pytest tests/unit/test_original_storage.py tests/api/test_v2_hybrid_search.py
  • .venv/bin/python -m pytest tests/api/test_memory_versioning.py tests/api/test_dependencies_and_routes.py tests/integration/test_retrieval_pipeline.py
  • .venv/bin/python -m pytest tests/api
  • .venv/bin/python -m ruff check src/storage/original.py src/api/routes/v2/memory.py src/api/routes/v2/workflows.py src/api/routes/v2/activities.py src/api/schemas.py src/config/settings.py tests/unit/test_original_storage.py tests/api/test_v2_hybrid_search.py
  • .venv/bin/python -m py_compile src/storage/original.py src/api/routes/v2/memory.py src/api/routes/v2/workflows.py src/api/routes/v2/activities.py src/api/schemas.py src/config/settings.py

@github-actions
Contributor

github-actions Bot commented Jun 3, 2026

Fails
🚫

🔐 This PR modifies sensitive files: src/config/settings.py. These require review by a core maintainer (@ishaanxgupta or @ved015) before merging.

Warnings
⚠️

📦 This PR changes 1017 lines (additions + deletions). Large PRs are harder to review thoroughly — consider splitting it.

Messages
📖

✅ Targeting main. Please squash commits before merging to keep the git history clean.

Generated by 🚫 dangerJS against 0b79b8d

@github-actions
Contributor

github-actions Bot commented Jun 3, 2026

✅ Staging Deployment Report

| Item | Value |
| --- | --- |
| Branch | amazon_s3 |
| Commit | 209eed7 |
| Environment | Staging |
| Health | http://3.6.255.148:8001/health |
| API Docs | http://3.6.255.148:8001/docs |
| Smoke Tests | success |

🟢 Staging is live and healthy! Test your changes at the staging URL above.

Ready to ship? Comment /promote on this PR to merge to main and deploy to production.

@github-actions
Contributor

github-actions Bot commented Jun 3, 2026

🔍 API Schema Diff

---REPORT---


Auto-generated by API Schema Diff workflow

@github-actions
Contributor

github-actions Bot commented Jun 3, 2026

✅ Staging Deployment Report

| Item | Value |
| --- | --- |
| Branch | amazon_s3 |
| Commit | 5b512db |
| Environment | Staging |
| Health | http://3.6.255.148:8001/health |
| API Docs | http://3.6.255.148:8001/docs |
| Smoke Tests | success |

🟢 Staging is live and healthy! Test your changes at the staging URL above.

Ready to ship? Comment /promote on this PR to merge to main and deploy to production.

Contributor

@gemini-code-assist gemini-code-assist Bot left a comment

Code Review

This pull request introduces v2-only raw original document storage (S3) and original chunk indexing, alongside a new hybrid search endpoint (/hybrid-search) that queries both extracted memories and original document chunks. It also enhances security in the scanner enricher by escaping untrusted code blocks and validating symbol types and languages against allowlists. Feedback on these changes highlights three key areas: wrapping synchronous database/service search calls in asyncio.to_thread within the async FastAPI route handler to prevent blocking the event loop, utilizing a try...finally block in the batch ingest workflow to safely cancel and clean up background original storage tasks in case of extraction failures, and normalizing the language string to lowercase before validating it against the allowlist.


src/api/routes/v2/memory.py (356-367)

high

The route handler hybrid_search_memory_v2 is an async def function, but it calls memory_v1._search_profile and memory_v1._search_temporal synchronously. These functions perform database/service I/O and will block the FastAPI event loop, violating the rule to wrap synchronous database or service I/O calls in asyncio.to_thread.

        memory_results: list[SourceRecord] = []
        if "profile" in req.domains:
            profile_results = await asyncio.to_thread(
                memory_v1._search_profile, pipeline, user_id
            )
            memory_results.extend(profile_results)
        if "temporal" in req.domains:
            temporal_results = await asyncio.to_thread(
                memory_v1._search_temporal,
                pipeline,
                req.query,
                user_id,
                memory_top_k,
            )
            memory_results.extend(temporal_results)
References
  1. In FastAPI async def route handlers, wrap synchronous database or service I/O calls in asyncio.to_thread to prevent blocking the event loop.
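
The effect of the rule above can be seen in a small standalone sketch. It does not use the project's pipeline: `blocking_search` is a hypothetical stand-in for a synchronous search call, and the heartbeat task exists only to show that the event loop keeps running while the blocking call sits in a worker thread.

```python
import asyncio
import time


def blocking_search(query: str) -> list[str]:
    """Hypothetical stand-in for a synchronous database or service call."""
    time.sleep(0.05)
    return [f"hit for {query}"]


async def handler(query: str) -> tuple[list[str], int]:
    ticks = 0

    async def heartbeat() -> None:
        # Counts how often the event loop gets to run while the search is in flight.
        nonlocal ticks
        while True:
            await asyncio.sleep(0.005)
            ticks += 1

    beat = asyncio.create_task(heartbeat())
    try:
        # The blocking call runs in a worker thread, so the loop stays responsive.
        results = await asyncio.to_thread(blocking_search, query)
    finally:
        beat.cancel()
    return results, ticks
```

Calling `blocking_search(query)` directly inside `handler` would leave `ticks` at zero, because nothing else on the loop could run until the call returned.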

src/api/routes/v2/workflows.py (291-316)

high

Wrapping _execute in asyncio.create_task here is redundant since it is immediately awaited. More importantly, if the extraction task fails, original_task is left dangling (leaked) in the Temporal workflow. Using a try...finally block ensures that the background original_task is properly cancelled and cleaned up if any part of the item execution fails.

            async def _run_item(index: int, item: Dict[str, Any]):
                item_payload = dict(item)
                item_payload["user_id"] = (
                    item_payload.get("user_id") or payload["user_id"]
                )
                for key in (
                    "original_storage_enabled",
                    "original_storage_fail_closed",
                    "original_storage_timeout_seconds",
                    "original_config",
                ):
                    if key in payload and key not in item_payload:
                        item_payload[key] = payload[key]

                original_task = _start_original_task(job_id, item_payload)
                try:
                    item_result = await _execute(
                        "memory_run_pipeline_activity",
                        {**item_payload, **billing_activity},
                        item_timeout,
                    )
                    item_result["original_storage"] = await _await_original_task(
                        original_task,
                        item_payload,
                    )
                    original_task = None
                    return index, item_result
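                finally:
                    # Cancel the background task if extraction failed before it was awaited.
                    if original_task and not original_task.done():
                        original_task.cancel()
                        try:
                            await original_task
                        except (asyncio.CancelledError, Exception):
                            # CancelledError is a BaseException, so a bare
                            # `except Exception` would let it escape the cleanup.
                            pass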
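
The cancel-in-finally pattern can be exercised outside Temporal with plain asyncio. The sketch below is an illustration under assumptions, not the workflow code: `store_original` and `extract` are hypothetical stand-ins for the original-storage task and the extraction activity.

```python
import asyncio


async def store_original(delay: float, log: list[str]) -> str:
    """Hypothetical background task; records when it is cancelled."""
    try:
        await asyncio.sleep(delay)
        return "stored"
    except asyncio.CancelledError:
        log.append("original task cancelled")
        raise


async def extract(fail: bool) -> str:
    """Hypothetical extraction step that can be made to fail."""
    await asyncio.sleep(0)  # yield once so the background task gets started
    if fail:
        raise RuntimeError("extraction failed")
    return "extracted"


async def run_item(fail: bool, log: list[str]) -> tuple[str, str]:
    task = asyncio.create_task(store_original(0.01, log))
    try:
        extracted = await extract(fail)
        stored = await task
        return extracted, stored
    finally:
        # Reached on success and on failure; only a still-running task is cancelled.
        if not task.done():
            task.cancel()
            try:
                await task
            except asyncio.CancelledError:
                # Expected here: awaiting a cancelled task raises CancelledError.
                pass
```

On the failure path the `RuntimeError` from `extract` still propagates to the caller, and the log shows that the background task was cancelled rather than left running.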

src/scanner/enricher.py (357)

medium

The language value should be normalized to lowercase before checking against _ALLOWED_LANGUAGES to prevent false negatives for capitalized language names (e.g., 'Python').

        language=_allowlist(language.lower() if language else "", _ALLOWED_LANGUAGES, "python"),
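
A standalone sketch of the same idea, with the normalization moved inside the helper so every caller benefits. The helper name, its signature, and the contents of the allowlist are assumptions for illustration; the real `_allowlist` and `_ALLOWED_LANGUAGES` in src/scanner/enricher.py may differ.

```python
from typing import Optional

# Hypothetical allowlist; the real set is defined in the scanner module.
ALLOWED_LANGUAGES = frozenset({"python", "typescript", "go"})


def allowlist(value: Optional[str], allowed: frozenset, default: str) -> str:
    """Return the lowercased value if it is allowed, otherwise the default."""
    normalized = (value or "").strip().lower()
    return normalized if normalized in allowed else default
```

With this shape, `allowlist("Python", ALLOWED_LANGUAGES, "python")` matches the allowlist entry instead of silently falling back to the default.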

@github-actions
Contributor

github-actions Bot commented Jun 3, 2026

✅ Staging Deployment Report

| Item | Value |
| --- | --- |
| Branch | amazon_s3 |
| Commit | 7c3f2a9 |
| Environment | Staging |
| Health | http://3.6.255.148:8001/health |
| API Docs | http://3.6.255.148:8001/docs |
| Smoke Tests | success |

🟢 Staging is live and healthy! Test your changes at the staging URL above.

Ready to ship? Comment /promote on this PR to merge to main and deploy to production.

Comment thread src/api/routes/v2/workflows.py Fixed
@github-actions
Contributor

github-actions Bot commented Jun 3, 2026

✅ Staging Deployment Report

| Item | Value |
| --- | --- |
| Branch | amazon_s3 |
| Commit | 65a8733 |
| Environment | Staging |
| Health | http://3.6.255.148:8001/health |
| API Docs | http://3.6.255.148:8001/docs |
| Smoke Tests | success |

🟢 Staging is live and healthy! Test your changes at the staging URL above.

Ready to ship? Comment /promote on this PR to merge to main and deploy to production.

Member Author

@greptileai

Member Author

@greptileai review this PR

Member Author

@greptileai review

@greptile-apps
Contributor

greptile-apps Bot commented Jun 3, 2026

Greptile Summary

This PR adds v2-only original document preservation to S3 with deterministic object keys and indexed original chunks in the vector store, plus a new /v2/memory/hybrid-search endpoint that combines extracted memory results with raw-chunk results in a single response.

  • src/storage/original.py implements S3 upload and concurrent vector-store chunk indexing with replay-safe deterministic IDs; config is snapshotted into the Temporal payload at workflow start.
  • MemoryIngestWorkflow and MemoryBatchIngestWorkflow are updated to launch original-storage in parallel with the extraction pipeline, with bounded concurrency and configurable fail-open/fail-closed behavior.
  • hybrid_search_memory_v2 fans out across requested memory domains and, when original storage is enabled, appends filtered vector-store results for indexed chunks.
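
Bounded concurrency combined with a fail-open/fail-closed switch might look like the following. This is a sketch under assumptions: `index_chunks`, its parameters, and the shape of the returned summary are hypothetical and are not taken from src/storage/original.py.

```python
import asyncio
from typing import Awaitable, Callable


async def index_chunks(
    chunks: list[str],
    index_one: Callable[[str], Awaitable[None]],
    max_concurrency: int = 4,
    fail_closed: bool = False,
) -> dict:
    """Index chunks with at most `max_concurrency` calls in flight at once."""
    semaphore = asyncio.Semaphore(max_concurrency)

    async def guarded(chunk: str) -> None:
        async with semaphore:
            await index_one(chunk)

    # Collect failures instead of aborting on the first one.
    outcomes = await asyncio.gather(
        *(guarded(chunk) for chunk in chunks), return_exceptions=True
    )
    errors = [outcome for outcome in outcomes if isinstance(outcome, Exception)]
    if errors and fail_closed:
        # Fail-closed: surface the first error to the caller.
        raise errors[0]
    # Fail-open: report what happened and let the caller carry on.
    return {"indexed": len(chunks) - len(errors), "failed": len(errors)}
```

In fail-open mode a partial indexing failure is reported in the summary while extraction proceeds; in fail-closed mode the same failure aborts the item.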

Confidence Score: 3/5

Two correctness issues need fixing before merging: the single-item workflow leaks a background Temporal activity task after failure, and the new hybrid-search endpoint calls the pipeline accessor outside its error-handling scope.

The single-item MemoryIngestWorkflow creates an asyncio Task for original storage but has no finally/except path to cancel it when a downstream activity fails, causing background Temporal activity dispatch for dead-lettered jobs. Separately, hybrid_search_memory_v2 calls get_retrieval_pipeline() before its try/except block without a require_ready guard, producing unstructured 500s on pipeline init failure.

src/api/routes/v2/workflows.py (MemoryIngestWorkflow error path) and src/api/routes/v2/memory.py (hybrid_search_memory_v2 pre-try pipeline call)
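
The second issue comes down to where the pipeline accessor is called. A framework-free sketch of the safer ordering follows; every name in it (`PipelineNotReady`, `get_pipeline_stub`, the response dictionaries) is hypothetical and stands in for the real FastAPI handler and `get_retrieval_pipeline()`.

```python
class PipelineNotReady(RuntimeError):
    """Hypothetical error raised when the retrieval pipeline cannot be built."""


def get_pipeline_stub(ready: bool):
    """Hypothetical accessor; raises when the pipeline is not initialised."""
    if not ready:
        raise PipelineNotReady("retrieval pipeline is not initialised")
    return lambda query: [f"result for {query}"]


def hybrid_search(query: str, ready: bool = True) -> dict:
    try:
        # The accessor is called inside the guarded scope, so an init failure
        # is turned into a structured error instead of an unhandled exception.
        pipeline = get_pipeline_stub(ready)
        return {"status": 200, "results": pipeline(query)}
    except PipelineNotReady as exc:
        return {
            "status": 503,
            "error": {"code": "pipeline_not_ready", "message": str(exc)},
        }
```

A readiness dependency on the route would achieve the same result earlier in the request; the sketch only shows the ordering inside the handler.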

Important Files Changed

| Filename | Overview |
| --- | --- |
| src/api/routes/v2/workflows.py | Adds parallel original-storage task to both single-item and batch ingest workflows. Single-item workflow leaks the original_task asyncio Task when a non-high-effort activity fails — the task is never cancelled in the except handler, causing background Temporal activity dispatch for dead-lettered jobs. |
| src/api/routes/v2/memory.py | Adds hybrid-search endpoint and original-storage config injection. The hybrid_search endpoint calls get_retrieval_pipeline() before the try/except block without a require_ready dependency guard, producing unstructured 500s if the pipeline is not ready. |
| src/storage/original.py | New module implementing S3 upload and vector-store chunk indexing for raw originals. Deterministic S3 keys and vector IDs make the operation replay-safe. Well-structured with bounded concurrency controls. |
| src/api/routes/v2/activities.py | Adds memory_store_original_activity Temporal activity that delegates to preserve_original. Straightforward; correctly registers the activity in ALL_ACTIVITIES. |
| src/api/schemas.py | Adds HybridSearchRequest and HybridSearchResponse Pydantic models. Domain validation and field constraints follow the same pattern as the existing SearchRequest. |
| src/config/settings.py | Adds ~30 new settings fields for original storage and hybrid search, all with sensible defaults. Also adds aws_session_token for temporary-credential support. |

Reviews (2): Last reviewed commit: "Address Greptile review feedback"

Comment thread src/storage/original.py Outdated
Comment thread src/api/routes/v2/memory.py Outdated
@github-actions
Contributor

github-actions Bot commented Jun 3, 2026

✅ Staging Deployment Report

| Item | Value |
| --- | --- |
| Branch | amazon_s3 |
| Commit | e55e801 |
| Environment | Staging |
| Health | http://3.6.255.148:8001/health |
| API Docs | http://3.6.255.148:8001/docs |
| Smoke Tests | success |

🟢 Staging is live and healthy! Test your changes at the staging URL above.

Ready to ship? Comment /promote on this PR to merge to main and deploy to production.
