Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .github/workflows/rust.yml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ jobs:
runs-on: ubuntu-latest
name: Run Checks
env:
RUST_BACKTRACE: 1
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we use full?

RUSTFLAGS: -D warnings --cfg tokio_unstable
steps:
- uses: hecrj/setup-rust-action@v2
Expand Down Expand Up @@ -179,6 +180,7 @@ jobs:
runs-on: ubuntu-latest
name: Run Tests
env:
RUST_BACKTRACE: 1
RUSTFLAGS: -D warnings --cfg tokio_unstable
steps:
- uses: hecrj/setup-rust-action@v2
Expand Down
2 changes: 1 addition & 1 deletion libsql-server/tests/standalone/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -422,7 +422,7 @@ async fn insert_rows(conn: &Connection, start: u32, count: u32) -> libsql::Resul

async fn insert_rows_with_args(conn: &Connection, start: u32, count: u32) -> libsql::Result<()> {
for i in start..(start + count) {
let mut stmt = conn.prepare("INSERT INTO test(a, b) VALUES(?,?)").await?;
let stmt = conn.prepare("INSERT INTO test(a, b) VALUES(?,?)").await?;
stmt.execute(params![i, i]).await?;
}
Ok(())
Expand Down
12 changes: 6 additions & 6 deletions libsql/benches/benchmark.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ fn bench(c: &mut Criterion) {
group.bench_function("in-memory-select-1-prepared", |b| {
b.to_async(&rt).iter_batched(
|| block_on(conn.prepare("SELECT 1")).unwrap(),
|mut stmt| async move {
|stmt| async move {
let mut rows = stmt.query(()).await.unwrap();
let row = rows.next().await.unwrap().unwrap();
assert_eq!(row.get::<i32>(0).unwrap(), 1);
Expand All @@ -113,7 +113,7 @@ fn bench(c: &mut Criterion) {
group.bench_function("in-memory-select-star-from-users-limit-1-unprepared", |b| {
b.to_async(&rt).iter_batched(
|| block_on(conn.prepare("SELECT * FROM users LIMIT 1")).unwrap(),
|mut stmt| async move {
|stmt| async move {
let mut rows = stmt.query(()).await.unwrap();
let row = rows.next().await.unwrap().unwrap();
assert_eq!(row.get::<i32>(0).unwrap(), 1);
Expand All @@ -128,7 +128,7 @@ fn bench(c: &mut Criterion) {
|b| {
b.to_async(&rt).iter_batched(
|| block_on(conn.prepare("SELECT * FROM users LIMIT 100")).unwrap(),
|mut stmt| async move {
|stmt| async move {
let mut rows = stmt.query(()).await.unwrap();
let row = rows.next().await.unwrap().unwrap();
assert_eq!(row.get::<i32>(0).unwrap(), 1);
Expand Down Expand Up @@ -156,7 +156,7 @@ fn bench(c: &mut Criterion) {
group.bench_function("local-replica-select-1-prepared", |b| {
b.to_async(&rt).iter_batched(
|| block_on(conn.prepare("SELECT 1")).unwrap(),
|mut stmt| async move {
|stmt| async move {
let mut rows = stmt.query(()).await.unwrap();
let row = rows.next().await.unwrap().unwrap();
assert_eq!(row.get::<i32>(0).unwrap(), 1);
Expand Down Expand Up @@ -188,7 +188,7 @@ fn bench(c: &mut Criterion) {
|b| {
b.to_async(&rt).iter_batched(
|| block_on(conn.prepare("SELECT * FROM users LIMIT 1")).unwrap(),
|mut stmt| async move {
|stmt| async move {
let mut rows = stmt.query(()).await.unwrap();
let row = rows.next().await.unwrap().unwrap();
assert_eq!(row.get::<i32>(0).unwrap(), 1);
Expand All @@ -204,7 +204,7 @@ fn bench(c: &mut Criterion) {
|b| {
b.to_async(&rt).iter_batched(
|| block_on(conn.prepare("SELECT * FROM users LIMIT 100")).unwrap(),
|mut stmt| async move {
|stmt| async move {
let mut rows = stmt.query(()).await.unwrap();
let row = rows.next().await.unwrap().unwrap();
assert_eq!(row.get::<i32>(0).unwrap(), 1);
Expand Down
4 changes: 2 additions & 2 deletions libsql/examples/deserialization.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,15 @@ async fn main() {
.await
.unwrap();

let mut stmt = conn
let stmt = conn
.prepare("INSERT INTO users (name, age, vision, avatar) VALUES (?1, ?2, ?3, ?4)")
.await
.unwrap();
stmt.execute(("Ferris the Crab", 8, -6.5, vec![1, 2, 3]))
.await
.unwrap();

let mut stmt = conn
let stmt = conn
.prepare("SELECT * FROM users WHERE name = ?1")
.await
.unwrap();
Expand Down
4 changes: 2 additions & 2 deletions libsql/examples/example.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,14 @@ async fn main() {
.await
.unwrap();

let mut stmt = conn
let stmt = conn
.prepare("INSERT INTO users (email) VALUES (?1)")
.await
.unwrap();

stmt.execute(["foo@example.com"]).await.unwrap();

let mut stmt = conn
let stmt = conn
.prepare("SELECT * FROM users WHERE email = ?1")
.await
.unwrap();
Expand Down
4 changes: 2 additions & 2 deletions libsql/examples/example_v2.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,14 @@ async fn main() {
.await
.unwrap();

let mut stmt = conn
let stmt = conn
.prepare("INSERT INTO users (email) VALUES (?1)")
.await
.unwrap();

stmt.execute(["foo@example.com"]).await.unwrap();

let mut stmt = conn
let stmt = conn
.prepare("SELECT * FROM users WHERE email = ?1")
.await
.unwrap();
Expand Down
4 changes: 2 additions & 2 deletions libsql/examples/flutter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,14 @@ async fn main() {
.await
.unwrap();

let mut stmt = conn
let stmt = conn
.prepare("INSERT INTO users (email) VALUES (?1)")
.await
.unwrap();

stmt.execute(["foo@example.com"]).await.unwrap();

let mut stmt = conn
let stmt = conn
.prepare("SELECT * FROM users WHERE email = ?1")
.await
.unwrap();
Expand Down
2 changes: 1 addition & 1 deletion libsql/src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ impl Connection {
/// For more info on how to pass params check [`IntoParams`]'s docs and on how to
/// extract values out of the rows check the [`Rows`] docs.
pub async fn query(&self, sql: &str, params: impl IntoParams) -> Result<Rows> {
let mut stmt = self.prepare(sql).await?;
let stmt = self.prepare(sql).await?;

stmt.query(params).await
}
Expand Down
10 changes: 5 additions & 5 deletions libsql/src/hrana/hyper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -237,25 +237,25 @@ impl Conn for HttpConnection<HttpSender> {
impl crate::statement::Stmt for crate::hrana::Statement<HttpSender> {
fn finalize(&mut self) {}

async fn execute(&mut self, params: &Params) -> crate::Result<usize> {
async fn execute(&self, params: &Params) -> crate::Result<usize> {
self.execute(params).await
}

async fn query(&mut self, params: &Params) -> crate::Result<Rows> {
async fn query(&self, params: &Params) -> crate::Result<Rows> {
self.query(params).await
}

async fn run(&mut self, params: &Params) -> crate::Result<()> {
async fn run(&self, params: &Params) -> crate::Result<()> {
self.run(params).await
}

fn interrupt(&mut self) -> crate::Result<()> {
fn interrupt(&self) -> crate::Result<()> {
Err(crate::Error::Misuse(
"interrupt is not supported for remote connections".to_string(),
))
}

fn reset(&mut self) {}
fn reset(&self) {}

fn parameter_count(&self) -> usize {
let stmt = &self.inner;
Expand Down
8 changes: 4 additions & 4 deletions libsql/src/hrana/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -162,15 +162,15 @@ where
}
}

pub async fn execute(&mut self, params: &Params) -> crate::Result<usize> {
pub async fn execute(&self, params: &Params) -> crate::Result<usize> {
let mut stmt = self.inner.clone();
bind_params(params.clone(), &mut stmt);

let result = self.stream.execute_inner(stmt, self.close_stream).await?;
Ok(result.affected_row_count as usize)
}

pub async fn run(&mut self, params: &Params) -> crate::Result<()> {
pub async fn run(&self, params: &Params) -> crate::Result<()> {
let mut stmt = self.inner.clone();
bind_params(params.clone(), &mut stmt);

Expand All @@ -179,7 +179,7 @@ where
}

pub(crate) async fn query_raw(
&mut self,
&self,
params: &Params,
) -> crate::Result<HranaRows<T::Stream, T>> {
let mut stmt = self.inner.clone();
Expand All @@ -197,7 +197,7 @@ where
T: HttpSend + Send + Sync + 'static,
<T as HttpSend>::Stream: Send + Sync + 'static,
{
pub async fn query(&mut self, params: &Params) -> crate::Result<super::Rows> {
pub async fn query(&self, params: &Params) -> crate::Result<super::Rows> {
let rows = self.query_raw(params).await?;
Ok(super::Rows::new(rows))
}
Expand Down
35 changes: 19 additions & 16 deletions libsql/src/hrana/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use libsql_hrana::proto::{
GetAutocommitStreamReq, PipelineReqBody, PipelineRespBody, SequenceStreamReq,
StoreSqlStreamReq, StreamRequest, StreamResponse, StreamResult,
};
use std::cell::RefCell;
use std::sync::atomic::{AtomicBool, AtomicI64, AtomicU64, Ordering};
use std::sync::Arc;
use tokio::sync::Mutex;
Expand Down Expand Up @@ -66,8 +67,8 @@ where
pipeline_url,
cursor_url,
auth_token,
sql_id_generator: 0,
baton: None,
sql_id_generator: RefCell::new(0),
baton: RefCell::new(None),
}),
}),
}
Expand All @@ -77,7 +78,7 @@ where
/// Returns true if request was finalized correctly, false if stream was already closed.
pub(super) async fn finalize(&mut self, req: StreamRequest) -> Result<bool> {
let mut client = self.inner.stream.lock().await;
if client.baton.is_none() {
if client.baton.borrow().is_none() {
tracing::trace!("baton not found - skipping finalize for {:?}", req);
return Ok(false);
}
Expand Down Expand Up @@ -298,11 +299,11 @@ where
T: HttpSend,
{
client: T,
baton: Option<String>,
baton: RefCell<Option<String>>,
pipeline_url: Arc<str>,
cursor_url: Arc<str>,
auth_token: Arc<str>,
sql_id_generator: SqlId,
sql_id_generator: RefCell<SqlId>,
}

impl<T> RawStream<T>
Expand All @@ -316,7 +317,7 @@ where

pub async fn open_cursor(&mut self, batch: Batch) -> Result<Cursor<T::Stream>> {
let msg = CursorReq {
baton: self.baton.clone(),
baton: self.baton.borrow().clone(),
batch,
};
let body = serde_json::to_string(&msg).map_err(HranaError::Json)?;
Expand All @@ -336,7 +337,7 @@ where
} // stream has been closed by the server
Some(baton) => {
tracing::trace!("client stream has been assigned with baton: `{}`", baton);
self.baton = Some(baton)
*self.baton.borrow_mut() = Some(baton)
}
}
Ok(cursor)
Expand All @@ -349,11 +350,11 @@ where
tracing::trace!(
"client stream sending {} requests with baton `{}`: {:?}",
N,
self.baton.as_deref().unwrap_or_default(),
self.baton.borrow().as_deref().unwrap_or_default(),
requests
);
let msg = PipelineReqBody {
baton: self.baton.clone(),
baton: self.baton.borrow().clone(),
requests: Vec::from(requests),
};
let body = serde_json::to_string(&msg).map_err(HranaError::Json)?;
Expand All @@ -375,7 +376,7 @@ where
} // stream has been closed by the server
Some(baton) => {
tracing::trace!("client stream has been assigned with baton: `{}`", baton);
self.baton = Some(baton)
*self.baton.borrow_mut() = Some(baton)
}
}

Expand Down Expand Up @@ -424,16 +425,17 @@ where
Ok((resp, is_autocommit))
}

fn reset(&mut self) {
if let Some(baton) = self.baton.take() {
fn reset(&self) {
if let Some(baton) = self.baton.borrow_mut().take() {
tracing::trace!("closing client stream (baton: `{}`)", baton);
}
self.sql_id_generator = 0;
*self.sql_id_generator.borrow_mut() = 0;
}

fn next_sql_id(&mut self) -> SqlId {
self.sql_id_generator = self.sql_id_generator.wrapping_add(1);
self.sql_id_generator
let mut gen = self.sql_id_generator.borrow_mut();
*gen = gen.wrapping_add(1);
*gen
}
}

Expand All @@ -443,7 +445,8 @@ where
T: HttpSend,
{
fn drop(&mut self) {
if let Some(baton) = self.baton.take() {
let baton = self.baton.get_mut().take();
if let Some(baton) = baton {
// only send a close request if stream was ever used to send the data
tracing::trace!("closing client stream (baton: `{}`)", baton);
let req = serde_json::to_string(&PipelineReqBody {
Expand Down
10 changes: 5 additions & 5 deletions libsql/src/local/impls.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,32 +108,32 @@ impl Stmt for LibsqlStmt {
self.0.finalize();
}

async fn execute(&mut self, params: &Params) -> Result<usize> {
async fn execute(&self, params: &Params) -> Result<usize> {
let params = params.clone();
let stmt = self.0.clone();

stmt.execute(&params).map(|i| i as usize)
}

async fn query(&mut self, params: &Params) -> Result<Rows> {
async fn query(&self, params: &Params) -> Result<Rows> {
let params = params.clone();
let stmt = self.0.clone();

stmt.query(&params).map(LibsqlRows).map(Rows::new)
}

async fn run(&mut self, params: &Params) -> Result<()> {
async fn run(&self, params: &Params) -> Result<()> {
let params = params.clone();
let stmt = self.0.clone();

stmt.run(&params)
}

fn interrupt(&mut self) -> Result<()> {
fn interrupt(&self) -> Result<()> {
self.0.interrupt()
}

fn reset(&mut self) {
fn reset(&self) {
self.0.reset();
}

Expand Down
Loading
Loading