Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions internal/llminternal/stream_aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ package llminternal

import (
"context"
"fmt"
"iter"
"maps"
"reflect"
Expand Down Expand Up @@ -58,8 +57,10 @@ func NewStreamingResponseAggregator() *streamingResponseAggregator {
func (s *streamingResponseAggregator) ProcessResponse(ctx context.Context, genResp *genai.GenerateContentResponse) iter.Seq2[*model.LLMResponse, error] {
return func(yield func(*model.LLMResponse, error) bool) {
if len(genResp.Candidates) == 0 {
// shouldn't happen?
yield(nil, fmt.Errorf("empty response"))
// Vertex AI occasionally emits leading SSE chunks that contain only
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add the information that sometimes SSE stream can start with an empty text parts mixed with chunks which do not contain any candidates.

// response metadata (createTime, modelVersion, usageMetadata, etc.)
// but no Candidates. These are valid and should be skipped; the actual
// content arrives in subsequent chunks.
return
}
candidate := genResp.Candidates[0]
Expand Down
47 changes: 47 additions & 0 deletions internal/llminternal/stream_aggregator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -847,6 +847,53 @@ func TestPartialFunctionCallsNotExecutedInNoneStreamingMode(t *testing.T) {
}
}

func TestMetadataOnlyChunkDoesNotAbortStream(t *testing.T) {
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add a test for the following sequence
Empty text
No candidates
No candidates
Empty text
No candidates
Non-empty text

aggregator := llminternal.NewStreamingResponseAggregator()
ctx := t.Context()

// Simulate the leading metadata-only SSE chunk that Vertex AI emits for
// gemini-3-flash-preview + googleSearch grounding: no Candidates at all.
metadataChunk := &genai.GenerateContentResponse{
// Candidates is intentionally nil / empty.
UsageMetadata: &genai.GenerateContentResponseUsageMetadata{},
}

// The real content arrives in the next chunk.
contentChunk := &genai.GenerateContentResponse{
Candidates: []*genai.Candidate{
{
Content: &genai.Content{
Role: "model",
Parts: []*genai.Part{{Text: "Here are some movie recommendations."}},
},
FinishReason: genai.FinishReasonStop,
},
},
}

for _, chunk := range []*genai.GenerateContentResponse{metadataChunk, contentChunk} {
for resp, err := range aggregator.ProcessResponse(ctx, chunk) {
if err != nil {
t.Fatalf("unexpected error processing chunk: %v", err)
}
_ = resp
}
}

finalResponse := aggregator.Close()
if finalResponse == nil {
t.Fatal("expected a final aggregated response, got nil")
}

if len(finalResponse.Content.Parts) == 0 {
t.Fatal("expected content parts in the final response, got none")
}

if finalResponse.Content.Parts[0].Text != "Here are some movie recommendations." {
t.Errorf("unexpected text in final response: %q", finalResponse.Content.Parts[0].Text)
}
}

func TestFinishReasonUnexpectedToolCallPreservesErrorCode(t *testing.T) {
aggregator := llminternal.NewStreamingResponseAggregator()
ctx := t.Context()
Expand Down
Loading