Skip to content

Commit 7a84787

Browse files
committed
Refactor header parser
1 parent 132d676 commit 7a84787

6 files changed

Lines changed: 557 additions & 111 deletions

File tree

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/vespera_inprocess/Cargo.toml

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,15 @@ tokio = { version = "1", features = ["rt"] }
1919

2020
[dev-dependencies]
2121
criterion = { version = "0.8", features = ["html_reports"] }
22+
# The criterion bench runs under mimalloc (set as its `#[global_allocator]`
23+
# in benches/dispatch.rs) to match the SHIPPED JNI cdylib, which enables
24+
# mimalloc by default. Measured 2026-06: the default Windows system heap
25+
# routes per-request `Vec` allocations >= ~1 MiB through a slow
26+
# VirtualAlloc commit/decommit path (e.g. 1 MiB `dispatch_from_bytes`
27+
# materialise = 311 us system-heap vs 30 us mimalloc — a ~10x cliff that is
28+
# a pure harness artifact, never seen by the cdylib). Benching under mimalloc
29+
# keeps the large-body absolute numbers representative of production.
30+
mimalloc = "0.1"
2231
tokio = { version = "1", features = ["rt-multi-thread", "macros"] }
2332
# `FutureExt::catch_unwind` for the `async_spawn_pattern` bench, which
2433
# A/Bs the vespera_jni `dispatchAsync` spawn-mechanism change (inner

crates/vespera_inprocess/benches/dispatch.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,15 @@ use vespera_inprocess::{
4242
register_app,
4343
};
4444

45+
// Bench under mimalloc to match the shipped JNI cdylib (which enables mimalloc
46+
// by default). Without this, the default Windows system heap routes the
47+
// per-request `Vec` allocations these benches stress (input `wire.clone()`,
48+
// response materialisation) through a slow VirtualAlloc commit/decommit path
49+
// for blocks >= ~1 MiB, producing a ~10x large-body "cliff" that no shipped
50+
// build ever pays. See the `mimalloc` dev-dependency note in Cargo.toml.
51+
#[global_allocator]
52+
static GLOBAL: mimalloc::MiMalloc = mimalloc::MiMalloc;
53+
4554
// ── Test fixtures ────────────────────────────────────────────────────
4655

4756
#[derive(Serialize, Deserialize)]

libs/vespera-bridge/src/main/java/com/devfive/vespera/bridge/VesperaBridge.java

Lines changed: 164 additions & 111 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,5 @@
11
package com.devfive.vespera.bridge;
22

3-
import com.fasterxml.jackson.core.JsonFactory;
4-
import com.fasterxml.jackson.core.JsonGenerator;
5-
import com.fasterxml.jackson.core.JsonParser;
6-
import com.fasterxml.jackson.core.JsonToken;
7-
import com.fasterxml.jackson.databind.ObjectMapper;
8-
93
import java.io.ByteArrayOutputStream;
104
import java.io.IOException;
115
import java.io.InputStream;
@@ -17,8 +11,6 @@
1711
import java.nio.file.Files;
1812
import java.nio.file.Path;
1913
import java.nio.file.StandardCopyOption;
20-
import java.util.ArrayList;
21-
import java.util.LinkedHashMap;
2214
import java.util.List;
2315
import java.util.Map;
2416
import java.util.concurrent.CompletableFuture;
@@ -51,18 +43,20 @@
5143
*/
5244
public class VesperaBridge {
5345

54-
private static final ObjectMapper MAPPER = new ObjectMapper();
55-
private static final JsonFactory JSON_FACTORY = MAPPER.getFactory();
46+
/** Lowercase hex digits for the JSON C0 control-character escapes. */
47+
private static final byte[] HEX = {
48+
'0', '1', '2', '3', '4', '5', '6', '7',
49+
'8', '9', 'a', 'b', 'c', 'd', 'e', 'f'
50+
};
5651
private static final int WIRE_VERSION = 1;
5752
/** Shared empty request body — avoids a {@code new byte[0]} per call. */
5853
private static final byte[] EMPTY_BODY = new byte[0];
5954
/**
6055
* Per-thread reusable byte buffer for {@link #fillHeaderJson}.
61-
* Reset (size cleared, capacity preserved) per call; only the
62-
* buffer is pooled — a fresh {@link JsonGenerator} is created per
63-
* call because generators bind to stream state. Virtual-thread
64-
* caveat as {@link #DIRECT_POOL}: each vthread gets its own ~256 B
65-
* buffer in Java 21+ and loses pooling until GC.
56+
* Reset (size cleared, capacity preserved) per call and filled
57+
* byte-direct — no per-call encoder object. Virtual-thread caveat
58+
* as {@link #DIRECT_POOL}: each vthread gets its own ~256 B buffer
59+
* in Java 21+ and loses pooling until GC.
6660
*/
6761
private static final ThreadLocal<ExposedByteArrayOutputStream> HEADER_BUF =
6862
ThreadLocal.withInitial(() -> new ExposedByteArrayOutputStream(256));
@@ -87,6 +81,38 @@ private static final class ExposedByteArrayOutputStream extends ByteArrayOutputS
8781
byte[] backingArray() {
8882
return buf;
8983
}
84+
85+
/**
86+
* Append one byte WITHOUT the inherited {@code synchronized} —
87+
* {@link #HEADER_BUF} is thread-local, so the monitor is pure
88+
* overhead on this single-threaded encode hot path. Grows the
89+
* backing array by doubling, mirroring {@link ByteArrayOutputStream}.
90+
*/
91+
void put(int b) {
92+
if (count == buf.length) {
93+
buf = java.util.Arrays.copyOf(buf, buf.length << 1);
94+
}
95+
buf[count++] = (byte) b;
96+
}
97+
98+
/**
99+
* Append the bytes of an ASCII literal (caller guarantees every
100+
* char is {@code < 0x80}) — used for the fixed JSON structure
101+
* (keys, braces, colons). Non-synchronized, single bulk reserve.
102+
*/
103+
void putAscii(String lit) {
104+
int n = lit.length();
105+
if (count + n > buf.length) {
106+
int cap = buf.length;
107+
while (cap < count + n) {
108+
cap <<= 1;
109+
}
110+
buf = java.util.Arrays.copyOf(buf, cap);
111+
}
112+
for (int i = 0; i < n; i++) {
113+
buf[count++] = (byte) lit.charAt(i);
114+
}
115+
}
90116
}
91117

92118
private static volatile boolean loaded = false;
@@ -980,47 +1006,127 @@ public static byte[] encodeRequest(
9801006
}
9811007

9821008
/**
983-
* Internal: serialise the wire request header JSON via Jackson's
984-
* streaming {@link JsonGenerator} writing directly into the
985-
* per-thread {@link #HEADER_BUF}. Byte-identical to the prior
986-
* {@code createObjectNode() + writeValueAsBytes()} path: same
987-
* field order ({@code v}, {@code method}, {@code path}, optional
988-
* {@code query}/{@code headers}/{@code app}), same omission rules,
989-
* same {@code UTF8JsonGenerator} emitter — the {@code ObjectNode}
990-
* tree and {@code writeValueAsBytes} scratch buffer go away.
991-
* (A 3-pass {@code StringBuilder} encoder was previously measured
992-
* <em>slower</em>, 656 vs 487 ns/op; the generator writes bytes
993-
* directly, so this rewrite keeps that win and drops the tree.)
1009+
* Internal: serialise the wire request header JSON
1010+
* <strong>byte-direct</strong> into the per-thread {@link #HEADER_BUF}
1011+
* — no Jackson generator (and its per-call object + scratch buffer)
1012+
* is allocated. Emits the same shape and field order the prior
1013+
* {@code JsonGenerator} path did ({@code v}, {@code method},
1014+
* {@code path}, optional {@code query}/{@code headers}/{@code app}),
1015+
* with the same omission rules. String values are escaped + UTF-8
1016+
* encoded by {@link #writeJsonString} using exactly the escape set
1017+
* Jackson's {@code UTF8JsonGenerator} produced (the quote, the
1018+
* backslash, and the C0 controls; {@code /} and non-ASCII pass
1019+
* through), so the bytes stay valid JSON the Rust {@code serde_json}
1020+
* side parses identically.
9941021
*/
9951022
private static ExposedByteArrayOutputStream fillHeaderJson(String appName, String method,
9961023
String path, String query, Map<String, String> headers) {
9971024
ExposedByteArrayOutputStream buf = HEADER_BUF.get();
9981025
buf.reset();
999-
try (JsonGenerator gen = JSON_FACTORY.createGenerator(buf)) {
1000-
gen.writeStartObject();
1001-
gen.writeNumberField("v", WIRE_VERSION);
1002-
gen.writeStringField("method", method);
1003-
gen.writeStringField("path", path);
1004-
if (query != null && !query.isEmpty()) {
1005-
gen.writeStringField("query", query);
1006-
}
1007-
if (headers != null && !headers.isEmpty()) {
1008-
gen.writeObjectFieldStart("headers");
1009-
for (Map.Entry<String, String> e : headers.entrySet()) {
1010-
gen.writeStringField(e.getKey(), e.getValue());
1026+
// {"v":<WIRE_VERSION>, ...} — WIRE_VERSION must stay a single decimal digit (0-9): it is emitted below as the one byte '0' + WIRE_VERSION.
1027+
buf.putAscii("{\"v\":");
1028+
buf.put('0' + WIRE_VERSION);
1029+
buf.putAscii(",\"method\":");
1030+
writeJsonString(buf, method);
1031+
buf.putAscii(",\"path\":");
1032+
writeJsonString(buf, path);
1033+
if (query != null && !query.isEmpty()) {
1034+
buf.putAscii(",\"query\":");
1035+
writeJsonString(buf, query);
1036+
}
1037+
if (headers != null && !headers.isEmpty()) {
1038+
buf.putAscii(",\"headers\":{");
1039+
boolean first = true;
1040+
for (Map.Entry<String, String> e : headers.entrySet()) {
1041+
if (!first) {
1042+
buf.put(',');
10111043
}
1012-
gen.writeEndObject();
1044+
first = false;
1045+
writeJsonString(buf, e.getKey());
1046+
buf.put(':');
1047+
writeJsonString(buf, e.getValue());
10131048
}
1014-
if (appName != null && !appName.isBlank()) {
1015-
gen.writeStringField("app", appName.trim());
1016-
}
1017-
gen.writeEndObject();
1018-
} catch (IOException e) {
1019-
throw new IllegalStateException("encodeRequest serialisation failed", e);
1049+
buf.put('}');
10201050
}
1051+
if (appName != null && !appName.isBlank()) {
1052+
buf.putAscii(",\"app\":");
1053+
writeJsonString(buf, appName.trim());
1054+
}
1055+
buf.put('}');
10211056
return buf;
10221057
}
10231058

1059+
/**
1060+
* Append {@code s} as a quoted JSON string straight into {@code out}
1061+
* as UTF-8, escaping only the JSON-mandatory characters — the quote,
1062+
* the backslash, and the C0 controls (short {@code \b \t \n \f \r}
1063+
* forms, four-hex escapes otherwise) — exactly the set the prior
1064+
* Jackson {@code UTF8JsonGenerator} emitted (it does not escape
1065+
* {@code /} or non-ASCII). Single pass, no per-string {@code byte[]}:
1066+
* printable ASCII is written verbatim, the rest UTF-8 encoded inline
1067+
* (surrogate pairs become 4-byte sequences; an unpaired surrogate falls through to the 3-byte branch, which is not well-formed UTF-8).
1068+
*/
1069+
private static void writeJsonString(ExposedByteArrayOutputStream out, String s) {
1070+
out.put('"');
1071+
int n = s.length();
1072+
for (int i = 0; i < n; i++) {
1073+
char c = s.charAt(i);
1074+
if (c >= 0x20 && c < 0x80) {
1075+
if (c == '"' || c == '\\') {
1076+
out.put('\\');
1077+
}
1078+
out.put(c);
1079+
} else if (c < 0x20) {
1080+
switch (c) {
1081+
case '\b' -> {
1082+
out.put('\\');
1083+
out.put('b');
1084+
}
1085+
case '\t' -> {
1086+
out.put('\\');
1087+
out.put('t');
1088+
}
1089+
case '\n' -> {
1090+
out.put('\\');
1091+
out.put('n');
1092+
}
1093+
case '\f' -> {
1094+
out.put('\\');
1095+
out.put('f');
1096+
}
1097+
case '\r' -> {
1098+
out.put('\\');
1099+
out.put('r');
1100+
}
1101+
default -> {
1102+
out.put('\\');
1103+
out.put('u');
1104+
out.put('0');
1105+
out.put('0');
1106+
out.put(HEX[(c >> 4) & 0xF]);
1107+
out.put(HEX[c & 0xF]);
1108+
}
1109+
}
1110+
} else if (c < 0x800) {
1111+
out.put(0xC0 | (c >> 6));
1112+
out.put(0x80 | (c & 0x3F));
1113+
} else if (Character.isHighSurrogate(c)
1114+
&& i + 1 < n
1115+
&& Character.isLowSurrogate(s.charAt(i + 1))) {
1116+
int cp = Character.toCodePoint(c, s.charAt(++i));
1117+
out.put(0xF0 | (cp >> 18));
1118+
out.put(0x80 | ((cp >> 12) & 0x3F));
1119+
out.put(0x80 | ((cp >> 6) & 0x3F));
1120+
out.put(0x80 | (cp & 0x3F));
1121+
} else {
1122+
out.put(0xE0 | (c >> 12));
1123+
out.put(0x80 | ((c >> 6) & 0x3F));
1124+
out.put(0x80 | (c & 0x3F));
1125+
}
1126+
}
1127+
out.put('"');
1128+
}
1129+
10241130
/**
10251131
* Decode a wire-format response.
10261132
*
@@ -1039,74 +1145,21 @@ public static DecodedResponse decodeResponse(byte[] wire) {
10391145
"wire header_len " + headerLen
10401146
+ " overflows response (" + wire.length + " bytes)");
10411147
}
1042-
// Streaming decode via JsonParser (no JsonNode tree); defaults match
1043-
// the readTree path, unknown fields (incl. "v") are skipChildren'd.
1044-
int status = 500;
1045-
Map<String, Object> headers = null;
1046-
// Pre-size to the actual occupancy: the wire metadata object
1047-
// carries only a handful of keys (typically just "version"), so a
1048-
// capacity-4 table (Node[4]) is allocated instead of the default
1049-
// capacity-16 (Node[16]) on the first put — a deterministic
1050-
// per-response heap saving with no behavioural change.
1051-
Map<String, String> metadata = new LinkedHashMap<>(4);
1052-
List<Map<String, Object>> validationErrors = null;
1053-
try (JsonParser p = JSON_FACTORY.createParser(wire, 4, headerLen)) {
1054-
if (p.nextToken() == JsonToken.START_OBJECT) {
1055-
while (p.nextToken() == JsonToken.FIELD_NAME) {
1056-
String name = p.currentName();
1057-
JsonToken t = p.nextToken();
1058-
switch (name) {
1059-
case "status" -> status = p.getValueAsInt(500);
1060-
case "headers" -> {
1061-
if (t != JsonToken.START_OBJECT) { p.skipChildren(); break; }
1062-
while (p.nextToken() == JsonToken.FIELD_NAME) {
1063-
String k = p.currentName();
1064-
// Pre-size for a typical response header count
1065-
// (content-type, content-length, a few more):
1066-
// capacity-8 table holds up to 6 entries before
1067-
// resizing, vs the default capacity-16 — a
1068-
// deterministic per-response heap saving.
1069-
if (headers == null) headers = new LinkedHashMap<>(8);
1070-
if (p.nextToken() == JsonToken.START_ARRAY) {
1071-
List<String> list = new ArrayList<>();
1072-
while (p.nextToken() != JsonToken.END_ARRAY) list.add(p.getValueAsString());
1073-
headers.put(k, list);
1074-
} else {
1075-
headers.put(k, p.getValueAsString());
1076-
}
1077-
}
1078-
}
1079-
case "metadata" -> {
1080-
if (t != JsonToken.START_OBJECT) { p.skipChildren(); break; }
1081-
while (p.nextToken() == JsonToken.FIELD_NAME) {
1082-
String k = p.currentName();
1083-
p.nextToken();
1084-
metadata.put(k, p.getValueAsString());
1085-
}
1086-
}
1087-
case "validation_errors" -> {
1088-
if (t != JsonToken.START_ARRAY) { p.skipChildren(); break; }
1089-
validationErrors = new ArrayList<>();
1090-
while (p.nextToken() == JsonToken.START_OBJECT) {
1091-
Map<String, Object> entry = new LinkedHashMap<>();
1092-
while (p.nextToken() == JsonToken.FIELD_NAME) {
1093-
String k = p.currentName();
1094-
p.nextToken();
1095-
entry.put(k, p.getValueAsString());
1096-
}
1097-
validationErrors.add(entry);
1098-
}
1099-
}
1100-
default -> p.skipChildren();
1101-
}
1102-
}
1103-
}
1104-
} catch (IOException e) {
1105-
throw new IllegalArgumentException("wire header JSON parse failed", e);
1106-
}
1148+
// Manual decode via the allocation-lean WireHeaderReader tokenizer
1149+
// (the same parser the DIRECT / streaming header callbacks use)
1150+
// instead of a Jackson JsonParser — drops the per-response parser +
1151+
// IOContext allocation. Output is shape-identical: status (default
1152+
// 500), headers (String | List<String>), metadata (pre-sized),
1153+
// validation_errors, and unknown fields (incl. "v") skipped.
1154+
WireHeaderReader.Decoded d =
1155+
WireHeaderReader.decode(ByteBuffer.wrap(wire), 4, headerLen);
11071156
ByteBuffer body = ByteBuffer.wrap(wire, 4 + headerLen, wire.length - 4 - headerLen);
11081157
return new DecodedResponse(
1109-
status, headers == null ? Map.of() : headers, metadata, body, validationErrors);
1158+
d.status,
1159+
d.headers == null ? Map.of() : d.headers,
1160+
d.metadata,
1161+
body,
1162+
d.validationErrors);
11101163
}
11111164

11121165
private static void loadBundled(String libraryName) {

0 commit comments

Comments
 (0)