Skip to content

Commit 1562792

Browse files
committed
Merge remote-tracking branch 'origin/master' into bfops/smoketest-flakes
2 parents 68ae475 + 6108b79 commit 1562792

44 files changed

Lines changed: 1720 additions & 1257 deletions

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

.cargo/config.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ rustflags = ["--cfg", "tokio_unstable"]
55
bump-versions = "run -p upgrade-version --"
66
llm = "run --package xtask-llm-benchmark --bin llm_benchmark --"
77
ci = "run -p ci --"
8+
regen = "run -p regen --"
89
smoketest = "ci smoketests --"
910
smoketests = "smoketest"
1011
lint = "ci lint --"

.github/workflows/ci.yml

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -689,7 +689,7 @@ jobs:
689689
}
690690
691691
- name: Hydrate Unity SDK DLLs
692-
run: cargo ci dlls
692+
run: cargo regen csharp dlls
693693

694694
- name: Check Unity meta files
695695
uses: DeNA/unity-meta-check@v3
@@ -830,10 +830,9 @@ jobs:
830830
ln -sf $CARGO_HOME/bin/spacetimedb-cli $CARGO_HOME/bin/spacetime
831831
832832
- name: Check quickstart-chat bindings are up to date
833-
working-directory: sdks/csharp
834833
run: |
835-
bash tools~/gen-quickstart.sh
836-
"${GITHUB_WORKSPACE}"/tools/check-diff.sh examples~/quickstart-chat || {
834+
bash sdks/csharp/tools~/gen-quickstart.sh
835+
tools/check-diff.sh templates/chat-console-cs/module_bindings || {
837836
echo 'Error: quickstart-chat bindings have changed. Please run `sdks/csharp/tools~/gen-quickstart.sh`.'
838837
exit 1
839838
}

Cargo.lock

Lines changed: 10 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@ members = [
6565
"tools/replace-spacetimedb",
6666
"tools/generate-client-api",
6767
"tools/gen-bindings",
68+
"tools/regen",
6869
"tools/xtask-llm-benchmark",
6970
"crates/bindings-typescript/test-app/server",
7071
"crates/bindings-typescript/test-react-router-app/server",

crates/cli/src/subcommands/subscribe.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -240,6 +240,8 @@ enum Error {
240240
},
241241
#[error("encountered failed transaction: {reason}")]
242242
TransactionFailure { reason: Box<str> },
243+
#[error("encountered error in initial subscribe: {reason}")]
244+
SubscribeFailure { reason: Box<str> },
243245
#[error("error formatting response: {source:#}")]
244246
Reformat {
245247
#[source]
@@ -295,6 +297,9 @@ where
295297
}
296298
break;
297299
}
300+
ws_v1::ServerMessage::SubscriptionError(error) => {
301+
return Err(Error::SubscribeFailure { reason: error.error });
302+
}
298303
ws_v1::ServerMessage::TransactionUpdate(ws_v1::TransactionUpdate { status, .. }) => {
299304
return Err(match status {
300305
ws_v1::UpdateStatus::Failed(msg) => Error::TransactionFailure { reason: msg },
@@ -341,6 +346,9 @@ where
341346
details: "received a second initial subscription update",
342347
})
343348
}
349+
ws_v1::ServerMessage::SubscriptionError(error) => {
350+
return Err(Error::SubscribeFailure { reason: error.error });
351+
}
344352
ws_v1::ServerMessage::TransactionUpdateLight(ws_v1::TransactionUpdateLight { update, .. })
345353
| ws_v1::ServerMessage::TransactionUpdate(ws_v1::TransactionUpdate {
346354
status: ws_v1::UpdateStatus::Committed(update),

crates/commitlog/src/lib.rs

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,8 @@ use log::trace;
99
use repo::{fs::OnNewSegmentFn, Repo};
1010
use spacetimedb_paths::server::CommitLogDir;
1111

12+
pub use spacetimedb_fs_utils::compression::CompressionStats;
13+
1214
pub mod commit;
1315
pub mod commitlog;
1416
mod index;
@@ -330,16 +332,18 @@ impl<T> Commitlog<T> {
330332
}
331333

332334
/// Compress the segments at the offsets provided, marking them as immutable.
333-
pub fn compress_segments(&self, offsets: &[u64]) -> io::Result<()> {
335+
pub fn compress_segments(&self, offsets: &[u64]) -> io::Result<CompressionStats> {
334336
// even though `compress_segment` takes &self, we take an
335337
// exclusive lock to avoid any weirdness happening.
336338
#[allow(clippy::readonly_write_lock)]
337339
let inner = self.inner.write().unwrap();
338340
assert!(!offsets.contains(&inner.head.min_tx_offset()));
339341
// TODO: parallelize, maybe
340-
offsets
341-
.iter()
342-
.try_for_each(|&offset| inner.repo.compress_segment(offset))
342+
let mut stats = <_>::default();
343+
for offset in offsets {
344+
stats += inner.repo.compress_segment(*offset)?;
345+
}
346+
Ok(stats)
343347
}
344348

345349
/// Remove all data from the log and reopen it.

crates/commitlog/src/repo/fs.rs

Lines changed: 7 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -4,14 +4,14 @@ use std::io;
44
use std::sync::Arc;
55

66
use log::{debug, warn};
7-
use spacetimedb_fs_utils::compression::{compress_with_zstd, CompressReader};
7+
use spacetimedb_fs_utils::compression::{CompressReader, CompressionStats};
88
use spacetimedb_fs_utils::lockfile;
99
use spacetimedb_paths::server::{CommitLogDir, SegmentFile};
1010
use tempfile::NamedTempFile;
1111

12-
use crate::segment::{self, FileLike};
13-
1412
use super::{Repo, SegmentLen, SegmentReader, TxOffset, TxOffsetIndex, TxOffsetIndexMut};
13+
use crate::repo::CompressOnce;
14+
use crate::segment::{self, FileLike};
1515

1616
const SEGMENT_FILE_EXT: &str = ".stdb.log";
1717

@@ -317,21 +317,18 @@ impl Repo for Fs {
317317
fs::remove_file(self.segment_path(offset))
318318
}
319319

320-
fn compress_segment(&self, offset: u64) -> io::Result<()> {
320+
fn compress_segment_with(&self, offset: u64, f: impl CompressOnce) -> io::Result<CompressionStats> {
321321
let src = self.open_segment_reader(offset)?;
322322
// if it's already compressed, leave it be
323323
let CompressReader::None(mut src) = src.inner else {
324-
return Ok(());
324+
return Ok(<_>::default());
325325
};
326326

327327
let mut dst = NamedTempFile::new_in(&self.root)?;
328-
// bytes per frame. in the future, it might be worth looking into putting
329-
// every commit into its own frame, to make seeking more efficient.
330-
let max_frame_size = 0x1000;
331-
compress_with_zstd(&mut src, &mut dst, Some(max_frame_size))?;
328+
let stats = f.compress(&mut src, &mut dst)?;
332329
dst.persist(self.segment_path(offset))?;
333330

334-
Ok(())
331+
Ok(stats)
335332
}
336333

337334
fn existing_offsets(&self) -> io::Result<Vec<u64>> {

crates/commitlog/src/repo/mem.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ use crate::repo::{
1111

1212
mod segment;
1313
pub use segment::{ReadOnlySegment, Segment};
14+
use spacetimedb_fs_utils::compression::CompressionStats;
1415

1516
pub const PAGE_SIZE: usize = 4096;
1617

@@ -105,8 +106,8 @@ impl Repo for Memory {
105106
Ok(())
106107
}
107108

108-
fn compress_segment(&self, _offset: u64) -> io::Result<()> {
109-
Ok(())
109+
fn compress_segment_with(&self, _: u64, _: impl super::CompressOnce) -> io::Result<CompressionStats> {
110+
Ok(<_>::default())
110111
}
111112

112113
fn existing_offsets(&self) -> io::Result<Vec<u64>> {

crates/commitlog/src/repo/mod.rs

Lines changed: 22 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@ use std::{
44
};
55

66
use log::{debug, warn};
7+
use spacetimedb_fs_utils::compression::Zstd;
8+
pub use spacetimedb_fs_utils::compression::{CompressOnce, CompressionStats};
79

810
use crate::{
911
commit::Commit,
@@ -121,7 +123,13 @@ pub trait Repo: Clone + fmt::Display {
121123
fn remove_segment(&self, offset: u64) -> io::Result<()>;
122124

123125
/// Compress a segment in storage, marking it as immutable.
124-
fn compress_segment(&self, offset: u64) -> io::Result<()>;
126+
fn compress_segment(&self, offset: u64) -> io::Result<CompressionStats> {
127+
self.compress_segment_with(offset, segment_compressor())
128+
}
129+
130+
/// Compress a segment using a supplied [CompressOnce], marking it as
131+
/// immutable.
132+
fn compress_segment_with(&self, offset: u64, f: impl CompressOnce) -> io::Result<CompressionStats>;
125133

126134
/// Traverse all segments in this repository and return list of their
127135
/// offsets, sorted in ascending order.
@@ -164,8 +172,8 @@ impl<T: Repo> Repo for &T {
164172
T::remove_segment(self, offset)
165173
}
166174

167-
fn compress_segment(&self, offset: u64) -> io::Result<()> {
168-
T::compress_segment(self, offset)
175+
fn compress_segment_with(&self, offset: u64, f: impl CompressOnce) -> io::Result<CompressionStats> {
176+
T::compress_segment_with(self, offset, f)
169177
}
170178

171179
fn existing_offsets(&self) -> io::Result<Vec<u64>> {
@@ -408,6 +416,17 @@ pub fn open_segment_reader<R: Repo>(
408416
.map_err(|source| with_segment_context("reading segment header", repo, offset, source))
409417
}
410418

419+
/// Obtain the canonical [CompressOnce] compressor for segments.
420+
///
421+
/// The compressor will create seekable [Zstd] archives with a max frame size
422+
/// of 4KiB. That is, seeking to an arbitrary byte offset (of the uncompressed
423+
/// segment) within the archive will decompress 4KiB of data on average.
424+
pub fn segment_compressor() -> Zstd {
425+
Zstd {
426+
max_frame_size: Some(0x1000),
427+
}
428+
}
429+
411430
fn segment_label<R: Repo>(repo: &R, offset: u64) -> String {
412431
repo.segment_file_path(offset)
413432
.unwrap_or_else(|| format!("offset {offset}"))

crates/commitlog/src/tests/partial.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ use std::{
88

99
use log::{debug, info};
1010
use pretty_assertions::assert_matches;
11+
use spacetimedb_fs_utils::compression::CompressionStats;
1112

1213
use crate::{
1314
commitlog, payload,
@@ -295,8 +296,8 @@ impl Repo for ShortMem {
295296
self.inner.remove_segment(offset)
296297
}
297298

298-
fn compress_segment(&self, offset: u64) -> io::Result<()> {
299-
self.inner.compress_segment(offset)
299+
fn compress_segment_with(&self, offset: u64, f: impl repo::CompressOnce) -> io::Result<CompressionStats> {
300+
self.inner.compress_segment_with(offset, f)
300301
}
301302

302303
fn existing_offsets(&self) -> io::Result<Vec<u64>> {

0 commit comments

Comments
 (0)