Skip to content
This repository was archived by the owner on Jun 3, 2026. It is now read-only.

Commit 36e6b76

Browse files
committed
Add v2 S3 original storage and hybrid search
1 parent c2fc0aa commit 36e6b76

9 files changed

Lines changed: 980 additions & 20 deletions

File tree

.env.example

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,36 @@ PINECONE_REGION=us-east-1
2222
# EMBEDDING_MODEL=all-MiniLM-L6-v2
2323
EMBEDDING_MODEL=gemini-embedding-001
2424

25+
# =============================================================================
26+
# Original Storage + v2 Hybrid Search (optional, v2 only)
27+
# =============================================================================
28+
ORIGINAL_STORAGE_ENABLED=false
29+
ORIGINAL_STORAGE_PROVIDER=s3
30+
ORIGINAL_STORAGE_FAIL_CLOSED=false
31+
ORIGINAL_STORAGE_TIMEOUT_SECONDS=180
32+
33+
ORIGINAL_S3_BUCKET=
34+
ORIGINAL_S3_REGION=us-east-1
35+
ORIGINAL_S3_PREFIX=originals
36+
ORIGINAL_S3_ENDPOINT_URL=
37+
ORIGINAL_S3_KMS_KEY_ID=
38+
ORIGINAL_S3_MULTIPART_THRESHOLD_BYTES=8388608
39+
ORIGINAL_S3_MULTIPART_CHUNK_BYTES=8388608
40+
41+
ORIGINAL_CHUNK_SIZE_TOKENS=350
42+
ORIGINAL_CHUNK_OVERLAP_TOKENS=40
43+
ORIGINAL_INDEX_BATCH_SIZE=64
44+
ORIGINAL_EMBED_CONCURRENCY=4
45+
ORIGINAL_INDEX_CONCURRENCY=2
46+
ORIGINAL_BATCH_ITEM_CONCURRENCY=3
47+
ORIGINAL_MAX_BYTES=10485760
48+
ORIGINAL_INCLUDE_AGENT_RESPONSE=true
49+
ORIGINAL_INCLUDE_IMAGE_URL=false
50+
51+
HYBRID_SEARCH_MEMORY_TOP_K=10
52+
HYBRID_SEARCH_ORIGINAL_TOP_K=10
53+
HYBRID_SEARCH_MIN_SCORE=0.0
54+
2555
# =============================================================================
2656
# Database Configuration
2757
# =============================================================================

src/api/routes/v2/activities.py

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
from src.billing.context import use_billing_context
1313
from src.billing.service import commit_job_billing, release_job_billing
1414
from src.jobs.durable import get_default_job_store
15+
from src.storage.original import preserve_original
1516

1617
try: # pragma: no cover - no-op fallback keeps imports working without SDK.
1718
from temporalio import activity
@@ -179,6 +180,16 @@ async def memory_run_pipeline_activity(payload: Dict[str, Any]) -> Dict[str, Any
179180
return await memory_v1._run_ingest_payload(payload, payload["user_id"])
180181

181182

183+
@activity.defn
async def memory_store_original_activity(payload: Dict[str, Any]) -> Dict[str, Any]:
    """Run ``preserve_original`` for one job payload as a Temporal activity.

    The vector store and embedding callable are taken from the shared ingest
    pipeline and forwarded unchanged; whatever ``preserve_original`` returns
    is returned to the workflow as-is.
    """
    ingest = get_ingest_pipeline()
    # Resolve both collaborators up front, in the same order they are passed.
    store, embed = ingest.vector_store, ingest.embed_fn
    outcome = await preserve_original(payload, vector_store=store, embed_fn=embed)
    return outcome
191+
192+
182193
@activity.defn
183194
async def memory_scrape_activity(payload: Dict[str, Any]) -> Dict[str, Any]:
184195
def _run_scrape() -> Dict[str, Any]:
@@ -286,6 +297,7 @@ async def scanner_phase2_activity(payload: Dict[str, Any]) -> Dict[str, Any]:
286297
memory_classify_activity,
287298
memory_domain_activity,
288299
memory_run_pipeline_activity,
300+
memory_store_original_activity,
289301
memory_scrape_activity,
290302
scanner_scan_activity,
291303
scanner_phase2_activity,

src/api/routes/v2/memory.py

Lines changed: 120 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,12 @@
77
from fastapi import APIRouter, Depends, Request
88
from fastapi.responses import JSONResponse
99

10-
from src.api.dependencies import enforce_rate_limit, require_api_key, require_ready
10+
from src.api.dependencies import (
11+
enforce_rate_limit,
12+
get_retrieval_pipeline,
13+
require_api_key,
14+
require_ready,
15+
)
1116
from src.api.routes import memory as memory_v1
1217
from src.api.routes.v2.shared import (
1318
_error,
@@ -18,10 +23,20 @@
1823
read_user_job,
1924
)
2025
from src.api.routes.v2.temporal_client import start_job_workflow
21-
from src.api.schemas import APIResponse, BatchIngestRequest, IngestRequest, ScrapeRequest, StatusEnum
26+
from src.api.schemas import (
27+
APIResponse,
28+
BatchIngestRequest,
29+
HybridSearchRequest,
30+
HybridSearchResponse,
31+
IngestRequest,
32+
ScrapeRequest,
33+
SourceRecord,
34+
StatusEnum,
35+
)
2236
from src.billing import InsufficientCredits, get_default_billing_service
2337
from src.config import settings
2438
from src.jobs.durable import QUEUED, get_default_job_store, idempotency_key, new_attempt_id, stable_hash
39+
from src.storage.original import ORIGINAL_CHUNK_DOMAIN, original_config_snapshot
2540

2641
router = APIRouter(
2742
prefix="/v2/memory",
@@ -44,6 +59,18 @@ def _durable_job_id(job_type: str, fields: Dict[str, Any]) -> str:
4459
return f"{job_type}:{idempotency_key(job_type, fields)}"
4560

4661

62+
def _attach_original_storage_config(payload: Dict[str, Any]) -> None:
    """Copy the original-storage settings into ``payload`` in place.

    Each scalar setting is stored under a payload key with the same name as
    the settings attribute, coerced to a plain ``bool``/``float``/``int``.
    The result of ``original_config_snapshot()`` is stored last under
    ``"original_config"``.
    """
    # (settings attribute / payload key, coercion) pairs, applied in order.
    coerced_fields = (
        ("original_storage_enabled", bool),
        ("original_storage_fail_closed", bool),
        ("original_storage_timeout_seconds", float),
        ("original_batch_item_concurrency", int),
    )
    for field_name, coerce in coerced_fields:
        payload[field_name] = coerce(getattr(settings, field_name))
    payload["original_config"] = original_config_snapshot()
72+
73+
4774
class WorkflowStartFailed(RuntimeError):
4875
def __init__(self, job: Dict[str, Any], error: str) -> None:
4976
super().__init__(error)
@@ -122,6 +149,7 @@ async def ingest_memory_v2(req: IngestRequest, request: Request, user: dict = De
122149
payload = req.model_dump()
123150
payload["user_id"] = user_id
124151
payload["timeout_seconds"] = float(settings.memory_ingest_timeout_seconds)
152+
_attach_original_storage_config(payload)
125153
idempotency_fields = {
126154
"user_id": user_id,
127155
"org_id": payload.get("org_id", "default"),
@@ -132,6 +160,7 @@ async def ingest_memory_v2(req: IngestRequest, request: Request, user: dict = De
132160
"image_url": req.image_url,
133161
"effort_level": req.effort_level,
134162
}),
163+
"original_storage_enabled": bool(settings.original_storage_enabled),
135164
}
136165
job_id = _durable_job_id("memory_ingest", idempotency_fields)
137166
billing_service = get_default_billing_service()
@@ -217,9 +246,11 @@ async def batch_ingest_memory_v2(req: BatchIngestRequest, request: Request, user
217246
min(len(req.items) * float(settings.memory_ingest_timeout_seconds), 3600.0),
218247
),
219248
}
249+
_attach_original_storage_config(payload)
220250
idempotency_fields = {
221251
"user_id": user_id,
222252
"content_hash": _content_hash({"items": items}),
253+
"original_storage_enabled": bool(settings.original_storage_enabled),
223254
}
224255
job_id = _durable_job_id("memory_batch_ingest", idempotency_fields)
225256
billing_service = get_default_billing_service()
@@ -278,6 +309,93 @@ async def batch_ingest_memory_v2(req: BatchIngestRequest, request: Request, user
278309
return _error(request, str(exc), 500, elapsed_ms(start))
279310

280311

312+
async def _search_original_chunks(
    query: str,
    user_id: str,
    top_k: int,
) -> list[SourceRecord]:
    """Search the vector store for a user's original-content chunks.

    Args:
        query: Free-text query passed to ``search_by_text``.
        user_id: Owner filter; only this user's chunks are searched.
        top_k: Maximum number of hits requested from the vector store.

    Returns:
        One ``SourceRecord`` per hit whose score is at least
        ``settings.hybrid_search_min_score``, in the order the store returned
        them. Scores are rounded to three decimals and the hit id is exposed
        under ``metadata["id"]`` (a same-named key in the hit's own metadata
        takes precedence, as before).
    """
    pipeline = get_retrieval_pipeline()
    raw = await pipeline.vector_store.search_by_text(
        query_text=query,
        top_k=top_k,
        filters={"user_id": user_id, "domain": ORIGINAL_CHUNK_DOMAIN},
    )

    # The threshold is loop-invariant; read and convert it once.
    min_score = float(settings.hybrid_search_min_score)

    results: list[SourceRecord] = []
    for item in raw:
        # A missing score is treated as 0.0 rather than rejected outright.
        score = float(item.score or 0.0)
        if score < min_score:
            continue
        results.append(
            SourceRecord(
                domain=ORIGINAL_CHUNK_DOMAIN,
                content=item.content,
                score=round(score, 3),
                # ``**None`` raises TypeError; tolerate hits without metadata
                # the same way a missing score is tolerated above.
                metadata={"id": item.id, **(item.metadata or {})},
            )
        )
    return results
337+
338+
339+
@router.post(
    "/hybrid-search",
    response_model=APIResponse,
    summary="v2-only hybrid search across extracted memories and original chunks",
)
async def hybrid_search_memory_v2(
    req: HybridSearchRequest,
    request: Request,
    user: dict = Depends(require_api_key),
):
    """Search extracted memories and, optionally, stored original chunks.

    Memory domains listed in ``req.domains`` are queried in a fixed order
    (profile, temporal, summary). Original chunks are searched only when the
    request asks for them and original storage is enabled in settings. The
    response carries both lists separately plus their concatenation.

    Any exception raised while searching is reported through ``_error`` with
    status 500; resolving the pipeline and the user id happens before that
    guard.
    """
    started = time.perf_counter()
    retrieval = get_retrieval_pipeline()
    user_id = memory_v1._current_user_id(user, req.user_id)
    # Per-request limits win; fall back to configured defaults when unset.
    memory_limit = req.memory_top_k or int(settings.hybrid_search_memory_top_k)
    original_limit = req.original_top_k or int(settings.hybrid_search_original_top_k)

    try:
        memory_hits: list[SourceRecord] = []
        if "profile" in req.domains:
            # Profile lookup is keyed by user only; it takes no query text.
            memory_hits += memory_v1._search_profile(retrieval, user_id)
        if "temporal" in req.domains:
            memory_hits += memory_v1._search_temporal(
                retrieval, req.query, user_id, memory_limit
            )
        if "summary" in req.domains:
            memory_hits += await memory_v1._search_summary(
                retrieval, req.query, user_id, memory_limit
            )

        if req.include_original_chunks and settings.original_storage_enabled:
            chunk_hits: list[SourceRecord] = await _search_original_chunks(
                req.query, user_id, original_limit
            )
        else:
            chunk_hits = []

        combined = memory_hits + chunk_hits
        body = HybridSearchResponse(
            memory_results=memory_hits,
            original_chunks=chunk_hits,
            results=combined,
            total=len(combined),
            original_storage_enabled=bool(settings.original_storage_enabled),
        )
        return _wrap(request, body, elapsed_ms(started))
    except Exception as exc:
        return _error(request, str(exc), 500, elapsed_ms(started))
397+
398+
281399
@scrape_router.post("/scrape", response_model=APIResponse, summary="Start an async durable scrape job")
282400
async def scrape_chat_link_v2(req: ScrapeRequest, request: Request):
283401
start = time.perf_counter()

0 commit comments

Comments
 (0)