Skip to content

Commit e1ca772

Browse files
committed
fix: add NDJSON batch split utilities for parallel line parsing
1 parent 311af18 commit e1ca772

2 files changed

Lines changed: 168 additions & 0 deletions

File tree

src/worker/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ mod accumulator;
4343
mod batch;
4444
mod config;
4545
pub(crate) mod metrics;
46+
pub mod ndjson;
4647
mod pool;
4748
pub(crate) mod scaler;
4849
mod stats;

src/worker/ndjson.rs

Lines changed: 167 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,167 @@
1+
// Project: hyperi-rustlib
2+
// File: src/worker/ndjson.rs
3+
// Purpose: NDJSON batch splitting and parallel parsing utilities
4+
// Language: Rust
5+
//
6+
// License: FSL-1.1-ALv2
7+
// Copyright: (c) 2026 HYPERI PTY LIMITED
8+
9+
//! NDJSON (newline-delimited JSON) batch processing utilities.
10+
//!
11+
//! Splits NDJSON byte payloads into individual lines and optionally parses
12+
//! them in parallel via [`AdaptiveWorkerPool::process_batch`].
13+
//!
14+
//! Does NOT depend on a specific JSON parser — the parse function is a closure.
15+
//! Use with `sonic-rs`, `serde_json`, or any other parser.
16+
//!
17+
//! ## Example
18+
//!
19+
//! ```rust,ignore
20+
//! use hyperi_rustlib::worker::ndjson;
21+
//!
22+
//! let payload = b"{\"a\":1}\n{\"b\":2}\n{\"c\":3}\n";
23+
//! let lines = ndjson::split_lines(payload);
24+
//! assert_eq!(lines.len(), 3);
25+
//!
26+
//! // Parallel parse (with worker pool)
27+
//! let parsed = pool.process_batch(&lines, |line| {
28+
//! sonic_rs::from_slice::<Value>(line).map_err(|e| e.to_string())
29+
//! });
30+
//! ```
31+
32+
/// Split an NDJSON payload into individual line slices.
33+
///
34+
/// Handles `\n`, `\r\n`, trailing newlines, and blank lines (skipped).
35+
/// Zero-copy — returns slices into the original payload.
36+
#[must_use]
37+
pub fn split_lines(payload: &[u8]) -> Vec<&[u8]> {
38+
let mut lines = Vec::new();
39+
let mut start = 0;
40+
41+
for (i, &byte) in payload.iter().enumerate() {
42+
if byte == b'\n' {
43+
let mut end = i;
44+
// Handle \r\n
45+
if end > start && payload[end - 1] == b'\r' {
46+
end -= 1;
47+
}
48+
if end > start {
49+
lines.push(&payload[start..end]);
50+
}
51+
start = i + 1;
52+
}
53+
}
54+
55+
// Handle last line without trailing newline
56+
if start < payload.len() {
57+
let end = if payload[payload.len() - 1] == b'\r' {
58+
payload.len() - 1
59+
} else {
60+
payload.len()
61+
};
62+
if end > start {
63+
lines.push(&payload[start..end]);
64+
}
65+
}
66+
67+
lines
68+
}
69+
70+
/// Count the number of NDJSON lines in a payload without allocating.
71+
///
72+
/// Useful for pre-sizing buffers before splitting.
73+
#[must_use]
74+
pub fn count_lines(payload: &[u8]) -> usize {
75+
if payload.is_empty() {
76+
return 0;
77+
}
78+
79+
let newlines = payload.iter().filter(|&&b| b == b'\n').count();
80+
// If the payload doesn't end with \n, there's one more line
81+
let trailing = if payload.last() == Some(&b'\n') { 0 } else { 1 };
82+
newlines + trailing
83+
}
84+
85+
#[cfg(test)]
86+
mod tests {
87+
use super::*;
88+
89+
#[test]
90+
fn test_split_simple() {
91+
let payload = b"{\"a\":1}\n{\"b\":2}\n{\"c\":3}\n";
92+
let lines = split_lines(payload);
93+
assert_eq!(lines.len(), 3);
94+
assert_eq!(lines[0], b"{\"a\":1}");
95+
assert_eq!(lines[1], b"{\"b\":2}");
96+
assert_eq!(lines[2], b"{\"c\":3}");
97+
}
98+
99+
#[test]
100+
fn test_split_no_trailing_newline() {
101+
let payload = b"{\"a\":1}\n{\"b\":2}";
102+
let lines = split_lines(payload);
103+
assert_eq!(lines.len(), 2);
104+
assert_eq!(lines[0], b"{\"a\":1}");
105+
assert_eq!(lines[1], b"{\"b\":2}");
106+
}
107+
108+
#[test]
109+
fn test_split_single_line() {
110+
let payload = b"{\"x\":42}";
111+
let lines = split_lines(payload);
112+
assert_eq!(lines.len(), 1);
113+
assert_eq!(lines[0], b"{\"x\":42}");
114+
}
115+
116+
#[test]
117+
fn test_split_empty() {
118+
let lines = split_lines(b"");
119+
assert!(lines.is_empty());
120+
}
121+
122+
#[test]
123+
fn test_split_blank_lines_skipped() {
124+
let payload = b"{\"a\":1}\n\n{\"b\":2}\n\n";
125+
let lines = split_lines(payload);
126+
assert_eq!(lines.len(), 2);
127+
}
128+
129+
#[test]
130+
fn test_split_crlf() {
131+
let payload = b"{\"a\":1}\r\n{\"b\":2}\r\n";
132+
let lines = split_lines(payload);
133+
assert_eq!(lines.len(), 2);
134+
assert_eq!(lines[0], b"{\"a\":1}");
135+
assert_eq!(lines[1], b"{\"b\":2}");
136+
}
137+
138+
#[test]
139+
fn test_split_large_payload() {
140+
let mut payload = Vec::new();
141+
for i in 0..1000 {
142+
payload.extend_from_slice(format!("{{\"id\":{i}}}\n").as_bytes());
143+
}
144+
let lines = split_lines(&payload);
145+
assert_eq!(lines.len(), 1000);
146+
}
147+
148+
#[test]
149+
fn test_count_lines_simple() {
150+
assert_eq!(count_lines(b"{}\n{}\n{}\n"), 3);
151+
}
152+
153+
#[test]
154+
fn test_count_lines_no_trailing() {
155+
assert_eq!(count_lines(b"{}\n{}"), 2);
156+
}
157+
158+
#[test]
159+
fn test_count_lines_empty() {
160+
assert_eq!(count_lines(b""), 0);
161+
}
162+
163+
#[test]
164+
fn test_count_lines_single() {
165+
assert_eq!(count_lines(b"{}"), 1);
166+
}
167+
}

0 commit comments

Comments
 (0)