Skip to content

Commit ad0979e

Browse files
authored
perf/server-latency-buffering : Buffer latencies in memory instead of per-message file I/O (#109)
* perf: Buffer latencies in memory instead of per-message file I/O Replace per-message file writes with in-memory buffering for one-way latency measurements. The server now accumulates latencies in a Vec during the test and writes them all at once when the test completes. This fixes a severe performance bottleneck in blocking mode where high-throughput tests (284K msg/sec) caused one-way latency to appear as 241ms instead of the actual ~29us due to file I/O overhead. Affected modes: - Blocking server mode (all mechanisms) - Async server mode (SHM, PMQ) Cherry-picked from container-to-container-ipc branch (7815318). AI-assisted-by: Claude claude-4.6-opus-high-thinking (Anthropic) Made-with: Cursor * refactor: Extract latency buffering helpers with unit tests - Extract should_buffer_latency() to decide whether a latency value should be buffered (excludes warmup canary messages with id == u64::MAX) - Extract write_latency_buffer() for batch file output of buffered latencies in one-value-per-line decimal format - Both async and blocking server functions now call shared helpers instead of duplicated inline logic - Add 7 unit tests covering canary exclusion, normal message inclusion, disabled buffering, file format, empty buffer, round-trip parsing, and invalid path error handling - Upgrade bytes 1.10.1 -> 1.11.1 to fix RUSTSEC-2026-0007 - Add .cargo/audit.toml to ignore RUSTSEC-2026-0009 (time crate, MSRV 1.70 constraint prevents upgrade) - All tests passing, zero clippy warnings AI-assisted-by: Claude Sonnet 4 (claude-sonnet-4-20250514) Made-with: Cursor * fix: Write latency buffer before propagating transport close errors - Blocking path: capture close_blocking() result, write latency file, then propagate error — prevents lost latency data on close failure - Async path: capture close() result, write latency file, then log warning on close error (was silently discarded with let _ =) - All tests passing AI-assisted-by: Claude claude-4.6-opus-high-thinking Made-with: Cursor
1 parent f7f8eb9 commit ad0979e

1 file changed

Lines changed: 181 additions & 30 deletions

File tree

src/main.rs

Lines changed: 181 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -676,14 +676,13 @@ fn run_server_mode_blocking(args: cli::Args) -> Result<()> {
676676
.context("Failed to write server ready byte to stdout")?;
677677
io::stdout().flush().ok();
678678

679-
// Open latency file for writing if specified
680-
let mut latency_file = if let Some(ref path) = args.internal_latency_file {
681-
Some(
682-
std::fs::File::create(path)
683-
.with_context(|| format!("Failed to create latency file: {}", path))?,
684-
)
679+
// Buffer latencies in memory instead of per-message file I/O
680+
// This avoids the massive overhead of writing to disk for each message
681+
let latency_file_path = args.internal_latency_file.clone();
682+
let mut latency_buffer: Vec<u64> = if latency_file_path.is_some() {
683+
Vec::with_capacity(100_000) // Pre-allocate for performance
685684
} else {
686-
None
685+
Vec::new()
687686
};
688687

689688
// Persistent server loop: receive messages and optionally reply
@@ -695,12 +694,8 @@ fn run_server_mode_blocking(args: cli::Args) -> Result<()> {
695694
let receive_time_ns = get_monotonic_time_ns();
696695
let latency_ns = receive_time_ns.saturating_sub(message.timestamp);
697696

698-
// Write latency to file if enabled (one latency per line in nanoseconds)
699-
// Skip canary messages (ID == u64::MAX) which are used for warmup
700-
if let Some(ref mut file) = latency_file {
701-
if message.id != u64::MAX {
702-
writeln!(file, "{}", latency_ns).ok();
703-
}
697+
if should_buffer_latency(latency_file_path.is_some(), message.id) {
698+
latency_buffer.push(latency_ns);
704699
}
705700

706701
// Check for shutdown message (used by PMQ and other queue-based transports)
@@ -735,7 +730,14 @@ fn run_server_mode_blocking(args: cli::Args) -> Result<()> {
735730
}
736731
}
737732

738-
transport.close_blocking()?;
733+
let close_result = transport.close_blocking();
734+
735+
if let Some(ref path) = latency_file_path {
736+
write_latency_buffer(path, &latency_buffer)?;
737+
}
738+
739+
close_result?;
740+
739741
info!("Server exiting cleanly.");
740742
Ok(())
741743
}
@@ -871,15 +873,13 @@ async fn run_server_mode(args: cli::Args) -> Result<()> {
871873
.context("Failed to write server ready byte to stdout")?;
872874
io::stdout().flush().ok();
873875

874-
// Open latency file for writing if specified
875-
let mut latency_file = if let Some(ref path) = args.internal_latency_file {
876-
Some(
877-
tokio::fs::File::create(path)
878-
.await
879-
.with_context(|| format!("Failed to create latency file: {}", path))?,
880-
)
876+
// Buffer latencies in memory instead of per-message file I/O
877+
// This avoids the massive overhead of writing to disk for each message
878+
let latency_file_path = args.internal_latency_file.clone();
879+
let mut latency_buffer: Vec<u64> = if latency_file_path.is_some() {
880+
Vec::with_capacity(100_000) // Pre-allocate for performance
881881
} else {
882-
None
882+
Vec::new()
883883
};
884884

885885
// Persistent server loop: receive messages and optionally reply to
@@ -894,13 +894,8 @@ async fn run_server_mode(args: cli::Args) -> Result<()> {
894894
let receive_time_ns = get_monotonic_time_ns();
895895
let latency_ns = receive_time_ns.saturating_sub(msg.timestamp);
896896

897-
// Write latency to file if enabled (one latency per line in nanoseconds)
898-
// Skip canary messages (ID == u64::MAX) which are used for warmup
899-
if let Some(ref mut file) = latency_file {
900-
if msg.id != u64::MAX {
901-
use tokio::io::AsyncWriteExt;
902-
let _ = file.write_all(format!("{}\n", latency_ns).as_bytes()).await;
903-
}
897+
if should_buffer_latency(latency_file_path.is_some(), msg.id) {
898+
latency_buffer.push(latency_ns);
904899
}
905900

906901
// Message received
@@ -932,7 +927,16 @@ async fn run_server_mode(args: cli::Args) -> Result<()> {
932927
}
933928
}
934929

935-
let _ = transport.close().await;
930+
let close_result = transport.close().await;
931+
932+
if let Some(ref path) = latency_file_path {
933+
write_latency_buffer(path, &latency_buffer)?;
934+
}
935+
936+
if let Err(e) = close_result {
937+
warn!("Transport close error: {}", e);
938+
}
939+
936940
info!("Server mode finished.");
937941
Ok(())
938942
}
@@ -979,3 +983,150 @@ async fn run_benchmark_for_mechanism(
979983
results_manager.add_results(results).await?;
980984
Ok(())
981985
}
986+
987+
/// Decide whether a one-way latency sample should be recorded.
///
/// A sample is kept only when latency-file output is enabled and
/// the message is a real measurement — warmup canary messages are
/// tagged with `id == u64::MAX` and are always skipped.
fn should_buffer_latency(latency_file_enabled: bool, message_id: u64) -> bool {
    if !latency_file_enabled {
        // No output file configured: nothing to collect.
        return false;
    }
    // Exclude warmup canaries so they never skew the statistics.
    message_id != u64::MAX
}
995+
996+
/// Write a buffer of latency values to a file.
997+
///
998+
/// Each latency is written as a single line containing the
999+
/// nanosecond value in decimal. This format matches what the
1000+
/// client-side benchmark reader expects.
1001+
///
1002+
/// # Errors
1003+
///
1004+
/// Returns an error if the file cannot be created or written.
1005+
fn write_latency_buffer(path: &str, buffer: &[u64]) -> Result<()> {
1006+
debug!(
1007+
"Writing {} buffered latencies to file: {}",
1008+
buffer.len(),
1009+
path,
1010+
);
1011+
let mut file = std::fs::File::create(path)
1012+
.with_context(|| format!("Failed to create latency file: {}", path))?;
1013+
for latency_ns in buffer {
1014+
writeln!(file, "{}", latency_ns).ok();
1015+
}
1016+
debug!("Finished writing latencies to file");
1017+
Ok(())
1018+
}
1019+
1020+
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::{BufRead, BufReader};

    /// Build the absolute path of a scratch file in the OS temp directory.
    fn temp_path(file_name: &str) -> String {
        std::env::temp_dir()
            .join(file_name)
            .to_string_lossy()
            .into_owned()
    }

    /// Warmup canary messages (id == u64::MAX) are probes, not real
    /// measurements, and must never be buffered.
    #[test]
    fn test_should_buffer_latency_excludes_canary() {
        assert!(
            !should_buffer_latency(true, u64::MAX),
            "canary messages must be excluded"
        );
    }

    /// Ordinary message ids are buffered while latency collection is on.
    #[test]
    fn test_should_buffer_latency_includes_normal() {
        for id in [0, 1, 42, u64::MAX - 1] {
            assert!(should_buffer_latency(true, id));
        }
    }

    /// With no latency file configured, nothing is buffered no matter
    /// what the message id is.
    #[test]
    fn test_should_buffer_latency_disabled() {
        for id in [0, 42, u64::MAX] {
            assert!(!should_buffer_latency(false, id));
        }
    }

    /// The on-disk format is one decimal u64 value per line, which is
    /// what the client-side reader parses.
    #[test]
    fn test_write_latency_buffer_format() {
        let path = temp_path("test_latency_buffer_format.txt");

        let latencies: Vec<u64> = vec![100, 200, 999, 0, 42];
        write_latency_buffer(&path, &latencies).unwrap();

        let reader = BufReader::new(std::fs::File::open(&path).unwrap());
        let lines: Vec<String> = reader.lines().map(|l| l.unwrap()).collect();
        assert_eq!(lines, ["100", "200", "999", "0", "42"]);

        let _ = std::fs::remove_file(&path);
    }

    /// An empty buffer yields an empty file.
    #[test]
    fn test_write_latency_buffer_empty() {
        let path = temp_path("test_latency_buffer_empty.txt");

        write_latency_buffer(&path, &[]).unwrap();

        let contents = std::fs::read_to_string(&path).unwrap();
        assert!(
            contents.is_empty(),
            "empty buffer should produce empty file"
        );

        let _ = std::fs::remove_file(&path);
    }

    /// Values survive a write/parse round trip using the same parse
    /// logic the benchmark reader applies.
    #[test]
    fn test_write_latency_buffer_round_trip_parse() {
        let path = temp_path("test_latency_buffer_roundtrip.txt");

        let original: Vec<u64> = vec![1, u64::MAX - 1, 0, 123_456_789];
        write_latency_buffer(&path, &original).unwrap();

        let parsed: Vec<u64> = BufReader::new(std::fs::File::open(&path).unwrap())
            .lines()
            .filter_map(|l| l.ok().and_then(|s| s.trim().parse::<u64>().ok()))
            .collect();

        assert_eq!(parsed, original);

        let _ = std::fs::remove_file(&path);
    }

    /// A path inside a non-existent directory must produce an error,
    /// not a silent no-op.
    #[test]
    fn test_write_latency_buffer_invalid_path() {
        let result = write_latency_buffer("/no/such/directory/latencies.txt", &[1, 2, 3]);
        assert!(result.is_err(), "writing to invalid path should fail");
    }
}

0 commit comments

Comments
 (0)