Skip to content

Commit cf635fd

Browse files
vaijuraoclaude
andcommitted
feat(server): stream POST /v1/completions token-by-token (no new native code)
The native streaming raw-completion path already exists (requestCompletion / receiveCompletionJson, exposed as LlamaModel.generate(InferenceParameters) -> LlamaIterable), so streaming /v1/completions is pure server wiring — no JNI/C++ change: - OpenAiRequestMapper.toCompletionParameters maps an OpenAI completion request (prompt + sampling) to InferenceParameters; the shared sampling/cache/output fields are factored into applyCommonFields (reused by the chat mapper, whose tests confirm no behaviour change). - OpenAiBackend.streamCompletions(request, sink) (default throws UnsupportedOperationException) + LlamaModelBackend impl drives generate() and emits one OpenAI text_completion chunk per token, mapping StopReason -> finish_reason (length / stop); a sink IOException cancels the native task via LlamaIterable.close(). - OpenAiSseFormatter.completionChunk builds the text_completion chunk; OpenAiCompatServer.handleCompletions branches on stream:true to a new streamCompletions SSE handler (mirrors streamChat: heartbeats, [DONE], graceful client-disconnect). Verified: new streaming HTTP test + 16 mapper + 36 server + 40 adjacent server tests green; Spotless + Javadoc clean. TODO.md updated: corrects the stale "new native method needed" note, marks /v1/completions done, and adds a grounded future-modality (audio/image OUTPUT) design note — llama.cpp generates text only, so that surface stays a documented extension point rather than speculative dead code. Remaining consumers (same pattern, follow-ups): token-streaming Ollama /api/generate and Continue's native POST /completion. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
1 parent 90f95d0 commit cf635fd

7 files changed

Lines changed: 201 additions & 22 deletions

File tree

TODO.md

Lines changed: 19 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -153,13 +153,25 @@ primary goal: agentic tool-calling with Qwen):
153153

154154
**Open follow-ups (deferred):**
155155

156-
- **Streaming raw-completion path (the shared blocker).** A new native streaming method
157-
(`requestCompletionStream` alongside the existing chat one) is needed before these can be done
158-
token-incrementally: (a) **streaming `/v1/completions`**, (b) **token-streaming `/api/generate`**
159-
(today it computes the full text then emits one NDJSON content line), and (c) **Continue's native
160-
`llama.cpp` provider** which streams `POST /completion` in the native (non-OAI) shape. Until then these
161-
either run non-streaming or emit a single content chunk. JNI + C++ work; the agentic-chat goal does
162-
not need it.
156+
- **Streaming raw-completion path — IN PROGRESS (no new native method needed).** The earlier premise was
157+
wrong: a streaming raw-completion JNI path **already exists** (`requestCompletion`/`receiveCompletionJson`,
158+
exposed as `LlamaModel.generate(InferenceParameters) → LlamaIterable`), so this is **Java-only server
159+
wiring**, not JNI/C++. Progress: **(a) streaming `POST /v1/completions` — DONE** (`OpenAiRequestMapper`
160+
`toCompletionParameters` + `OpenAiBackend.streamCompletions` driving `generate()` + an
161+
`OpenAiSseFormatter.completionChunk` `text_completion` chunk + the `streamCompletions` SSE handler;
162+
HTTP test green). **Remaining:** (b) **token-streaming Ollama `/api/generate`** (translate the
163+
`text_completion` chunks to NDJSON, mirroring the chat→Ollama translator) and (c) **Continue's native
164+
`POST /completion`** route in the llama.cpp-native streaming shape (`{"content":…,"stop":…}` per chunk).
165+
- **Future *output* modalities (audio / image) — design note, not yet actionable.** llama.cpp's server
166+
produces **text** (plus embeddings/rerank); it does **not** generate images or audio output, so there is
167+
no engine behind a TTS/image-gen response today and building that API surface now would be dead code.
168+
When/if it becomes real, the integration points are already isolated: a new `OpenAiBackend.stream*`
169+
primitive + an `OpenAiSseFormatter.*Chunk` formatter per modality, wired into a per-route handler — the
170+
exact shape the text `streamCompletions` path now establishes. Two concrete future hooks: (1) llama.cpp's
171+
**OuteTTS** audio path (if it lands in the embedded server) → an `/v1/audio/speech`-style route emitting
172+
audio chunks; (2) routing image/audio generation to an **external** model behind the same server (the
173+
binding would proxy, not generate). Keep `LlamaOutput`/chunk formatters modality-neutral so neither
174+
requires reworking the streaming core.
163175
- **Incremental tool-call streaming on the alternative surfaces.** Ollama/Anthropic/Responses emit each
164176
tool call *whole* at end-of-stream (reconstructed by `ToolCallDeltaAccumulator`) rather than streaming
165177
argument fragments. Fine for clients that apply tool calls after generation; revisit if a client needs

src/main/java/net/ladenthin/llama/server/LlamaModelBackend.java

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,12 @@
66

77
import com.fasterxml.jackson.databind.JsonNode;
88
import java.io.IOException;
9+
import java.util.UUID;
10+
import net.ladenthin.llama.LlamaIterable;
911
import net.ladenthin.llama.LlamaModel;
1012
import net.ladenthin.llama.parameters.InferenceParameters;
13+
import net.ladenthin.llama.value.LlamaOutput;
14+
import net.ladenthin.llama.value.StopReason;
1115

1216
/**
1317
* Production {@link OpenAiBackend} that runs requests against a loaded {@link LlamaModel}.
@@ -80,6 +84,36 @@ public String completions(JsonNode request) {
8084
return model.handleCompletionsOai(request.toString());
8185
}
8286

87+
@Override
88+
public void streamCompletions(JsonNode request, ChunkSink sink) throws IOException {
89+
InferenceParameters params = mapper.toCompletionParameters(request);
90+
String modelId = request.path("model").asText("llama");
91+
String id = "cmpl-" + UUID.randomUUID().toString().replace("-", "");
92+
long created = System.currentTimeMillis() / 1000L;
93+
// Relays a sink IOException (client disconnect) out of the token loop; try-with-resources then
94+
// cancels the in-flight native task via LlamaIterable.close().
95+
IOException sinkFailure = null;
96+
try (LlamaIterable it = model.generate(params)) {
97+
for (LlamaOutput out : it) {
98+
String finishReason = out.stop ? completionFinishReason(out.stopReason) : null;
99+
try {
100+
sink.accept(OpenAiSseFormatter.completionChunk(id, created, modelId, out.text, finishReason));
101+
} catch (IOException e) {
102+
sinkFailure = e;
103+
break;
104+
}
105+
}
106+
}
107+
if (sinkFailure != null) {
108+
throw sinkFailure;
109+
}
110+
}
111+
112+
/** Map a {@link StopReason} to the OpenAI {@code finish_reason} ("length" on the token cap, else "stop"). */
113+
private static String completionFinishReason(StopReason reason) {
114+
return reason == StopReason.MAX_TOKENS ? "length" : "stop";
115+
}
116+
83117
@Override
84118
public String embeddings(JsonNode request) {
85119
// oaiCompat=true so the response uses the OpenAI {"object":"list","data":[{embedding}]} shape.

src/main/java/net/ladenthin/llama/server/OpenAiBackend.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,20 @@ default String metrics() throws IOException {
6262
*/
6363
String completions(JsonNode request) throws IOException;
6464

65+
/**
66+
* Run a <em>streaming</em> text completion ({@code POST /v1/completions} with {@code stream:true}),
67+
* delivering each OpenAI {@code text_completion} chunk to {@code sink} in order. Implementations must
68+
* not emit the terminating {@code [DONE]} marker; the caller adds it. The default throws
69+
* {@link UnsupportedOperationException}; backends that support streaming completions override it.
70+
*
71+
* @param request the parsed OpenAI {@code /v1/completions} request (must contain {@code "prompt"})
72+
* @param sink receiver for each streamed chunk's JSON
73+
* @throws IOException if a chunk cannot be delivered or generation fails
74+
*/
75+
default void streamCompletions(JsonNode request, ChunkSink sink) throws IOException {
76+
throw new UnsupportedOperationException("streaming /v1/completions is not supported by this backend");
77+
}
78+
6579
/**
6680
* Generate embeddings ({@code POST /v1/embeddings}). Requires the model to have been loaded in
6781
* embedding mode; otherwise the native call fails and the caller surfaces a server error.

src/main/java/net/ladenthin/llama/server/OpenAiCompatServer.java

Lines changed: 38 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,8 @@
4242
* <li>{@code POST /v1/chat/completions} — streaming (Server-Sent Events) and non-streaming chat
4343
* completions, forwarded faithfully (messages/tools verbatim; streamed {@code delta.tool_calls}
4444
* preserved).</li>
45-
* <li>{@code POST /v1/completions} — non-streaming text completion.</li>
45+
* <li>{@code POST /v1/completions} — text completion, streaming (Server-Sent Events, token by token
46+
* via {@code generate(...)}) when {@code stream:true} and non-streaming otherwise.</li>
4647
* <li>{@code POST /v1/embeddings} — embeddings (requires the model to be loaded in embedding
4748
* mode).</li>
4849
* <li>{@code GET /v1/models} — advertises the single configured model.</li>
@@ -302,13 +303,48 @@ private void handleCompletions(HttpExchange exchange) throws IOException {
302303
try {
303304
JsonNode request = requirePostJson(exchange);
304305
if (request != null) {
305-
completeNonStreaming(exchange, request, backend::completions);
306+
if (request.path("stream").asBoolean(false)) {
307+
streamCompletions(exchange, request);
308+
} else {
309+
completeNonStreaming(exchange, request, backend::completions);
310+
}
306311
}
307312
} finally {
308313
exchange.close();
309314
}
310315
}
311316

317+
private void streamCompletions(HttpExchange exchange, JsonNode request) throws IOException {
318+
exchange.getResponseHeaders().set("Content-Type", CONTENT_TYPE_SSE);
319+
exchange.getResponseHeaders().set("Cache-Control", "no-cache");
320+
exchange.sendResponseHeaders(HTTP_OK, 0);
321+
try (ResponseStream out = new ResponseStream(exchange.getResponseBody())) {
322+
ScheduledFuture<?> heartbeat = null;
323+
try {
324+
heartbeat = heartbeatExecutor.scheduleAtFixedRate(
325+
() -> out.writeQuietly(OpenAiSseFormatter.heartbeat()),
326+
config.getHeartbeatMillis(),
327+
config.getHeartbeatMillis(),
328+
TimeUnit.MILLISECONDS);
329+
backend.streamCompletions(request, chunkJson -> out.writeStrict(OpenAiSseFormatter.sseData(chunkJson)));
330+
out.writeStrict(OpenAiSseFormatter.sseDone());
331+
} catch (IllegalArgumentException e) {
332+
out.writeQuietly(
333+
OpenAiSseFormatter.sseData(OpenAiSseFormatter.errorJson(message(e), ERROR_TYPE_REQUEST, null)));
334+
} catch (IOException e) {
335+
LOG.debug("client disconnected during stream", e);
336+
} catch (RuntimeException e) {
337+
LOG.warn("streaming completion failed", e);
338+
out.writeQuietly(
339+
OpenAiSseFormatter.sseData(OpenAiSseFormatter.errorJson(message(e), ERROR_TYPE_SERVER, null)));
340+
} finally {
341+
if (heartbeat != null) {
342+
heartbeat.cancel(false);
343+
}
344+
}
345+
}
346+
}
347+
312348
private void handleEmbeddings(HttpExchange exchange) throws IOException {
313349
try {
314350
JsonNode request = requirePostJson(exchange);

src/main/java/net/ladenthin/llama/server/OpenAiRequestMapper.java

Lines changed: 47 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -48,20 +48,9 @@ InferenceParameters toInferenceParameters(JsonNode request) {
4848
.withMessagesJson(messages.toString())
4949
.withCachePrompt(true);
5050

51-
// Preserve llama.cpp extensions when advanced clients opt into them.
52-
JsonNode cachePrompt = request.path("cache_prompt");
53-
if (cachePrompt.isBoolean()) {
54-
params = params.withCachePrompt(cachePrompt.asBoolean());
55-
}
56-
JsonNode cacheReuse = request.path("n_cache_reuse");
57-
if (cacheReuse.isIntegralNumber()) {
58-
params = params.withCacheReuse(cacheReuse.asInt());
59-
}
60-
JsonNode slotId = request.path("id_slot");
61-
if (slotId.isIntegralNumber()) {
62-
params = params.withSlotId(slotId.asInt());
63-
}
51+
params = applyCommonFields(params, request);
6452

53+
// Tools are chat-only.
6554
JsonNode tools = request.path("tools");
6655
if (tools.isArray() && tools.size() > 0) {
6756
params = params.withToolsJson(tools.toString()).withUseChatTemplate(true);
@@ -75,6 +64,51 @@ InferenceParameters toInferenceParameters(JsonNode request) {
7564
}
7665
}
7766

67+
return params;
68+
}
69+
70+
/**
71+
* Translate an OpenAI {@code /v1/completions} request (a raw {@code prompt} string) into
72+
* {@link InferenceParameters} for the streaming {@code generate} path.
73+
*
74+
* @param request the parsed OpenAI completion request object
75+
* @return inference parameters carrying the prompt and mapped sampling options
76+
* @throws IllegalArgumentException if {@code prompt} is missing or not a string
77+
*/
78+
InferenceParameters toCompletionParameters(JsonNode request) {
79+
JsonNode prompt = request.path("prompt");
80+
if (!prompt.isTextual()) {
81+
throw new IllegalArgumentException("'prompt' must be a string");
82+
}
83+
InferenceParameters params =
84+
InferenceParameters.empty().withPrompt(prompt.asText()).withCachePrompt(true);
85+
return applyCommonFields(params, request);
86+
}
87+
88+
/**
89+
* Apply the sampling / KV-cache / output-shaping fields shared by chat and completion requests
90+
* (temperature, top_p/top_k, seed, penalties, max tokens, stop, stream_options, response_format,
91+
* plus the llama.cpp cache extensions). Tools and messages/prompt are handled by the callers.
92+
*
93+
* @param params the parameters to extend
94+
* @param request the parsed OpenAI request object
95+
* @return a new instance with the recognised fields applied
96+
*/
97+
private InferenceParameters applyCommonFields(InferenceParameters params, JsonNode request) {
98+
// Preserve llama.cpp extensions when advanced clients opt into them.
99+
JsonNode cachePrompt = request.path("cache_prompt");
100+
if (cachePrompt.isBoolean()) {
101+
params = params.withCachePrompt(cachePrompt.asBoolean());
102+
}
103+
JsonNode cacheReuse = request.path("n_cache_reuse");
104+
if (cacheReuse.isIntegralNumber()) {
105+
params = params.withCacheReuse(cacheReuse.asInt());
106+
}
107+
JsonNode slotId = request.path("id_slot");
108+
if (slotId.isIntegralNumber()) {
109+
params = params.withSlotId(slotId.asInt());
110+
}
111+
78112
JsonNode temperature = request.path("temperature");
79113
if (temperature.isNumber()) {
80114
params = params.withTemperature((float) temperature.asDouble());

src/main/java/net/ladenthin/llama/server/OpenAiSseFormatter.java

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -134,6 +134,37 @@ static String modelsJson(String modelId) {
134134
return root.toString();
135135
}
136136

137+
/**
138+
* Build one OpenAI {@code text_completion} streaming chunk for {@code POST /v1/completions}.
139+
*
140+
* @param id the completion id, stable across the whole stream
141+
* @param created the creation timestamp in epoch seconds
142+
* @param model the served model id
143+
* @param text the incremental token text carried by this chunk
144+
* @param finishReason the finish reason on the final chunk, or {@code null} for intermediate chunks
145+
* @return the chunk serialized as JSON
146+
*/
147+
static String completionChunk(String id, long created, String model, String text, @Nullable String finishReason) {
148+
ObjectNode choice = OBJECT_MAPPER.createObjectNode();
149+
choice.put("text", text);
150+
choice.put("index", 0);
151+
choice.putNull("logprobs");
152+
if (finishReason == null) {
153+
choice.putNull("finish_reason");
154+
} else {
155+
choice.put("finish_reason", finishReason);
156+
}
157+
ArrayNode choices = OBJECT_MAPPER.createArrayNode();
158+
choices.add(choice);
159+
ObjectNode root = OBJECT_MAPPER.createObjectNode();
160+
root.put("id", id);
161+
root.put("object", "text_completion");
162+
root.put("created", created);
163+
root.put("model", model);
164+
root.set("choices", choices);
165+
return root.toString();
166+
}
167+
137168
/**
138169
* Build the llama.cpp-native {@code GET /props} body. Autocomplete clients (e.g. llama.vscode) read
139170
* {@code default_generation_settings.n_ctx} from here to size their context window, and newer clients

src/test/java/net/ladenthin/llama/server/OpenAiCompatServerHttpTest.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,18 @@ public void streamingReturnsSseChunksThenDone() throws IOException {
5454
}
5555
}
5656

57+
@Test
58+
public void streamingCompletionsReturnsTextCompletionChunksThenDone() throws IOException {
59+
try (OpenAiCompatServer server = new OpenAiCompatServer(new FakeBackend(), config()).start()) {
60+
String body = "{\"stream\":true,\"prompt\":\"hi\"}";
61+
Response response = post(server.getPort(), "/v1/completions", body, "");
62+
assertThat(response.code, is(200));
63+
assertThat(response.body, containsString("data: "));
64+
assertThat(response.body, containsString("text_completion"));
65+
assertThat(response.body, containsString("data: [DONE]"));
66+
}
67+
}
68+
5769
@Test
5870
public void streamingEmitsHeartbeatsDuringAGap() throws IOException {
5971
OpenAiServerConfig cfg = OpenAiServerConfig.builder()
@@ -457,6 +469,12 @@ public String completions(JsonNode request) {
457469
return "{\"object\":\"text_completion\",\"choices\":[{\"text\":\"hello\"}]}";
458470
}
459471

472+
@Override
473+
public void streamCompletions(JsonNode request, ChunkSink sink) throws IOException {
474+
sink.accept("{\"object\":\"text_completion\",\"choices\":[{\"text\":\"he\",\"finish_reason\":null}]}");
475+
sink.accept("{\"object\":\"text_completion\",\"choices\":[{\"text\":\"llo\",\"finish_reason\":\"stop\"}]}");
476+
}
477+
460478
@Override
461479
public String embeddings(JsonNode request) {
462480
return "{\"object\":\"list\",\"data\":[{\"object\":\"embedding\",\"embedding\":[0.1,0.2]}]}";

0 commit comments

Comments
 (0)