diff --git a/frameworks/Rust/water-http/Cargo.toml b/frameworks/Rust/water-http/Cargo.toml index 2e25535e243..baccbd91c7c 100644 --- a/frameworks/Rust/water-http/Cargo.toml +++ b/frameworks/Rust/water-http/Cargo.toml @@ -3,10 +3,14 @@ name = "water-http" version = "0.1.0" edition = "2024" +default-run = "water-http" + [dependencies] askama = "0.14.0" -tokio = { version = "1.47.1", features = ["full"] } -water_http = { features = ["use_io_uring","use_only_http1"],optional = true , version = "3.4.2-beta.4" } +tokio = { version = "1.48.0", features = ["full"] } + +water_http = { rev = "c275d8e", features = ["use_only_http1"],optional = true , version = "4.0.1-beta.96",git = "https://github.com/HassanSharara/water_http"} +#water_http_unrealistic = {package = "water_http" ,features = ["use_io_uring","use_only_http1"],optional = true , version = "3.4.2-beta.4" } smallvec = "1.15.1" nanorand = "0.8.0" tokio-postgres = "0.7.15" @@ -18,28 +22,39 @@ num_cpus = "1.17.0" httpdate = "1.0.3" parking_lot = "0.12.5" yarte = { version = "0.15.7" ,features = ["bytes-buf", "json"] } -itoa = {version = "1.0.15" ,optional = true} +#itoa = {version = "1.0.15" ,optional = true} +smallbox = "0.8.8" +mimalloc = "0.1.48" +#chopin-pg = "0.5.18" +water_buffer = {version = "1.2.10",features = ["bytes","unsafe_clone"]} +integer_to_bytes = "0.2.2" + -[[bin]] -name = "plaintext" -path = "src/plaintext.rs" -required-features = ["json_plaintext"] [[bin]] -name = "json" -path = "src/json.rs" -required-features = ["json_plaintext"] +name = "mini" +path = "src/mini.rs" +required-features = ["mini"] + + + -[[bin]] -name = "cache" -path = "src/cached.rs" -required-features = ["cache"] [features] json_plaintext = ["water_http"] db = ["water_http/thread_shared_struct"] -cache = ["water_http/thread_shared_struct","itoa"] -all = ["water_http/thread_shared_struct"] +all = ["water_http","water_http/thread_shared_struct","water_http/cpu_affinity"] +uring = ["water_http","water_http/thread_shared_struct", "water_http/use_io_uring"] +mini = ["water_http","water_http/cpu_affinity"] + +[profile.release] +opt-level = 3 +codegen-units = 1 +panic = 'abort' +lto = "fat" +debug = false +incremental = false +overflow-checks = false diff --git a/frameworks/Rust/water-http/benchmark_config.json b/frameworks/Rust/water-http/benchmark_config.json index f0c592a58e6..737a2d247d0 100644 --- a/frameworks/Rust/water-http/benchmark_config.json +++ b/frameworks/Rust/water-http/benchmark_config.json @@ -1,17 +1,16 @@ { "framework": "water-http", - "maintainers" : ["HassanSharara"], + "maintainers": ["HassanSharara"], "tests": [ { - "default": { + "default": { "json_url": "/json", "plaintext_url": "/plaintext", "fortune_url": "/fortunes", "db_url": "/db", "query_url": "/queries?q=", "update_url": "/updates?q=", - "cached_query_url": "/cached-queries?q=", "port": 8080, "approach": "Stripped", "classification": "Fullstack", @@ -25,6 +24,70 @@ "database_os": "Linux", "display_name": "water_http", "tags": [] + }, + + "mini": { + "json_url": "/json", + "plaintext_url": "/plaintext", + "fortune_url": "/fortunes", + "db_url": "/db", + "query_url": "/queries?q=", + "update_url": "/updates?q=", + "port": 8080, + "approach": "Stripped", + "classification": "Fullstack", + "database": "Postgres", + "framework": "water_http", + "language": "Rust", + "orm": "raw", + "platform": "Rust", + "webserver": "water_http", + "os": "Linux", + "database_os": "Linux", + "display_name": "water_http [mini]", + "tags": [] + }, + + "tokio": { + "json_url": "/json", + "plaintext_url": "/plaintext", + "fortune_url": "/fortunes", + "db_url": "/db", + "query_url": "/queries?q=", + "update_url": "/updates?q=", + "port": 8080, + "approach": "Realistic", + "classification": "Fullstack", + "database": "Postgres", + "framework": "water_http", + "language": "Rust", + "orm": "raw", + "platform": "Rust", + "webserver": "water_http", + "os": "Linux", + "database_os": "Linux", + "display_name": "water_http [uns]" + }, + + "uring": { + "json_url": "/json", + "plaintext_url": "/plaintext", + "fortune_url": "/fortunes", + "db_url": "/db", + "query_url": "/queries?q=", + "update_url": "/updates?q=", + "port": 8080, + "approach": "Realistic", + "classification": "Fullstack", + "database": "Postgres", + "framework": "water_http", + "language": "Rust", + "orm": "raw", + "platform": "Rust", + "webserver": "water_http", + "os": "Linux", + "database_os": "Linux", + "display_name": "water_http [io-uring]" } } ] diff --git a/frameworks/Rust/water-http/src/buf.rs b/frameworks/Rust/water-http/src/buf.rs new file mode 100644 index 00000000000..b58806e1067 --- /dev/null +++ b/frameworks/Rust/water-http/src/buf.rs @@ -0,0 +1,189 @@ +use std::cell::RefCell; +use crate::models::{Fortune, World}; +// Assuming water_buffer crate exists +// use water_buffer::WaterBuffer as BM; + +// Mocking the type for compilation demo +type WaterBuffer = Vec; + + +const INITIAL_VEC_CAPACITY: usize = 2000; +const DEFAULT_SIZE: usize = 4048; +const MAX_BUFFER_SIZE: usize = 4048; // Allow some growth before discarding +const MAX_CACHED_BUFFERS: usize = 512; // Set to your test requirement + + +thread_local! { + static BUFFER_CACHE: RefCell> = RefCell::new(Vec::with_capacity(INITIAL_VEC_CAPACITY)); + static FORTUNE_CACHE: RefCell>> = RefCell::new(Vec::with_capacity(INITIAL_VEC_CAPACITY)); + static WORLDS_CACHE: RefCell>> = RefCell::new(Vec::with_capacity(500)); + static IDS_NUMBERS: RefCell,Vec)>> = RefCell::new(Vec::with_capacity(500)); + +} + +pub struct PooledBuffer { + inner: Option, +} + +pub struct FortunesPool { + inner: Option>, +} + +pub struct WorldsPool { + inner: Option>, +} + +pub struct IDsPool { + inner: Option<(Vec,Vec)>, +} + + +impl PooledBuffer { + pub fn new() -> Self { + Self::with_capacity(DEFAULT_SIZE) + } + + pub fn with_capacity(cap: usize) -> Self { + let buf = BUFFER_CACHE.with(|cache| { + let mut cache = cache.borrow_mut(); + if let Some(mut existing_buf) = cache.pop() { + // Assuming your WaterBuffer has a clear/reset method + existing_buf.clear(); + existing_buf + } else { + // Fallback to new allocation if cache is empty + WaterBuffer::with_capacity(cap) + } + }); + + Self { inner: Some(buf) } + } + + pub fn take_inner(&mut self) -> WaterBuffer { + self.inner.take().expect("Buffer already taken or dropped") + } + + pub fn recycle(buf: WaterBuffer) { + // Use capacity() check to ensure we don't cache + // a buffer that grew to a massive size during one specific request + if buf.capacity() <= MAX_BUFFER_SIZE { + BUFFER_CACHE.with(|cache| { + let mut cache = cache.borrow_mut(); + if cache.len() < MAX_CACHED_BUFFERS { + cache.push(buf); + } + }); + } + } +} +impl FortunesPool { + pub fn new() -> Self { + Self::with_capacity(16) + } + + pub fn with_capacity(cap: usize) -> Self { + let buf = FORTUNE_CACHE.with(|cache| { + let mut cache = cache.borrow_mut(); + if let Some(mut existing_buf) = cache.pop() { + // Assuming your WaterBuffer has a clear/reset method + existing_buf + } else { + // Fallback to new allocation if cache is empty + Vec::with_capacity(cap) + } + }); + + Self { inner: Some(buf) } + } + + pub fn take_inner(&mut self) -> Vec { + self.inner.take().expect("Buffer already taken or dropped") + } + + pub fn save_heap_allocation(buf: WaterBuffer) { + // Use capacity() check to ensure we don't cache + // a buffer that grew to a massive size during one specific request + if buf.capacity() <= 16 { + BUFFER_CACHE.with(|cache| { + let mut cache = cache.borrow_mut(); + if cache.len() < 2000 { + cache.push(buf); + } + }); + } + } +} +impl WorldsPool { + pub fn new() -> Self { + Self::with_capacity(500) + } + + pub fn with_capacity(cap: usize) -> WorldsPool { + let buf = WORLDS_CACHE.with(|cache| { + let mut cache = cache.borrow_mut(); + if let Some(mut existing_buf) = cache.pop() { + // Assuming your WaterBuffer has a clear/reset method + existing_buf + } else { + // Fallback to new allocation if cache is empty + Vec::with_capacity(cap) + } + }); + + Self { inner: Some(buf) } + } + + pub fn take_inner(&mut self) -> Vec { + self.inner.take().expect("Buffer already taken or dropped") + } + + pub fn save_heap_allocation(buf: Vec) { + // Use capacity() check to ensure we don't cache + // a buffer that grew to a massive size during one specific request + if buf.capacity() <= 500 { + WORLDS_CACHE.with(|cache| { + let mut cache = cache.borrow_mut(); + if cache.len() < 2000 { + cache.push(buf); + } + }); + } + } +} +impl IDsPool { + pub fn new() -> Self { + Self::with_capacity(500) + } + + pub fn with_capacity(cap: usize) -> IDsPool { + let buf = IDS_NUMBERS.with(|cache| { + let mut cache = cache.borrow_mut(); + if let Some(mut existing_buf) = cache.pop() { + existing_buf.0.clear(); + existing_buf.1.clear(); + existing_buf + } else { + // Fallback to new allocation if cache is empty + (Vec::with_capacity(cap),Vec::with_capacity(cap)) + } + }); + + Self { inner: Some(buf) } + } + + pub fn take_inner(&mut self) -> (Vec,Vec) { + self.inner.take().expect("Buffer already taken or dropped") + } + + pub fn save_heap_allocation(buf: (Vec,Vec)) { + // Use capacity() check to ensure we don't cache + // a buffer that grew to a massive size during one specific request + IDS_NUMBERS.with(|cache| { + let mut cache = cache.borrow_mut(); + if cache.len() < 1000 { + cache.push(buf); + } + }); + } +} + diff --git a/frameworks/Rust/water-http/src/cached.rs b/frameworks/Rust/water-http/src/cached.rs deleted file mode 100644 index 2643aca511e..00000000000 --- a/frameworks/Rust/water-http/src/cached.rs +++ /dev/null @@ -1,308 +0,0 @@ -#![allow(static_mut_refs)] -use std::io; -use std::fmt::Arguments; -use std::io::Write; -use std::mem::MaybeUninit; -use std::rc::Rc; -use std::cell::UnsafeCell; -use std::collections::HashMap; -use nanorand::{Rng, WyRand}; -use tokio_postgres::{connect, Client, NoTls}; -use tokio_postgres::types::private::BytesMut; -use sonic_rs::prelude::WriteExt; -use std::pin::Pin; -use tokio::task::LocalSet; -use water_http::{InitControllersRoot, RunServer, WaterController}; -use water_http::http::{HttpSender, ResponseData}; -use water_http::server::{HttpContext, ServerConfigurations}; -use water_http::http::HttpSenderTrait; - -pub struct DbConnectionPool { - pub connections: Vec>, - pub next: UnsafeCell, -} - -impl DbConnectionPool { - /// Get a connection from the pool (round-robin, relaxed ordering) - #[inline(always)] - pub fn get_connection(&self) -> &Rc { - let n = unsafe{&mut *self.next.get()}; - *n +=1; - let idx = *n % self.connections.len(); - unsafe { self.connections.get_unchecked(idx) } - } - - /// Fill the pool with connections - pub async fn fill_pool(&mut self, url: &'static str, size: usize) { - let mut tasks = Vec::with_capacity(size); - for _ in 0..size { - tasks.push(tokio::task::spawn_local(async move { - for attempt in 0..5 { - match PgConnection::connect(url).await { - Ok(conn) => { - - return Ok(conn); }, - Err(_) if attempt < 4 => { - tokio::time::sleep(std::time::Duration::from_millis(100)).await; - } - Err(_) => return Err(()), - } - } - Err(()) - })); - } - for t in tasks { - if let Ok(Ok(conn)) = t.await { - self.connections.push(Rc::new(conn)); - } - } - } - - -} - - - - -pub struct PgConnection { - cl:Client, - _connection_task: tokio::task::JoinHandle<()>, -} - -// Safety: Only used within LocalSet, no cross-thread access -impl PgConnection { - /// Connect to the database - - pub async fn connect(db_url: &str) -> Result { - let (cl, c) = tokio::time::timeout( - std::time::Duration::from_secs(5), - connect(db_url, NoTls), - ) - .await - .map_err(|_| ())? - .map_err(|_| ())?; - - let connection_task = tokio::task::spawn_local(async move { - let _ = c.await; - }); - - Ok(PgConnection { - _connection_task: connection_task, - cl - }) - } -} - -/// Zero-copy writer for BytesMut -pub struct BytesMuteWriter<'a>(pub &'a mut BytesMut); - -impl BytesMuteWriter<'_> { - - #[inline(always)] - pub fn extend_from_slice(&mut self,data:&[u8]){ - self.0.extend_from_slice(data); - } -} - -impl Write for BytesMuteWriter<'_> { - #[inline(always)] - fn write(&mut self, src: &[u8]) -> Result { - self.0.extend_from_slice(src); - Ok(src.len()) - } - - #[inline(always)] - fn flush(&mut self) -> Result<(), io::Error> { - Ok(()) - } -} - -impl std::fmt::Write for BytesMuteWriter<'_> { - #[inline(always)] - fn write_str(&mut self, s: &str) -> std::fmt::Result { - self.0.extend_from_slice(s.as_bytes()); - Ok(()) - } - - #[inline(always)] - fn write_char(&mut self, c: char) -> std::fmt::Result { - let mut buf = [0u8; 4]; - self.0.extend_from_slice(c.encode_utf8(&mut buf).as_bytes()); - Ok(()) - } - - #[inline(always)] - fn write_fmt(&mut self, args: Arguments<'_>) -> std::fmt::Result { - std::fmt::write(self, args) - } -} - -impl WriteExt for BytesMuteWriter<'_> { - #[inline(always)] - fn reserve_with(&mut self, additional: usize) -> Result<&mut [MaybeUninit], io::Error> { - self.0.reserve(additional); - unsafe { - let ptr = self.0.as_mut_ptr().add(self.0.len()) as *mut MaybeUninit; - Ok(std::slice::from_raw_parts_mut(ptr, additional)) - } - } - - #[inline(always)] - unsafe fn flush_len(&mut self, additional: usize) -> io::Result<()> { - self.0.set_len(self.0.len() + additional); - Ok(()) - } -} - - -InitControllersRoot! { - name:ROOT, - holder_type:MainType, - shared_type:SH, -} - -pub struct ThreadSharedStruct{ - writing_buffer:UnsafeCell, - rng:WyRand, -} - - -impl ThreadSharedStruct { - - #[inline(always)] - pub fn get_value(id:i32)->Option<&'static i32>{ - let map = unsafe {CACHED_VALUES.as_ref().unwrap().get(&id)} ; - map - } - pub fn get_cached_queries(&self,num:usize)->&[u8]{ - let buf = unsafe{&mut *(self.writing_buffer.get())}; - buf.clear(); - buf.extend_from_slice(br#"["#); - let mut writer = BytesMuteWriter(buf); - let mut rn = self.rng.clone(); - for _ in 0..num { - let rd = (rn.generate::() % 10_000 ) as i32; - let v = match Self::get_value(rd) { - None => {continue} - Some(c) => {c} - }; - writer.extend_from_slice(br"{"); - _ = write!(writer, r#""id":{},"randomnumber":{}"#, rd, v); - writer.extend_from_slice(br"},"); - } - if buf.len() >1 {buf.truncate(buf.len() - 1);} - buf.extend_from_slice(b"]"); - return &buf[..] - } -} - -pub type MainType = u8; -pub type SH = Rc; - - -static mut CACHED_VALUES:Option> = None; - -pub fn run_server(){ - - _= std::thread::spawn( - ||{ - let rt = tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap(); - rt.block_on(async move { - const URL:&'static str = "postgres://benchmarkdbuser:benchmarkdbpass@tfb-database/hello_world"; - // const URL:&'static str = "postgres://postgres:root@localhost:5432/techmpower"; - - let mut pool = DbConnectionPool{ - connections:Vec::with_capacity( 1 - ), - next:0.into(), - // rt:tokio::runtime::Builder::new_multi_thread().enable_all().worker_threads(cpu_nums).build().unwrap() - }; - - let local_set = LocalSet::new(); - - _= local_set.run_until(async move { - tokio::task::spawn_local(async move { - pool.fill_pool(URL, 1).await; - let connection = pool.get_connection(); - let statement = connection.cl.prepare("SELECT id,randomnumber FROM World").await.unwrap(); - let res = connection.cl.query(&statement,&[]).await.unwrap(); - let mut map = HashMap::new(); - for row in res { - map.insert(row.get(0),row.get(1)); - } - unsafe { - let static_map = &mut CACHED_VALUES; - *static_map = Some(map); - } - }).await - }).await; - - }); - } - ).join(); - let cpu_nums = num_cpus::get(); - - - println!("start listening on port 8080 while workers count {cpu_nums}"); - let mut conf = ServerConfigurations::bind("0.0.0.0",8080); - conf.worker_threads_count = cpu_nums * 1 ; - - // let addresses = (0..cpu_nums).map(|_| { - // ("0.0.0.0".to_string(),8080) - // }).collect::>(); - // conf.addresses = addresses; - RunServer!( - conf, - ROOT, - EntryController, - shared_factory - ); -} - -fn shared_factory()->Pin>>{ - Box::pin(async { - - // const URL:&'static str = "postgres://postgres:root@localhost:5432/techmpower"; - - Rc::new(ThreadSharedStruct{ - writing_buffer:UnsafeCell::new(BytesMut::with_capacity(100_000)), - rng:WyRand::new() - }) - }) -} - -pub async fn handle_cached_queries(context:&mut HttpContext<'_,MainType,SH,HS,QS>){ - let q = context - .get_from_path_query("q") - .and_then(|v| v.parse::().ok()) // safely parse - .unwrap_or(1) // default to 1 if missing or invalid - .clamp(1, 500); - - let connection:SH = context.thread_shared_struct.clone().unwrap().clone(); - let data = connection.get_cached_queries(q); - let mut sender:HttpSender = context.sender(); - sender.set_header_ef("Content-Type","application/json"); - sender.set_header_ef("Server","water"); - let date = httpdate::fmt_http_date(std::time::SystemTime::now()); - sender.set_header_ef("Date",date); - _= sender.send_data_as_final_response( - ResponseData::Slice(data) - ).await; -} - -WaterController! { - holder -> super::MainType, - shared -> super::SH, - name -> EntryController, - functions -> { - - GET -> "cached-queries" -> query (context) { - _=super::handle_cached_queries(context).await; - } - } -} - -fn main() { - run_server(); -} - diff --git a/frameworks/Rust/water-http/src/date.rs b/frameworks/Rust/water-http/src/date.rs new file mode 100644 index 00000000000..0896f479733 --- /dev/null +++ b/frameworks/Rust/water-http/src/date.rs @@ -0,0 +1,32 @@ +use std::cell::UnsafeCell; +use std::time::{SystemTime, UNIX_EPOCH}; + +thread_local! { + // We store the bytes directly in the Thread Local Storage (TLS) + // to ensure the reference stays valid for the life of the thread. + static CACHED_DATE: UnsafeCell<(u64, [u8; 29])> = UnsafeCell::new((0, [0u8; 29])); +} + +#[inline(always)] +pub fn get_date_fast() -> &'static str { + CACHED_DATE.with(|cell| { + unsafe { + let cache = &mut *cell.get(); + // 1. Use a fast duration check + let now = SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap_unchecked() // Benchmark trick: avoid panic branches + .as_secs(); + + if now != cache.0 { + cache.0 = now; + let date_str = httpdate::fmt_http_date(SystemTime::now()); + cache.1.copy_from_slice(date_str.as_bytes()); + } + + // 2. Return a reference to the TLS memory directly. + // This is safe because the thread never dies during the benchmark. + std::str::from_utf8_unchecked(&cache.1) + } + }) +} \ No newline at end of file diff --git a/frameworks/Rust/water-http/src/db.rs b/frameworks/Rust/water-http/src/db.rs index bcd33931486..db6f3028199 100644 --- a/frameworks/Rust/water-http/src/db.rs +++ b/frameworks/Rust/water-http/src/db.rs @@ -1,19 +1,23 @@ -#![cfg(any(feature = "db",feature = "all"))] -use std::{borrow::Cow, io, ptr}; +#![cfg(any(feature = "db",feature = "all",feature = "uring",feature = "mini"))] +use std::{borrow::Cow, io}; use std::fmt::Arguments; use std::io::Write; use std::mem::MaybeUninit; use std::rc::Rc; use std::cell::UnsafeCell; -use std::collections::HashMap; -use bytes::Buf; +use bytes::{Buf, BufMut}; use nanorand::{Rng, WyRand}; -use tokio_postgres::{connect, Client, Statement, NoTls, Error}; +use smallvec::SmallVec; +use tokio_postgres::{connect, Client, Statement, NoTls}; use tokio_postgres::types::private::BytesMut; use crate::models::{Fortune, FortuneTemplate, World}; use sonic_rs::prelude::WriteExt; +use water_buffer::WaterBuffer; use yarte::TemplateBytesTrait; -pub static mut CACHED_VALUES:Option> = None; +use crate::buf::{FortunesPool, IDsPool, PooledBuffer, WorldsPool}; + + +use tokio::pin; /// Database connection pool with thread-local RNG pub struct DbConnectionPool { @@ -21,6 +25,9 @@ pub struct DbConnectionPool { pub next: UnsafeCell, } +unsafe impl Sync for DbConnectionPool {} +unsafe impl Send for DbConnectionPool {} + impl DbConnectionPool { /// Get a connection from the pool (round-robin, relaxed ordering) #[inline(always)] @@ -35,7 +42,7 @@ impl DbConnectionPool { pub async fn fill_pool(&mut self, url: &'static str, size: usize) { let mut tasks = Vec::with_capacity(size); for _ in 0..size { - tasks.push(tokio::task::spawn_local(async move { + tasks.push(tokio::task::spawn(async move { for attempt in 0..5 { match PgConnection::connect(url).await { Ok(conn) => { @@ -62,6 +69,8 @@ impl DbConnectionPool { /// Reusable buffer pool per connection struct BufferPool { body: BytesMut, + worlds: Vec, + numbers: Vec, fortunes: Vec, fortune_output: Vec, } @@ -70,6 +79,8 @@ impl BufferPool { fn new() -> Self { Self { body: BytesMut::with_capacity(4096), + worlds: Vec::with_capacity(501), + numbers: Vec::with_capacity(501), fortunes: Vec::with_capacity(501), fortune_output: Vec::with_capacity(4096), @@ -86,7 +97,6 @@ pub struct PgConnection { pub updates: Vec, rang:WyRand, buffers: UnsafeCell, - _connection_task: tokio::task::JoinHandle<()>, } // Safety: Only used within LocalSet, no cross-thread access @@ -102,7 +112,7 @@ impl PgConnection { .map_err(|_| ())? .map_err(|_| ())?; - let connection_task = tokio::task::spawn_local(async move { + _= tokio::task::spawn(async move { let _ = conn.await; }); @@ -111,7 +121,7 @@ impl PgConnection { // Pre-compile update statements for batch sizes 1-500 let mut updates = vec![]; - for num in 1..=500 { + for num in 1..=500 { let sql = Self::generate_update_values_stmt(num); updates.push(cl.prepare(&sql).await.unwrap()); } @@ -122,14 +132,12 @@ impl PgConnection { world, updates, buffers: UnsafeCell::new(BufferPool::new()), - _connection_task: connection_task, rang: WyRand::new() }) - } /// Connect to the database - + } + /// Connect to the database #[inline(always)] pub fn generate_update_values_stmt(batch_size: usize) -> String { - let mut sql = String::from("UPDATE world SET randomNumber = w.r FROM (VALUES "); for i in 0..batch_size { @@ -153,36 +161,40 @@ impl PgConnection { /// Get a single random world - optimized with buffer reuse #[inline] - pub async fn get_world(&self) -> &[u8] { - let rd = (self.rang.clone().generate::() % 10_000 ) as i32; + pub async fn get_world(&self) -> Vec { + let rd = (self.rang.clone().generate::() % 10_000 + 1) as i32; let row = self.cl.query_one(&self.world, &[&rd]).await.unwrap(); - let buffers = self.buffers(); - buffers.body.clear(); + + let mut buffer = PooledBuffer::new().take_inner(); + sonic_rs::to_writer( - BytesMuteWriter(&mut buffers.body), + &mut buffer, &World { id: row.get(0), randomnumber: row.get(1), }, ).unwrap(); - buffers.body.chunk() + + buffer } /// Get multiple random worlds - optimized with buffer reuse - pub async fn get_worlds(&self, num: usize) -> &[u8] { - let buffers = self.buffers(); - let mut worlds = Vec::with_capacity(num); + pub async fn get_worlds(&self, num: usize) -> Vec { + let mut worlds = WorldsPool::new().take_inner(); + worlds.clear(); + let mut rn = self.rang.clone(); for _ in 0..num { - let id = (self.rang.clone().generate::() % 10_000 ) as i32; + let id: i32 = (rn.generate::() & 0x3FFF) as i32 % 10_000 + 1; let row = self.cl.query_one(&self.world, &[&id]).await.unwrap(); worlds.push(World { id: row.get(0), randomnumber: row.get(1), }); } - buffers.body.clear(); - sonic_rs::to_writer(BytesMuteWriter(&mut buffers.body), &worlds).unwrap(); - buffers.body.chunk() + let mut buffer = PooledBuffer::new().take_inner(); + sonic_rs::to_writer(&mut buffer, &worlds).unwrap(); + WorldsPool::save_heap_allocation(worlds); + buffer } /// Update worlds in batch - optimized with buffer reuse /// Update worlds in batch - optimized with buffer reuse @@ -194,114 +206,180 @@ impl PgConnection { /// Update worlds in batch - optimized with RETURNING clause to minimize reads /// Update worlds - fetch and update each row to handle duplicates correctly /// Update worlds in batch using CASE statement - pub async fn update(&self, num: usize) -> &[u8] { - let buffers = self.buffers(); - let mut ids:Vec = Vec::with_capacity(num); - let mut rng = self.rang.clone(); + pub async fn update(&self,num:usize)->Vec{ + + let mut output = PooledBuffer::new().take_inner(); + let (mut ids,mut numbers) = IDsPool::new().take_inner(); + let mut worlds = WorldsPool::new().take_inner(); + let mut rn = self.rang.clone(); + let mut futures = Vec::with_capacity(num); let mut params: Vec<&(dyn tokio_postgres::types::ToSql + Sync)> = - Vec::with_capacity(num * 2); - let mut futures =vec![]; - for _ in 0..num { - let w_id = (rng.generate::() % 10_000 + 1) as i32; - ids.push(w_id); - } - futures.extend(ids.iter().map(|x| async move {self.cl.query_one(&self.world,&[&x]).await})); - futures_util::future::join_all(futures).await; - ids.sort_unstable(); - let mut worlds = Vec::with_capacity(num); - let mut numbers = Vec::with_capacity(num); - for index in 0..num { - let s_id = (rng.generate::() % 10_000 + 1 ) as i32; - worlds.push(World{ - id:ids[index], - randomnumber:s_id + Vec::with_capacity(num * 2); + + if worlds.len() == num { + for w in &worlds { + ids.push(w.id); + params.push(&w.id); + numbers.push(w.randomnumber); + params.push(&w.randomnumber); + } + } else { + worlds.clear(); + (0..num).for_each(|n|{ + let id = (rn.generate::() % 10_000 + 1) as i32; + let number = (rn.generate::() % 10_000 + 1) as i32; + ids.push(id); + numbers.push(number); }); - numbers.push(s_id); - } - buffers.body.clear(); - for index in 0..num { - params.push(&ids[index]); - params.push(&numbers[index]); - } - _=self.cl.execute(&self.updates[num - 1], ¶ms).await.unwrap(); - sonic_rs::to_writer(BytesMuteWriter(&mut buffers.body), &worlds).unwrap(); - buffers.body.chunk() + ids.sort(); + for ind in 0..num { + worlds.push( + World{ + id:ids[ind], + randomnumber:numbers[ind] + } + ); + params.push(&ids[ind]); + params.push(&numbers[ind]); + } + } + futures.extend(ids.iter().map(|x| async move { self.cl.query_one(&self.world, &[&x]).await })); + futures_util::future::join_all(futures).await; + _ = self.cl.execute(&self.updates[num - 1], ¶ms).await; + _= sonic_rs::to_writer(&mut output,&worlds); + WorldsPool::save_heap_allocation(worlds); + IDsPool::save_heap_allocation((ids,numbers)); + output } - - /// Tell fortunes - optimized with buffer reuse - pub async fn tell_fortune(&self) -> Result<&[u8], ()> { - let res = self.cl.query(&self.fortune, &[]).await.map_err(|_| ())?; - - let buffers = self.buffers(); - buffers.fortunes.clear(); - buffers.fortune_output.clear(); - - buffers.fortunes.push(Fortune { - id: 0, - message: Cow::Borrowed("Additional fortune added at request time."), - }); - - for row in res { - buffers.fortunes.push(Fortune { - id: row.get(0), - message: Cow::Owned(row.get(1)), - }); + // pub async fn update(&self, num: usize) -> &[u8] { + // let buffers = self.buffers(); + // let mut ids: Vec = Vec::with_capacity(num); + // let mut rng = self.rang.clone(); + // let mut params: Vec<&(dyn tokio_postgres::types::ToSql + Sync)> = + // Vec::with_capacity(num * 2); + // let mut futures = vec![]; + // for _ in 0..num { + // let w_id = (rng.generate::() % 10_000 + 1) as i32; + // ids.push(w_id); + // } + // futures.extend(ids.iter().map(|x| async move { self.cl.query_one(&self.world, &[&x]).await })); + // futures_util::future::join_all(futures).await; + // ids.sort_unstable(); + // buffers.worlds.clear(); + // for index in 0..num { + // let s_id = (rng.generate::() % 10_000 + 1) as i32; + // buffers.worlds.push(World { + // id: ids[index], + // randomnumber: s_id + // }); + // buffers.numbers.push(s_id); + // } + // buffers.body.clear(); + // for index in 0..num { + // params.push(&ids[index]); + // params.push(&buffers.numbers[index]); + // } + // + // _ = self.cl.execute(&self.updates[num - 1], ¶ms).await.unwrap(); + // sonic_rs::to_writer(BytesMuteWriter(&mut buffers.body), &buffers.worlds).unwrap(); + // buffers.body.chunk() + // } + + + pub async fn tell_fortune(&self) -> Result, ()> { + // 1. Explicitly type the empty params to satisfy the compiler's inference + let mut res = self.cl.query(&self.fortune, &[]).await.unwrap(); + let mut fortunes = FortunesPool::new().take_inner(); + if fortunes.is_empty() { + fortunes.push(Fortune{ + id:0, + message:Cow::Borrowed("Additional fortune added at request time.") + }); + for r in res { + let id :&'static str = unsafe { + let a :&str = r.get(1); + &*(a as *const str) + }; + fortunes.push( + Fortune { + id:r.get(0), + message:Cow::Borrowed(id) + } + ); + } + fortunes.sort_by(|a, b| a.message.cmp(&b.message)); } + let mut output = PooledBuffer::new().take_inner(); + let template = FortuneTemplate { + items: &fortunes, + }; + template.write_call(&mut output); + Ok(output) + } +} - buffers.fortunes.sort_unstable_by(|a, b| a.message.cmp(&b.message)); - - let template = FortuneTemplate { items: &buffers.fortunes }; - template.write_call(&mut buffers.fortune_output); +unsafe impl Sync for PgConnection {} +unsafe impl Send for PgConnection {} +/// Zero-copy writer for BytesMut +pub struct BytesMuteWriter<'a>(pub &'a mut BytesMut); - // Return reference to buffer - zero-copy! - Ok(&buffers.fortune_output) +impl Write for BytesMuteWriter<'_> { + #[inline(always)] + fn write(&mut self, src: &[u8]) -> Result { + self.0.extend_from_slice(src); + Ok(src.len()) } + #[inline(always)] + fn flush(&mut self) -> Result<(), io::Error> { + Ok(()) + } +} - pub fn get_cached_queries(&self,num:usize)->&[u8]{ - let buffers = self.buffers(); - let mut worlds = Vec::::with_capacity(num); - for _ in 0..num { - let rd = (self.rang.clone().generate::() % 10_000 ) as i32; - let v = match self.get_world_id_for_cache(rd){ - None => {continue} - Some(e)=>{e} - }; - worlds.push(World{ - id:rd, - randomnumber:*v - }) - } - buffers.body.clear(); - sonic_rs::to_writer(BytesMuteWriter(&mut buffers.body), &worlds).unwrap(); - return &buffers.body[..] +impl std::fmt::Write for BytesMuteWriter<'_> { + #[inline(always)] + fn write_str(&mut self, s: &str) -> std::fmt::Result { + self.0.extend_from_slice(s.as_bytes()); + Ok(()) } - fn get_world_id_for_cache(&self, id: i32) -> Option<&i32> { - unsafe { - let ptr = ptr::addr_of!(CACHED_VALUES); + #[inline(always)] + fn write_char(&mut self, c: char) -> std::fmt::Result { + let mut buf = [0u8; 4]; + self.0.extend_from_slice(c.encode_utf8(&mut buf).as_bytes()); + Ok(()) + } - match &*ptr { - Some(map) => map.get(&id), - None => None, - } - } + #[inline(always)] + fn write_fmt(&mut self, args: Arguments<'_>) -> std::fmt::Result { + std::fmt::write(self, args) } } -/// Zero-copy writer for BytesMut -pub struct BytesMuteWriter<'a>(pub &'a mut BytesMut); -impl BytesMuteWriter<'_> { +impl WriteExt for BytesMuteWriter<'_> { + #[inline(always)] + fn reserve_with(&mut self, additional: usize) -> Result<&mut [MaybeUninit], io::Error> { + self.0.reserve(additional); + unsafe { + let ptr = self.0.as_mut_ptr().add(self.0.len()) as *mut MaybeUninit; + Ok(std::slice::from_raw_parts_mut(ptr, additional)) + } + } #[inline(always)] - pub fn extend_from_slice(&mut self,data:&[u8]){ - self.0.extend_from_slice(data); + unsafe fn flush_len(&mut self, additional: usize) -> io::Result<()> { + self.0.set_len(self.0.len() + additional); + Ok(()) } } -impl Write for BytesMuteWriter<'_> { + +/// Zero-copy writer for WaterBuffer +pub struct WaterMutWriter<'a>(pub &'a mut WaterBuffer); + +impl Write for WaterMutWriter<'_> { #[inline(always)] fn write(&mut self, src: &[u8]) -> Result { self.0.extend_from_slice(src); @@ -314,7 +392,7 @@ impl Write for BytesMuteWriter<'_> { } } -impl std::fmt::Write for BytesMuteWriter<'_> { +impl std::fmt::Write for WaterMutWriter<'_> { #[inline(always)] fn write_str(&mut self, s: &str) -> std::fmt::Result { self.0.extend_from_slice(s.as_bytes()); @@ -324,7 +402,8 @@ impl std::fmt::Write for BytesMuteWriter<'_> { #[inline(always)] fn write_char(&mut self, c: char) -> std::fmt::Result { let mut buf = [0u8; 4]; - self.0.extend_from_slice(c.encode_utf8(&mut buf).as_bytes()); + self.0 + .extend_from_slice(c.encode_utf8(&mut buf).as_bytes()); Ok(()) } @@ -334,10 +413,14 @@ impl std::fmt::Write for BytesMuteWriter<'_> { } } -impl WriteExt for BytesMuteWriter<'_> { +impl WriteExt for WaterMutWriter<'_> { #[inline(always)] - fn reserve_with(&mut self, additional: usize) -> Result<&mut [MaybeUninit], io::Error> { + fn reserve_with( + &mut self, + additional: usize, + ) -> Result<&mut [MaybeUninit], io::Error> { self.0.reserve(additional); + unsafe { let ptr = self.0.as_mut_ptr().add(self.0.len()) as *mut MaybeUninit; Ok(std::slice::from_raw_parts_mut(ptr, additional)) @@ -346,7 +429,7 @@ impl WriteExt for BytesMuteWriter<'_> { #[inline(always)] unsafe fn flush_len(&mut self, additional: usize) -> io::Result<()> { - self.0.set_len(self.0.len() + additional); + self.0.advance_mut(self.0.len() + additional); Ok(()) } } \ No newline at end of file diff --git a/frameworks/Rust/water-http/src/json.rs b/frameworks/Rust/water-http/src/json.rs deleted file mode 100644 index 71f406b6051..00000000000 --- a/frameworks/Rust/water-http/src/json.rs +++ /dev/null @@ -1,55 +0,0 @@ -use water_http::{InitControllersRoot, RunServer, WaterController}; -use water_http::server::ServerConfigurations; - -InitControllersRoot! { - name:ROOT, - holder_type:MainType, -} - - - -pub type MainType = u8; - - -fn main() { - run_server(); -} - - -pub fn run_server(){ - - let cpu_nums = num_cpus::get(); - - - println!("start listening on port 8080 while workers count {cpu_nums}"); - let mut conf = ServerConfigurations::bind("0.0.0.0",8080); - conf.worker_threads_count = cpu_nums * 2 ; - - RunServer!( - conf, - ROOT, - EntryController - ); -} - -const JSON_RESPONSE:&'static [u8] = br#"{"message":"Hello, World!"}"#; - - -WaterController! { - holder -> super::MainType, - name -> EntryController, - functions -> { - - - GET => json => j(cx){ - let mut sender = cx.sender(); - sender.set_header_ef("Content-Type","application/json"); - sender.set_header_ef("Server","water"); - let date = httpdate::fmt_http_date(std::time::SystemTime::now()); - sender.set_header_ef("Date",date); - _=sender.send_data_as_final_response(http::ResponseData::Slice(super::JSON_RESPONSE)).await; - } - - } -} - diff --git a/frameworks/Rust/water-http/src/main.rs b/frameworks/Rust/water-http/src/main.rs index 487409af1d6..61282df5f00 100644 --- a/frameworks/Rust/water-http/src/main.rs +++ b/frameworks/Rust/water-http/src/main.rs @@ -1,7 +1,21 @@ -mod server; +pub mod server; pub mod models; -mod db; +pub mod db; +pub mod date; +pub mod buf; -fn main() { - server::run_server(); +use mimalloc::MiMalloc; + +#[global_allocator] +static GLOBAL: MiMalloc = MiMalloc; + + fn main() { + + #[cfg(feature = "mini")] + { + mini::run(); + } + #[cfg(not(feature = "mini"))] + server::run_server(); } + diff --git a/frameworks/Rust/water-http/src/mini.rs b/frameworks/Rust/water-http/src/mini.rs new file mode 100644 index 00000000000..d43a22fd439 --- /dev/null +++ b/frameworks/Rust/water-http/src/mini.rs @@ -0,0 +1,167 @@ +use std::cell::OnceCell; +use water_http::http::status_code::HttpStatusCode; +use water_http::server::mini::{CtxPtr, HandlerFn, serve}; +use water_http::server::ServerConfigurations; +use crate::buf::{FortunesPool, PooledBuffer}; +use mimalloc::MiMalloc; + +#[global_allocator] +static GLOBAL: MiMalloc = MiMalloc; + + +mod date; +mod models; +mod buf; +mod db; +use crate::db::PgConnection; +use crate::date::get_date_fast; +const URL:&'static str = "postgres://benchmarkdbuser:benchmarkdbpass@tfb-database/hello_world"; +// const URL:&'static str = "postgres://postgres:root@localhost:5432/techmpower"; + +const PLAINTEXT:&'static [u8] = br#"Hello, World!"#; + +fn main() { + let thread_init = || async { + get_connection().await; + }; + let mut conf = ServerConfigurations::bind("0.0.0.0", 8080); + conf.max_cached_buffers_count = 2500; + conf.default_write_buffer_size = conf.default_read_buffer_size * 2; + conf.max_buffer_size_for_cache = conf.default_write_buffer_size; + serve::<16, 10, _,_,_>(conf, HandlerFn(|ctx: CtxPtr<16, 10>| handler(ctx)),Some(thread_init)); +} + +async fn handler(mut ctx: CtxPtr<16,10>){ + let ctx = ctx.get(); + let req = ctx.request(); + + let path = req.path(); + match path { + "/plaintext"=>{ + ctx.set_header("Content-Type","text/plain; charset=utf-8"); + ctx.set_header("Content-Length",PLAINTEXT.len()); + ctx.set_header("Server","W"); + ctx.set_header("Date",get_date_fast()); + ctx.write_body_bytes(PLAINTEXT); + } + "/json" =>{ + let js = models::JsonHolder{message:"Hello, World!"}; + let mut buffer = buf::PooledBuffer::new().take_inner(); + _=sonic_rs::to_writer(&mut buffer,&js); + ctx.set_header("Content-Type","application/json"); + ctx.set_header("Content-Length",buffer.len()); + ctx.set_header("Server","W"); + ctx.set_header("Date",get_date_fast()); + ctx.write_body_bytes(&buffer); + buf::PooledBuffer::recycle(buffer) + } + + "/fortunes"=>{ + let connection = get_connection().await; + let fortunes = connection.tell_fortune().await.unwrap(); + ctx.set_header("Content-Type","text/html; charset=UTF-8"); + ctx.set_header("Server","W"); + ctx.set_header("Date",get_date_fast()); + ctx.set_header("Content-Length",fortunes.len()); + ctx.write_body_bytes(&fortunes); + FortunesPool::save_heap_allocation(fortunes); + } + + "/db" => { + let connection = get_connection().await; + let db = connection.get_world().await; + ctx.set_header("Content-Type","application/json"); + ctx.set_header("Server","W"); + ctx.set_header("Date",get_date_fast()); + ctx.set_header("Content-Length",db.len()); + ctx.write_body_bytes(&db); + PooledBuffer::recycle(db); + } + + _=>{ + if path.contains("?") { + let value = path.split("=").last() + .and_then(|v| v.parse::().ok()) + .unwrap_or(1) + .clamp(1, 500); + let s =&path[0..=1]; + match s { + "/q"=>{ + let q = get_connection().await; + let data = q.get_worlds(value).await; + ctx.set_header("Content-Type","application/json"); + ctx.set_header("Server","W"); + ctx.set_header("Date",get_date_fast()); + ctx.set_header("Content-Length",data.len()); + ctx.write_body_bytes(&data); + PooledBuffer::recycle(data); + return + } + "/u"=> { + let q = get_connection().await; + let data = q.update(value).await; + ctx.set_header("Content-Type","application/json"); + ctx.set_header("Server","W"); + ctx.set_header("Date",get_date_fast()); + ctx.set_header("Content-Length",data.len()); + ctx.write_body_bytes(&data); + PooledBuffer::recycle(data); + return + } + _=>{} + } + + } + ctx.set_status_code(HttpStatusCode::NOT_FOUND); + } + } + +} + + +thread_local! { + static PG:OnceCell = OnceCell::new(); +} + +async fn get_connection() -> &'static PgConnection { + // 1. Try to get the pointer from thread-local + let ptr = PG.with(|cell| { + cell.get().map(|conn| conn as *const PgConnection) + }); + + if let Some(p) = ptr { + // SAFETY: We are promoting a thread-local pointer to 'static. + // This is only safe in TFB if the thread never dies. + return unsafe { &*p }; + } + + let mut t_count = 0_usize; + // 2. Initialize + let conn = loop { + match PgConnection::connect(URL) + .await { + Ok(r) => { + break r + } + Err(_) => { + t_count+=1; + println!("failed count = {t_count}"); + if t_count > 10 { + panic!("could not connect to db"); + } + continue } + } + }; + println!("{:?} connected successfully", std::thread::current().name().unwrap()); + // 3. Store it + PG.with(|cell| { + _= cell.set(conn); + }); + + // 4. Retrieve the pointer again + let ptr = PG.with(|cell| { + cell.get().map(|conn| conn as *const PgConnection) + }).expect("Should be initialized"); + + unsafe { &*ptr } +} \ No newline at end of file diff --git a/frameworks/Rust/water-http/src/models.rs b/frameworks/Rust/water-http/src/models.rs index 1fee7283ba9..d1e008cc589 100644 --- a/frameworks/Rust/water-http/src/models.rs +++ b/frameworks/Rust/water-http/src/models.rs @@ -17,6 +17,10 @@ pub struct Fortune { pub struct FortuneTemplate<'a>{ pub items:&'a Vec } +#[derive(Serialize,Debug)] +pub struct JsonHolder { + pub message:&'static str +} diff --git a/frameworks/Rust/water-http/src/plaintext.rs b/frameworks/Rust/water-http/src/plaintext.rs deleted file mode 100644 index 197da216e9c..00000000000 --- a/frameworks/Rust/water-http/src/plaintext.rs +++ /dev/null @@ -1,54 +0,0 @@ -use water_http::{InitControllersRoot, RunServer, WaterController}; -use water_http::server::ServerConfigurations; - -InitControllersRoot! { - name:ROOT, - holder_type:MainType, -} - - - -pub type MainType = u8; - - -fn main() { - run_server(); -} - - -pub fn run_server(){ - - let cpu_nums = num_cpus::get(); - - - println!("start listening on port 8080 while workers count {cpu_nums}"); - let mut conf = ServerConfigurations::bind("0.0.0.0",8080); - conf.worker_threads_count = cpu_nums * 1 ; - - RunServer!( - conf, - ROOT, - EntryController - ); -} - -const JSON_RESPONSE:&'static [u8] = br#"{"message":"Hello, World!"}"#; -const P:&'static [u8] = br#"Hello, World!"#; - - -WaterController! { - holder -> super::MainType, - name -> EntryController, - functions -> { - - GET => plaintext => p(cx) { - let mut sender = cx.sender(); - sender.set_header_ef("Content-Type","text/plain; charset=utf-8"); - sender.set_header_ef("Server","water"); - let date = httpdate::fmt_http_date(std::time::SystemTime::now()); - sender.set_header_ef("Date",date); - _=sender.send_data_as_final_response(http::ResponseData::Str("Hello, World!")).await; - } - } -} - diff --git a/frameworks/Rust/water-http/src/server.rs b/frameworks/Rust/water-http/src/server.rs index 309aee70eb3..b2ce34241df 100644 --- a/frameworks/Rust/water-http/src/server.rs +++ b/frameworks/Rust/water-http/src/server.rs @@ -1,18 +1,13 @@ -use std::collections::HashMap; + use std::pin::Pin; -use std::ptr; use std::rc::Rc; -use tokio::task::LocalSet; use water_http::{InitControllersRoot, RunServer, WaterController}; -use water_http::http::{HttpSender, ResponseData}; -use water_http::server::{HttpContext, ServerConfigurations}; -use crate::db::{CACHED_VALUES, DbConnectionPool}; +use water_http::server::ServerConfigurations; +use crate::db::{DbConnectionPool}; InitControllersRoot! { name:ROOT, holder_type:MainType, shared_type:SharedType, - headers_length:6, - queries_length:3 } pub struct ThreadSharedStruct{ @@ -27,43 +22,6 @@ pub type SharedType = Rc; pub fn run_server(){ - - _= std::thread::spawn( - ||{ - let rt = tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap(); - rt.block_on(async move { - const URL:&'static str = "postgres://benchmarkdbuser:benchmarkdbpass@tfb-database/hello_world"; - // const URL:&'static str = "postgres://postgres:root@localhost:5432/techmpower"; - - let mut pool = DbConnectionPool{ - connections:Vec::with_capacity( 1 - ), - next:0.into(), - // rt:tokio::runtime::Builder::new_multi_thread().enable_all().worker_threads(cpu_nums).build().unwrap() - }; - - let local_set = LocalSet::new(); - - _= local_set.run_until(async move { - tokio::task::spawn_local(async move { - pool.fill_pool(URL, 1).await; - let connection = pool.get_connection(); - let statement = connection.cl.prepare("SELECT id,randomnumber FROM World").await.unwrap(); - let res = connection.cl.query(&statement,&[]).await.unwrap(); - let mut map = HashMap::new(); - for row in res { - map.insert(row.get(0),row.get(1)); - } - unsafe { - let ptr = ptr::addr_of_mut!(CACHED_VALUES); - ptr.write(Some(map)); - } - }).await - }).await; - - }); - } - ).join(); let cpu_nums = num_cpus::get(); @@ -78,10 +36,9 @@ pub fn run_server(){ conf.worker_threads_count = cpu_nums * 1 ; } - // let addresses = (0..cpu_nums).map(|_| { - // ("0.0.0.0".to_string(),8080) - // }).collect::>(); - // conf.addresses = addresses; + conf.max_cached_buffers_count = 2500; + conf.default_write_buffer_size = conf.default_read_buffer_size * 2; + conf.max_buffer_size_for_cache = conf.default_write_buffer_size; RunServer!( conf, ROOT, @@ -108,15 +65,12 @@ fn shared_factory()->Pin>>{ }) } -#[cfg(any(feature = "json_plaintext",feature = "all"))] -const JSON_RESPONSE:&'static [u8] = br#"{"message":"Hello, World!"}"#; -#[cfg(any(feature = "json_plaintext",feature = "all"))] -const P:&'static [u8] = br#"Hello, World!"#; - +#[cfg(any(feature = "json_plaintext",feature = "all",feature = "uring"))] +const P:&'static [u8] = br#"Hello, World!"#; -#[cfg(feature = "all")] +#[cfg(any(feature = "all",feature = "uring"))] WaterController! { holder -> super::MainType, shared -> super::SharedType, @@ -128,9 +82,12 @@ WaterController! { let mut sender = cx.sender(); sender.set_header_ef("Content-Type","application/json"); sender.set_header_ef("Server","water"); - let date = httpdate::fmt_http_date(std::time::SystemTime::now()); - sender.set_header_ef("Date",date); - _=sender.send_data_as_final_response(http::ResponseData::Slice(super::JSON_RESPONSE)).await; + let js = crate::models::JsonHolder{message:"Hello, World!"}; + let mut buffer = crate::buf::PooledBuffer::new().take_inner(); + _=sonic_rs::to_writer(&mut buffer,&js); + sender.set_header_ef("Date",crate::date::get_date_fast()); + _=sender.send_data_as_final_response(http::ResponseData::Slice(&buffer)).await; + crate::buf::PooledBuffer::recycle(buffer); } GET => plaintext => p(cx){ @@ -150,11 +107,11 @@ WaterController! { let mut sender = context.sender(); sender.set_header_ef("Content-Type","application/json"); sender.set_header_ef("Server","water"); - let date = httpdate::fmt_http_date(std::time::SystemTime::now()); - sender.set_header_ef("Date",date); + sender.set_header_ef("Date",crate::date::get_date_fast()); _= sender.send_data_as_final_response( - http::ResponseData::Slice(data) + http::ResponseData::Slice(&data) ).await; + crate::buf::PooledBuffer::recycle(data); } GET -> queries -> query (context){ let q = context @@ -169,11 +126,11 @@ WaterController! { let mut sender = context.sender(); sender.set_header_ef("Content-Type","application/json"); sender.set_header_ef("Server","water"); - let date = httpdate::fmt_http_date(std::time::SystemTime::now()); - sender.set_header_ef("Date",date); + sender.set_header_ef("Date",crate::date::get_date_fast()); _= sender.send_data_as_final_response( - http::ResponseData::Slice(data) + http::ResponseData::Slice(&data) ).await; + crate::buf::PooledBuffer::recycle(data); } GET -> updates -> update (context){ @@ -188,32 +145,11 @@ WaterController! { let mut sender = context.sender(); sender.set_header_ef("Content-Type","application/json"); sender.set_header_ef("Server","water"); - let date = httpdate::fmt_http_date(std::time::SystemTime::now()); - sender.set_header_ef("Date",date); + sender.set_header_ef("Date",crate::date::get_date_fast()); _= sender.send_data_as_final_response( - http::ResponseData::Slice(data) + http::ResponseData::Slice(&data) ).await; - } - - - GET -> "cached-queries" -> cached(context)async { - let q = context - .get_from_path_query("q") - .and_then(|v| v.parse::().ok()) // safely parse - .unwrap_or(1) // default to 1 if missing or invalid - .clamp(1, 500); - - let connection:Shared = context.thread_shared_struct.clone().unwrap().clone(); - let connection = connection.pg_connection.get_connection(); - let data = connection.get_cached_queries(q); - let mut sender= context.sender(); - sender.set_header_ef("Content-Type","application/json"); - sender.set_header_ef("Server","water"); - let date = httpdate::fmt_http_date(std::time::SystemTime::now()); - sender.set_header_ef("Date",date); - _= sender.send_data_as_final_response( - http::ResponseData::Slice(data) - ).await; + crate::buf::PooledBuffer::recycle(data); } @@ -230,127 +166,16 @@ WaterController! { }; let mut sender = context.sender(); sender.set_header_ef("Content-Type","text/html; charset=UTF-8"); - sender.set_header_ef("Server","water"); - let date = httpdate::fmt_http_date(std::time::SystemTime::now()); - sender.set_header_ef("Date",date); + sender.set_header_ef("Server","W"); + // let date = httpdate::fmt_http_date(std::time::SystemTime::now()); + sender.set_header_ef("Date",crate::date::get_date_fast()); _= sender.send_data_as_final_response( http::ResponseData::Slice(&data) ).await; + crate::buf::PooledBuffer::recycle(data); } } } - -#[cfg(all(not(feature = "all"),feature = "json_plaintext"))] -WaterController! { - holder -> super::MainType, - shared -> super::SharedType, - name -> EntryController, - functions -> { - - - GET => json => j(cx){ - let mut sender = cx.sender(); - sender.set_header_ef("Content-Type","application/json"); - sender.set_header_ef("Server","water"); - let date = httpdate::fmt_http_date(std::time::SystemTime::now()); - sender.set_header_ef("Date",date); - _=sender.send_data_as_final_response(http::ResponseData::Slice(super::JSON_RESPONSE)).await; - } - - GET => plaintext => p(cx) { - let mut sender = cx.sender(); - sender.set_header_ef("Content-Type","text/plain; charset=utf-8"); - sender.set_header_ef("Server","water"); - let date = httpdate::fmt_http_date(std::time::SystemTime::now()); - sender.set_header_ef("Date",date); - _=sender.send_data_as_final_response(http::ResponseData::Slice(super::P)).await; - } - } -} - -#[cfg(all(not(feature = "all"),feature = "db"))] -WaterController! { - holder -> super::MainType, - shared -> super::SharedType, - name -> EntryController, - functions -> { - - - GET -> db -> db (context){ - let connection:Shared = context.thread_shared_struct.clone().unwrap().clone(); - let connection = connection.pg_connection.get_connection(); - let data = connection.get_world().await; - let mut sender = context.sender(); - sender.set_header_ef("Content-Type","application/json"); - sender.set_header_ef("Server","water"); - let date = httpdate::fmt_http_date(std::time::SystemTime::now()); - sender.set_header_ef("Date",date); - _= sender.send_data_as_final_response( - http::ResponseData::Slice(data) - ).await; - } - GET -> queries -> query (context){ - let q = context - .get_from_path_query("q") - .and_then(|v| v.parse::().ok()) // safely parse - .unwrap_or(1) // default to 1 if missing or invalid - .clamp(1, 500); - - let connection:Shared = context.thread_shared_struct.clone().unwrap().clone(); - let connection = connection.pg_connection.get_connection(); - let data = connection.get_worlds(q).await; - let mut sender = context.sender(); - sender.set_header_ef("Content-Type","application/json"); - sender.set_header_ef("Server","water"); - let date = httpdate::fmt_http_date(std::time::SystemTime::now()); - sender.set_header_ef("Date",date); - _= sender.send_data_as_final_response( - http::ResponseData::Slice(data) - ).await; - } - - GET -> updates -> update (context){ - let q = context - .get_from_path_query("q") - .and_then(|v| v.parse::().ok()) // safely parse - .unwrap_or(1) // default to 1 if missing or invalid - .clamp(1, 500); - let connection:Shared = context.thread_shared_struct.clone().unwrap().clone(); - let connection = connection.pg_connection.get_connection(); - let data = connection.update(q).await; - let mut sender = context.sender(); - sender.set_header_ef("Content-Type","application/json"); - sender.set_header_ef("Server","water"); - let date = httpdate::fmt_http_date(std::time::SystemTime::now()); - sender.set_header_ef("Date",date); - _= sender.send_data_as_final_response( - http::ResponseData::Slice(data) - ).await; - } - - - GET -> fortunes -> ft (context){ - - let connection:Shared = context.thread_shared_struct.clone().unwrap().clone(); - let connection = connection.pg_connection.get_connection(); - let data = match connection.tell_fortune().await { - Ok(r)=>{r}, - _=>{ - _= context.send_str("failed to connect").await; - return - } - }; - let mut sender = context.sender(); - sender.set_header_ef("Content-Type","text/html; charset=UTF-8"); - sender.set_header_ef("Server","water"); - let date = httpdate::fmt_http_date(std::time::SystemTime::now()); - sender.set_header_ef("Date",date); - _= sender.send_data_as_final_response( - http::ResponseData::Slice(&data) - ).await; - } - } -} \ No newline at end of file diff --git a/frameworks/Rust/water-http/water-http-mini.dockerfile b/frameworks/Rust/water-http/water-http-mini.dockerfile new file mode 100644 index 00000000000..7acbcec530f --- /dev/null +++ b/frameworks/Rust/water-http/water-http-mini.dockerfile @@ -0,0 +1,13 @@ +FROM rust:1.93 + +RUN apt-get update -yqq && apt-get install -yqq cmake g++ + +WORKDIR /water +COPY . . + +RUN cargo clean +RUN RUSTFLAGS="-C target-cpu=native --cfg tokio_unstable" cargo build --release --bin mini --features mini + +EXPOSE 8080 + +CMD ./target/release/mini \ No newline at end of file diff --git a/frameworks/Rust/water-http/water-http-tokio.dockerfile b/frameworks/Rust/water-http/water-http-tokio.dockerfile new file mode 100644 index 00000000000..a5f3fc1401d --- /dev/null +++ b/frameworks/Rust/water-http/water-http-tokio.dockerfile @@ -0,0 +1,13 @@ +FROM rust:1.93 + +RUN apt-get update -yqq && apt-get install -yqq cmake g++ + +WORKDIR /water +COPY . . + +RUN cargo clean +RUN RUSTFLAGS="-C target-cpu=native --cfg tokio_unstable" cargo build --release --bin water-http --features all + +EXPOSE 8080 + +CMD ./target/release/water-http \ No newline at end of file diff --git a/frameworks/Rust/water-http/water-http-uring.dockerfile b/frameworks/Rust/water-http/water-http-uring.dockerfile new file mode 100644 index 00000000000..50d5a991f6a --- /dev/null +++ b/frameworks/Rust/water-http/water-http-uring.dockerfile @@ -0,0 +1,13 @@ +FROM rust:1.93 + +RUN apt-get update -yqq && apt-get install -yqq cmake g++ + +WORKDIR /water +COPY . . + +RUN cargo clean +RUN RUSTFLAGS="-C target-cpu=native" cargo build --release --bin water-http --features uring + +EXPOSE 8080 + +CMD ./target/release/water-http \ No newline at end of file