Skip to content

Commit a298f63

Browse files
committed
feat(streaming): cleanup task Redis stream on terminal status transitions
The SDK defines RedisStreamRepository.cleanup_stream(topic) but never calls it. As a result, every task:* stream that an agent writes to during its lifetime stays in Redis forever, even after the task completes. For long-lived clusters this leaks unbounded memory. This change wires cleanup_stream into all terminal status transitions on TasksService: cancel, complete, fail, terminate, timeout, and delete. After the corresponding agentex API call returns successfully, the task's Redis stream (task:{task_id}) is deleted best-effort -- any failure is logged and swallowed so cleanup issues never break the lifecycle call itself. TasksService now accepts an optional stream_repository in __init__. The two existing construction sites pass one in: - get_all_activities() reuses the same RedisStreamRepository already constructed for StreamingService - TasksModule's lazy default constructs its own when no service is provided Note: this only fires when an application actually calls a terminal transition method (e.g. adk.tasks.complete). Workflows that exit without calling these still leak streams; that is a separate concern for the application layer.
1 parent 23bfd6e commit a298f63

3 files changed

Lines changed: 34 additions & 1 deletion

File tree

src/agentex/lib/adk/_modules/tasks.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77

88
from agentex import AsyncAgentex # noqa: F401
99
from agentex.lib.adk.utils._modules.client import create_async_agentex_client
10+
from agentex.lib.core.adapters.streams.adapter_redis import RedisStreamRepository
1011
from agentex.lib.core.services.adk.tasks import TasksService
1112
from agentex.lib.core.temporal.activities.activity_helpers import ActivityHelpers
1213
from agentex.lib.core.temporal.activities.adk.tasks_activities import (
@@ -42,8 +43,11 @@ def __init__(
4243
if tasks_service is None:
4344
agentex_client = create_async_agentex_client()
4445
tracer = AsyncTracer(agentex_client)
46+
stream_repository = RedisStreamRepository()
4547
self._tasks_service = TasksService(
46-
agentex_client=agentex_client, tracer=tracer
48+
agentex_client=agentex_client,
49+
tracer=tracer,
50+
stream_repository=stream_repository,
4751
)
4852
else:
4953
self._tasks_service = tasks_service

src/agentex/lib/core/services/adk/tasks.py

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,20 +7,41 @@
77
from agentex.lib.utils.temporal import heartbeat_if_in_workflow
88
from agentex.lib.core.tracing.tracer import AsyncTracer
99
from agentex.types.task_retrieve_response import TaskRetrieveResponse
10+
from agentex.lib.core.adapters.streams.port import StreamRepository
1011
from agentex.types.task_query_workflow_response import TaskQueryWorkflowResponse
1112
from agentex.types.task_retrieve_by_name_response import TaskRetrieveByNameResponse
1213

1314
logger = make_logger(__name__)
1415

1516

17+
def _stream_topic(task_id: str) -> str:
18+
return f"task:{task_id}"
19+
20+
1621
class TasksService:
1722
def __init__(
1823
self,
1924
agentex_client: AsyncAgentex,
2025
tracer: AsyncTracer,
26+
stream_repository: StreamRepository | None = None,
2127
):
2228
self._agentex_client = agentex_client
2329
self._tracer = tracer
30+
self._stream_repository = stream_repository
31+
32+
async def _cleanup_task_stream(self, task_id: str) -> None:
33+
"""Best-effort delete of the task's Redis stream.
34+
35+
Called after a terminal status transition (cancel/complete/fail/
36+
terminate/timeout/delete). Failures are logged and swallowed so
37+
cleanup issues never break the lifecycle call.
38+
"""
39+
if self._stream_repository is None:
40+
return
41+
try:
42+
await self._stream_repository.cleanup_stream(_stream_topic(task_id))
43+
except Exception as e:
44+
logger.warning(f"Failed to cleanup task stream {task_id}: {e}")
2445

2546
async def get_task(
2647
self,
@@ -75,6 +96,8 @@ async def delete_task(
7596
task_model = await self._agentex_client.tasks.delete_by_name(task_name=task_name)
7697
else:
7798
raise ValueError("Either task_id or task_name must be provided.")
99+
if task_model.id:
100+
await self._cleanup_task_stream(task_model.id)
78101
if span:
79102
span.output = task_model.model_dump()
80103
return task_model
@@ -94,6 +117,7 @@ async def cancel_task(
94117
) as span:
95118
heartbeat_if_in_workflow("cancel task")
96119
task_model = await self._agentex_client.tasks.cancel(task_id=task_id, reason=reason)
120+
await self._cleanup_task_stream(task_id)
97121
if span:
98122
span.output = task_model.model_dump()
99123
return task_model
@@ -113,6 +137,7 @@ async def complete_task(
113137
) as span:
114138
heartbeat_if_in_workflow("complete task")
115139
task_model = await self._agentex_client.tasks.complete(task_id=task_id, reason=reason)
140+
await self._cleanup_task_stream(task_id)
116141
if span:
117142
span.output = task_model.model_dump()
118143
return task_model
@@ -132,6 +157,7 @@ async def fail_task(
132157
) as span:
133158
heartbeat_if_in_workflow("fail task")
134159
task_model = await self._agentex_client.tasks.fail(task_id=task_id, reason=reason)
160+
await self._cleanup_task_stream(task_id)
135161
if span:
136162
span.output = task_model.model_dump()
137163
return task_model
@@ -151,6 +177,7 @@ async def terminate_task(
151177
) as span:
152178
heartbeat_if_in_workflow("terminate task")
153179
task_model = await self._agentex_client.tasks.terminate(task_id=task_id, reason=reason)
180+
await self._cleanup_task_stream(task_id)
154181
if span:
155182
span.output = task_model.model_dump()
156183
return task_model
@@ -170,6 +197,7 @@ async def timeout_task(
170197
) as span:
171198
heartbeat_if_in_workflow("timeout task")
172199
task_model = await self._agentex_client.tasks.timeout(task_id=task_id, reason=reason)
200+
await self._cleanup_task_stream(task_id)
173201
if span:
174202
span.output = task_model.model_dump()
175203
return task_model

src/agentex/lib/core/temporal/activities/__init__.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,7 @@ def get_all_activities(sgp_client=None):
9292
tasks_service = TasksService(
9393
agentex_client=agentex_client,
9494
tracer=tracer,
95+
stream_repository=stream_repository,
9596
)
9697
tracing_service = TracingService(
9798
tracer=tracer,

0 commit comments

Comments
 (0)