Skip to content

Commit 6d1d4e3

Browse files
committed
refactor(sqlite-storage): extract DBHead to vbare-versioned protocol crate
1 parent 20a8181 commit 6d1d4e3

17 files changed

Lines changed: 226 additions & 162 deletions

File tree

Cargo.lock

Lines changed: 12 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@ members = [
5757
"engine/sdks/rust/envoy-client",
5858
"engine/sdks/rust/envoy-protocol",
5959
"engine/sdks/rust/epoxy-protocol",
60+
"engine/sdks/rust/sqlite-storage-protocol",
6061
"engine/sdks/rust/test-envoy",
6162
"engine/sdks/rust/ups-protocol",
6263
"rivetkit-rust/packages/client",
@@ -538,6 +539,9 @@ members = [
538539
[workspace.dependencies.rivet-envoy-protocol]
539540
path = "engine/sdks/rust/envoy-protocol"
540541

542+
[workspace.dependencies.rivet-sqlite-storage-protocol]
543+
path = "engine/sdks/rust/sqlite-storage-protocol"
544+
541545
[workspace.dependencies.rivetkit-client-protocol]
542546
path = "rivetkit-rust/packages/client-protocol"
543547

engine/packages/pegboard/tests/actor_sqlite_migration.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ use sqlite_storage::{
1111
keys::meta_key,
1212
ltx::{LtxHeader, encode_ltx_v3},
1313
open::OpenConfig,
14-
types::{DirtyPage, SqliteOrigin},
14+
types::{DirtyPage, SqliteOrigin, encode_db_head},
1515
udb::{self, WriteOp},
1616
};
1717
use tempfile::tempdir;
@@ -138,7 +138,7 @@ async fn age_v1_migration_head(
138138
db,
139139
&pegboard::actor_sqlite::sqlite_subspace(),
140140
engine.op_counter.as_ref(),
141-
vec![WriteOp::put(meta_key(actor_id), serde_bare::to_vec(&head)?)],
141+
vec![WriteOp::put(meta_key(actor_id), encode_db_head(&head)?)],
142142
)
143143
.await
144144
}

engine/packages/sqlite-storage/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ serde_bare.workspace = true
2222
thiserror.workspace = true
2323
tokio.workspace = true
2424
tracing.workspace = true
25+
rivet-sqlite-storage-protocol.workspace = true
2526
universaldb.workspace = true
2627

2728
[dev-dependencies]

engine/packages/sqlite-storage/src/commit.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ use crate::keys::{
1616
};
1717
use crate::ltx::{LtxHeader, decode_ltx_v3, encode_ltx_v3};
1818
use crate::quota::{encode_db_head_with_usage, tracked_storage_entry_size};
19-
use crate::types::{DirtyPage, SQLITE_MAX_DELTA_BYTES, SqliteMeta, SqliteOrigin, decode_db_head};
19+
use crate::types::{DirtyPage, SQLITE_MAX_DELTA_BYTES, SqliteMeta, SqliteOrigin, decode_db_head, encode_db_head, new_db_head};
2020
use crate::udb;
2121

2222
#[derive(Debug, Clone, PartialEq, Eq)]

engine/packages/sqlite-storage/src/compaction/shard.rs

Lines changed: 15 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ use crate::keys::{
1313
};
1414
use crate::ltx::{LtxHeader, decode_ltx_v3, encode_ltx_v3};
1515
use crate::quota::{encode_db_head_with_usage, tracked_storage_entry_size};
16-
use crate::types::{DBHead, DirtyPage, SQLITE_PAGE_SIZE, decode_db_head};
16+
use crate::types::{DBHead, DirtyPage, SQLITE_PAGE_SIZE, decode_db_head, encode_db_head, new_db_head};
1717
use crate::udb::{self, WriteOp};
1818

1919
const PIDX_PGNO_BYTES: usize = std::mem::size_of::<u32>();
@@ -528,7 +528,8 @@ mod tests {
528528
use crate::test_utils::{read_value, scan_prefix_values, test_db};
529529
use crate::types::{
530530
DBHead, DirtyPage, FetchedPage, SQLITE_DEFAULT_MAX_STORAGE_BYTES, SQLITE_PAGE_SIZE,
531-
SQLITE_SHARD_SIZE, SQLITE_VFS_V2_SCHEMA_VERSION, SqliteOrigin,
531+
SQLITE_SHARD_SIZE, SQLITE_VFS_V2_SCHEMA_VERSION, SqliteOrigin, encode_db_head,
532+
new_db_head,
532533
};
533534
use crate::udb::{WriteOp, apply_write_ops, test_hooks};
534535

@@ -631,7 +632,7 @@ mod tests {
631632
&engine.subspace,
632633
engine.op_counter.as_ref(),
633634
vec![
634-
WriteOp::put(meta_key(TEST_ACTOR), serde_bare::to_vec(&head)?),
635+
WriteOp::put(meta_key(TEST_ACTOR), encode_db_head(&head)?),
635636
WriteOp::put(
636637
delta_blob_key(TEST_ACTOR, 1),
637638
encoded_blob(1, 5, &[(1, 0x11)]),
@@ -680,7 +681,7 @@ mod tests {
680681
.is_empty()
681682
);
682683

683-
let stored_head: DBHead = serde_bare::from_slice(
684+
let stored_head = decode_db_head(
684685
&read_value(&engine, meta_key(TEST_ACTOR))
685686
.await?
686687
.expect("meta should exist after compaction"),
@@ -729,7 +730,7 @@ mod tests {
729730
&engine.subspace,
730731
engine.op_counter.as_ref(),
731732
vec![
732-
WriteOp::put(meta_key(TEST_ACTOR), serde_bare::to_vec(&head)?),
733+
WriteOp::put(meta_key(TEST_ACTOR), encode_db_head(&head)?),
733734
WriteOp::put(
734735
shard_key(TEST_ACTOR, 0),
735736
encoded_blob(0.max(1), 2, &[(1, 0x10), (2, 0x20)]),
@@ -789,7 +790,7 @@ mod tests {
789790
&engine.subspace,
790791
engine.op_counter.as_ref(),
791792
vec![
792-
WriteOp::put(meta_key(TEST_ACTOR), serde_bare::to_vec(&head)?),
793+
WriteOp::put(meta_key(TEST_ACTOR), encode_db_head(&head)?),
793794
WriteOp::put(
794795
delta_blob_key(TEST_ACTOR, 4),
795796
encoded_blob(4, 2, &[(1, 0x10)]),
@@ -835,7 +836,7 @@ mod tests {
835836
&engine.subspace,
836837
engine.op_counter.as_ref(),
837838
vec![
838-
WriteOp::put(meta_key(TEST_ACTOR), serde_bare::to_vec(&head)?),
839+
WriteOp::put(meta_key(TEST_ACTOR), encode_db_head(&head)?),
839840
WriteOp::put(
840841
shard_key(TEST_ACTOR, 0),
841842
encoded_blob(1, 2, &[(1, 0x10), (2, 0x20)]),
@@ -882,7 +883,7 @@ mod tests {
882883
&engine.subspace,
883884
engine.op_counter.as_ref(),
884885
vec![
885-
WriteOp::put(meta_key(FAIL_ACTOR), serde_bare::to_vec(&head)?),
886+
WriteOp::put(meta_key(FAIL_ACTOR), encode_db_head(&head)?),
886887
WriteOp::put(
887888
delta_blob_key(FAIL_ACTOR, 4),
888889
encoded_blob(4, 2, &[(1, 0x10)]),
@@ -979,7 +980,7 @@ mod tests {
979980
&engine.subspace,
980981
engine.op_counter.as_ref(),
981982
vec![
982-
WriteOp::put(meta_key(TEST_ACTOR), serde_bare::to_vec(&head)?),
983+
WriteOp::put(meta_key(TEST_ACTOR), encode_db_head(&head)?),
983984
WriteOp::put(
984985
delta_blob_key(TEST_ACTOR, 1),
985986
encoded_blob(1, 1, &[(1, 0x10)]),
@@ -1009,7 +1010,7 @@ mod tests {
10091010
engine.op_counter.as_ref(),
10101011
vec![WriteOp::put(
10111012
meta_key(TEST_ACTOR),
1012-
serde_bare::to_vec(&updated_head)?,
1013+
encode_db_head(&updated_head)?,
10131014
)],
10141015
)
10151016
.await?;
@@ -1057,7 +1058,7 @@ mod tests {
10571058
&engine.subspace,
10581059
engine.op_counter.as_ref(),
10591060
vec![
1060-
WriteOp::put(meta_key(TEST_ACTOR), serde_bare::to_vec(&head)?),
1061+
WriteOp::put(meta_key(TEST_ACTOR), encode_db_head(&head)?),
10611062
WriteOp::put(
10621063
delta_blob_key(TEST_ACTOR, 1),
10631064
encoded_blob(1, 1, &[(1, 0x10)]),
@@ -1132,7 +1133,7 @@ mod tests {
11321133
&engine.subspace,
11331134
engine.op_counter.as_ref(),
11341135
vec![
1135-
WriteOp::put(meta_key(TEST_ACTOR), serde_bare::to_vec(&head)?),
1136+
WriteOp::put(meta_key(TEST_ACTOR), encode_db_head(&head)?),
11361137
WriteOp::put(
11371138
delta_blob_key(TEST_ACTOR, 1),
11381139
encoded_blob(1, 1, &[(1, 0x10)]),
@@ -1194,7 +1195,7 @@ mod tests {
11941195
&engine.subspace,
11951196
engine.op_counter.as_ref(),
11961197
vec![
1197-
WriteOp::put(meta_key(TEST_ACTOR), serde_bare::to_vec(&head)?),
1198+
WriteOp::put(meta_key(TEST_ACTOR), encode_db_head(&head)?),
11981199
WriteOp::put(
11991200
delta_blob_key(TEST_ACTOR, 1),
12001201
encoded_blob(1, 129, &[(1, 0x11), (65, 0x65), (129, 0x81)]),
@@ -1267,7 +1268,7 @@ mod tests {
12671268
&engine.subspace,
12681269
engine.op_counter.as_ref(),
12691270
vec![
1270-
WriteOp::put(meta_key(TEST_ACTOR), serde_bare::to_vec(&head)?),
1271+
WriteOp::put(meta_key(TEST_ACTOR), encode_db_head(&head)?),
12711272
WriteOp::put(
12721273
delta_blob_key(TEST_ACTOR, 4),
12731274
encoded_blob(4, 2, &[(1, 0x10)]),

engine/packages/sqlite-storage/src/compaction/worker.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ mod tests {
6868
use crate::test_utils::{clear_op_count, scan_prefix_values, test_db};
6969
use crate::types::{
7070
DBHead, DirtyPage, SQLITE_DEFAULT_MAX_STORAGE_BYTES, SQLITE_PAGE_SIZE, SQLITE_SHARD_SIZE,
71-
SQLITE_VFS_V2_SCHEMA_VERSION, SqliteOrigin,
71+
SQLITE_VFS_V2_SCHEMA_VERSION, SqliteOrigin, encode_db_head, new_db_head,
7272
};
7373
use crate::udb::{self, WriteOp, apply_write_ops};
7474

@@ -117,7 +117,7 @@ mod tests {
117117
let (engine, _compaction_rx) = SqliteEngine::new(db, subspace);
118118
let mut mutations = vec![WriteOp::put(
119119
meta_key(TEST_ACTOR),
120-
serde_bare::to_vec(&head)?,
120+
encode_db_head(&head)?,
121121
)];
122122

123123
for shard_id in 0..9u32 {
@@ -155,7 +155,7 @@ mod tests {
155155
let (engine, _compaction_rx) = SqliteEngine::new(db, subspace);
156156
let mut mutations = vec![WriteOp::put(
157157
meta_key(TEST_ACTOR),
158-
serde_bare::to_vec(&head)?,
158+
encode_db_head(&head)?,
159159
)];
160160

161161
// Seed 8 single-page shards so one compact_worker call triggers all 8 shard passes.

engine/packages/sqlite-storage/src/open.rs

Lines changed: 16 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ use crate::keys::{
1414
use crate::ltx::decode_ltx_v3;
1515
use crate::quota::{encode_db_head_with_usage, tracked_storage_entry_size};
1616
use crate::types::{
17-
DBHead, FetchedPage, SQLITE_MAX_DELTA_BYTES, SqliteMeta, SqliteOrigin, decode_db_head,
17+
DBHead, FetchedPage, SQLITE_MAX_DELTA_BYTES, SqliteMeta, SqliteOrigin, decode_db_head, encode_db_head, new_db_head,
1818
};
1919
use crate::udb::{self, WriteOp};
2020

@@ -145,7 +145,7 @@ impl SqliteEngine {
145145
}
146146
}
147147

148-
let mut head = DBHead::new(now_ms);
148+
let mut head = new_db_head(now_ms);
149149
head.origin = SqliteOrigin::MigrationFromV1InProgress;
150150
let (head, encoded_head) = encode_db_head_with_usage(&actor_id, &head, 0)?;
151151
udb::tx_write_value(&tx, &subspace, &meta_storage_key, &encoded_head)?;
@@ -246,7 +246,7 @@ impl SqliteEngine {
246246
head.sqlite_storage_used = usage_without_meta.saturating_sub(tracked_deleted_bytes);
247247
head
248248
} else {
249-
DBHead::new(config.now_ms)
249+
new_db_head(config.now_ms)
250250
};
251251

252252
let (head, encoded_head) =
@@ -671,7 +671,7 @@ mod tests {
671671
use crate::types::{
672672
DBHead, DirtyPage, FetchedPage, SQLITE_DEFAULT_MAX_STORAGE_BYTES, SQLITE_MAX_DELTA_BYTES,
673673
SQLITE_PAGE_SIZE, SQLITE_SHARD_SIZE, SQLITE_VFS_V2_SCHEMA_VERSION, SqliteOrigin,
674-
decode_db_head,
674+
decode_db_head, encode_db_head, new_db_head,
675675
};
676676
use crate::udb::{WriteOp, apply_write_ops, physical_chunk_key, raw_key_exists};
677677

@@ -764,7 +764,7 @@ mod tests {
764764
let stored_meta = read_value(&engine, meta_key(TEST_ACTOR))
765765
.await?
766766
.expect("meta should exist");
767-
let head: DBHead = serde_bare::from_slice(&stored_meta)?;
767+
let head = decode_db_head(&stored_meta)?;
768768
assert_eq!(head.generation, 1);
769769
assert_eq!(head.creation_ts_ms, 777);
770770

@@ -855,7 +855,7 @@ mod tests {
855855
&engine.subspace,
856856
engine.op_counter.as_ref(),
857857
vec![
858-
WriteOp::put(meta_key(TEST_ACTOR), serde_bare::to_vec(&head)?),
858+
WriteOp::put(meta_key(TEST_ACTOR), encode_db_head(&head)?),
859859
WriteOp::put(shard_key(TEST_ACTOR, 0), encoded_blob(1, 1, 0x2a)),
860860
],
861861
)
@@ -888,7 +888,7 @@ mod tests {
888888
&engine.subspace,
889889
engine.op_counter.as_ref(),
890890
vec![
891-
WriteOp::put(meta_key(TEST_ACTOR), serde_bare::to_vec(&head)?),
891+
WriteOp::put(meta_key(TEST_ACTOR), encode_db_head(&head)?),
892892
WriteOp::put(
893893
delta_blob_key(TEST_ACTOR, 7),
894894
encode_ltx_v3(
@@ -958,7 +958,7 @@ mod tests {
958958
&engine.subspace,
959959
engine.op_counter.as_ref(),
960960
vec![
961-
WriteOp::put(meta_key(TEST_ACTOR), serde_bare::to_vec(&head)?),
961+
WriteOp::put(meta_key(TEST_ACTOR), encode_db_head(&head)?),
962962
WriteOp::put(shard_key(TEST_ACTOR, 0), encoded_blob(1, 1, 0x2a)),
963963
],
964964
)
@@ -981,7 +981,7 @@ mod tests {
981981
&engine.subspace,
982982
engine.op_counter.as_ref(),
983983
vec![
984-
WriteOp::put(meta_key(TEST_ACTOR), serde_bare::to_vec(&seeded_head())?),
984+
WriteOp::put(meta_key(TEST_ACTOR), encode_db_head(&seeded_head())?),
985985
WriteOp::put(delta_blob_key(TEST_ACTOR, 2), encoded_blob(2, 1, 0x11)),
986986
WriteOp::put(delta_blob_key(TEST_ACTOR, 5), encoded_blob(5, 2, 0x55)),
987987
WriteOp::put(pidx_delta_key(TEST_ACTOR, 1), 2_u64.to_be_bytes().to_vec()),
@@ -1039,7 +1039,7 @@ mod tests {
10391039
&engine.subspace,
10401040
engine.op_counter.as_ref(),
10411041
vec![
1042-
WriteOp::put(meta_key(TEST_ACTOR), serde_bare::to_vec(&head)?),
1042+
WriteOp::put(meta_key(TEST_ACTOR), encode_db_head(&head)?),
10431043
WriteOp::put(delta_blob_key(TEST_ACTOR, 1), encoded_blob(1, 1, 0x11)),
10441044
WriteOp::put(delta_blob_key(TEST_ACTOR, 2), encoded_blob(2, 70, 0x70)),
10451045
WriteOp::put(shard_key(TEST_ACTOR, 1), encoded_blob(3, 70, 0x71)),
@@ -1088,7 +1088,7 @@ mod tests {
10881088
&engine.subspace,
10891089
engine.op_counter.as_ref(),
10901090
vec![
1091-
WriteOp::put(meta_key(TEST_ACTOR), serde_bare::to_vec(&seeded_head())?),
1091+
WriteOp::put(meta_key(TEST_ACTOR), encode_db_head(&seeded_head())?),
10921092
WriteOp::put(delta_blob_key(TEST_ACTOR, 2), encoded_blob(2, 1, 0x11)),
10931093
WriteOp::put(delta_blob_key(TEST_ACTOR, 5), encoded_blob(5, 2, 0x55)),
10941094
WriteOp::put(pidx_delta_key(TEST_ACTOR, 1), 2_u64.to_be_bytes().to_vec()),
@@ -1121,7 +1121,7 @@ mod tests {
11211121
&engine.subspace,
11221122
engine.op_counter.as_ref(),
11231123
vec![
1124-
WriteOp::put(meta_key(TEST_ACTOR), serde_bare::to_vec(&seeded_head())?),
1124+
WriteOp::put(meta_key(TEST_ACTOR), encode_db_head(&seeded_head())?),
11251125
WriteOp::put(delta_chunk_key(TEST_ACTOR, 42, 0), vec![1, 2, 3]),
11261126
WriteOp::put(delta_chunk_key(TEST_ACTOR, 42, 1), vec![4, 5, 6]),
11271127
],
@@ -1161,7 +1161,7 @@ mod tests {
11611161
&engine.subspace,
11621162
engine.op_counter.as_ref(),
11631163
vec![
1164-
WriteOp::put(meta_key(TEST_ACTOR), serde_bare::to_vec(&head)?),
1164+
WriteOp::put(meta_key(TEST_ACTOR), encode_db_head(&head)?),
11651165
// Three orphan staged txids (> head_txid).
11661166
WriteOp::put(delta_chunk_key(TEST_ACTOR, 6, 0), vec![0; 256]),
11671167
WriteOp::put(delta_chunk_key(TEST_ACTOR, 6, 1), vec![0; 256]),
@@ -1220,7 +1220,7 @@ mod tests {
12201220
engine.op_counter.as_ref(),
12211221
vec![WriteOp::put(
12221222
meta_key(TEST_ACTOR),
1223-
serde_bare::to_vec(&head)?,
1223+
encode_db_head(&head)?,
12241224
)],
12251225
)
12261226
.await?;
@@ -1260,7 +1260,7 @@ mod tests {
12601260
let result = recovered_engine
12611261
.open(TEST_ACTOR, OpenConfig::new(2_222))
12621262
.await?;
1263-
let stored_head: DBHead = serde_bare::from_slice(
1263+
let stored_head = decode_db_head(
12641264
&read_value(&recovered_engine, meta_key(TEST_ACTOR))
12651265
.await?
12661266
.expect("meta should still exist after recovery"),
@@ -1296,7 +1296,7 @@ mod tests {
12961296
let (engine, mut compaction_rx) = SqliteEngine::new(db, subspace);
12971297
let mut mutations = vec![WriteOp::put(
12981298
meta_key(TEST_ACTOR),
1299-
serde_bare::to_vec(&head)?,
1299+
encode_db_head(&head)?,
13001300
)];
13011301
for txid in 1..=32_u64 {
13021302
mutations.push(WriteOp::put(

engine/packages/sqlite-storage/src/quota.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ pub fn encode_db_head_with_usage(
4444
let mut encoded_head = head.clone();
4545
encoded_head.sqlite_storage_used = total_usage;
4646

47-
let bytes = serde_bare::to_vec(&encoded_head)
47+
let bytes = crate::types::encode_db_head(&encoded_head)
4848
.context("serialize sqlite db head with quota usage")?;
4949
let next_total_usage = usage_without_meta + meta_key_len + bytes.len() as u64;
5050
if next_total_usage == total_usage {

0 commit comments

Comments
 (0)