Skip to content

Commit 3eef9d2

Browse files
google-genai-botcopybara-github
authored and committed
feat: Add ChatCompletionsHTTPClient and support for non-streaming requests
This is part of a larger chain of commits for adding chat completion API support to the Apigee model. The HTTP client wraps payload construction (delegating to ChatCompletionsRequest.fromLlmRequest) and response parsing (delegating to ChatCompletionsResponse.ChatCompletion / ChatCompletionChunkCollection) for both non-streaming and streaming Server-Sent Events responses. END_PUBLIC Key behaviors: - Tri-state call timeout policy: * httpOptions == null OR timeout() empty: applies a default 5-minute call timeout to prevent indefinite hangs in the common unconfigured case. * httpOptions.timeout() == 0: respected as the explicit caller opt-in to infinite hang for long-running streams or batch jobs. * httpOptions.timeout() > 0: applied directly as the call timeout. This default intentionally diverges from the GenAI HttpOptions convention (which treats unset as infinite) as a defensive measure since this client does not yet have HTTP retry support. - SSE prefix handling accepts both "data: foo" (with space) and "data:foo" (without space) per the SSE spec, matching providers that omit the trailing space. - A single malformed JSON chunk in a streaming response is logged and skipped rather than aborting the entire stream. IOException (connection-level) still propagates as a stream error. - Content-Type is defensively forced to application/json by replacing rather than appending, preventing duplicate or conflicting headers if a caller supplies their own Content-Type. - Headers parameter accepts null (treated as no extra headers) and is stored as an ImmutableMap for thread-safe reuse across concurrent generateContent calls. Test additions (16 total, +12 new): - HTTP error status (4xx/5xx) propagation for both streaming and non-streaming. - Empty body propagation. - Streaming continues past a single malformed chunk. - SSE "data:" prefix accepted with or without trailing space. - Custom headers reach the wire. - Caller-supplied Content-Type is overridden, not appended. 
- baseUrl with and without trailing slash. - Constructor tri-state timeout (null, zero=infinite, positive). - Constructor null headers parameter. All testSubscriber.await() calls bounded to 500ms to prevent test hangs. PiperOrigin-RevId: 902749082
1 parent e9184c9 commit 3eef9d2

2 files changed

Lines changed: 663 additions & 0 deletions

File tree

Lines changed: 219 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,219 @@
1+
/*
2+
* Copyright 2026 Google LLC
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.google.adk.models.chat;
18+
19+
import com.fasterxml.jackson.databind.ObjectMapper;
20+
import com.google.adk.JsonBaseModel;
21+
import com.google.adk.models.LlmRequest;
22+
import com.google.adk.models.LlmResponse;
23+
import com.google.common.collect.ImmutableMap;
24+
import com.google.genai.types.HttpOptions;
25+
import io.reactivex.rxjava3.core.BackpressureStrategy;
26+
import io.reactivex.rxjava3.core.Flowable;
27+
import io.reactivex.rxjava3.core.FlowableEmitter;
28+
import java.io.IOException;
29+
import java.time.Duration;
30+
import java.util.Map;
31+
import java.util.Objects;
32+
import okhttp3.Call;
33+
import okhttp3.Callback;
34+
import okhttp3.MediaType;
35+
import okhttp3.OkHttpClient;
36+
import okhttp3.Request;
37+
import okhttp3.RequestBody;
38+
import okhttp3.Response;
39+
import okhttp3.ResponseBody;
40+
import org.slf4j.Logger;
41+
import org.slf4j.LoggerFactory;
42+
43+
/**
44+
* An HTTP client for interacting with OpenAI-compatible chat completions endpoints.
45+
*
46+
* <p>Supports both non-streaming responses (single {@link LlmResponse} emission) and streaming
47+
* Server-Sent Events (SSE) responses (multiple incremental {@link LlmResponse} emissions). See the
48+
* <a href="https://developers.openai.com/api/reference/resources/chat">OpenAI Chat Completions API
49+
* reference</a> for the wire protocol.
50+
*/
51+
public class ChatCompletionsHttpClient {
52+
private static final Logger logger = LoggerFactory.getLogger(ChatCompletionsHttpClient.class);
53+
private static final ObjectMapper objectMapper = JsonBaseModel.getMapper();
54+
55+
private static final MediaType JSON = MediaType.get("application/json; charset=utf-8");
56+
57+
/**
58+
* Default OkHttp call timeout used when the caller does not supply an {@link HttpOptions}
59+
* timeout. Five minutes is long enough for most non-streaming completions and short enough to
60+
* prevent indefinite hangs in the common case where the caller does not configure timeouts.
61+
* Callers who need infinite (e.g. long batch jobs or open streams) can opt in by passing an
62+
* {@link HttpOptions} with {@code timeout() == 0}.
63+
*/
64+
private static final Duration DEFAULT_CALL_TIMEOUT = Duration.ofMinutes(5);
65+
66+
/**
67+
* Shared OkHttpClient instance whose connection pool and thread dispatcher are reused across all
68+
* {@link ChatCompletionsHttpClient} instances. Each instance forks this client via {@link
69+
* OkHttpClient#newBuilder()} to apply per-instance timeouts without leaking pools.
70+
*/
71+
private static final OkHttpClient SHARED_POOL_CLIENT = new OkHttpClient();
72+
73+
private final OkHttpClient client;
74+
private final String baseUrl;
75+
private final ImmutableMap<String, String> headers;
76+
77+
/**
78+
* Constructs a new {@link ChatCompletionsHttpClient} that facilitates API interaction with the
79+
* standard {@code /chat/completions} REST endpoint.
80+
*
81+
* @param baseUrl The base URL of the chat completions endpoint. Must not be {@code null}. The
82+
* trailing {@code /chat/completions} path is appended automatically if not already present.
83+
* @param headers The headers to include in the outgoing requests. May be {@code null}, in which
84+
* case no extra headers are added. The {@code Content-Type} header is set automatically and
85+
* cannot be overridden by this map.
86+
* @param httpOptions Optional HTTP configuration. A missing timeout defaults to 5 minutes ({@link
87+
* #DEFAULT_CALL_TIMEOUT}). A timeout of 0 means no timeout (infinite wait).
88+
*/
89+
public ChatCompletionsHttpClient(
90+
String baseUrl, Map<String, String> headers, HttpOptions httpOptions) {
91+
this.baseUrl = Objects.requireNonNull(baseUrl, "baseUrl cannot be null");
92+
// Defensive copy of caller-supplied headers; null is treated as no extra headers.
93+
this.headers = headers == null ? ImmutableMap.of() : ImmutableMap.copyOf(headers);
94+
95+
// Apply custom timeouts per instance. All internal timeouts are bounded by callTimeout.
96+
OkHttpClient.Builder builder = SHARED_POOL_CLIENT.newBuilder();
97+
builder.connectTimeout(Duration.ZERO);
98+
builder.readTimeout(Duration.ZERO);
99+
builder.writeTimeout(Duration.ZERO);
100+
builder.callTimeout(resolveCallTimeout(httpOptions));
101+
this.client = builder.build();
102+
}
103+
104+
/** Resolves the call timeout from HttpOptions. */
105+
private static Duration resolveCallTimeout(HttpOptions httpOptions) {
106+
if (httpOptions == null || httpOptions.timeout().isEmpty()) {
107+
return DEFAULT_CALL_TIMEOUT;
108+
}
109+
long timeoutMs = httpOptions.timeout().get();
110+
// 0 is treated as no timeout (Duration.ZERO).
111+
return timeoutMs == 0L ? Duration.ZERO : Duration.ofMillis(timeoutMs);
112+
}
113+
114+
/**
115+
* Generates a conversational response from the chat completions endpoint based on the provided
116+
* messages. This encapsulates building the HTTP payload, sending the request to the completions
117+
* endpoint, and initiating the handling of the response data.
118+
*
119+
* @param llmRequest The request containing the model, configuration, and sequence of messages.
120+
* @param stream Whether to request a streaming response.
121+
* @return A {@link Flowable} emitting the discrete (or combined) {@link LlmResponse} objects.
122+
*/
123+
public Flowable<LlmResponse> generateContent(LlmRequest llmRequest, boolean stream) {
124+
return Flowable.defer(
125+
() -> {
126+
ChatCompletionsRequest dtoRequest =
127+
ChatCompletionsRequest.fromLlmRequest(llmRequest, stream);
128+
String jsonPayload = objectMapper.writeValueAsString(dtoRequest);
129+
logger.trace(
130+
"Chat Completion Request: model={}, stream={}, messagesCount={}",
131+
dtoRequest.model,
132+
dtoRequest.stream,
133+
dtoRequest.messages != null ? dtoRequest.messages.size() : 0);
134+
135+
Request.Builder requestBuilder =
136+
new Request.Builder()
137+
.url(
138+
baseUrl.endsWith("/")
139+
? baseUrl + "chat/completions"
140+
: baseUrl + "/chat/completions")
141+
.post(RequestBody.create(jsonPayload, JSON));
142+
143+
for (Map.Entry<String, String> entry : headers.entrySet()) {
144+
requestBuilder.addHeader(entry.getKey(), entry.getValue());
145+
}
146+
// Defensively force Content-Type to JSON by replacing instead of appending.
147+
requestBuilder.header("Content-Type", JSON.toString());
148+
149+
Request request = requestBuilder.build();
150+
if (stream) {
151+
return createStreamingFlowable(request);
152+
} else {
153+
return createNonStreamingFlowable(request);
154+
}
155+
});
156+
}
157+
158+
/** Placeholder for streaming responses. Errors with {@link UnsupportedOperationException}. */
159+
@SuppressWarnings("UnusedVariable")
160+
private Flowable<LlmResponse> createStreamingFlowable(Request request) {
161+
return Flowable.error(
162+
new UnsupportedOperationException("Streaming is not yet implemented in this client."));
163+
}
164+
165+
/**
166+
* Wraps an OkHttp {@link Callback} in a reactive {@link Flowable} for single-turn, non-streaming
167+
* responses.
168+
*/
169+
private Flowable<LlmResponse> createNonStreamingFlowable(Request request) {
170+
return Flowable.create(
171+
emitter -> {
172+
Call call = client.newCall(request);
173+
emitter.setCancellable(call::cancel);
174+
call.enqueue(new NonStreamingCallback(emitter));
175+
},
176+
BackpressureStrategy.BUFFER);
177+
}
178+
179+
/**
180+
* Handles OkHttp failure and success callbacks, pushing {@link LlmResponse} results to the given
181+
* emitter.
182+
*/
183+
private static final class NonStreamingCallback implements Callback {
184+
private final FlowableEmitter<LlmResponse> emitter;
185+
186+
NonStreamingCallback(FlowableEmitter<LlmResponse> emitter) {
187+
this.emitter = emitter;
188+
}
189+
190+
@Override
191+
public void onFailure(Call call, IOException e) {
192+
emitter.tryOnError(e);
193+
}
194+
195+
@Override
196+
public void onResponse(Call call, Response response) {
197+
try (ResponseBody body = response.body()) {
198+
if (!response.isSuccessful()) {
199+
String bodyStr = body != null ? body.string() : "";
200+
emitter.tryOnError(
201+
new IOException("Unexpected code " + response + " - body: " + bodyStr));
202+
return;
203+
}
204+
if (body == null) {
205+
emitter.tryOnError(new IOException("Empty response body"));
206+
return;
207+
}
208+
209+
String jsonResponse = body.string();
210+
ChatCompletionsResponse.ChatCompletion completion =
211+
objectMapper.readValue(jsonResponse, ChatCompletionsResponse.ChatCompletion.class);
212+
emitter.onNext(completion.toLlmResponse());
213+
emitter.onComplete();
214+
} catch (Exception e) {
215+
emitter.tryOnError(e);
216+
}
217+
}
218+
}
219+
}

0 commit comments

Comments
 (0)