Skip to content

Commit 43ff3e8

Browse files
authored
fix: prevent A2A on_message trigger infinite loop (#666)
- Fix 1 (P0): Add a role filter to the on_message query — match only 'assistant' and 'user' messages; exclude 'tool_call' and 'system'.
- Fix 2 (P1): Exclude trigger-internal sessions (source_channel='trigger') from on_message message scanning.
- Fix 3 (P1): Add a per-agent on_message hourly rate limiter (30/hr cap) that auto-disables the trigger and logs a warning.
- Fix 4 (P2): Default to max_fires=100 and expires_at=7d for on_message triggers created via the set_trigger tool.
- Cleanup: Remove the unused MAX_AGENT_CHAIN_DEPTH constant (dead code).
1 parent aa83f60 commit 43ff3e8

3 files changed

Lines changed: 48 additions & 1 deletion

File tree

backend/app/services/agent_tools.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8396,6 +8396,12 @@ async def _handle_set_trigger(
83968396
reason=reason,
83978397
focus_ref=focus_ref,
83988398
)
8399+
# Fix 4: Safety cap for on_message triggers —
8400+
# prevent infinite loops if agent creates broad watchers.
8401+
if ttype == "on_message":
8402+
trigger.max_fires = trigger.max_fires or 100
8403+
if not trigger.expires_at:
8404+
trigger.expires_at = datetime.now(timezone.utc) + timedelta(days=7)
83998405
db.add(trigger)
84008406
await db.commit()
84018407

backend/app/services/trigger_daemon.py

Lines changed: 36 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,9 +32,13 @@
3232

3333
TICK_INTERVAL = 15 # seconds
3434
DEDUP_WINDOW = 30 # seconds — same agent won't be invoked twice within this window
35-
MAX_AGENT_CHAIN_DEPTH = 5 # A→B→A→B→A max depth before stopping
3635
MIN_POLL_INTERVAL_MINUTES = 5 # minimum poll interval to prevent abuse
3736

37+
# Safety: per-agent on_message fire rate limiter
38+
_ON_MSG_RATE_WINDOW = 3600 # 1 hour window
39+
_ON_MSG_RATE_LIMIT = 30 # max on_message fires per agent per hour
40+
_on_msg_fire_log: dict[uuid.UUID, list[datetime]] = {} # agent_id -> list of fire timestamps
41+
3842
_last_invoke: dict[uuid.UUID, datetime] = {}
3943

4044
_A2A_WAKE_CHAIN: dict[str, int] = {}
@@ -47,6 +51,15 @@ def _cleanup_stale_invoke_cache():
4751
stale = [k for k, v in _last_invoke.items() if (now - v).total_seconds() > DEDUP_WINDOW * 2]
4852
for k in stale:
4953
del _last_invoke[k]
54+
# Clean up old on_message rate limiter entries
55+
cutoff = now - timedelta(seconds=_ON_MSG_RATE_WINDOW)
56+
stale_agents = []
57+
for aid, timestamps in _on_msg_fire_log.items():
58+
_on_msg_fire_log[aid] = [t for t in timestamps if t > cutoff]
59+
if not _on_msg_fire_log[aid]:
60+
stale_agents.append(aid)
61+
for aid in stale_agents:
62+
del _on_msg_fire_log[aid]
5063

5164

5265
async def _should_skip_non_workday(trigger: AgentTrigger, local_now: datetime) -> bool:
@@ -118,6 +131,28 @@ async def _tick():
118131
if not handled:
119132
handled = await _handle_okr_collection_trigger(trigger, now)
120133
if not handled:
134+
# Fix 3: Rate limit on_message triggers per agent
135+
if trigger.type == "on_message":
136+
agent_fires = _on_msg_fire_log.get(trigger.agent_id, [])
137+
cutoff = now - timedelta(seconds=_ON_MSG_RATE_WINDOW)
138+
recent = [t for t in agent_fires if t > cutoff]
139+
if len(recent) >= _ON_MSG_RATE_LIMIT:
140+
logger.warning(
141+
f"[A2A Safety] Agent {trigger.agent_id} hit "
142+
f"on_message rate limit ({_ON_MSG_RATE_LIMIT}/hr). "
143+
f"Auto-disabling trigger '{trigger.name}'."
144+
)
145+
async with async_session() as db:
146+
result = await db.execute(
147+
select(AgentTrigger).where(AgentTrigger.id == trigger.id)
148+
)
149+
t_obj = result.scalar_one_or_none()
150+
if t_obj:
151+
t_obj.is_enabled = False
152+
await db.commit()
153+
continue
154+
recent.append(now)
155+
_on_msg_fire_log[trigger.agent_id] = recent
121156
await enqueue_due_trigger(trigger, now)
122157
except Exception as e:
123158
logger.warning(f"Error evaluating trigger {trigger.name}: {e}")

backend/app/services/trigger_runtime/evaluator.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -370,6 +370,12 @@ async def check_new_agent_messages(trigger: AgentTrigger) -> bool:
370370
.where(
371371
ChatMessage.participant_id == from_participant,
372372
ChatMessage.created_at > since,
373+
# Fix 1: Only match real conversational messages,
374+
# not internal tool_call / system records.
375+
ChatMessage.role.in_(["assistant", "user"]),
376+
# Fix 2: Exclude trigger internal "reflection"
377+
# sessions to avoid cross-trigger false matches.
378+
ChatSession.source_channel != "trigger",
373379
)
374380
.order_by(ChatMessage.created_at.desc())
375381
.limit(1)

0 commit comments

Comments
 (0)