Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 23 additions & 9 deletions internal/api/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"errors"
"fmt"
"log/slog"
"strings"
"time"

"github.com/google/uuid"
Expand Down Expand Up @@ -308,9 +309,10 @@ func (c *Client) ModelUpload(ctx context.Context, req ModelUploadRequest, opts M
}

// consumeUploadFrame inspects a single modelUpload pipeline frame. done is
// true for a terminal frame ("ready" or "failed"). Intermediate statuses are
// reported through onStatus, deduplicated against *lastStatus so repeated
// frames for the same phase fire the callback once.
// true for a terminal frame ("ready", "failed", or other error statuses such as
// "error downloading"). Intermediate statuses are reported through onStatus,
// deduplicated against *lastStatus so repeated frames for the same phase fire
// the callback once.
func consumeUploadFrame(raw json.RawMessage, onStatus func(status, message string), lastStatus *string) (*ModelUploadResult, bool, error) {
var item ModelUploadResult
if err := json.Unmarshal(raw, &item); err != nil {
Expand All @@ -321,15 +323,12 @@ func consumeUploadFrame(raw json.RawMessage, onStatus func(status, message strin
switch item.Status {
case uploadStatusReady:
return &item, true, nil
case uploadStatusFailed:
msg := item.Message
if msg == "" {
msg = "no failure reason reported"
}
return nil, true, fmt.Errorf("model upload failed: %s", msg)
case "":
return nil, false, nil
default:
if msg, failed := uploadFailureMessage(item.Status, item.Message); failed {
return nil, true, fmt.Errorf("model upload failed: %s", msg)
}
if item.Status != *lastStatus {
*lastStatus = item.Status
if onStatus != nil {
Expand All @@ -340,6 +339,21 @@ func consumeUploadFrame(raw json.RawMessage, onStatus func(status, message strin
}
}

// uploadFailureMessage reports whether a modelUpload pipeline status is terminal
// failure and returns the message to surface to the caller.
func uploadFailureMessage(status, message string) (string, bool) {
if status == uploadStatusFailed || strings.Contains(strings.ToLower(status), "error") {
if message != "" {
return message, true
}
if status == uploadStatusFailed {
return "no failure reason reported", true
}
return status, true
}
return "", false
}

// Poll polls for async task results using the getResponse task type.
// It blocks until at least minResults items with status "success" have been
// returned in a single poll cycle, a data item reports status "error", the
Expand Down
2 changes: 1 addition & 1 deletion internal/api/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -348,7 +348,7 @@ type ModelUploadRequest struct {

// ModelUploadResult is a single status/result frame from the modelUpload
// pipeline. Status progresses validated → downloaded → optimized → stored →
// ready, or failed.
// ready, or terminates with a failure status such as "failed" or "error downloading".
type ModelUploadResult struct {
TaskType TaskType `json:"taskType"`
TaskUUID uuid.UUID `json:"taskUUID"`
Expand Down
37 changes: 37 additions & 0 deletions internal/api/upload_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,43 @@ func TestModelUpload_DuplicateStatusDeduped(t *testing.T) {
}
}

// TestModelUpload_ErrorDownloadingReturnsError: pipeline error statuses such as
// "error downloading" must terminate the stream instead of waiting indefinitely.
func TestModelUpload_ErrorDownloadingReturnsError(t *testing.T) {
mock := &mockStreamTransport{
frames: []json.RawMessage{
uploadStatusItem(t, "validated", "", ""),
uploadStatusItem(t, "error downloading", "could not fetch file", ""),
},
}

_, err := NewClient(mock, slog.Default()).ModelUpload(context.Background(), minimalUploadRequest(), ModelUploadOptions{})
if err == nil {
t.Fatal("expected error, got nil")
}
if !strings.Contains(err.Error(), "could not fetch file") {
t.Errorf("expected error to contain failure message, got: %v", err)
}
}

// TestModelUpload_ErrorDownloadingWithoutMessage: when the API omits a message,
// the status string is used as the failure reason.
func TestModelUpload_ErrorDownloadingWithoutMessage(t *testing.T) {
mock := &mockStreamTransport{
frames: []json.RawMessage{
uploadStatusItem(t, "error downloading", "", ""),
},
}

_, err := NewClient(mock, slog.Default()).ModelUpload(context.Background(), minimalUploadRequest(), ModelUploadOptions{})
if err == nil {
t.Fatal("expected error, got nil")
}
if !strings.Contains(err.Error(), "error downloading") {
t.Errorf("expected error to contain status, got: %v", err)
}
}

// TestModelUpload_FailedReturnsError: a "failed" frame surfaces its message.
func TestModelUpload_FailedReturnsError(t *testing.T) {
mock := &mockStreamTransport{
Expand Down