Skip to content

Commit 9807cd3

Browse files
committed
feat(recall): 新增前端直连召回 SSE 端点(LINK-40)
- 新增对外直连端点 POST /api/v1/recall/stream,前端凭 Java 签发的短期可复用 session token 直连,绕过 Java 中转 - 独立密钥验签(aud=tolink-rag-frontend)、scope 子集校验、按用户并发限流(Redis) - 抽取 recall_stream_runtime 供内部/直连端点共享,零回归 - 同步 http_contracts/error_codes/recall_http_api/configure 契约文档 - 新增 18 个 acceptance Scenario + 会话鉴权/Redis 原子 helper 单测
1 parent 9ddeeb3 commit 9807cd3

20 files changed

Lines changed: 1656 additions & 89 deletions

.env.example

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -238,7 +238,24 @@ RECALL_STRICT_DEFAULT=false
238238
RECALL_RESULT_LIMIT=20
239239
RECALL_ENABLED_SOURCES=bm25,sparse,dense
240240

241+
# ==========================================
242+
# 对外直连召回 SSE 配置 (Recall Direct SSE / LINK-40)
243+
# ==========================================
244+
# 前端凭 Java 签发的短期 session token 直连 POST /api/v1/recall/stream。
245+
# token 短期可复用(只校验 exp,不做一次性/防重放/撤销),资源滥用由并发上限封顶。
246+
RECALL_SESSION_AUTH_ENABLED=true
247+
RECALL_SESSION_JWT_ISSUER=tolink-java
248+
# 受众与内部端点(tolink-rag)区分,避免内部 token 误用到对外端点。
249+
RECALL_SESSION_JWT_AUDIENCE=tolink-rag-frontend
250+
RECALL_SESSION_JWT_SCOPE=recall:stream
251+
# 独立 HS256 密钥:与 RECALL_INTERNAL_JWT_SECRET 物理隔离,可单独轮转;生产务必替换。
252+
RECALL_SESSION_JWT_SECRET=your_recall_session_jwt_secret
253+
# 单用户最大并发召回流数(资源主闸门)。
254+
RECALL_SESSION_MAX_CONCURRENT=3
255+
241256
# ==========================================
242257
# 杂项配置 (Misc)
243258
# ==========================================
259+
# 生产对外环境需把 CORS_ORIGINS 收敛为前端可信域名清单(不可用 "*",与对外直连召回端点的
260+
# Authorization 跨域预检相关)。
244261
CORS_ORIGINS=http://localhost:3000,http://localhost:8080

docs/api/error_codes.md

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -158,7 +158,29 @@ CODE: 中文业务原因;底层详情
158158
EMBEDDING 配置属**必备前置缺失**,走硬失败(`RECALL_EMBEDDING_CONFIG_MISSING`)而非宽松降级——
159159
即便其余路可用也不返回部分结果,避免"读侧系统模型 / 写侧用户模型"向量空间不一致的误召回。
160160

161-
## 6. Chunk Status Values
161+
## 6. 对外直连 Recall 错误码
162+
163+
对外直连召回 SSE 接口 `POST /api/v1/recall/stream`(LINK-40,见
164+
[http_contracts.md §7](http_contracts.md#7-对外直连-recall-sseapilink-40))。与内部端点
165+
区分专属错误码,便于审计区分「Java 内部调用」与「前端直连会话」。
166+
167+
**握手前**(会话鉴权 / 参数 / scope / 限流失败)→ 非 2xx 的 `{code, message, data}` JSON:
168+
169+
| 场景 | HTTP | code |
170+
| --- | --- | --- |
171+
| 缺失 / 验签 / iss / aud / scope / exp 失败、用内部密钥签发的 token | `401` | `RECALL_SESSION_UNAUTHORIZED` |
172+
| `dataset_ids` 超出 token 授权范围 | `403` | `RECALL_SCOPE_FORBIDDEN` |
173+
| JSON 非法 / 缺字段 / 类型错 / 出现未知字段(含 `user_id`| `422` | `RECALL_INVALID_REQUEST` |
174+
| `query` 为空或纯空白 | `400` | `RECALL_INVALID_REQUEST` |
175+
| 单用户并发流数超过 `RECALL_SESSION_MAX_CONCURRENT` | `429` | `RECALL_RATE_LIMITED` |
176+
177+
**握手后**(pipeline 执行期)→ SSE `error` 事件,与内部端点共享同一 runtime、语义一致:
178+
`RECALL_EMBEDDING_CONFIG_MISSING` / `RECALL_ALL_SOURCES_FAILED` / `RECALL_TIMEOUT` /
179+
`RECALL_INTERNAL_ERROR`
180+
181+
token **短期可复用**:有效期内重复建连均放行,无重放类错误码。
182+
183+
## 7. Chunk Status Values
162184

163185
| Status | 含义 |
164186
| --- | --- |

docs/api/http_contracts.md

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -239,3 +239,42 @@ data: {"code":"RECALL_ALL_SOURCES_FAILED","message":"all retrievers failed"}
239239
```
240240

241241
错误码与 HTTP 状态见 [error_codes.md](error_codes.md#5-internal-recall-错误码)
242+
243+
## 7. 对外直连 Recall SSE API(LINK-40)
244+
245+
路由前缀:`/api/v1/recall`**面向浏览器前端**:前端凭 Java 签发的**短期 session token**
246+
直连,绕过 Java 中转。与 §6 内部端点是**两条并存**链路(本端点是新增可选路径)。运行时
247+
与会话鉴权细节见 [docs/internals/recall_http_api.md](../internals/recall_http_api.md)
248+
249+
| Method | Path | 用途 | 鉴权 |
250+
| --- | --- | --- | --- |
251+
| `POST` | `/stream` | 前端直连多路召回,SSE 流式返回融合候选 | Header `Authorization: Bearer <session-token>` |
252+
253+
### POST /api/v1/recall/stream
254+
255+
前端以 fetch 流式(`ReadableStream`)建连,**不使用** `EventSource`(无法设鉴权头)。
256+
请求头:`Authorization: Bearer <session-token>``Content-Type: application/json`、可选
257+
`Origin`(CORS)、`X-Request-Id`
258+
259+
session token 由 Java 签发、Python 用**独立密钥**验签(与内部端点密钥隔离);claims:
260+
`iss=tolink-java``aud=tolink-rag-frontend``scope=recall:stream``sub``dataset_ids`
261+
`exp`**token 短期可复用**(只校验 `exp`,不做一次性 / 防重放 / 撤销)。
262+
263+
请求体(仅以下字段;出现 `user_id` / `top_k` / `sources` / `strict` / `doc_ids` 等任何未知
264+
字段返回 `422`):
265+
266+
| 字段 | 类型 | 必填 | 说明 |
267+
| --- | --- | --- | --- |
268+
| `query` | string || 用户问题,不能为空或纯空白 |
269+
| `dataset_ids` | list[int] || 本次查询的数据集**子集选择**,必须 ⊆ token 授权范围(超出 `403`);省略/空 = 用 token 全量授权范围 |
270+
271+
**身份只取 token claims**——body 不含 `user_id`,前端自报一律不信任。`top_k` / `sources` /
272+
`strict` 同内部端点,由服务端配置控制。
273+
274+
并发:按 `user_id` 限并发流数(`RECALL_SESSION_MAX_CONCURRENT`),超限返回 `429`
275+
276+
响应、SSE 事件协议(`recall_done` / `error`)与 §6 内部端点**完全一致**(复用同一执行链)。
277+
错误码见 [error_codes.md](error_codes.md#6-对外直连-recall-错误码)
278+
279+
> CORS:本端点暴露给浏览器,生产环境必须把 `CORS_ORIGINS` 收敛为前端可信域名清单
280+
> (不可用 `*`)。

docs/internals/recall_http_api.md

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -98,3 +98,28 @@ sparse 底座含本地 BGE-M3,装配较重,必须单例。dense 底座走远
9898
`user_id` / `top_k` 不在装配期注入,而是执行期由 pipeline 透传给
9999
`Retriever.recall(query, dataset_ids, doc_ids, *, user_id, top_k)`——这是相对 LINK-6
100100
的契约调整(见 [recall_pipeline.md](recall_pipeline.md)),使单例化成立。
101+
102+
## 7. 对外直连 SSE(LINK-40)
103+
104+
内部端点(§2–§6)面向 Java、内网可信;**对外直连端点**面向浏览器前端,让前端凭 Java
105+
签发的短期 session token 直连、绕过 Java 中转。两条链路**并存**,召回执行复用同一实现。
106+
107+
- 端点:`POST /api/v1/recall/stream`[src/api/routes/recall_direct.py](../../src/api/routes/recall_direct.py))。
108+
- 会话鉴权:[src/api/recall_session_auth.py](../../src/api/recall_session_auth.py)
109+
`verify_session_token`——用**独立密钥** `RECALL_SESSION_JWT_SECRET` HS256 验签,校验
110+
`aud=tolink-rag-frontend` / `iss=tolink-java` / `scope=recall:stream` / `exp`。与内部端点
111+
密码学隔离,前端面 token 疑似泄露可单独轮转。
112+
- **token 短期可复用**:只校验 `exp`**不做一次性 / 防重放 / 撤销**。本场景只读、不可越权
113+
(只能召回本人授权范围)、且有并发上限作资源闸门,一次性收益不抵复杂度(决策见
114+
`.specs/recall-direct-sse/brief.md §3.3`)。断线重连可复用未过期 token,过期后回 Java 重申。
115+
- 入参:body 只含 `query` + 可选 `dataset_ids`(授权范围内子集选择,`extra=forbid`);
116+
**不含 `user_id`**,身份只取 claims(`_resolve_dataset_ids` 做 ⊆ claims 校验)。
117+
- 并发限流:[recall_session_auth.py](../../src/api/recall_session_auth.py)
118+
`acquire_stream_slot` / `release_stream_slot`,按 `user_id` 用 Redis `INCR/DECR` 计数,
119+
上限 `RECALL_SESSION_MAX_CONCURRENT`,超限 `429 RECALL_RATE_LIMITED``_guarded_stream`
120+
在流收尾(含断连 `CancelledError`)的 `finally` 中释放名额。握手顺序:验签 → body 校验
121+
→ scope → 并发 acquire → 建流。Redis 不可用时 acquire **fail-open**(限流是资源保护非鉴权)。
122+
- SSE 执行:与内部端点共享 [src/api/recall_stream_runtime.py](../../src/api/recall_stream_runtime.py)
123+
`recall_event_stream`,事件协议、降级、失败终态完全一致(避免双链路漂移)。
124+
- CORS:复用全局 `CORSMiddleware`;对外环境必须把 `CORS_ORIGINS``*` 收敛为前端可信
125+
域名清单。错误码见 [error_codes.md §6](../api/error_codes.md#6-对外直连-recall-错误码)

docs/ops/configure.md

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -157,6 +157,26 @@
157157
| `DENSE_RETRIEVAL_TOP_K` | `10` | dense 召回 facade 直调时的兜底 top_k;pipeline 路径下被 `RECALL_RESULT_LIMIT` 覆盖 |
158158
| `DENSE_RETRIEVAL_SCORE_THRESHOLD` | `0.0` | dense 召回默认 score 阈值(cosine 上界 [0, 1],0.0 = 不过滤;facade 入口校验 `> 1.0` 早死) |
159159

160+
### 对外直连召回 SSE 配置(LINK-40)
161+
162+
对外直连召回 SSE 接口 `POST /api/v1/recall/stream` 的配置。前端凭 Java 签发的短期
163+
session token 直连,**独立密钥**与内部端点隔离。详见
164+
[recall_http_api.md](../internals/recall_http_api.md)
165+
166+
| 变量 | 默认 | 说明 |
167+
| --- | --- | --- |
168+
| `RECALL_SESSION_AUTH_ENABLED` | `true` | 是否启用 session token 验签;**生产必须为 true** |
169+
| `RECALL_SESSION_JWT_ISSUER` | `tolink-java` | 期望的 session JWT `iss` |
170+
| `RECALL_SESSION_JWT_AUDIENCE` | `tolink-rag-frontend` | 期望的 session JWT `aud`(与内部端点 `tolink-rag` 区分)|
171+
| `RECALL_SESSION_JWT_SCOPE` | `recall:stream` | 期望的 session JWT `scope` |
172+
| `RECALL_SESSION_JWT_SECRET` | 本地联调占位值 | **独立** HS256 密钥,与 `RECALL_INTERNAL_JWT_SECRET` 物理隔离、可单独轮转;**生产务必覆盖** |
173+
| `RECALL_SESSION_MAX_CONCURRENT` | `3` | 单用户最大并发召回流数;token 短期可复用,此为资源滥用主闸门,超限返回 `429` |
174+
| `CORS_ORIGINS` | `["*"]` | **生产对外环境必须收敛为前端可信域名清单**(不可用 `*`,否则带 `Authorization` 头的跨域预检失败)|
175+
176+
> token 短期可复用:Python 只校验 `exp`(建议 Java 签发 30s,仅够建连),不做一次性 /
177+
> 防重放 / 撤销;连上后流的存活由 `RECALL_STREAM_TIMEOUT_MS` 控制。并发计数依赖 Redis,
178+
> Redis 不可用时 fail-open(放行,因限流是资源保护非鉴权)。
179+
160180
### 远程 BGE-M3 推理服务(`remote_bge_m3` provider)
161181

162182
`SPARSE_VECTOR_PROVIDER` 除已有 `bge_m3`(本地)/ `bge_m3_http`(早期 bge-m3-server)

src/api/internal_auth.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,10 @@
3232
CODE_INTERNAL_ERROR = "RECALL_INTERNAL_ERROR"
3333
# 发起用户无默认 EMBEDDING 配置:dense 召回无法编码 query,整请求硬失败。
3434
CODE_EMBEDDING_CONFIG_MISSING = "RECALL_EMBEDDING_CONFIG_MISSING"
35+
# 对外直连 SSE(LINK-40)专属错误码:与内部端点的 RECALL_INTERNAL_* 区分,便于审计区分
36+
# 是「Java 内部调用」还是「前端直连会话」失败。
37+
CODE_SESSION_UNAUTHORIZED = "RECALL_SESSION_UNAUTHORIZED"
38+
CODE_RATE_LIMITED = "RECALL_RATE_LIMITED"
3539

3640

3741
class RecallApiError(Exception):

src/api/recall_session_auth.py

Lines changed: 174 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,174 @@
1+
"""对外直连召回 SSE(LINK-40)的会话鉴权与并发治理。
2+
3+
外部用户态 Recall 入口归属 Java:Java 用 Sa-Token 鉴权并校验 dataset 归属后,签发
4+
**短期 session token**;前端凭该 token 直连 Python ``POST /api/v1/recall/stream``。
5+
本模块提供:
6+
7+
- ``SessionAuthContext``:从可信 claims 解析出的请求上下文;
8+
- ``verify_session_token``:FastAPI 依赖,用**独立密钥**验签 + 校验 iss/aud/scope/exp;
9+
- ``acquire_stream_slot`` / ``release_stream_slot``:按 ``user_id`` 的并发流计数。
10+
11+
与内部端点(``internal_auth.py``)的关键差异:面向浏览器、密钥/受众独立;token
12+
**短期可复用**——只校验 ``exp``,不做一次性消费 / 防重放 / 撤销,资源滥用由并发上限封顶。
13+
设计依据见 .specs/recall-direct-sse/{brief,technical_design}.md。
14+
"""
15+
16+
from __future__ import annotations
17+
18+
from dataclasses import dataclass
19+
20+
import jwt
21+
from fastapi import Request
22+
from loguru import logger
23+
24+
from src.api.internal_auth import (
25+
CODE_SESSION_UNAUTHORIZED,
26+
RecallApiError,
27+
_request_id,
28+
)
29+
from src.cache.redis_client import redis_client
30+
from src.config import settings
31+
32+
# 并发计数 key 前缀;按 user_id 分桶,跨 worker / 实例共享。
33+
_CONCURRENT_KEY_PREFIX = "recall:concurrent:"
34+
35+
36+
@dataclass(frozen=True)
37+
class SessionAuthContext:
38+
"""从 session token claims 解析出的可信请求上下文。
39+
40+
Attributes:
41+
user_id: 来自 claims ``sub`` 的权威用户身份(正整数)。
42+
dataset_ids: claims 授权的数据集范围;``None`` 或空表示全库授权。
43+
request_id: 本次请求标识;取 ``X-Request-Id``,缺省时生成。
44+
"""
45+
46+
user_id: int
47+
dataset_ids: list[int] | None
48+
request_id: str
49+
50+
51+
def _extract_session_token(request: Request) -> str:
52+
"""从 ``Authorization: Bearer`` 提取 session token。
53+
54+
缺失或格式不符抛 ``RECALL_SESSION_UNAUTHORIZED``(区别于内部端点的错误码)。
55+
"""
56+
header = request.headers.get("Authorization")
57+
if not header or not header.startswith("Bearer "):
58+
raise RecallApiError(401, CODE_SESSION_UNAUTHORIZED, "missing session credential")
59+
token = header[len("Bearer ") :].strip()
60+
if not token:
61+
raise RecallApiError(401, CODE_SESSION_UNAUTHORIZED, "missing session credential")
62+
return token
63+
64+
65+
def _context_from_session_claims(claims: dict, request_id: str) -> SessionAuthContext:
66+
"""从可信 claims 装配上下文;身份只取 claims,不信任前端自报。"""
67+
raw_sub = claims.get("sub")
68+
try:
69+
user_id = int(raw_sub)
70+
except (TypeError, ValueError):
71+
raise RecallApiError(401, CODE_SESSION_UNAUTHORIZED, "invalid subject in credential")
72+
if user_id <= 0:
73+
raise RecallApiError(401, CODE_SESSION_UNAUTHORIZED, "invalid subject in credential")
74+
75+
dataset_ids = claims.get("dataset_ids")
76+
if dataset_ids is not None and not isinstance(dataset_ids, list):
77+
raise RecallApiError(
78+
401, CODE_SESSION_UNAUTHORIZED, "invalid dataset_ids in credential"
79+
)
80+
81+
return SessionAuthContext(
82+
user_id=user_id, dataset_ids=dataset_ids, request_id=request_id
83+
)
84+
85+
86+
async def verify_session_token(request: Request) -> SessionAuthContext:
87+
"""FastAPI 依赖:校验 Java 签发的 session token,产出 ``SessionAuthContext``。
88+
89+
校验链(任一失败 → ``RecallApiError(401, RECALL_SESSION_UNAUTHORIZED)``):
90+
Bearer token → HS256 验签(**独立 session 密钥**)+ iss/aud/exp(PyJWT 内置)
91+
→ scope(手动)→ sub→user_id。token 短期可复用,无一次性消费步骤——有效期内重复
92+
建连均放行(断线重连可复用未过期 token)。
93+
94+
``RECALL_SESSION_AUTH_ENABLED=False`` 仅本地联调:跳过验签但仍解析 claims 取身份;
95+
生产恒开启。
96+
"""
97+
request_id = _request_id(request)
98+
token = _extract_session_token(request)
99+
100+
if not settings.RECALL_SESSION_AUTH_ENABLED:
101+
# 本地联调:不验签,仅解析 claims 取身份。生产恒开启,不会走到这里。
102+
logger.warning(
103+
"[recall-session] auth disabled; skipping JWT verification request_id={}",
104+
request_id,
105+
)
106+
claims = jwt.decode(token, options={"verify_signature": False})
107+
return _context_from_session_claims(claims, request_id)
108+
109+
try:
110+
claims = jwt.decode(
111+
token,
112+
settings.RECALL_SESSION_JWT_SECRET,
113+
algorithms=["HS256"],
114+
audience=settings.RECALL_SESSION_JWT_AUDIENCE,
115+
issuer=settings.RECALL_SESSION_JWT_ISSUER,
116+
options={"require": ["exp"]},
117+
)
118+
except jwt.PyJWTError as exc:
119+
logger.info("[recall-session] JWT rejected request_id={}: {}", request_id, exc)
120+
raise RecallApiError(401, CODE_SESSION_UNAUTHORIZED, "invalid or expired credential")
121+
122+
if claims.get("scope") != settings.RECALL_SESSION_JWT_SCOPE:
123+
raise RecallApiError(401, CODE_SESSION_UNAUTHORIZED, "credential scope not permitted")
124+
125+
return _context_from_session_claims(claims, request_id)
126+
127+
128+
def _concurrent_key(user_id: int) -> str:
129+
return f"{_CONCURRENT_KEY_PREFIX}{user_id}"
130+
131+
132+
async def acquire_stream_slot(user_id: int) -> bool:
133+
"""占用一个并发流名额;返回是否成功(False → 调用方应回 429)。
134+
135+
INCR 先占位再判断,保证多 worker 下不超卖;超过上限则 DECR 回退。key 设
136+
``2×stream_timeout`` 安全 TTL,兜底进程异常退出未 release 造成的名额泄漏。
137+
138+
Redis 不可用时 **fail-open**(放行 + 告警):去一次性后 Redis 仅做资源保护、不再
139+
承载安全语义,短暂失去并发限流好于阻断全部召回。
140+
"""
141+
key = _concurrent_key(user_id)
142+
safety_ttl = max(1, settings.RECALL_STREAM_TIMEOUT_MS // 1000 * 2)
143+
try:
144+
count = await redis_client.incr(key)
145+
await redis_client.expire(key, safety_ttl)
146+
except Exception: # noqa: BLE001 - Redis 故障不阻断召回,fail-open
147+
logger.warning(
148+
"[recall-session] redis unavailable on acquire, fail-open user_id={}", user_id
149+
)
150+
return True
151+
152+
if count > settings.RECALL_SESSION_MAX_CONCURRENT:
153+
# 超卖,回退占位并拒绝。
154+
try:
155+
await redis_client.decr(key)
156+
except Exception: # noqa: BLE001 - 回退失败由 TTL 兜底
157+
logger.warning("[recall-session] redis decr failed on rollback user_id={}", user_id)
158+
return False
159+
return True
160+
161+
162+
async def release_stream_slot(user_id: int) -> None:
163+
"""释放一个并发流名额;在流结束 / 断连的 finally 中调用。
164+
165+
DECR 后若计数为负(异常路径下的重复释放),重置回 0,避免计数漂移把后续请求误放行。
166+
Redis 故障静默忽略,由 key 的安全 TTL 兜底回收。
167+
"""
168+
key = _concurrent_key(user_id)
169+
try:
170+
remaining = await redis_client.decr(key)
171+
if remaining < 0:
172+
await redis_client.set(key, "0")
173+
except Exception: # noqa: BLE001 - 释放失败由 TTL 兜底,不影响主流程
174+
logger.warning("[recall-session] redis unavailable on release user_id={}", user_id)

0 commit comments

Comments
 (0)