Skip to content

Commit afa4f65

Browse files
committed
Add LlamaPublisher reactive-streams token publisher (§2.3 follow-up)
Backpressure-aware Publisher<LlamaOutput> on top of the existing streaming iterator. Reactor / RxJava / Kotlin coroutines all bridge to the Reactive Streams interface natively, so consumers wrap with Flux.from(...) / Flowable.fromPublisher(...) / asFlow() in one line.

LlamaPublisher:
- Single-subscriber (second subscribe signals onError per RS spec).
- Each subscribe starts a dedicated emitter daemon thread.
- Demand honoured via AtomicLong + monitor: emitter blocks while demand == 0 and only calls iterator.next() when demand > 0.
- request(n <= 0) signals onError with IllegalArgumentException per reactive-streams §3.9.
- cancel() closes the underlying iterator (cooperative, same path as LlamaIterator.close); idempotent.
- onComplete fires on stop token, onError on any throwable from the iterator path.

LlamaModel:
- streamPublisher(InferenceParameters) and streamChatPublisher(InferenceParameters) factories.

Dependency: adds org.reactivestreams:reactive-streams 1.0.4 (~5 KB, Java 8 compatible) to pom.xml.

Tests in LlamaPublisherTest:
- nullSubscriberThrows (model-free).
- backpressureAndCancel, singleSubscriberContract, invalidRequestSignalsError (model-gated).

mvn javadoc:jar BUILD SUCCESS, no new warnings.

https://claude.ai/code/session_01R4ZrEy3ptJDLuUgUKuM4Gy
1 parent de457b2 commit afa4f65

4 files changed

Lines changed: 360 additions & 0 deletions

File tree

pom.xml

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,14 @@ SPDX-License-Identifier: MIT
7373
<artifactId>jackson-databind</artifactId>
7474
<version>2.21.3</version>
7575
</dependency>
76+
<!-- Reactive Streams API used by LlamaPublisher to expose token streams as a
77+
Publisher<LlamaOutput>. Java 8 compatible, ~5 KB, supplies the standard
78+
interfaces that Reactor / RxJava / Kotlin coroutines bridge to. -->
79+
<dependency>
80+
<groupId>org.reactivestreams</groupId>
81+
<artifactId>reactive-streams</artifactId>
82+
<version>1.0.4</version>
83+
</dependency>
7684
<!-- Required by OSInfo (vendored from xerial/sqlite-jdbc) for log emission. -->
7785
<dependency>
7886
<groupId>org.slf4j</groupId>

src/main/java/net/ladenthin/llama/LlamaModel.java

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -181,6 +181,30 @@ public java.util.List<ChatResponse> chatBatch(java.util.List<ChatRequest> reques
181181
return out;
182182
}
183183

184+
/**
185+
* Reactive-streams variant of {@link #generate(InferenceParameters)}. Returns a
186+
* {@link org.reactivestreams.Publisher} of {@link LlamaOutput} tokens. Each subscriber
187+
* triggers a fresh streaming inference on a dedicated background thread; backpressure
188+
* is honoured via the Reactive Streams {@code request(n)} protocol. Use
189+
* {@link org.reactivestreams.Subscription#cancel()} to stop the inference early.
190+
*
191+
* @param parameters the inference configuration
192+
* @return a single-subscriber {@link org.reactivestreams.Publisher} of tokens
193+
*/
194+
public LlamaPublisher streamPublisher(InferenceParameters parameters) {
195+
return new LlamaPublisher(this, parameters, false);
196+
}
197+
198+
/**
199+
* Reactive-streams variant of {@link #generateChat(InferenceParameters)}.
200+
*
201+
* @param parameters the inference parameters including messages
202+
* @return a single-subscriber {@link org.reactivestreams.Publisher} of tokens
203+
*/
204+
public LlamaPublisher streamChatPublisher(InferenceParameters parameters) {
205+
return new LlamaPublisher(this, parameters, true);
206+
}
207+
184208
/**
185209
* Asynchronous variant of {@link #complete(InferenceParameters)}. Runs the inference on
186210
* the common {@link java.util.concurrent.ForkJoinPool} so it does not block the calling
Lines changed: 175 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,175 @@
1+
// SPDX-FileCopyrightText: 2026 Bernard Ladenthin <bernard.ladenthin@gmail.com>
2+
//
3+
// SPDX-License-Identifier: MIT
4+
5+
package net.ladenthin.llama;
6+
7+
import org.reactivestreams.Publisher;
8+
import org.reactivestreams.Subscriber;
9+
import org.reactivestreams.Subscription;
10+
11+
import java.util.concurrent.atomic.AtomicBoolean;
12+
import java.util.concurrent.atomic.AtomicLong;
13+
14+
/**
15+
* Reactive Streams {@link Publisher} that emits {@link LlamaOutput} tokens from a
16+
* llama.cpp streaming completion. Bridges to Reactor / RxJava / Kotlin coroutines via
17+
* the standard {@code reactive-streams} interface.
18+
* <p>
19+
* Each {@link #subscribe(Subscriber)} starts a fresh inference task on a dedicated
20+
* background thread and honours {@code Subscription.request(n)} for backpressure:
21+
* the emitter thread only calls {@code iterator.next()} while there is outstanding
22+
* demand. When the iterator's stop token arrives the publisher calls
23+
* {@code onComplete}; on cancellation it closes the iterator and stops emitting.
24+
* </p>
25+
* <p>
26+
* Construct via {@link LlamaModel#streamPublisher(InferenceParameters)} or
27+
* {@link LlamaModel#streamChatPublisher(InferenceParameters)}. The publisher is
28+
* single-subscriber: a second {@link #subscribe(Subscriber)} call signals
29+
* {@code onError(IllegalStateException)}.
30+
* </p>
31+
*/
32+
public final class LlamaPublisher implements Publisher<LlamaOutput> {
33+
34+
private final LlamaModel model;
35+
private final InferenceParameters parameters;
36+
private final boolean chat;
37+
private final AtomicBoolean subscribed = new AtomicBoolean(false);
38+
39+
LlamaPublisher(LlamaModel model, InferenceParameters parameters, boolean chat) {
40+
this.model = model;
41+
this.parameters = parameters;
42+
this.chat = chat;
43+
}
44+
45+
@Override
46+
public void subscribe(Subscriber<? super LlamaOutput> subscriber) {
47+
if (subscriber == null) {
48+
throw new NullPointerException("subscriber");
49+
}
50+
if (!subscribed.compareAndSet(false, true)) {
51+
EmptySubscription.signalError(subscriber,
52+
new IllegalStateException("LlamaPublisher is single-subscriber; already subscribed"));
53+
return;
54+
}
55+
LlamaIterable iterable = chat ? model.generateChat(parameters) : model.generate(parameters);
56+
LlamaSubscription sub = new LlamaSubscription(iterable, subscriber);
57+
subscriber.onSubscribe(sub);
58+
sub.start();
59+
}
60+
61+
/** Subscription that honours backpressure and pumps tokens on a dedicated thread. */
62+
private static final class LlamaSubscription implements Subscription {
63+
private final LlamaIterable iterable;
64+
private final Subscriber<? super LlamaOutput> subscriber;
65+
private final AtomicLong demand = new AtomicLong(0);
66+
private final AtomicBoolean cancelled = new AtomicBoolean(false);
67+
private final AtomicBoolean started = new AtomicBoolean(false);
68+
private final Object monitor = new Object();
69+
private Thread worker;
70+
71+
LlamaSubscription(LlamaIterable iterable, Subscriber<? super LlamaOutput> subscriber) {
72+
this.iterable = iterable;
73+
this.subscriber = subscriber;
74+
}
75+
76+
void start() {
77+
if (!started.compareAndSet(false, true)) return;
78+
worker = new Thread(this::pump, "LlamaPublisher-emitter");
79+
worker.setDaemon(true);
80+
worker.start();
81+
}
82+
83+
@Override
84+
public void request(long n) {
85+
if (n <= 0) {
86+
cancel();
87+
subscriber.onError(new IllegalArgumentException(
88+
"reactive-streams §3.9: request must be > 0, got " + n));
89+
return;
90+
}
91+
// Saturating add
92+
for (;;) {
93+
long cur = demand.get();
94+
long next = cur + n;
95+
if (next < 0) next = Long.MAX_VALUE;
96+
if (demand.compareAndSet(cur, next)) break;
97+
}
98+
synchronized (monitor) {
99+
monitor.notifyAll();
100+
}
101+
}
102+
103+
@Override
104+
public void cancel() {
105+
if (cancelled.compareAndSet(false, true)) {
106+
try {
107+
iterable.close();
108+
} catch (Throwable ignored) {
109+
// best-effort
110+
}
111+
synchronized (monitor) {
112+
monitor.notifyAll();
113+
}
114+
}
115+
}
116+
117+
private void pump() {
118+
LlamaIterator iterator = iterable.iterator();
119+
try {
120+
while (!cancelled.get() && iterator.hasNext()) {
121+
// Wait for demand.
122+
while (demand.get() == 0 && !cancelled.get()) {
123+
synchronized (monitor) {
124+
if (demand.get() == 0 && !cancelled.get()) {
125+
try {
126+
monitor.wait();
127+
} catch (InterruptedException e) {
128+
Thread.currentThread().interrupt();
129+
cancel();
130+
return;
131+
}
132+
}
133+
}
134+
}
135+
if (cancelled.get()) return;
136+
LlamaOutput next = iterator.next();
137+
demand.decrementAndGet();
138+
subscriber.onNext(next);
139+
if (next.stop) {
140+
subscriber.onComplete();
141+
return;
142+
}
143+
}
144+
if (!cancelled.get()) {
145+
subscriber.onComplete();
146+
}
147+
} catch (Throwable t) {
148+
if (!cancelled.get()) {
149+
try {
150+
subscriber.onError(t);
151+
} catch (Throwable ignored) {
152+
// subscriber threw from onError; nothing more we can do
153+
}
154+
}
155+
} finally {
156+
try {
157+
iterable.close();
158+
} catch (Throwable ignored) {
159+
// best-effort
160+
}
161+
}
162+
}
163+
}
164+
165+
/** No-op subscription used to signal onError on rejected subscriptions. */
166+
private static final class EmptySubscription implements Subscription {
167+
@Override public void request(long n) { }
168+
@Override public void cancel() { }
169+
170+
static void signalError(Subscriber<?> subscriber, Throwable error) {
171+
subscriber.onSubscribe(new EmptySubscription());
172+
subscriber.onError(error);
173+
}
174+
}
175+
}
Lines changed: 153 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,153 @@
1+
// SPDX-FileCopyrightText: 2026 Bernard Ladenthin <bernard.ladenthin@gmail.com>
2+
//
3+
// SPDX-License-Identifier: MIT
4+
5+
package net.ladenthin.llama;
6+
7+
import org.junit.Assume;
8+
import org.junit.Test;
9+
import org.reactivestreams.Subscriber;
10+
import org.reactivestreams.Subscription;
11+
12+
import java.util.concurrent.CountDownLatch;
13+
import java.util.concurrent.TimeUnit;
14+
import java.util.concurrent.atomic.AtomicInteger;
15+
import java.util.concurrent.atomic.AtomicReference;
16+
17+
import static org.junit.Assert.assertEquals;
18+
import static org.junit.Assert.assertNotNull;
19+
import static org.junit.Assert.assertTrue;
20+
21+
@ClaudeGenerated(
22+
purpose = "Verify LlamaPublisher honours Reactive Streams contracts: backpressure via request(n), "
23+
+ "stops on cancel, signals onError for invalid demand, and rejects a second subscriber."
24+
)
25+
public class LlamaPublisherTest {
26+
27+
/**
28+
* Model-gated: subscribe, request a small batch with backpressure, observe tokens, cancel early.
29+
*/
30+
@Test
31+
public void backpressureAndCancel() throws Exception {
32+
Assume.assumeTrue("Model file not found", new java.io.File(TestConstants.MODEL_PATH).exists());
33+
int gpuLayers = Integer.getInteger(TestConstants.PROP_TEST_NGL, TestConstants.DEFAULT_TEST_NGL);
34+
35+
try (LlamaModel model = new LlamaModel(new ModelParameters()
36+
.setCtxSize(128)
37+
.setModel(TestConstants.MODEL_PATH)
38+
.setGpuLayers(gpuLayers)
39+
.setFit(false))) {
40+
41+
LlamaPublisher pub = model.streamPublisher(
42+
new InferenceParameters("def hello():").setNPredict(20).setSeed(1));
43+
44+
CountDownLatch done = new CountDownLatch(1);
45+
AtomicReference<Subscription> subRef = new AtomicReference<>();
46+
AtomicInteger received = new AtomicInteger();
47+
48+
pub.subscribe(new Subscriber<LlamaOutput>() {
49+
@Override public void onSubscribe(Subscription s) {
50+
subRef.set(s);
51+
s.request(2); // initial demand
52+
}
53+
@Override public void onNext(LlamaOutput o) {
54+
int n = received.incrementAndGet();
55+
if (n == 2) {
56+
// Verify backpressure: with demand=0 we should pause until next request.
57+
// Request one more to trigger another emission.
58+
subRef.get().request(1);
59+
} else if (n == 3) {
60+
// Cancel after the third token; subsequent onNext must not occur.
61+
subRef.get().cancel();
62+
done.countDown();
63+
}
64+
}
65+
@Override public void onError(Throwable t) { done.countDown(); }
66+
@Override public void onComplete() { done.countDown(); }
67+
});
68+
69+
assertTrue("subscriber did not terminate in 30s", done.await(30, TimeUnit.SECONDS));
70+
// After cancel we may receive 3-4 in-flight tokens; should not be far above the
71+
// demand actually requested (3 here).
72+
int got = received.get();
73+
assertTrue("expected ~3 tokens, got " + got, got >= 3 && got <= 6);
74+
}
75+
}
76+
77+
@Test
78+
public void singleSubscriberContract() throws Exception {
79+
Assume.assumeTrue("Model file not found", new java.io.File(TestConstants.MODEL_PATH).exists());
80+
int gpuLayers = Integer.getInteger(TestConstants.PROP_TEST_NGL, TestConstants.DEFAULT_TEST_NGL);
81+
82+
try (LlamaModel model = new LlamaModel(new ModelParameters()
83+
.setCtxSize(128)
84+
.setModel(TestConstants.MODEL_PATH)
85+
.setGpuLayers(gpuLayers)
86+
.setFit(false))) {
87+
88+
LlamaPublisher pub = model.streamPublisher(
89+
new InferenceParameters("def f():").setNPredict(2).setSeed(1));
90+
91+
CountDownLatch first = new CountDownLatch(1);
92+
pub.subscribe(new Subscriber<LlamaOutput>() {
93+
@Override public void onSubscribe(Subscription s) { s.request(Long.MAX_VALUE); }
94+
@Override public void onNext(LlamaOutput o) { }
95+
@Override public void onError(Throwable t) { first.countDown(); }
96+
@Override public void onComplete() { first.countDown(); }
97+
});
98+
assertTrue(first.await(30, TimeUnit.SECONDS));
99+
100+
// Second subscribe must signal onError.
101+
AtomicReference<Throwable> err = new AtomicReference<>();
102+
CountDownLatch second = new CountDownLatch(1);
103+
pub.subscribe(new Subscriber<LlamaOutput>() {
104+
@Override public void onSubscribe(Subscription s) { }
105+
@Override public void onNext(LlamaOutput o) { }
106+
@Override public void onError(Throwable t) { err.set(t); second.countDown(); }
107+
@Override public void onComplete() { second.countDown(); }
108+
});
109+
assertTrue(second.await(5, TimeUnit.SECONDS));
110+
assertNotNull("expected onError on second subscribe", err.get());
111+
assertTrue(err.get() instanceof IllegalStateException);
112+
}
113+
}
114+
115+
@Test
116+
public void invalidRequestSignalsError() throws Exception {
117+
Assume.assumeTrue("Model file not found", new java.io.File(TestConstants.MODEL_PATH).exists());
118+
int gpuLayers = Integer.getInteger(TestConstants.PROP_TEST_NGL, TestConstants.DEFAULT_TEST_NGL);
119+
120+
try (LlamaModel model = new LlamaModel(new ModelParameters()
121+
.setCtxSize(128)
122+
.setModel(TestConstants.MODEL_PATH)
123+
.setGpuLayers(gpuLayers)
124+
.setFit(false))) {
125+
126+
LlamaPublisher pub = model.streamPublisher(
127+
new InferenceParameters("def f():").setNPredict(5).setSeed(1));
128+
129+
AtomicReference<Throwable> err = new AtomicReference<>();
130+
CountDownLatch done = new CountDownLatch(1);
131+
pub.subscribe(new Subscriber<LlamaOutput>() {
132+
@Override public void onSubscribe(Subscription s) { s.request(0); }
133+
@Override public void onNext(LlamaOutput o) { }
134+
@Override public void onError(Throwable t) { err.set(t); done.countDown(); }
135+
@Override public void onComplete() { done.countDown(); }
136+
});
137+
assertTrue(done.await(10, TimeUnit.SECONDS));
138+
assertNotNull("expected onError for request(0)", err.get());
139+
assertTrue(err.get() instanceof IllegalArgumentException);
140+
}
141+
}
142+
143+
@Test
144+
public void nullSubscriberThrows() {
145+
// Construct a publisher without a model — subscribe(null) must NPE before any model use.
146+
try {
147+
new LlamaPublisher(null, null, false).subscribe(null);
148+
org.junit.Assert.fail("expected NPE");
149+
} catch (NullPointerException expected) {
150+
assertEquals("subscriber", expected.getMessage());
151+
}
152+
}
153+
}

0 commit comments

Comments
 (0)