Skip to content

Commit cbe0e78

Browse files
authored
fix(upload): hangs indefinitely on download error instead of exiting (#65)
For: RUN-10704
1 parent 8de7d66 commit cbe0e78

3 files changed

Lines changed: 61 additions & 10 deletions

File tree

internal/api/client.go

Lines changed: 23 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
"errors"
77
"fmt"
88
"log/slog"
9+
"strings"
910
"time"
1011

1112
"github.com/google/uuid"
@@ -308,9 +309,10 @@ func (c *Client) ModelUpload(ctx context.Context, req ModelUploadRequest, opts M
308309
}
309310

310311
// consumeUploadFrame inspects a single modelUpload pipeline frame. done is
311-
// true for a terminal frame ("ready" or "failed"). Intermediate statuses are
312-
// reported through onStatus, deduplicated against *lastStatus so repeated
313-
// frames for the same phase fire the callback once.
312+
// true for a terminal frame ("ready", "failed", or other error statuses such as
313+
// "error downloading"). Intermediate statuses are reported through onStatus,
314+
// deduplicated against *lastStatus so repeated frames for the same phase fire
315+
// the callback once.
314316
func consumeUploadFrame(raw json.RawMessage, onStatus func(status, message string), lastStatus *string) (*ModelUploadResult, bool, error) {
315317
var item ModelUploadResult
316318
if err := json.Unmarshal(raw, &item); err != nil {
@@ -321,15 +323,12 @@ func consumeUploadFrame(raw json.RawMessage, onStatus func(status, message strin
321323
switch item.Status {
322324
case uploadStatusReady:
323325
return &item, true, nil
324-
case uploadStatusFailed:
325-
msg := item.Message
326-
if msg == "" {
327-
msg = "no failure reason reported"
328-
}
329-
return nil, true, fmt.Errorf("model upload failed: %s", msg)
330326
case "":
331327
return nil, false, nil
332328
default:
329+
if msg, failed := uploadFailureMessage(item.Status, item.Message); failed {
330+
return nil, true, fmt.Errorf("model upload failed: %s", msg)
331+
}
333332
if item.Status != *lastStatus {
334333
*lastStatus = item.Status
335334
if onStatus != nil {
@@ -340,6 +339,21 @@ func consumeUploadFrame(raw json.RawMessage, onStatus func(status, message strin
340339
}
341340
}
342341

342+
// uploadFailureMessage reports whether a modelUpload pipeline status is terminal
343+
// failure and returns the message to surface to the caller.
344+
func uploadFailureMessage(status, message string) (string, bool) {
345+
if status == uploadStatusFailed || strings.Contains(strings.ToLower(status), "error") {
346+
if message != "" {
347+
return message, true
348+
}
349+
if status == uploadStatusFailed {
350+
return "no failure reason reported", true
351+
}
352+
return status, true
353+
}
354+
return "", false
355+
}
356+
343357
// Poll polls for async task results using the getResponse task type.
344358
// It blocks until at least minResults items with status "success" have been
345359
// returned in a single poll cycle, a data item reports status "error", the

internal/api/types.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -348,7 +348,7 @@ type ModelUploadRequest struct {
348348

349349
// ModelUploadResult is a single status/result frame from the modelUpload
350350
// pipeline. Status progresses validated → downloaded → optimized → stored →
351-
// ready, or failed.
351+
// ready, or terminates with a failure status such as "failed" or "error downloading".
352352
type ModelUploadResult struct {
353353
TaskType TaskType `json:"taskType"`
354354
TaskUUID uuid.UUID `json:"taskUUID"`

internal/api/upload_test.go

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -186,6 +186,43 @@ func TestModelUpload_DuplicateStatusDeduped(t *testing.T) {
186186
}
187187
}
188188

189+
// TestModelUpload_ErrorDownloadingReturnsError: pipeline error statuses such as
190+
// "error downloading" must terminate the stream instead of waiting indefinitely.
191+
func TestModelUpload_ErrorDownloadingReturnsError(t *testing.T) {
192+
mock := &mockStreamTransport{
193+
frames: []json.RawMessage{
194+
uploadStatusItem(t, "validated", "", ""),
195+
uploadStatusItem(t, "error downloading", "could not fetch file", ""),
196+
},
197+
}
198+
199+
_, err := NewClient(mock, slog.Default()).ModelUpload(context.Background(), minimalUploadRequest(), ModelUploadOptions{})
200+
if err == nil {
201+
t.Fatal("expected error, got nil")
202+
}
203+
if !strings.Contains(err.Error(), "could not fetch file") {
204+
t.Errorf("expected error to contain failure message, got: %v", err)
205+
}
206+
}
207+
208+
// TestModelUpload_ErrorDownloadingWithoutMessage: when the API omits a message,
209+
// the status string is used as the failure reason.
210+
func TestModelUpload_ErrorDownloadingWithoutMessage(t *testing.T) {
211+
mock := &mockStreamTransport{
212+
frames: []json.RawMessage{
213+
uploadStatusItem(t, "error downloading", "", ""),
214+
},
215+
}
216+
217+
_, err := NewClient(mock, slog.Default()).ModelUpload(context.Background(), minimalUploadRequest(), ModelUploadOptions{})
218+
if err == nil {
219+
t.Fatal("expected error, got nil")
220+
}
221+
if !strings.Contains(err.Error(), "error downloading") {
222+
t.Errorf("expected error to contain status, got: %v", err)
223+
}
224+
}
225+
189226
// TestModelUpload_FailedReturnsError: a "failed" frame surfaces its message.
190227
func TestModelUpload_FailedReturnsError(t *testing.T) {
191228
mock := &mockStreamTransport{

0 commit comments

Comments
 (0)