forked from tursodatabase/libsql
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathfdb_store.rs
More file actions
150 lines (132 loc) · 5.3 KB
/
fdb_store.rs
File metadata and controls
150 lines (132 loc) · 5.3 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
use crate::store::FrameStore;
use async_trait::async_trait;
use foundationdb::api::NetworkAutoStop;
use foundationdb::tuple::pack;
use foundationdb::tuple::unpack;
use foundationdb::{KeySelector, Transaction};
use libsql_storage::rpc::Frame;
use tracing::error;
/// Frame store that keeps its data in FoundationDB.
pub struct FDBFrameStore {
    // Guard returned by `foundationdb::boot()`; held so that it lives as long as the store.
    // NOTE(review): presumably dropping it stops the FoundationDB network thread —
    // confirm against the `foundationdb` crate documentation.
    _network: NetworkAutoStop,
}
impl FDBFrameStore {
    /// Boots the FoundationDB client network and returns a store that owns the
    /// resulting network guard.
    pub fn new() -> Self {
        tracing::debug!("booting FoundationDB network for FDBFrameStore");
        // SAFETY: `foundationdb::boot` requires that the returned guard is dropped
        // before the program exits; storing it in `Self` ties it to the store's
        // lifetime.
        // NOTE(review): the crate also expects the network to be booted only once
        // per process, so at most one `FDBFrameStore` should be constructed —
        // confirm that callers uphold this.
        let _network = unsafe { foundationdb::boot() };
        Self { _network }
    }

    /// Reads the highest frame number recorded for `namespace` within `txn`.
    ///
    /// The counter is stored under the key `{namespace}/max_frame_no`.
    /// Returns `0` when the key does not exist or when the read fails (the
    /// failure is logged).
    ///
    /// # Panics
    /// Panics if the stored value cannot be decoded as a `u64`.
    // NOTE(review): a failed read is indistinguishable from an empty namespace
    // for the caller; writers that continue numbering from the returned value
    // could overwrite existing frames after a transient error.
    async fn get_max_frame_no(&self, txn: &Transaction, namespace: &str) -> u64 {
        let max_frame_key = format!("{}/max_frame_no", namespace);
        let fetched = txn.get(max_frame_key.as_bytes(), false).await;
        match fetched {
            Ok(Some(value)) => {
                let frame_no: u64 = unpack(&value).expect("failed to decode u64");
                tracing::info!("max_frame_no ({}) = {}", max_frame_key, frame_no);
                frame_no
            }
            Ok(None) => {
                // A missing counter is expected before the first insert, so this
                // is not reported as an error.
                tracing::debug!("max_frame_no ({}) not set, defaulting to 0", max_frame_key);
                0
            }
            Err(e) => {
                error!("get failed: {:?}", e);
                0
            }
        }
    }

    /// Stages the writes for a single frame in `txn`; nothing is committed here.
    ///
    /// Four keys are written for frame `frame_no` of `namespace`:
    /// - `{namespace}/f/{frame_no}/f`            -> raw frame data
    /// - `{namespace}/f/{frame_no}/p`            -> page number of the frame
    /// - `{namespace}/p/{page_no}`               -> latest frame number for the page
    /// - `{namespace}/pf/{page_no}/{frame_no}`   -> frame number (page/frame index)
    async fn insert_with_tx(
        &self,
        namespace: &str,
        txn: &Transaction,
        frame_no: u64,
        frame: Frame,
    ) {
        let frame_data_key = format!("{}/f/{}/f", namespace, frame_no);
        let frame_page_key = format!("{}/f/{}/p", namespace, frame_no);
        let page_key = format!("{}/p/{}", namespace, frame.page_no);
        let page_frame_idx = format!("{}/pf/{}/{}", namespace, frame.page_no, frame_no);

        let packed_frame_no = pack(&frame_no);
        txn.set(frame_data_key.as_bytes(), &frame.data);
        txn.set(frame_page_key.as_bytes(), &pack(&frame.page_no));
        txn.set(page_key.as_bytes(), &packed_frame_no);
        txn.set(page_frame_idx.as_bytes(), &packed_frame_no);
    }
}
#[async_trait]
impl FrameStore for FDBFrameStore {
async fn insert_frames(&self, namespace: &str, _max_frame_no: u64, frames: Vec<Frame>) -> u64 {
let max_frame_key = format!("{}/max_frame_no", namespace);
let db = foundationdb::Database::default().unwrap();
let txn = db.create_trx().expect("unable to create transaction");
let mut frame_no = self.get_max_frame_no(&txn, namespace).await;
for f in frames {
frame_no += 1;
self.insert_with_tx(namespace, &txn, frame_no, f).await;
}
txn.set(&max_frame_key.as_bytes(), &pack(&(frame_no)));
txn.commit().await.expect("commit failed");
frame_no
}
async fn read_frame(&self, namespace: &str, frame_no: u64) -> Option<bytes::Bytes> {
let frame_key = format!("{}/f/{}/f", namespace, frame_no);
let db = foundationdb::Database::default().unwrap();
let txn = db.create_trx().expect("unable to create transaction");
let frame = txn.get(frame_key.as_bytes(), false).await;
if let Ok(Some(data)) = frame {
return Some(data.to_vec().into());
}
None
}
async fn find_frame(&self, namespace: &str, page_no: u32, max_frame_no: u64) -> Option<u64> {
let db = foundationdb::Database::default().unwrap();
let txn = db.create_trx().expect("unable to create transaction");
let page_key = format!("{}/pf/{}/{}", namespace, page_no, max_frame_no);
let result = txn
.get(
KeySelector::last_less_or_equal(page_key.as_bytes()).key(),
false,
)
.await;
// if let Err(e) = result {
// error!("get failed: {:?}", e);
// return None;
// }
// if let Ok(None) = result {
// error!("page not found (with max)");
// return None;
// }
if let Ok(result) = result {
if let Some(frame_no) = result {
let frame_no: u64 = unpack(&frame_no).expect("failed to decode u64");
tracing::trace!("got the frame_no = {} (with max)", frame_no);
};
};
let page_key = format!("{}/p/{}", namespace, page_no);
let result = txn.get(&page_key.as_bytes(), false).await;
if let Err(e) = result {
error!("get failed: {:?}", e);
return None;
}
if let Ok(None) = result {
error!("page not found");
return None;
}
let frame_no: u64 = unpack(&result.unwrap().unwrap()).expect("failed to decode u64");
tracing::trace!("got the frame_no = {} (without max)", frame_no);
Some(frame_no)
}
async fn frame_page_no(&self, namespace: &str, frame_no: u64) -> Option<u32> {
let frame_key = format!("{}/f/{}/p", namespace, frame_no);
let db = foundationdb::Database::default().unwrap();
let txn = db.create_trx().expect("unable to create transaction");
let page_no: u32 = unpack(
&txn.get(&frame_key.as_bytes(), true)
.await
.expect("get failed")
.expect("frame not found"),
)
.expect("failed to decode u64");
Some(page_no)
}
async fn frames_in_wal(&self, namespace: &str) -> u64 {
let db = foundationdb::Database::default().unwrap();
let txn = db.create_trx().expect("unable to create transaction");
self.get_max_frame_no(&txn, namespace).await
}
async fn destroy(&self, _namespace: &str) {}
}