Skip to content

Commit a5d847e

Browse files
authored
Split RespDeserializer into RespDeserializer + RespFrameParser (#125)
- Improve performance by parsing the RESP response on the network thread to detect the end of the frame and deserializing the RESP response on the client thread, based on the result of the parsing from the network thread
1 parent e4d94d3 commit a5d847e

54 files changed

Lines changed: 2499 additions & 2481 deletions

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

Cargo.toml

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -106,10 +106,14 @@ harness = false
106106
required-features = ["bench"]
107107

108108
[[bench]]
109-
name = "large_array_parsing"
109+
name = "parse_integer"
110110
harness = false
111111
required-features = ["bench"]
112112

113+
[[bench]]
114+
name = "large_array_parsing"
115+
harness = false
116+
required-features = ["bench"]
113117

114118
[[example]]
115119
name = "actix_crud"

benches/generic_api.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -113,10 +113,10 @@ fn bench_rustis_simple_getsetdel_async(b: &mut Bencher) {
113113
let key = "test_key";
114114

115115
client
116-
.send(cmd("SET").arg(key).arg(42.423456), None)
116+
.send::<()>(cmd("SET").arg(key).arg(42.423456), None)
117117
.await?;
118-
let _: f64 = client.send(cmd("GET").arg(key), None).await?.to()?;
119-
client.send(cmd("DEL").arg(key), None).await?;
118+
let _: f64 = client.send(cmd("GET").arg(key), None).await?;
119+
client.send::<u32>(cmd("DEL").arg(key), None).await?;
120120

121121
Ok::<_, Error>(())
122122
})

benches/parse_integer.rs

Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
use criterion::{Criterion, criterion_group, criterion_main};
2+
use memchr::memchr;
3+
use std::hint::black_box;
4+
5+
#[derive(Debug)]
6+
pub enum Error {
7+
EOF,
8+
CannotParseInteger,
9+
}
10+
11+
// --- VERSION 1 : MEMCHR + ATOI ---
12+
fn parse_integer_atoi(buf: &[u8], pos: &mut usize) -> Result<i64, Error> {
13+
let rem = &buf[*pos..];
14+
let i = memchr(b'\r', rem).ok_or(Error::EOF)?;
15+
if i + 1 >= rem.len() || rem[i + 1] != b'\n' {
16+
return Err(Error::EOF);
17+
}
18+
19+
let val = atoi::atoi::<i64>(&rem[..i]).ok_or(Error::CannotParseInteger)?;
20+
21+
*pos += i + 2;
22+
Ok(val)
23+
}
24+
25+
// --- VERSION 2 : MANUAL (*10) ---
26+
fn parse_integer_manual(buf: &[u8], pos: &mut usize) -> Result<i64, Error> {
27+
let mut n = 0i64;
28+
let b_slice = &buf[*pos..];
29+
30+
for (i, &b) in b_slice.iter().enumerate() {
31+
match b {
32+
b'0'..=b'9' => {
33+
n = n.wrapping_mul(10).wrapping_add((b - b'0') as i64);
34+
}
35+
b'\r' => {
36+
if i + 1 < b_slice.len() && b_slice[i + 1] == b'\n' {
37+
*pos += i + 2;
38+
return Ok(n);
39+
}
40+
return Err(Error::CannotParseInteger);
41+
}
42+
_ => return Err(Error::CannotParseInteger),
43+
}
44+
}
45+
Err(Error::EOF)
46+
}
47+
48+
fn criterion_benchmark(c: &mut Criterion) {
49+
// On teste sur un petit entier (cas typique des longueurs RESP)
50+
let data_small = b"12\r\n";
51+
// On teste sur un grand entier
52+
let data_large = b"1234567890\r\n";
53+
54+
let mut group = c.benchmark_group("Integer Parsing");
55+
56+
group.bench_function("atoi_small", |b| {
57+
b.iter(|| {
58+
let mut pos = 0;
59+
let _ = parse_integer_atoi(black_box(data_small), &mut pos);
60+
})
61+
});
62+
63+
group.bench_function("manual_small", |b| {
64+
b.iter(|| {
65+
let mut pos = 0;
66+
let _ = parse_integer_manual(black_box(data_small), &mut pos);
67+
})
68+
});
69+
70+
group.bench_function("atoi_large", |b| {
71+
b.iter(|| {
72+
let mut pos = 0;
73+
let _ = parse_integer_atoi(black_box(data_large), &mut pos);
74+
})
75+
});
76+
77+
group.bench_function("manual_large", |b| {
78+
b.iter(|| {
79+
let mut pos = 0;
80+
let _ = parse_integer_manual(black_box(data_large), &mut pos);
81+
})
82+
});
83+
84+
group.finish();
85+
}
86+
87+
criterion_group!(benches, criterion_benchmark);
88+
criterion_main!(benches);

examples/resp_profiling.rs

Lines changed: 17 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,23 +1,25 @@
1-
use rustis::resp::RespDeserializer;
2-
use serde::Deserialize;
1+
// use rustis::resp::RespDeserializer;
2+
// use serde::Deserialize;
33

44
fn main() -> rustis::Result<()> {
5-
let mut raw_data = Vec::new();
6-
raw_data.extend_from_slice(b"$1024\r\n");
7-
raw_data.extend_from_slice(&vec![b'A'; 1024]);
8-
raw_data.extend_from_slice(b"\r\n");
5+
// let mut raw_data = Vec::new();
6+
// raw_data.extend_from_slice(b"$1024\r\n");
7+
// raw_data.extend_from_slice(&vec![b'A'; 1024]);
8+
// raw_data.extend_from_slice(b"\r\n");
99

10-
println!("desrializer stress-test startup...");
10+
// println!("desrializer stress-test startup...");
1111

12-
for i in 0..500_000 {
13-
let mut resp_deserializer = RespDeserializer::new(&raw_data);
14-
let result = String::deserialize(&mut resp_deserializer);
15-
let _ = std::hint::black_box(result);
12+
// for i in 0..500_000 {
13+
// let (frame, _) = RespFrameParser::new(&raw_data).parse()?;
14+
// let response = RespResponse::new(RespBuf::from(Bytes::copy_from_slice(buf)), frame);
15+
// let mut resp_deserializer = RespDeserializer::new(&raw_data);
16+
// let result = String::deserialize(&mut resp_deserializer);
17+
// let _ = std::hint::black_box(result);
1618

17-
if i % 100_000 == 0 {
18-
println!("Processed {} iterations", i);
19-
}
20-
}
19+
// if i % 100_000 == 0 {
20+
// println!("Processed {} iterations", i);
21+
// }
22+
// }
2123

2224
Ok(())
2325
}

run_tests.sh

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
cargo test --features tokio-rustls,pool,json,client-cache -- --test-threads=1

src/cache.rs

Lines changed: 80 additions & 66 deletions
Original file line numberDiff line numberDiff line change
@@ -8,20 +8,20 @@ use crate::{
88
StringCommands, ZRangeOptions,
99
},
1010
resp::{
11-
BulkString, Command, CommandArgsMut, RespBuf, RespDeserializer, RespSerializer, Response,
12-
Value, cmd,
11+
BulkString, Command, CommandArgsMut, FastPathCommandBuilder, RespDeserializer,
12+
RespResponse, Response,
1313
},
1414
};
15-
use bytes::BytesMut;
15+
use bytes::Bytes;
1616
use dashmap::DashMap;
1717
use futures_util::StreamExt;
18-
use serde::{Deserialize, Serialize, de::DeserializeOwned};
19-
use std::{fmt::Write, sync::Arc, time::Duration};
18+
use serde::{Serialize, de::DeserializeOwned};
19+
use std::{sync::Arc, time::Duration};
2020

2121
/// Re-export the moka cache builder.
2222
pub use moka::future::CacheBuilder;
2323

24-
type SubCache = DashMap<Command, RespBuf>;
24+
type SubCache = DashMap<Bytes, RespResponse>;
2525
type MokaCache = moka::future::Cache<BulkString, Arc<SubCache>>;
2626
type MokaCacheBuilder = moka::future::CacheBuilder<BulkString, Arc<SubCache>, MokaCache>;
2727

@@ -134,65 +134,78 @@ impl Cache {
134134
/// Executes the `MGET` command with client-side caching.
135135
pub async fn mget<R: Response + DeserializeOwned>(&self, keys: impl Serialize) -> Result<R> {
136136
let prepared_command = self.client.mget::<R>(keys);
137-
let mut collection_buf = BytesMut::new();
138-
let _ =
139-
collection_buf.write_fmt(format_args!("*{}\r\n", prepared_command.command.num_args()));
140-
141-
for arg in (0..prepared_command.command.num_args())
142-
.filter_map(|i| prepared_command.command.get_arg(i))
143-
{
144-
let key = BulkString::from(arg.to_vec());
145-
146-
let Some(values) = self.cache.get(&key).await else {
147-
collection_buf.clear();
148-
break;
149-
};
150-
151-
let prepared_command = self.client.get::<R>(arg);
152-
let Some(buf) = values.get(&prepared_command.command) else {
153-
collection_buf.clear();
154-
break;
155-
};
156-
157-
collection_buf.extend(buf.iter());
137+
let mut responses = Vec::with_capacity(prepared_command.command.num_args());
138+
let mut missing_indices = Vec::new();
139+
let mut missing_keys = Vec::new();
140+
141+
// 1. check cache
142+
for (i, arg) in prepared_command.command.args().enumerate() {
143+
let key = BulkString::from(arg.clone());
144+
145+
if let Some(values) = self.cache.get(&key).await
146+
&& let Some(response) = values.get(FastPathCommandBuilder::get(key.clone()).bytes())
147+
{
148+
log::debug!(
149+
"[{}] Cache hit on key `{}`",
150+
self.client.connection_tag(),
151+
key
152+
);
153+
responses.push(response.clone());
154+
} else {
155+
log::debug!(
156+
"[{}] Cache miss on key `{}`",
157+
self.client.connection_tag(),
158+
key
159+
);
160+
responses.push(RespResponse::null());
161+
missing_indices.push(i);
162+
missing_keys.push(key);
163+
}
158164
}
159165

160-
if !collection_buf.is_empty() {
161-
log::debug!("[{}] Cache hit on mget", self.client.connection_tag(),);
166+
// 2. Fetch missing keys from Redis server if any
167+
if !missing_keys.is_empty() {
168+
let missing_prepared_command = self.client.mget::<R>(missing_keys);
169+
let response = self
170+
.client
171+
.internal_send(missing_prepared_command.command, None)
172+
.await?;
173+
let Ok(array_iter) = response.clone().into_array_iter() else {
174+
return Err(Error::Client(ClientError::ExpectedArrayForMGet));
175+
};
162176

163-
let mut deserializer = RespDeserializer::new(&collection_buf);
164-
return R::deserialize(&mut deserializer);
165-
}
177+
for (idx_in_missing, response) in array_iter.enumerate() {
178+
let original_idx = missing_indices[idx_in_missing];
179+
180+
let Some(key) = prepared_command
181+
.command
182+
.get_arg(original_idx)
183+
.map(BulkString::from)
184+
else {
185+
break;
186+
};
187+
188+
// Insert into cache
189+
self.cache
190+
.entry(key.clone())
191+
.or_insert_with(async { Arc::new(DashMap::new()) })
192+
.await
193+
.value()
194+
.insert(
195+
FastPathCommandBuilder::get(key).bytes().clone(),
196+
response.clone(),
197+
);
166198

167-
let buf = self
168-
.client
169-
.send(prepared_command.command.clone(), None)
170-
.await?;
171-
let mut deserializer = RespDeserializer::new(&buf);
172-
let Value::Array(values) = Value::deserialize(&mut deserializer)? else {
173-
return Err(Error::Client(ClientError::ExpectedArrayForMGet));
174-
};
175-
176-
for (value, key) in values.iter().zip(
177-
(0..prepared_command.command.num_args())
178-
.filter_map(|i| prepared_command.command.get_arg(i)),
179-
) {
180-
let mut serializer = RespSerializer::new();
181-
value.serialize(&mut serializer)?;
182-
183-
// Insert into cache
184-
self.cache
185-
.entry(key.to_vec().into())
186-
.or_insert_with(async { Arc::new(DashMap::new()) })
187-
.await
188-
.value()
189-
.insert(
190-
cmd("GET").arg(key).into(),
191-
RespBuf::new(serializer.get_output().into()),
192-
);
199+
responses[original_idx] = response;
200+
}
201+
} else {
202+
log::debug!("[{}] Cache hit on mget", self.client.connection_tag());
193203
}
194204

195-
R::deserialize(&Value::Array(values))
205+
// 3. deserialize
206+
let response = RespResponse::owned_array(responses);
207+
let deserializer = RespDeserializer::new(response.view());
208+
R::deserialize(deserializer)
196209
}
197210

198211
/// Executes the `GETRANGE` command with client-side caching.
@@ -458,15 +471,15 @@ impl Cache {
458471
R: Response + DeserializeOwned,
459472
{
460473
if let Some(values) = self.cache.get(&key).await
461-
&& let Some(buf) = values.get(&command)
474+
&& let Some(response) = values.get(command.bytes())
462475
{
463476
log::debug!(
464477
"[{}] Cache hit on key `{}`",
465478
self.client.connection_tag(),
466479
key
467480
);
468-
let mut deserializer = RespDeserializer::new(&buf);
469-
return R::deserialize(&mut deserializer);
481+
let deserializer = RespDeserializer::new(response.view());
482+
return R::deserialize(deserializer);
470483
}
471484

472485
// Cache miss: fetch from Redis
@@ -476,17 +489,18 @@ impl Cache {
476489
key
477490
);
478491

479-
let buf = self.client.send(command.clone(), None).await?;
480-
let mut deserializer = RespDeserializer::new(&buf);
481-
let deserialized = R::deserialize(&mut deserializer)?;
492+
let command_bytes = command.bytes().clone();
493+
let response = self.client.internal_send(command, None).await?;
494+
let deserializer = RespDeserializer::new(response.view());
495+
let deserialized = R::deserialize(deserializer)?;
482496

483497
// Insert into cache
484498
self.cache
485499
.entry(key)
486500
.or_insert_with(async { Arc::new(DashMap::new()) })
487501
.await
488502
.value()
489-
.insert(command, buf);
503+
.insert(command_bytes, response);
490504

491505
Ok(deserialized)
492506
}

0 commit comments

Comments
 (0)