Skip to content

Commit a91bcc6

Browse files
committed
chore
1 parent ba606af commit a91bcc6

1 file changed

Lines changed: 39 additions & 7 deletions

File tree

datafusion/datasource-json/benches/json_boundary.rs

Lines changed: 39 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -41,25 +41,31 @@ use std::sync::Arc;
4141
use std::sync::atomic::{AtomicU64, Ordering};
4242
use tokio::runtime::{Builder, Runtime};
4343

44+
// Add CPU cost per requested KB to make read amplification visible in timings.
45+
const CPU_COST_PER_KB_ROUNDS: u32 = 64;
46+
const BYTES_PER_KB: u64 = 1024;
47+
4448
#[derive(Debug)]
4549
struct CountingObjectStore {
4650
inner: Arc<dyn ObjectStore>,
4751
requested_bytes: AtomicU64,
48-
requested_calls: AtomicU64,
52+
cpu_cost_per_kb_rounds: u32,
4953
}
5054

5155
impl CountingObjectStore {
52-
fn new(inner: Arc<dyn ObjectStore>) -> Self {
56+
fn new_with_cpu_cost(
57+
inner: Arc<dyn ObjectStore>,
58+
cpu_cost_per_kb_rounds: u32,
59+
) -> Self {
5360
Self {
5461
inner,
5562
requested_bytes: AtomicU64::new(0),
56-
requested_calls: AtomicU64::new(0),
63+
cpu_cost_per_kb_rounds,
5764
}
5865
}
5966

6067
fn reset(&self) {
6168
self.requested_bytes.store(0, Ordering::Relaxed);
62-
self.requested_calls.store(0, Ordering::Relaxed);
6369
}
6470

6571
fn requested_bytes(&self) -> u64 {
@@ -97,15 +103,21 @@ impl ObjectStore for CountingObjectStore {
97103
location: &Path,
98104
options: GetOptions,
99105
) -> object_store::Result<GetResult> {
106+
let should_burn_cpu = self.cpu_cost_per_kb_rounds > 0;
107+
let mut requested_len = 0u64;
100108
if let Some(range) = options.range.as_ref() {
101109
let requested = match range {
102110
GetRange::Bounded(r) => r.end.saturating_sub(r.start),
103111
GetRange::Offset(_) | GetRange::Suffix(_) => 0,
104112
};
113+
requested_len = requested;
105114
self.requested_bytes.fetch_add(requested, Ordering::Relaxed);
106115
}
107-
self.requested_calls.fetch_add(1, Ordering::Relaxed);
108-
self.inner.get_opts(location, options).await
116+
let result = self.inner.get_opts(location, options).await;
117+
if should_burn_cpu {
118+
burn_cpu_kb(requested_len, self.cpu_cost_per_kb_rounds);
119+
}
120+
result
109121
}
110122

111123
async fn delete(&self, location: &Path) -> object_store::Result<()> {
@@ -158,6 +170,21 @@ fn build_fixed_json_lines(line_len: usize, lines: usize) -> Bytes {
158170
Bytes::from(data)
159171
}
160172

173+
fn burn_cpu_kb(bytes: u64, rounds: u32) {
174+
if bytes == 0 || rounds == 0 {
175+
return;
176+
}
177+
let kb = (bytes + BYTES_PER_KB - 1) / BYTES_PER_KB;
178+
let mut checksum = 0u64;
179+
let mut remaining = kb.saturating_mul(rounds as u64);
180+
while remaining > 0 {
181+
checksum = checksum.wrapping_add(remaining);
182+
checksum = checksum.rotate_left(5) ^ 0x9e3779b97f4a7c15;
183+
remaining -= 1;
184+
}
185+
std::hint::black_box(checksum);
186+
}
187+
161188
struct Fixture {
162189
store: Arc<CountingObjectStore>,
163190
task_ctx: Arc<TaskContext>,
@@ -166,7 +193,10 @@ struct Fixture {
166193

167194
fn build_fixture(rt: &Runtime) -> Fixture {
168195
let inner: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
169-
let store = Arc::new(CountingObjectStore::new(Arc::clone(&inner)));
196+
let store = Arc::new(CountingObjectStore::new_with_cpu_cost(
197+
Arc::clone(&inner),
198+
CPU_COST_PER_KB_ROUNDS,
199+
));
170200
let store_dyn: Arc<dyn ObjectStore> = store.clone();
171201
let path = Path::from("bench.json");
172202

@@ -183,6 +213,7 @@ fn build_fixture(rt: &Runtime) -> Fixture {
183213
let task_ctx = Arc::new(TaskContext::default());
184214
let runtime_env = task_ctx.runtime_env();
185215
let object_store_url = ObjectStoreUrl::parse("test://bucket").unwrap();
216+
// Register a CPU-costed store to approximate non-streaming remote reads.
186217
runtime_env.register_object_store(object_store_url.as_ref(), Arc::clone(&store_dyn));
187218
let schema = Arc::new(Schema::new(vec![Field::new(
188219
"value",
@@ -253,6 +284,7 @@ fn bench_json_boundary(c: &mut Criterion) {
253284
let exec_bytes = measure_datasource_exec_bytes(&rt, &fixture);
254285

255286
let mut exec_group = c.benchmark_group("json_boundary_datasource_exec");
287+
// The read_bytes tag is the primary signal; time reflects simulated CPU cost.
256288
exec_group.bench_function(
257289
BenchmarkId::new("execute", format!("read_bytes={exec_bytes}")),
258290
|b| {

0 commit comments

Comments
 (0)