Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 1 addition & 53 deletions aperag/db/repositories/agent_runtime.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,7 @@
from sqlalchemy import select

from aperag.db.repositories.base import AsyncRepositoryProtocol
from aperag.domains.agent_runtime.db.models import (
AgentTimelineEvent,
AgentTurn,
)
from aperag.domains.agent_runtime.db.models import AgentTurn
from aperag.utils.utils import utc_now


Expand Down Expand Up @@ -132,52 +129,3 @@ async def _operation(session):
return instance

return await self.execute_with_transaction(_operation)

async def create_agent_timeline_event(
self,
*,
turn_id: str,
sequence: int,
timestamp,
event_type: str,
label: Optional[str],
status: Optional[str],
actor,
data: dict,
) -> AgentTimelineEvent:
async def _operation(session):
instance = AgentTimelineEvent(
turn_id=turn_id,
sequence=sequence,
timestamp=timestamp,
type=event_type,
label=label,
status=status,
actor=actor,
data=data,
)
session.add(instance)
await session.flush()
await session.refresh(instance)
return instance

return await self.execute_with_transaction(_operation)

async def query_agent_timeline_events(
self,
turn_id: str,
*,
after_sequence: int = 0,
limit: int = 500,
) -> list[AgentTimelineEvent]:
async def _query(session):
stmt = (
select(AgentTimelineEvent)
.where(AgentTimelineEvent.turn_id == turn_id, AgentTimelineEvent.sequence > after_sequence)
.order_by(AgentTimelineEvent.sequence.asc())
.limit(limit)
)
result = await session.execute(stmt)
return result.scalars().all()

return await self._execute_query(_query)
59 changes: 23 additions & 36 deletions aperag/domains/agent_runtime/db/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,19 +14,26 @@

"""Agent-runtime-domain SQLAlchemy models.

Owns ``AgentTurn`` + ``AgentTimelineEvent`` + ``AgentMessage`` plus
``AgentTurnStatus`` / ``AgentEventActor`` enums. Moved here from
``aperag.db.models`` in Phase 5 step 5-S2b; the legacy aggregate
module retains a re-export shim until Phase 6 cleanup.

Phase 8 D8.6 (#80) chunk-2 hard-cut removed the legacy
``AgentArtifact`` table and the ``AgentArtifactType`` enum — at-rest
answer/citation persistence is now canonical via ``AgentMessage``
(D8.2 #74) populated by the runtime emit path.

Tables key on ``turn_id`` (events + messages) or ``chat_id`` (turn);
these are opaque string FKs, so no Python-level cross-domain import
is required. ``chat_id`` is logically owned by the ``conversation``
Owns ``AgentTurn`` + ``AgentMessage`` plus ``AgentTurnStatus`` /
``AgentEventActor`` enums. Moved here from ``aperag.db.models`` in
Phase 5 step 5-S2b; the legacy aggregate module retains a re-export
shim until Phase 6 cleanup.

Phase 8 D8.6 (#80) hard-cut history:

* chunk-2 dropped the legacy ``AgentArtifact`` table and the
``AgentArtifactType`` enum — at-rest answer/citation persistence
is now canonical via ``AgentMessage`` (D8.2 #74), populated by the
runtime emit path's ``UIMessageStore.write`` call at end-of-turn.
* chunk-3 dropped the ``AgentTimelineEvent`` table — wire-emitter
envelopes are now ephemeral (live SSE + Redis cache for
reconnect-within-TTL only). Reload outside the TTL has no
historical events to replay; the canonical at-rest authority is
``AgentMessage``.

Tables key on ``turn_id`` (messages) or ``chat_id`` (turn); these
are opaque string FKs, so no Python-level cross-domain import is
required. ``chat_id`` is logically owned by the ``conversation``
domain but the relationship is traversed at query time via the turn
service, not via ORM relationship() bindings.
"""
Expand Down Expand Up @@ -112,25 +119,6 @@ class AgentTurn(Base):
gmt_updated = Column(DateTime(timezone=True), default=utc_now, nullable=False)


class AgentTimelineEvent(Base):
__tablename__ = "agent_timeline_event"
__table_args__ = (
UniqueConstraint("turn_id", "sequence", name="uq_agent_timeline_event_turn_sequence"),
Index("idx_agent_timeline_event_turn_timestamp", "turn_id", "timestamp"),
)

id = Column(String(24), primary_key=True, default=lambda: "evt" + _random_id())
turn_id = Column(String(24), nullable=False, index=True)
sequence = Column(Integer, nullable=False)
timestamp = Column(DateTime(timezone=True), default=utc_now, nullable=False, index=True)
type = Column(String(128), nullable=False, index=True)
label = Column(String(128), nullable=True)
status = Column(String(64), nullable=True)
actor = Column(_enum_column(AgentEventActor), nullable=False, default=AgentEventActor.SYSTEM)
data = Column(JSON, default=lambda: {}, nullable=False)
gmt_created = Column(DateTime(timezone=True), default=utc_now, nullable=False)


class AgentMessage(Base):
"""At-rest UIMessage envelope (Phase 8 D8.2 first-cut, D8.5-BE refined).

Expand All @@ -150,10 +138,9 @@ class AgentMessage(Base):
architect canonical lock msg=e01e9b4b.

Phase 8 D8.6 (#80) chunk-2 dropped the legacy ``AgentArtifact``
table and column FKs from ``AgentTurn``; ``AgentMessage`` is now
the single authoritative at-rest store for assistant turn
contents. ``AgentTimelineEvent`` remains pending chunk-3 cleanup
(replay/reload semantic change is reviewed separately).
table and column FKs from ``AgentTurn``; chunk-3 dropped
``AgentTimelineEvent``. ``AgentMessage`` is now the single
authoritative at-rest store for assistant turn contents.
"""

__tablename__ = "agent_message"
Expand Down
58 changes: 26 additions & 32 deletions aperag/domains/agent_runtime/services.py
Original file line number Diff line number Diff line change
Expand Up @@ -473,6 +473,17 @@ def to_turn_envelope(turn) -> AgentTurnEnvelope:


class EventService:
"""Live wire-emitter envelope service for agent runtime events.

Phase 8 D8.6 (#80) chunk-3 hard-cut removed the
``agent_timeline_event`` DB persistence layer. Envelopes are now
composed locally and pushed only to the Redis live cache (live SSE
+ reconnect-within-TTL). Reload outside the Redis TTL has no
historical events to replay — by design (PM lock msg=ce…): the
canonical at-rest authority is the ``agent_message`` UIMessage row;
timeline events are ephemeral wire telemetry, not durable history.
"""

def __init__(self, db_ops: AsyncDatabaseOps | None = None, redis_store: AgentRuntimeRedisStore | None = None):
self.db_ops = db_ops or async_db_ops
self.redis_store = redis_store or AgentRuntimeRedisStore()
Expand All @@ -488,18 +499,21 @@ async def append_event(
status: Optional[str] = None,
data: Optional[dict[str, Any]] = None,
) -> AgentTimelineEventEnvelope:
timestamp = utc_now()
event = await self.db_ops.create_agent_timeline_event(
turn_id=turn_id,
sequence=sequence,
timestamp=timestamp,
event_type=event_type,
label=label,
status=status,
actor=actor,
data=data or {},
actor_value = actor.value if hasattr(actor, "value") else actor
envelope = self.adapt_event_envelope(
AgentTimelineEventEnvelope(
event_id="evt" + uuid.uuid4().hex[:21],
turn_id=turn_id,
sequence=sequence,
timestamp=utc_now(),
type=event_type,
technical_type=event_type,
label=label,
status=status,
actor=actor_value,
data=data or {},
)
)
envelope = self.to_event_envelope(event)
await self.redis_store.append_event(envelope)
await self.redis_store.merge_runtime_state(turn_id, {"timeline_cursor": sequence})
return envelope
Expand All @@ -508,27 +522,7 @@ async def get_events_after(
self, turn_id: str, after_sequence: int = 0, limit: int = 500
) -> list[AgentTimelineEventEnvelope]:
cached = await self.redis_store.get_events_after(turn_id, after_sequence=after_sequence, limit=limit)
if cached:
return [self.adapt_event_envelope(AgentTimelineEventEnvelope.model_validate(item)) for item in cached]
persisted = await self.db_ops.query_agent_timeline_events(turn_id, after_sequence=after_sequence, limit=limit)
return [self.to_event_envelope(item) for item in persisted]

@staticmethod
def to_event_envelope(event) -> AgentTimelineEventEnvelope:
actor_value = event.actor.value if hasattr(event.actor, "value") else event.actor
envelope = AgentTimelineEventEnvelope(
event_id=event.id,
turn_id=event.turn_id,
sequence=event.sequence,
timestamp=event.timestamp,
type=event.type,
technical_type=event.type,
label=event.label,
status=event.status,
actor=actor_value,
data=event.data or {},
)
return EventService.adapt_event_envelope(envelope)
return [self.adapt_event_envelope(AgentTimelineEventEnvelope.model_validate(item)) for item in cached]

@staticmethod
def adapt_event_envelope(event: AgentTimelineEventEnvelope) -> AgentTimelineEventEnvelope:
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
"""drop agent_timeline_event table (D8.6 #80 chunk-3)

Phase 8 D8.6 (#80) chunk-3 hard-cut: wire-emitter envelopes are now
ephemeral wire telemetry — pushed to the Redis live cache only (live
SSE + reconnect-within-TTL). Reload outside the Redis TTL has no
historical events to replay; the canonical at-rest authority is
``agent_message`` (D8.2 #74).

Pre-launch system has no users / no data, so the cutover is direct
delete (per earayu2 hard-cut acceptance) — no backfill / no row
migration. The downgrade restores the dropped surface so a rollback
can replay subsequent migrations cleanly.

Revision ID: e8a1f5b2d4c7
Revises: d8e6c2b4f1a9
Create Date: 2026-04-26 03:00:00.000000
"""

from typing import Sequence, Union

import sqlalchemy as sa
from alembic import op

revision: str = "e8a1f5b2d4c7"
down_revision: Union[str, None] = "d8e6c2b4f1a9"
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None


def upgrade() -> None:
op.drop_index("ix_agent_timeline_event_type", table_name="agent_timeline_event")
op.drop_index("ix_agent_timeline_event_turn_id", table_name="agent_timeline_event")
op.drop_index("ix_agent_timeline_event_timestamp", table_name="agent_timeline_event")
op.drop_index("idx_agent_timeline_event_turn_timestamp", table_name="agent_timeline_event")
op.drop_table("agent_timeline_event")


def downgrade() -> None:
op.create_table(
"agent_timeline_event",
sa.Column("id", sa.String(length=24), nullable=False),
sa.Column("turn_id", sa.String(length=24), nullable=False),
sa.Column("sequence", sa.Integer(), nullable=False),
sa.Column("timestamp", sa.DateTime(timezone=True), nullable=False),
sa.Column("type", sa.String(length=128), nullable=False),
sa.Column("label", sa.String(length=128), nullable=True),
sa.Column("status", sa.String(length=64), nullable=True),
sa.Column("actor", sa.String(length=50), nullable=False),
sa.Column("data", sa.JSON(), nullable=False),
sa.Column("gmt_created", sa.DateTime(timezone=True), nullable=False),
sa.PrimaryKeyConstraint("id"),
sa.UniqueConstraint("turn_id", "sequence", name="uq_agent_timeline_event_turn_sequence"),
)
op.create_index(
"idx_agent_timeline_event_turn_timestamp",
"agent_timeline_event",
["turn_id", "timestamp"],
unique=False,
)
op.create_index("ix_agent_timeline_event_timestamp", "agent_timeline_event", ["timestamp"], unique=False)
op.create_index("ix_agent_timeline_event_turn_id", "agent_timeline_event", ["turn_id"], unique=False)
op.create_index("ix_agent_timeline_event_type", "agent_timeline_event", ["type"], unique=False)
49 changes: 26 additions & 23 deletions tests/unit_test/agent_runtime/test_agent_runtime_v3.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,16 +84,12 @@ async def read(self, turn_id):


class _FakeDbOps:
def __init__(self, *, turn=None, persisted_events=None):
def __init__(self, *, turn=None):
self.turn = turn
self.persisted_events = list(persisted_events or [])

async def query_agent_turn(self, _user, _chat_id, _turn_id):
return self.turn

async def query_agent_timeline_events(self, _turn_id, after_sequence=0, limit=2000):
return [event for event in self.persisted_events if event.sequence > after_sequence][:limit]


class _FakeCreateTurnDbOps:
def __init__(self, *, existing_turn=None, raise_on_create=False):
Expand Down Expand Up @@ -154,16 +150,17 @@ def _build_turn(**overrides):
return SimpleNamespace(**values)


def _build_event(sequence: int, event_type: str, *, label=None, status=None, data=None):
return SimpleNamespace(
id=f"event-{sequence}",
def _build_envelope(sequence: int, event_type: str, *, label=None, status=None, data=None):
return AgentTimelineEventEnvelope(
event_id=f"event-{sequence}",
turn_id="turn-1",
sequence=sequence,
timestamp=_now(),
type=event_type,
technical_type=event_type,
label=label,
status=status,
actor=AgentEventActor.AGENT,
actor=AgentEventActor.AGENT.value,
data=data or {},
)

Expand Down Expand Up @@ -275,8 +272,14 @@ async def test_turn_snapshot_does_not_expose_legacy_keys():


@pytest.mark.asyncio
async def test_event_service_to_event_envelope_adds_user_activity_contract():
event = _build_event(
async def test_event_service_adapt_event_envelope_adds_user_activity_contract():
"""Phase 8 D8.6 (#80) chunk-3: ``to_event_envelope`` is gone with
``agent_timeline_event``. ``adapt_event_envelope`` is now the only
surface for turning a wire envelope into the user-activity-tagged
payload the FE renderer consumes.
"""

envelope = _build_envelope(
3,
"tool.started",
label="search_collection",
Expand All @@ -290,18 +293,18 @@ async def test_event_service_to_event_envelope_adds_user_activity_contract():
},
)

envelope = EventService.to_event_envelope(event)

assert envelope.technical_type == "tool.started"
assert envelope.user_activity is not None
assert envelope.user_activity.intent == UserActivityIntent.SEARCHING_KNOWLEDGE
assert envelope.user_activity.title_key == "activity.searching_knowledge.title"
assert envelope.user_activity.subtitle_key == "activity.searching_knowledge.subtitle"
assert envelope.user_activity.detail_key == "activity.searching_knowledge.detail.keyword"
assert envelope.user_activity.context is not None
assert envelope.user_activity.context.keyword == "OpenAI API key"
assert envelope.user_activity.context.source_name == "Product Docs"
assert envelope.user_activity.context.target_type == "knowledge_base"
adapted = EventService.adapt_event_envelope(envelope)

assert adapted.technical_type == "tool.started"
assert adapted.user_activity is not None
assert adapted.user_activity.intent == UserActivityIntent.SEARCHING_KNOWLEDGE
assert adapted.user_activity.title_key == "activity.searching_knowledge.title"
assert adapted.user_activity.subtitle_key == "activity.searching_knowledge.subtitle"
assert adapted.user_activity.detail_key == "activity.searching_knowledge.detail.keyword"
assert adapted.user_activity.context is not None
assert adapted.user_activity.context.keyword == "OpenAI API key"
assert adapted.user_activity.context.source_name == "Product Docs"
assert adapted.user_activity.context.target_type == "knowledge_base"


@pytest.mark.asyncio
Expand Down
Loading