Skip to content

Commit 35ddf42

Browse files
jsell-rh and claude committed
fix(control-plane,runner): prevent gRPC message replay on session restart
Control-plane:
- Set IS_RESUME=true when session.StartTime is non-nil (parity with legacy operator)
- Query max message seq via SessionMessages().List(size=1, orderBy="seq desc") and pass it as the RESUME_AFTER_SEQ env var

Runner:
- gRPC listener reads the RESUME_AFTER_SEQ env var on startup
- Watches from that seq so historical messages are never replayed
- Zero scanning delay — the control-plane provides the seq directly
- Fresh sessions (no env var) start from seq 0 as before

Root cause: the control-plane never set IS_RESUME, so the runner re-executed the initial prompt on restart. The gRPC listener also started from seq=0, replaying all historical user messages and triggering duplicate bridge.run() calls.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent 7ea109c commit 35ddf42

2 files changed

Lines changed: 53 additions & 1 deletion

File tree

components/ambient-control-plane/internal/reconciler/kube_reconciler.go

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -719,6 +719,14 @@ func (r *SimpleKubeReconciler) buildEnv(ctx context.Context, session types.Sessi
719719
envVar("REQUESTS_CA_BUNDLE", "/etc/pki/ca-trust/extracted/pem/service-ca.crt"),
720720
}
721721

722+
if session.StartTime != nil {
723+
env = append(env, envVar("IS_RESUME", "true"))
724+
maxSeq := r.resolveMaxMessageSeq(ctx, sdk, session.ID)
725+
if maxSeq > 0 {
726+
env = append(env, envVar("RESUME_AFTER_SEQ", fmt.Sprintf("%d", maxSeq)))
727+
}
728+
}
729+
722730
if r.cfg.AnthropicAPIKey != "" {
723731
env = append(env, envVar("ANTHROPIC_API_KEY", r.cfg.AnthropicAPIKey))
724732
}
@@ -769,6 +777,23 @@ func (r *SimpleKubeReconciler) buildEnv(ctx context.Context, session types.Sessi
769777
return env
770778
}
771779

780+
func (r *SimpleKubeReconciler) resolveMaxMessageSeq(ctx context.Context, sdk *sdkclient.Client, sessionID string) int {
781+
opts := &types.ListOptions{
782+
Size: 1,
783+
Search: fmt.Sprintf("session_id = '%s'", sessionID),
784+
OrderBy: "seq desc",
785+
}
786+
result, err := sdk.SessionMessages().List(ctx, opts)
787+
if err != nil {
788+
r.logger.Warn().Err(err).Str("session_id", sessionID).Msg("failed to resolve max message seq")
789+
return 0
790+
}
791+
if len(result.Items) == 0 {
792+
return 0
793+
}
794+
return result.Items[0].Seq
795+
}
796+
772797
func (r *SimpleKubeReconciler) resolveCredentialIDs(ctx context.Context, sdk *sdkclient.Client, projectID string) (map[string]string, error) {
773798
result := map[string]string{}
774799

components/runners/ambient-runner/ambient_runner/bridges/claude/grpc_transport.py

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -185,8 +185,35 @@ def _watch_in_thread(
185185
exc_info=True,
186186
)
187187

188+
def _resume_after_seq(self) -> int:
189+
"""Read RESUME_AFTER_SEQ env var set by the control-plane.
190+
191+
On resumed sessions, the control-plane queries the API server
192+
for the current max message seq and passes it as an env var.
193+
The listener starts watching after this seq so historical
194+
messages are never replayed.
195+
"""
196+
import os
197+
raw = os.getenv("RESUME_AFTER_SEQ", "").strip()
198+
if not raw:
199+
return 0
200+
try:
201+
seq = int(raw)
202+
logger.info(
203+
"[GRPC LISTENER] Resuming after seq=%d for session=%s",
204+
seq,
205+
self._session_id,
206+
)
207+
return seq
208+
except ValueError:
209+
logger.warning(
210+
"[GRPC LISTENER] Invalid RESUME_AFTER_SEQ=%r, starting from 0",
211+
raw,
212+
)
213+
return 0
214+
188215
async def _listen_loop(self) -> None:
189-
last_seq = 0
216+
last_seq = self._resume_after_seq()
190217
backoff = _BACKOFF_INITIAL
191218

192219
while True:

0 commit comments

Comments
 (0)