Skip to content

Commit 94bc4d1

Browse files
authored
Fix remote-device tool timing out on scheduled runs (Redis-backed broker) (#2511)
* fix: route remote-device tool through Redis so scheduled runs reach the device

  The remote-device tool worked interactively but timed out on every scheduled run. DeviceBroker was an in-process, in-memory singleton, but scheduled runs execute in the Celery worker — a different process from the gunicorn web tier that holds the device's SSE session — so a worker-side dispatch never reached the device and the tool always hit its deadline.

  Make the broker Redis-backed so every hop crosses the process boundary:
  - queued commands -> Redis list dev:cmd:{device_id}
  - output chunks -> Redis stream dev:out:{invocation_id}
  - invocation metadata -> Redis hash dev:inv:{invocation_id}
  - SSE upgrade tickets -> Redis key dev:ticket:{device_id}

  Per-connection SSE session state stays in the web process. Reuses the existing get_redis_instance()/CACHE_REDIS_URL; no new infrastructure. Also makes the web tier safe to scale past one worker.

  Concurrency hardening (from adversarial review + real-Redis e2e):
  - XADD the output/control chunk before flipping completed=1, and have drain_output do a final non-blocking flush after observing completion, so a reader can't see completion and stop before the control chunk lands (this had reintroduced the false "device did not respond (timed out)" under a race).
  - _collect_result builds the result from drained chunks, checks the deadline only after capturing a chunk, and falls back to the authoritative snapshot (before cleanup) when no control chunk was observed.
  - Audit outcome is written from locally-known fields so it survives the worker racing to delete the invocation; a denied command now records a terminal "denied" outcome instead of staying "dispatched".
  - cmd-queue TTL raised to 900s (>= max drain deadline); dispatch-failure and reaped-invocation cleanup; UTF-8 byte counts.
  Tests: new tests/devices/{conftest (FakeRedis double), test_broker_cross_process, test_broker_race, test_submit_output_audit}; drain/cleanup/ticket tests rewritten for the Redis contract. The race tests fail against the pre-fix code. ruff clean; device + tool-executor suites green.

* fix: log instead of silently passing on failed-dispatch cleanup

  Addresses the code-quality lint on the best-effort hash delete in dispatch_invocation's failure path: replace the bare `except: pass` with a logger.debug carrying the invocation_id. No behavior change — cleanup stays best-effort and still returns a failed Invocation.
1 parent 666db29 commit 94bc4d1

11 files changed

Lines changed: 1396 additions & 387 deletions

File tree

application/agents/tools/remote_device.py

Lines changed: 48 additions & 13 deletions
Original file line number | Diff line number | Diff line change
@@ -246,37 +246,72 @@ def _matches_sticky(self, command: str) -> bool:
246246
return False
247247

248248
def _collect_result(self, broker, inv, device: dict, timeout_ms: int) -> Dict[str, Any]:
249-
"""Drain output from the broker until the control chunk arrives."""
249+
"""Drain output from the broker until the control chunk arrives.
250+
251+
Result fields come from the drained chunks, not from ``inv``: the
252+
invocation runs and reports back in a different process (the web
253+
tier), so the dispatching process never sees ``inv`` mutated.
254+
"""
255+
# Dispatch already failed (e.g. broker/Redis unavailable): report it.
256+
if inv.completed and inv.error:
257+
return {
258+
"exit_code": None,
259+
"stdout": "",
260+
"stderr": "",
261+
"duration_ms": None,
262+
"device_name": device.get("name"),
263+
"error": inv.error,
264+
}
265+
250266
deadline = time.time() + (timeout_ms / 1000.0) + 5.0
251267
stdout = []
252268
stderr = []
269+
exit_code = None
270+
duration_ms = None
271+
error = None
272+
saw_control = False
253273
try:
254274
for chunk in broker.drain_output(
255275
inv.invocation_id, timeout=1.0, deadline=deadline
256276
):
257-
if time.time() > deadline:
258-
break
259277
stream = chunk.get("stream")
260278
if stream == "stdout":
261279
stdout.append(chunk.get("chunk", ""))
262280
elif stream == "stderr":
263281
stderr.append(chunk.get("chunk", ""))
264282
elif stream == "control":
265-
# control chunks include exit_code; drain loop will stop next iter
266-
pass
283+
saw_control = True
284+
exit_code = chunk.get("exit_code")
285+
duration_ms = chunk.get("duration_ms")
286+
error = chunk.get("error") or error
287+
# Stop once past the deadline — but only AFTER capturing a chunk
288+
# the drain already yielded, so a near-deadline control chunk
289+
# isn't dropped and misreported as a timeout.
290+
if time.time() > deadline:
291+
break
292+
# No control chunk observed: consult the authoritative completion
293+
# state (before cleanup deletes it) so a late or dropped control
294+
# chunk isn't misreported as "device did not respond".
295+
if not saw_control:
296+
final = broker.get_invocation(inv.invocation_id)
297+
if final is not None and final.completed:
298+
saw_control = True
299+
exit_code = final.exit_code
300+
duration_ms = final.duration_ms
301+
error = final.error or error
267302
finally:
268303
broker.cleanup_invocation(inv.invocation_id)
269304

270-
# Deadline hit with no control chunk: the device never connected or
305+
# Deadline hit with no completion at all: the device never connected or
271306
# never finished. Surface a clear timeout instead of empty success.
272-
if not inv.completed.is_set() and inv.exit_code is None and not inv.error:
273-
inv.error = "device did not respond (timed out)"
307+
if not saw_control and exit_code is None and not error:
308+
error = "device did not respond (timed out)"
274309

275310
return {
276-
"exit_code": inv.exit_code,
277-
"stdout": "".join(stdout) if stdout else "".join(inv.stdout_parts),
278-
"stderr": "".join(stderr) if stderr else "".join(inv.stderr_parts),
279-
"duration_ms": inv.duration_ms,
311+
"exit_code": exit_code,
312+
"stdout": "".join(stdout),
313+
"stderr": "".join(stderr),
314+
"duration_ms": duration_ms,
280315
"device_name": device.get("name"),
281-
"error": inv.error,
316+
"error": error,
282317
}

application/api/devices/session.py

Lines changed: 48 additions & 23 deletions
Original file line number | Diff line number | Diff line change
@@ -7,7 +7,6 @@
77
import json
88
import logging
99
import time
10-
from queue import Empty
1110
from typing import Iterator
1211

1312
from flask import Response, jsonify, make_response, request, stream_with_context
@@ -118,16 +117,15 @@ def generate() -> Iterator[str]:
118117
sess.last_event_id += 1
119118
broker.close_session(sess.session_id, reason="idle")
120119
return
121-
try:
122-
inv = sess.invocation_queue.get(timeout=1.0)
123-
except Empty:
120+
envelope = broker.next_command(sess, timeout=1.0)
121+
if envelope is None:
124122
if time.time() - last_keepalive >= keepalive_interval:
125123
last_keepalive = time.time()
126124
yield ": heartbeat\n\n"
127125
continue
128126
sess.last_event_id += 1
129127
sess.last_activity_at = time.time()
130-
yield _sse_event("invocation", inv.envelope, sess.last_event_id)
128+
yield _sse_event("invocation", envelope, sess.last_event_id)
131129
last_keepalive = time.time()
132130
except GeneratorExit:
133131
logger.debug("device SSE generator exiting for session %s", sess.session_id)
@@ -164,6 +162,22 @@ def ack_invocation(session_id: str, invocation_id: str) -> Response:
164162
jsonify({"success": False, "error": "invocation_not_found"}), 404
165163
)
166164
broker.submit_ack(invocation_id, decision, reason)
165+
if decision == "denied":
166+
# A denial is terminal and produces no device output, so submit_output's
167+
# audit write is never reached. Record the outcome here from locally
168+
# known facts (not re-read Redis state the agent's drain races to clean
169+
# up), so the audit row reflects the denial instead of staying
170+
# "dispatched". Accepted/auto_approved runs record via submit_output.
171+
from datetime import datetime, timezone
172+
try:
173+
with db_session() as conn:
174+
DeviceAuditLogRepository(conn).record_result(
175+
invocation_id,
176+
finished_at=datetime.now(timezone.utc),
177+
error="denied",
178+
)
179+
except Exception:
180+
logger.exception("audit record_result (denied) failed for %s", invocation_id)
167181
return make_response(jsonify({"success": True}), 200)
168182

169183

@@ -189,6 +203,7 @@ def submit_output(session_id: str, invocation_id: str) -> Response:
189203
)
190204

191205
received = 0
206+
control_chunk = None
192207
for line in io.BytesIO(body):
193208
line = line.strip()
194209
if not line:
@@ -199,33 +214,33 @@ def submit_output(session_id: str, invocation_id: str) -> Response:
199214
continue
200215
if not isinstance(chunk, dict):
201216
continue
217+
if chunk.get("stream") == "control":
218+
control_chunk = chunk
202219
broker.submit_output_chunk(invocation_id, chunk)
203220
received += 1
204221

205-
# Persist outcome on the audit row when a control chunk closed the stream.
206-
if inv.completed.is_set():
222+
# Persist the outcome when the closing control chunk arrived in this POST.
223+
# Its fields are captured locally so the audit write survives the draining
224+
# (worker) process racing to delete the invocation's Redis state. Byte
225+
# totals / started_at live in the hash, read best-effort (the functional
226+
# exit_code/error/duration still land even if the hash is already gone).
227+
if control_chunk is not None:
228+
from datetime import datetime, timezone
229+
snap = broker.get_invocation(invocation_id)
207230
try:
208-
from datetime import datetime, timezone
209231
with db_session() as conn:
210232
DeviceAuditLogRepository(conn).record_result(
211233
invocation_id,
212234
started_at=(
213-
datetime.fromtimestamp(inv.started_at, tz=timezone.utc)
214-
if inv.started_at else None
235+
datetime.fromtimestamp(snap.started_at, tz=timezone.utc)
236+
if snap is not None and snap.started_at else None
215237
),
216-
finished_at=(
217-
datetime.fromtimestamp(inv.finished_at, tz=timezone.utc)
218-
if inv.finished_at else None
219-
),
220-
exit_code=inv.exit_code,
221-
duration_ms=inv.duration_ms,
222-
stdout_bytes=sum(
223-
len(c) for c in inv.stdout_parts
224-
),
225-
stderr_bytes=sum(
226-
len(c) for c in inv.stderr_parts
227-
),
228-
error=inv.error,
238+
finished_at=datetime.now(timezone.utc),
239+
exit_code=_as_opt_int(control_chunk.get("exit_code")),
240+
duration_ms=_as_opt_int(control_chunk.get("duration_ms")),
241+
stdout_bytes=(snap.stdout_bytes if snap is not None else 0),
242+
stderr_bytes=(snap.stderr_bytes if snap is not None else 0),
243+
error=control_chunk.get("error"),
229244
)
230245
except Exception:
231246
logger.exception("audit record_result failed for %s", invocation_id)
@@ -235,6 +250,16 @@ def submit_output(session_id: str, invocation_id: str) -> Response:
235250
)
236251

237252

253+
def _as_opt_int(value) -> int | None:
254+
"""Coerce a CLI-supplied JSON value to int for an INTEGER audit column."""
255+
if value is None:
256+
return None
257+
try:
258+
return int(value)
259+
except (TypeError, ValueError):
260+
return None
261+
262+
238263
def _sse_event(name: str, payload: dict, event_id: int) -> str:
239264
return (
240265
f"event: {name}\n"

application/core/settings.py

Lines changed: 8 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -241,6 +241,14 @@ class Settings(BaseSettings):
241241
REMOTE_DEVICE_SESSION_IDLE_SECONDS: int = 60
242242
REMOTE_DEVICE_REQUIRE_SIGNATURE: bool = False
243243
REMOTE_DEVICE_PAIRING_TTL_SECONDS: int = 600
244+
# Redis-backed broker tunables (route invocations cross-process so a
245+
# scheduled/Celery run reaches the web-held device session). The command
246+
# queue TTL must exceed the max command drain deadline (the tool caps
247+
# timeout_ms at 600s, drained with a +5s margin = 605s) so a queued command
248+
# for a briefly-offline device isn't evicted before its own drain gives up.
249+
REMOTE_DEVICE_CMD_QUEUE_TTL_SECONDS: int = 900
250+
REMOTE_DEVICE_INVOCATION_TTL_SECONDS: int = 900
251+
REMOTE_DEVICE_OUTPUT_STREAM_MAXLEN: int = 10_000
244252

245253
# Scheduler (see scheduler.md).
246254
SCHEDULE_DISPATCHER_INTERVAL: int = 30

0 commit comments

Comments
 (0)