Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions aperag/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,29 @@ async def combined_lifespan(app: FastAPI):
# — otherwise queue-backlog / failure-rate alerts on the
# collector side never receive data.
if settings.indexing_metrics_emitter.lower() == "otlp":
# Wave 5 P5B: cross-check that the broader observability
# mode is also OTLP-shaped — operators that flip
# ``INDEXING_METRICS_EMITTER=otlp`` without configuring
# the parent ``APERAG_OBSERVABILITY_MODE`` end up with
# an :class:`OTLPMetricsEmitter` whose underlying
# ``MeterProvider`` was never installed by
# ``aperag.observability.metrics.init_metrics_provider``.
# The samples then no-op silently — the same operator-
# visible failure mode we explicitly avoided when
# making ``noop`` the default.
obs_mode = (settings.aperag_observability_mode or "").lower()
if obs_mode not in ("otlp", "collector"):
import logging as _logging

_logging.getLogger(__name__).warning(
"INDEXING_METRICS_EMITTER=otlp but APERAG_OBSERVABILITY_MODE=%r "
"(expected 'otlp' or 'collector') — the OTLP MeterProvider "
"may not be installed, indexing SLI samples will silently "
"no-op. Set APERAG_OBSERVABILITY_MODE=otlp to enable real "
"OTLP export, OR revert INDEXING_METRICS_EMITTER=noop to "
"make the gap explicit.",
settings.aperag_observability_mode,
)
metrics_emitter = OTLPMetricsEmitter()
else:
metrics_emitter = NoopMetricsEmitter()
Expand Down
2 changes: 2 additions & 0 deletions aperag/db/repositories/llm_provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,7 @@ async def create_model(
embedding_dimensions: int | None = None,
supports_vision: bool = False,
supports_tool_calling: bool = False,
supports_multimodal_embedding: bool = False,
extra: dict | None = None,
) -> Model:
async def _operation(session):
Expand All @@ -302,6 +303,7 @@ async def _operation(session):
embedding_dimensions=embedding_dimensions,
supports_vision=supports_vision,
supports_tool_calling=supports_tool_calling,
supports_multimodal_embedding=supports_multimodal_embedding,
extra=extra or {},
)
session.add(model)
Expand Down
4 changes: 3 additions & 1 deletion aperag/domains/knowledge_base/service/document_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -189,12 +189,14 @@ async def _create_or_update_document_indexes(
return

parser_config = await _resolve_parser_config_for_collection(document.collection_id, session)
collection = await session.get(Collection, document.collection_id) if document.collection_id else None
from aperag.indexing.parse_orchestrator import resolve_tenant_scope_key

payload = ParseDispatchPayload(
document_id=document.id,
collection_id=document.collection_id,
object_path=object_path,
tenant_scope_key=f"user:{document.user}",
tenant_scope_key=resolve_tenant_scope_key(document=document, collection=collection),
modalities=tuple(m.value for m in index_types),
parser_config=parser_config,
purge_existing_triples=True,
Expand Down
58 changes: 7 additions & 51 deletions aperag/domains/knowledge_graph/graphindex/integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@
from typing import Any, Awaitable, Optional

from aperag.config import async_engine, build_graph_db_context, settings
from aperag.db.ops import db_ops
from aperag.domains.knowledge_graph.graphindex.config import GraphIndexConfig
from aperag.domains.knowledge_graph.graphindex.dto import (
DeleteDocumentResult,
Expand All @@ -48,7 +47,13 @@
from aperag.domains.knowledge_graph.graphindex.service import GraphIndexService
from aperag.domains.knowledge_graph.graphindex.storage.connector import GraphStoreAdaptor
from aperag.domains.knowledge_graph.ports import CollectionRow
from aperag.schema.utils import parseCollectionConfig

# Wave 5 P1 chunk 1 (`aperag/indexing/llm.py` relocate per §K.9.1 item 3):
# the canonical home for ``build_collection_llm_callable`` is now
# :mod:`aperag.indexing.llm`. Re-exported here so legacy callers that
# still import from this module keep working during the Wave 5
# deprecation window.
from aperag.indexing.llm import build_collection_llm_callable # noqa: F401

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -91,55 +96,6 @@ def _build_store():
# ---------------------------------------------------------------------------


def build_collection_llm_callable(collection: CollectionRow):
"""Construct the ``LLMCall`` for a specific collection.

Reads the collection's completion config (provider, model, base_url,
api key from the user's provider record) and returns an async
function ``(prompt) -> str``. The function is closure-bound to the
config so multiple concurrent calls can share it safely without
serialising on a global LLM client.

The ``CompletionService`` is instantiated once per callable and
reused across every chunk extraction for the same document. Before
this, each per-chunk call rebuilt the client (litellm import, HTTP
session, etc.) — a meaningful overhead on high-chunk documents.
"""
config = parseCollectionConfig(collection.config)
if not config.completion or not config.completion.model_id:
raise RuntimeError(f"graphindex: completion model not configured (collection {collection.id})")
row = db_ops.query_model_runtime(config.completion.model_id, collection.user)
if not row:
raise RuntimeError(f"graphindex: model not found {config.completion.model_id!r} (collection {collection.id})")
model, account = row

# Local import: CompletionService pulls in litellm which is heavy at
# import time and we don't want to pay it just for ``import aperag.domains.knowledge_graph.graphindex``.
from aperag.llm.completion.completion_service import CompletionService
from aperag.llm.runtime.resolver import resolve_model_invocation_from_records

invocation = resolve_model_invocation_from_records(model=model, account=account)
provider = invocation.runner_config.get("provider")
if not provider:
provider = "openai" if invocation.runner_type == "openai_compatible" else invocation.provider_type

svc = CompletionService(
provider=provider,
model=invocation.provider_model_id,
base_url=invocation.base_url,
api_key=invocation.api_key,
temperature=0.0, # deterministic output for extraction
max_tokens=None,
caching=False,
)

async def _llm(prompt: str) -> str:
# No history, no images, no memory. Single-turn JSON request.
return await svc.agenerate(history=[], prompt=prompt, images=[], memory=False)

return _llm


def build_collection_embed_callables(collection: CollectionRow):
"""Build the ``embed_query`` and ``embed_texts`` callables for
vector-based entity/relation recall.
Expand Down
106 changes: 6 additions & 100 deletions aperag/domains/knowledge_graph/graphindex/prompts.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,106 +35,12 @@

from __future__ import annotations

ENTITY_RELATION_EXTRACTION: str = """\
You are an information-extraction assistant building a knowledge graph.

Read the TEXT below and return a single JSON object with two arrays:
``entities`` and ``relations``. Do not output anything outside the JSON
object.

**Rules**

1. Output language: {language}. Names keep their original script and
case (e.g. English names capitalized, Chinese names unchanged).
2. Use only these entity types: {entity_types}. If no provided type
fits, skip the entity rather than invent a new type.
3. Every entity needs a short, self-contained description in
{language}. Do not add information that isn't in the text.
4. Every relation must reference entities by the exact ``name`` you put
in the ``entities`` list. No self-loops (source != target).
5. ``weight`` is an integer 1-10 expressing how strongly the text
supports the relation; default to 5 when unsure.
6. Cap: at most {max_entities} entities and {max_relations} relations.
If the text contains more, prefer the most specific and the
most-mentioned.
7. If the text has no extractable entities, return
``{{"entities": [], "relations": []}}``.

**JSON schema**

```
{{
"entities": [
{{"name": "<string>", "type": "<one of the allowed types>",
"description": "<string>"}}
],
"relations": [
{{"source": "<entity name>", "target": "<entity name>",
"description": "<string>", "weight": <int 1-10>}}
]
}}
```

**Example** (English, types=[person, organization]):

Text:
```
Alice, a researcher at Acme Labs, collaborated with Bob on the project.
```

Output:
```
{{
"entities": [
{{"name": "Alice", "type": "person",
"description": "A researcher at Acme Labs."}},
{{"name": "Bob", "type": "person",
"description": "A collaborator on the project."}},
{{"name": "Acme Labs", "type": "organization",
"description": "Research organization where Alice works."}}
],
"relations": [
{{"source": "Alice", "target": "Bob",
"description": "Alice and Bob collaborated on a project.",
"weight": 7}},
{{"source": "Alice", "target": "Acme Labs",
"description": "Alice is a researcher employed by Acme Labs.",
"weight": 8}}
]
}}
```

---

TEXT:
```
{input_text}
```

Output (JSON only):"""


def render_extraction_prompt(
*,
input_text: str,
entity_types: list[str] | tuple[str, ...],
language: str,
max_entities: int,
max_relations: int,
) -> str:
"""Fill in the extraction prompt template for one chunk batch.

Kept as a simple function so tests can snapshot the exact rendered
string and catch accidental prompt regressions.
"""
return ENTITY_RELATION_EXTRACTION.format(
input_text=input_text,
entity_types=", ".join(entity_types),
language=language,
max_entities=max_entities,
max_relations=max_relations,
)

# Wave 5 P1 chunk 1 (`aperag/indexing/llm.py` relocate per §K.9.1 item 3):
# ``ENTITY_RELATION_EXTRACTION`` + ``render_extraction_prompt`` 已 relocate
# 到 :mod:`aperag.indexing.llm`. 这里 re-export 保持 legacy callers 跨
# Wave 5 deprecation window 不破。一旦 retrieval/curation Phase 1
# close-out,整个 legacy module 删除。
from aperag.indexing.llm import ENTITY_RELATION_EXTRACTION, render_extraction_prompt # noqa: F401

DESCRIPTION_SUMMARIZATION: str = """\
You are consolidating a knowledge-graph entry.
Expand Down
5 changes: 5 additions & 0 deletions aperag/domains/model_platform/db/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,11 @@ class Model(Base):
embedding_dimensions = Column(Integer, nullable=True)
supports_vision = Column(Boolean, default=False, nullable=False)
supports_tool_calling = Column(Boolean, default=False, nullable=False)
# Wave 5 P2 chunk 3: typed capability flag for embedding models
# that accept image bytes; surfaced through the v3 model API +
# consumed by ``base_embedding.get_embedding_service`` to set
# ``EmbeddingService.multimodal=True``.
supports_multimodal_embedding = Column(Boolean, default=False, nullable=False)
status = Column(_enum_column(ModelStatus), default=ModelStatus.ACTIVE.value, nullable=False, index=True)
extra = Column(JSON, default=lambda: {}, nullable=False)
gmt_created = Column(DateTime(timezone=True), default=utc_now, nullable=False)
Expand Down
11 changes: 11 additions & 0 deletions aperag/domains/model_platform/schemas.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,15 @@ class Model(BaseModel):
embedding_dimensions: Optional[int] = None
supports_vision: bool = False
supports_tool_calling: bool = False
# Wave 5 P2 chunk 3 (per §G.2.5.1 spec amend item 3): a typed
# capability flag for embedding models that accept image bytes
# (CLIP / Voyage Multimodal / Jina v3 / OpenAI multimodal
# embeddings / etc.) and produce a single vector. Distinct from
# ``supports_vision`` which describes chat/completion models that
# accept images as input. Drives the chunk 4b vision gate's
# ``EmbeddingService.is_multimodal()`` runtime check — flip on the
# collection's embedder spec model and the gate self-disables.
supports_multimodal_embedding: bool = False
status: Literal["ACTIVE", "INACTIVE"] = "ACTIVE"
extra: dict[str, Any] = Field(default_factory=dict)
created: Optional[datetime] = None
Expand All @@ -125,6 +134,7 @@ class ModelCreate(BaseModel):
embedding_dimensions: Optional[int] = None
supports_vision: bool = False
supports_tool_calling: bool = False
supports_multimodal_embedding: bool = False
extra: dict[str, Any] = Field(default_factory=dict)


Expand All @@ -139,6 +149,7 @@ class ModelUpdate(BaseModel):
embedding_dimensions: Optional[int] = None
supports_vision: Optional[bool] = None
supports_tool_calling: Optional[bool] = None
supports_multimodal_embedding: Optional[bool] = None
status: Optional[Literal["ACTIVE", "INACTIVE"]] = None
extra: Optional[dict[str, Any]] = None

Expand Down
1 change: 1 addition & 0 deletions aperag/domains/model_platform/service/model_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ def _model_to_schema(model) -> Model:
embedding_dimensions=model.embedding_dimensions,
supports_vision=model.supports_vision,
supports_tool_calling=model.supports_tool_calling,
supports_multimodal_embedding=getattr(model, "supports_multimodal_embedding", False) or False,
status=model.status,
extra=model.extra or {},
created=model.gmt_created,
Expand Down
6 changes: 5 additions & 1 deletion aperag/indexing/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -158,9 +158,11 @@
HEARTBEAT_STALE_SECONDS,
RECONCILE_BATCH_SIZE,
RECONCILE_INTERVAL_SECONDS,
STUCK_PARSE_COOLDOWN_SECONDS,
reconcile_failed_retry,
reconcile_pending_dispatch,
reconcile_running_reclaim,
reconcile_stuck_documents_for_parse_reenqueue,
run_reconcile_loop,
)
from aperag.indexing.summary import (
Expand Down Expand Up @@ -276,13 +278,15 @@
"process_one_parse_task",
"run_parse_worker",
"run_parse_worker_loop",
# Reconciler (T2.1)
# Reconciler (T2.1 + Wave 5 P4 stuck-parse re-enqueue)
"RECONCILE_INTERVAL_SECONDS",
"RECONCILE_BATCH_SIZE",
"HEARTBEAT_STALE_SECONDS",
"STUCK_PARSE_COOLDOWN_SECONDS",
"reconcile_pending_dispatch",
"reconcile_failed_retry",
"reconcile_running_reclaim",
"reconcile_stuck_documents_for_parse_reenqueue",
"run_reconcile_loop",
# Cleanup (T2.1 + T3.1 path C)
"CLEANUP_INTERVAL_SECONDS",
Expand Down
Loading
Loading