From 2737a8afe8587cfc73892f014548118447a91a77 Mon Sep 17 00:00:00 2001
From: earayu
Date: Sun, 26 Apr 2026 02:13:44 +0800
Subject: [PATCH] feat(phase8 #92 D8.5-BE): canonical UIMessage chat history + runtime_kind discriminator
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Phase 8 task #92 (D8.5-BE): first-cut backend migration of the
non-agent chat path to the canonical ``UIMessage`` shape, scoped per
architect msg=01918929 + Weston msg=df87fe24 + earayu2 msg=f20d5034
hard-cut acceptance.

The inventory revealed that the production "non-agent chat path" the
original D8.5 design assumed has already converged on the agent runtime
(``chat_completion_service.openai_chat_completions`` already delegates
to ``runtime_manager.turn_service.create_or_get_turn`` and
``ChatService.create_chat`` rejects non-AGENT bots). So the actual #92
work is A+B+C only: adding the discriminator column for future
non-agent paths and migrating the user-visible chat history shape to
canonical UIMessage. The translator extension (``chat.text.delta`` /
``chat.completed``) and the ``StoredChatMessagePart`` /
``RedisChatMessageHistory`` deletion are deferred per architect /
Weston canonical lock.

Changes:

A. ``runtime_kind`` discriminator on ``agent_message`` table
   - ``aperag/domains/agent_runtime/db/models.py``: new
     ``runtime_kind: str`` ORM column with values ``agent_runtime`` /
     ``direct_chat`` / ``rag_chat`` (mutually exclusive enum); existing
     rows backfill via ``server_default="agent_runtime"``. ``role``
     keeps speaker semantics independent of the runtime that produced
     the message.
   - ``aperag/migration/versions/...c8f2d34a51e7_add_agent_message_runtime_kind.py``:
     additive migration; downgrade drops the column.

B. ``ChatService._build_v3_chat_history`` rewrite
   - Returns ``list[AgentTurnSnapshot]`` (one snapshot per assistant
     turn) instead of the legacy ``list[list[ChatMessage]]`` shape.
   - Reuses ``snapshot_assembler.assemble_parts_from_artifacts`` (the
     #90 D8.4d projection) so historical turns expose the same
     ``UIMessagePart`` shape the FE consumes from the live SSE stream
     (D8 §2 wire/at-rest byte-equal).
   - ``error_text`` for FAILED / CANCELLED turns surfaces an
     ``error_summary`` artifact's message, falling back to
     ``turn.error_message``; this mirrors the snapshot endpoint
     contract.
   - The turn's user query lives at ``input_text`` on the snapshot
     envelope (rather than as a separate ``role=human`` ChatMessage) so
     the FE renders user/assistant from a single object per turn.
   - Legacy ``_extract_artifact_text`` / ``_extract_references`` /
     ``_map_reference_item`` / ``_artifact_type_value`` /
     ``_coerce_timestamp`` helpers are retired alongside the legacy
     shape.

C. ``ChatDetails.history`` schema
   - ``aperag/domains/conversation/schemas.py``: ``history`` is now
     ``Optional[list[AgentTurnSnapshot]]`` with an explicit description
     citing the D8 §2 byte-equal canonical and the new shape.
   - The ``conversation.schemas`` ↔ ``agent_runtime.uimessage`` ↔
     ``agent_runtime.schemas`` ↔ ``conversation.schemas`` cycle is
     broken via a ``TYPE_CHECKING`` import + a module-level
     ``ChatDetails.model_rebuild()`` hook at the bottom of
     ``conversation/schemas.py``. Pydantic resolves the forward ref at
     load time so the OpenAPI schema is fully populated.
   - ``aperag/domains/agent_runtime/uimessage.py``:
     ``AgentTurnSnapshot`` gains ``runtime_kind: RuntimeKind`` (default
     ``"agent_runtime"``) and ``input_text: Optional[str]`` so
     historical turns can render the user query without a separate
     envelope round-trip.
   - ``TurnService.get_turn_snapshot`` writes both new fields on the
     live snapshot endpoint so live and historical reload paths match.

D. (deferred) Translator extension for ``chat.text.delta`` /
   ``chat.completed`` and ``StoredChatMessagePart`` /
   ``RedisChatMessageHistory`` deletion stay out of #92 per Weston
   msg=df87fe24 / PM msg=01918929. The non-agent live path the
   extension would have served does not exist in the current codebase;
   reintroducing it is a feature task, not a refactor.

Tests:
- ``tests/unit_test/chat/test_chat_service.py`` rewritten:
  * ``test_get_chat_returns_canonical_uimessage_history`` pins the new
    shape (snapshot per turn with text + source-url + data-citation
    parts, runtime_kind, input_text)
  * ``test_get_chat_history_surfaces_error_text_for_failed_turn`` pins
    the error_text contract for FAILED turns
  * ``test_get_chat_history_does_not_expose_legacy_chatmessage_shape``
    regression-guards against a revert to ``list[list[ChatMessage]]``
- ``tests/unit_test/agent_runtime/test_agent_runtime_v3.py`` updated to
  import ``AgentTurnSnapshot`` from ``agent_runtime.uimessage`` (the
  back-compat re-export through ``agent_runtime.schemas`` was retired
  to break the new cycle).

The D10 §G hard gate 1 (comprehensive grep sweep) ran across
``aperag/`` + ``tests/unit_test/`` + ``tests/e2e_http/hurl/`` +
``tests/e2e_http/scripts/``: only the FE
``web/src/components/chat/chat-messages.tsx`` reads ``chat.history`` in
the old shape; that is the explicit hand-off seam for #93 huangheng
(per architect msg=6e53a7c4).

Gates: full unit suite 833 / 29 skip / 0 fail; ruff check + format
clean.

Co-Authored-By: Claude Opus 4.7 --- aperag/domains/agent_runtime/api/routes.py | 2 +- aperag/domains/agent_runtime/db/models.py | 10 +- aperag/domains/agent_runtime/schemas.py | 30 ++- aperag/domains/agent_runtime/services.py | 4 +- aperag/domains/agent_runtime/uimessage.py | 12 +- aperag/domains/conversation/schemas.py | 37 +++- .../conversation/service/chat_service.py | 186 ++++++------------ ...d34a51e7_add_agent_message_runtime_kind.py | 54 +++++ .../agent_runtime/test_agent_runtime_v3.py | 2 +- tests/unit_test/chat/test_chat_service.py | 132 +++++++++++-- 10 files changed, 312 insertions(+), 157 deletions(-) create mode 100644 aperag/migration/versions/20260426012000-c8f2d34a51e7_add_agent_message_runtime_kind.py diff --git a/aperag/domains/agent_runtime/api/routes.py b/aperag/domains/agent_runtime/api/routes.py index 9c234e1f1..ece47969e 100644 --- a/aperag/domains/agent_runtime/api/routes.py +++ b/aperag/domains/agent_runtime/api/routes.py @@ -43,7 +43,6 @@ from aperag.domains.agent_runtime.runtime import agent_runtime_manager as runtime_manager from aperag.domains.agent_runtime.schemas import ( AgentArtifactEnvelope, - AgentTurnSnapshot, CancelTurnResponse, CreateTurnRequest, CreateTurnResponse, @@ -51,6 +50,7 @@ from aperag.domains.agent_runtime.tools.consent import ConsentOwnershipError, ConsentService from aperag.domains.agent_runtime.tools.elicitation import ElicitationOwnershipError, ElicitationService from aperag.domains.agent_runtime.tools.lifecycle import translate_lifecycle_envelope +from aperag.domains.agent_runtime.uimessage import AgentTurnSnapshot from aperag.domains.agent_runtime.wire import ( StreamPart, TranslatorState, diff --git a/aperag/domains/agent_runtime/db/models.py b/aperag/domains/agent_runtime/db/models.py index fc85eeba3..62cda39b2 100644 --- a/aperag/domains/agent_runtime/db/models.py +++ b/aperag/domains/agent_runtime/db/models.py @@ -153,7 +153,7 @@ class AgentArtifact(Base): class AgentMessage(Base): - """At-rest UIMessage 
envelope (Phase 8 D8.2 first-cut). + """At-rest UIMessage envelope (Phase 8 D8.2 first-cut, D8.5-BE refined). One row per assistant turn (1:1 with ``AgentTurn`` for now via ``turn_id``). ``parts`` holds the JSON-serialised @@ -163,6 +163,13 @@ class AgentMessage(Base): is the runtime contract version tag so future renderer updates can branch without a separate negotiation. + ``runtime_kind`` (D8.5-BE / #92) tags the runtime that produced the + message — ``agent_runtime`` for the agent reasoning loop (D8.x + Phase A), and a forward-compat enum for direct LLM (``direct_chat``) + or RAG-only (``rag_chat``) paths. ``role`` retains its speaker + semantics independent of runtime origin per Weston msg=94dac98a / + architect canonical lock msg=e01e9b4b. + The legacy ``AgentArtifact`` / ``AgentTimelineEvent`` tables are retained alongside this table during D8.x rollout — they will be dropped in D8.6 (#80) once D8.4 FE renderer is consuming @@ -179,6 +186,7 @@ class AgentMessage(Base): turn_id = Column(String(24), nullable=False, index=True) chat_id = Column(String(24), nullable=False, index=True) role = Column(String(16), nullable=False) + runtime_kind = Column(String(24), nullable=False, default="agent_runtime", server_default="agent_runtime") schema_version = Column(String(64), nullable=False) parts = Column(JSON, default=lambda: [], nullable=False) gmt_created = Column(DateTime(timezone=True), default=utc_now, nullable=False) diff --git a/aperag/domains/agent_runtime/schemas.py b/aperag/domains/agent_runtime/schemas.py index d28e8b9a3..07497cc19 100644 --- a/aperag/domains/agent_runtime/schemas.py +++ b/aperag/domains/agent_runtime/schemas.py @@ -44,7 +44,6 @@ from pydantic import BaseModel, Field -from aperag.domains.conversation.schemas import File from aperag.domains.knowledge_base.schemas import Collection from aperag.schema.common import ModelSpec @@ -88,6 +87,18 @@ class UserActivityEnvelope(BaseModel): context: Optional[UserActivityContext] = None +# ``File`` is 
imported lazily here to break the cycle introduced by D8.5-BE +# (#92): ``conversation.schemas.ChatDetails.history`` now references +# ``AgentTurnSnapshot`` from :mod:`aperag.domains.agent_runtime.uimessage`, +# and ``uimessage`` in turn imports ``AGENT_RUNTIME_SCHEMA_VERSION`` and +# ``UserActivityEnvelope`` from this module. Importing ``File`` at the +# module top would close that cycle. By this point both symbols +# ``uimessage`` needs are already defined, so importing ``File`` here is +# safe and only the classes below (``CreateTurnRequest`` / +# ``AgentMessage``) actually depend on it. +from aperag.domains.conversation.schemas import File # noqa: E402 + + class AgentTurnEnvelope(BaseModel): schema_version: str = AGENT_RUNTIME_SCHEMA_VERSION turn_id: str @@ -176,14 +187,14 @@ class CreateTurnResponse(BaseModel): stream_url: str -# ``AgentTurnSnapshot`` is the canonical UIMessage at-rest envelope and -# lives in :mod:`aperag.domains.agent_runtime.uimessage` next to the -# other UIMessage classes. It is re-exported here so the existing -# ``from aperag.domains.agent_runtime.schemas import AgentTurnSnapshot`` -# import sites continue to work without a domain-internal hop. -from aperag.domains.agent_runtime.uimessage import ( # noqa: E402, F401 - AgentTurnSnapshot, -) +# ``AgentTurnSnapshot`` lives in :mod:`aperag.domains.agent_runtime.uimessage` +# next to the rest of the ``UIMessage`` family. The previous deferred +# re-export from this module was retired in D8.5-BE (#92) because it +# would close a fresh cycle between ``conversation.schemas`` (which +# now imports ``AgentTurnSnapshot`` directly to type ``ChatDetails.history``) +# and ``agent_runtime.schemas``. Existing call sites that still import +# from this module are migrated to import from +# ``aperag.domains.agent_runtime.uimessage`` directly. 
class CancelTurnResponse(BaseModel): @@ -236,7 +247,6 @@ class AgentMessage(BaseModel): "AgentMessage", "AgentTimelineEventEnvelope", "AgentTurnEnvelope", - "AgentTurnSnapshot", "CancelTurnResponse", "CreateTurnRequest", "CreateTurnResponse", diff --git a/aperag/domains/agent_runtime/services.py b/aperag/domains/agent_runtime/services.py index c43b3f253..3d677d87f 100644 --- a/aperag/domains/agent_runtime/services.py +++ b/aperag/domains/agent_runtime/services.py @@ -24,7 +24,6 @@ AgentArtifactEnvelope, AgentTimelineEventEnvelope, AgentTurnEnvelope, - AgentTurnSnapshot, CreateTurnRequest, ReferenceBundleItem, UserActivityContext, @@ -36,6 +35,7 @@ extract_error_text, ) from aperag.domains.agent_runtime.storage import AgentRuntimeRedisStore +from aperag.domains.agent_runtime.uimessage import AgentTurnSnapshot from aperag.domains.agent_runtime.uimessage_store import UIMessageStore from aperag.domains.conversation.db.models import BotType from aperag.domains.conversation.schemas import BotConfig @@ -489,6 +489,7 @@ async def get_turn_snapshot(self, user: str, chat_id: str, turn_id: str) -> Agen return AgentTurnSnapshot( turn_id=turn.id, chat_id=turn.chat_id, + runtime_kind="agent_runtime", status=status_str, parts=parts, error_text=error_text, @@ -497,6 +498,7 @@ async def get_turn_snapshot(self, user: str, chat_id: str, turn_id: str) -> Agen finished_at=turn.gmt_finished, created_at=turn.gmt_created, updated_at=turn.gmt_updated, + input_text=turn.input_text, ) @staticmethod diff --git a/aperag/domains/agent_runtime/uimessage.py b/aperag/domains/agent_runtime/uimessage.py index 6edf85144..0aebdd2c7 100644 --- a/aperag/domains/agent_runtime/uimessage.py +++ b/aperag/domains/agent_runtime/uimessage.py @@ -299,8 +299,11 @@ class UIMessage(BaseModel): # --------------------------------------------------------------------- +RuntimeKind = Literal["agent_runtime", "direct_chat", "rag_chat"] + + class AgentTurnSnapshot(BaseModel): - """Canonical UIMessage at-rest snapshot for 
an agent turn (Phase 8 D8.4d / #90). + """Canonical UIMessage at-rest snapshot for an agent turn (Phase 8 D8.4d / #90, D8.5-BE / #92). Replaces the legacy ``{turn, timeline, artifacts}`` snapshot shape with the ``UIMessage``-aligned canonical that the FE renderer @@ -310,6 +313,11 @@ class AgentTurnSnapshot(BaseModel): deserializes through the same ``UIMessagePart`` discriminated union the wire emits. + ``runtime_kind`` (D8.5-BE) tags the runtime that produced the + turn — ``agent_runtime`` for the agent reasoning loop, + ``direct_chat`` / ``rag_chat`` reserved for future paths. ``role`` + keeps speaker semantics independent of runtime kind. + ``parts`` is the persistable subset (transient ``data-activity`` is stripped before write per :func:`persistable_parts`); ``status`` mirrors the runtime ``AgentTurnStatus`` enum value; @@ -327,6 +335,8 @@ class AgentTurnSnapshot(BaseModel): schema_version: str = AGENT_RUNTIME_SCHEMA_VERSION turn_id: str chat_id: str + runtime_kind: RuntimeKind = "agent_runtime" + input_text: Optional[str] = None role: Literal["assistant"] = "assistant" status: str parts: list[UIMessagePart] = Field(default_factory=list) diff --git a/aperag/domains/conversation/schemas.py b/aperag/domains/conversation/schemas.py index e3a495612..5f08736f9 100644 --- a/aperag/domains/conversation/schemas.py +++ b/aperag/domains/conversation/schemas.py @@ -45,10 +45,19 @@ from __future__ import annotations from datetime import datetime -from typing import Any, Literal, Optional +from typing import TYPE_CHECKING, Any, Literal, Optional from pydantic import BaseModel, Field, conint +if TYPE_CHECKING: + # Type-only import to keep the OpenAPI schema for ``ChatDetails.history`` + # populated without forming a runtime cycle: ``agent_runtime.uimessage`` + # imports ``AGENT_RUNTIME_SCHEMA_VERSION`` / ``UserActivityEnvelope`` from + # ``agent_runtime.schemas``, which in turn imports ``File`` from this + # module. 
Pydantic resolves the forward reference via + # :func:`ChatDetails.model_rebuild` at the bottom of this file. + from aperag.domains.agent_runtime.uimessage import AgentTurnSnapshot + from aperag.domains.knowledge_base.schemas import Collection as KBCollectionSchema from aperag.schema.common import ModelSpec, PageResult, PaginatedResponse @@ -169,9 +178,16 @@ class ChatDetails(BaseModel): bot_id: Optional[str] = None peer_id: Optional[str] = None peer_type: Optional[Literal["system", "feishu", "weixin", "weixin_official", "web", "dingtalk"]] = None - history: Optional[list[list[ChatMessage]]] = Field( + history: Optional[list[AgentTurnSnapshot]] = Field( None, - description="Array of conversation turns, where each turn is an array of message parts", + description=( + "Phase 8 D8.5-BE (#92): historical conversation turns as canonical " + "``AgentTurnSnapshot`` envelopes — each turn carries the same " + "``UIMessagePart[]`` shape the FE consumes from the live SSE stream " + "(D8 §2 wire/at-rest byte-equal). Replaces the legacy " + "``list[list[ChatMessage]]`` shape; FE renders historical turns with " + "the same renderer used for live turns." + ), ) status: Optional[Literal["active", "archived"]] = None created: Optional[datetime] = None @@ -219,6 +235,21 @@ class Feedback(BaseModel): TurnFeedbackWrite = Feedback +# Phase 8 D8.5-BE (#92): resolve the forward reference to +# ``AgentTurnSnapshot`` after ``conversation.schemas`` finishes loading. +# A direct top-level import would form a cycle through +# ``agent_runtime.uimessage`` → ``agent_runtime.schemas`` → this module. +# The TYPE_CHECKING block at the top declares the import for static +# checkers / OpenAPI; this rebuild wires it up at runtime. 
+def _rebuild_chat_details() -> None: + from aperag.domains.agent_runtime.uimessage import AgentTurnSnapshot # noqa: F401 + + ChatDetails.model_rebuild() + + +_rebuild_chat_details() + + __all__ = [ "Agent", "Bot", diff --git a/aperag/domains/conversation/service/chat_service.py b/aperag/domains/conversation/service/chat_service.py index 2af721ab2..a1feacd8d 100644 --- a/aperag/domains/conversation/service/chat_service.py +++ b/aperag/domains/conversation/service/chat_service.py @@ -40,10 +40,15 @@ from sqlalchemy.ext.asyncio import AsyncSession from aperag.db.ops import AsyncDatabaseOps, async_db_ops -from aperag.domains.agent_runtime.db.models import AgentArtifactType, AgentTurnStatus +from aperag.domains.agent_runtime.db.models import AgentTurnStatus +from aperag.domains.agent_runtime.snapshot_assembler import ( + assemble_parts_from_artifacts, + extract_error_text, +) +from aperag.domains.agent_runtime.uimessage import AgentTurnSnapshot from aperag.domains.conversation.db.models import BotType, ChatStatus from aperag.domains.conversation.db.models import Chat as ChatRow -from aperag.domains.conversation.schemas import Chat, ChatDetails, ChatMessage, ChatUpdate, Reference +from aperag.domains.conversation.schemas import Chat, ChatDetails, ChatUpdate from aperag.exceptions import ChatNotFoundException, ResourceNotFoundException, ValidationException from aperag.utils.history import ( RedisChatMessageHistory, @@ -53,46 +58,6 @@ logger = logging.getLogger(__name__) -def _artifact_type_value(artifact) -> Optional[str]: - if not artifact: - return None - artifact_type = getattr(artifact, "artifact_type", None) - return artifact_type.value if hasattr(artifact_type, "value") else artifact_type - - -def _extract_artifact_text(artifact) -> str: - if not artifact or not isinstance(getattr(artifact, "payload", None), dict): - return "" - return artifact.payload.get("text") or artifact.payload.get("content") or artifact.payload.get("message") or "" - - -def 
_coerce_timestamp(value) -> Optional[float]: - return value.timestamp() if value else None - - -def _map_reference_item(item: dict) -> Reference: - return Reference( - score=item.get("score"), - text=item.get("snippet") or "", - metadata={ - **(item.get("metadata") or {}), - "title": item.get("title"), - "source_type": item.get("source_type"), - "source_id": item.get("source_id"), - "uri": item.get("uri"), - }, - ) - - -def _extract_references(artifact) -> list[Reference]: - if not artifact or not isinstance(getattr(artifact, "payload", None), dict): - return [] - items = artifact.payload.get("items") - if not isinstance(items, list): - return [] - return [_map_reference_item(item) for item in items if isinstance(item, dict)] - - class ChatService: """Chat service that handles business logic for chats""" @@ -114,97 +79,66 @@ def build_chat_response(self, chat: ChatRow) -> Chat: updated=chat.gmt_updated.isoformat(), ) - async def _build_v3_chat_history(self, user: str, chat_id: str) -> list[list[ChatMessage]]: + async def _build_v3_chat_history(self, user: str, chat_id: str) -> list[AgentTurnSnapshot]: + """Build the chat history as canonical ``AgentTurnSnapshot`` envelopes. + + Phase 8 D8.5-BE (#92) flips this from the legacy + ``list[list[ChatMessage]]`` shape to one ``AgentTurnSnapshot`` + per assistant turn. Each snapshot carries the same + ``UIMessagePart[]`` shape the FE consumes from the live SSE + stream, so historical and live turns render through a single + canonical path (D8 §2 wire/at-rest byte-equal). + + The user query is exposed at ``input_text`` on the snapshot + envelope rather than as a separate ``role=human`` ChatMessage + entry; the FE renders it from there. The assistant turn's + ``answer`` / ``reference_bundle`` / ``error_summary`` + artifacts are projected into ``parts`` via + :func:`assemble_parts_from_artifacts`, mirroring the snapshot + endpoint (#90 / D8.4d). 
FAILED / CANCELLED turns surface their + message via ``error_text`` (preferring an ``error_summary`` + artifact, falling back to ``turn.error_message``), again + mirroring the snapshot endpoint contract. + + Once the wire emitter starts populating ``agent_message.parts`` + directly (D8.6 / #80), this method can short-circuit to + :meth:`UIMessageStore.read` per-turn; until then the artifact + projection is the single source. ``runtime_kind`` is hardcoded + to ``agent_runtime`` here — non-agent runtimes will write + ``direct_chat`` / ``rag_chat`` rows directly via the future + non-agent write path and this method only needs to surface + them when they exist (a no-op until then per architect lock + msg=01918929). + """ + turns = await self.db_ops.query_agent_turns(user, chat_id) - history: list[list[ChatMessage]] = [] + history: list[AgentTurnSnapshot] = [] for turn in turns: - history.append( - [ - ChatMessage( - id=turn.id, - type="message", - role="human", - data=turn.input_text, - timestamp=_coerce_timestamp(turn.gmt_created), - ) - ] - ) - artifacts = await self.db_ops.query_agent_artifacts_by_turn(turn.id) - answer_artifact = ( - next((artifact for artifact in artifacts if artifact.id == turn.answer_artifact_id), None) - if turn.answer_artifact_id - else None - ) - if not answer_artifact: - answer_artifact = next( - ( - artifact - for artifact in artifacts - if _artifact_type_value(artifact) == AgentArtifactType.ANSWER.value - ), - None, - ) + parts = list(assemble_parts_from_artifacts(artifacts)) - reference_artifact = ( - next((artifact for artifact in artifacts if artifact.id == turn.reference_bundle_artifact_id), None) - if turn.reference_bundle_artifact_id - else None - ) - if not reference_artifact: - reference_artifact = next( - ( - artifact - for artifact in artifacts - if _artifact_type_value(artifact) == AgentArtifactType.REFERENCE_BUNDLE.value - ), - None, - ) + error_text: Optional[str] = None + if turn.status in {AgentTurnStatus.FAILED, 
AgentTurnStatus.CANCELLED}: + error_text = extract_error_text(artifacts) or turn.error_message - answer_text = _extract_artifact_text(answer_artifact) - if not answer_text and turn.status in { - AgentTurnStatus.FAILED, - AgentTurnStatus.CANCELLED, - }: - answer_text = turn.error_message or "" - - ai_parts: list[ChatMessage] = [] - if answer_text: - ai_parts.append( - ChatMessage( - id=turn.id, - type="message", - role="ai", - data=answer_text, - timestamp=_coerce_timestamp(turn.gmt_finished) or _coerce_timestamp(turn.gmt_created), - ) - ) - else: - ai_parts.append( - ChatMessage( - id=turn.id, - type="start", - role="ai", - data="", - timestamp=_coerce_timestamp(turn.gmt_created), - ) - ) - - references = _extract_references(reference_artifact) - if references: - ai_parts.append( - ChatMessage( - id=turn.id, - type="references", - role="ai", - data="", - references=references, - timestamp=_coerce_timestamp(turn.gmt_finished) or _coerce_timestamp(turn.gmt_created), - ) + status_value = turn.status.value if hasattr(turn.status, "value") else str(turn.status) + history.append( + AgentTurnSnapshot( + turn_id=turn.id, + chat_id=turn.chat_id, + runtime_kind="agent_runtime", + status=status_value, + parts=parts, + error_text=error_text, + timeline_cursor=turn.timeline_cursor or 0, + started_at=turn.gmt_started, + finished_at=turn.gmt_finished, + created_at=turn.gmt_created, + updated_at=turn.gmt_updated, + input_text=turn.input_text, ) - - history.append(ai_parts) + ) return history diff --git a/aperag/migration/versions/20260426012000-c8f2d34a51e7_add_agent_message_runtime_kind.py b/aperag/migration/versions/20260426012000-c8f2d34a51e7_add_agent_message_runtime_kind.py new file mode 100644 index 000000000..dafaf8c36 --- /dev/null +++ b/aperag/migration/versions/20260426012000-c8f2d34a51e7_add_agent_message_runtime_kind.py @@ -0,0 +1,54 @@ +"""add agent_message.runtime_kind discriminator (Phase 8 D8.5-BE / #92) + +Phase 8 task #92 — adds the ``runtime_kind`` column to 
``agent_message`` +so a single canonical UIMessage table can host messages produced by +distinct runtimes (the agent reasoning loop today; future direct-LLM +and RAG-only chat paths). Per architect canonical lock msg=e01e9b4b ++ Weston msg=94dac98a, ``runtime_kind`` is a stable enum +(``agent_runtime`` / ``direct_chat`` / ``rag_chat``) and ``role`` +retains its ChatML speaker semantics. + +Existing rows (all produced by the agent runtime to date) are +backfilled to ``agent_runtime`` via the column ``server_default`` so +no data migration step is required. The column is non-null going +forward; SQLAlchemy ORM also defaults new rows to ``agent_runtime`` +when the writer doesn't supply a value (D8.5 keeps the agent runtime +write path unchanged). + +Per Phase 8 destructive philosophy + earayu2 msg=f20d5034 hard-cut +acceptance: no legacy chat-history table to drop in this migration — +the non-agent path's previous storage was Redis-only via +``RedisChatMessageHistory`` (kept until D8.6 / #80 cleanup post-soak +per Weston msg=5ec539c8). + +Revision ID: c8f2d34a51e7 +Revises: 84fac9e3d8c2 +Create Date: 2026-04-26 01:20:00.000000 +""" + +from typing import Sequence, Union + +import sqlalchemy as sa +from alembic import op + +# revision identifiers, used by Alembic. 
+revision: str = "c8f2d34a51e7" +down_revision: Union[str, None] = "84fac9e3d8c2" +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + + +def upgrade() -> None: + op.add_column( + "agent_message", + sa.Column( + "runtime_kind", + sa.String(length=24), + nullable=False, + server_default="agent_runtime", + ), + ) + + +def downgrade() -> None: + op.drop_column("agent_message", "runtime_kind") diff --git a/tests/unit_test/agent_runtime/test_agent_runtime_v3.py b/tests/unit_test/agent_runtime/test_agent_runtime_v3.py index a0abad531..0edd9d0a3 100644 --- a/tests/unit_test/agent_runtime/test_agent_runtime_v3.py +++ b/tests/unit_test/agent_runtime/test_agent_runtime_v3.py @@ -14,11 +14,11 @@ from aperag.domains.agent_runtime.schemas import ( AgentArtifactEnvelope, AgentTimelineEventEnvelope, - AgentTurnSnapshot, CreateTurnRequest, UserActivityIntent, ) from aperag.domains.agent_runtime.services import EventService, HistoryWriter, TurnService +from aperag.domains.agent_runtime.uimessage import AgentTurnSnapshot def _now(): diff --git a/tests/unit_test/chat/test_chat_service.py b/tests/unit_test/chat/test_chat_service.py index 6af9e5f1c..e6412ad84 100644 --- a/tests/unit_test/chat/test_chat_service.py +++ b/tests/unit_test/chat/test_chat_service.py @@ -1,9 +1,28 @@ +# Copyright 2025 ApeCloud, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ from datetime import datetime, timezone from types import SimpleNamespace import pytest from aperag.domains.agent_runtime.db.models import AgentTurnStatus +from aperag.domains.agent_runtime.uimessage import ( + DataCitationPart, + SourceUrlPart, + TextPart, +) from aperag.domains.conversation.service.chat_service import ChatService @@ -31,17 +50,22 @@ def __init__(self): answer_artifact_id="artifact-answer", reference_bundle_artifact_id="artifact-refs", error_message=None, + timeline_cursor=2, gmt_created=_now(), + gmt_started=_now(), gmt_finished=_now(), + gmt_updated=_now(), ) self.answer_artifact = SimpleNamespace( id="artifact-answer", artifact_type="answer", + summary="Here is the answer.", payload={"text": "Here is the answer."}, ) self.reference_artifact = SimpleNamespace( id="artifact-refs", artifact_type="reference_bundle", + summary="1 references", payload={ "items": [ { @@ -74,7 +98,15 @@ async def query_agent_artifacts_by_turn(self, turn_id): @pytest.mark.asyncio -async def test_get_chat_projects_v3_turn_history(): +async def test_get_chat_returns_canonical_uimessage_history(): + """Phase 8 D8.5-BE (#92): ``ChatDetails.history`` is now a list of + canonical ``AgentTurnSnapshot`` envelopes (one per assistant turn), + each carrying the same ``UIMessagePart`` shape the FE consumes from + the live SSE stream. Replaces the legacy + ``list[list[ChatMessage]]`` shape so historical and live turns + render through a single canonical path. 
+ """ + service = ChatService() service.db_ops = _FakeChatDbOps() @@ -82,18 +114,92 @@ async def test_get_chat_projects_v3_turn_history(): assert chat.id == "chat-1" assert chat.history is not None - assert len(chat.history) == 2 + assert len(chat.history) == 1 + + snapshot = chat.history[0] + assert snapshot.turn_id == "turn-1" + assert snapshot.chat_id == "chat-1" + assert snapshot.runtime_kind == "agent_runtime" + assert snapshot.role == "assistant" + assert snapshot.status == "COMPLETED" + assert snapshot.input_text == "What changed?" + assert snapshot.error_text is None + assert snapshot.timeline_cursor == 2 + + types = [getattr(part, "type", None) for part in snapshot.parts] + assert types == ["text", "source-url", "data-citation"] + + assert isinstance(snapshot.parts[0], TextPart) + assert snapshot.parts[0].text == "Here is the answer." + + assert isinstance(snapshot.parts[1], SourceUrlPart) + assert snapshot.parts[1].source_id == "doc-a" + assert snapshot.parts[1].url == "https://example.com/doc-a" - user_group, ai_group = chat.history - assert user_group[0].role == "human" - assert user_group[0].data == "What changed?" 
+ assert isinstance(snapshot.parts[2], DataCitationPart) + assert snapshot.parts[2].data.cited_text == "Reference snippet" + assert snapshot.parts[2].data.location.url == "https://example.com/doc-a" + + +@pytest.mark.asyncio +async def test_get_chat_history_surfaces_error_text_for_failed_turn(): + """A FAILED turn's error_text comes from ``error_summary`` artifact + when present, falling back to ``turn.error_message`` (mirrors the + snapshot endpoint contract from #90 D8.4d).""" + + service = ChatService() + db_ops = _FakeChatDbOps() + db_ops.turn = SimpleNamespace( + id="turn-failed", + chat_id="chat-1", + user="user-1", + input_text="Trigger failure", + status=AgentTurnStatus.FAILED, + answer_artifact_id=None, + reference_bundle_artifact_id=None, + error_message="upstream provider timeout", + timeline_cursor=1, + gmt_created=_now(), + gmt_started=_now(), + gmt_finished=_now(), + gmt_updated=_now(), + ) + db_ops.answer_artifact = None + db_ops.reference_artifact = None + + async def _empty_artifacts(turn_id): + return [] - assert ai_group[0].role == "ai" - assert ai_group[0].id == "turn-1" - assert ai_group[0].type == "message" - assert ai_group[0].data == "Here is the answer." + db_ops.query_agent_artifacts_by_turn = _empty_artifacts + service.db_ops = db_ops + + chat = await service.get_chat("user-1", "bot-1", "chat-1") + assert chat.history is not None + assert len(chat.history) == 1 + + snapshot = chat.history[0] + assert snapshot.status == "FAILED" + assert snapshot.error_text == "upstream provider timeout" + assert snapshot.parts == [] + + +@pytest.mark.asyncio +async def test_get_chat_history_does_not_expose_legacy_chatmessage_shape(): + """Regression-guard: ``ChatDetails.history`` must not regress to the + legacy ``list[list[ChatMessage]]`` shape (separate human / ai + groups per turn). Pre-#92 callers that read ``history[i][j].role`` + will fail loudly; that is intentional — they need to migrate to + the canonical snapshot shape. 
+ """ + + service = ChatService() + service.db_ops = _FakeChatDbOps() + + chat = await service.get_chat("user-1", "bot-1", "chat-1") - assert ai_group[1].type == "references" - assert ai_group[1].references is not None - assert len(ai_group[1].references) == 1 - assert "feedback" not in ai_group[1].model_dump(exclude_none=True) + serialised = [snapshot.model_dump(mode="json") for snapshot in chat.history or []] + for entry in serialised: + assert "turn_id" in entry + assert "parts" in entry + assert "role" in entry # but role is a string, not a list of ChatMessages + assert isinstance(entry["role"], str)
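
Reviewer note: the per-turn projection that ``_build_v3_chat_history`` performs after this patch can be sketched with the standard library alone. This is a rough illustration, not the patched code: ``TurnStatus``, ``Snapshot``, ``first_error_summary`` and ``project_turn`` are hypothetical stand-ins for ``AgentTurnStatus``, ``AgentTurnSnapshot``, ``extract_error_text`` and the loop body, and the ``message`` payload key on ``error_summary`` artifacts is an assumption.

```python
from dataclasses import dataclass, field
from enum import Enum
from types import SimpleNamespace
from typing import Optional


class TurnStatus(str, Enum):
    # Hypothetical stand-in for AgentTurnStatus.
    COMPLETED = "COMPLETED"
    FAILED = "FAILED"
    CANCELLED = "CANCELLED"


@dataclass
class Snapshot:
    # Hypothetical, trimmed stand-in for AgentTurnSnapshot.
    turn_id: str
    status: str
    input_text: Optional[str] = None
    error_text: Optional[str] = None
    runtime_kind: str = "agent_runtime"
    parts: list = field(default_factory=list)


def first_error_summary(artifacts) -> Optional[str]:
    # Assumed behaviour: message of the first error_summary artifact, if any.
    for artifact in artifacts:
        if artifact.artifact_type == "error_summary":
            return artifact.payload.get("message")
    return None


def project_turn(turn, artifacts) -> Snapshot:
    # error_text is only populated for FAILED / CANCELLED turns; the
    # artifact message wins over the turn-level error_message.
    error_text = None
    if turn.status in {TurnStatus.FAILED, TurnStatus.CANCELLED}:
        error_text = first_error_summary(artifacts) or turn.error_message
    # Accept either an enum member or a plain string status.
    status_value = turn.status.value if hasattr(turn.status, "value") else str(turn.status)
    return Snapshot(
        turn_id=turn.id,
        status=status_value,
        input_text=turn.input_text,
        error_text=error_text,
    )


failed_turn = SimpleNamespace(
    id="turn-failed",
    status=TurnStatus.FAILED,
    input_text="Trigger failure",
    error_message="upstream provider timeout",
)
snapshot = project_turn(failed_turn, [])
print(snapshot.status, snapshot.error_text)
```

The user query travels on the same object as the assistant output (``input_text``), so a consumer needs only one record per turn rather than the separate human / ai groups of the legacy shape.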