Skip to content

Commit bb15342

Browse files
init snapshot work
1 parent 433f380 commit bb15342

7 files changed

Lines changed: 1652 additions & 53 deletions

File tree

src/datasets/api.rs

Lines changed: 286 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,12 @@
11
use std::collections::HashSet;
22

3-
use anyhow::{bail, Result};
3+
use anyhow::{bail, Context, Result};
4+
use reqwest::header::HeaderMap;
45
use serde::{Deserialize, Serialize};
56
use serde_json::{json, Map, Value};
67
use urlencoding::encode;
78

8-
use crate::http::ApiClient;
9+
use crate::http::{ApiClient, HttpError};
910

1011
use super::records::DATASET_RECORD_FIELDS;
1112

@@ -72,11 +73,51 @@ impl Dataset {
7273
}
7374
}
7475

76+
#[derive(Debug, Clone, Serialize, Deserialize)]
77+
pub struct DatasetSnapshot {
78+
pub id: String,
79+
pub name: String,
80+
pub dataset_id: String,
81+
pub description: Option<String>,
82+
pub xact_id: String,
83+
pub created: Option<String>,
84+
}
85+
86+
#[derive(Debug, Clone, Serialize, Deserialize)]
87+
pub struct CreateDatasetSnapshotResult {
88+
pub dataset_snapshot: DatasetSnapshot,
89+
pub found_existing: bool,
90+
}
91+
92+
#[derive(Debug, Clone, Serialize, Deserialize)]
93+
pub struct DatasetRestorePreview {
94+
pub rows_to_restore: usize,
95+
pub rows_to_delete: usize,
96+
}
97+
98+
#[derive(Debug, Clone, Serialize, Deserialize)]
99+
pub struct DatasetRestoreResult {
100+
pub xact_id: String,
101+
pub rows_restored: usize,
102+
pub rows_deleted: usize,
103+
}
104+
75105
#[derive(Debug, Deserialize)]
76106
struct ListResponse {
77107
objects: Vec<Dataset>,
78108
}
79109

110+
#[derive(Debug, Deserialize)]
111+
struct ListResponseGeneric<T> {
112+
objects: Vec<T>,
113+
}
114+
115+
#[derive(Debug, Deserialize)]
116+
struct DatasetHeadXactRow {
117+
#[serde(rename = "_xact_id", default)]
118+
xact_id: Option<String>,
119+
}
120+
80121
pub async fn list_datasets(client: &ApiClient, project_id: &str) -> Result<Vec<Dataset>> {
81122
let path = format!(
82123
"/v1/dataset?org_name={}&project_id={}",
@@ -203,6 +244,87 @@ pub async fn delete_dataset(client: &ApiClient, dataset_id: &str) -> Result<()>
203244
client.delete(&path).await
204245
}
205246

247+
pub async fn list_dataset_snapshots(
248+
client: &ApiClient,
249+
dataset_id: &str,
250+
) -> Result<Vec<DatasetSnapshot>> {
251+
let path = format!("/v1/dataset_snapshot?dataset_id={}", encode(dataset_id));
252+
let list: ListResponseGeneric<DatasetSnapshot> = client.get(&path).await?;
253+
Ok(list.objects)
254+
}
255+
256+
pub async fn create_dataset_snapshot(
257+
client: &ApiClient,
258+
dataset_id: &str,
259+
name: &str,
260+
description: Option<&str>,
261+
xact_id: &str,
262+
) -> Result<CreateDatasetSnapshotResult> {
263+
let mut body = serde_json::json!({
264+
"dataset_id": dataset_id,
265+
"name": name,
266+
"xact_id": xact_id,
267+
});
268+
if let Some(description) = description {
269+
body["description"] = Value::String(description.to_string());
270+
}
271+
let response = client
272+
.post_with_headers_raw("/v1/dataset_snapshot", &body, &[])
273+
.await?;
274+
if !response.status().is_success() {
275+
let status = response.status();
276+
let body = response.text().await.unwrap_or_default();
277+
return Err(HttpError { status, body }.into());
278+
}
279+
280+
let found_existing = found_existing_snapshot_header(response.headers());
281+
let dataset_snapshot = response.json().await.context("failed to parse response")?;
282+
Ok(CreateDatasetSnapshotResult {
283+
dataset_snapshot,
284+
found_existing,
285+
})
286+
}
287+
288+
pub async fn preview_dataset_restore(
289+
client: &ApiClient,
290+
dataset_id: &str,
291+
xact_id: &str,
292+
) -> Result<DatasetRestorePreview> {
293+
let path = format!("/v1/dataset/{}/restore/preview", encode(dataset_id));
294+
client
295+
.post(&path, &serde_json::json!({ "version": xact_id }))
296+
.await
297+
}
298+
299+
pub async fn restore_dataset(
300+
client: &ApiClient,
301+
dataset_id: &str,
302+
xact_id: &str,
303+
) -> Result<DatasetRestoreResult> {
304+
let path = format!("/v1/dataset/{}/restore", encode(dataset_id));
305+
client
306+
.post(&path, &serde_json::json!({ "version": xact_id }))
307+
.await
308+
}
309+
310+
pub async fn get_dataset_head_xact_id(
311+
client: &ApiClient,
312+
dataset_id: &str,
313+
) -> Result<Option<String>> {
314+
let query = build_dataset_head_xact_query(dataset_id);
315+
let response = client
316+
.btql_structured::<DatasetHeadXactRow, _>(&query)
317+
.await?;
318+
let head = response
319+
.data
320+
.into_iter()
321+
.filter_map(|row| row.xact_id)
322+
.map(|value| value.trim().to_string())
323+
.filter(|value| !value.is_empty())
324+
.max_by(compare_xact_ids);
325+
Ok(head)
326+
}
327+
206328
fn resolve_dataset_rows_page_limit(max_rows: Option<usize>, loaded_rows: usize) -> Option<usize> {
207329
match max_rows {
208330
None => Some(MAX_DATASET_ROWS_PAGE_LIMIT),
@@ -247,10 +369,53 @@ fn build_dataset_rows_query(
247369
fn dataset_rows_select_fields() -> Vec<Value> {
248370
DATASET_RECORD_FIELDS
249371
.iter()
250-
.map(|field| json!({"op": "ident", "name": [field]}))
372+
.map(|field| {
373+
json!({
374+
"expr": {"op": "ident", "name": [field]},
375+
"alias": field,
376+
})
377+
})
251378
.collect()
252379
}
253380

381+
fn build_dataset_head_xact_query(dataset_id: &str) -> Value {
382+
json!({
383+
"select": [{
384+
"expr": {"op": "ident", "name": ["_xact_id"]},
385+
"alias": "_xact_id",
386+
}],
387+
"from": {
388+
"op": "function",
389+
"name": {"op": "ident", "name": ["dataset"]},
390+
"args": [{"op": "literal", "value": dataset_id}]
391+
},
392+
"filter": {
393+
"op": "ge",
394+
"left": {"op": "ident", "name": ["created"]},
395+
"right": {"op": "literal", "value": DATASET_ROWS_SINCE}
396+
},
397+
"sort": [{
398+
"expr": {"op": "ident", "name": ["_xact_id"]},
399+
"dir": "desc",
400+
}],
401+
"limit": 1
402+
})
403+
}
404+
405+
fn compare_xact_ids(left: &String, right: &String) -> std::cmp::Ordering {
406+
match (left.parse::<u64>(), right.parse::<u64>()) {
407+
(Ok(left), Ok(right)) => left.cmp(&right),
408+
_ => left.cmp(right),
409+
}
410+
}
411+
412+
fn found_existing_snapshot_header(headers: &HeaderMap) -> bool {
413+
headers
414+
.get("x-bt-found-existing")
415+
.and_then(|value| value.to_str().ok())
416+
.is_some_and(|value| value.eq_ignore_ascii_case("true") || value == "1")
417+
}
418+
254419
#[cfg(test)]
255420
mod tests {
256421
use super::*;
@@ -267,12 +432,12 @@ mod tests {
267432
query,
268433
serde_json::json!({
269434
"select": [
270-
{"op": "ident", "name": ["id"]},
271-
{"op": "ident", "name": ["input"]},
272-
{"op": "ident", "name": ["expected"]},
273-
{"op": "ident", "name": ["metadata"]},
274-
{"op": "ident", "name": ["tags"]},
275-
{"op": "ident", "name": ["origin"]}
435+
{"expr": {"op": "ident", "name": ["id"]}, "alias": "id"},
436+
{"expr": {"op": "ident", "name": ["input"]}, "alias": "input"},
437+
{"expr": {"op": "ident", "name": ["expected"]}, "alias": "expected"},
438+
{"expr": {"op": "ident", "name": ["metadata"]}, "alias": "metadata"},
439+
{"expr": {"op": "ident", "name": ["tags"]}, "alias": "tags"},
440+
{"expr": {"op": "ident", "name": ["origin"]}, "alias": "origin"}
276441
],
277442
"from": {
278443
"op": "function",
@@ -333,6 +498,99 @@ mod tests {
333498
);
334499
}
335500

501+
#[test]
502+
fn dataset_head_query_includes_required_filter_and_limit() {
503+
let query = build_dataset_head_xact_query("dataset-id");
504+
assert_eq!(
505+
query,
506+
serde_json::json!({
507+
"select": [{
508+
"expr": {"op": "ident", "name": ["_xact_id"]},
509+
"alias": "_xact_id",
510+
}],
511+
"from": {
512+
"op": "function",
513+
"name": {"op": "ident", "name": ["dataset"]},
514+
"args": [{"op": "literal", "value": "dataset-id"}]
515+
},
516+
"filter": {
517+
"op": "ge",
518+
"left": {"op": "ident", "name": ["created"]},
519+
"right": {"op": "literal", "value": "1970-01-01T00:00:00Z"}
520+
},
521+
"sort": [{
522+
"expr": {"op": "ident", "name": ["_xact_id"]},
523+
"dir": "desc",
524+
}],
525+
"limit": 1
526+
})
527+
);
528+
}
529+
530+
#[test]
531+
fn dataset_head_query_keeps_dataset_id_as_literal() {
532+
let query = build_dataset_head_xact_query("dataset'with-quote");
533+
assert_eq!(
534+
query.pointer("/from/args/0/value").and_then(Value::as_str),
535+
Some("dataset'with-quote")
536+
);
537+
}
538+
539+
#[test]
540+
fn compare_xact_ids_prefers_numeric_order_when_possible() {
541+
assert_eq!(
542+
compare_xact_ids(&"10".to_string(), &"2".to_string()),
543+
std::cmp::Ordering::Greater
544+
);
545+
assert_eq!(
546+
compare_xact_ids(&"b".to_string(), &"a".to_string()),
547+
std::cmp::Ordering::Greater
548+
);
549+
}
550+
551+
#[test]
552+
fn dataset_snapshot_deserializes_service_schema() {
553+
let snapshot: DatasetSnapshot = serde_json::from_value(serde_json::json!({
554+
"id": "01926568-8088-7109-99ab-123456789abc",
555+
"dataset_id": "01926568-8088-7109-99ab-abcdef012345",
556+
"name": "baseline",
557+
"description": null,
558+
"xact_id": "1000192656880881099",
559+
"created": null
560+
}))
561+
.expect("deserialize snapshot");
562+
563+
assert_eq!(snapshot.dataset_id, "01926568-8088-7109-99ab-abcdef012345");
564+
assert_eq!(snapshot.name, "baseline");
565+
assert!(snapshot.description.is_none());
566+
assert_eq!(snapshot.xact_id, "1000192656880881099");
567+
assert!(snapshot.created.is_none());
568+
}
569+
570+
#[test]
571+
fn dataset_restore_preview_deserializes_count_fields() {
572+
let preview: DatasetRestorePreview = serde_json::from_value(serde_json::json!({
573+
"rows_to_restore": 7,
574+
"rows_to_delete": 2
575+
}))
576+
.expect("deserialize preview");
577+
assert_eq!(preview.rows_to_restore, 7);
578+
assert_eq!(preview.rows_to_delete, 2);
579+
}
580+
581+
#[test]
582+
fn dataset_restore_result_deserializes_count_fields() {
583+
let result: DatasetRestoreResult = serde_json::from_value(serde_json::json!({
584+
"xact_id": "1000192656880881099",
585+
"rows_restored": 7,
586+
"rows_deleted": 2
587+
}))
588+
.expect("deserialize result");
589+
assert_eq!(result.xact_id, "1000192656880881099");
590+
assert_eq!(result.rows_restored, 7);
591+
assert_eq!(result.rows_deleted, 2);
592+
}
593+
336594
#[test]
337595
fn dataset_rows_page_limit_defaults_to_api_max() {
338596
assert_eq!(
@@ -351,4 +609,23 @@ mod tests {
351609
fn dataset_rows_page_limit_stops_when_limit_reached() {
352610
assert_eq!(resolve_dataset_rows_page_limit(Some(200), 200), None);
353611
}
612+
613+
#[test]
614+
fn found_existing_snapshot_header_accepts_true_and_one() {
615+
let mut headers = HeaderMap::new();
616+
headers.insert("x-bt-found-existing", "true".parse().expect("header"));
617+
assert!(found_existing_snapshot_header(&headers));
618+
619+
headers.insert("x-bt-found-existing", "1".parse().expect("header"));
620+
assert!(found_existing_snapshot_header(&headers));
621+
}
622+
623+
#[test]
624+
fn found_existing_snapshot_header_rejects_missing_or_false() {
625+
assert!(!found_existing_snapshot_header(&HeaderMap::new()));
626+
627+
let mut headers = HeaderMap::new();
628+
headers.insert("x-bt-found-existing", "false".parse().expect("header"));
629+
assert!(!found_existing_snapshot_header(&headers));
630+
}
354631
}

0 commit comments

Comments
 (0)