diff --git a/Cargo.lock b/Cargo.lock index f7e134dbdb..f9ab458ed4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3470,6 +3470,7 @@ dependencies = [ "libsql-hrana", "libsql-rusqlite", "libsql-sqlite3-parser", + "libsql-storage", "libsql-sys", "libsql-wal", "libsql_replication", diff --git a/Cargo.toml b/Cargo.toml index 79a2798af1..a45948e6be 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -7,6 +7,8 @@ members = [ "bindings/wasm", "libsql-sys", "libsql-server", + "libsql-storage", + "libsql-storage-server", "bottomless", "bottomless-cli", "libsql-replication", diff --git a/libsql-server/Cargo.toml b/libsql-server/Cargo.toml index cf3cbf527a..a3cb953c71 100644 --- a/libsql-server/Cargo.toml +++ b/libsql-server/Cargo.toml @@ -22,7 +22,7 @@ bottomless = { version = "0", path = "../bottomless", features = ["libsql_linked bytes = { version = "1.2.1", features = ["serde"] } bytesize = { version = "1.2.0", features = ["serde"] } chrono = { version = "0.4.26", features = ["serde"] } -clap = { version = "4.0.23", features = [ "derive", "env", "string" ] } +clap = { version = "4.0.23", features = ["derive", "env", "string"] } console-subscriber = { git = "https://github.com/tokio-rs/console.git", rev = "5a80b98", optional = true } crc = "3.0.0" enclose = "1.1" @@ -63,7 +63,8 @@ sha2 = "0.10" sha256 = "1.1.3" libsql-sys = { path = "../libsql-sys", features = ["wal"], default-features = false } libsql-hrana = { path = "../libsql-hrana" } -sqlite3-parser = { package = "libsql-sqlite3-parser", path = "../vendored/sqlite3-parser", version = "0.11.0", default-features = false, features = [ "YYNOERRORRECOVERY" ] } +libsql-storage = { path = "../libsql-storage" } +sqlite3-parser = { package = "libsql-sqlite3-parser", path = "../vendored/sqlite3-parser", version = "0.11.0", default-features = false, features = ["YYNOERRORRECOVERY"] } tempfile = "3.7.0" thiserror = "1.0.38" tokio = { version = "1.22.2", features = ["rt-multi-thread", "net", "io-std", "io-util", "time", "macros", "sync", "fs", "signal"] } @@ -94,7 +95,7 @@ aws-sdk-s3 = "0.28" env_logger = "0.10" hyper = { version = "0.14", features = ["client"] } insta = { version = "1.26.0", features = ["json"] } -libsql = { path = "../libsql/"} +libsql = { path = "../libsql/" } libsql-client = { version = "0.6.5", default-features = false, features = ["reqwest_backend"] } proptest = "1.0.0" rand = "0.8.5" diff --git a/libsql-server/src/connection/connection_manager.rs b/libsql-server/src/connection/connection_manager.rs index ed7ca5aef4..2b13eb7b28 100644 --- a/libsql-server/src/connection/connection_manager.rs +++ b/libsql-server/src/connection/connection_manager.rs @@ -6,9 +6,11 @@ use std::time::{Duration, Instant}; use crossbeam::deque::Steal; use crossbeam::sync::{Parker, Unparker}; use hashbrown::HashMap; +use libsql_storage::DurableWal as Sqlite3Wal; +use libsql_storage::DurableWalManager as Sqlite3WalManager; use libsql_sys::wal::either::Either; use libsql_sys::wal::wrapper::{WrapWal, WrappedWal}; -use libsql_sys::wal::{CheckpointMode, Sqlite3Wal, Sqlite3WalManager, Wal}; +use libsql_sys::wal::{CheckpointMode, Wal}; use libsql_wal::io::StdIO; use libsql_wal::wal::{LibsqlWal, LibsqlWalManager}; use metrics::atomics::AtomicU64; diff --git a/libsql-server/src/connection/libsql.rs b/libsql-server/src/connection/libsql.rs index cb5b6f40a0..83c690e7ee 100644 --- a/libsql-server/src/connection/libsql.rs +++ b/libsql-server/src/connection/libsql.rs @@ -4,6 +4,7 @@ use std::path::{Path, PathBuf}; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; +use libsql_storage::{DurableWalManager, LockManager}; use libsql_sys::wal::wrapper::{WrapWal, WrappedWal}; use libsql_sys::wal::{BusyHandler, CheckpointCallback, Wal, WalManager}; use libsql_sys::EncryptionConfig; @@ -48,6 +49,7 @@ pub struct MakeLibSqlConn { encryption_config: Option, block_writes: Arc, resolve_attach_path: ResolveNamespacePathFn, + lock_manager: Arc>, make_wal_manager: Arc InnerWalManager + Sync + Send + 'static>, } @@ -70,6 +72,7 @@ where block_writes: Arc, resolve_attach_path: ResolveNamespacePathFn, make_wal_manager: Arc InnerWalManager + Sync + Send + 'static>, + lock_manager: Arc>, ) -> Result { let txn_timeout = config_store.get().txn_timeout.unwrap_or(TXN_TIMEOUT); @@ -88,6 +91,7 @@ where block_writes, resolve_attach_path, connection_manager: ConnectionManager::new(txn_timeout), + lock_manager, make_wal_manager, }; @@ -144,6 +148,7 @@ where self.block_writes.clone(), self.resolve_attach_path.clone(), self.connection_manager.clone(), + self.lock_manager.clone(), self.make_wal_manager.clone(), ) .await @@ -318,6 +323,7 @@ where block_writes: Arc, resolve_attach_path: ResolveNamespacePathFn, connection_manager: ConnectionManager, + lock_manager: Arc>, make_wal: Arc InnerWalManager + Sync + Send + 'static>, ) -> crate::Result { let (conn, id) = tokio::task::spawn_blocking({ @@ -325,6 +331,7 @@ where move || -> crate::Result<_> { let manager = ManagedConnectionWalWrapper::new(connection_manager); let id = manager.id(); + let wal = make_wal().wrap(manager).wrap(wal_wrapper); let conn = Connection::new( diff --git a/libsql-server/src/connection/mod.rs b/libsql-server/src/connection/mod.rs index e554130a80..6d786dae3e 100644 --- a/libsql-server/src/connection/mod.rs +++ b/libsql-server/src/connection/mod.rs @@ -31,7 +31,7 @@ pub mod program; pub mod write_proxy; #[cfg(not(test))] -const TXN_TIMEOUT: Duration = Duration::from_secs(5); +const TXN_TIMEOUT: Duration = Duration::from_secs(500); #[cfg(test)] const TXN_TIMEOUT: Duration = Duration::from_millis(100); diff --git a/libsql-server/src/connection/write_proxy.rs b/libsql-server/src/connection/write_proxy.rs index 4058bae973..eec77a950f 100644 --- a/libsql-server/src/connection/write_proxy.rs +++ b/libsql-server/src/connection/write_proxy.rs @@ -8,6 +8,7 @@ use libsql_replication::rpc::proxy::proxy_client::ProxyClient; use libsql_replication::rpc::proxy::{ exec_req, exec_resp, ExecReq, ExecResp, StreamDescribeReq, StreamProgramReq, }; +use libsql_storage::LockManager; use libsql_sys::wal::wrapper::PassthroughWalWrapper; use libsql_sys::EncryptionConfig; use parking_lot::Mutex as PMutex; @@ -78,6 +79,7 @@ impl MakeWriteProxyConn { Arc::new(AtomicBool::new(false)), // this is always false for write proxy resolve_attach_path, make_wal_manager, + Arc::new(std::sync::Mutex::new(LockManager::new())), ) .await?; diff --git a/libsql-server/src/lib.rs b/libsql-server/src/lib.rs index 04f9d5cb61..40e596a92e 100644 --- a/libsql-server/src/lib.rs +++ b/libsql-server/src/lib.rs @@ -3,7 +3,7 @@ use std::path::{Path, PathBuf}; use std::pin::Pin; use std::str::FromStr; -use std::sync::{Arc, Weak}; +use std::sync::{Arc, Mutex, Weak}; use crate::connection::{Connection, MakeConnection}; use crate::database::DatabaseKind; @@ -31,8 +31,8 @@ use futures::Future; use http::user::UserApi; use hyper::client::HttpConnector; use hyper_rustls::HttpsConnector; +use libsql_storage::{DurableWalManager as Sqlite3WalManager, DurableWalManager}; use libsql_sys::wal::either::Either; -use libsql_sys::wal::Sqlite3WalManager; use libsql_wal::registry::WalRegistry; use libsql_wal::wal::LibsqlWalManager; use namespace::meta_store::MetaStoreHandle; @@ -61,6 +61,7 @@ pub mod rpc; pub mod version; pub use hrana::proto as hrana_proto; +use libsql_storage::LockManager; mod database; mod error; @@ -444,6 +445,7 @@ where channel: channel.clone(), uri: uri.clone(), migration_scheduler: scheduler_sender.into(), + lock_manager: Arc::new(Mutex::new(LockManager::new())), make_wal_manager, }; @@ -684,8 +686,19 @@ where Ok((Arc::new(move || Either::B(wal.clone())), shutdown_fut)) } else { tracing::info!("using sqlite3 wal"); + // // connect to external storage server + // // export LIBSQL_STORAGE_SERVER_ADDR=http://libsql-storage-server.internal:5002 + let address = std::env::var("LIBSQL_STORAGE_SERVER_ADDR") + .unwrap_or("http://127.0.0.1:5002".to_string()); + let lock_manager = Arc::new(std::sync::Mutex::new(LockManager::new())); + // let durable_wal = Sqlite3WalManager::new(lock_manager, address); Ok(( - Arc::new(|| Either::A(Sqlite3WalManager::default())), + Arc::new(move || { + Either::A(Sqlite3WalManager::new( + lock_manager.clone(), + address.clone(), + )) + }), Box::pin(ready(Ok(()))), )) } diff --git a/libsql-server/src/namespace/mod.rs b/libsql-server/src/namespace/mod.rs index d379884837..7231c5a3f9 100644 --- a/libsql-server/src/namespace/mod.rs +++ b/libsql-server/src/namespace/mod.rs @@ -48,6 +48,7 @@ use crate::{ }; pub use fork::ForkError; +use libsql_storage::LockManager; use self::fork::{ForkTask, PointInTimeRestore}; use self::meta_store::MetaStoreHandle; @@ -372,6 +373,7 @@ impl Namespace { block_writes, resolve_attach_path, ns_config.make_wal_manager.clone(), + ns_config.lock_manager.clone(), ) .await? .throttled( @@ -759,6 +761,8 @@ pub struct NamespaceConfig { pub(crate) bottomless_replication: Option, pub(crate) scripted_backup: Option, pub(crate) migration_scheduler: SchedulerHandle, + + pub(crate) lock_manager: Arc>, pub(crate) make_wal_manager: Arc InnerWalManager + Sync + Send + 'static>, } diff --git a/libsql-storage-server/Dockerfile b/libsql-storage-server/Dockerfile new file mode 100644 index 0000000000..654aca40c1 --- /dev/null +++ b/libsql-storage-server/Dockerfile @@ -0,0 +1,20 @@ +# run this from the root +# docker build -f libsql-storage-server/Dockerfile -t lss . +# docker run -p lss + +FROM rust:latest as builder + +WORKDIR /app +COPY . . + +RUN --mount=type=cache,target=/usr/local/cargo,from=rust:latest,source=/usr/local/cargo \ + --mount=type=cache,target=target \ + cargo build --bin libsql-storage-server && mv ./target/debug/libsql-storage-server ./libsql-storage-server && chmod +x ./libsql-storage-server + + +FROM debian:bookworm-slim +EXPOSE 5002 + +COPY --from=builder /app/libsql-storage-server / + +CMD ["/libsql-storage-server"] \ No newline at end of file diff --git a/libsql-storage-server/fly.toml b/libsql-storage-server/fly.toml new file mode 100644 index 0000000000..0790cd4a81 --- /dev/null +++ b/libsql-storage-server/fly.toml @@ -0,0 +1,18 @@ +# fly deploy -c libsql-storage-server/fly.toml --dockerfile libsql-storage-server/Dockerfile + +app = 'libsql-storage-server' +primary_region = 'ams' + +[build] + +[[services]] +http_checks = [] +internal_port = 5002 +processes = ["app"] +protocol = "tcp" +script_checks = [] +[[services.ports]] +port = 5002 + +[[vm]] +size = 'shared-cpu-1x' diff --git a/libsql-storage-server/src/fdb_store.rs b/libsql-storage-server/src/fdb_store.rs index 8352a808f8..2d118776ba 100644 --- a/libsql-storage-server/src/fdb_store.rs +++ b/libsql-storage-server/src/fdb_store.rs @@ -3,7 +3,7 @@ use async_trait::async_trait; use foundationdb::api::NetworkAutoStop; use foundationdb::tuple::pack; use foundationdb::tuple::unpack; -use foundationdb::Transaction; +use foundationdb::{KeySelector, Transaction}; use libsql_storage::rpc::Frame; use tracing::error; @@ -13,6 +13,7 @@ pub struct FDBFrameStore { impl FDBFrameStore { pub fn new() -> Self { + println!("I was called"); let _network = unsafe { foundationdb::boot() }; Self { _network } } @@ -43,10 +44,12 @@ impl FDBFrameStore { let frame_data_key = format!("{}/f/{}/f", namespace, frame_no); let frame_page_key = format!("{}/f/{}/p", namespace, frame_no); let page_key = format!("{}/p/{}", namespace, frame.page_no); + let page_frame_idx = format!("{}/pf/{}/{}", namespace, frame.page_no, frame_no); txn.set(&frame_data_key.as_bytes(), &frame.data); txn.set(&frame_page_key.as_bytes(), &pack(&frame.page_no)); txn.set(&page_key.as_bytes(), &pack(&frame_no)); + txn.set(&page_frame_idx.as_bytes(), &pack(&frame_no)); } } @@ -79,12 +82,34 @@ impl FrameStore for FDBFrameStore { None } - async fn find_frame(&self, namespace: &str, page_no: u32) -> Option { - let page_key = format!("{}/p/{}", namespace, page_no); - + async fn find_frame(&self, namespace: &str, page_no: u32, max_frame_no: u64) -> Option { let db = foundationdb::Database::default().unwrap(); let txn = db.create_trx().expect("unable to create transaction"); + let page_key = format!("{}/pf/{}/{}", namespace, page_no, max_frame_no); + let result = txn + .get( + KeySelector::last_less_or_equal(page_key.as_bytes()).key(), + false, + ) + .await; + // if let Err(e) = result { + // error!("get failed: {:?}", e); + // return None; + // } + // if let Ok(None) = result { + // error!("page not found (with max)"); + // return None; + // } + if let Ok(result) = result { + if let Some(frame_no) = result { + let frame_no: u64 = unpack(&frame_no).expect("failed to decode u64"); + tracing::trace!("got the frame_no = {} (with max)", frame_no); + }; + }; + + let page_key = format!("{}/p/{}", namespace, page_no); + let result = txn.get(&page_key.as_bytes(), false).await; if let Err(e) = result { error!("get failed: {:?}", e); @@ -95,6 +120,7 @@ impl FrameStore for FDBFrameStore { return None; } let frame_no: u64 = unpack(&result.unwrap().unwrap()).expect("failed to decode u64"); + tracing::trace!("got the frame_no = {} (without max)", frame_no); Some(frame_no) } diff --git a/libsql-storage-server/src/main.rs b/libsql-storage-server/src/main.rs index 6a22db589f..0e1e1fdc6b 100644 --- a/libsql-storage-server/src/main.rs +++ b/libsql-storage-server/src/main.rs @@ -22,6 +22,7 @@ use tracing_subscriber::{EnvFilter, FmtSubscriber}; enum StorageType { InMemory, Redis, + #[cfg(feature = "foundation-db")] FoundationDB, } diff --git a/libsql-storage-server/src/memory_store.rs b/libsql-storage-server/src/memory_store.rs index 38de84aa60..24bbcc305d 100644 --- a/libsql-storage-server/src/memory_store.rs +++ b/libsql-storage-server/src/memory_store.rs @@ -56,7 +56,7 @@ impl FrameStore for InMemFrameStore { } // given a page number, return the maximum frame for the page - async fn find_frame(&self, _namespace: &str, page_no: u32) -> Option { + async fn find_frame(&self, _namespace: &str, page_no: u32, _max_frame_no: u64) -> Option { self.inner .lock() .unwrap() diff --git a/libsql-storage-server/src/redis_store.rs b/libsql-storage-server/src/redis_store.rs index a580ca34b9..2c07dd36b8 100644 --- a/libsql-storage-server/src/redis_store.rs +++ b/libsql-storage-server/src/redis_store.rs @@ -69,7 +69,7 @@ impl FrameStore for RedisFrameStore { } } - async fn find_frame(&self, namespace: &str, page_no: u32) -> Option { + async fn find_frame(&self, namespace: &str, page_no: u32, _max_frame_no: u64) -> Option { let page_key = format!("p/{}/{}", namespace, page_no); let mut con = self.client.get_connection().unwrap(); let frame_no = con.get::(page_key.clone()); diff --git a/libsql-storage-server/src/service.rs b/libsql-storage-server/src/service.rs index 5077e04a96..bbe9bd0888 100644 --- a/libsql-storage-server/src/service.rs +++ b/libsql-storage-server/src/service.rs @@ -50,7 +50,11 @@ impl Storage for Service { let page_no = request.page_no; let namespace = request.namespace; trace!("find_frame(page_no={})", page_no); - if let Some(frame_no) = self.store.find_frame(&namespace, page_no).await { + if let Some(frame_no) = self + .store + .find_frame(&namespace, page_no, request.max_frame_no) + .await + { Ok(Response::new(rpc::FindFrameResponse { frame_no: Some(frame_no), })) diff --git a/libsql-storage-server/src/store.rs b/libsql-storage-server/src/store.rs index f30cd11ee9..f2b395009d 100644 --- a/libsql-storage-server/src/store.rs +++ b/libsql-storage-server/src/store.rs @@ -5,7 +5,7 @@ use libsql_storage::rpc::Frame; pub trait FrameStore: Send + Sync { async fn insert_frames(&self, namespace: &str, max_frame_no: u64, frames: Vec) -> u64; async fn read_frame(&self, namespace: &str, frame_no: u64) -> Option; - async fn find_frame(&self, namespace: &str, page_no: u32) -> Option; + async fn find_frame(&self, namespace: &str, page_no: u32, max_frame_no: u64) -> Option; async fn frame_page_no(&self, namespace: &str, frame_no: u64) -> Option; async fn frames_in_wal(&self, namespace: &str) -> u64; async fn destroy(&self, namespace: &str); diff --git a/libsql-storage/src/lib.rs b/libsql-storage/src/lib.rs index d16e38dea7..8c890463de 100644 --- a/libsql-storage/src/lib.rs +++ b/libsql-storage/src/lib.rs @@ -110,6 +110,7 @@ pub struct DurableWal { frames_cache: SieveCache>, write_cache: BTreeMap, lock_manager: Arc>, + max_frame_no: u64, } impl DurableWal { @@ -129,6 +130,7 @@ impl DurableWal { frames_cache: page_frames, write_cache: BTreeMap::new(), lock_manager, + max_frame_no: 0, } } @@ -141,7 +143,7 @@ impl DurableWal { let req = rpc::FindFrameRequest { namespace: self.namespace.clone(), page_no: page_no.get(), - max_frame_no: 0, + max_frame_no: self.max_frame_no, }; let mut binding = self.client.clone(); let resp = binding.find_frame(req).await.unwrap(); @@ -174,6 +176,12 @@ impl Wal for DurableWal { // - create a read lock // - save the current max_frame_no for this txn // + let rt = tokio::runtime::Handle::current(); + let size = tokio::task::block_in_place(|| rt.block_on(self.frames_count())) + .try_into() + .unwrap(); + trace!("DurableWal::db_size() => {}", size); + self.max_frame_no = size; Ok(true) } @@ -337,7 +345,7 @@ impl Wal for DurableWal { let req = rpc::InsertFramesRequest { namespace: self.namespace.clone(), frames: self.write_cache.values().cloned().collect(), - max_frame_no: 0, + max_frame_no: self.max_frame_no, }; self.write_cache.clear(); let mut binding = self.client.clone();