Commit c6feef7

refactor: remove LlamaPublisher in favour of consumer-side reactive adapters
Removes the hand-rolled reactive-streams Publisher and the associated mandatory runtime dependency on org.reactivestreams. Adds consumer-facing documentation showing how to wrap LlamaIterable with each mainstream reactive library's resource-management primitive, verified end-to-end by a new ReactorIntegrationTest using test-scope reactor-core.

Why now
=======

LlamaPublisher (introduced in PR #188 as section 2.3 of the Kotlin SDK feature comparison) had zero non-test callers. The feature-investigation document itself describes its source spec as "no longer a roadmap". The real-world Android consumer LLaMAndroid uses the existing LlamaIterable API directly inside a Kotlin flow { } block, bypassing the publisher entirely. Upstream kherud/java-llama.cpp never carried this class.

LlamaIterable already implements Iterable<LlamaOutput> + AutoCloseable, the contract every reactive library needs to bridge a blocking source:

- Project Reactor      : Flux.using(supplier, Flux::fromIterable, ::close)
- RxJava 3 / RxAndroid : Flowable.using(supplier, Flowable::fromIterable, ::close)
- Kotlin Flow          : flow { iterable.use { for (x in it) emit(x) } }
- Akka Streams         : Source.fromIterator(() -> iterable.iterator())

These are the canonical patterns the libraries themselves recommend for blocking sources. Keeping a Publisher in the binding forced every consumer onto the org.reactivestreams runtime dep just to access a class nobody called.

Critical correctness note: Flux.fromIterable / Flowable.fromIterable do NOT auto-close AutoCloseable iterables on cancel; the consumer must use .using(...) or equivalent. The README documents this caveat explicitly; the ReactorIntegrationTest pins the correct pattern.

Changes
=======

Deletes:
- src/main/java/net/ladenthin/llama/LlamaPublisher.java (175 LOC)
- src/test/java/net/ladenthin/llama/LlamaPublisherTest.java (204 LOC)
- LlamaModel.streamPublisher / streamChatPublisher (23 LOC)
- pom.xml org.reactivestreams runtime dep + version property
- module-info.java javadoc reference

Adds:
- src/test/java/net/ladenthin/llama/ReactorIntegrationTest.java
  Mock-iterable contract test (always runs) + real-model gate test proving end-to-end cancel propagation via Flux.using + LlamaIterable.close
- pom.xml reactor-core + reactor-test at test scope, 3.6.11
- README.md new "Reactive integration" section covering Reactor, RxJava 3, Kotlin Flow (with LLaMAndroid reference), Akka Streams, and the why-no-built-in-Publisher rationale

Updates:
- docs/feature-investigation-llama-stack-client-kotlin.md: section 2.3 status now reads "SHIPPED + REVERTED REACTIVE PUBLISHER" with full rationale and pointer to the README
- TODO.md: new Done entry capturing the decision trail

Net: -331 LOC (-503 source, +172 test/docs); -1 runtime dep (org.reactivestreams); +2 test-scope deps (reactor-core, reactor-test).

SpotBugs Max+Low: total drops 25 -> 19 (all 6 LlamaPublisher$LlamaSubscription findings cleared as a side effect: MDM_WAIT_WITHOUT_TIMEOUT x4 + CWO_CLOSED_WITHOUT_OPENED + PRMC_POSSIBLY_REDUNDANT_METHOD_CALLS).

Tests
=====

ReactorIntegrationTest: 2 tests, mock variant always runs, real-model variant gated on TestConstants.MODEL_PATH. Mock test proves Reactor backpressure (request(2) delivers exactly 2 items, never more) and cleanup-on-cancel (Flux.using cleanup function fires on cancel).

887 of 888 tests pass (the 1 error is the known sandbox-without-native-lib UnsatisfiedLinkError in RerankingModelTest, unrelated to this change).
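
The backpressure claim in the test notes (a demand of 2 yields exactly 2 items) rests on the source being pull-based: an `Iterable` produces nothing until its iterator is advanced. Below is a minimal JDK-only sketch of that property. `CountingSource` and `producedAfterDemand` are hypothetical names standing in for `LlamaIterable` and a subscriber's `request(n)`; no Reactor API is involved, so this illustrates the idea rather than reproducing `ReactorIntegrationTest`.

```java
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.concurrent.atomic.AtomicInteger;

public class PullDemandDemo {

    /** Hypothetical token source that counts how many items it has actually produced. */
    public static final class CountingSource implements Iterable<Integer> {
        final AtomicInteger produced = new AtomicInteger();
        private final int limit;

        CountingSource(int limit) {
            this.limit = limit;
        }

        @Override
        public Iterator<Integer> iterator() {
            return new Iterator<Integer>() {
                private int next = 0;

                @Override
                public boolean hasNext() {
                    return next < limit;
                }

                @Override
                public Integer next() {
                    if (next >= limit) {
                        throw new NoSuchElementException();
                    }
                    produced.incrementAndGet(); // work happens only when pulled
                    return next++;
                }
            };
        }
    }

    /** Pulls at most {@code demand} items and reports how many the source produced. */
    public static int producedAfterDemand(int available, int demand) {
        CountingSource source = new CountingSource(available);
        Iterator<Integer> it = source.iterator();
        for (int i = 0; i < demand && it.hasNext(); i++) {
            it.next();
        }
        return source.produced.get();
    }

    public static void main(String[] args) {
        System.out.println(producedAfterDemand(10, 2)); // prints 2
    }
}
```

Because production is driven entirely by `next()`, a reactive wrapper that maps demand onto iterator pulls inherits this bound without the binding having to implement `request(n)` itself.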
1 parent 07cabfb commit c6feef7

9 files changed

Lines changed: 275 additions & 450 deletions

README.md

Lines changed: 61 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -417,6 +417,67 @@ try (LlamaModel model = new LlamaModel(modelParams)) {
417417
}
418418
```
419419

420+
### Reactive integration (Reactor, RxJava, Kotlin Flow, Akka)
421+
422+
`LlamaIterable` (returned by `model.generate(...)` and `model.generateChat(...)`)
423+
implements `Iterable<LlamaOutput> & AutoCloseable`, so every mainstream reactive
424+
library wraps it in a few lines without `java-llama.cpp` pulling in a runtime
425+
reactive dependency.
426+
427+
**Always wrap with the library's resource-management primitive**: `Flux.using`,
428+
`Flowable.using`, Kotlin `use {}`, etc. — so that subscription cancellation
429+
flows into `LlamaIterable.close()` and from there into llama.cpp's native
430+
`cancelCompletion`. A plain `Flux.fromIterable(iterable)` or `for (x in iter)`
431+
loop will NOT close the iterable on cancel; the native task slot stays
432+
occupied until the model is closed.
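
To see why the plain loop leaks, here is a minimal JDK-only sketch. `TrackedIterable` is a hypothetical stand-in for `LlamaIterable` that only records whether `close()` ran, and try-with-resources plays the role that `Flux.using` or Kotlin `use {}` play in the reactive libraries.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

public class CloseOnEarlyExitDemo {

    /** Hypothetical stand-in for LlamaIterable: iterable and closable, nothing more. */
    public static final class TrackedIterable implements Iterable<String>, AutoCloseable {
        private final List<String> items;
        final AtomicBoolean closed = new AtomicBoolean(false);

        TrackedIterable(List<String> items) {
            this.items = items;
        }

        @Override
        public Iterator<String> iterator() {
            return items.iterator();
        }

        // In the real binding, this is where the native task slot would be released.
        @Override
        public void close() {
            closed.set(true);
        }
    }

    /** Plain loop that stops early: nothing ever calls close(). */
    public static boolean plainLoopCloses() {
        TrackedIterable source = new TrackedIterable(Arrays.asList("a", "b", "c"));
        for (String token : source) {
            if (token.equals("b")) {
                break; // simulated mid-stream cancellation
            }
        }
        return source.closed.get();
    }

    /** Same loop under try-with-resources: close() runs on every exit path. */
    public static boolean managedLoopCloses() {
        TrackedIterable source = new TrackedIterable(Arrays.asList("a", "b", "c"));
        try (TrackedIterable managed = source) {
            for (String token : managed) {
                if (token.equals("b")) {
                    break;
                }
            }
        }
        return source.closed.get();
    }

    public static void main(String[] args) {
        System.out.println("plain loop closed:   " + plainLoopCloses());   // false
        System.out.println("managed loop closed: " + managedLoopCloses()); // true
    }
}
```

The enhanced `for` statement only sees the `Iterable` side of the object, which is why the cleanup has to come from whatever construct owns the resource's lifetime.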
433+
434+
#### Project Reactor (Spring WebFlux)
435+
```java
436+
Flux<LlamaOutput> tokens = Flux.using(
437+
() -> model.generate(params),
438+
Flux::fromIterable,
439+
LlamaIterable::close)
440+
.subscribeOn(Schedulers.boundedElastic());
441+
```
442+
443+
#### RxJava 3 (also for RxAndroid)
444+
```java
445+
Flowable<LlamaOutput> tokens = Flowable.using(
446+
() -> model.generate(params),
447+
Flowable::fromIterable,
448+
LlamaIterable::close)
449+
.subscribeOn(Schedulers.io());
450+
```
451+
452+
#### Kotlin Flow (Android / coroutines)
453+
```kotlin
454+
fun llama(model: LlamaModel, params: InferenceParameters) = flow {
455+
model.generate(params).use { iterable ->
456+
for (output in iterable) emit(output)
457+
}
458+
}.flowOn(Dispatchers.IO)
459+
```
460+
The companion Android sample [LLaMAndroid](https://github.com/Rattlyy/LLaMAndroid)
461+
demonstrates the `flow { for (output in model.generate(params)) emit(output) }`
462+
shape against the upstream binding. Wrap the `for` loop in
463+
`.use { }` if your collector may cancel mid-stream — otherwise the native task
464+
slot will not be released until the model is closed.
465+
466+
#### Akka Streams
467+
```scala
468+
val tokens: Source[LlamaOutput, NotUsed] = Source
469+
.fromIterator(() => model.generate(params).iterator())
470+
.async("blocking-io-dispatcher")
471+
```
472+
473+
**Why no built-in `Publisher`?** Earlier snapshots of this fork shipped a
474+
hand-rolled `LlamaModel.streamPublisher(...)` returning a Reactive Streams
475+
`Publisher<LlamaOutput>`. Since every reactive library bridges blocking
476+
iterables in a few lines via its own resource-management primitive, the binding
477+
now stays free of any reactive runtime dependency — pick whichever library your
478+
app already uses. The pattern is verified end-to-end by
479+
`ReactorIntegrationTest` in the test sources.
480+
420481
### Logging
421482

422483
Per default, logs are written to stdout.

TODO.md

Lines changed: 16 additions & 0 deletions
@@ -69,6 +69,22 @@ These are JNI plumbing items for upstream API additions. Policy: add only after
6969

7070
## Done (kept for history)
7171

72+
- **Reactive `LlamaPublisher` removed in favour of consumer-side adapters.**
73+
The hand-rolled `LlamaPublisher` + `LlamaModel.streamPublisher` /
74+
`streamChatPublisher` (shipped in PR #188 as §2.3 of the Kotlin SDK
75+
feature comparison) had zero non-test callers. `LlamaIterable` is
76+
already `Iterable<LlamaOutput> & AutoCloseable`, and every mainstream
77+
reactive library wraps it in a few lines via its own resource-management
78+
primitive (`Flux.using`, `Flowable.using`, Kotlin `use {}`). The real-world
79+
Android consumer [LLaMAndroid](https://github.com/Rattlyy/LLaMAndroid)
80+
already uses `LlamaIterable` inside a Kotlin `flow {}` block — bypassing
81+
the publisher entirely. README "Reactive integration" section documents
82+
the Reactor / RxJava 3 / Kotlin Flow / Akka patterns; correctness is
83+
pinned end-to-end by a new `ReactorIntegrationTest` using
84+
test-scope `reactor-core` (zero runtime deps added; `org.reactivestreams`
85+
runtime dep dropped). Cleared 6 fb-contrib Max+Low findings on
86+
`LlamaPublisher$LlamaSubscription` as a side effect.
87+
7288
- **Error Prone bug-pattern promotions to `ERROR`**: `855f447` (12 patterns promoted; `-Xlint:all` enabled).
7389
- **`javac -Werror` + `-Xlint:all,-serial,-options,-classfile,-processing`**: `3e2efbb`. ~20 EP warnings addressed first (EqualsGetClass on `Pair` via instanceof; MissingOverride on `PoolingType` / `RopeScalingType`; JdkObsolete `LinkedList` → `ArrayList` in `LlamaLoader`; StringSplitter inline-suppressed; 3× StringCaseLocaleUsage `Locale.ROOT` in `OSInfo`; EmptyCatch in `OSInfo.isAlpineLinux`; FutureReturnValueIgnored in `LlamaModel.completeAsync`; Finalize on `LlamaModel.finalize`; MixedMutabilityReturnType in 4 parser methods; EnumOrdinal in `InferenceParameters.setMiroStat`; EscapedEntity in `InferenceParameters` javadoc; 4× TypeParameterUnusedInFormals; AnnotateFormatMethod on `Java8CompatibilityHelper.formatted`; SafeVarargs + varargs on `Java8CompatibilityHelper.listOf`).
7490
- **`-parameters` javac arg**: `4350cf2`.

docs/feature-investigation-llama-stack-client-kotlin.md

Lines changed: 20 additions & 7 deletions
@@ -158,14 +158,27 @@ papercut.
158158

159159
### 2.3 Async / non-blocking API — **S–M**
160160

161-
**Status: SHIPPED.** `CompletableFuture` wrappers (`completeAsync`,
162-
`chatCompleteAsync`, `chatCompleteTextAsync`, plus a
161+
**Status: SHIPPED + REVERTED REACTIVE PUBLISHER.** `CompletableFuture` wrappers
162+
(`completeAsync`, `chatCompleteAsync`, `chatCompleteTextAsync`, plus a
163163
`completeAsync(params, CancellationToken)` bridge that propagates
164-
`future.cancel(true)` into the cooperative token) in commit `1e673a9`.
165-
The reactive `Publisher<LlamaOutput>` follow-up (backpressure via
166-
Reactive Streams, single-subscriber) shipped in commit `afa4f65` as
167-
`LlamaModel.streamPublisher(...)` and `streamChatPublisher(...)` backed
168-
by `LlamaPublisher`. New runtime dep: `org.reactivestreams:reactive-streams:1.0.4`.
164+
`future.cancel(true)` into the cooperative token) in commit `1e673a9`,
165+
**still shipped**.
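
A sketch of how such a cancellation bridge can work, assuming nothing about the binding's actual implementation: `Token`, `startAsync`, and the sleeping worker are hypothetical. The point it illustrates is that `CompletableFuture.cancel(true)` does not interrupt the running task on its own, so the cancellation has to be forwarded to the cooperative token explicitly.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

public class CancelBridgeDemo {

    /** Hypothetical cooperative token; the binding's real CancellationToken may differ. */
    public static final class Token {
        private final AtomicBoolean cancelled = new AtomicBoolean();

        void cancel() {
            cancelled.set(true);
        }

        boolean isCancelled() {
            return cancelled.get();
        }
    }

    /** Starts a polling worker and forwards future cancellation to the token. */
    public static CompletableFuture<String> startAsync(Token token, CountDownLatch workerStopped) {
        CompletableFuture<String> future = new CompletableFuture<>();

        // Runs when the future completes for any reason; only a cancel flips the token.
        future.whenComplete((result, error) -> {
            if (future.isCancelled()) {
                token.cancel();
            }
        });

        Thread worker = new Thread(() -> {
            try {
                // Each iteration stands in for generating one token.
                for (int step = 0; step < 1000 && !token.isCancelled(); step++) {
                    Thread.sleep(5);
                }
                future.complete("done"); // no-op if the future was already cancelled
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                workerStopped.countDown();
            }
        });
        worker.setDaemon(true);
        worker.start();
        return future;
    }

    /** Returns true if cancelling the future stopped the worker within one second. */
    public static boolean cancelStopsWorker() {
        Token token = new Token();
        CountDownLatch stopped = new CountDownLatch(1);
        CompletableFuture<String> future = startAsync(token, stopped);
        future.cancel(true);
        try {
            return token.isCancelled() && stopped.await(1, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("worker stopped after cancel: " + cancelStopsWorker());
    }
}
```

The worker polls the token between steps, so cancellation latency is bounded by the duration of one step rather than by the length of the whole run.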
166+
167+
The reactive `Publisher<LlamaOutput>` follow-up was shipped in commit `afa4f65`
168+
as `LlamaModel.streamPublisher(...)` / `streamChatPublisher(...)` backed by
169+
`LlamaPublisher`. **It has since been removed** — see the README section
170+
"Reactive integration" for the rationale and the canonical replacement
171+
patterns. In short: `LlamaIterable` is already
172+
`Iterable<LlamaOutput> & AutoCloseable`, and every mainstream reactive
173+
library (Project Reactor, RxJava 3, Kotlin coroutines `Flow`, Akka Streams)
174+
wraps it in a few lines via its own resource-management primitive
175+
(`Flux.using`, `Flowable.using`, Kotlin `use {}`, etc.). Keeping a hand-rolled
176+
`Publisher` in the binding imposed a mandatory `org.reactivestreams` runtime
177+
dep on every consumer for a class that had **zero non-test callers**,
178+
including the canonical Android sample [LLaMAndroid](https://github.com/Rattlyy/LLaMAndroid),
179+
which uses `LlamaIterable` directly inside a Kotlin `flow { }` block. Pattern
180+
correctness is now pinned end-to-end by `ReactorIntegrationTest`
181+
(test-scope `reactor-core`); zero runtime deps added.
169182

170183
**Gap.** All `LlamaModel` methods are blocking. Kotlin offers
171184
`suspend fun` + Flow variants. JVM users currently dedicate platform

pom.xml

Lines changed: 18 additions & 9 deletions
@@ -56,7 +56,7 @@ SPDX-License-Identifier: MIT
5656
<nullaway.version>0.13.4</nullaway.version>
5757
<checker.version>4.2.0</checker.version>
5858
<jackson.version>2.22.0</jackson.version>
59-
<reactive-streams.version>1.0.4</reactive-streams.version>
59+
<reactor.version>3.6.11</reactor.version>
6060
<slf4j.version>2.0.18</slf4j.version>
6161
<logback.version>1.5.34</logback.version>
6262
<animal-sniffer.version>1.27</animal-sniffer.version>
@@ -140,14 +140,6 @@ SPDX-License-Identifier: MIT
140140
<artifactId>jackson-databind</artifactId>
141141
<version>${jackson.version}</version>
142142
</dependency>
143-
<!-- Reactive Streams API used by LlamaPublisher to expose token streams as a
144-
Publisher<LlamaOutput>. Java 8 compatible, ~5 KB, supplies the standard
145-
interfaces that Reactor / RxJava / Kotlin coroutines bridge to. -->
146-
<dependency>
147-
<groupId>org.reactivestreams</groupId>
148-
<artifactId>reactive-streams</artifactId>
149-
<version>${reactive-streams.version}</version>
150-
</dependency>
151143
<!-- Required by OSInfo (vendored from xerial/sqlite-jdbc) for log emission. -->
152144
<dependency>
153145
<groupId>org.slf4j</groupId>
@@ -202,6 +194,23 @@ SPDX-License-Identifier: MIT
202194
<version>${logcaptor.version}</version>
203195
<scope>test</scope>
204196
</dependency>
197+
<!-- Test-only Project Reactor used by ReactorIntegrationTest to prove the
198+
"wrap LlamaIterable with Flux.using + Flux.fromIterable + subscribeOn" pattern works
199+
end-to-end (backpressure, AutoCloseable cancel propagation). NOT a runtime
200+
dependency — consumers bring whichever reactive lib they already use
201+
(Reactor / RxJava / Kotlin Flow / Akka — see README "Reactive integration"). -->
202+
<dependency>
203+
<groupId>io.projectreactor</groupId>
204+
<artifactId>reactor-core</artifactId>
205+
<version>${reactor.version}</version>
206+
<scope>test</scope>
207+
</dependency>
208+
<dependency>
209+
<groupId>io.projectreactor</groupId>
210+
<artifactId>reactor-test</artifactId>
211+
<version>${reactor.version}</version>
212+
<scope>test</scope>
213+
</dependency>
205214
</dependencies>
206215

207216
<build>

src/main/java/module-info.java

Lines changed: 4 additions & 4 deletions
@@ -26,10 +26,10 @@
2626
* <p>{@code requires static org.jspecify} is needed only at compile time of this
2727
* descriptor; JSpecify annotations carry {@code RetentionPolicy.CLASS} so module-path
2828
* consumers never need jspecify on their runtime path. Checker Framework qualifiers and
29-
* the Codehaus animal-sniffer annotation are likewise compile-time only. Jackson, SLF4J,
30-
* and Reactive Streams API are referenced from ordinary sources only; javac in the
31-
* separate {@code module-info-compile} execution compiles {@code module-info.java} in
32-
* isolation and therefore does not need their module names. Consumers that put this jar
29+
* the Codehaus animal-sniffer annotation are likewise compile-time only. Jackson and
30+
* SLF4J are referenced from ordinary sources only; javac in the separate
31+
* {@code module-info-compile} execution compiles {@code module-info.java} in isolation
32+
* and therefore does not need their module names. Consumers that put this jar
3333
* on the module path will load these dependencies through their own {@code requires}
3434
* graph; consumers on the classpath are unaffected.</p>
3535
*

src/main/java/net/ladenthin/llama/LlamaModel.java

Lines changed: 0 additions & 23 deletions
@@ -213,29 +213,6 @@ public java.util.List<ChatResponse> chatBatch(java.util.Collection<ChatRequest>
213213
return out;
214214
}
215215

216-
/**
217-
* Reactive-streams variant of {@link #generate(InferenceParameters)}. Returns a
218-
* {@link org.reactivestreams.Publisher} of {@link LlamaOutput} tokens. Each subscriber
219-
* triggers a fresh streaming inference on a dedicated background thread; backpressure
220-
* is honoured via the Reactive Streams {@code request(n)} protocol. Use
221-
* {@link org.reactivestreams.Subscription#cancel()} to stop the inference early.
222-
*
223-
* @param parameters the inference configuration
224-
* @return a single-subscriber {@link org.reactivestreams.Publisher} of tokens
225-
*/
226-
public LlamaPublisher streamPublisher(InferenceParameters parameters) {
227-
return new LlamaPublisher(this, parameters, false);
228-
}
229-
230-
/**
231-
* Reactive-streams variant of {@link #generateChat(InferenceParameters)}.
232-
*
233-
* @param parameters the inference parameters including messages
234-
* @return a single-subscriber {@link org.reactivestreams.Publisher} of tokens
235-
*/
236-
public LlamaPublisher streamChatPublisher(InferenceParameters parameters) {
237-
return new LlamaPublisher(this, parameters, true);
238-
}
239216

240217
/**
241218
* Asynchronous variant of {@link #complete(InferenceParameters)}. Runs the inference on
