Skip to content

Commit 9529c1a

Browse files
google-genai-bot authored and copybara-github committed
feat: Add ChatCompletionsHTTPClient and support for non-streaming requests
This is part of a larger chain of commits for adding chat completion API support to the Apigee model. The HTTP client wraps payload construction (delegating to ChatCompletionsRequest.fromLlmRequest) and response parsing (delegating to ChatCompletionsResponse.ChatCompletion / ChatCompletionChunkCollection) for both non-streaming and streaming Server-Sent Events responses.

END_PUBLIC

Key behaviors:
- Tri-state call timeout policy:
  * httpOptions == null OR timeout() empty: applies a default 5-minute call timeout to prevent indefinite hangs in the common unconfigured case.
  * httpOptions.timeout() == 0: respected as the explicit caller opt-in to infinite hang for long-running streams or batch jobs.
  * httpOptions.timeout() > 0: applied directly as the call timeout.
  This default intentionally diverges from the GenAI HttpOptions convention (which treats unset as infinite) as a defensive measure since this client does not yet have HTTP retry support.
- SSE prefix handling accepts both "data: foo" (with space) and "data:foo" (without space) per the SSE spec, matching providers that omit the trailing space.
- A single malformed JSON chunk in a streaming response is logged and skipped rather than aborting the entire stream. IOException (connection-level) still propagates as a stream error.
- Content-Type is defensively forced to application/json by replacing rather than appending, preventing duplicate or conflicting headers if a caller supplies their own Content-Type.
- Headers parameter accepts null (treated as no extra headers) and is stored as an ImmutableMap for thread-safe reuse across concurrent generateContent calls.

Test additions (16 total, +12 new):
- HTTP error status (4xx/5xx) propagation for both streaming and non-streaming.
- Empty body propagation.
- Streaming continues past a single malformed chunk.
- SSE "data:" prefix accepted with or without trailing space.
- Custom headers reach the wire.
- Caller-supplied Content-Type is overridden, not appended.
- baseUrl with and without trailing slash.
- Constructor tri-state timeout (null, zero=infinite, positive).
- Constructor null headers parameter.

All testSubscriber.await() calls bounded to 500ms to prevent test hangs.

PiperOrigin-RevId: 915109034
1 parent 5af998c commit 9529c1a

2 files changed

Lines changed: 733 additions & 0 deletions

File tree

Lines changed: 256 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,256 @@
1+
/*
2+
* Copyright 2026 Google LLC
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.google.adk.models.chat;
18+
19+
import com.fasterxml.jackson.databind.ObjectMapper;
20+
import com.google.adk.JsonBaseModel;
21+
import com.google.adk.models.LlmRequest;
22+
import com.google.adk.models.LlmResponse;
23+
import com.google.common.collect.ImmutableMap;
24+
import com.google.genai.types.HttpOptions;
25+
import io.reactivex.rxjava3.core.BackpressureStrategy;
26+
import io.reactivex.rxjava3.core.Flowable;
27+
import io.reactivex.rxjava3.core.FlowableEmitter;
28+
import java.io.IOException;
29+
import java.time.Duration;
30+
import java.util.Map;
31+
import java.util.Objects;
32+
import okhttp3.Call;
33+
import okhttp3.Callback;
34+
import okhttp3.HttpUrl;
35+
import okhttp3.MediaType;
36+
import okhttp3.OkHttpClient;
37+
import okhttp3.Request;
38+
import okhttp3.RequestBody;
39+
import okhttp3.Response;
40+
import okhttp3.ResponseBody;
41+
import org.slf4j.Logger;
42+
import org.slf4j.LoggerFactory;
43+
44+
/**
45+
* An HTTP client for interacting with OpenAI-compatible chat completions endpoints.
46+
*
47+
* <p>Supports both non-streaming responses (single {@link LlmResponse} emission) and streaming
48+
* Server-Sent Events (SSE) responses (multiple incremental {@link LlmResponse} emissions). See the
49+
* <a href="https://developers.openai.com/api/reference/resources/chat">OpenAI Chat Completions API
50+
* reference</a> for the wire protocol.
51+
*/
52+
public class ChatCompletionsHttpClient {
53+
private static final Logger logger = LoggerFactory.getLogger(ChatCompletionsHttpClient.class);
54+
private static final ObjectMapper objectMapper = JsonBaseModel.getMapper();
55+
56+
private static final MediaType JSON = MediaType.get("application/json; charset=utf-8");
57+
58+
/**
59+
* Default OkHttp call timeout used when the caller does not supply an {@link HttpOptions}
60+
* timeout. Five minutes is long enough for most non-streaming completions and short enough to
61+
* prevent indefinite hangs in the common case where the caller does not configure timeouts.
62+
* Callers who need infinite (e.g. long batch jobs or open streams) can opt in by passing an
63+
* {@link HttpOptions} with {@code timeout() == 0}.
64+
*/
65+
private static final Duration DEFAULT_CALL_TIMEOUT = Duration.ofMinutes(5);
66+
67+
/**
68+
* Shared OkHttpClient instance whose connection pool and thread dispatcher are reused across all
69+
* {@link ChatCompletionsHttpClient} instances. Each instance forks this client via {@link
70+
* OkHttpClient#newBuilder()} to apply per-instance timeouts without leaking pools.
71+
*/
72+
private static final OkHttpClient SHARED_POOL_CLIENT = new OkHttpClient();
73+
74+
private final OkHttpClient client;
75+
private final HttpUrl completionsUrl;
76+
private final ImmutableMap<String, String> headers;
77+
78+
/**
79+
* Constructs a new {@link ChatCompletionsHttpClient} that facilitates API interaction with the
80+
* standard {@code /chat/completions} REST endpoint.
81+
*
82+
* <p>All configuration is sourced from the supplied {@link HttpOptions}:
83+
*
84+
* <ul>
85+
* <li>{@link HttpOptions#baseUrl()} -- <b>required</b>. The base URL of the chat completions
86+
* endpoint. The {@code chat/completions} path segments are appended automatically using
87+
* {@link HttpUrl}, which handles trailing slashes and percent-encoding deterministically.
88+
* Set via {@code HttpOptions.builder().baseUrl("https://...").build()}.
89+
* <li>{@link HttpOptions#headers()} -- optional. Extra HTTP headers to include in outgoing
90+
* requests. The {@code Content-Type} header is set automatically and cannot be overridden.
91+
* Set via {@code HttpOptions.builder().headers(Map.of("Authorization", "Bearer ...")) }.
92+
* <li>{@link HttpOptions#timeout()} -- optional. Per-call timeout in milliseconds. A missing
93+
* timeout defaults to 5 minutes ({@link #DEFAULT_CALL_TIMEOUT}). A timeout of {@code 0} is
94+
* respected as the explicit caller opt-in to infinite wait. Set via {@code
95+
* HttpOptions.builder().timeout(10_000).build()}.
96+
* </ul>
97+
*
98+
* <p>Example:
99+
*
100+
* <pre>{@code
101+
* HttpOptions options =
102+
* HttpOptions.builder()
103+
* .baseUrl("https://example.com/v1/")
104+
* .headers(ImmutableMap.of("Authorization", "Bearer my-token"))
105+
* .timeout(30_000)
106+
* .build();
107+
* ChatCompletionsHttpClient client = new ChatCompletionsHttpClient(options);
108+
* }</pre>
109+
*
110+
* @param httpOptions HTTP configuration. Must not be {@code null}, and {@link
111+
* HttpOptions#baseUrl()} must be present and parseable as an HTTP(S) URL.
112+
* @throws IllegalArgumentException if {@code httpOptions.baseUrl()} is missing or is not a valid
113+
* HTTP(S) URL.
114+
*/
115+
public ChatCompletionsHttpClient(HttpOptions httpOptions) {
116+
Objects.requireNonNull(httpOptions, "httpOptions cannot be null");
117+
String baseUrl =
118+
httpOptions
119+
.baseUrl()
120+
.orElseThrow(() -> new IllegalArgumentException("httpOptions.baseUrl() must be set"));
121+
HttpUrl parsedBaseUrl = HttpUrl.parse(baseUrl);
122+
if (parsedBaseUrl == null) {
123+
throw new IllegalArgumentException(
124+
"httpOptions.baseUrl() is not a valid HTTP(S) URL: " + baseUrl);
125+
}
126+
// Pre-build the completions URL once. HttpUrl.addPathSegment handles trailing slashes,
127+
// percent-encoding, and existing path components on baseUrl deterministically.
128+
this.completionsUrl =
129+
parsedBaseUrl.newBuilder().addPathSegment("chat").addPathSegment("completions").build();
130+
// Defensive copy of caller-supplied headers; absent is treated as no extra headers.
131+
this.headers =
132+
httpOptions
133+
.headers()
134+
.<ImmutableMap<String, String>>map(ImmutableMap::copyOf)
135+
.orElse(ImmutableMap.of());
136+
137+
// Apply custom timeouts per instance. All internal timeouts are bounded by callTimeout.
138+
OkHttpClient.Builder builder = SHARED_POOL_CLIENT.newBuilder();
139+
builder.connectTimeout(Duration.ZERO);
140+
builder.readTimeout(Duration.ZERO);
141+
builder.writeTimeout(Duration.ZERO);
142+
builder.callTimeout(resolveCallTimeout(httpOptions));
143+
this.client = builder.build();
144+
}
145+
146+
/** Resolves the call timeout from HttpOptions. */
147+
private static Duration resolveCallTimeout(HttpOptions httpOptions) {
148+
if (httpOptions.timeout().isEmpty()) {
149+
return DEFAULT_CALL_TIMEOUT;
150+
}
151+
long timeoutMs = httpOptions.timeout().get();
152+
// 0 is treated as no timeout (Duration.ZERO).
153+
return timeoutMs == 0L ? Duration.ZERO : Duration.ofMillis(timeoutMs);
154+
}
155+
156+
/**
157+
* Generates a conversational response from the chat completions endpoint based on the provided
158+
* messages. This encapsulates building the HTTP payload, sending the request to the completions
159+
* endpoint, and initiating the handling of complete calls.
160+
*
161+
* @param llmRequest The request containing the model, configuration, and sequence of messages.
162+
* @param stream Whether to request a streaming response.
163+
* @return A {@link Flowable} emitting the discrete (or combined) {@link LlmResponse} objects.
164+
*/
165+
public Flowable<LlmResponse> complete(LlmRequest llmRequest, boolean stream) {
166+
return Flowable.defer(
167+
() -> {
168+
ChatCompletionsRequest dtoRequest =
169+
ChatCompletionsRequest.fromLlmRequest(llmRequest, stream);
170+
String jsonPayload = objectMapper.writeValueAsString(dtoRequest);
171+
logger.trace(
172+
"Chat Completion Request: model={}, stream={}, messagesCount={}",
173+
dtoRequest.model,
174+
dtoRequest.stream,
175+
dtoRequest.messages != null ? dtoRequest.messages.size() : 0);
176+
177+
Request.Builder requestBuilder =
178+
new Request.Builder().url(completionsUrl).post(RequestBody.create(jsonPayload, JSON));
179+
180+
for (Map.Entry<String, String> entry : headers.entrySet()) {
181+
requestBuilder.addHeader(entry.getKey(), entry.getValue());
182+
}
183+
// Defensively force Content-Type to JSON by replacing instead of appending.
184+
requestBuilder.header("Content-Type", JSON.toString());
185+
186+
Request request = requestBuilder.build();
187+
if (stream) {
188+
return createStreamingFlowable(request);
189+
} else {
190+
return createNonStreamingFlowable(request);
191+
}
192+
});
193+
}
194+
195+
/** Placeholder for streaming responses. Errors with {@link UnsupportedOperationException}. */
196+
@SuppressWarnings("UnusedVariable")
197+
private Flowable<LlmResponse> createStreamingFlowable(Request request) {
198+
return Flowable.error(
199+
new UnsupportedOperationException("Streaming is not yet implemented in this client."));
200+
}
201+
202+
/**
203+
* Wraps an OkHttp {@link Callback} in a reactive {@link Flowable} for single-turn, non-streaming
204+
* responses.
205+
*/
206+
private Flowable<LlmResponse> createNonStreamingFlowable(Request request) {
207+
return Flowable.create(
208+
emitter -> {
209+
Call call = client.newCall(request);
210+
emitter.setCancellable(call::cancel);
211+
call.enqueue(new NonStreamingCallback(emitter));
212+
},
213+
BackpressureStrategy.BUFFER);
214+
}
215+
216+
/**
217+
* Handles OkHttp failure and success callbacks, pushing {@link LlmResponse} results to the given
218+
* emitter.
219+
*/
220+
private static final class NonStreamingCallback implements Callback {
221+
private final FlowableEmitter<LlmResponse> emitter;
222+
223+
NonStreamingCallback(FlowableEmitter<LlmResponse> emitter) {
224+
this.emitter = emitter;
225+
}
226+
227+
@Override
228+
public void onFailure(Call call, IOException e) {
229+
emitter.tryOnError(e);
230+
}
231+
232+
@Override
233+
public void onResponse(Call call, Response response) {
234+
try (ResponseBody body = response.body()) {
235+
if (!response.isSuccessful()) {
236+
String bodyStr = body != null ? body.string() : "";
237+
emitter.tryOnError(
238+
new IOException("Unexpected code " + response + " - body: " + bodyStr));
239+
return;
240+
}
241+
if (body == null) {
242+
emitter.tryOnError(new IOException("Empty response body"));
243+
return;
244+
}
245+
246+
String jsonResponse = body.string();
247+
ChatCompletionsResponse.ChatCompletion completion =
248+
objectMapper.readValue(jsonResponse, ChatCompletionsResponse.ChatCompletion.class);
249+
emitter.onNext(completion.toLlmResponse());
250+
emitter.onComplete();
251+
} catch (Exception e) {
252+
emitter.tryOnError(e);
253+
}
254+
}
255+
}
256+
}

0 commit comments

Comments
 (0)