Skip to content

Commit 9963d8f

Browse files
Merge pull request #273 from Aman-Cool/fix/sandbox-terminal-failure-notification
fix: stop blocking on client disconnect and sanitize internal errors in sandbox creation
2 parents e31a23d + 21b007b commit 9963d8f

6 files changed

Lines changed: 314 additions & 158 deletions

File tree

pkg/workloadmanager/handlers.go

Lines changed: 96 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,14 @@ import (
3636
"github.com/volcano-sh/agentcube/pkg/store"
3737
)
3838

39+
// errSandboxCreationTimeout is returned when the internal sandbox-ready wait exceeds the 2-minute deadline.
40+
var errSandboxCreationTimeout = errors.New("sandbox creation timed out")
41+
42+
// isContextError reports whether err is a context cancellation or deadline error.
43+
func isContextError(err error) bool {
44+
return errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded)
45+
}
46+
3947
// handleHealth handles health check requests
4048
func (s *Server) handleHealth(c *gin.Context) {
4149
respondJSON(c, http.StatusOK, map[string]string{
@@ -132,53 +140,99 @@ func (s *Server) handleSandboxCreate(c *gin.Context, kind string) {
132140

133141
response, err := s.createSandbox(c.Request.Context(), dynamicClient, sandbox, sandboxClaim, sandboxEntry, resultChan)
134142
if err != nil {
143+
// Client disconnected — abort with 499 so logs/metrics reflect the cancellation.
144+
if errors.Is(err, context.Canceled) {
145+
klog.Warningf("create sandbox aborted %s/%s: client disconnected", sandbox.Namespace, sandbox.Name)
146+
c.AbortWithStatus(499)
147+
return
148+
}
149+
// Deadline exceeded — client may still be connected; return 504 so they get a meaningful response.
150+
if errors.Is(err, context.DeadlineExceeded) {
151+
klog.Warningf("create sandbox timed out %s/%s: request deadline exceeded", sandbox.Namespace, sandbox.Name)
152+
respondError(c, http.StatusGatewayTimeout, "request timed out")
153+
return
154+
}
155+
// Internal sandbox-ready wait timed out; surface as 504 rather than a generic 500.
156+
if errors.Is(err, errSandboxCreationTimeout) {
157+
klog.Warningf("create sandbox timed out %s/%s: sandbox did not become ready within deadline", sandbox.Namespace, sandbox.Name)
158+
respondError(c, http.StatusGatewayTimeout, err.Error())
159+
return
160+
}
135161
klog.Errorf("create sandbox failed %s/%s: %v", sandbox.Namespace, sandbox.Name, err)
136-
respondError(c, http.StatusInternalServerError, "internal server error")
162+
// Internal errors (store, K8s API) must not leak system details to callers;
163+
// sandbox-level failures (terminal pod state, timeout) are safe to surface.
164+
msg := err.Error()
165+
if apierrors.IsInternalError(err) {
166+
msg = "internal server error"
167+
}
168+
respondError(c, http.StatusInternalServerError, msg)
137169
return
138170
}
139171

140172
respondJSON(c, http.StatusOK, response)
141173
}
142174

143-
// createSandbox performs sandbox creation and returns the response payload or an error with an HTTP status code.
144-
func (s *Server) createSandbox(ctx context.Context, dynamicClient dynamic.Interface, sandbox *sandboxv1alpha1.Sandbox, sandboxClaim *extensionsv1alpha1.SandboxClaim, sandboxEntry *sandboxEntry, resultChan <-chan SandboxStatusUpdate) (*types.CreateSandboxResponse, error) {
145-
// Store placeholder before creating, make sandbox/sandboxClaim GarbageCollection possible
146-
sandboxStorePlaceHolder := buildSandboxPlaceHolder(sandbox, sandboxEntry)
147-
if err := s.storeClient.StoreSandbox(ctx, sandboxStorePlaceHolder); err != nil {
148-
err = api.NewInternalError(fmt.Errorf("store sandbox placeholder failed: %v", err))
149-
return nil, err
150-
}
151-
175+
// createK8sResources creates the K8s sandbox or sandbox claim resource.
176+
func (s *Server) createK8sResources(ctx context.Context, dynamicClient dynamic.Interface, sandbox *sandboxv1alpha1.Sandbox, sandboxClaim *extensionsv1alpha1.SandboxClaim) error {
152177
if sandboxClaim != nil {
153178
if err := createSandboxClaim(ctx, dynamicClient, sandboxClaim); err != nil {
154-
err = api.NewInternalError(fmt.Errorf("create sandbox claim %s/%s failed: %v", sandboxClaim.Namespace, sandboxClaim.Name, err))
155-
return nil, err
179+
if isContextError(err) {
180+
return err
181+
}
182+
return api.NewInternalError(fmt.Errorf("create sandbox claim %s/%s failed: %w", sandboxClaim.Namespace, sandboxClaim.Name, err))
156183
}
157184
} else {
158185
if _, err := createSandbox(ctx, dynamicClient, sandbox); err != nil {
159-
return nil, api.NewInternalError(fmt.Errorf("failed to create sandbox: %w", err))
186+
if isContextError(err) {
187+
return err
188+
}
189+
return api.NewInternalError(fmt.Errorf("failed to create sandbox: %w", err))
160190
}
161191
}
192+
return nil
193+
}
162194

163-
// Register rollback BEFORE waiting for the sandbox to become ready.
164-
// This ensures the K8s resource and store placeholder are cleaned up on
165-
// timeout, pod-IP failure, or store-update failure — not just on post-creation errors.
195+
// createSandbox performs sandbox creation and returns the response payload or an error with an HTTP status code.
196+
func (s *Server) createSandbox(ctx context.Context, dynamicClient dynamic.Interface, sandbox *sandboxv1alpha1.Sandbox, sandboxClaim *extensionsv1alpha1.SandboxClaim, sandboxEntry *sandboxEntry, resultChan <-chan SandboxStatusUpdate) (*types.CreateSandboxResponse, error) {
197+
placeholder := buildSandboxPlaceHolder(sandbox, sandboxEntry)
198+
if err := s.storeClient.StoreSandbox(ctx, placeholder); err != nil {
199+
if isContextError(err) {
200+
return nil, err
201+
}
202+
return nil, api.NewInternalError(fmt.Errorf("store sandbox placeholder failed: %w", err))
203+
}
204+
205+
// Register rollback right after the placeholder is stored so that a K8s
206+
// creation failure does not leave an orphaned store entry.
166207
needRollbackSandbox := true
167208
defer func() {
168209
if !needRollbackSandbox {
169210
return
170211
}
171-
s.sandboxRollback(sandboxClaim, dynamicClient, sandbox, sandboxEntry)
212+
s.rollbackSandboxCreation(dynamicClient, sandbox, sandboxClaim, sandboxEntry.SessionID)
172213
}()
173214

215+
if err := s.createK8sResources(ctx, dynamicClient, sandbox, sandboxClaim); err != nil {
216+
return nil, err
217+
}
218+
219+
// Use NewTimer so we can stop it explicitly when another branch wins,
220+
// preventing the runtime from retaining the timer until it fires.
221+
timer := time.NewTimer(2 * time.Minute) // consistent with router settings
222+
174223
var createdSandbox *sandboxv1alpha1.Sandbox
175224
select {
176225
case result := <-resultChan:
226+
timer.Stop()
177227
createdSandbox = result.Sandbox
178228
klog.V(2).Infof("sandbox %s/%s reported ready, verifying entrypoints", createdSandbox.Namespace, createdSandbox.Name)
179-
case <-time.After(2 * time.Minute): // consistent with router settings
229+
case <-ctx.Done():
230+
timer.Stop()
231+
klog.Warningf("sandbox %s/%s wait canceled: %v", sandbox.Namespace, sandbox.Name, ctx.Err())
232+
return nil, ctx.Err()
233+
case <-timer.C:
180234
klog.Warningf("sandbox %s/%s create timed out", sandbox.Namespace, sandbox.Name)
181-
return nil, fmt.Errorf("sandbox creation timed out")
235+
return nil, errSandboxCreationTimeout
182236
}
183237

184238
// agent-sandbox create pod with same name as sandbox if no warmpool is used
@@ -192,10 +246,16 @@ func (s *Server) createSandbox(ctx context.Context, dynamicClient dynamic.Interf
192246

193247
podIP, err := s.k8sClient.GetSandboxPodIP(ctx, sandbox.Namespace, sandbox.Name, sandboxPodName)
194248
if err != nil {
195-
return nil, fmt.Errorf("failed to get sandbox %s/%s pod IP: %v", sandbox.Namespace, sandbox.Name, err)
249+
if isContextError(err) {
250+
return nil, err
251+
}
252+
return nil, api.NewInternalError(fmt.Errorf("failed to get sandbox %s/%s pod IP: %w", sandbox.Namespace, sandbox.Name, err))
196253
}
197254
if err := s.waitForSandboxEntryPointsReady(ctx, podIP, sandboxEntry); err != nil {
198-
return nil, fmt.Errorf("failed to verify sandbox %s/%s entrypoints: %w", sandbox.Namespace, sandbox.Name, err)
255+
if isContextError(err) {
256+
return nil, err
257+
}
258+
return nil, api.NewInternalError(fmt.Errorf("failed to verify sandbox %s/%s entrypoints: %w", sandbox.Namespace, sandbox.Name, err))
199259
}
200260

201261
storeCacheInfo := buildSandboxInfo(createdSandbox, podIP, sandboxEntry)
@@ -208,7 +268,10 @@ func (s *Server) createSandbox(ctx context.Context, dynamicClient dynamic.Interf
208268
}
209269

210270
if err := s.storeClient.UpdateSandbox(ctx, storeCacheInfo); err != nil {
211-
return nil, fmt.Errorf("update store cache failed: %v", err)
271+
if isContextError(err) {
272+
return nil, err
273+
}
274+
return nil, api.NewInternalError(fmt.Errorf("update store cache failed: %w", err))
212275
}
213276

214277
needRollbackSandbox = false
@@ -217,25 +280,26 @@ func (s *Server) createSandbox(ctx context.Context, dynamicClient dynamic.Interf
217280
return response, nil
218281
}
219282

220-
func (s *Server) sandboxRollback(sandboxClaim *extensionsv1alpha1.SandboxClaim, dynamicClient dynamic.Interface, sandbox *sandboxv1alpha1.Sandbox, sandboxEntry *sandboxEntry) {
283+
// rollbackSandboxCreation deletes the sandbox (or sandbox claim) and its store
284+
// placeholder when creation fails. It runs in a fresh context so that a
285+
// canceled request context does not prevent cleanup.
286+
func (s *Server) rollbackSandboxCreation(dynamicClient dynamic.Interface, sandbox *sandboxv1alpha1.Sandbox, sandboxClaim *extensionsv1alpha1.SandboxClaim, sessionID string) {
221287
ctxTimeout, cancel := context.WithTimeout(context.Background(), 30*time.Second)
222288
defer cancel()
223-
var err error
224289
if sandboxClaim != nil {
225-
// Rollback SandboxClaim
226-
err = deleteSandboxClaim(ctxTimeout, dynamicClient, sandboxClaim.Namespace, sandboxClaim.Name)
227-
if err != nil {
290+
if err := deleteSandboxClaim(ctxTimeout, dynamicClient, sandboxClaim.Namespace, sandboxClaim.Name); err != nil {
228291
klog.Infof("sandbox claim %s/%s rollback failed: %v", sandboxClaim.Namespace, sandboxClaim.Name, err)
292+
} else {
293+
klog.Infof("sandbox claim %s/%s rollback succeeded", sandboxClaim.Namespace, sandboxClaim.Name)
229294
}
230295
} else {
231-
// Rollback Sandbox
232-
err = deleteSandbox(ctxTimeout, dynamicClient, sandbox.Namespace, sandbox.Name)
233-
if err != nil {
296+
if err := deleteSandbox(ctxTimeout, dynamicClient, sandbox.Namespace, sandbox.Name); err != nil {
234297
klog.Infof("sandbox %s/%s rollback failed: %v", sandbox.Namespace, sandbox.Name, err)
298+
} else {
299+
klog.Infof("sandbox %s/%s rollback succeeded", sandbox.Namespace, sandbox.Name)
235300
}
236301
}
237-
// Clean up the store placeholder so it does not pollute GC queries
238-
if delErr := s.storeClient.DeleteSandboxBySessionID(ctxTimeout, sandboxEntry.SessionID); delErr != nil {
302+
if delErr := s.storeClient.DeleteSandboxBySessionID(ctxTimeout, sessionID); delErr != nil {
239303
klog.Infof("sandbox %s/%s store placeholder cleanup failed: %v", sandbox.Namespace, sandbox.Name, delErr)
240304
}
241305
}

0 commit comments

Comments
 (0)