本文描述 Python 侧的内部多路召回 SSE 运行时:暴露面、内部鉴权、请求装配与降级 语义。对外契约见 docs/api/http_contracts.md §6; 错误码见 docs/api/error_codes.md §5; 配置见 docs/ops/configure.md;召回 pipeline 编排见 recall_pipeline.md。
外部用户态 Recall API 归属 Java Recall Gateway:Sa-Token 登录态、用户状态、数据集 /
文档归属校验都在 Java 端完成。若让前端直连 Python 并透传 user_id,会绕过 Java 的
租户隔离,可伪造身份越权召回他人数据集。
因此 Python 只暴露内部 runtime,且只信任 Java 为每次调用签发的短期内部凭证,不接受
前端 Sa-Token,也不信任请求体里自报的 user_id。
- 端点:
POST /api/v1/internal/recall/stream(src/api/routes/recall.py)。 - 仅此一个流式端点;首版不提供一次性 JSON 接口。
- 返回
text/event-stream,终态事件recall_done/error(见对外契约)。
实现见 src/api/internal_auth.py 的 verify_internal_jwt
依赖。Java 用共享密钥签发,Python 用同一密钥验签。
校验链(任一失败 → RecallApiError(401, RECALL_INTERNAL_UNAUTHORIZED)):
- 取
Authorization: Bearer <token>,缺失或非 Bearer → 401。 - HS256 验签 + 校验
aud/iss/exp(PyJWT 内置,require=["exp"])。 - 手动校验
scope == RECALL_INTERNAL_JWT_SCOPE。 sub→ 正整数user_id;dataset_ids(可选 list)作为授权范围。
产出 InternalAuthContext(user_id, dataset_ids, jti, request_id)。request_id 取
X-Request-Id,缺省时生成 uuid4().hex 并回写响应头,用于贯穿日志。
JWT 推荐 claims:
{
"iss": "tolink-java", "aud": "tolink-rag", "sub": "123",
"scope": "recall:execute", "dataset_ids": [1, 2],
"jti": "request-id", "exp": 1710000300
}jti 本期仅用于日志 / 审计 / trace,不做防重放存储。
RECALL_INTERNAL_AUTH_ENABLED=False 仅供本地联调(跳过验签,仍需 token 解析身份),
生产恒开启。
握手前在 recall.py _check_scope 完成:
body.user_id必须等于凭证sub,否则403 RECALL_USER_MISMATCH。- 凭证带
dataset_ids时,body.dataset_ids必须是其子集,否则403 RECALL_SCOPE_FORBIDDEN; 凭证dataset_ids为空 / 缺省表示 Java 已授权全库召回,不限制 body 范围。
下传 pipeline 的 user_id 始终取凭证 sub,不信任 body 自报值。
握手前依次完成:JWT 校验 → JSON 解析 + Pydantic 校验(extra=forbid,非首版字段
→ 422)→ query 空白 → 400 → scope 校验。任一失败走 HTTP JSON 错误。
通过后组装 RecallRequest:
query← body;user_id← 凭证sub;dataset_ids← body;doc_ids= None;top_k←RECALL_RESULT_LIMIT(服务端配置,不接受请求覆盖)。
随后建立 SSE 流,在流内 asyncio.wait_for(pipeline.execute(req), RECALL_STREAM_TIMEOUT_MS):
- 成功 / 宽松降级 →
recall_done(failed_sources表达降级)。 - 全路失败
RecallError→errorRECALL_ALL_SOURCES_FAILED。 - 超时 →
errorRECALL_TIMEOUT。 - 客户端断连(
CancelledError)→ 停止发送事件并向上传播取消,pipeline 协程随之结束; recall-only 无后续 rerank / 上下文 / LLM 步骤。
top_k / sources / strict 由配置而非请求决定,因此 pipeline 与各路 retriever 都是
无用户态的长期实例。
src/api/recall_pipeline_provider.py 按
RECALL_ENABLED_SOURCES 装配 RecallPipeline 单例(lru_cache):
bm25→Bm25Retriever(EsBm25Retriever(), RagFlowTokenizer());sparse→SparseRetriever(compose_vector_storage_facade(), score_threshold=...);dense→DenseRetriever(compose_vector_storage_facade(), score_threshold=...)(本期新增);- 配置中出现未登记 source → 装配期
ValueError,不静默跳过。
sparse 底座含本地 BGE-M3,装配较重,必须单例。dense 底座走远程 system embedding HTTP
(无本地模型加载),单例化主要是为了与 recall_pipeline 单例对齐——所有 retriever
在 pipeline 单例之内只构造一次。
user_id / top_k 不在装配期注入,而是执行期由 pipeline 透传给
Retriever.recall(query, dataset_ids, doc_ids, *, user_id, top_k)——这是相对 LINK-6
的契约调整(见 recall_pipeline.md),使单例化成立。
内部端点(§2–§6)面向 Java、内网可信;对外直连端点面向浏览器前端,让前端凭 Java 签发的短期 session token 直连、绕过 Java 中转。两条链路并存,召回执行复用同一实现。
- 端点:
POST /api/v1/recall/stream(src/api/routes/recall_direct.py)。 - 会话鉴权:src/api/recall_session_auth.py 的
verify_session_token——用独立密钥RECALL_SESSION_JWT_SECRETHS256 验签,校验aud=tolink-rag-frontend/iss=tolink-java/scope=recall:stream/exp。与内部端点 密码学隔离,前端面 token 疑似泄露可单独轮转。 - token 短期可复用:只校验
exp,不做一次性 / 防重放 / 撤销。本场景只读、不可越权 (只能召回本人授权范围)、且有并发上限作资源闸门,一次性收益不抵复杂度(决策见.specs/recall-direct-sse/brief.md §3.3)。断线重连可复用未过期 token,过期后回 Java 重申。 - 入参:body 只含
query+ 可选dataset_ids(授权范围内子集选择,extra=forbid); 不含user_id,身份只取 claims(_resolve_dataset_ids做 ⊆ claims 校验)。 - 并发限流:recall_session_auth.py 的
acquire_stream_slot/release_stream_slot,按user_id用 RedisINCR/DECR计数, 上限RECALL_SESSION_MAX_CONCURRENT,超限429 RECALL_RATE_LIMITED。_guarded_stream在流收尾(含断连CancelledError)的finally中释放名额。握手顺序:验签 → body 校验 → scope → 并发 acquire → 建流。Redis 不可用时 acquire fail-open(限流是资源保护非鉴权)。 - SSE 执行:与内部端点共享 src/api/recall_stream_runtime.py
的
recall_event_stream,事件协议、降级、失败终态完全一致(避免双链路漂移)。 - CORS:复用全局
CORSMiddleware;对外环境必须把CORS_ORIGINS由*收敛为前端可信 域名清单。错误码见 error_codes.md §6。