1717package com .google .adk .models ;
1818
1919import static com .google .common .base .StandardSystemProperty .JAVA_VERSION ;
20+ import static net .javacrumbs .futureconverter .java8guava .FutureConverter .toListenableFuture ;
2021
2122import com .google .adk .Version ;
23+ import com .google .async .rxjava3 .Singles ;
2224import com .google .common .collect .ImmutableMap ;
25+ import com .google .common .util .concurrent .ListenableFuture ;
2326import com .google .errorprone .annotations .CanIgnoreReturnValue ;
2427import com .google .genai .Client ;
2528import com .google .genai .ResponseStream ;
3235import com .google .genai .types .LiveConnectConfig ;
3336import com .google .genai .types .Part ;
3437import io .reactivex .rxjava3 .core .Flowable ;
38+ import io .reactivex .rxjava3 .schedulers .Schedulers ;
3539import java .util .ArrayList ;
3640import java .util .List ;
3741import java .util .Objects ;
3842import java .util .Optional ;
39- import java .util .concurrent .CompletableFuture ;
4043import org .slf4j .Logger ;
4144import org .slf4j .LoggerFactory ;
4245
@@ -218,14 +221,17 @@ public Flowable<LlmResponse> generateContent(LlmRequest llmRequest, boolean stre
218221
219222 if (stream ) {
220223 logger .debug ("Sending streaming generateContent request to model {}" , effectiveModelName );
221- CompletableFuture <ResponseStream <GenerateContentResponse >> streamFuture =
222- apiClient .async .models .generateContentStream (
223- effectiveModelName , llmRequest .contents (), config );
224+ ListenableFuture <ResponseStream <GenerateContentResponse >> streamFuture =
225+ toListenableFuture (
226+ apiClient .async .models .generateContentStream (
227+ effectiveModelName , llmRequest .contents (), config ));
224228
225229 return Flowable .defer (
226230 () ->
227231 processRawResponses (
228- Flowable .fromFuture (streamFuture ).flatMapIterable (iterable -> iterable )))
232+ Singles .toSingle (() -> streamFuture , Schedulers .io ())
233+ .toFlowable ()
234+ .flatMapIterable (iterable -> iterable )))
229235 .filter (
230236 llmResponse ->
231237 llmResponse
@@ -243,12 +249,17 @@ public Flowable<LlmResponse> generateContent(LlmRequest llmRequest, boolean stre
243249 .orElse (false ));
244250 } else {
245251 logger .debug ("Sending generateContent request to model {}" , effectiveModelName );
246- return Flowable .fromFuture (
247- apiClient
248- .async
249- .models
250- .generateContent (effectiveModelName , llmRequest .contents (), config )
251- .thenApplyAsync (LlmResponse ::create ));
252+ final LlmRequest finalLlmRequest = llmRequest ;
253+ return Singles .toSingle (
254+ () ->
255+ toListenableFuture (
256+ apiClient
257+ .async
258+ .models
259+ .generateContent (effectiveModelName , finalLlmRequest .contents (), config )
260+ .thenApplyAsync (LlmResponse ::create )),
261+ Schedulers .io ())
262+ .toFlowable ();
252263 }
253264 }
254265
0 commit comments