
Commit b0227db

connortsui20 and claude committed

[claude] add benchmarks website v3 design overview and plan (#7756)

Signed-off-by: Connor Tsui <connor.tsui20@gmail.com>
Signed-off-by: Claude <noreply@anthropic.com>
Co-authored-by: Claude <noreply@anthropic.com>
1 parent 606c52c commit b0227db

30 files changed: 5254 additions & 4669 deletions

This file: 357 additions & 0 deletions
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright the Vortex contributors

//! Per-fact-table row accumulators + their `RecordBatch` builders.
//!
//! Each `*Accum` collects classified records during the streaming JSONL
//! pass and then materialises one Arrow `RecordBatch` per fact table at
//! flush time. Three of the four use parallel column vectors with a
//! `seen` map keyed by `measurement_id`; `CompressionSizeAccum` is a
//! `HashMap<i64, CompressionSize>` because it has two collision semantics
//! (replace from `data.json.gz`, sum from `file-sizes-*.json.gz`).
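//!
//! A minimal sketch of the intended use (hypothetical driver code; only
//! the accumulators and builders below are defined in this module):
//!
//! ```ignore
//! let mut q = QueryAccum::default();
//! for (mid, rec) in classified_query_records {
//!     q.push(mid, rec, &mut summary); // streaming JSONL pass
//! }
//! let batch = build_query_batch(q)?; // flush
//! ```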

use std::sync::Arc;

use anyhow::Result;
use arrow_array::ArrayRef;
use arrow_array::Int32Array;
use arrow_array::Int64Array;
use arrow_array::ListArray;
use arrow_array::RecordBatch;
use arrow_array::StringArray;
use arrow_buffer::OffsetBuffer;
use arrow_schema::DataType;
use arrow_schema::Field;
use arrow_schema::Schema;
use vortex_bench_server::records::CompressionSize;
use vortex_bench_server::records::CompressionTime;
use vortex_bench_server::records::QueryMeasurement;
use vortex_bench_server::records::RandomAccessTime;
use vortex_utils::aliases::hash_map::HashMap;

use super::MigrationSummary;

/// `query_measurements` accumulator. Parallel column vectors plus a
/// `measurement_id`-keyed seen map; first-write wins on collision.
#[derive(Default)]
pub(super) struct QueryAccum {
    pub(super) measurement_id: Vec<i64>,
    pub(super) commit_sha: Vec<String>,
    pub(super) dataset: Vec<String>,
    pub(super) dataset_variant: Vec<Option<String>>,
    pub(super) scale_factor: Vec<Option<String>>,
    pub(super) query_idx: Vec<i32>,
    pub(super) storage: Vec<String>,
    pub(super) engine: Vec<String>,
    pub(super) format: Vec<String>,
    pub(super) value_ns: Vec<i64>,
    pub(super) all_runtimes_ns: Vec<Vec<i64>>,
    pub(super) peak_physical: Vec<Option<i64>>,
    pub(super) peak_virtual: Vec<Option<i64>>,
    pub(super) physical_delta: Vec<Option<i64>>,
    pub(super) virtual_delta: Vec<Option<i64>>,
    pub(super) env_triple: Vec<Option<String>>,
    /// `mid` -> index in the parallel column vecs. Lets us look up the
    /// kept row's `value_ns` on collision so we can flag conflicts.
    pub(super) seen: HashMap<i64, usize>,
}

impl QueryAccum {
    pub(super) fn push(&mut self, mid: i64, r: QueryMeasurement, summary: &mut MigrationSummary) {
        if let Some(&idx) = self.seen.get(&mid) {
            summary.deduped += 1;
            if self.value_ns[idx] != r.value_ns {
                summary.deduped_with_conflict += 1;
            }
            return;
        }
        let idx = self.measurement_id.len();
        self.seen.insert(mid, idx);
        self.measurement_id.push(mid);
        self.commit_sha.push(r.commit_sha);
        self.dataset.push(r.dataset);
        self.dataset_variant.push(r.dataset_variant);
        self.scale_factor.push(r.scale_factor);
        self.query_idx.push(r.query_idx);
        self.storage.push(r.storage);
        self.engine.push(r.engine);
        self.format.push(r.format);
        self.value_ns.push(r.value_ns);
        self.all_runtimes_ns.push(r.all_runtimes_ns);
        self.peak_physical.push(r.peak_physical);
        self.peak_virtual.push(r.peak_virtual);
        self.physical_delta.push(r.physical_delta);
        self.virtual_delta.push(r.virtual_delta);
        self.env_triple.push(r.env_triple);
    }
}
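
// Illustration (hypothetical values, not from the commit): pushing
// `(mid = 7, value_ns = 10)` and then `(mid = 7, value_ns = 12)` keeps the
// first row and drops the second, incrementing `summary.deduped` once and
// `summary.deduped_with_conflict` once, because the kept `value_ns` (10)
// differs from the incoming 12.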

/// `compression_times` accumulator. Same shape as [`QueryAccum`] minus the
/// query-only columns.
#[derive(Default)]
pub(super) struct CompressionTimeAccum {
    pub(super) measurement_id: Vec<i64>,
    pub(super) commit_sha: Vec<String>,
    pub(super) dataset: Vec<String>,
    pub(super) dataset_variant: Vec<Option<String>>,
    pub(super) format: Vec<String>,
    pub(super) op: Vec<String>,
    pub(super) value_ns: Vec<i64>,
    pub(super) all_runtimes_ns: Vec<Vec<i64>>,
    pub(super) env_triple: Vec<Option<String>>,
    pub(super) seen: HashMap<i64, usize>,
}

impl CompressionTimeAccum {
    pub(super) fn push(&mut self, mid: i64, r: CompressionTime, summary: &mut MigrationSummary) {
        if let Some(&idx) = self.seen.get(&mid) {
            summary.deduped += 1;
            if self.value_ns[idx] != r.value_ns {
                summary.deduped_with_conflict += 1;
            }
            return;
        }
        let idx = self.measurement_id.len();
        self.seen.insert(mid, idx);
        self.measurement_id.push(mid);
        self.commit_sha.push(r.commit_sha);
        self.dataset.push(r.dataset);
        self.dataset_variant.push(r.dataset_variant);
        self.format.push(r.format);
        self.op.push(r.op);
        self.value_ns.push(r.value_ns);
        self.all_runtimes_ns.push(r.all_runtimes_ns);
        self.env_triple.push(r.env_triple);
    }
}

/// `random_access_times` accumulator. Smallest of the three parallel-vec
/// accumulators.
#[derive(Default)]
pub(super) struct RandomAccessAccum {
    pub(super) measurement_id: Vec<i64>,
    pub(super) commit_sha: Vec<String>,
    pub(super) dataset: Vec<String>,
    pub(super) format: Vec<String>,
    pub(super) value_ns: Vec<i64>,
    pub(super) all_runtimes_ns: Vec<Vec<i64>>,
    pub(super) env_triple: Vec<Option<String>>,
    pub(super) seen: HashMap<i64, usize>,
}

impl RandomAccessAccum {
    pub(super) fn push(&mut self, mid: i64, r: RandomAccessTime, summary: &mut MigrationSummary) {
        if let Some(&idx) = self.seen.get(&mid) {
            summary.deduped += 1;
            if self.value_ns[idx] != r.value_ns {
                summary.deduped_with_conflict += 1;
            }
            return;
        }
        let idx = self.measurement_id.len();
        self.seen.insert(mid, idx);
        self.measurement_id.push(mid);
        self.commit_sha.push(r.commit_sha);
        self.dataset.push(r.dataset);
        self.format.push(r.format);
        self.value_ns.push(r.value_ns);
        self.all_runtimes_ns.push(r.all_runtimes_ns);
        self.env_triple.push(r.env_triple);
    }
}

/// `compression_sizes` is fed by both `data.json.gz` (replace-on-collision)
/// and `file-sizes-*.json.gz` (sum-on-collision). Stored as a map; converted
/// to a `RecordBatch` at flush time.
#[derive(Default)]
pub(super) struct CompressionSizeAccum {
    pub(super) rows: HashMap<i64, CompressionSize>,
}

impl CompressionSizeAccum {
    /// `data.json.gz` path: latest write wins, mirroring the prior
    /// `ON CONFLICT DO UPDATE SET value_bytes = excluded.value_bytes`.
    /// Bumps `deduped_with_conflict` when an existing row's
    /// `value_bytes` differs from the incoming row's, so silent
    /// value-corruption is observable.
    pub(super) fn push_replace(
        &mut self,
        mid: i64,
        r: CompressionSize,
        summary: &mut MigrationSummary,
    ) {
        if let Some(existing) = self.rows.get(&mid)
            && existing.value_bytes != r.value_bytes
        {
            summary.deduped_with_conflict += 1;
        }
        self.rows.insert(mid, r);
    }

    /// `file-sizes-*.json.gz` path: per-file rows aggregate into one
    /// `(commit, dataset, dataset_variant, format)` row by summing,
    /// mirroring the prior `value_bytes = compression_sizes.value_bytes
    /// + excluded.value_bytes`.
    pub(super) fn push_sum(&mut self, mid: i64, r: CompressionSize) {
        let add = r.value_bytes;
        self.rows
            .entry(mid)
            .and_modify(|x| x.value_bytes += add)
            .or_insert(r);
    }
}
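
// Hedged sketch of the two collision semantics (hypothetical values;
// `mk_size` is an assumed helper that builds a `CompressionSize` with the
// given `value_bytes`):
//
//     let mut acc = CompressionSizeAccum::default();
//     acc.push_sum(7, mk_size(100));
//     acc.push_sum(7, mk_size(25));
//     assert_eq!(acc.rows[&7].value_bytes, 125); // summed
//
//     acc.push_replace(7, mk_size(200), &mut summary);
//     assert_eq!(acc.rows[&7].value_bytes, 200); // replaced, conflict flagged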

pub(super) fn build_query_batch(a: QueryAccum) -> Result<RecordBatch> {
    let schema = Arc::new(Schema::new(vec![
        Field::new("measurement_id", DataType::Int64, false),
        Field::new("commit_sha", DataType::Utf8, false),
        Field::new("dataset", DataType::Utf8, false),
        Field::new("dataset_variant", DataType::Utf8, true),
        Field::new("scale_factor", DataType::Utf8, true),
        Field::new("query_idx", DataType::Int32, false),
        Field::new("storage", DataType::Utf8, false),
        Field::new("engine", DataType::Utf8, false),
        Field::new("format", DataType::Utf8, false),
        Field::new("value_ns", DataType::Int64, false),
        Field::new(
            "all_runtimes_ns",
            DataType::List(Arc::new(Field::new("item", DataType::Int64, false))),
            false,
        ),
        Field::new("peak_physical", DataType::Int64, true),
        Field::new("peak_virtual", DataType::Int64, true),
        Field::new("physical_delta", DataType::Int64, true),
        Field::new("virtual_delta", DataType::Int64, true),
        Field::new("env_triple", DataType::Utf8, true),
    ]));
    let cols: Vec<ArrayRef> = vec![
        Arc::new(Int64Array::from(a.measurement_id)),
        Arc::new(StringArray::from(a.commit_sha)),
        Arc::new(StringArray::from(a.dataset)),
        Arc::new(StringArray::from(a.dataset_variant)),
        Arc::new(StringArray::from(a.scale_factor)),
        Arc::new(Int32Array::from(a.query_idx)),
        Arc::new(StringArray::from(a.storage)),
        Arc::new(StringArray::from(a.engine)),
        Arc::new(StringArray::from(a.format)),
        Arc::new(Int64Array::from(a.value_ns)),
        Arc::new(build_list_int64(a.all_runtimes_ns)),
        Arc::new(Int64Array::from(a.peak_physical)),
        Arc::new(Int64Array::from(a.peak_virtual)),
        Arc::new(Int64Array::from(a.physical_delta)),
        Arc::new(Int64Array::from(a.virtual_delta)),
        Arc::new(StringArray::from(a.env_triple)),
    ];
    Ok(RecordBatch::try_new(schema, cols)?)
}
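
// Note (added commentary): `RecordBatch::try_new` re-validates the columns
// against the schema, so the `cols` order above must match the field order,
// and a null sneaking into a column declared non-nullable surfaces as an
// error here rather than as corrupt output.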

pub(super) fn build_compression_time_batch(a: CompressionTimeAccum) -> Result<RecordBatch> {
    let schema = Arc::new(Schema::new(vec![
        Field::new("measurement_id", DataType::Int64, false),
        Field::new("commit_sha", DataType::Utf8, false),
        Field::new("dataset", DataType::Utf8, false),
        Field::new("dataset_variant", DataType::Utf8, true),
        Field::new("format", DataType::Utf8, false),
        Field::new("op", DataType::Utf8, false),
        Field::new("value_ns", DataType::Int64, false),
        Field::new(
            "all_runtimes_ns",
            DataType::List(Arc::new(Field::new("item", DataType::Int64, false))),
            false,
        ),
        Field::new("env_triple", DataType::Utf8, true),
    ]));
    let cols: Vec<ArrayRef> = vec![
        Arc::new(Int64Array::from(a.measurement_id)),
        Arc::new(StringArray::from(a.commit_sha)),
        Arc::new(StringArray::from(a.dataset)),
        Arc::new(StringArray::from(a.dataset_variant)),
        Arc::new(StringArray::from(a.format)),
        Arc::new(StringArray::from(a.op)),
        Arc::new(Int64Array::from(a.value_ns)),
        Arc::new(build_list_int64(a.all_runtimes_ns)),
        Arc::new(StringArray::from(a.env_triple)),
    ];
    Ok(RecordBatch::try_new(schema, cols)?)
}

pub(super) fn build_random_access_batch(a: RandomAccessAccum) -> Result<RecordBatch> {
    let schema = Arc::new(Schema::new(vec![
        Field::new("measurement_id", DataType::Int64, false),
        Field::new("commit_sha", DataType::Utf8, false),
        Field::new("dataset", DataType::Utf8, false),
        Field::new("format", DataType::Utf8, false),
        Field::new("value_ns", DataType::Int64, false),
        Field::new(
            "all_runtimes_ns",
            DataType::List(Arc::new(Field::new("item", DataType::Int64, false))),
            false,
        ),
        Field::new("env_triple", DataType::Utf8, true),
    ]));
    let cols: Vec<ArrayRef> = vec![
        Arc::new(Int64Array::from(a.measurement_id)),
        Arc::new(StringArray::from(a.commit_sha)),
        Arc::new(StringArray::from(a.dataset)),
        Arc::new(StringArray::from(a.format)),
        Arc::new(Int64Array::from(a.value_ns)),
        Arc::new(build_list_int64(a.all_runtimes_ns)),
        Arc::new(StringArray::from(a.env_triple)),
    ];
    Ok(RecordBatch::try_new(schema, cols)?)
}

pub(super) fn build_compression_size_batch(a: CompressionSizeAccum) -> Result<RecordBatch> {
    let n = a.rows.len();
    let mut measurement_id = Vec::with_capacity(n);
    let mut commit_sha = Vec::with_capacity(n);
    let mut dataset = Vec::with_capacity(n);
    let mut dataset_variant = Vec::with_capacity(n);
    let mut format = Vec::with_capacity(n);
    let mut value_bytes = Vec::with_capacity(n);
    for (mid, cs) in a.rows {
        measurement_id.push(mid);
        commit_sha.push(cs.commit_sha);
        dataset.push(cs.dataset);
        dataset_variant.push(cs.dataset_variant);
        format.push(cs.format);
        value_bytes.push(cs.value_bytes);
    }
    let schema = Arc::new(Schema::new(vec![
        Field::new("measurement_id", DataType::Int64, false),
        Field::new("commit_sha", DataType::Utf8, false),
        Field::new("dataset", DataType::Utf8, false),
        Field::new("dataset_variant", DataType::Utf8, true),
        Field::new("format", DataType::Utf8, false),
        Field::new("value_bytes", DataType::Int64, false),
    ]));
    let cols: Vec<ArrayRef> = vec![
        Arc::new(Int64Array::from(measurement_id)),
        Arc::new(StringArray::from(commit_sha)),
        Arc::new(StringArray::from(dataset)),
        Arc::new(StringArray::from(dataset_variant)),
        Arc::new(StringArray::from(format)),
        Arc::new(Int64Array::from(value_bytes)),
    ];
    Ok(RecordBatch::try_new(schema, cols)?)
}

/// Build a non-nullable `List<Int64>` Arrow array from one inner Vec
/// per row. The outer list is non-null; inner i64 values are non-null.
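///
/// For example, rows `[[1, 2], [], [3]]` flatten to values `[1, 2, 3]`
/// with offsets `[0, 2, 2, 3]`; row `i` spans
/// `values[offsets[i]..offsets[i + 1]]`, so the empty row is the
/// zero-width span between the two 2s.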
fn build_list_int64(values: Vec<Vec<i64>>) -> ListArray {
    let mut offsets: Vec<i32> = Vec::with_capacity(values.len() + 1);
    offsets.push(0);
    let mut flat: Vec<i64> = Vec::new();
    for inner in values {
        flat.extend_from_slice(&inner);
        offsets.push(flat.len() as i32);
    }
    let values_arr = Int64Array::from(flat);
    let field = Arc::new(Field::new("item", DataType::Int64, false));
    ListArray::new(
        field,
        OffsetBuffer::new(offsets.into()),
        Arc::new(values_arr),
        None,
    )
}
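
// A hedged, self-contained check of the offsets construction above
// (hypothetical test module; not part of the original commit).
#[cfg(test)]
mod tests {
    use arrow_array::Array;

    use super::*;

    /// Rows `[[1, 2], [], [3]]` should produce a 3-row list whose middle
    /// row is empty.
    #[test]
    fn build_list_int64_preserves_row_boundaries() {
        let arr = build_list_int64(vec![vec![1, 2], vec![], vec![3]]);
        assert_eq!(arr.len(), 3);
        assert_eq!(arr.value_length(0), 2);
        assert_eq!(arr.value_length(1), 0);
        assert_eq!(arr.value_length(2), 1);
    }
}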
