Skip to content

Commit 80ef18f

Browse files
jsell-rhclaude
andcommitted
fix(runner): prevent gRPC message replay on session restart
The gRPC listener started watching from seq=0 on every startup, which replayed all historical messages including the original user prompt. On session restart, this caused the agent to auto-continue the conversation without the user sending a new message. Fix: _catch_up_seq() scans existing messages on startup to find the current max seq, then watches from there. Only messages that arrive AFTER the listener starts trigger runs. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent 7ea109c commit 80ef18f

3 files changed

Lines changed: 42 additions & 1 deletion

File tree

components/ambient-control-plane/internal/reconciler/kube_reconciler.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -719,6 +719,10 @@ func (r *SimpleKubeReconciler) buildEnv(ctx context.Context, session types.Sessi
719719
envVar("REQUESTS_CA_BUNDLE", "/etc/pki/ca-trust/extracted/pem/service-ca.crt"),
720720
}
721721

722+
if session.StartTime != nil {
723+
env = append(env, envVar("IS_RESUME", "true"))
724+
}
725+
722726
if r.cfg.AnthropicAPIKey != "" {
723727
env = append(env, envVar("ANTHROPIC_API_KEY", r.cfg.AnthropicAPIKey))
724728
}

components/runners/ambient-runner/ambient_runner/bridges/claude/bridge.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -495,10 +495,12 @@ async def start_grpc_listener(self, grpc_url: str) -> None:
495495
from ambient_runner.bridges.claude.grpc_transport import GRPCSessionListener
496496

497497
session_id = self._context.session_id
498+
is_resume = os.getenv("IS_RESUME", "").strip().lower() == "true"
498499
self._grpc_listener = GRPCSessionListener(
499500
bridge=self,
500501
session_id=session_id,
501502
grpc_url=grpc_url,
503+
is_resume=is_resume,
502504
)
503505
self._grpc_listener.start()
504506
logger.info(

components/runners/ambient-runner/ambient_runner/bridges/claude/grpc_transport.py

Lines changed: 36 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -94,10 +94,12 @@ def __init__(
9494
bridge: "PlatformBridge",
9595
session_id: str,
9696
grpc_url: str,
97+
is_resume: bool = False,
9798
) -> None:
9899
self._bridge = bridge
99100
self._session_id = session_id
100101
self._grpc_url = grpc_url
102+
self._is_resume = is_resume
101103
self._grpc_client: Optional["AmbientGRPCClient"] = None
102104
self.ready = asyncio.Event()
103105
self._task: Optional[asyncio.Task] = None
@@ -185,8 +187,41 @@ def _watch_in_thread(
185187
exc_info=True,
186188
)
187189

190+
async def _catch_up_seq(self) -> int:
191+
"""Find the current max seq so we only process NEW messages.
192+
193+
Only runs on resumed sessions (is_resume=True). Fresh sessions
194+
start from seq 0 with no delay.
195+
"""
196+
if not self._is_resume or self._grpc_client is None:
197+
return 0
198+
199+
def _scan() -> int:
200+
max_seq = 0
201+
try:
202+
for msg in self._grpc_client.session_messages.watch(
203+
self._session_id, after_seq=0, timeout=30.0,
204+
):
205+
max_seq = max(max_seq, msg.seq)
206+
except Exception as exc:
207+
logger.warning(
208+
"[GRPC LISTENER] Catch-up scan error (using seq=%d): %s",
209+
max_seq,
210+
exc,
211+
)
212+
return max_seq
213+
214+
loop = asyncio.get_running_loop()
215+
max_seq = await loop.run_in_executor(None, _scan)
216+
logger.info(
217+
"[GRPC LISTENER] Resumed session — caught up to seq=%d for session=%s",
218+
max_seq,
219+
self._session_id,
220+
)
221+
return max_seq
222+
188223
async def _listen_loop(self) -> None:
189-
last_seq = 0
224+
last_seq = await self._catch_up_seq()
190225
backoff = _BACKOFF_INITIAL
191226

192227
while True:

0 commit comments

Comments
 (0)