Skip to content

Commit c9564fa

Browse files
committed
refactor(tpcc): split backends and improve benchmark tooling
1 parent a791ec2 commit c9564fa

File tree

22 files changed

+1317
-566
lines changed

22 files changed

+1317
-566
lines changed

.gitignore

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,4 +25,4 @@ kite_sql_tpcc
2525
copy.csv
2626

2727
tests/data/row_20000.csv
28-
tests/data/distinct_rows.csv
28+
tests/data/distinct_rows.csv

Makefile

Lines changed: 21 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,11 @@ CARGO ?= cargo
33
WASM_PACK ?= wasm-pack
44
SQLLOGIC_PATH ?= tests/slt/**/*.slt
55
PYO3_PYTHON ?= /usr/bin/python3.12
6+
TPCC_MEASURE_TIME ?= 15
7+
TPCC_NUM_WARE ?= 1
8+
TPCC_PPROF_OUTPUT ?= /tmp/tpcc_lmdb.svg
69

7-
.PHONY: test test-python test-wasm test-slt test-all wasm-build check tpcc tpcc-dual cargo-check build wasm-examples native-examples fmt clippy
10+
.PHONY: test test-python test-wasm test-slt test-all wasm-build check tpcc tpcc-kitesql-rocksdb tpcc-kitesql-lmdb tpcc-lmdb-flamegraph tpcc-sqlite-practical tpcc-dual cargo-check build wasm-examples native-examples fmt clippy
811

912
## Run default Rust tests in the current environment (non-WASM).
1013
test:
@@ -48,9 +51,23 @@ clippy:
4851
## Run formatting (check mode) and clippy linting together.
4952
check: fmt clippy
5053

51-
## Execute the TPCC workload example as a standalone command.
52-
tpcc:
53-
$(CARGO) run -p tpcc --release
54+
tpcc: tpcc-kitesql-lmdb
55+
56+
## Execute the TPCC workload on KiteSQL with RocksDB storage.
57+
tpcc-kitesql-rocksdb:
58+
$(CARGO) run -p tpcc --release -- --backend kitesql-rocksdb
59+
60+
## Execute the TPCC workload on KiteSQL with LMDB storage.
61+
tpcc-kitesql-lmdb:
62+
$(CARGO) run -p tpcc --release -- --backend kitesql-lmdb
63+
64+
## Execute TPCC on LMDB and emit a pprof flamegraph SVG.
65+
tpcc-lmdb-flamegraph:
66+
CARGO_PROFILE_RELEASE_DEBUG=true $(CARGO) run -p tpcc --release --features pprof -- --backend kitesql-lmdb --measure-time $(TPCC_MEASURE_TIME) --num-ware $(TPCC_NUM_WARE) --pprof-output $(TPCC_PPROF_OUTPUT)
67+
68+
## Execute the TPCC workload on SQLite with the practical profile.
69+
tpcc-sqlite-practical:
70+
$(CARGO) run -p tpcc --release -- --backend sqlite --sqlite-profile practical --path kite_sql_tpcc.sqlite
5471

5572
## Execute TPCC while mirroring every statement to an in-memory SQLite instance for validation.
5673
tpcc-dual:

README.md

Lines changed: 8 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -179,17 +179,14 @@ Run `make tpcc-dual` to mirror every TPCC statement to an in-memory SQLite datab
179179
- KIOXIA-EXCERIA PLUS G3 SSD
180180
- Tip: TPC-C currently supports only a single thread
181181
182-
All cases have been fully optimized.
183-
```shell
184-
<90th Percentile RT (MaxRT)>
185-
New-Order : 0.002 (0.005)
186-
Payment : 0.001 (0.013)
187-
Order-Status : 0.002 (0.006)
188-
Delivery : 0.010 (0.023)
189-
Stock-Level : 0.002 (0.017)
190-
<TpmC>
191-
27226 Tpmc
192-
```
182+
Recent 720-second local comparison on the machine above:
183+
184+
| Backend | TpmC | New-Order p90 | Payment p90 | Order-Status p90 | Delivery p90 | Stock-Level p90 |
185+
| --- | ---: | ---: | ---: | ---: | ---: | ---: |
186+
| SQLite practical | 35516 | 0.001s | 0.001s | 0.001s | 0.001s | 0.001s |
187+
| KiteSQL LMDB | 29171 | 0.001s | 0.001s | 0.001s | 0.015s | 0.002s |
188+
189+
The detailed raw outputs for both runs are recorded in [tpcc/README.md](tpcc/README.md).
193190
#### 👉[check more](tpcc/README.md)
194191
195192
## Roadmap

src/execution/dql/aggregate/hash_agg.rs

Lines changed: 21 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -88,22 +88,6 @@ impl HashAggExecutor {
8888

8989
while arena.next_tuple(self.input)? {
9090
let tuple = arena.result_tuple();
91-
let mut values = Vec::with_capacity(self.agg_calls.len());
92-
93-
for expr in &self.agg_calls {
94-
if let ScalarExpression::AggCall { args, .. } = expr {
95-
if args.len() > 1 {
96-
return Err(DatabaseError::UnsupportedStmt(
97-
"currently aggregate functions only support a single Column as a parameter"
98-
.to_string(),
99-
));
100-
}
101-
values.push(args[0].eval(Some((tuple, &self.input_schema)))?);
102-
} else {
103-
unreachable!()
104-
}
105-
}
106-
10791
let group_keys = self
10892
.groupby_exprs
10993
.iter()
@@ -114,8 +98,19 @@ impl HashAggExecutor {
11498
Entry::Occupied(entry) => entry.into_mut(),
11599
Entry::Vacant(entry) => entry.insert(create_accumulators(&self.agg_calls)?),
116100
};
117-
for (acc, value) in entry.iter_mut().zip_eq(values.iter()) {
118-
acc.update_value(value)?;
101+
102+
for (acc, expr) in entry.iter_mut().zip_eq(self.agg_calls.iter()) {
103+
let ScalarExpression::AggCall { args, .. } = expr else {
104+
unreachable!()
105+
};
106+
if args.len() > 1 {
107+
return Err(DatabaseError::UnsupportedStmt(
108+
"currently aggregate functions only support a single Column as a parameter"
109+
.to_string(),
110+
));
111+
}
112+
let value = args[0].eval(Some((tuple, &self.input_schema)))?;
113+
acc.update_value(&value)?;
119114
}
120115
}
121116

@@ -128,12 +123,15 @@ impl HashAggExecutor {
128123
};
129124

130125
let output = arena.result_tuple_mut();
126+
131127
output.pk = None;
132-
output.values = accs
133-
.iter()
134-
.map(|acc| acc.evaluate())
135-
.chain(group_keys.into_iter().map(Ok))
136-
.try_collect()?;
128+
output.values.clear();
129+
output.values.reserve(accs.len() + group_keys.len());
130+
131+
for acc in accs.iter() {
132+
output.values.push(acc.evaluate()?);
133+
}
134+
output.values.extend(group_keys);
137135
arena.resume();
138136
Ok(())
139137
}

src/execution/dql/aggregate/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@ fn create_accumulator(expr: &ScalarExpression) -> Result<Box<dyn Accumulator>, D
6363
}
6464
}
6565

66+
#[inline]
6667
pub(crate) fn create_accumulators(
6768
exprs: &[ScalarExpression],
6869
) -> Result<Vec<Box<dyn Accumulator>>, DatabaseError> {

src/execution/dql/aggregate/simple_agg.rs

Lines changed: 17 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,6 @@ use crate::planner::operator::aggregate::AggregateOperator;
2020
use crate::planner::LogicalPlan;
2121
use crate::storage::Transaction;
2222
use crate::types::tuple::SchemaRef;
23-
use crate::types::value::DataValue;
24-
use itertools::Itertools;
25-
2623
pub struct SimpleAggExecutor {
2724
agg_calls: Vec<ScalarExpression>,
2825
input_schema: SchemaRef,
@@ -78,25 +75,29 @@ impl SimpleAggExecutor {
7875

7976
while arena.next_tuple(input)? {
8077
let tuple = arena.result_tuple();
81-
let values: Vec<DataValue> = self
82-
.agg_calls
83-
.iter()
84-
.map(|expr| match expr {
85-
ScalarExpression::AggCall { args, .. } => {
86-
args[0].eval(Some((tuple, &self.input_schema)))
87-
}
88-
_ => unreachable!(),
89-
})
90-
.try_collect()?;
78+
for (acc, expr) in accs.iter_mut().zip(self.agg_calls.iter()) {
79+
let ScalarExpression::AggCall { args, .. } = expr else {
80+
unreachable!()
81+
};
82+
if args.len() > 1 {
83+
return Err(DatabaseError::UnsupportedStmt(
84+
"currently aggregate functions only support a single Column as a parameter"
85+
.to_string(),
86+
));
87+
}
9188

92-
for (acc, value) in accs.iter_mut().zip(values.iter()) {
93-
acc.update_value(value)?;
89+
let value = args[0].eval(Some((tuple, &self.input_schema)))?;
90+
acc.update_value(&value)?;
9491
}
9592
}
9693

9794
let output = arena.result_tuple_mut();
9895
output.pk = None;
99-
output.values = accs.into_iter().map(|acc| acc.evaluate()).try_collect()?;
96+
output.values.clear();
97+
output.values.reserve(accs.len());
98+
for acc in accs {
99+
output.values.push(acc.evaluate()?);
100+
}
100101
arena.resume();
101102
Ok(())
102103
}

tpcc/Cargo.toml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,9 @@ name = "tpcc"
33
version = "0.1.1"
44
edition = "2021"
55

6+
[features]
7+
pprof = ["dep:pprof"]
8+
69
[dependencies]
710
clap = { version = "4", features = ["derive"] }
811
chrono = { version = "0.4" }
@@ -14,3 +17,6 @@ rust_decimal = { version = "1" }
1417
thiserror = { version = "1" }
1518
sqlite = { version = "0.34" }
1619
sqlparser = { version = "0.61" }
20+
21+
[target.'cfg(unix)'.dependencies]
22+
pprof = { version = "0.15", features = ["flamegraph"], optional = true }

0 commit comments

Comments
 (0)