Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
79 commits
Select commit Hold shift + click to select a range
4ff1ce9
Disaggregated write-ahead log for libSQL
penberg Feb 22, 2024
caadc9a
Move to frame store
avinassh Mar 29, 2024
ef93323
Add frame count and framepage req
avinassh Mar 29, 2024
6ab7f76
Add `frame_page_num` and `frames_in_wal`
avinassh Mar 29, 2024
0376491
Update WAL trait to use mut
avinassh Mar 29, 2024
dcae758
Use cache to store the frames
avinassh Apr 1, 2024
d2ffdf7
increase txn timeout
avinassh Apr 3, 2024
796131a
implement a shitty mutex lock manager
avinassh Apr 3, 2024
6627384
Reset WAL interface changes and use mutex for RPC client
avinassh Apr 3, 2024
f588889
Add working docker
avinassh Apr 3, 2024
03c4ddb
Add fly config and move the Dockerfile
avinassh Apr 3, 2024
4c88315
simplify fly config
avinassh Apr 3, 2024
b10dd9b
make the storage server configurable and use the fly version
avinassh Apr 3, 2024
179c300
Add store trait
avinassh Apr 4, 2024
3fe9f6a
rename traits and struct
avinassh Apr 4, 2024
300d5f7
Add redis framestore impl
avinassh Apr 4, 2024
ab729a7
handle nil responses from redis and return proper default
avinassh Apr 4, 2024
762d558
improve redis code: transactions, commands, env var
avinassh Apr 9, 2024
ca9d5a5
empty
avinassh Apr 11, 2024
53da4e9
Add updated workflow image
avinassh Apr 11, 2024
88ba612
merge `5cfb62f58064`
avinassh Apr 21, 2024
34f09ec
propagate lock manager down to replicator wrapper
avinassh Apr 22, 2024
a690bf9
create the handle if it doesnt exist
avinassh Apr 23, 2024
d59868d
store a ref to runtime
avinassh Apr 23, 2024
c565609
Add destroy WAL
avinassh Apr 23, 2024
71d12a2
Impl destroy
avinassh Apr 23, 2024
738af86
Merge branch 'main' into durable-wal2
avinassh Apr 30, 2024
a2c6ae3
Merge branch 'durable-wal2' into durable-wal
avinassh Apr 30, 2024
12953f2
Merge branch 'main' into durable-wal
avinassh Apr 30, 2024
c362eb2
Merge branch 'main' into durable-wal
avinassh May 1, 2024
e3770ab
merge and add changes
avinassh May 1, 2024
a19e31b
merge hell
avinassh May 1, 2024
7d039b9
remove todo
avinassh May 1, 2024
1515b4e
missing work and minor fixes
avinassh May 1, 2024
fa75ca3
minor diff
avinassh May 2, 2024
9245b63
set limits on client, read all payload
avinassh May 2, 2024
527e947
Merge branch 'main' into durable-wal
avinassh May 3, 2024
8ba9dd4
refactor
avinassh May 15, 2024
a73d67e
Make code async and move to FDB
avinassh May 16, 2024
350095e
Merge branch 'main' into durable-wal
avinassh May 17, 2024
fb82f96
Move redis methods to async
avinassh May 18, 2024
7fd5379
fix bug while inserting mx_frame
avinassh May 18, 2024
24e019c
Add a bulk insert method
avinassh May 21, 2024
c2834e0
update proto to propagate ns
avinassh May 21, 2024
1d5f84d
propagate namespace from server
avinassh May 21, 2024
44ee1f8
Propagate max frame no
avinassh May 21, 2024
3d8e82c
Merge branch 'main' into durable-wal
avinassh May 26, 2024
992907d
Merge branch 'main' into durable-wal
avinassh May 28, 2024
86e5a06
Merge branch 'main' into durable-wal
avinassh May 29, 2024
b197656
Merge branch 'main' into durable-wal
avinassh May 29, 2024
e5d1748
Add storage server proto definition
avinassh May 30, 2024
f956f6d
Merge branch 'ss-proto' into durable-wal
avinassh May 30, 2024
5037a9d
Merge branch 'main' into durable-wal
avinassh May 30, 2024
467675a
update clients
avinassh May 30, 2024
cb76a4d
refactor
avinassh May 31, 2024
913d87e
Refactor to have pluggable store
avinassh May 31, 2024
b2def08
refactor to service
avinassh May 31, 2024
df14f12
cargo fix
avinassh May 31, 2024
5f73b16
moar ref
avinassh May 31, 2024
f5fa261
Refactor
avinassh May 31, 2024
601d1a2
do not use mutex for clinet
avinassh Jun 1, 2024
a642809
Add storage addr configurable
avinassh Jun 1, 2024
ef8b4bf
Update trait to take non mutable ref
avinassh Jun 1, 2024
1d5355a
cargo fix
avinassh Jun 1, 2024
5e4218a
Merge branch 'main' into durable-wal
avinassh Jun 3, 2024
91549fb
Merge branch 'main' into durable-wal
avinassh Jun 6, 2024
38cb30f
send max frame num
avinassh Jun 6, 2024
341e8bf
Merge branch 'main' into durable-wal
avinassh Jun 8, 2024
66fc5d8
set max_frame_size in begin_tx
avinassh Jun 8, 2024
ed31b45
send mx frame in insert req
avinassh Jun 8, 2024
9ad3d2d
Update proto to send `max_frame_no` while inserting frames
avinassh Jun 8, 2024
d8ae21f
Updates storage trait's insert_frames definition
avinassh Jun 8, 2024
a821b66
remove insert_frames
avinassh Jun 8, 2024
f9a875a
Merge branch 'ss-proto-updates' into durable-wal
avinassh Jun 8, 2024
097d4d0
Use `insert_frames` method
avinassh Jun 8, 2024
5b7e287
cleanup: remove `insert_frame` method and impl
avinassh Jun 8, 2024
66854f4
Remove `FrameData`, use `Frame` from proto instead
avinassh Jun 8, 2024
9c2d328
Merge branch 'ss-proto-updates' into durable-wal
avinassh Jun 8, 2024
6daa25d
Merge branch 'main' into durable-wal
avinassh Jun 10, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ members = [
"bindings/wasm",
"libsql-sys",
"libsql-server",
"libsql-storage",
"libsql-storage-server",
"bottomless",
"bottomless-cli",
"libsql-replication",
Expand Down
7 changes: 4 additions & 3 deletions libsql-server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ bottomless = { version = "0", path = "../bottomless", features = ["libsql_linked
bytes = { version = "1.2.1", features = ["serde"] }
bytesize = { version = "1.2.0", features = ["serde"] }
chrono = { version = "0.4.26", features = ["serde"] }
clap = { version = "4.0.23", features = [ "derive", "env", "string" ] }
clap = { version = "4.0.23", features = ["derive", "env", "string"] }
console-subscriber = { git = "https://github.com/tokio-rs/console.git", rev = "5a80b98", optional = true }
crc = "3.0.0"
enclose = "1.1"
Expand Down Expand Up @@ -63,7 +63,8 @@ sha2 = "0.10"
sha256 = "1.1.3"
libsql-sys = { path = "../libsql-sys", features = ["wal"], default-features = false }
libsql-hrana = { path = "../libsql-hrana" }
sqlite3-parser = { package = "libsql-sqlite3-parser", path = "../vendored/sqlite3-parser", version = "0.11.0", default-features = false, features = [ "YYNOERRORRECOVERY" ] }
libsql-storage = { path = "../libsql-storage" }
sqlite3-parser = { package = "libsql-sqlite3-parser", path = "../vendored/sqlite3-parser", version = "0.11.0", default-features = false, features = ["YYNOERRORRECOVERY"] }
tempfile = "3.7.0"
thiserror = "1.0.38"
tokio = { version = "1.22.2", features = ["rt-multi-thread", "net", "io-std", "io-util", "time", "macros", "sync", "fs", "signal"] }
Expand Down Expand Up @@ -94,7 +95,7 @@ aws-sdk-s3 = "0.28"
env_logger = "0.10"
hyper = { version = "0.14", features = ["client"] }
insta = { version = "1.26.0", features = ["json"] }
libsql = { path = "../libsql/"}
libsql = { path = "../libsql/" }
libsql-client = { version = "0.6.5", default-features = false, features = ["reqwest_backend"] }
proptest = "1.0.0"
rand = "0.8.5"
Expand Down
4 changes: 3 additions & 1 deletion libsql-server/src/connection/connection_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,11 @@ use std::time::{Duration, Instant};
use crossbeam::deque::Steal;
use crossbeam::sync::{Parker, Unparker};
use hashbrown::HashMap;
use libsql_storage::DurableWal as Sqlite3Wal;
use libsql_storage::DurableWalManager as Sqlite3WalManager;
use libsql_sys::wal::either::Either;
use libsql_sys::wal::wrapper::{WrapWal, WrappedWal};
use libsql_sys::wal::{CheckpointMode, Sqlite3Wal, Sqlite3WalManager, Wal};
use libsql_sys::wal::{CheckpointMode, Wal};
use libsql_wal::io::StdIO;
use libsql_wal::wal::{LibsqlWal, LibsqlWalManager};
use metrics::atomics::AtomicU64;
Expand Down
7 changes: 7 additions & 0 deletions libsql-server/src/connection/libsql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

use libsql_storage::{DurableWalManager, LockManager};
use libsql_sys::wal::wrapper::{WrapWal, WrappedWal};
use libsql_sys::wal::{BusyHandler, CheckpointCallback, Wal, WalManager};
use libsql_sys::EncryptionConfig;
Expand Down Expand Up @@ -48,6 +49,7 @@ pub struct MakeLibSqlConn<W> {
encryption_config: Option<EncryptionConfig>,
block_writes: Arc<AtomicBool>,
resolve_attach_path: ResolveNamespacePathFn,
lock_manager: Arc<std::sync::Mutex<LockManager>>,
make_wal_manager: Arc<dyn Fn() -> InnerWalManager + Sync + Send + 'static>,
}

Expand All @@ -70,6 +72,7 @@ where
block_writes: Arc<AtomicBool>,
resolve_attach_path: ResolveNamespacePathFn,
make_wal_manager: Arc<dyn Fn() -> InnerWalManager + Sync + Send + 'static>,
lock_manager: Arc<std::sync::Mutex<LockManager>>,
) -> Result<Self> {
let txn_timeout = config_store.get().txn_timeout.unwrap_or(TXN_TIMEOUT);

Expand All @@ -88,6 +91,7 @@ where
block_writes,
resolve_attach_path,
connection_manager: ConnectionManager::new(txn_timeout),
lock_manager,
make_wal_manager,
};

Expand Down Expand Up @@ -144,6 +148,7 @@ where
self.block_writes.clone(),
self.resolve_attach_path.clone(),
self.connection_manager.clone(),
self.lock_manager.clone(),
self.make_wal_manager.clone(),
)
.await
Expand Down Expand Up @@ -318,13 +323,15 @@ where
block_writes: Arc<AtomicBool>,
resolve_attach_path: ResolveNamespacePathFn,
connection_manager: ConnectionManager,
lock_manager: Arc<std::sync::Mutex<LockManager>>,
make_wal: Arc<dyn Fn() -> InnerWalManager + Sync + Send + 'static>,
) -> crate::Result<Self> {
let (conn, id) = tokio::task::spawn_blocking({
let connection_manager = connection_manager.clone();
move || -> crate::Result<_> {
let manager = ManagedConnectionWalWrapper::new(connection_manager);
let id = manager.id();

let wal = make_wal().wrap(manager).wrap(wal_wrapper);

let conn = Connection::new(
Expand Down
2 changes: 1 addition & 1 deletion libsql-server/src/connection/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ pub mod program;
pub mod write_proxy;

#[cfg(not(test))]
const TXN_TIMEOUT: Duration = Duration::from_secs(5);
const TXN_TIMEOUT: Duration = Duration::from_secs(500);
#[cfg(test)]
const TXN_TIMEOUT: Duration = Duration::from_millis(100);

Expand Down
2 changes: 2 additions & 0 deletions libsql-server/src/connection/write_proxy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use libsql_replication::rpc::proxy::proxy_client::ProxyClient;
use libsql_replication::rpc::proxy::{
exec_req, exec_resp, ExecReq, ExecResp, StreamDescribeReq, StreamProgramReq,
};
use libsql_storage::LockManager;
use libsql_sys::wal::wrapper::PassthroughWalWrapper;
use libsql_sys::EncryptionConfig;
use parking_lot::Mutex as PMutex;
Expand Down Expand Up @@ -78,6 +79,7 @@ impl MakeWriteProxyConn {
Arc::new(AtomicBool::new(false)), // this is always false for write proxy
resolve_attach_path,
make_wal_manager,
Arc::new(std::sync::Mutex::new(LockManager::new())),
)
.await?;

Expand Down
19 changes: 16 additions & 3 deletions libsql-server/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
use std::path::{Path, PathBuf};
use std::pin::Pin;
use std::str::FromStr;
use std::sync::{Arc, Weak};
use std::sync::{Arc, Mutex, Weak};

use crate::connection::{Connection, MakeConnection};
use crate::database::DatabaseKind;
Expand Down Expand Up @@ -31,8 +31,8 @@ use futures::Future;
use http::user::UserApi;
use hyper::client::HttpConnector;
use hyper_rustls::HttpsConnector;
use libsql_storage::{DurableWalManager as Sqlite3WalManager, DurableWalManager};
use libsql_sys::wal::either::Either;
use libsql_sys::wal::Sqlite3WalManager;
use libsql_wal::registry::WalRegistry;
use libsql_wal::wal::LibsqlWalManager;
use namespace::meta_store::MetaStoreHandle;
Expand Down Expand Up @@ -61,6 +61,7 @@ pub mod rpc;
pub mod version;

pub use hrana::proto as hrana_proto;
use libsql_storage::LockManager;

mod database;
mod error;
Expand Down Expand Up @@ -444,6 +445,7 @@ where
channel: channel.clone(),
uri: uri.clone(),
migration_scheduler: scheduler_sender.into(),
lock_manager: Arc::new(Mutex::new(LockManager::new())),
make_wal_manager,
};

Expand Down Expand Up @@ -684,8 +686,19 @@ where
Ok((Arc::new(move || Either::B(wal.clone())), shutdown_fut))
} else {
tracing::info!("using sqlite3 wal");
// // connect to external storage server
// // export LIBSQL_STORAGE_SERVER_ADDR=http://libsql-storage-server.internal:5002
let address = std::env::var("LIBSQL_STORAGE_SERVER_ADDR")
.unwrap_or("http://127.0.0.1:5002".to_string());
let lock_manager = Arc::new(std::sync::Mutex::new(LockManager::new()));
// let durable_wal = Sqlite3WalManager::new(lock_manager, address);
Ok((
Arc::new(|| Either::A(Sqlite3WalManager::default())),
Arc::new(move || {
Either::A(Sqlite3WalManager::new(
lock_manager.clone(),
address.clone(),
))
}),
Box::pin(ready(Ok(()))),
))
}
Expand Down
4 changes: 4 additions & 0 deletions libsql-server/src/namespace/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ use crate::{
};

pub use fork::ForkError;
use libsql_storage::LockManager;

use self::fork::{ForkTask, PointInTimeRestore};
use self::meta_store::MetaStoreHandle;
Expand Down Expand Up @@ -372,6 +373,7 @@ impl Namespace {
block_writes,
resolve_attach_path,
ns_config.make_wal_manager.clone(),
ns_config.lock_manager.clone(),
)
.await?
.throttled(
Expand Down Expand Up @@ -759,6 +761,8 @@ pub struct NamespaceConfig {
pub(crate) bottomless_replication: Option<bottomless::replicator::Options>,
pub(crate) scripted_backup: Option<ScriptBackupManager>,
pub(crate) migration_scheduler: SchedulerHandle,

pub(crate) lock_manager: Arc<std::sync::Mutex<LockManager>>,
pub(crate) make_wal_manager: Arc<dyn Fn() -> InnerWalManager + Sync + Send + 'static>,
}

Expand Down
20 changes: 20 additions & 0 deletions libsql-storage-server/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
# run this from the root
# docker build -f libsql-storage-server/Dockerfile -t lss .
# docker run -p lss

FROM rust:latest as builder

WORKDIR /app
COPY . .

RUN --mount=type=cache,target=/usr/local/cargo,from=rust:latest,source=/usr/local/cargo \
--mount=type=cache,target=target \
cargo build --bin libsql-storage-server && mv ./target/debug/libsql-storage-server ./libsql-storage-server && chmod +x ./libsql-storage-server


FROM debian:bookworm-slim
EXPOSE 5002

COPY --from=builder /app/libsql-storage-server /

CMD ["/libsql-storage-server"]
18 changes: 18 additions & 0 deletions libsql-storage-server/fly.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
# fly deploy -c libsql-storage-server/fly.toml --dockerfile libsql-storage-server/Dockerfile

app = 'libsql-storage-server'
primary_region = 'ams'

[build]

[[services]]
http_checks = []
internal_port = 5002
processes = ["app"]
protocol = "tcp"
script_checks = []
[[services.ports]]
port = 5002

[[vm]]
size = 'shared-cpu-1x'
34 changes: 30 additions & 4 deletions libsql-storage-server/src/fdb_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use async_trait::async_trait;
use foundationdb::api::NetworkAutoStop;
use foundationdb::tuple::pack;
use foundationdb::tuple::unpack;
use foundationdb::Transaction;
use foundationdb::{KeySelector, Transaction};
use libsql_storage::rpc::Frame;
use tracing::error;

Expand All @@ -13,6 +13,7 @@ pub struct FDBFrameStore {

impl FDBFrameStore {
pub fn new() -> Self {
println!("I was called");
let _network = unsafe { foundationdb::boot() };
Self { _network }
}
Expand Down Expand Up @@ -43,10 +44,12 @@ impl FDBFrameStore {
let frame_data_key = format!("{}/f/{}/f", namespace, frame_no);
let frame_page_key = format!("{}/f/{}/p", namespace, frame_no);
let page_key = format!("{}/p/{}", namespace, frame.page_no);
let page_frame_idx = format!("{}/pf/{}/{}", namespace, frame.page_no, frame_no);

txn.set(&frame_data_key.as_bytes(), &frame.data);
txn.set(&frame_page_key.as_bytes(), &pack(&frame.page_no));
txn.set(&page_key.as_bytes(), &pack(&frame_no));
txn.set(&page_frame_idx.as_bytes(), &pack(&frame_no));
}
}

Expand Down Expand Up @@ -79,12 +82,34 @@ impl FrameStore for FDBFrameStore {
None
}

async fn find_frame(&self, namespace: &str, page_no: u32) -> Option<u64> {
let page_key = format!("{}/p/{}", namespace, page_no);

async fn find_frame(&self, namespace: &str, page_no: u32, max_frame_no: u64) -> Option<u64> {
let db = foundationdb::Database::default().unwrap();
let txn = db.create_trx().expect("unable to create transaction");

let page_key = format!("{}/pf/{}/{}", namespace, page_no, max_frame_no);
let result = txn
.get(
KeySelector::last_less_or_equal(page_key.as_bytes()).key(),
false,
)
.await;
// if let Err(e) = result {
// error!("get failed: {:?}", e);
// return None;
// }
// if let Ok(None) = result {
// error!("page not found (with max)");
// return None;
// }
if let Ok(result) = result {
if let Some(frame_no) = result {
let frame_no: u64 = unpack(&frame_no).expect("failed to decode u64");
tracing::trace!("got the frame_no = {} (with max)", frame_no);
};
};

let page_key = format!("{}/p/{}", namespace, page_no);

let result = txn.get(&page_key.as_bytes(), false).await;
if let Err(e) = result {
error!("get failed: {:?}", e);
Expand All @@ -95,6 +120,7 @@ impl FrameStore for FDBFrameStore {
return None;
}
let frame_no: u64 = unpack(&result.unwrap().unwrap()).expect("failed to decode u64");
tracing::trace!("got the frame_no = {} (without max)", frame_no);
Some(frame_no)
}

Expand Down
1 change: 1 addition & 0 deletions libsql-storage-server/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use tracing_subscriber::{EnvFilter, FmtSubscriber};
enum StorageType {
InMemory,
Redis,
#[cfg(feature = "foundation-db")]
FoundationDB,
}

Expand Down
2 changes: 1 addition & 1 deletion libsql-storage-server/src/memory_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ impl FrameStore for InMemFrameStore {
}

// given a page number, return the maximum frame for the page
async fn find_frame(&self, _namespace: &str, page_no: u32) -> Option<u64> {
async fn find_frame(&self, _namespace: &str, page_no: u32, _max_frame_no: u64) -> Option<u64> {
self.inner
.lock()
.unwrap()
Expand Down
2 changes: 1 addition & 1 deletion libsql-storage-server/src/redis_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ impl FrameStore for RedisFrameStore {
}
}

async fn find_frame(&self, namespace: &str, page_no: u32) -> Option<u64> {
async fn find_frame(&self, namespace: &str, page_no: u32, _max_frame_no: u64) -> Option<u64> {
let page_key = format!("p/{}/{}", namespace, page_no);
let mut con = self.client.get_connection().unwrap();
let frame_no = con.get::<String, u64>(page_key.clone());
Expand Down
6 changes: 5 additions & 1 deletion libsql-storage-server/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,11 @@ impl Storage for Service {
let page_no = request.page_no;
let namespace = request.namespace;
trace!("find_frame(page_no={})", page_no);
if let Some(frame_no) = self.store.find_frame(&namespace, page_no).await {
if let Some(frame_no) = self
.store
.find_frame(&namespace, page_no, request.max_frame_no)
.await
{
Ok(Response::new(rpc::FindFrameResponse {
frame_no: Some(frame_no),
}))
Expand Down
2 changes: 1 addition & 1 deletion libsql-storage-server/src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use libsql_storage::rpc::Frame;
pub trait FrameStore: Send + Sync {
async fn insert_frames(&self, namespace: &str, max_frame_no: u64, frames: Vec<Frame>) -> u64;
async fn read_frame(&self, namespace: &str, frame_no: u64) -> Option<bytes::Bytes>;
async fn find_frame(&self, namespace: &str, page_no: u32) -> Option<u64>;
async fn find_frame(&self, namespace: &str, page_no: u32, max_frame_no: u64) -> Option<u64>;
async fn frame_page_no(&self, namespace: &str, frame_no: u64) -> Option<u32>;
async fn frames_in_wal(&self, namespace: &str) -> u64;
async fn destroy(&self, namespace: &str);
Expand Down
Loading