forked from bernardladenthin/java-llama.cpp
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathAnthropicStreamTranslator.java
More file actions
122 lines (112 loc) · 5.11 KB
/
Copy pathAnthropicStreamTranslator.java
File metadata and controls
122 lines (112 loc) · 5.11 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
// SPDX-FileCopyrightText: 2026 Bernard Ladenthin <bernard.ladenthin@gmail.com>
//
// SPDX-License-Identifier: MIT
package net.ladenthin.llama.server;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
import lombok.ToString;
/**
* Stateful translator that turns the OpenAI streaming chat chunks into the Anthropic Messages SSE event
* sequence: {@code message_start} → (a {@code text} content block with {@code content_block_start} +
* {@code content_block_delta}s + {@code content_block_stop}) → one {@code tool_use} block per tool call
* (start + {@code input_json_delta} + stop) → {@code message_delta} (stop reason) → {@code message_stop}.
*
* <p>Text deltas are emitted live; tool calls are reconstructed via {@link ToolCallDeltaAccumulator} and
* emitted as whole blocks at the end (Anthropic expects each tool_use block's input as one
* {@code input_json_delta}). Free of JNI / model dependencies; unit-testable by feeding chunk JSON.
*/
@ToString
final class AnthropicStreamTranslator {
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
private final String id;
private final String model;
private final ToolCallDeltaAccumulator accumulator = new ToolCallDeltaAccumulator();
private boolean textBlockOpen;
private int textBlockIndex = -1;
private int nextIndex;
private String finishReason = "stop";
private int inputTokens;
private int outputTokens;
private int cachedTokens;
AnthropicStreamTranslator(String id, String model) {
this.id = id;
this.model = model;
}
/**
* The opening {@code message_start} event.
*
* @return the framed SSE event
*/
String begin() {
return AnthropicApiSupport.messageStartEvent(id, model);
}
/**
* Translate one OpenAI chunk into the SSE events it produces (text block start/delta), accumulating
* tool-call fragments and capturing the finish reason. Returns an empty string when the chunk yields
* no event (role-only / finish-only / tool-call-only chunks).
*
* @param openAiChunkJson one OpenAI {@code chat.completion.chunk}
* @return zero or more framed SSE events, concatenated
*/
String onChunk(String openAiChunkJson) {
StringBuilder out = new StringBuilder();
try {
JsonNode chunk = OBJECT_MAPPER.readTree(openAiChunkJson);
accumulator.accept(chunk);
JsonNode usage = chunk.path("usage");
if (usage.isObject()) {
int promptTokens = usage.path("prompt_tokens").asInt(0);
cachedTokens = usage.path("prompt_tokens_details")
.path("cached_tokens")
.asInt(0);
inputTokens = Math.max(0, promptTokens - cachedTokens);
outputTokens = usage.path("completion_tokens").asInt(0);
}
JsonNode choice = chunk.path("choices").path(0);
JsonNode content = choice.path("delta").path("content");
if (content.isTextual() && !content.asText().isEmpty()) {
if (!textBlockOpen) {
textBlockIndex = nextIndex++;
out.append(AnthropicApiSupport.textBlockStartEvent(textBlockIndex));
textBlockOpen = true;
}
out.append(AnthropicApiSupport.textDeltaEvent(textBlockIndex, content.asText()));
}
if (choice.path("finish_reason").isTextual()) {
finishReason = choice.path("finish_reason").asText();
}
} catch (IOException e) {
// A malformed chunk produces no events.
}
return out.toString();
}
/**
* The closing events: stop the open text block, emit a {@code tool_use} block per accumulated tool
* call, then {@code message_delta} (mapped stop reason) and {@code message_stop}.
*
* @return the framed SSE events, concatenated
*/
String end() {
StringBuilder out = new StringBuilder();
if (textBlockOpen) {
out.append(AnthropicApiSupport.blockStopEvent(textBlockIndex));
textBlockOpen = false;
}
if (accumulator.hasToolCalls()) {
for (JsonNode toolCall : accumulator.toOpenAiToolCalls()) {
int index = nextIndex++;
String callId = toolCall.path("id").asText("");
String name = toolCall.path("function").path("name").asText("");
String arguments = toolCall.path("function").path("arguments").asText("");
out.append(AnthropicApiSupport.toolUseBlockStartEvent(index, callId, name));
out.append(AnthropicApiSupport.inputJsonDeltaEvent(index, arguments));
out.append(AnthropicApiSupport.blockStopEvent(index));
}
}
out.append(AnthropicApiSupport.messageDeltaEvent(
AnthropicApiSupport.anthropicStopReason(finishReason), inputTokens, outputTokens, cachedTokens));
out.append(AnthropicApiSupport.messageStopEvent());
return out.toString();
}
}