Skip to content

Commit 96d19e7

Browse files
committed
feat: US-035 - Add Depot inspect API with big metadata JSON and paginated row scans
1 parent 5e238b4 commit 96d19e7

18 files changed

Lines changed: 1617 additions & 219 deletions

File tree

.agent/specs/depot-inspect-api.md

Lines changed: 154 additions & 179 deletions
Large diffs are not rendered by default.

Cargo.lock

Lines changed: 4 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

engine/packages/api-peer/Cargo.toml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ license.workspace = true
99
anyhow.workspace = true
1010
axum.workspace = true
1111
base64.workspace = true
12+
depot.workspace = true
1213
epoxy.workspace = true
1314
epoxy-protocol.workspace = true
1415
futures-util.workspace = true
@@ -32,3 +33,7 @@ universalpubsub.workspace = true
3233
utoipa.workspace = true
3334
uuid.workspace = true
3435
universaldb.workspace = true
36+
37+
[dev-dependencies]
38+
axum-test.workspace = true
39+
tempfile.workspace = true
Lines changed: 166 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,166 @@
1+
use anyhow::{Context, Result};
2+
use depot::{
3+
inspect::{self, CatalogQuery, RawScanQuery, RowsQuery, SampleQuery},
4+
types::{BucketId, DatabaseBranchId},
5+
};
6+
use rivet_api_builder::ApiCtx;
7+
use serde::Deserialize;
8+
use uuid::Uuid;
9+
10+
/// Path parameters for bucket-scoped inspect routes
/// (`/depot/inspect/buckets/{bucket_id}`).
#[derive(Debug, Deserialize)]
pub struct BucketPath {
	// Raw path segment; parsed into a `BucketId` (UUID) by the handler.
	pub bucket_id: String,
}

/// Path parameters for database-scoped inspect routes
/// (`/depot/inspect/buckets/{bucket_id}/databases/{database_id}`).
#[derive(Debug, Deserialize)]
pub struct DatabasePath {
	// Raw bucket id path segment; parsed into a `BucketId` (UUID) by the handler.
	pub bucket_id: String,
	// Database identifier; passed through to `inspect::database` as a string.
	pub database_id: String,
}

/// Path parameters for branch-scoped inspect routes
/// (`/depot/inspect/branches/{branch_id}`).
#[derive(Debug, Deserialize)]
pub struct BranchPath {
	// Raw path segment; parsed into a `DatabaseBranchId` (UUID) by the handler.
	pub branch_id: String,
}

/// Path parameters for the paginated row-scan route
/// (`/depot/inspect/branches/{branch_id}/rows/{family}`).
#[derive(Debug, Deserialize)]
pub struct BranchRowsPath {
	// Raw path segment; parsed into a `DatabaseBranchId` (UUID) by the handler.
	pub branch_id: String,
	// Row family name; parsed via `inspect::RowFamily::parse`.
	pub family: String,
}

/// Path parameters for the page-trace route
/// (`/depot/inspect/branches/{branch_id}/pages/{pgno}/trace`).
#[derive(Debug, Deserialize)]
pub struct PageTracePath {
	// Raw path segment; parsed into a `DatabaseBranchId` (UUID) by the handler.
	pub branch_id: String,
	// SQLite page number to trace; deserialized directly from the path.
	pub pgno: u32,
}

/// Path parameters for raw-key routes
/// (`/depot/inspect/raw/key/{key}` and `/depot/inspect/raw/decode-key/{key}`).
#[derive(Debug, Deserialize)]
pub struct RawKeyPath {
	// Encoded key path segment; decoded via `inspect::decode_path_key`.
	pub key: String,
}
42+
43+
/// Handler for `GET /depot/inspect/summary`: returns the top-level Depot
/// inspection snapshot for this node.
pub async fn summary(ctx: ApiCtx, _path: (), _query: ()) -> Result<inspect::InspectResponse> {
	let pools = ctx.pools();
	let udb = pools.udb()?;
	inspect::summary(&udb, pools.node_id()).await
}
47+
48+
pub async fn catalog(
49+
ctx: ApiCtx,
50+
_path: (),
51+
query: CatalogQuery,
52+
) -> Result<inspect::CatalogResponse> {
53+
let udb = ctx.pools().udb()?;
54+
inspect::catalog(&udb, ctx.pools().node_id(), query).await
55+
}
56+
57+
pub async fn bucket(
58+
ctx: ApiCtx,
59+
path: BucketPath,
60+
query: SampleQuery,
61+
) -> Result<inspect::InspectResponse> {
62+
let bucket_id = parse_bucket_id(&path.bucket_id)?;
63+
let udb = ctx.pools().udb()?;
64+
inspect::bucket(&udb, ctx.pools().node_id(), bucket_id, query).await
65+
}
66+
67+
pub async fn database(
68+
ctx: ApiCtx,
69+
path: DatabasePath,
70+
query: SampleQuery,
71+
) -> Result<inspect::InspectResponse> {
72+
let bucket_id = parse_bucket_id(&path.bucket_id)?;
73+
let udb = ctx.pools().udb()?;
74+
inspect::database(
75+
&udb,
76+
ctx.pools().node_id(),
77+
bucket_id,
78+
path.database_id,
79+
query,
80+
)
81+
.await
82+
}
83+
84+
pub async fn branch(
85+
ctx: ApiCtx,
86+
path: BranchPath,
87+
query: SampleQuery,
88+
) -> Result<inspect::InspectResponse> {
89+
let branch_id = parse_database_branch_id(&path.branch_id)?;
90+
let udb = ctx.pools().udb()?;
91+
inspect::branch(&udb, ctx.pools().node_id(), branch_id, query).await
92+
}
93+
94+
pub async fn page_trace(
95+
ctx: ApiCtx,
96+
path: PageTracePath,
97+
_query: (),
98+
) -> Result<inspect::InspectResponse> {
99+
let branch_id = parse_database_branch_id(&path.branch_id)?;
100+
let udb = ctx.pools().udb()?;
101+
inspect::page_trace(
102+
&udb,
103+
ctx.pools().node_id(),
104+
branch_id,
105+
path.pgno,
106+
)
107+
.await
108+
}
109+
110+
pub async fn branch_rows(
111+
ctx: ApiCtx,
112+
path: BranchRowsPath,
113+
query: RowsQuery,
114+
) -> Result<inspect::PaginatedRowsResponse> {
115+
let branch_id = parse_database_branch_id(&path.branch_id)?;
116+
let family = inspect::RowFamily::parse(&path.family)?;
117+
let udb = ctx.pools().udb()?;
118+
inspect::branch_rows(
119+
&udb,
120+
ctx.pools().node_id(),
121+
branch_id,
122+
family,
123+
query,
124+
)
125+
.await
126+
}
127+
128+
pub async fn raw_key(
129+
ctx: ApiCtx,
130+
path: RawKeyPath,
131+
_query: (),
132+
) -> Result<inspect::InspectResponse> {
133+
let key = inspect::decode_path_key(&path.key)?;
134+
let udb = ctx.pools().udb()?;
135+
inspect::raw_key(&udb, ctx.pools().node_id(), key).await
136+
}
137+
138+
pub async fn raw_scan(
139+
ctx: ApiCtx,
140+
_path: (),
141+
query: RawScanQuery,
142+
) -> Result<inspect::PaginatedRowsResponse> {
143+
let udb = ctx.pools().udb()?;
144+
inspect::raw_scan(&udb, ctx.pools().node_id(), query).await
145+
}
146+
147+
pub async fn decode_key(
148+
ctx: ApiCtx,
149+
path: RawKeyPath,
150+
_query: (),
151+
) -> Result<inspect::InspectResponse> {
152+
let key = inspect::decode_path_key(&path.key)?;
153+
inspect::decode_key_response(ctx.pools().node_id(), key)
154+
}
155+
156+
fn parse_bucket_id(value: &str) -> Result<BucketId> {
157+
Ok(BucketId::from_uuid(
158+
Uuid::parse_str(value).context("parse Depot bucket id")?,
159+
))
160+
}
161+
162+
fn parse_database_branch_id(value: &str) -> Result<DatabaseBranchId> {
163+
Ok(DatabaseBranchId::from_uuid(
164+
Uuid::parse_str(value).context("parse Depot database branch id")?,
165+
))
166+
}

engine/packages/api-peer/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ use std::net::SocketAddr;
33
use anyhow::*;
44

55
pub mod actors;
6+
pub mod depot_inspect;
67
pub mod envoys;
78
pub mod internal;
89
pub mod namespaces;

engine/packages/api-peer/src/router.rs

Lines changed: 33 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
use rivet_api_builder::{create_router, prelude::*};
22

3-
use crate::{actors, envoys, internal, namespaces, runner_configs, runners};
3+
use crate::{actors, depot_inspect, envoys, internal, namespaces, runner_configs, runners};
44

55
#[tracing::instrument(skip_all)]
66
pub async fn router(
@@ -40,6 +40,38 @@ pub async fn router(
4040
.route("/runners/names", get(runners::list_names))
4141
// MARK: Envoys
4242
.route("/envoys", get(envoys::list))
43+
// MARK: Depot inspect
44+
.route("/depot/inspect/summary", get(depot_inspect::summary))
45+
.route("/depot/inspect/catalog", get(depot_inspect::catalog))
46+
.route(
47+
"/depot/inspect/buckets/{bucket_id}",
48+
get(depot_inspect::bucket),
49+
)
50+
.route(
51+
"/depot/inspect/buckets/{bucket_id}/databases/{database_id}",
52+
get(depot_inspect::database),
53+
)
54+
.route(
55+
"/depot/inspect/branches/{branch_id}",
56+
get(depot_inspect::branch),
57+
)
58+
.route(
59+
"/depot/inspect/branches/{branch_id}/pages/{pgno}/trace",
60+
get(depot_inspect::page_trace),
61+
)
62+
.route(
63+
"/depot/inspect/branches/{branch_id}/rows/{family}",
64+
get(depot_inspect::branch_rows),
65+
)
66+
.route(
67+
"/depot/inspect/raw/key/{key}",
68+
get(depot_inspect::raw_key),
69+
)
70+
.route("/depot/inspect/raw/scan", get(depot_inspect::raw_scan))
71+
.route(
72+
"/depot/inspect/raw/decode-key/{key}",
73+
get(depot_inspect::decode_key),
74+
)
4375
// MARK: Internal
4476
.route("/cache/purge", post(internal::cache_purge))
4577
.route(
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
use axum_test::TestServer;
2+
use anyhow::Result;
3+
use base64::{Engine, prelude::BASE64_URL_SAFE_NO_PAD};
4+
use rivet_config::config::{Database, Root, db::FileSystem};
5+
6+
/// Smoke test: boots an api-peer router against a temporary filesystem-backed
/// database and verifies the new `/depot/inspect/...` routes are mounted and
/// respond 200, without asserting on response bodies.
#[tokio::test]
async fn depot_inspect_routes_are_registered_on_api_peer() -> Result<()> {
	// Isolated on-disk UDB so the test never touches shared state.
	let tempdir = tempfile::tempdir()?;
	let mut root = Root::default();
	root.database = Some(Database::FileSystem(FileSystem {
		path: tempdir.path().join("udb"),
	}));
	let config = rivet_config::Config::from_root(root);
	let pools = rivet_pools::Pools::test(config.clone()).await?;
	// NOTE(review): assumes `rivet_api_peer::create_router` is the public
	// construction entry point (vs. `router::router`) — confirm it exists.
	let app = rivet_api_peer::create_router(config, pools).await?;
	let server = TestServer::new(app)?;

	// Summary route: no path/query params required.
	let res = server.get("/depot/inspect/summary").await;
	res.assert_status_ok();

	// Decode-key route: key path segment is URL-safe base64 without padding,
	// here encoding a single-byte key (the SQLite subspace prefix).
	let key = BASE64_URL_SAFE_NO_PAD.encode([depot::keys::SQLITE_SUBSPACE_PREFIX]);
	let res = server
		.get(&format!("/depot/inspect/raw/decode-key/{key}"))
		.await;
	res.assert_status_ok();

	// Paginated rows route: the nil UUID is a valid (if empty) branch id, so
	// the route should still answer 200 with an empty page.
	let branch_id = uuid::Uuid::nil();
	let res = server
		.get(&format!("/depot/inspect/branches/{branch_id}/rows/commits"))
		.add_query_param("limit", "1")
		.await;
	res.assert_status_ok();

	Ok(())
}

engine/packages/depot/CLAUDE.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,7 @@ These come from `r2-prior-art/.agent/research/sqlite/requirements.md` and supers
9898
- **`Db` constructors do not take UPS.** Keep workflow wakeups behind `CompactionSignaler`; do not reintroduce legacy compactor PubSub wiring.
9999
- **Truncate cleanup fences observed PIDX and SHARD values before writing.** Snapshot-selected rows must be re-read with Serializable isolation before cleanup clears or rewrites them so concurrent workflow output cannot be clobbered and quota accounting retries cleanly.
100100
- **UDB helper scans return logical conveyer keys.** Apply the supplied `Subspace` to the physical range scan, then strip it before returning rows to callers.
101+
- **Depot inspect decode and pagination logic lives in `depot::inspect`.** `api-peer` should only mount thin internal `/depot/inspect/...` handlers and must not expose this surface through public SDKs.
101102
- **Conveyer persisted payload structs use `serde::{Serialize, Deserialize}` as the serde_bare/vbare-compatible derive pattern.** Add `OwnedVersionedData` wrappers when introducing encode/decode helpers.
102103
- **Conveyer type domains live behind the `conveyer/types.rs` facade.** Add branch, restore_point, compaction, history-pin, cold-manifest, storage, page, and id payloads under `conveyer/types/*.rs` and re-export public names from the facade.
103104
- **META splits into single-writer sub-keys:** `/META/head` (commit-owned), `/META/quota` (atomic-add counter, raw i64 LE, not vbare), and workflow `CMP/root` (manager-owned compaction manifest).

engine/packages/depot/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ async-channel.workspace = true
1111
async-trait.workspace = true
1212
aws-config = "1"
1313
aws-sdk-s3 = "1"
14+
base64.workspace = true
1415
futures-util.workspace = true
1516
gas.workspace = true
1617
lazy_static.workspace = true

engine/packages/depot/src/conveyer/commit/apply.rs

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
use std::{collections::BTreeSet, sync::Arc};
22

3-
use anyhow::{Context, Result};
3+
use anyhow::{Context, Result, ensure};
44
use universaldb::{
55
options::MutationType,
66
utils::IsolationLevel::{Serializable, Snapshot},
@@ -43,6 +43,8 @@ impl Db {
4343
db_size_pages: u32,
4444
now_ms: i64,
4545
) -> Result<()> {
46+
validate_dirty_pages(&dirty_pages)?;
47+
4648
let node_id = self.node_id.to_string();
4749
let labels = &[node_id.as_str()];
4850
let _timer = metrics::SQLITE_PUMP_COMMIT_DURATION
@@ -401,6 +403,27 @@ impl Db {
401403
}
402404
}
403405

406+
fn validate_dirty_pages(dirty_pages: &[DirtyPage]) -> Result<()> {
407+
let mut seen = BTreeSet::new();
408+
for page in dirty_pages {
409+
ensure!(page.pgno > 0, "sqlite commit does not accept page 0");
410+
ensure!(
411+
page.bytes.len() == keys::PAGE_SIZE as usize,
412+
"sqlite commit page {} had {} bytes, expected {}",
413+
page.pgno,
414+
page.bytes.len(),
415+
keys::PAGE_SIZE
416+
);
417+
ensure!(
418+
seen.insert(page.pgno),
419+
"sqlite commit duplicated page {} in a single request",
420+
page.pgno
421+
);
422+
}
423+
424+
Ok(())
425+
}
426+
404427
struct CommitTxResult {
405428
branch_id: DatabaseBranchId,
406429
branch_ancestry: BranchAncestry,

0 commit comments

Comments
 (0)