Skip to content

Commit 10b00c9

Browse files
authored
Merge branch 'master' into bfops/regen-dlls
2 parents 5552adb + eddb19b commit 10b00c9

64 files changed

Lines changed: 3286 additions & 3786 deletions

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

Cargo.lock

Lines changed: 2 additions & 15 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -195,7 +195,6 @@ enum-map = "2.6.3"
195195
env_logger = "0.10"
196196
ethnum = { version = "1.5.0", features = ["serde"] }
197197
flate2 = "1.0.24"
198-
flume = { version = "0.11", default-features = false, features = ["async"] }
199198
foldhash = "0.2.0"
200199
fs-err = "2.9.0"
201200
fs_extra = "1.3.0"

crates/client-api/src/lib.rs

Lines changed: 39 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -137,7 +137,7 @@ impl Host {
137137
pub async fn exec_sql(
138138
&self,
139139
auth: AuthCtx,
140-
database: Database,
140+
_database: Database,
141141
confirmed_read: bool,
142142
body: String,
143143
) -> axum::response::Result<Vec<SqlStmtResult<ProductValue>>> {
@@ -146,51 +146,44 @@ impl Host {
146146
.await
147147
.map_err(|_| (StatusCode::NOT_FOUND, "module not found".to_string()))?;
148148

149-
let (tx_offset, durable_offset, json) = self
150-
.host_controller
151-
.using_database(database, self.replica_id, move |db| async move {
152-
tracing::info!(sql = body);
153-
let mut header = vec![];
154-
let sql_start = std::time::Instant::now();
155-
let sql_span = tracing::trace_span!("execute_sql", total_duration = tracing::field::Empty,);
156-
let _guard = sql_span.enter();
157-
158-
let result = sql::execute::run(
159-
db.clone(),
160-
body,
161-
auth,
162-
Some(module_host.info.subscriptions.clone()),
163-
Some(module_host),
164-
&mut header,
165-
)
166-
.await
167-
.map_err(|e| {
168-
log::warn!("{e}");
169-
(StatusCode::BAD_REQUEST, e.to_string())
170-
})?;
171-
172-
let total_duration = sql_start.elapsed();
173-
drop(_guard);
174-
sql_span.record("total_duration", tracing::field::debug(total_duration));
175-
176-
let schema = header
177-
.into_iter()
178-
.map(|(col_name, col_type)| ProductTypeElement::new(col_type, Some(col_name)))
179-
.collect();
180-
181-
Ok::<_, (StatusCode, String)>((
182-
result.tx_offset,
183-
db.durable_tx_offset(),
184-
vec![SqlStmtResult {
185-
schema,
186-
rows: result.rows,
187-
total_duration_micros: total_duration.as_micros() as u64,
188-
stats: SqlStmtStats::from_metrics(&result.metrics),
189-
}],
190-
))
191-
})
192-
.await
193-
.map_err(log_and_500)??;
149+
tracing::info!(sql = body);
150+
let mut header = vec![];
151+
let sql_start = std::time::Instant::now();
152+
let sql_span = tracing::trace_span!("execute_sql", total_duration = tracing::field::Empty,);
153+
let _guard = sql_span.enter();
154+
let db = module_host.relational_db().clone();
155+
let durable_offset = db.durable_tx_offset();
156+
157+
let result = sql::execute::run(
158+
db,
159+
body,
160+
auth,
161+
Some(module_host.info.subscriptions.clone()),
162+
Some(module_host),
163+
&mut header,
164+
)
165+
.await
166+
.map_err(|e| {
167+
log::warn!("{e}");
168+
(StatusCode::BAD_REQUEST, e.to_string())
169+
})?;
170+
171+
let total_duration = sql_start.elapsed();
172+
drop(_guard);
173+
sql_span.record("total_duration", tracing::field::debug(total_duration));
174+
175+
let schema = header
176+
.into_iter()
177+
.map(|(col_name, col_type)| ProductTypeElement::new(col_type, Some(col_name)))
178+
.collect();
179+
180+
let tx_offset = result.tx_offset;
181+
let json = vec![SqlStmtResult {
182+
schema,
183+
rows: result.rows,
184+
total_duration_micros: total_duration.as_micros() as u64,
185+
stats: SqlStmtStats::from_metrics(&result.metrics),
186+
}];
194187

195188
if confirmed_read && let Some(mut durable_offset) = durable_offset {
196189
let tx_offset = tx_offset.await.map_err(|_| log_and_500("transaction aborted"))?;

crates/commitlog/Cargo.toml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ itertools.workspace = true
2626
log.workspace = true
2727
memmap2 = "0.9.4"
2828
nix = { workspace = true, optional = true, features = ["fs"] }
29+
scopeguard.workspace = true
2930
serde = { workspace = true, optional = true }
3031
spacetimedb-fs-utils.workspace = true
3132
spacetimedb-paths.workspace = true
@@ -46,6 +47,7 @@ pretty_assertions.workspace = true
4647
# Also enable 'test' feature, so integration tests can use the helpers.
4748
spacetimedb-commitlog = { path = ".", features = ["test", "streaming"] }
4849

50+
criterion.workspace = true
4951
env_logger.workspace = true
5052
once_cell.workspace = true
5153
pretty_assertions = { workspace = true, features = ["unstable"] }
@@ -57,3 +59,7 @@ tokio-stream = { version = "0.1.17", features = ["fs"] }
5759

5860
[lints]
5961
workspace = true
62+
63+
[[bench]]
64+
name = "write"
65+
harness = false
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
use spacetimedb_sats::buffer::BufWriter;
2+
3+
/// A commitlog payload consisting of uninterpreted bytes.
4+
pub struct Payload(Box<[u8]>);
5+
6+
impl Payload {
7+
pub fn new(bytes: impl Into<Box<[u8]>>) -> Self {
8+
Self(bytes.into())
9+
}
10+
}
11+
12+
impl spacetimedb_commitlog::Encode for Payload {
13+
fn encode_record<W: BufWriter>(&self, writer: &mut W) {
14+
writer.put_u64(self.0.len() as _);
15+
writer.put_slice(&self.0[..]);
16+
}
17+
}
18+
19+
impl spacetimedb_commitlog::Encode for &Payload {
20+
fn encode_record<W: BufWriter>(&self, writer: &mut W) {
21+
(*self).encode_record(writer)
22+
}
23+
}

crates/commitlog/benches/write.rs

Lines changed: 125 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,125 @@
1+
use core::fmt;
2+
use std::num::NonZeroU16;
3+
4+
use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion, SamplingMode, Throughput};
5+
use spacetimedb_commitlog::{Commitlog, Options, Transaction};
6+
use spacetimedb_paths::{server::CommitLogDir, FromPathUnchecked as _};
7+
use tempfile::tempdir_in;
8+
9+
mod common;
10+
use common::Payload;
11+
12+
struct Params {
13+
payloads: Box<[Payload]>,
14+
txs_per_commit: NonZeroU16,
15+
total_appends: u64,
16+
fsync_every: u64,
17+
}
18+
19+
impl Params {
20+
fn with_payloads(payloads: impl Into<Box<[Payload]>>) -> Self {
21+
Self {
22+
payloads: payloads.into(),
23+
txs_per_commit: NonZeroU16::new(1).unwrap(),
24+
total_appends: 1_000,
25+
fsync_every: 32,
26+
}
27+
}
28+
}
29+
30+
impl fmt::Display for Params {
31+
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
32+
write!(
33+
f,
34+
"n={} tx/commit={} fsync={}",
35+
self.total_appends, self.txs_per_commit, self.fsync_every
36+
)
37+
}
38+
}
39+
40+
fn bench_append(c: &mut Criterion, label: &str, params: Params) {
41+
let id = BenchmarkId::from_parameter(&params);
42+
c.benchmark_group(label)
43+
.sample_size(10)
44+
.sampling_mode(SamplingMode::Flat)
45+
.throughput(Throughput::Elements(params.total_appends))
46+
.bench_with_input(
47+
id,
48+
&params,
49+
|b,
50+
Params {
51+
payloads,
52+
txs_per_commit,
53+
total_appends,
54+
fsync_every,
55+
}| {
56+
let tmp = tempdir_in(".").unwrap();
57+
let dir = CommitLogDir::from_path_unchecked(tmp.path());
58+
let clog = Commitlog::open(dir, Options::default(), None).unwrap();
59+
let mut offset = clog.max_committed_offset().unwrap_or_default();
60+
61+
b.iter(|| {
62+
let mut payloads = payloads.iter().cycle();
63+
for i in 0..*total_appends {
64+
clog.commit(payloads.by_ref().take(txs_per_commit.get() as usize).map(|payload| {
65+
let tx = Transaction {
66+
offset,
67+
txdata: payload,
68+
};
69+
offset += 1;
70+
tx
71+
}))
72+
.unwrap();
73+
if i % fsync_every == 0 {
74+
clog.flush_and_sync().unwrap();
75+
}
76+
}
77+
clog.flush_and_sync().unwrap();
78+
})
79+
},
80+
);
81+
}
82+
83+
fn baseline(c: &mut Criterion) {
84+
let params = Params::with_payloads([Payload::new([b'z'; 64])]);
85+
bench_append(c, "baseline", params);
86+
}
87+
88+
fn large_payload(c: &mut Criterion) {
89+
let params = Params::with_payloads([Payload::new([b'z'; 4096])]);
90+
bench_append(c, "large payload", params);
91+
}
92+
93+
fn mixed_payloads(c: &mut Criterion) {
94+
let params = Params::with_payloads([
95+
Payload::new([b'a'; 64]),
96+
Payload::new([b'b'; 512]),
97+
Payload::new([b'c'; 1024]),
98+
Payload::new([b'd'; 4096]),
99+
Payload::new([b'e'; 8102]),
100+
]);
101+
bench_append(c, "mixed payloads", params);
102+
}
103+
104+
fn mixed_payloads_with_batching(c: &mut Criterion) {
105+
let params = Params {
106+
txs_per_commit: NonZeroU16::new(16).unwrap(),
107+
..Params::with_payloads([
108+
Payload::new([b'a'; 64]),
109+
Payload::new([b'b'; 512]),
110+
Payload::new([b'c'; 1024]),
111+
Payload::new([b'd'; 4096]),
112+
Payload::new([b'e'; 8102]),
113+
])
114+
};
115+
bench_append(c, "mixed payloads with batching", params);
116+
}
117+
118+
criterion_group!(
119+
benches,
120+
baseline,
121+
large_payload,
122+
mixed_payloads,
123+
mixed_payloads_with_batching
124+
);
125+
criterion_main!(benches);

crates/commitlog/src/commitlog.rs

Lines changed: 20 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -59,33 +59,27 @@ impl<R: Repo, T> Generic<R, T> {
5959
if !tail.is_empty() {
6060
debug!("segments: {tail:?}");
6161
}
62-
let head = if let Some(last) = tail.pop() {
63-
debug!("resuming last segment: {last}");
64-
// Resume the last segment for writing, or create a new segment
65-
// starting from the last good commit + 1.
66-
repo::resume_segment_writer(&repo, opts, last)?.or_else(|meta| {
67-
// The first commit in the last segment being corrupt is an
68-
// edge case: we'd try to start a new segment with an offset
69-
// equal to the already existing one, which would fail.
70-
//
71-
// We cannot just skip it either, as we don't know the reason
72-
// for the corruption (there could be more, potentially
73-
// recoverable commits in the segment).
74-
//
75-
// Thus, provide some context about what is wrong and refuse to
76-
// start.
77-
if meta.tx_range.is_empty() {
78-
return Err(io::Error::new(
79-
io::ErrorKind::InvalidData,
80-
format!("repo {}: first commit in resumed segment {} is corrupt", repo, last),
81-
));
62+
63+
// Resume the last segment for writing, or
64+
// create a new segment starting from the last good commit + 1.
65+
let head = loop {
66+
if let Some(last) = tail.pop() {
67+
info!("repo {}: resuming last segment: {}", repo, last);
68+
match repo::resume_segment_writer(&repo, opts, last)? {
69+
repo::ResumedSegment::Empty => {
70+
repo.remove_segment(last)?;
71+
continue;
72+
}
73+
repo::ResumedSegment::Resumed(writer) => break writer,
74+
repo::ResumedSegment::Sealed(meta) | repo::ResumedSegment::Corrupted(meta) => {
75+
tail.push(meta.tx_range.start);
76+
break repo::create_segment_writer(&repo, opts, meta.max_epoch, meta.tx_range.end)?;
77+
}
8278
}
83-
tail.push(meta.tx_range.start);
84-
repo::create_segment_writer(&repo, opts, meta.max_epoch, meta.tx_range.end)
85-
})?
86-
} else {
87-
debug!("starting fresh log");
88-
repo::create_segment_writer(&repo, opts, Commit::DEFAULT_EPOCH, 0)?
79+
} else {
80+
info!("repo {}: starting fresh log", repo);
81+
break repo::create_segment_writer(&repo, opts, Commit::DEFAULT_EPOCH, 0)?;
82+
}
8983
};
9084

9185
Ok(Self {

0 commit comments

Comments
 (0)