From 3609ee8b944f70299adb8cd62790a1c1e13a700f Mon Sep 17 00:00:00 2001 From: HassanSharara Date: Sat, 7 Mar 2026 06:24:37 +0300 Subject: [PATCH 1/9] update to new version of water_http --- frameworks/Rust/water-http/Cargo.toml | 25 ++- .../Rust/water-http/benchmark_config.json | 48 +++- frameworks/Rust/water-http/src/buf.rs | 104 +++++++++ frameworks/Rust/water-http/src/cached.rs | 2 + frameworks/Rust/water-http/src/date.rs | 32 +++ frameworks/Rust/water-http/src/db.rs | 208 ++++++++++-------- frameworks/Rust/water-http/src/main.rs | 17 +- frameworks/Rust/water-http/src/models.rs | 8 +- frameworks/Rust/water-http/src/server.rs | 205 ++--------------- .../water-http/water-http-unstable.dockerfile | 13 ++ .../water-http/water-http-uring.dockerfile | 13 ++ 11 files changed, 386 insertions(+), 289 deletions(-) create mode 100644 frameworks/Rust/water-http/src/buf.rs create mode 100644 frameworks/Rust/water-http/src/date.rs create mode 100644 frameworks/Rust/water-http/water-http-unstable.dockerfile create mode 100644 frameworks/Rust/water-http/water-http-uring.dockerfile diff --git a/frameworks/Rust/water-http/Cargo.toml b/frameworks/Rust/water-http/Cargo.toml index 2e25535e243..fe2ae421742 100644 --- a/frameworks/Rust/water-http/Cargo.toml +++ b/frameworks/Rust/water-http/Cargo.toml @@ -5,8 +5,11 @@ edition = "2024" [dependencies] askama = "0.14.0" -tokio = { version = "1.47.1", features = ["full"] } -water_http = { features = ["use_io_uring","use_only_http1"],optional = true , version = "3.4.2-beta.4" } +tokio = { version = "1.48.0", features = ["full"] } + +water_http = { features = ["use_only_http1"],optional = true , version = "4.0.1-beta.89",git = "https://github.com/HassanSharara/water_http", rev = "78c9859"} +#water_http_unrealistic = {package = "water_http" ,features = ["use_io_uring","use_only_http1"],optional = true , version = "3.4.2-beta.4" } + smallvec = "1.15.1" nanorand = "0.8.0" tokio-postgres = "0.7.15" @@ -19,6 +22,12 @@ httpdate = "1.0.3" parking_lot = "0.12.5" yarte = { version = "0.15.7" ,features = ["bytes-buf", "json"] } itoa = {version = "1.0.15" ,optional = true} +smallbox = "0.8.8" +mimalloc = "0.1.48" +xitca-postgres = "0.4.0" +#chopin-pg = "0.5.18" +water_buffer = {version = "1.2.9",features = ["bytes","unsafe_clone"]} +integer_to_bytes = "0.2.2" [[bin]] @@ -42,4 +51,14 @@ required-features = ["cache"] json_plaintext = ["water_http"] db = ["water_http/thread_shared_struct"] cache = ["water_http/thread_shared_struct","itoa"] -all = ["water_http/thread_shared_struct"] +all = ["water_http","water_http/thread_shared_struct","water_http/cpu_affinity"] +uring = ["water_http","water_http/thread_shared_struct", "water_http/use_io_uring"] + +[profile.release] +opt-level = 3 +codegen-units = 1 +panic = 'abort' +lto = "fat" +debug = false +incremental = false +overflow-checks = false diff --git a/frameworks/Rust/water-http/benchmark_config.json b/frameworks/Rust/water-http/benchmark_config.json index f0c592a58e6..7902ea99259 100644 --- a/frameworks/Rust/water-http/benchmark_config.json +++ b/frameworks/Rust/water-http/benchmark_config.json @@ -1,17 +1,15 @@ { "framework": "water-http", - "maintainers" : ["HassanSharara"], "tests": [ { - "default": { + "default": { "json_url": "/json", "plaintext_url": "/plaintext", "fortune_url": "/fortunes", "db_url": "/db", "query_url": "/queries?q=", "update_url": "/updates?q=", - "cached_query_url": "/cached-queries?q=", "port": 8080, "approach": "Stripped", "classification": "Fullstack", @@ -25,6 +23,50 @@ "database_os": "Linux", "display_name": "water_http", "tags": [] + }, + + + + "unstable": { + "json_url": "/json", + "plaintext_url": "/plaintext", + "fortune_url": "/fortunes", + "db_url": "/db", + "query_url": "/queries?q=", + "update_url": "/updates?q=", + "port": 8080, + "approach": "Realistic", + "classification": "Fullstack", + "database": "Postgres", + "framework": "water_http", + "language": "Rust", + "orm": "raw", + "platform": "Rust", + "webserver": "water_http", + "os": "Linux", + "database_os": "Linux", + "display_name": "water_http [uns]" + }, + + "uring": { + "json_url": "/json", + "plaintext_url": "/plaintext", + "fortune_url": "/fortunes", + "db_url": "/db", + "query_url": "/queries?q=", + "update_url": "/updates?q=", + "port": 8080, + "approach": "Realistic", + "classification": "Fullstack", + "database": "Postgres", + "framework": "water_http", + "language": "Rust", + "orm": "raw", + "platform": "Rust", + "webserver": "water_http", + "os": "Linux", + "database_os": "Linux", + "display_name": "water_http [io-uring]" } } ] diff --git a/frameworks/Rust/water-http/src/buf.rs b/frameworks/Rust/water-http/src/buf.rs new file mode 100644 index 00000000000..15a6720e3d3 --- /dev/null +++ b/frameworks/Rust/water-http/src/buf.rs @@ -0,0 +1,104 @@ +use std::cell::RefCell; +use crate::models::Fortune; +// Assuming water_buffer crate exists +// use water_buffer::WaterBuffer as BM; + +// Mocking the type for compilation demo +type WaterBuffer = Vec; + +const INITIAL_VEC_CAPACITY: usize = 128; +const DEFAULT_SIZE: usize = 4048; +const MAX_BUFFER_SIZE: usize = 8192; // Allow some growth before discarding +const MAX_CACHED_BUFFERS: usize = 117; // Set to your test requirement + +thread_local! { + static BUFFER_CACHE: RefCell> = RefCell::new(Vec::with_capacity(INITIAL_VEC_CAPACITY)); + static FORTUNE_CACHE: RefCell>> = RefCell::new(Vec::with_capacity(INITIAL_VEC_CAPACITY)); +} + +pub struct PooledBuffer { + inner: Option, +} + +pub struct FortunesPool { + inner: Option>, +} + + +impl PooledBuffer { + pub fn new() -> Self { + Self::with_capacity(DEFAULT_SIZE) + } + + pub fn with_capacity(cap: usize) -> Self { + let buf = BUFFER_CACHE.with(|cache| { + let mut cache = cache.borrow_mut(); + if let Some(mut existing_buf) = cache.pop() { + // Assuming your WaterBuffer has a clear/reset method + existing_buf.clear(); + existing_buf + } else { + // Fallback to new allocation if cache is empty + WaterBuffer::with_capacity(cap) + } + }); + + Self { inner: Some(buf) } + } + + pub fn take_inner(&mut self) -> WaterBuffer { + self.inner.take().expect("Buffer already taken or dropped") + } + + pub fn recycle(buf: WaterBuffer) { + // Use capacity() check to ensure we don't cache + // a buffer that grew to a massive size during one specific request + if buf.capacity() <= MAX_BUFFER_SIZE { + BUFFER_CACHE.with(|cache| { + let mut cache = cache.borrow_mut(); + if cache.len() < MAX_CACHED_BUFFERS { + cache.push(buf); + } + }); + } + } +} +impl FortunesPool { + pub fn new() -> Self { + Self::with_capacity(16) + } + + pub fn with_capacity(cap: usize) -> Self { + let buf = FORTUNE_CACHE.with(|cache| { + let mut cache = cache.borrow_mut(); + if let Some(mut existing_buf) = cache.pop() { + // Assuming your WaterBuffer has a clear/reset method + existing_buf.clear(); + existing_buf + } else { + // Fallback to new allocation if cache is empty + Vec::with_capacity(cap) + } + }); + + Self { inner: Some(buf) } + } + + pub fn take_inner(&mut self) -> Vec { + self.inner.take().expect("Buffer already taken or dropped") + } + + pub fn recycle(buf: WaterBuffer) { + // Use capacity() check to ensure we don't cache + // a buffer that grew to a massive size during one specific request + if buf.capacity() <= 16 { + BUFFER_CACHE.with(|cache| { + let mut cache = cache.borrow_mut(); + if cache.len() < 2000 { + cache.push(buf); + } + }); + } + } +} + diff --git a/frameworks/Rust/water-http/src/cached.rs b/frameworks/Rust/water-http/src/cached.rs index 2643aca511e..69d7808255c 100644 --- a/frameworks/Rust/water-http/src/cached.rs +++ b/frameworks/Rust/water-http/src/cached.rs @@ -1,4 +1,6 @@ #![allow(static_mut_refs)] + + use std::io; use std::fmt::Arguments; use std::io::Write; diff --git a/frameworks/Rust/water-http/src/date.rs b/frameworks/Rust/water-http/src/date.rs new file mode 100644 index 00000000000..0896f479733 --- /dev/null +++ b/frameworks/Rust/water-http/src/date.rs @@ -0,0 +1,32 @@ +use std::cell::UnsafeCell; +use std::time::{SystemTime, UNIX_EPOCH}; + +thread_local! { + // We store the bytes directly in the Thread Local Storage (TLS) + // to ensure the reference stays valid for the life of the thread. + static CACHED_DATE: UnsafeCell<(u64, [u8; 29])> = UnsafeCell::new((0, [0u8; 29])); +} + +#[inline(always)] +pub fn get_date_fast() -> &'static str { + CACHED_DATE.with(|cell| { + unsafe { + let cache = &mut *cell.get(); + // 1. Use a fast duration check + let now = SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap_unchecked() // Benchmark trick: avoid panic branches + .as_secs(); + + if now != cache.0 { + cache.0 = now; + let date_str = httpdate::fmt_http_date(SystemTime::now()); + cache.1.copy_from_slice(date_str.as_bytes()); + } + + // 2. Return a reference to the TLS memory directly. + // This is safe because the thread never dies during the benchmark. + std::str::from_utf8_unchecked(&cache.1) + } + }) +} \ No newline at end of file diff --git a/frameworks/Rust/water-http/src/db.rs b/frameworks/Rust/water-http/src/db.rs index bcd33931486..1adc092369a 100644 --- a/frameworks/Rust/water-http/src/db.rs +++ b/frameworks/Rust/water-http/src/db.rs @@ -1,19 +1,22 @@ -#![cfg(any(feature = "db",feature = "all"))] -use std::{borrow::Cow, io, ptr}; +#![cfg(any(feature = "db",feature = "all",feature = "uring"))] +use std::{borrow::Cow, io}; use std::fmt::Arguments; use std::io::Write; use std::mem::MaybeUninit; use std::rc::Rc; use std::cell::UnsafeCell; -use std::collections::HashMap; -use bytes::Buf; +use bytes::{Buf, BufMut}; use nanorand::{Rng, WyRand}; -use tokio_postgres::{connect, Client, Statement, NoTls, Error}; +use tokio_postgres::{connect, Client, Statement, NoTls}; use tokio_postgres::types::private::BytesMut; use crate::models::{Fortune, FortuneTemplate, World}; use sonic_rs::prelude::WriteExt; +use water_buffer::WaterBuffer; use yarte::TemplateBytesTrait; -pub static mut CACHED_VALUES:Option> = None; +use crate::buf::{FortunesPool, PooledBuffer}; + + +use tokio::pin; /// Database connection pool with thread-local RNG pub struct DbConnectionPool { @@ -62,6 +65,8 @@ impl DbConnectionPool { /// Reusable buffer pool per connection struct BufferPool { body: BytesMut, + worlds: Vec, + numbers: Vec, fortunes: Vec, fortune_output: Vec, } @@ -70,6 +75,8 @@ impl BufferPool { fn new() -> Self { Self { body: BytesMut::with_capacity(4096), + worlds: Vec::with_capacity(501), + numbers: Vec::with_capacity(501), fortunes: Vec::with_capacity(501), fortune_output: Vec::with_capacity(4096), @@ -111,7 +118,7 @@ impl PgConnection { // Pre-compile update statements for batch sizes 1-500 let mut updates = vec![]; - for num in 1..=500 { + for num in 1..=500 { let sql = Self::generate_update_values_stmt(num); updates.push(cl.prepare(&sql).await.unwrap()); } @@ -125,11 +132,10 @@ impl PgConnection { _connection_task: connection_task, rang: WyRand::new() }) - } /// Connect to the database - + } + /// Connect to the database #[inline(always)] pub fn generate_update_values_stmt(batch_size: usize) -> String { - let mut sql = String::from("UPDATE world SET randomNumber = w.r FROM (VALUES "); for i in 0..batch_size { @@ -154,10 +160,12 @@ impl PgConnection { /// Get a single random world - optimized with buffer reuse #[inline] pub async fn get_world(&self) -> &[u8] { - let rd = (self.rang.clone().generate::() % 10_000 ) as i32; + let rd = (self.rang.clone().generate::() % 10_000 + 1) as i32; let row = self.cl.query_one(&self.world, &[&rd]).await.unwrap(); + let buffers = self.buffers(); buffers.body.clear(); + sonic_rs::to_writer( BytesMuteWriter(&mut buffers.body), &World { @@ -165,23 +173,25 @@ impl PgConnection { randomnumber: row.get(1), }, ).unwrap(); + buffers.body.chunk() } /// Get multiple random worlds - optimized with buffer reuse pub async fn get_worlds(&self, num: usize) -> &[u8] { let buffers = self.buffers(); - let mut worlds = Vec::with_capacity(num); + buffers.worlds.clear(); + let mut rn = self.rang.clone(); for _ in 0..num { - let id = (self.rang.clone().generate::() % 10_000 ) as i32; + let id: i32 = (rn.generate::() & 0x3FFF) as i32 % 10_000 + 1; let row = self.cl.query_one(&self.world, &[&id]).await.unwrap(); - worlds.push(World { + buffers.worlds.push(World { id: row.get(0), randomnumber: row.get(1), }); } buffers.body.clear(); - sonic_rs::to_writer(BytesMuteWriter(&mut buffers.body), &worlds).unwrap(); + sonic_rs::to_writer(BytesMuteWriter(&mut buffers.body), &buffers.worlds).unwrap(); buffers.body.chunk() } /// Update worlds in batch - optimized with buffer reuse @@ -195,113 +205,130 @@ impl PgConnection { /// Update worlds - fetch and update each row to handle duplicates correctly /// Update worlds in batch using CASE statement pub async fn update(&self, num: usize) -> &[u8] { - let buffers = self.buffers(); - let mut ids:Vec = Vec::with_capacity(num); + let mut ids: Vec = Vec::with_capacity(num); let mut rng = self.rang.clone(); let mut params: Vec<&(dyn tokio_postgres::types::ToSql + Sync)> = Vec::with_capacity(num * 2); - let mut futures =vec![]; + let mut futures = vec![]; for _ in 0..num { let w_id = (rng.generate::() % 10_000 + 1) as i32; ids.push(w_id); } - futures.extend(ids.iter().map(|x| async move {self.cl.query_one(&self.world,&[&x]).await})); + futures.extend(ids.iter().map(|x| async move { self.cl.query_one(&self.world, &[&x]).await })); futures_util::future::join_all(futures).await; ids.sort_unstable(); - let mut worlds = Vec::with_capacity(num); - let mut numbers = Vec::with_capacity(num); + buffers.worlds.clear(); for index in 0..num { - let s_id = (rng.generate::() % 10_000 + 1 ) as i32; - worlds.push(World{ - id:ids[index], - randomnumber:s_id + let s_id = (rng.generate::() % 10_000 + 1) as i32; + buffers.worlds.push(World { + id: ids[index], + randomnumber: s_id }); - numbers.push(s_id); + buffers.numbers.push(s_id); } buffers.body.clear(); for index in 0..num { params.push(&ids[index]); - params.push(&numbers[index]); + params.push(&buffers.numbers[index]); } - _=self.cl.execute(&self.updates[num - 1], ¶ms).await.unwrap(); - sonic_rs::to_writer(BytesMuteWriter(&mut buffers.body), &worlds).unwrap(); + _ = self.cl.execute(&self.updates[num - 1], ¶ms).await.unwrap(); + sonic_rs::to_writer(BytesMuteWriter(&mut buffers.body), &buffers.worlds).unwrap(); buffers.body.chunk() } - /// Tell fortunes - optimized with buffer reuse - pub async fn tell_fortune(&self) -> Result<&[u8], ()> { - let res = self.cl.query(&self.fortune, &[]).await.map_err(|_| ())?; + pub async fn tell_fortune(&self) -> Result, ()> { + // 1. Explicitly type the empty params to satisfy the compiler's inference + let mut res = self.cl.query(&self.fortune, &[]).await.unwrap(); + let mut fortunes = FortunesPool::new().take_inner(); - let buffers = self.buffers(); - buffers.fortunes.clear(); - buffers.fortune_output.clear(); - - buffers.fortunes.push(Fortune { - id: 0, - message: Cow::Borrowed("Additional fortune added at request time."), - }); - - for row in res { - buffers.fortunes.push(Fortune { - id: row.get(0), - message: Cow::Owned(row.get(1)), - }); + fortunes.push( + Fortune{ + id:0, + message:Cow::Borrowed("Additional fortune added at request time.") + } + ); + for r in res { + let id :&'static str = unsafe { + let a :&str = r.get(1); + &*(a as *const str) + }; + fortunes.push( + Fortune { + id:r.get(0), + message:Cow::Borrowed(id) + } + ); } + fortunes.sort_by(|a, b| a.message.cmp(&b.message)); + + let mut output = PooledBuffer::new().take_inner(); + let template = FortuneTemplate { + items: &fortunes, + }; + template.write_call(&mut output); + Ok(output) + } +} +/// Zero-copy writer for BytesMut +pub struct BytesMuteWriter<'a>(pub &'a mut BytesMut); - buffers.fortunes.sort_unstable_by(|a, b| a.message.cmp(&b.message)); +impl Write for BytesMuteWriter<'_> { + #[inline(always)] + fn write(&mut self, src: &[u8]) -> Result { + self.0.extend_from_slice(src); + Ok(src.len()) + } - let template = FortuneTemplate { items: &buffers.fortunes }; - template.write_call(&mut buffers.fortune_output); + #[inline(always)] + fn flush(&mut self) -> Result<(), io::Error> { + Ok(()) + } +} - // Return reference to buffer - zero-copy! - Ok(&buffers.fortune_output) +impl std::fmt::Write for BytesMuteWriter<'_> { + #[inline(always)] + fn write_str(&mut self, s: &str) -> std::fmt::Result { + self.0.extend_from_slice(s.as_bytes()); + Ok(()) } + #[inline(always)] + fn write_char(&mut self, c: char) -> std::fmt::Result { + let mut buf = [0u8; 4]; + self.0.extend_from_slice(c.encode_utf8(&mut buf).as_bytes()); + Ok(()) + } - pub fn get_cached_queries(&self,num:usize)->&[u8]{ - let buffers = self.buffers(); - let mut worlds = Vec::::with_capacity(num); - for _ in 0..num { - let rd = (self.rang.clone().generate::() % 10_000 ) as i32; - let v = match self.get_world_id_for_cache(rd){ - None => {continue} - Some(e)=>{e} - }; - worlds.push(World{ - id:rd, - randomnumber:*v - }) - } - buffers.body.clear(); - sonic_rs::to_writer(BytesMuteWriter(&mut buffers.body), &worlds).unwrap(); - return &buffers.body[..] + #[inline(always)] + fn write_fmt(&mut self, args: Arguments<'_>) -> std::fmt::Result { + std::fmt::write(self, args) } +} - fn get_world_id_for_cache(&self, id: i32) -> Option<&i32> { +impl WriteExt for BytesMuteWriter<'_> { + #[inline(always)] + fn reserve_with(&mut self, additional: usize) -> Result<&mut [MaybeUninit], io::Error> { + self.0.reserve(additional); unsafe { - let ptr = ptr::addr_of!(CACHED_VALUES); - - match &*ptr { - Some(map) => map.get(&id), - None => None, - } + let ptr = self.0.as_mut_ptr().add(self.0.len()) as *mut MaybeUninit; + Ok(std::slice::from_raw_parts_mut(ptr, additional)) } } -} - -/// Zero-copy writer for BytesMut -pub struct BytesMuteWriter<'a>(pub &'a mut BytesMut); -impl BytesMuteWriter<'_> { #[inline(always)] - pub fn extend_from_slice(&mut self,data:&[u8]){ - self.0.extend_from_slice(data); + unsafe fn flush_len(&mut self, additional: usize) -> io::Result<()> { + self.0.set_len(self.0.len() + additional); + Ok(()) } } -impl Write for BytesMuteWriter<'_> { + +/// Zero-copy writer for WaterBuffer +pub struct WaterMutWriter<'a>(pub &'a mut WaterBuffer); + +impl Write for WaterMutWriter<'_> { #[inline(always)] fn write(&mut self, src: &[u8]) -> Result { self.0.extend_from_slice(src); @@ -314,7 +341,7 @@ impl Write for BytesMuteWriter<'_> { } } -impl std::fmt::Write for BytesMuteWriter<'_> { +impl std::fmt::Write for WaterMutWriter<'_> { #[inline(always)] fn write_str(&mut self, s: &str) -> std::fmt::Result { self.0.extend_from_slice(s.as_bytes()); @@ -324,7 +351,8 @@ impl std::fmt::Write for BytesMuteWriter<'_> { #[inline(always)] fn write_char(&mut self, c: char) -> std::fmt::Result { let mut buf = [0u8; 4]; - self.0.extend_from_slice(c.encode_utf8(&mut buf).as_bytes()); + self.0 + .extend_from_slice(c.encode_utf8(&mut buf).as_bytes()); Ok(()) } @@ -334,10 +362,14 @@ impl std::fmt::Write for BytesMuteWriter<'_> { } } -impl WriteExt for BytesMuteWriter<'_> { +impl WriteExt for WaterMutWriter<'_> { #[inline(always)] - fn reserve_with(&mut self, additional: usize) -> Result<&mut [MaybeUninit], io::Error> { + fn reserve_with( + &mut self, + additional: usize, + ) -> Result<&mut [MaybeUninit], io::Error> { self.0.reserve(additional); + unsafe { let ptr = self.0.as_mut_ptr().add(self.0.len()) as *mut MaybeUninit; Ok(std::slice::from_raw_parts_mut(ptr, additional)) @@ -346,7 +378,7 @@ impl WriteExt for BytesMuteWriter<'_> { #[inline(always)] unsafe fn flush_len(&mut self, additional: usize) -> io::Result<()> { - self.0.set_len(self.0.len() + additional); + self.0.advance_mut(self.0.len() + additional); Ok(()) } } \ No newline at end of file diff --git a/frameworks/Rust/water-http/src/main.rs b/frameworks/Rust/water-http/src/main.rs index 487409af1d6..68bd6230ae9 100644 --- a/frameworks/Rust/water-http/src/main.rs +++ b/frameworks/Rust/water-http/src/main.rs @@ -1,7 +1,16 @@ -mod server; +pub mod server; pub mod models; -mod db; +pub mod db; +pub mod date; +pub mod buf; +// pub mod chop; -fn main() { - server::run_server(); +use mimalloc::MiMalloc; + +#[global_allocator] +static GLOBAL: MiMalloc = MiMalloc; + + fn main() { + server::run_server(); } + diff --git a/frameworks/Rust/water-http/src/models.rs b/frameworks/Rust/water-http/src/models.rs index 1fee7283ba9..e6cb20392af 100644 --- a/frameworks/Rust/water-http/src/models.rs +++ b/frameworks/Rust/water-http/src/models.rs @@ -17,8 +17,14 @@ pub struct Fortune { pub struct FortuneTemplate<'a>{ pub items:&'a Vec } +#[derive(Serialize,Debug)] +pub struct JsonHolder { + message:&'static str +} - +impl JsonHolder { + pub const HELLO_WORLD:JsonHolder = JsonHolder{message:"Hello, World!"}; +} // pub async fn to(model:FortuneTemplate<'_>){ // model.r diff --git a/frameworks/Rust/water-http/src/server.rs b/frameworks/Rust/water-http/src/server.rs index 309aee70eb3..b9b188f7fb0 100644 --- a/frameworks/Rust/water-http/src/server.rs +++ b/frameworks/Rust/water-http/src/server.rs @@ -1,18 +1,13 @@ -use std::collections::HashMap; + use std::pin::Pin; -use std::ptr; use std::rc::Rc; -use tokio::task::LocalSet; use water_http::{InitControllersRoot, RunServer, WaterController}; -use water_http::http::{HttpSender, ResponseData}; -use water_http::server::{HttpContext, ServerConfigurations}; -use crate::db::{CACHED_VALUES, DbConnectionPool}; +use water_http::server::ServerConfigurations; +use crate::db::{DbConnectionPool}; InitControllersRoot! { name:ROOT, holder_type:MainType, shared_type:SharedType, - headers_length:6, - queries_length:3 } pub struct ThreadSharedStruct{ @@ -27,43 +22,6 @@ pub type SharedType = Rc; pub fn run_server(){ - - _= std::thread::spawn( - ||{ - let rt = tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap(); - rt.block_on(async move { - const URL:&'static str = "postgres://benchmarkdbuser:benchmarkdbpass@tfb-database/hello_world"; - // const URL:&'static str = "postgres://postgres:root@localhost:5432/techmpower"; - - let mut pool = DbConnectionPool{ - connections:Vec::with_capacity( 1 - ), - next:0.into(), - // rt:tokio::runtime::Builder::new_multi_thread().enable_all().worker_threads(cpu_nums).build().unwrap() - }; - - let local_set = LocalSet::new(); - - _= local_set.run_until(async move { - tokio::task::spawn_local(async move { - pool.fill_pool(URL, 1).await; - let connection = pool.get_connection(); - let statement = connection.cl.prepare("SELECT id,randomnumber FROM World").await.unwrap(); - let res = connection.cl.query(&statement,&[]).await.unwrap(); - let mut map = HashMap::new(); - for row in res { - map.insert(row.get(0),row.get(1)); - } - unsafe { - let ptr = ptr::addr_of_mut!(CACHED_VALUES); - ptr.write(Some(map)); - } - }).await - }).await; - - }); - } - ).join(); let cpu_nums = num_cpus::get(); @@ -108,15 +66,12 @@ fn shared_factory()->Pin>>{ }) } -#[cfg(any(feature = "json_plaintext",feature = "all"))] -const JSON_RESPONSE:&'static [u8] = br#"{"message":"Hello, World!"}"#; -#[cfg(any(feature = "json_plaintext",feature = "all"))] -const P:&'static [u8] = br#"Hello, World!"#; - +#[cfg(any(feature = "json_plaintext",feature = "all",feature = "uring"))] +const P:&'static [u8] = br#"Hello, World!"#; -#[cfg(feature = "all")] +#[cfg(any(feature = "all",feature = "uring"))] WaterController! { holder -> super::MainType, shared -> super::SharedType, @@ -128,9 +83,12 @@ WaterController! { let mut sender = cx.sender(); sender.set_header_ef("Content-Type","application/json"); sender.set_header_ef("Server","water"); - let date = httpdate::fmt_http_date(std::time::SystemTime::now()); - sender.set_header_ef("Date",date); - _=sender.send_data_as_final_response(http::ResponseData::Slice(super::JSON_RESPONSE)).await; + let js = crate::models::JsonHolder::HELLO_WORLD; + let mut buffer = crate::buf::PooledBuffer::new().take_inner(); + _=sonic_rs::to_writer(&mut buffer,&js); + sender.set_header_ef("Date",crate::date::get_date_fast()); + _=sender.send_data_as_final_response(http::ResponseData::Slice(&buffer)).await; + crate::buf::PooledBuffer::recycle(buffer); } GET => plaintext => p(cx){ @@ -196,27 +154,6 @@ WaterController! { } - GET -> "cached-queries" -> cached(context)async { - let q = context - .get_from_path_query("q") - .and_then(|v| v.parse::().ok()) // safely parse - .unwrap_or(1) // default to 1 if missing or invalid - .clamp(1, 500); - - let connection:Shared = context.thread_shared_struct.clone().unwrap().clone(); - let connection = connection.pg_connection.get_connection(); - let data = connection.get_cached_queries(q); - let mut sender= context.sender(); - sender.set_header_ef("Content-Type","application/json"); - sender.set_header_ef("Server","water"); - let date = httpdate::fmt_http_date(std::time::SystemTime::now()); - sender.set_header_ef("Date",date); - _= sender.send_data_as_final_response( - http::ResponseData::Slice(data) - ).await; - } - - GET -> fortunes -> ft (context){ let connection:Shared = context.thread_shared_struct.clone().unwrap().clone(); @@ -230,9 +167,9 @@ WaterController! { }; let mut sender = context.sender(); sender.set_header_ef("Content-Type","text/html; charset=UTF-8"); - sender.set_header_ef("Server","water"); - let date = httpdate::fmt_http_date(std::time::SystemTime::now()); - sender.set_header_ef("Date",date); + sender.set_header_ef("Server","W"); + // let date = httpdate::fmt_http_date(std::time::SystemTime::now()); + sender.set_header_ef("Date",crate::date::get_date_fast()); _= sender.send_data_as_final_response( http::ResponseData::Slice(&data) ).await; @@ -242,115 +179,3 @@ WaterController! { } - -#[cfg(all(not(feature = "all"),feature = "json_plaintext"))] -WaterController! { - holder -> super::MainType, - shared -> super::SharedType, - name -> EntryController, - functions -> { - - - GET => json => j(cx){ - let mut sender = cx.sender(); - sender.set_header_ef("Content-Type","application/json"); - sender.set_header_ef("Server","water"); - let date = httpdate::fmt_http_date(std::time::SystemTime::now()); - sender.set_header_ef("Date",date); - _=sender.send_data_as_final_response(http::ResponseData::Slice(super::JSON_RESPONSE)).await; - } - - GET => plaintext => p(cx) { - let mut sender = cx.sender(); - sender.set_header_ef("Content-Type","text/plain; charset=utf-8"); - sender.set_header_ef("Server","water"); - let date = httpdate::fmt_http_date(std::time::SystemTime::now()); - sender.set_header_ef("Date",date); - _=sender.send_data_as_final_response(http::ResponseData::Slice(super::P)).await; - } - } -} - -#[cfg(all(not(feature = "all"),feature = "db"))] -WaterController! { - holder -> super::MainType, - shared -> super::SharedType, - name -> EntryController, - functions -> { - - - GET -> db -> db (context){ - let connection:Shared = context.thread_shared_struct.clone().unwrap().clone(); - let connection = connection.pg_connection.get_connection(); - let data = connection.get_world().await; - let mut sender = context.sender(); - sender.set_header_ef("Content-Type","application/json"); - sender.set_header_ef("Server","water"); - let date = httpdate::fmt_http_date(std::time::SystemTime::now()); - sender.set_header_ef("Date",date); - _= sender.send_data_as_final_response( - http::ResponseData::Slice(data) - ).await; - } - GET -> queries -> query (context){ - let q = context - .get_from_path_query("q") - .and_then(|v| v.parse::().ok()) // safely parse - .unwrap_or(1) // default to 1 if missing or invalid - .clamp(1, 500); - - let connection:Shared = context.thread_shared_struct.clone().unwrap().clone(); - let connection = connection.pg_connection.get_connection(); - let data = connection.get_worlds(q).await; - let mut sender = context.sender(); - sender.set_header_ef("Content-Type","application/json"); - sender.set_header_ef("Server","water"); - let date = httpdate::fmt_http_date(std::time::SystemTime::now()); - sender.set_header_ef("Date",date); - _= sender.send_data_as_final_response( - http::ResponseData::Slice(data) - ).await; - } - - GET -> updates -> update (context){ - let q = context - .get_from_path_query("q") - .and_then(|v| v.parse::().ok()) // safely parse - .unwrap_or(1) // default to 1 if missing or invalid - .clamp(1, 500); - let connection:Shared = context.thread_shared_struct.clone().unwrap().clone(); - let connection = connection.pg_connection.get_connection(); - let data = connection.update(q).await; - let mut sender = context.sender(); - sender.set_header_ef("Content-Type","application/json"); - sender.set_header_ef("Server","water"); - let date = httpdate::fmt_http_date(std::time::SystemTime::now()); - sender.set_header_ef("Date",date); - _= sender.send_data_as_final_response( - http::ResponseData::Slice(data) - ).await; - } - - - GET -> fortunes -> ft (context){ - - let connection:Shared = context.thread_shared_struct.clone().unwrap().clone(); - let connection = connection.pg_connection.get_connection(); - let data = match connection.tell_fortune().await { - Ok(r)=>{r}, - _=>{ - _= context.send_str("failed to connect").await; - return - } - }; - let mut sender = context.sender(); - sender.set_header_ef("Content-Type","text/html; charset=UTF-8"); - sender.set_header_ef("Server","water"); - let date = httpdate::fmt_http_date(std::time::SystemTime::now()); - sender.set_header_ef("Date",date); - _= sender.send_data_as_final_response( - http::ResponseData::Slice(&data) - ).await; - } - } -} \ No newline at end of file diff --git a/frameworks/Rust/water-http/water-http-unstable.dockerfile b/frameworks/Rust/water-http/water-http-unstable.dockerfile new file mode 100644 index 00000000000..a5f3fc1401d --- /dev/null +++ b/frameworks/Rust/water-http/water-http-unstable.dockerfile @@ -0,0 +1,13 @@ +FROM rust:1.93 + +RUN apt-get update -yqq && apt-get install -yqq cmake g++ + +WORKDIR /water +COPY . . + +RUN cargo clean +RUN RUSTFLAGS="-C target-cpu=native --cfg tokio_unstable" cargo build --release --bin water-http --features all + +EXPOSE 8080 + +CMD ./target/release/water-http \ No newline at end of file diff --git a/frameworks/Rust/water-http/water-http-uring.dockerfile b/frameworks/Rust/water-http/water-http-uring.dockerfile new file mode 100644 index 00000000000..50d5a991f6a --- /dev/null +++ b/frameworks/Rust/water-http/water-http-uring.dockerfile @@ -0,0 +1,13 @@ +FROM rust:1.93 + +RUN apt-get update -yqq && apt-get install -yqq cmake g++ + +WORKDIR /water +COPY . . + +RUN cargo clean +RUN RUSTFLAGS="-C target-cpu=native" cargo build --release --bin water-http --features uring + +EXPOSE 8080 + +CMD ./target/release/water-http \ No newline at end of file From ab13df7fec506a0ddd7c6ac57f4c86a0718c8a44 Mon Sep 17 00:00:00 2001 From: HassanSharara Date: Sat, 7 Mar 2026 18:44:42 +0300 Subject: [PATCH 2/9] replace json.rs file code --- frameworks/Rust/water-http/src/json.rs | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/frameworks/Rust/water-http/src/json.rs b/frameworks/Rust/water-http/src/json.rs index 71f406b6051..0afbe4143f5 100644 --- a/frameworks/Rust/water-http/src/json.rs +++ b/frameworks/Rust/water-http/src/json.rs @@ -32,7 +32,6 @@ pub fn run_server(){ ); } -const JSON_RESPONSE:&'static [u8] = br#"{"message":"Hello, World!"}"#; WaterController! { @@ -45,9 +44,12 @@ WaterController! { let mut sender = cx.sender(); sender.set_header_ef("Content-Type","application/json"); sender.set_header_ef("Server","water"); - let date = httpdate::fmt_http_date(std::time::SystemTime::now()); - sender.set_header_ef("Date",date); - _=sender.send_data_as_final_response(http::ResponseData::Slice(super::JSON_RESPONSE)).await; + let js = crate::models::JsonHolder::HELLO_WORLD; + let mut buffer = crate::buf::PooledBuffer::new().take_inner(); + _=sonic_rs::to_writer(&mut buffer,&js); + sender.set_header_ef("Date",crate::date::get_date_fast()); + _=sender.send_data_as_final_response(http::ResponseData::Slice(&buffer)).await; + crate::buf::PooledBuffer::recycle(buffer); } } From 09a4d00650198638749ea7421919c5bcec928a45 Mon Sep 17 00:00:00 2001 From: HassanSharara Date: Sat, 7 Mar 2026 19:04:43 +0300 Subject: [PATCH 3/9] change test name based on cfg input --- frameworks/Rust/water-http/benchmark_config.json | 4 ++-- ...r-http-unstable.dockerfile => water-http-tokio.dockerfile} | 0 2 files changed, 2 insertions(+), 2 deletions(-) rename frameworks/Rust/water-http/{water-http-unstable.dockerfile => water-http-tokio.dockerfile} (100%) diff --git a/frameworks/Rust/water-http/benchmark_config.json b/frameworks/Rust/water-http/benchmark_config.json index 7902ea99259..d43693e7e77 100644 --- a/frameworks/Rust/water-http/benchmark_config.json +++ b/frameworks/Rust/water-http/benchmark_config.json @@ -27,7 +27,7 @@ - "unstable": { + "tokio": { "json_url": "/json", "plaintext_url": "/plaintext", "fortune_url": "/fortunes", @@ -45,7 +45,7 @@ "webserver": "water_http", "os": "Linux", "database_os": "Linux", - "display_name": "water_http [uns]" + "display_name": "water_http [uns-tokio]" }, "uring": { diff --git a/frameworks/Rust/water-http/water-http-unstable.dockerfile b/frameworks/Rust/water-http/water-http-tokio.dockerfile similarity index 100% rename from frameworks/Rust/water-http/water-http-unstable.dockerfile rename to frameworks/Rust/water-http/water-http-tokio.dockerfile From 6a8dbc704a5dc80498957280dfe02c69db27de53 Mon Sep 17 00:00:00 2001 From: HassanSharara Date: Sat, 7 Mar 2026 21:57:18 +0300 Subject: [PATCH 4/9] adding maintainers into benchmark_config.json --- frameworks/Rust/water-http/benchmark_config.json | 1 + 1 file changed, 1 insertion(+) diff --git a/frameworks/Rust/water-http/benchmark_config.json b/frameworks/Rust/water-http/benchmark_config.json index d43693e7e77..f9a21854fee 100644 --- a/frameworks/Rust/water-http/benchmark_config.json +++ b/frameworks/Rust/water-http/benchmark_config.json @@ -1,6 +1,7 @@ { "framework": "water-http", + "maintainers": ["HassanSharara"], "tests": [ { "default": { From ec668c7e445946bcdc8dafa0e5d6eccb320b062f Mon Sep 17 00:00:00 2001 From: HassanSharara Date: Sat, 7 Mar 2026 22:01:32 +0300 Subject: [PATCH 5/9] adding maintainers into benchmark_config.json --- frameworks/Rust/water-http/src/models.rs | 8 ++++---- frameworks/Rust/water-http/src/server.rs | 2 +- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/frameworks/Rust/water-http/src/models.rs b/frameworks/Rust/water-http/src/models.rs index e6cb20392af..f6e271aab3c 100644 --- a/frameworks/Rust/water-http/src/models.rs +++ b/frameworks/Rust/water-http/src/models.rs @@ -19,12 +19,12 @@ pub struct FortuneTemplate<'a>{ } #[derive(Serialize,Debug)] pub struct JsonHolder { - message:&'static str + pub message:&'static str } -impl JsonHolder { - pub const HELLO_WORLD:JsonHolder = JsonHolder{message:"Hello, World!"}; -} +// impl JsonHolder { +// pub const HELLO_WORLD:JsonHolder = JsonHolder{message:"Hello, World!"}; +// } // pub async fn to(model:FortuneTemplate<'_>){ // model.r diff --git a/frameworks/Rust/water-http/src/server.rs b/frameworks/Rust/water-http/src/server.rs index b9b188f7fb0..fd9926014f3 100644 --- a/frameworks/Rust/water-http/src/server.rs +++ b/frameworks/Rust/water-http/src/server.rs @@ -83,7 +83,7 @@ WaterController! { let mut sender = cx.sender(); sender.set_header_ef("Content-Type","application/json"); sender.set_header_ef("Server","water"); - let js = crate::models::JsonHolder::HELLO_WORLD; + let js = crate::models::JsonHolder{message:"Hello, World!"}; let mut buffer = crate::buf::PooledBuffer::new().take_inner(); _=sonic_rs::to_writer(&mut buffer,&js); sender.set_header_ef("Date",crate::date::get_date_fast()); From 8dbbbae141cc9664caaaafc06255889544ac13eb Mon Sep 17 00:00:00 2001 From: HassanSharara Date: Sat, 7 Mar 2026 22:02:08 +0300 Subject: [PATCH 6/9] json changes --- frameworks/Rust/water-http/src/models.rs | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/frameworks/Rust/water-http/src/models.rs b/frameworks/Rust/water-http/src/models.rs index f6e271aab3c..d6b97c81448 100644 --- a/frameworks/Rust/water-http/src/models.rs +++ b/frameworks/Rust/water-http/src/models.rs @@ -22,9 +22,7 @@ pub struct JsonHolder { pub message:&'static str } -// impl JsonHolder { -// pub const HELLO_WORLD:JsonHolder = JsonHolder{message:"Hello, World!"}; -// } + // pub async fn to(model:FortuneTemplate<'_>){ // model.r From 48f83e09b33ced669a7c83002b42db63880041cd Mon Sep 17 00:00:00 2001 From: HassanSharara Date: Mon, 9 Mar 2026 19:40:42 +0300 Subject: [PATCH 7/9] remove unused files and add mini water_http server to the tests --- frameworks/Rust/water-http/Cargo.toml | 28 +- .../Rust/water-http/benchmark_config.json | 24 +- frameworks/Rust/water-http/src/buf.rs | 90 ++++- frameworks/Rust/water-http/src/cached.rs | 310 ------------------ frameworks/Rust/water-http/src/db.rs | 179 ++++++---- frameworks/Rust/water-http/src/json.rs | 57 ---- frameworks/Rust/water-http/src/main.rs | 9 +- frameworks/Rust/water-http/src/mini.rs | 165 ++++++++++ frameworks/Rust/water-http/src/models.rs | 2 +- frameworks/Rust/water-http/src/plaintext.rs | 54 --- frameworks/Rust/water-http/src/server.rs | 24 +- .../water-http/water-http-mini.dockerfile | 13 + 12 files changed, 433 insertions(+), 522 deletions(-) delete mode 100644 frameworks/Rust/water-http/src/cached.rs delete mode 100644 frameworks/Rust/water-http/src/json.rs create mode 100644 frameworks/Rust/water-http/src/mini.rs delete mode 100644 frameworks/Rust/water-http/src/plaintext.rs create mode 100644 frameworks/Rust/water-http/water-http-mini.dockerfile diff --git a/frameworks/Rust/water-http/Cargo.toml b/frameworks/Rust/water-http/Cargo.toml index fe2ae421742..f79428468a5 100644 --- a/frameworks/Rust/water-http/Cargo.toml +++ b/frameworks/Rust/water-http/Cargo.toml @@ -3,13 +3,14 @@ name = "water-http" version = "0.1.0" edition = "2024" +default-run = "water-http" + [dependencies] askama = "0.14.0" tokio = { version = "1.48.0", features = ["full"] } -water_http = { features = ["use_only_http1"],optional = true , version = "4.0.1-beta.89",git = "https://github.com/HassanSharara/water_http", rev = "78c9859"} +water_http = { rev = "6420ab6", features = ["use_only_http1"],optional = true , version = "4.0.1-beta.96",git = "https://github.com/HassanSharara/water_http"} #water_http_unrealistic = {package = "water_http" ,features = ["use_io_uring","use_only_http1"],optional = true , version = "3.4.2-beta.4" } - smallvec = "1.15.1" nanorand = "0.8.0" tokio-postgres = "0.7.15" @@ -21,38 +22,33 @@ num_cpus = "1.17.0" httpdate = "1.0.3" parking_lot = "0.12.5" yarte = { version = "0.15.7" ,features = ["bytes-buf", "json"] } -itoa = {version = "1.0.15" ,optional = true} +#itoa = {version = "1.0.15" ,optional = true} smallbox = "0.8.8" mimalloc = "0.1.48" -xitca-postgres = "0.4.0" #chopin-pg = "0.5.18" water_buffer = {version = "1.2.9",features = ["bytes","unsafe_clone"]} integer_to_bytes = "0.2.2" -[[bin]] -name = "plaintext" -path = "src/plaintext.rs" -required-features = ["json_plaintext"] + [[bin]] -name = "json" -path = "src/json.rs" -required-features = ["json_plaintext"] +name = "mini" +path = "src/mini.rs" +required-features = ["mini"] + + + -[[bin]] -name = "cache" -path = "src/cached.rs" -required-features = ["cache"] [features] json_plaintext = ["water_http"] db = ["water_http/thread_shared_struct"] -cache = ["water_http/thread_shared_struct","itoa"] all = ["water_http","water_http/thread_shared_struct","water_http/cpu_affinity"] uring = ["water_http","water_http/thread_shared_struct", "water_http/use_io_uring"] +mini = ["water_http","water_http/cpu_affinity"] [profile.release] opt-level = 3 diff --git a/frameworks/Rust/water-http/benchmark_config.json b/frameworks/Rust/water-http/benchmark_config.json index f9a21854fee..737a2d247d0 100644 --- a/frameworks/Rust/water-http/benchmark_config.json +++ b/frameworks/Rust/water-http/benchmark_config.json @@ -26,7 +26,27 @@ "tags": [] }, - + "mini": { + "json_url": "/json", + "plaintext_url": "/plaintext", + "fortune_url": "/fortunes", + "db_url": "/db", + "query_url": "/queries?q=", + "update_url": "/updates?q=", + "port": 8080, + "approach": "Stripped", + "classification": "Fullstack", + "database": "Postgres", + "framework": "water_http", + "language": "Rust", + "orm": "raw", + "platform": "Rust", + "webserver": "water_http", + "os": "Linux", + "database_os": "Linux", + "display_name": "water_http [mini]", + "tags": [] + }, "tokio": { "json_url": "/json", @@ -46,7 +66,7 @@ "webserver": "water_http", "os": "Linux", "database_os": "Linux", - "display_name": "water_http [uns-tokio]" + "display_name": "water_http [uns]" }, "uring": { diff --git a/frameworks/Rust/water-http/src/buf.rs b/frameworks/Rust/water-http/src/buf.rs index 15a6720e3d3..612bd110fcf 100644 --- a/frameworks/Rust/water-http/src/buf.rs +++ b/frameworks/Rust/water-http/src/buf.rs @@ -1,5 +1,5 @@ use std::cell::RefCell; -use crate::models::Fortune; +use crate::models::{Fortune, World}; // Assuming water_buffer crate exists // use water_buffer::WaterBuffer as BM; @@ -11,9 +11,13 @@ const DEFAULT_SIZE: usize = 4048; const MAX_BUFFER_SIZE: usize = 8192; // Allow some growth before discarding const MAX_CACHED_BUFFERS: usize = 117; // Set to your test requirement + thread_local! { static BUFFER_CACHE: RefCell> = RefCell::new(Vec::with_capacity(INITIAL_VEC_CAPACITY)); static FORTUNE_CACHE: RefCell>> = RefCell::new(Vec::with_capacity(INITIAL_VEC_CAPACITY)); + static WORLDS_CACHE: RefCell>> = RefCell::new(Vec::with_capacity(500)); + static IDS_NUMBERS: RefCell,Vec)>> = RefCell::new(Vec::with_capacity(500)); + } pub struct PooledBuffer { @@ -24,6 +28,14 @@ pub struct FortunesPool { inner: Option>, } +pub struct WorldsPool { + inner: Option>, +} + +pub struct IDsPool { + inner: Option<(Vec,Vec)>, +} + impl PooledBuffer { pub fn new() -> Self { @@ -73,7 +85,6 @@ impl FortunesPool { let mut cache = cache.borrow_mut(); if let Some(mut existing_buf) = cache.pop() { // Assuming your WaterBuffer has a clear/reset method - existing_buf.clear(); existing_buf } else { // Fallback to new allocation if cache is empty @@ -88,7 +99,7 @@ impl FortunesPool { self.inner.take().expect("Buffer already taken or dropped") } - pub fn recycle(buf: WaterBuffer) { + pub fn save_heap_allocation(buf: WaterBuffer) { // Use capacity() check to ensure we don't cache // a buffer that grew to a massive size during one specific request if buf.capacity() <= 16 { @@ -101,4 +112,77 @@ impl FortunesPool { } } } +impl WorldsPool { + pub fn new() -> Self { + Self::with_capacity(500) + } + + pub fn with_capacity(cap: usize) -> WorldsPool { + let buf = WORLDS_CACHE.with(|cache| { + let mut cache = cache.borrow_mut(); + if let Some(mut existing_buf) = cache.pop() { + // Assuming your WaterBuffer has a clear/reset method + existing_buf + } else { + // Fallback to new allocation if cache is empty + Vec::with_capacity(cap) + } + }); + + Self { inner: Some(buf) } + } + + pub fn take_inner(&mut self) -> Vec { + self.inner.take().expect("Buffer already taken or dropped") + } + + pub fn save_heap_allocation(buf: Vec) { + // Use capacity() check to ensure we don't cache + // a buffer that grew to a massive size during one specific request + if buf.capacity() <= 500 { + WORLDS_CACHE.with(|cache| { + let mut cache = cache.borrow_mut(); + if cache.len() < 2000 { + cache.push(buf); + } + }); + } + } +} +impl IDsPool { + pub fn new() -> Self { + Self::with_capacity(500) + } + + pub fn with_capacity(cap: usize) -> IDsPool { + let buf = IDS_NUMBERS.with(|cache| { + let mut cache = cache.borrow_mut(); + if let Some(mut existing_buf) = cache.pop() { + existing_buf.0.clear(); + existing_buf.1.clear(); + existing_buf + } else { + // Fallback to new allocation if cache is empty + (Vec::with_capacity(cap),Vec::with_capacity(cap)) + } + }); + + Self { inner: Some(buf) } + } + + pub fn take_inner(&mut self) -> (Vec,Vec) { + self.inner.take().expect("Buffer already taken or dropped") + } + + pub fn save_heap_allocation(buf: (Vec,Vec)) { + // Use capacity() check to ensure we don't cache + // a buffer that grew to a massive size during one specific request + IDS_NUMBERS.with(|cache| { + let mut cache = cache.borrow_mut(); + if cache.len() < 1000 { + cache.push(buf); + } + }); + } +} diff --git a/frameworks/Rust/water-http/src/cached.rs b/frameworks/Rust/water-http/src/cached.rs deleted file mode 100644 index 69d7808255c..00000000000 --- a/frameworks/Rust/water-http/src/cached.rs +++ /dev/null @@ -1,310 +0,0 @@ -#![allow(static_mut_refs)] - - -use std::io; -use std::fmt::Arguments; -use std::io::Write; -use std::mem::MaybeUninit; -use std::rc::Rc; -use std::cell::UnsafeCell; -use std::collections::HashMap; -use nanorand::{Rng, WyRand}; -use tokio_postgres::{connect, Client, NoTls}; -use tokio_postgres::types::private::BytesMut; -use sonic_rs::prelude::WriteExt; -use std::pin::Pin; -use tokio::task::LocalSet; -use water_http::{InitControllersRoot, RunServer, WaterController}; -use water_http::http::{HttpSender, ResponseData}; -use water_http::server::{HttpContext, ServerConfigurations}; -use water_http::http::HttpSenderTrait; - -pub struct DbConnectionPool { - pub connections: Vec>, - pub next: UnsafeCell, -} - -impl DbConnectionPool { - /// Get a connection from the pool (round-robin, relaxed ordering) - #[inline(always)] - pub fn get_connection(&self) -> &Rc { - let n = unsafe{&mut *self.next.get()}; - *n +=1; - let idx = *n % self.connections.len(); - unsafe { self.connections.get_unchecked(idx) } - } - - /// Fill the pool with connections - pub async fn fill_pool(&mut self, url: &'static str, size: usize) { - let mut tasks = Vec::with_capacity(size); - for _ in 0..size { - tasks.push(tokio::task::spawn_local(async move { - for attempt in 0..5 { - match PgConnection::connect(url).await { - Ok(conn) => { - - return Ok(conn); }, - Err(_) if attempt < 4 => { - tokio::time::sleep(std::time::Duration::from_millis(100)).await; - } - Err(_) => return Err(()), - } - } - Err(()) - })); - } - for t in tasks { - if let Ok(Ok(conn)) = t.await { - self.connections.push(Rc::new(conn)); - } - } - } - - -} - - - - -pub struct PgConnection { - cl:Client, - _connection_task: tokio::task::JoinHandle<()>, -} - -// Safety: Only used within LocalSet, no cross-thread access -impl PgConnection { - /// Connect to the database - - pub async fn connect(db_url: &str) -> Result { - let (cl, c) = tokio::time::timeout( - std::time::Duration::from_secs(5), - connect(db_url, NoTls), - ) - .await - .map_err(|_| ())? - .map_err(|_| ())?; - - let connection_task = tokio::task::spawn_local(async move { - let _ = c.await; - }); - - Ok(PgConnection { - _connection_task: connection_task, - cl - }) - } -} - -/// Zero-copy writer for BytesMut -pub struct BytesMuteWriter<'a>(pub &'a mut BytesMut); - -impl BytesMuteWriter<'_> { - - #[inline(always)] - pub fn extend_from_slice(&mut self,data:&[u8]){ - self.0.extend_from_slice(data); - } -} - -impl Write for BytesMuteWriter<'_> { - #[inline(always)] - fn write(&mut self, src: &[u8]) -> Result { - self.0.extend_from_slice(src); - Ok(src.len()) - } - - #[inline(always)] - fn flush(&mut self) -> Result<(), io::Error> { - Ok(()) - } -} - -impl std::fmt::Write for BytesMuteWriter<'_> { - #[inline(always)] - fn write_str(&mut self, s: &str) -> std::fmt::Result { - self.0.extend_from_slice(s.as_bytes()); - Ok(()) - } - - #[inline(always)] - fn write_char(&mut self, c: char) -> std::fmt::Result { - let mut buf = [0u8; 4]; - self.0.extend_from_slice(c.encode_utf8(&mut buf).as_bytes()); - Ok(()) - } - - #[inline(always)] - fn write_fmt(&mut self, args: Arguments<'_>) -> std::fmt::Result { - std::fmt::write(self, args) - } -} - -impl WriteExt for BytesMuteWriter<'_> { - #[inline(always)] - fn reserve_with(&mut self, additional: usize) -> Result<&mut [MaybeUninit], io::Error> { - self.0.reserve(additional); - unsafe { - let ptr = self.0.as_mut_ptr().add(self.0.len()) as *mut MaybeUninit; - Ok(std::slice::from_raw_parts_mut(ptr, additional)) - } - } - - #[inline(always)] - unsafe fn flush_len(&mut self, additional: usize) -> io::Result<()> { - self.0.set_len(self.0.len() + additional); - Ok(()) - } -} - - -InitControllersRoot! { - name:ROOT, - holder_type:MainType, - shared_type:SH, -} - -pub struct ThreadSharedStruct{ - writing_buffer:UnsafeCell, - rng:WyRand, -} - - -impl ThreadSharedStruct { - - #[inline(always)] - pub fn get_value(id:i32)->Option<&'static i32>{ - let map = unsafe {CACHED_VALUES.as_ref().unwrap().get(&id)} ; - map - } - pub fn get_cached_queries(&self,num:usize)->&[u8]{ - let buf = unsafe{&mut *(self.writing_buffer.get())}; - buf.clear(); - buf.extend_from_slice(br#"["#); - let mut writer = BytesMuteWriter(buf); - let mut rn = self.rng.clone(); - for _ in 0..num { - let rd = (rn.generate::() % 10_000 ) as i32; - let v = match Self::get_value(rd) { - None => {continue} - Some(c) => {c} - }; - writer.extend_from_slice(br"{"); - _ = write!(writer, r#""id":{},"randomnumber":{}"#, rd, v); - writer.extend_from_slice(br"},"); - } - if buf.len() >1 {buf.truncate(buf.len() - 1);} - buf.extend_from_slice(b"]"); - return &buf[..] - } -} - -pub type MainType = u8; -pub type SH = Rc; - - -static mut CACHED_VALUES:Option> = None; - -pub fn run_server(){ - - _= std::thread::spawn( - ||{ - let rt = tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap(); - rt.block_on(async move { - const URL:&'static str = "postgres://benchmarkdbuser:benchmarkdbpass@tfb-database/hello_world"; - // const URL:&'static str = "postgres://postgres:root@localhost:5432/techmpower"; - - let mut pool = DbConnectionPool{ - connections:Vec::with_capacity( 1 - ), - next:0.into(), - // rt:tokio::runtime::Builder::new_multi_thread().enable_all().worker_threads(cpu_nums).build().unwrap() - }; - - let local_set = LocalSet::new(); - - _= local_set.run_until(async move { - tokio::task::spawn_local(async move { - pool.fill_pool(URL, 1).await; - let connection = pool.get_connection(); - let statement = connection.cl.prepare("SELECT id,randomnumber FROM World").await.unwrap(); - let res = connection.cl.query(&statement,&[]).await.unwrap(); - let mut map = HashMap::new(); - for row in res { - map.insert(row.get(0),row.get(1)); - } - unsafe { - let static_map = &mut CACHED_VALUES; - *static_map = Some(map); - } - }).await - }).await; - - }); - } - ).join(); - let cpu_nums = num_cpus::get(); - - - println!("start listening on port 8080 while workers count {cpu_nums}"); - let mut conf = ServerConfigurations::bind("0.0.0.0",8080); - conf.worker_threads_count = cpu_nums * 1 ; - - // let addresses = (0..cpu_nums).map(|_| { - // ("0.0.0.0".to_string(),8080) - // }).collect::>(); - // conf.addresses = addresses; - RunServer!( - conf, - ROOT, - EntryController, - shared_factory - ); -} - -fn shared_factory()->Pin>>{ - Box::pin(async { - - // const URL:&'static str = "postgres://postgres:root@localhost:5432/techmpower"; - - Rc::new(ThreadSharedStruct{ - writing_buffer:UnsafeCell::new(BytesMut::with_capacity(100_000)), - rng:WyRand::new() - }) - }) -} - -pub async fn handle_cached_queries(context:&mut HttpContext<'_,MainType,SH,HS,QS>){ - let q = context - .get_from_path_query("q") - .and_then(|v| v.parse::().ok()) // safely parse - .unwrap_or(1) // default to 1 if missing or invalid - .clamp(1, 500); - - let connection:SH = context.thread_shared_struct.clone().unwrap().clone(); - let data = connection.get_cached_queries(q); - let mut sender:HttpSender = context.sender(); - sender.set_header_ef("Content-Type","application/json"); - sender.set_header_ef("Server","water"); - let date = httpdate::fmt_http_date(std::time::SystemTime::now()); - sender.set_header_ef("Date",date); - _= sender.send_data_as_final_response( - ResponseData::Slice(data) - ).await; -} - -WaterController! { - holder -> super::MainType, - shared -> super::SH, - name -> EntryController, - functions -> { - - GET -> "cached-queries" -> query (context) { - _=super::handle_cached_queries(context).await; - } - } -} - -fn main() { - run_server(); -} - diff --git a/frameworks/Rust/water-http/src/db.rs b/frameworks/Rust/water-http/src/db.rs index 1adc092369a..7494aaa99be 100644 --- a/frameworks/Rust/water-http/src/db.rs +++ b/frameworks/Rust/water-http/src/db.rs @@ -1,4 +1,4 @@ -#![cfg(any(feature = "db",feature = "all",feature = "uring"))] +#![cfg(any(feature = "db",feature = "all",feature = "uring",feature = "mini"))] use std::{borrow::Cow, io}; use std::fmt::Arguments; use std::io::Write; @@ -7,13 +7,14 @@ use std::rc::Rc; use std::cell::UnsafeCell; use bytes::{Buf, BufMut}; use nanorand::{Rng, WyRand}; +use smallvec::SmallVec; use tokio_postgres::{connect, Client, Statement, NoTls}; use tokio_postgres::types::private::BytesMut; use crate::models::{Fortune, FortuneTemplate, World}; use sonic_rs::prelude::WriteExt; use water_buffer::WaterBuffer; use yarte::TemplateBytesTrait; -use crate::buf::{FortunesPool, PooledBuffer}; +use crate::buf::{FortunesPool, IDsPool, PooledBuffer, WorldsPool}; use tokio::pin; @@ -24,6 +25,9 @@ pub struct DbConnectionPool { pub next: UnsafeCell, } +unsafe impl Sync for DbConnectionPool {} +unsafe impl Send for DbConnectionPool {} + impl DbConnectionPool { /// Get a connection from the pool (round-robin, relaxed ordering) #[inline(always)] @@ -93,7 +97,6 @@ pub struct PgConnection { pub updates: Vec, rang:WyRand, buffers: UnsafeCell, - _connection_task: tokio::task::JoinHandle<()>, } // Safety: Only used within LocalSet, no cross-thread access @@ -109,7 +112,7 @@ impl PgConnection { .map_err(|_| ())? .map_err(|_| ())?; - let connection_task = tokio::task::spawn_local(async move { + _= tokio::task::spawn(async move { let _ = conn.await; }); @@ -129,7 +132,6 @@ impl PgConnection { world, updates, buffers: UnsafeCell::new(BufferPool::new()), - _connection_task: connection_task, rang: WyRand::new() }) } @@ -159,40 +161,40 @@ impl PgConnection { /// Get a single random world - optimized with buffer reuse #[inline] - pub async fn get_world(&self) -> &[u8] { + pub async fn get_world(&self) -> Vec { let rd = (self.rang.clone().generate::() % 10_000 + 1) as i32; let row = self.cl.query_one(&self.world, &[&rd]).await.unwrap(); - let buffers = self.buffers(); - buffers.body.clear(); + let mut buffer = PooledBuffer::new().take_inner(); sonic_rs::to_writer( - BytesMuteWriter(&mut buffers.body), + &mut buffer, &World { id: row.get(0), randomnumber: row.get(1), }, ).unwrap(); - buffers.body.chunk() + buffer } /// Get multiple random worlds - optimized with buffer reuse - pub async fn get_worlds(&self, num: usize) -> &[u8] { - let buffers = self.buffers(); - buffers.worlds.clear(); + pub async fn get_worlds(&self, num: usize) -> Vec { + let mut worlds = WorldsPool::new().take_inner(); + worlds.clear(); let mut rn = self.rang.clone(); for _ in 0..num { let id: i32 = (rn.generate::() & 0x3FFF) as i32 % 10_000 + 1; let row = self.cl.query_one(&self.world, &[&id]).await.unwrap(); - buffers.worlds.push(World { + worlds.push(World { id: row.get(0), randomnumber: row.get(1), }); } - buffers.body.clear(); - sonic_rs::to_writer(BytesMuteWriter(&mut buffers.body), &buffers.worlds).unwrap(); - buffers.body.chunk() + let mut buffer = PooledBuffer::new().take_inner(); + sonic_rs::to_writer(&mut buffer, &worlds).unwrap(); + WorldsPool::save_heap_allocation(worlds); + buffer } /// Update worlds in batch - optimized with buffer reuse /// Update worlds in batch - optimized with buffer reuse @@ -204,66 +206,112 @@ impl PgConnection { /// Update worlds in batch - optimized with RETURNING clause to minimize reads /// Update worlds - fetch and update each row to handle duplicates correctly /// Update worlds in batch using CASE statement - pub async fn update(&self, num: usize) -> &[u8] { - let buffers = self.buffers(); - let mut ids: Vec = Vec::with_capacity(num); - let mut rng = self.rang.clone(); + + pub async fn update(&self,num:usize)->Vec{ + + let mut output = PooledBuffer::new().take_inner(); + let (mut ids,mut numbers) = IDsPool::new().take_inner(); + let mut worlds = WorldsPool::new().take_inner(); + let mut rn = self.rang.clone(); + let mut futures = Vec::with_capacity(num); let mut params: Vec<&(dyn tokio_postgres::types::ToSql + Sync)> = - Vec::with_capacity(num * 2); - let mut futures = vec![]; - for _ in 0..num { - let w_id = (rng.generate::() % 10_000 + 1) as i32; - ids.push(w_id); + Vec::with_capacity(num * 2); + + if worlds.len() == num { + for w in &worlds { + ids.push(w.id); + params.push(&w.id); + numbers.push(w.randomnumber); + params.push(&w.randomnumber); + } + } else { + worlds.clear(); + (0..num).for_each(|n|{ + let id = (rn.generate::() % 10_000 + 1) as i32; + let number = (rn.generate::() % 10_000 + 1) as i32; + ids.push(id); + numbers.push(number); + }); + + ids.sort(); + for ind in 0..num { + worlds.push( + World{ + id:ids[ind], + randomnumber:numbers[ind] + } + ); + params.push(&ids[ind]); + params.push(&numbers[ind]); + } } futures.extend(ids.iter().map(|x| async move { self.cl.query_one(&self.world, &[&x]).await })); futures_util::future::join_all(futures).await; - ids.sort_unstable(); - buffers.worlds.clear(); - for index in 0..num { - let s_id = (rng.generate::() % 10_000 + 1) as i32; - buffers.worlds.push(World { - id: ids[index], - randomnumber: s_id - }); - buffers.numbers.push(s_id); - } - buffers.body.clear(); - for index in 0..num { - params.push(&ids[index]); - params.push(&buffers.numbers[index]); - } - - _ = self.cl.execute(&self.updates[num - 1], ¶ms).await.unwrap(); - sonic_rs::to_writer(BytesMuteWriter(&mut buffers.body), &buffers.worlds).unwrap(); - buffers.body.chunk() + _ = self.cl.execute(&self.updates[num - 1], ¶ms).await; + _= sonic_rs::to_writer(&mut output,&worlds); + WorldsPool::save_heap_allocation(worlds); + IDsPool::save_heap_allocation((ids,numbers)); + output } + // pub async fn update(&self, num: usize) -> &[u8] { + // let buffers = self.buffers(); + // let mut ids: Vec = Vec::with_capacity(num); + // let mut rng = self.rang.clone(); + // let mut params: Vec<&(dyn tokio_postgres::types::ToSql + Sync)> = + // Vec::with_capacity(num * 2); + // let mut futures = vec![]; + // for _ in 0..num { + // let w_id = (rng.generate::() % 10_000 + 1) as i32; + // ids.push(w_id); + // } + // futures.extend(ids.iter().map(|x| async move { self.cl.query_one(&self.world, &[&x]).await })); + // futures_util::future::join_all(futures).await; + // ids.sort_unstable(); + // buffers.worlds.clear(); + // for index in 0..num { + // let s_id = (rng.generate::() % 10_000 + 1) as i32; + // buffers.worlds.push(World { + // id: ids[index], + // randomnumber: s_id + // }); + // buffers.numbers.push(s_id); + // } + // buffers.body.clear(); + // for index in 0..num { + // params.push(&ids[index]); + // params.push(&buffers.numbers[index]); + // } + // + // _ = self.cl.execute(&self.updates[num - 1], ¶ms).await.unwrap(); + // sonic_rs::to_writer(BytesMuteWriter(&mut buffers.body), &buffers.worlds).unwrap(); + // buffers.body.chunk() + // } + pub async fn tell_fortune(&self) -> Result, ()> { // 1. Explicitly type the empty params to satisfy the compiler's inference let mut res = self.cl.query(&self.fortune, &[]).await.unwrap(); let mut fortunes = FortunesPool::new().take_inner(); - - fortunes.push( - Fortune{ - id:0, - message:Cow::Borrowed("Additional fortune added at request time.") + if fortunes.is_empty() { + fortunes.push(Fortune{ + id:0, + message:Cow::Borrowed("Additional fortune added at request time.") + }); + for r in res { + let id :&'static str = unsafe { + let a :&str = r.get(1); + &*(a as *const str) + }; + fortunes.push( + Fortune { + id:r.get(0), + message:Cow::Borrowed(id) + } + ); } - ); - for r in res { - let id :&'static str = unsafe { - let a :&str = r.get(1); - &*(a as *const str) - }; - fortunes.push( - Fortune { - id:r.get(0), - message:Cow::Borrowed(id) - } - ); + fortunes.sort_by(|a, b| a.message.cmp(&b.message)); } - fortunes.sort_by(|a, b| a.message.cmp(&b.message)); - let mut output = PooledBuffer::new().take_inner(); let template = FortuneTemplate { items: &fortunes, @@ -272,6 +320,9 @@ impl PgConnection { Ok(output) } } + +unsafe impl Sync for PgConnection {} +unsafe impl Send for PgConnection {} /// Zero-copy writer for BytesMut pub struct BytesMuteWriter<'a>(pub &'a mut BytesMut); diff --git a/frameworks/Rust/water-http/src/json.rs b/frameworks/Rust/water-http/src/json.rs deleted file mode 100644 index 0afbe4143f5..00000000000 --- a/frameworks/Rust/water-http/src/json.rs +++ /dev/null @@ -1,57 +0,0 @@ -use water_http::{InitControllersRoot, RunServer, WaterController}; -use water_http::server::ServerConfigurations; - -InitControllersRoot! { - name:ROOT, - holder_type:MainType, -} - - - -pub type MainType = u8; - - -fn main() { - run_server(); -} - - -pub fn run_server(){ - - let cpu_nums = num_cpus::get(); - - - println!("start listening on port 8080 while workers count {cpu_nums}"); - let mut conf = ServerConfigurations::bind("0.0.0.0",8080); - conf.worker_threads_count = cpu_nums * 2 ; - - RunServer!( - conf, - ROOT, - EntryController - ); -} - - - -WaterController! { - holder -> super::MainType, - name -> EntryController, - functions -> { - - - GET => json => j(cx){ - let mut sender = cx.sender(); - sender.set_header_ef("Content-Type","application/json"); - sender.set_header_ef("Server","water"); - let js = crate::models::JsonHolder::HELLO_WORLD; - let mut buffer = crate::buf::PooledBuffer::new().take_inner(); - _=sonic_rs::to_writer(&mut buffer,&js); - sender.set_header_ef("Date",crate::date::get_date_fast()); - _=sender.send_data_as_final_response(http::ResponseData::Slice(&buffer)).await; - crate::buf::PooledBuffer::recycle(buffer); - } - - } -} - diff --git a/frameworks/Rust/water-http/src/main.rs b/frameworks/Rust/water-http/src/main.rs index 68bd6230ae9..61282df5f00 100644 --- a/frameworks/Rust/water-http/src/main.rs +++ b/frameworks/Rust/water-http/src/main.rs @@ -3,7 +3,6 @@ pub mod models; pub mod db; pub mod date; pub mod buf; -// pub mod chop; use mimalloc::MiMalloc; @@ -11,6 +10,12 @@ use mimalloc::MiMalloc; static GLOBAL: MiMalloc = MiMalloc; fn main() { - server::run_server(); + + #[cfg(feature = "mini")] + { + mini::run(); + } + #[cfg(not(feature = "mini"))] + server::run_server(); } diff --git a/frameworks/Rust/water-http/src/mini.rs b/frameworks/Rust/water-http/src/mini.rs new file mode 100644 index 00000000000..676af7b5df3 --- /dev/null +++ b/frameworks/Rust/water-http/src/mini.rs @@ -0,0 +1,165 @@ +use std::cell::OnceCell; +use water_http::http::status_code::HttpStatusCode; +use water_http::server::mini::{CtxPtr, HandlerFn, serve}; +use water_http::server::ServerConfigurations; +use crate::buf::{FortunesPool, PooledBuffer}; +use mimalloc::MiMalloc; + +#[global_allocator] +static GLOBAL: MiMalloc = MiMalloc; + + +mod date; +mod models; +mod buf; +mod db; +use crate::db::PgConnection; +use crate::date::get_date_fast; +const URL:&'static str = "postgres://benchmarkdbuser:benchmarkdbpass@tfb-database/hello_world"; +// const URL:&'static str = "postgres://postgres:root@localhost:5432/techmpower"; + +const PLAINTEXT:&'static [u8] = br#"Hello, World!"#; + +fn main() { + let thread_init = || async { + get_connection().await; + }; + let mut conf = ServerConfigurations::bind("0.0.0.0", 8080); + conf.max_cached_buffers_count = 2500; + serve::<16, 10, _,_,_>(conf, HandlerFn(|ctx: CtxPtr<16, 10>| handler(ctx)),Some(thread_init)); +} + +async fn handler(mut ctx: CtxPtr<16,10>){ + let ctx = ctx.get(); + let req = ctx.request(); + + let path = req.path(); + match path { + "/plaintext"=>{ + ctx.set_header("Content-Type","text/plain; charset=utf-8"); + ctx.set_header("Content-Length",PLAINTEXT.len()); + ctx.set_header("Server","W"); + ctx.set_header("Date",get_date_fast()); + ctx.write_body_bytes(PLAINTEXT); + } + "/json" =>{ + let js = models::JsonHolder{message:"Hello, World!"}; + let mut buffer = buf::PooledBuffer::new().take_inner(); + _=sonic_rs::to_writer(&mut buffer,&js); + ctx.set_header("Content-Type","application/json"); + ctx.set_header("Content-Length",buffer.len()); + ctx.set_header("Server","W"); + ctx.set_header("Date",get_date_fast()); + ctx.write_body_bytes(&buffer); + buf::PooledBuffer::recycle(buffer) + } + + "/fortunes"=>{ + let connection = get_connection().await; + let fortunes = connection.tell_fortune().await.unwrap(); + ctx.set_header("Content-Type","text/html; charset=UTF-8"); + ctx.set_header("Server","W"); + ctx.set_header("Date",get_date_fast()); + ctx.set_header("Content-Length",fortunes.len()); + ctx.write_body_bytes(&fortunes); + FortunesPool::save_heap_allocation(fortunes); + } + + "/db" => { + let connection = get_connection().await; + let db = connection.get_world().await; + ctx.set_header("Content-Type","application/json"); + ctx.set_header("Server","W"); + ctx.set_header("Date",get_date_fast()); + ctx.set_header("Content-Length",db.len()); + ctx.write_body_bytes(&db); + PooledBuffer::recycle(db); + } + + _=>{ + if path.contains("?") { + let value = path.split("=").last() + .and_then(|v| v.parse::().ok()) + .unwrap_or(1) + .clamp(1, 500); + let s =&path[0..=1]; + match s { + "/q"=>{ + let q = get_connection().await; + let data = q.get_worlds(value).await; + ctx.set_header("Content-Type","application/json"); + ctx.set_header("Server","W"); + ctx.set_header("Date",get_date_fast()); + ctx.set_header("Content-Length",data.len()); + ctx.write_body_bytes(&data); + PooledBuffer::recycle(data); + return + } + "/u"=> { + let q = get_connection().await; + let data = q.update(value).await; + ctx.set_header("Content-Type","application/json"); + ctx.set_header("Server","W"); + ctx.set_header("Date",get_date_fast()); + ctx.set_header("Content-Length",data.len()); + ctx.write_body_bytes(&data); + PooledBuffer::recycle(data); + return + } + _=>{} + } + + } + ctx.set_status_code(HttpStatusCode::NOT_FOUND); + } + } + +} + + +thread_local! { + static PG:OnceCell = OnceCell::new(); +} + +async fn get_connection() -> &'static PgConnection { + // 1. Try to get the pointer from thread-local + let ptr = PG.with(|cell| { + cell.get().map(|conn| conn as *const PgConnection) + }); + + if let Some(p) = ptr { + // SAFETY: We are promoting a thread-local pointer to 'static. + // This is only safe in TFB if the thread never dies. + return unsafe { &*p }; + } + + let mut t_count = 0_usize; + // 2. Initialize + let conn = loop { + match PgConnection::connect(URL) + .await { + Ok(r) => { + break r + } + Err(_) => { + t_count+=1; + println!("failed count = {t_count}"); + if t_count > 10 { + panic!("could not connect to db"); + } + continue } + } + }; + println!("{:?} connected successfully", std::thread::current().name().unwrap()); + // 3. Store it + PG.with(|cell| { + _= cell.set(conn); + }); + + // 4. Retrieve the pointer again + let ptr = PG.with(|cell| { + cell.get().map(|conn| conn as *const PgConnection) + }).expect("Should be initialized"); + + unsafe { &*ptr } +} \ No newline at end of file diff --git a/frameworks/Rust/water-http/src/models.rs b/frameworks/Rust/water-http/src/models.rs index d6b97c81448..d1e008cc589 100644 --- a/frameworks/Rust/water-http/src/models.rs +++ b/frameworks/Rust/water-http/src/models.rs @@ -19,7 +19,7 @@ pub struct FortuneTemplate<'a>{ } #[derive(Serialize,Debug)] pub struct JsonHolder { - pub message:&'static str + pub message:&'static str } diff --git a/frameworks/Rust/water-http/src/plaintext.rs b/frameworks/Rust/water-http/src/plaintext.rs deleted file mode 100644 index 197da216e9c..00000000000 --- a/frameworks/Rust/water-http/src/plaintext.rs +++ /dev/null @@ -1,54 +0,0 @@ -use water_http::{InitControllersRoot, RunServer, WaterController}; -use water_http::server::ServerConfigurations; - -InitControllersRoot! { - name:ROOT, - holder_type:MainType, -} - - - -pub type MainType = u8; - - -fn main() { - run_server(); -} - - -pub fn run_server(){ - - let cpu_nums = num_cpus::get(); - - - println!("start listening on port 8080 while workers count {cpu_nums}"); - let mut conf = ServerConfigurations::bind("0.0.0.0",8080); - conf.worker_threads_count = cpu_nums * 1 ; - - RunServer!( - conf, - ROOT, - EntryController - ); -} - -const JSON_RESPONSE:&'static [u8] = br#"{"message":"Hello, World!"}"#; -const P:&'static [u8] = br#"Hello, World!"#; - - -WaterController! { - holder -> super::MainType, - name -> EntryController, - functions -> { - - GET => plaintext => p(cx) { - let mut sender = cx.sender(); - sender.set_header_ef("Content-Type","text/plain; charset=utf-8"); - sender.set_header_ef("Server","water"); - let date = httpdate::fmt_http_date(std::time::SystemTime::now()); - sender.set_header_ef("Date",date); - _=sender.send_data_as_final_response(http::ResponseData::Str("Hello, World!")).await; - } - } -} - diff --git a/frameworks/Rust/water-http/src/server.rs b/frameworks/Rust/water-http/src/server.rs index fd9926014f3..9ba3efe89a6 100644 --- a/frameworks/Rust/water-http/src/server.rs +++ b/frameworks/Rust/water-http/src/server.rs @@ -36,10 +36,7 @@ pub fn run_server(){ conf.worker_threads_count = cpu_nums * 1 ; } - // let addresses = (0..cpu_nums).map(|_| { - // ("0.0.0.0".to_string(),8080) - // }).collect::>(); - // conf.addresses = addresses; + conf.max_cached_buffers_count = 2500; RunServer!( conf, ROOT, @@ -108,11 +105,11 @@ WaterController! { let mut sender = context.sender(); sender.set_header_ef("Content-Type","application/json"); sender.set_header_ef("Server","water"); - let date = httpdate::fmt_http_date(std::time::SystemTime::now()); - sender.set_header_ef("Date",date); + sender.set_header_ef("Date",crate::date::get_date_fast()); _= sender.send_data_as_final_response( - http::ResponseData::Slice(data) + http::ResponseData::Slice(&data) ).await; + crate::buf::PooledBuffer::recycle(data); } GET -> queries -> query (context){ let q = context @@ -127,11 +124,11 @@ WaterController! { let mut sender = context.sender(); sender.set_header_ef("Content-Type","application/json"); sender.set_header_ef("Server","water"); - let date = httpdate::fmt_http_date(std::time::SystemTime::now()); - sender.set_header_ef("Date",date); + sender.set_header_ef("Date",crate::date::get_date_fast()); _= sender.send_data_as_final_response( - http::ResponseData::Slice(data) + http::ResponseData::Slice(&data) ).await; + crate::buf::PooledBuffer::recycle(data); } GET -> updates -> update (context){ @@ -146,11 +143,11 @@ WaterController! { let mut sender = context.sender(); sender.set_header_ef("Content-Type","application/json"); sender.set_header_ef("Server","water"); - let date = httpdate::fmt_http_date(std::time::SystemTime::now()); - sender.set_header_ef("Date",date); + sender.set_header_ef("Date",crate::date::get_date_fast()); _= sender.send_data_as_final_response( - http::ResponseData::Slice(data) + http::ResponseData::Slice(&data) ).await; + crate::buf::PooledBuffer::recycle(data); } @@ -173,6 +170,7 @@ WaterController! { _= sender.send_data_as_final_response( http::ResponseData::Slice(&data) ).await; + crate::buf::PooledBuffer::recycle(data); } } diff --git a/frameworks/Rust/water-http/water-http-mini.dockerfile b/frameworks/Rust/water-http/water-http-mini.dockerfile new file mode 100644 index 00000000000..7acbcec530f --- /dev/null +++ b/frameworks/Rust/water-http/water-http-mini.dockerfile @@ -0,0 +1,13 @@ +FROM rust:1.93 + +RUN apt-get update -yqq && apt-get install -yqq cmake g++ + +WORKDIR /water +COPY . . + +RUN cargo clean +RUN RUSTFLAGS="-C target-cpu=native --cfg tokio_unstable" cargo build --release --bin mini --features mini + +EXPOSE 8080 + +CMD ./target/release/mini \ No newline at end of file From 3e15ae0f5431aa5c308eb991c05202a19573ff25 Mon Sep 17 00:00:00 2001 From: HassanSharara Date: Tue, 10 Mar 2026 02:04:27 +0300 Subject: [PATCH 8/9] change github rev for server crate --- frameworks/Rust/water-http/Cargo.toml | 4 ++-- frameworks/Rust/water-http/src/db.rs | 2 +- frameworks/Rust/water-http/src/mini.rs | 2 ++ frameworks/Rust/water-http/src/server.rs | 2 ++ 4 files changed, 7 insertions(+), 3 deletions(-) diff --git a/frameworks/Rust/water-http/Cargo.toml b/frameworks/Rust/water-http/Cargo.toml index f79428468a5..baccbd91c7c 100644 --- a/frameworks/Rust/water-http/Cargo.toml +++ b/frameworks/Rust/water-http/Cargo.toml @@ -9,7 +9,7 @@ default-run = "water-http" askama = "0.14.0" tokio = { version = "1.48.0", features = ["full"] } -water_http = { rev = "6420ab6", features = ["use_only_http1"],optional = true , version = "4.0.1-beta.96",git = "https://github.com/HassanSharara/water_http"} +water_http = { rev = "c275d8e", features = ["use_only_http1"],optional = true , version = "4.0.1-beta.96",git = "https://github.com/HassanSharara/water_http"} #water_http_unrealistic = {package = "water_http" ,features = ["use_io_uring","use_only_http1"],optional = true , version = "3.4.2-beta.4" } smallvec = "1.15.1" nanorand = "0.8.0" @@ -26,7 +26,7 @@ yarte = { version = "0.15.7" ,features = ["bytes-buf", "json"] } smallbox = "0.8.8" mimalloc = "0.1.48" #chopin-pg = "0.5.18" -water_buffer = {version = "1.2.9",features = ["bytes","unsafe_clone"]} +water_buffer = {version = "1.2.10",features = ["bytes","unsafe_clone"]} integer_to_bytes = "0.2.2" diff --git a/frameworks/Rust/water-http/src/db.rs b/frameworks/Rust/water-http/src/db.rs index 7494aaa99be..db6f3028199 100644 --- a/frameworks/Rust/water-http/src/db.rs +++ b/frameworks/Rust/water-http/src/db.rs @@ -42,7 +42,7 @@ impl DbConnectionPool { pub async fn fill_pool(&mut self, url: &'static str, size: usize) { let mut tasks = Vec::with_capacity(size); for _ in 0..size { - tasks.push(tokio::task::spawn_local(async move { + tasks.push(tokio::task::spawn(async move { for attempt in 0..5 { match PgConnection::connect(url).await { Ok(conn) => { diff --git a/frameworks/Rust/water-http/src/mini.rs b/frameworks/Rust/water-http/src/mini.rs index 676af7b5df3..d43a22fd439 100644 --- a/frameworks/Rust/water-http/src/mini.rs +++ b/frameworks/Rust/water-http/src/mini.rs @@ -26,6 +26,8 @@ fn main() { }; let mut conf = ServerConfigurations::bind("0.0.0.0", 8080); conf.max_cached_buffers_count = 2500; + conf.default_write_buffer_size = conf.default_read_buffer_size * 2; + conf.max_buffer_size_for_cache = conf.default_write_buffer_size; serve::<16, 10, _,_,_>(conf, HandlerFn(|ctx: CtxPtr<16, 10>| handler(ctx)),Some(thread_init)); } diff --git a/frameworks/Rust/water-http/src/server.rs b/frameworks/Rust/water-http/src/server.rs index 9ba3efe89a6..b2ce34241df 100644 --- a/frameworks/Rust/water-http/src/server.rs +++ b/frameworks/Rust/water-http/src/server.rs @@ -37,6 +37,8 @@ pub fn run_server(){ } conf.max_cached_buffers_count = 2500; + conf.default_write_buffer_size = conf.default_read_buffer_size * 2; + conf.max_buffer_size_for_cache = conf.default_write_buffer_size; RunServer!( conf, ROOT, From 7ae51d2bc8f65a8984d36200603745168c79b677 Mon Sep 17 00:00:00 2001 From: HassanSharara Date: Tue, 10 Mar 2026 07:37:53 +0300 Subject: [PATCH 9/9] change default buffers size --- frameworks/Rust/water-http/src/buf.rs | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/frameworks/Rust/water-http/src/buf.rs b/frameworks/Rust/water-http/src/buf.rs index 612bd110fcf..b58806e1067 100644 --- a/frameworks/Rust/water-http/src/buf.rs +++ b/frameworks/Rust/water-http/src/buf.rs @@ -6,10 +6,11 @@ use crate::models::{Fortune, World}; // Mocking the type for compilation demo type WaterBuffer = Vec; -const INITIAL_VEC_CAPACITY: usize = 128; + +const INITIAL_VEC_CAPACITY: usize = 2000; const DEFAULT_SIZE: usize = 4048; -const MAX_BUFFER_SIZE: usize = 8192; // Allow some growth before discarding -const MAX_CACHED_BUFFERS: usize = 117; // Set to your test requirement +const MAX_BUFFER_SIZE: usize = 4048; // Allow some growth before discarding +const MAX_CACHED_BUFFERS: usize = 512; // Set to your test requirement thread_local! {