|
46 | 46 | DatabaseTaskStore, |
47 | 47 | ) |
48 | 48 | from a2a.server.tasks.push_notification_sender import PushNotificationEvent |
49 | | -from a2a.types import TaskPushNotificationConfig |
| 49 | +from a2a.types import Task, TaskPushNotificationConfig |
50 | 50 |
|
51 | 51 | log = logging.getLogger(__name__) |
52 | 52 |
|
@@ -229,6 +229,119 @@ async def _dispatch_notification( |
229 | 229 | return await super()._dispatch_notification(event, push_info, task_id) |
230 | 230 |
|
231 | 231 |
|
| 232 | +# ── Reasoning coalescing (#1710) ──────────────────────────────────────────────── |
| 233 | + |
| 234 | +# Duplicated from a2a_impl.executor (the executor imports graph.* at module |
| 235 | +# scope; keeping the constant local avoids pulling the whole agent brain into |
| 236 | +# the store's import chain). Locked together by test_a2a_stores. |
| 237 | +_REASONING_MIME = "application/vnd.protolabs.reasoning-v1+json" |
| 238 | + |
| 239 | + |
| 240 | +def coalesce_reasoning_history(task: Task) -> int: |
| 241 | + """Collapse each contiguous run of reasoning-v1 Messages in ``task.history`` |
| 242 | + into ONE Message, in place. Returns the number of messages removed. |
| 243 | +
|
| 244 | + Reasoning ("thinking") deltas are a STREAMING affordance: the executor emits |
| 245 | + them as reasoning-v1 DataParts on WORKING status frames so the console fills |
| 246 | + the live thinking bubble. But the a2a-sdk ``TaskManager`` moves every status |
| 247 | + frame's message into durable ``history`` — so a token-per-frame reasoning |
| 248 | + stream persisted one near-single-word Message per delta (~700 rows for one |
| 249 | + turn: #1710), bloating the store and making ``GetTask(historyLength=N)`` |
| 250 | + return word fragments instead of conversation. The executor now batches the |
| 251 | + wire frames (~24-char granularity, keeping the live bubble); this collapses |
| 252 | + whatever reaches the durable store to one Message per contiguous reasoning |
| 253 | + block, mirroring how the answer text finalizes into a single canonical part. |
| 254 | +
|
| 255 | + Deliberately mutates the task in place: the SDK ``TaskManager`` re-saves its |
| 256 | + ONE in-memory ``Task`` on every event and copies it whole for every |
| 257 | + subscriber frame, so keeping history compact also removes the O(events × |
| 258 | + history) serialization/copy pressure the flood created (the prime suspect |
| 259 | + for the mid-stream artifact frame loss in #1709). Only agent messages whose |
| 260 | + parts are ALL reasoning DataParts are touched; user messages, tool frames, |
| 261 | + and HITL prompts are never merged, and non-contiguous runs stay separate. |
| 262 | + """ |
| 263 | + from google.protobuf import json_format, struct_pb2 |
| 264 | + |
| 265 | + from a2a.types import Part, Role |
| 266 | + |
| 267 | + def _reasoning_text(part) -> str | None: |
| 268 | + """The part's reasoning text, or None when it isn't a reasoning DataPart.""" |
| 269 | + if part.WhichOneof("content") != "data": |
| 270 | + return None |
| 271 | + mime = part.metadata.fields["mimeType"].string_value if "mimeType" in part.metadata.fields else "" |
| 272 | + if mime != _REASONING_MIME: |
| 273 | + return None |
| 274 | + fields = part.data.struct_value.fields |
| 275 | + return fields["text"].string_value if "text" in fields else "" |
| 276 | + |
| 277 | + def _run_texts(msg) -> list[str] | None: |
| 278 | + """All parts' reasoning texts when the whole message is reasoning, else None.""" |
| 279 | + if msg.role != Role.ROLE_AGENT or not msg.parts: |
| 280 | + return None |
| 281 | + texts = [_reasoning_text(p) for p in msg.parts] |
| 282 | + return texts if all(t is not None for t in texts) else None # type: ignore[return-value] |
| 283 | + |
| 284 | + merged: list = [] |
| 285 | + run_head = None # first Message of the current contiguous reasoning run |
| 286 | + run_texts: list[str] = [] |
| 287 | + |
| 288 | + def _close_run() -> None: |
| 289 | + nonlocal run_head, run_texts |
| 290 | + if run_head is None: |
| 291 | + return |
| 292 | + # Rewrite the head's parts to a single data part with the run's full text |
| 293 | + # (same shape the executor emits, so parsers see one big reasoning part). |
| 294 | + del run_head.parts[:] |
| 295 | + part = Part() |
| 296 | + value = struct_pb2.Value() |
| 297 | + json_format.ParseDict({"text": "".join(run_texts)}, value.struct_value) |
| 298 | + part.data.CopyFrom(value) |
| 299 | + part.metadata.update({"mimeType": _REASONING_MIME}) |
| 300 | + part.media_type = "application/json" |
| 301 | + run_head.parts.append(part) |
| 302 | + merged.append(run_head) |
| 303 | + run_head, run_texts = None, [] |
| 304 | + |
| 305 | + for msg in task.history: |
| 306 | + texts = _run_texts(msg) |
| 307 | + if texts is None: |
| 308 | + _close_run() |
| 309 | + merged.append(msg) |
| 310 | + continue |
| 311 | + if run_head is None: |
| 312 | + head = type(msg)() |
| 313 | + head.CopyFrom(msg) |
| 314 | + run_head = head |
| 315 | + run_texts.extend(texts) |
| 316 | + _close_run() |
| 317 | + |
| 318 | + removed = len(task.history) - len(merged) |
| 319 | + if removed: |
| 320 | + # `merged` holds references into task.history — copy before clearing. |
| 321 | + kept = [] |
| 322 | + for m in merged: |
| 323 | + c = type(m)() |
| 324 | + c.CopyFrom(m) |
| 325 | + kept.append(c) |
| 326 | + del task.history[:] |
| 327 | + task.history.extend(kept) |
| 328 | + return removed |
| 329 | + |
| 330 | + |
| 331 | +class ReasoningCoalescingTaskStore(DatabaseTaskStore): |
| 332 | + """Durable task store that coalesces contiguous reasoning-v1 history runs |
| 333 | + into one Message per run on every save (#1710). Streaming frames are |
| 334 | + untouched — this is persistence-shape only, so the wire contract and the |
| 335 | + live thinking bubble are unchanged.""" |
| 336 | + |
| 337 | + async def save(self, task: Task, context: ServerCallContext) -> None: |
| 338 | + try: |
| 339 | + coalesce_reasoning_history(task) |
| 340 | + except Exception: # noqa: BLE001 — coalescing must never lose a save |
| 341 | + log.exception("[a2a] reasoning coalescing failed; saving uncoalesced") |
| 342 | + await super().save(task, context) |
| 343 | + |
| 344 | + |
232 | 345 | # ── Durable store construction (paths match the bespoke stores) ───────────────── |
233 | 346 |
|
234 | 347 |
|
@@ -263,10 +376,12 @@ def build_a2a_stores() -> tuple[ |
263 | 376 | Each store gets its own engine/file (same split the bespoke stores used: |
264 | 377 | ``a2a-tasks.db`` and ``a2a-push.db``). The SDK stores lazy-init their schema |
265 | 378 | on first use; ``initialize_a2a_stores`` forces that + a TTL sweep at boot. |
| 379 | + The task store coalesces streamed reasoning runs into one durable history |
| 380 | + Message per run (#1710) — see ``ReasoningCoalescingTaskStore``. |
266 | 381 | """ |
267 | 382 | task_db = _resolve_db_path("a2a-tasks.db") |
268 | 383 | push_db = _resolve_db_path("a2a-push.db") |
269 | | - task_store = DatabaseTaskStore(make_sqlite_engine(task_db)) |
| 384 | + task_store = ReasoningCoalescingTaskStore(make_sqlite_engine(task_db)) |
270 | 385 | push_store = ValidatingPushNotificationConfigStore(make_sqlite_engine(push_db)) |
271 | 386 | return task_store, push_store, task_db, push_db |
272 | 387 |
|
|
0 commit comments