Skip to content

Commit 61335e7

Browse files
v0.15.0 apiproxy: true streaming passthrough, /_version endpoint, configurable --delta-min-bytes
- Streaming (SSE): forward_to_upstream now uses bytes_stream() instead of .bytes().await — SSE chunks are piped in real time rather than buffered. streaming_passthrough counter added to /_stats. - /_version: new lightweight GET endpoint returning {name, version} JSON; useful for health checks and CI smoke tests. - --delta-min-bytes: new CLI flag (default 1024) that controls the minimum block size eligible for OMCD/Axis-5 delta storage. Previously hardcoded. Both try_delta_store() and register_prefix() respect the setting. - 3 new tests: streaming_flag_detected, high_delta_min_bytes_skips_delta, low_delta_min_bytes_triggers_delta (total: 15/15 green).
1 parent 9afb6bf commit 61335e7

1 file changed

Lines changed: 146 additions & 21 deletions

File tree

omnimcode-apiproxy/src/main.rs

Lines changed: 146 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,8 @@
1919
//! and persisting the cache across turns)
2020
//!
2121
//! Hard limits in this MVP:
22-
//! - No streaming (`stream: true` requests pass through untouched)
22+
//! - Response-side rewriting skipped for streaming sessions (the LLM can
23+
//! still expand markers in a streamed response via the tool call path)
2324
//! - No image / tool_use_block / citation rewriting
2425
//! - No request batching
2526
//! - Auth header is forwarded as-is; we never read/log it
@@ -74,6 +75,12 @@ struct Args {
7475
/// preview alone is enough or it needs to expand.
7576
#[arg(long, default_value_t = 200)]
7677
preview_bytes: usize,
78+
79+
/// Minimum content size (bytes) for a block to be eligible for delta
80+
/// (OMCD/Axis-5) storage. Blocks shorter than this are always stored
81+
/// plain — no diff attempted. Lower = more delta attempts, more disk I/O.
82+
#[arg(long, default_value_t = 1024)]
83+
delta_min_bytes: usize,
7784
}
7885

7986
#[derive(Default, Debug, Clone)]
@@ -90,6 +97,9 @@ struct RewriteStats {
9097
cache_control_inserted: u64,
9198
conversation_count: u64,
9299
delta_stores_attempted: u64,
100+
/// Streaming requests pass through rewritten (request side) but
101+
/// the response is piped directly rather than buffered.
102+
streaming_passthrough: u64,
93103
}
94104

95105
/// Per-conversation state the proxy remembers across turns. Key is a stable
@@ -115,6 +125,7 @@ struct AppState {
115125
upstream: String,
116126
rewrite_threshold: usize,
117127
preview_bytes: usize,
128+
delta_min_bytes: usize,
118129
http: reqwest::Client,
119130
store: Arc<MemoryStore>,
120131
stats: Arc<std::sync::Mutex<RewriteStats>>,
@@ -145,9 +156,9 @@ async fn main() -> Result<()> {
145156

146157
let args = Args::parse();
147158
info!(
148-
"omnimcode-apiproxy v{} starting — bind={} upstream={} threshold={}B preview={}B",
159+
"omnimcode-apiproxy v{} starting — bind={} upstream={} threshold={}B preview={}B delta_min={}B",
149160
env!("CARGO_PKG_VERSION"),
150-
args.bind, args.upstream, args.rewrite_threshold, args.preview_bytes,
161+
args.bind, args.upstream, args.rewrite_threshold, args.preview_bytes, args.delta_min_bytes,
151162
);
152163
info!(
153164
"this proxy sees the full LLM conversation. localhost-only bind unless you change --bind."
@@ -157,6 +168,7 @@ async fn main() -> Result<()> {
157168
upstream: args.upstream.clone(),
158169
rewrite_threshold: args.rewrite_threshold,
159170
preview_bytes: args.preview_bytes,
171+
delta_min_bytes: args.delta_min_bytes,
160172
http: reqwest::Client::builder()
161173
.timeout(std::time::Duration::from_secs(300))
162174
.build()?,
@@ -171,6 +183,7 @@ async fn main() -> Result<()> {
171183
let app = Router::new()
172184
.route("/v1/messages", post(handle_messages))
173185
.route("/_stats", axum::routing::get(stats_endpoint))
186+
.route("/_version", axum::routing::get(version_endpoint))
174187
.fallback(any(passthrough))
175188
.with_state(state);
176189

@@ -240,10 +253,12 @@ async fn handle_messages(State(state): State<AppState>, req: Request) -> Respons
240253
let _saved_unused = body_bytes.len() as i64 - rewritten.len() as i64;
241254

242255
if is_streaming {
243-
// SSE response: just pass through. The LLM can still emit the expand
244-
// tool_use in the stream; the client will surface it. We accept this
245-
// sharp edge in exchange for getting request-side compression on
246-
// streaming sessions (the common case for Claude Code).
256+
// Streaming response: pipe SSE chunks in real time. The request body
257+
// was still fully rewritten (compression happens), and the response
258+
// is forwarded chunk-by-chunk so the client sees tokens immediately.
259+
// The auto-expand loop is skipped for streaming; the LLM can still
260+
// call omc_proxy_expand_ref and the client surfaces it normally.
261+
state.stats.lock().unwrap().streaming_passthrough += 1;
247262
forward_to_upstream(&state, &parts.headers, rewritten).await
248263
} else {
249264
handle_with_expand_loop(&state, &parts.headers, rewritten).await
@@ -417,6 +432,11 @@ async fn passthrough(State(state): State<AppState>, req: Request) -> Response {
417432

418433
/// Used by the streaming-passthrough path in handle_messages and by the
419434
/// catch-all passthrough route. Bytes-in, bytes-out, no rewriting.
435+
/// Forward the rewritten request to upstream and stream the response back
436+
/// to the client as chunks arrive — works for both:
437+
/// • JSON (non-streaming) responses: the body is small, single chunk
438+
/// • SSE (`text/event-stream`) responses: chunks are forwarded in real-time
439+
/// instead of buffering the entire stream before returning
420440
async fn forward_to_upstream(
421441
state: &AppState, headers: &HeaderMap, body: Bytes,
422442
) -> Response {
@@ -425,19 +445,25 @@ async fn forward_to_upstream(
425445
for (k, v) in headers.iter() {
426446
if k != "host" && k != "content-length" { req = req.header(k, v); }
427447
}
428-
match req.send().await {
429-
Ok(r) => {
430-
let status = r.status();
431-
let h = r.headers().clone();
432-
match r.bytes().await {
433-
Ok(b) => rebuild_response(status, &h, b),
434-
Err(e) => error_response(StatusCode::BAD_GATEWAY,
435-
&format!("read upstream: {}", e)),
436-
}
437-
}
438-
Err(e) => error_response(StatusCode::BAD_GATEWAY,
448+
let upstream = match req.send().await {
449+
Ok(r) => r,
450+
Err(e) => return error_response(StatusCode::BAD_GATEWAY,
439451
&format!("upstream: {}", e)),
452+
};
453+
let status = upstream.status();
454+
let resp_headers = upstream.headers().clone();
455+
// Pipe upstream bytes through to the client as they arrive.
456+
// Hop-by-hop headers must not be forwarded; content-length is unknown
457+
// for streamed bodies and is omitted (axum uses chunked encoding).
458+
let stream = upstream.bytes_stream();
459+
let mut resp = Response::builder().status(status);
460+
for (k, v) in resp_headers.iter() {
461+
if k == "content-length" || k == "transfer-encoding" || k == "connection" { continue; }
462+
resp = resp.header(k, v);
440463
}
464+
resp.body(axum::body::Body::from_stream(stream))
465+
.unwrap_or_else(|e| error_response(StatusCode::INTERNAL_SERVER_ERROR,
466+
&format!("response build: {}", e)))
441467
}
442468

443469
fn error_response(code: StatusCode, msg: &str) -> Response {
@@ -495,13 +521,33 @@ async fn stats_endpoint(State(state): State<AppState>) -> Response {
495521
},
496522
"cache_control_inserted_count": s.cache_control_inserted,
497523
"conversations_seen": s.conversation_count,
498-
"delta_stores_attempted": s.delta_stores_attempted
524+
"delta_stores_attempted": s.delta_stores_attempted,
525+
"streaming_passthrough": s.streaming_passthrough,
499526
})).unwrap();
500527
(StatusCode::OK,
501528
[(axum::http::header::CONTENT_TYPE, HeaderValue::from_static("application/json"))],
502529
json).into_response()
503530
}
504531

532+
/// GET `/_version` — lightweight health-check endpoint.
533+
///
534+
/// Returns a JSON object with the proxy binary version and name. Useful for
535+
/// systemd ExecStartPost health probes, monitoring scripts, and CI smoke tests.
536+
///
537+
/// ```text
538+
/// curl http://localhost:8088/_version
539+
/// {"name":"omnimcode-apiproxy","version":"1.0.0"}
540+
/// ```
541+
async fn version_endpoint() -> Response {
542+
let json = serde_json::json!({
543+
"name": "omnimcode-apiproxy",
544+
"version": env!("CARGO_PKG_VERSION"),
545+
});
546+
(StatusCode::OK,
547+
[(axum::http::header::CONTENT_TYPE, HeaderValue::from_static("application/json"))],
548+
json.to_string()).into_response()
549+
}
550+
505551
/// Walk the request body and rewrite every eligible large block.
506552
///
507553
/// What gets rewritten (each independently):
@@ -921,7 +967,7 @@ fn make_marker_with_dedup(
921967
.or_else(|| state.store.store(PROXY_CACHE_NAMESPACE, text).ok())
922968
.ok_or_else(|| anyhow::anyhow!("cache write failed"))?;
923969
// Index this body's prefix so the NEXT near-edit can find it as base.
924-
if text.len() >= 1024 { register_prefix(text, hash, state); }
970+
if text.len() >= state.delta_min_bytes { register_prefix(text, hash, state); }
925971

926972
// v0.14.7-L: if we've already emitted a full marker for this hash this
927973
// request, the subsequent ones can be the bare-minimum form.
@@ -976,7 +1022,7 @@ fn register_prefix(text: &str, hash: i64, state: &AppState) {
9761022
/// The hash returned is still the hash of the FULL text (so the marker / recall
9771023
/// path is unchanged for the LLM).
9781024
fn try_delta_store(text: &str, state: &AppState) -> Option<i64> {
979-
if text.len() < 1024 { return None; }
1025+
if text.len() < state.delta_min_bytes { return None; }
9801026
let prefix = &text.as_bytes()[..text.len().min(256)];
9811027
let prefix_hash = omnimcode_core::tokenizer::fnv1a_64(prefix) as u64;
9821028
let base_hash = {
@@ -1048,6 +1094,7 @@ mod tests {
10481094
upstream: "http://127.0.0.1:0".into(),
10491095
rewrite_threshold: threshold,
10501096
preview_bytes: 80,
1097+
delta_min_bytes: 1024,
10511098
http: reqwest::Client::new(),
10521099
store: Arc::new(MemoryStore::from_env()),
10531100
stats: Arc::new(std::sync::Mutex::new(RewriteStats::default())),
@@ -1566,4 +1613,82 @@ mod tests {
15661613
assert_eq!(got, expected);
15671614
}
15681615
}
1616+
1617+
// ── Feature: streaming detection ──────────────────────────────────────────
1618+
1619+
/// `is_streaming_request` must return true only when the top-level
1620+
/// `stream` field is the boolean `true`.
1621+
#[test]
1622+
fn streaming_flag_detected() {
1623+
let to_bytes = |v: &serde_json::Value| serde_json::to_vec(v).unwrap();
1624+
1625+
let streaming = json!({"model":"claude-3","stream":true, "messages":[]});
1626+
let not_streaming = json!({"model":"claude-3","stream":false,"messages":[]});
1627+
let no_field = json!({"model":"claude-3","messages":[]});
1628+
let null_field = json!({"model":"claude-3","stream":null,"messages":[]});
1629+
1630+
assert!( is_streaming_request(&to_bytes(&streaming)), "stream:true should be detected");
1631+
assert!(!is_streaming_request(&to_bytes(&not_streaming)), "stream:false must not be detected");
1632+
assert!(!is_streaming_request(&to_bytes(&no_field)), "missing field must not be detected");
1633+
assert!(!is_streaming_request(&to_bytes(&null_field)), "null field must not be detected");
1634+
}
1635+
1636+
// ── Feature: configurable delta_min_bytes ─────────────────────────────────
1637+
1638+
/// With a very high `delta_min_bytes`, near-edits no longer attempt
1639+
/// delta storage — delta_stores_attempted stays zero.
1640+
#[test]
1641+
fn high_delta_min_bytes_skips_delta() {
1642+
let mut state = test_state(200);
1643+
state.delta_min_bytes = 999_999; // nothing will ever be this long in tests
1644+
1645+
let base = "a".repeat(500);
1646+
let edit = format!("{}modified", &base[..400]);
1647+
1648+
// Store base, then attempt a near-edit — both should be plain stores.
1649+
let body1 = json!({"model":"m","messages":[
1650+
{"role":"user","content": base.clone()}
1651+
]});
1652+
let body2 = json!({"model":"m","messages":[
1653+
{"role":"assistant","content": base.clone()},
1654+
{"role":"user","content": edit.clone()}
1655+
]});
1656+
let _ = rewrite_request_body(
1657+
&serde_json::to_vec(&body1).unwrap(), &state);
1658+
let _ = rewrite_request_body(
1659+
&serde_json::to_vec(&body2).unwrap(), &state);
1660+
1661+
assert_eq!(state.stats.lock().unwrap().delta_stores_attempted, 0,
1662+
"delta_stores_attempted must stay 0 when delta_min_bytes is very large");
1663+
}
1664+
1665+
/// With a low `delta_min_bytes`, even short near-edits route through the
1666+
/// delta store (delta_stores_attempted increments).
1667+
#[test]
1668+
fn low_delta_min_bytes_triggers_delta() {
1669+
let mut state = test_state(64);
1670+
state.delta_min_bytes = 64; // very low — any block ≥64 bytes attempts delta
1671+
1672+
let base = "x".repeat(200);
1673+
1674+
// First turn: assistant reply with base content — assistant messages are
1675+
// always eligible for rewriting, so the prefix gets registered.
1676+
let body1 = json!({"model":"m","messages":[
1677+
{"role":"user","content":"hi"},
1678+
{"role":"assistant","content":[{"type":"text","text": base.clone()}]}
1679+
]});
1680+
let _ = rewrite_request_body(&serde_json::to_vec(&body1).unwrap(), &state);
1681+
1682+
// Second turn: same assistant content appears again as history.
1683+
// The prefix_index now has an entry for it, so try_delta_store fires.
1684+
let body2 = json!({"model":"m","messages":[
1685+
{"role":"user","content":"hi"},
1686+
{"role":"assistant","content":[{"type":"text","text": base.clone()}]},
1687+
{"role":"user","content":"follow-up question?"},
1688+
]});
1689+
let _ = rewrite_request_body(&serde_json::to_vec(&body2).unwrap(), &state);
1690+
1691+
assert!(state.stats.lock().unwrap().delta_stores_attempted > 0,
1692+
"expected delta_stores_attempted > 0 with low delta_min_bytes");
1693+
}
15691694
}

0 commit comments

Comments
 (0)