Skip to content

Commit 7dbc709

Browse files
earayucursoragent
andauthored
feat: add OTLP-first observability foundation (#1702)
* docs: add future observability design Co-authored-by: earayu <earayu@163.com> * feat: add OTLP-first observability foundation Co-authored-by: earayu <earayu@163.com> * fix: tolerate unset legacy otel flag Co-authored-by: earayu <earayu@163.com> * fix: satisfy observability lint checks Co-authored-by: earayu <earayu@163.com> * fix: avoid duplicate FastAPI instrumentation Co-authored-by: earayu <earayu@163.com> * fix: keep application logs capturable Co-authored-by: earayu <earayu@163.com> * chore: remove jaeger observability path Co-authored-by: earayu <earayu@163.com> --------- Co-authored-by: Cursor Agent <cursoragent@cursor.com>
1 parent f035166 commit 7dbc709

30 files changed

Lines changed: 1648 additions & 1051 deletions

AGENTS.md

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
# Agent Guide
2+
3+
## Observability
4+
5+
ApeRAG's observability entrypoint is `aperag.observability`.
6+
7+
- Default mode is `APERAG_OBSERVABILITY_MODE=local`: no extra observability service is required.
8+
- Logs should stay structured JSON and include trace/span correlation fields.
9+
- Export telemetry through OTLP only (`OTEL_EXPORTER_OTLP_ENDPOINT`) when a deployment needs a backend or collector.
10+
- Do not add backend-specific exporters or deployment profiles for tracing systems.
11+
- Do not log prompts, document bodies, API keys, cookies, authorization headers, database passwords, or raw LLM responses.
12+
- New business instrumentation should use stable low-cardinality names and attributes.
13+
14+
Read the full design before changing observability behavior:
15+
16+
- `docs/zh-CN/deployment/observability.md`

Makefile

Lines changed: 3 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -123,11 +123,8 @@ db-check:
123123
# make stack-up # Full application
124124
# make stack-up WITH_NEO4J=1 # Full application + Neo4j
125125
# make stack-up WITH_NEBULA=1 # Full application + Nebula Graph
126-
# make stack-up WITH_JAEGER=1 # Full application + Jaeger
127-
# make stack-up WITH_JAEGER=1 WITH_NEO4J=1 # Full application + Jaeger + Neo4j
128126
# make infra-up # Infrastructure only (databases)
129127
# make infra-up WITH_NEO4J=1 # Infrastructure + Neo4j
130-
# make infra-up WITH_JAEGER=1 # Infrastructure + Jaeger
131128
# make stack-down # Stop all services
132129
# make stack-down REMOVE_VOLUMES=1 # Stop and remove volumes
133130
_PROFILES_TO_ACTIVATE :=
@@ -143,10 +140,6 @@ ifeq ($(WITH_NEBULA),1)
143140
_PROFILES_TO_ACTIVATE += --profile nebula
144141
endif
145142

146-
ifeq ($(WITH_JAEGER),1)
147-
_PROFILES_TO_ACTIVATE += --profile jaeger
148-
endif
149-
150143
# Determine flags for 'compose-down'
151144
ifeq ($(REMOVE_VOLUMES),1)
152145
_COMPOSE_DOWN_FLAGS += -v
@@ -158,18 +151,17 @@ stack-up:
158151
$(_EXTRA_ENVS) docker-compose $(_PROFILES_TO_ACTIVATE) -f docker-compose.yml up -d
159152

160153
# Infrastructure only (databases + supporting services)
161-
# Optional services like Neo4j, Nebula, and Jaeger will ONLY start if explicitly enabled:
154+
# Optional services like Neo4j and Nebula will ONLY start if explicitly enabled:
162155
# make infra-up WITH_NEO4J=1 # adds Neo4j
163156
# make infra-up WITH_NEBULA=1 # adds Nebula Graph
164-
# make infra-up WITH_JAEGER=1 # adds Jaeger
165157
infra-up:
166158
docker-compose $(_PROFILES_TO_ACTIVATE) -f docker-compose.yml up -d \
167-
postgres redis qdrant es jaeger \
159+
postgres redis qdrant es \
168160
$(if $(filter 1,$(WITH_NEO4J)),neo4j,) \
169161
$(if $(filter 1,$(WITH_NEBULA)),nebula-metad nebula-storaged nebula-graphd nebula-storage-activator,)
170162

171163
stack-down:
172-
docker-compose --profile neo4j --profile nebula --profile jaeger -f docker-compose.yml down $(_COMPOSE_DOWN_FLAGS)
164+
docker-compose --profile neo4j --profile nebula -f docker-compose.yml down $(_COMPOSE_DOWN_FLAGS)
173165

174166
stack-logs:
175167
docker-compose -f docker-compose.yml logs -f

README-zh.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ ApeRAG 是你构建自己的知识图谱、进行上下文工程以及部署能
1515
- [Kubernetes 部署(推荐生产环境)](#kubernetes-部署推荐生产环境)
1616
- [开发指南](./docs/zh-CN/development-guide.md)
1717
- [构建 Docker 镜像](./docs/zh-CN/build-docker-image.md)
18+
- [可观测性设计](./docs/zh-CN/deployment/observability.md)
1819
- [致谢](#致谢)
1920
- [许可证](#许可证)
2021

aperag/app.py

Lines changed: 34 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
# ruff: noqa: E402
12
# Copyright 2025 ApeCloud, Inc.
23
#
34
# Licensed under the Apache License, Version 2.0 (the "License");
@@ -13,23 +14,22 @@
1314
# limitations under the License.
1415

1516
from aperag.config import settings
17+
from aperag.observability import (
18+
bind_observability_context,
19+
build_observability_config,
20+
configure_fastapi,
21+
configure_logging,
22+
configure_process_observability,
23+
reset_observability_context,
24+
)
25+
from aperag.observability.tracing import inject_carrier
1626

17-
# Initialize OpenTelemetry FIRST - before any other imports
18-
from aperag.trace import init_tracing
19-
20-
# Initialize tracing with configuration
21-
if settings.otel_enabled:
22-
init_tracing(
23-
service_name=settings.otel_service_name,
24-
service_version=settings.otel_service_version,
25-
jaeger_endpoint=settings.jaeger_endpoint if settings.jaeger_enabled else None,
26-
enable_console=settings.otel_console_enabled,
27-
enable_fastapi=settings.otel_fastapi_enabled,
28-
enable_sqlalchemy=settings.otel_sqlalchemy_enabled,
29-
enable_mcp=settings.otel_mcp_enabled,
30-
)
27+
observability_config = build_observability_config(settings)
28+
configure_logging(observability_config)
29+
configure_process_observability(observability_config)
3130

3231
from fastapi import FastAPI # noqa: E402
32+
from starlette.middleware.base import BaseHTTPMiddleware # noqa: E402
3333

3434
from aperag.domains.agent_runtime.api.routes import router as agent_runtime_router
3535
from aperag.domains.agent_runtime.runtime import set_prompt_template_ops as _ar_set_prompt_template_ops
@@ -219,6 +219,26 @@ async def combined_lifespan(app: FastAPI):
219219
generate_unique_id_function=custom_generate_unique_id,
220220
)
221221

222+
223+
class ObservabilityContextMiddleware(BaseHTTPMiddleware):
224+
async def dispatch(self, request, call_next):
225+
request_id = request.headers.get("x-request-id") or request.headers.get("x-correlation-id")
226+
tokens = bind_observability_context(request_id=request_id)
227+
try:
228+
response = await call_next(request)
229+
if request_id:
230+
response.headers["x-request-id"] = request_id
231+
trace_headers = inject_carrier({})
232+
if "traceparent" in trace_headers:
233+
response.headers["traceparent"] = trace_headers["traceparent"]
234+
return response
235+
finally:
236+
reset_observability_context(tokens)
237+
238+
239+
app.add_middleware(ObservabilityContextMiddleware)
240+
configure_fastapi(app, observability_config)
241+
222242
# Register global exception handlers
223243
register_exception_handlers(app)
224244

aperag/config.py

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -196,12 +196,20 @@ class Config(BaseSettings):
196196
cache_enabled: bool = Field(True, alias="CACHE_ENABLED")
197197
cache_ttl: int = Field(86400, alias="CACHE_TTL")
198198

199-
# OpenTelemetry/Jaeger Tracing
200-
otel_enabled: bool = Field(True, alias="OTEL_ENABLED")
199+
# Observability
200+
aperag_observability_mode: str = Field("local", alias="APERAG_OBSERVABILITY_MODE")
201+
aperag_observability_log_format: str = Field("json", alias="APERAG_OBSERVABILITY_LOG_FORMAT")
202+
aperag_observability_capture_content: bool = Field(False, alias="APERAG_OBSERVABILITY_CAPTURE_CONTENT")
203+
aperag_observability_sample_ratio: float = Field(1.0, alias="APERAG_OBSERVABILITY_SAMPLE_RATIO")
204+
otel_enabled: Optional[str] = Field(None, alias="OTEL_ENABLED")
201205
otel_service_name: str = Field("aperag", alias="OTEL_SERVICE_NAME")
202206
otel_service_version: str = Field("1.0.0", alias="OTEL_SERVICE_VERSION")
203-
jaeger_enabled: bool = Field(False, alias="JAEGER_ENABLED")
204-
jaeger_endpoint: Optional[str] = Field(None, alias="JAEGER_ENDPOINT")
207+
otel_environment: str = Field("development", alias="OTEL_ENVIRONMENT")
208+
otel_resource_attributes: Optional[str] = Field(None, alias="OTEL_RESOURCE_ATTRIBUTES")
209+
otel_exporter_otlp_endpoint: Optional[str] = Field(None, alias="OTEL_EXPORTER_OTLP_ENDPOINT")
210+
otel_exporter_otlp_headers: Optional[str] = Field(None, alias="OTEL_EXPORTER_OTLP_HEADERS")
211+
otel_exporter_otlp_protocol: str = Field("http/protobuf", alias="OTEL_EXPORTER_OTLP_PROTOCOL")
212+
205213
otel_console_enabled: bool = Field(False, alias="OTEL_CONSOLE_ENABLED")
206214
otel_fastapi_enabled: bool = Field(True, alias="OTEL_FASTAPI_ENABLED")
207215
otel_sqlalchemy_enabled: bool = Field(True, alias="OTEL_SQLALCHEMY_ENABLED")

aperag/domains/retrieval/pipeline.py

Lines changed: 88 additions & 76 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@
5050
ProviderNotFoundError,
5151
RerankError,
5252
)
53+
from aperag.observability import start_span
5354
from aperag.platform.query.query import DocumentWithScore
5455
from aperag.schema.utils import parseCollectionConfig
5556
from aperag.utils.utils import generate_fulltext_index_name, generate_vector_db_collection_name
@@ -142,92 +143,103 @@ async def execute_search(
142143
search_user_id: str,
143144
chat_id: Optional[str] = None,
144145
) -> Tuple[List[SearchResultItem], str]:
145-
query = (data.query or "").strip()
146-
if not query:
147-
raise ValidationException("query is required")
148-
149-
recall_tasks = []
150-
collection = await async_db_ops.query_collection(search_user_id, collection_id)
151-
if not collection:
152-
raise ValidationException(f"collection not found: {collection_id}")
153-
154-
if data.vector_search:
155-
recall_tasks.append(
156-
self._vector_search(
157-
collection=collection,
158-
query=query,
159-
top_k=data.vector_search.topk,
160-
similarity_threshold=data.vector_search.similarity,
161-
chat_id=chat_id,
146+
with start_span(
147+
"retrieval.search",
148+
tracer_name=__name__,
149+
**{
150+
"aperag.domain": "retrieval",
151+
"aperag.operation": "retrieval.search",
152+
"aperag.collection.id": collection_id,
153+
"aperag.user.id": search_user_id,
154+
"aperag.chat.id": chat_id,
155+
},
156+
):
157+
query = (data.query or "").strip()
158+
if not query:
159+
raise ValidationException("query is required")
160+
161+
recall_tasks = []
162+
collection = await async_db_ops.query_collection(search_user_id, collection_id)
163+
if not collection:
164+
raise ValidationException(f"collection not found: {collection_id}")
165+
166+
if data.vector_search:
167+
recall_tasks.append(
168+
self._vector_search(
169+
collection=collection,
170+
query=query,
171+
top_k=data.vector_search.topk,
172+
similarity_threshold=data.vector_search.similarity,
173+
chat_id=chat_id,
174+
)
162175
)
163-
)
164-
if data.fulltext_search:
165-
recall_tasks.append(
166-
self._fulltext_search(
167-
collection=collection,
168-
query=query,
169-
top_k=data.fulltext_search.topk,
170-
keywords=data.fulltext_search.keywords,
171-
user_id=search_user_id,
172-
chat_id=chat_id,
176+
if data.fulltext_search:
177+
recall_tasks.append(
178+
self._fulltext_search(
179+
collection=collection,
180+
query=query,
181+
top_k=data.fulltext_search.topk,
182+
keywords=data.fulltext_search.keywords,
183+
user_id=search_user_id,
184+
chat_id=chat_id,
185+
)
173186
)
174-
)
175-
if data.graph_search:
176-
recall_tasks.append(
177-
self._graph_search(
178-
collection=collection,
179-
query=query,
180-
top_k=data.graph_search.topk,
187+
if data.graph_search:
188+
recall_tasks.append(
189+
self._graph_search(
190+
collection=collection,
191+
query=query,
192+
top_k=data.graph_search.topk,
193+
)
181194
)
182-
)
183-
if data.summary_search:
184-
recall_tasks.append(
185-
self._summary_search(
186-
collection=collection,
187-
query=query,
188-
top_k=data.summary_search.topk,
189-
similarity_threshold=data.summary_search.similarity,
195+
if data.summary_search:
196+
recall_tasks.append(
197+
self._summary_search(
198+
collection=collection,
199+
query=query,
200+
top_k=data.summary_search.topk,
201+
similarity_threshold=data.summary_search.similarity,
202+
)
190203
)
191-
)
192-
if data.vision_search:
193-
recall_tasks.append(
194-
self._vision_search(
195-
collection=collection,
196-
query=query,
197-
top_k=data.vision_search.topk,
198-
similarity_threshold=data.vision_search.similarity,
204+
if data.vision_search:
205+
recall_tasks.append(
206+
self._vision_search(
207+
collection=collection,
208+
query=query,
209+
top_k=data.vision_search.topk,
210+
similarity_threshold=data.vision_search.similarity,
211+
)
199212
)
200-
)
201213

202-
if not recall_tasks:
203-
raise ValidationException("At least one search strategy must be enabled")
214+
if not recall_tasks:
215+
raise ValidationException("At least one search strategy must be enabled")
204216

205-
recall_results = await asyncio.gather(*recall_tasks)
206-
merged_docs = self._merge_results(recall_results)
207-
reranked_docs = await self._rerank(
208-
query=query,
209-
docs=merged_docs,
210-
user_id=search_user_id,
211-
use_rerank=bool(data.rerank),
212-
)
217+
recall_results = await asyncio.gather(*recall_tasks)
218+
merged_docs = self._merge_results(recall_results)
219+
reranked_docs = await self._rerank(
220+
query=query,
221+
docs=merged_docs,
222+
user_id=search_user_id,
223+
use_rerank=bool(data.rerank),
224+
)
213225

214-
items = []
215-
for idx, doc in enumerate(reranked_docs):
216-
metadata = doc.metadata or {}
217-
public_metadata = SearchResultMetadata.from_raw(metadata)
218-
source = public_metadata.source if public_metadata and public_metadata.source else ""
219-
items.append(
220-
SearchResultItem(
221-
rank=idx + 1,
222-
score=doc.score,
223-
content=doc.text,
224-
source=source,
225-
recall_type=metadata.get("recall_type", ""),
226-
metadata=public_metadata,
226+
items = []
227+
for idx, doc in enumerate(reranked_docs):
228+
metadata = doc.metadata or {}
229+
public_metadata = SearchResultMetadata.from_raw(metadata)
230+
source = public_metadata.source if public_metadata and public_metadata.source else ""
231+
items.append(
232+
SearchResultItem(
233+
rank=idx + 1,
234+
score=doc.score,
235+
content=doc.text,
236+
source=source,
237+
recall_type=metadata.get("recall_type", ""),
238+
metadata=public_metadata,
239+
)
227240
)
228-
)
229241

230-
return items, "rerank"
242+
return items, "rerank"
231243

232244
async def _vector_search(
233245
self,

0 commit comments

Comments
 (0)