Skip to content

Commit 447c088

Browse files
committed
fix: true parallel tool execution via subscribeOn(Schedulers.io()) in concatMapEager
Fixes #1152
1 parent 4009905 commit 447c088

3 files changed

Lines changed: 497 additions & 2 deletions

File tree

core/src/main/java/com/google/adk/flows/llmflows/Functions.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@
4747
import io.reactivex.rxjava3.core.Single;
4848
import io.reactivex.rxjava3.disposables.Disposable;
4949
import io.reactivex.rxjava3.functions.Function;
50+
import io.reactivex.rxjava3.schedulers.Schedulers;
5051
import java.util.ArrayList;
5152
import java.util.HashMap;
5253
import java.util.HashSet;
@@ -160,7 +161,9 @@ public static Maybe<Event> handleFunctionCalls(
160161
} else {
161162
functionResponseEventsObservable =
162163
Observable.fromIterable(validFunctionCalls)
163-
.concatMapEager(call -> functionCallMapper.apply(call).toObservable());
164+
.concatMapEager(
165+
call ->
166+
functionCallMapper.apply(call).toObservable().subscribeOn(Schedulers.io()));
164167
}
165168
return functionResponseEventsObservable
166169
.toList()
@@ -231,7 +234,9 @@ public static Maybe<Event> handleFunctionCallsLive(
231234
} else {
232235
responseEventsObservable =
233236
Observable.fromIterable(validFunctionCalls)
234-
.concatMapEager(call -> functionCallMapper.apply(call).toObservable());
237+
.concatMapEager(
238+
call ->
239+
functionCallMapper.apply(call).toObservable().subscribeOn(Schedulers.io()));
235240
}
236241

237242
return responseEventsObservable

0 commit comments

Comments
 (0)