Skip to content
This repository was archived by the owner on Mar 24, 2026. It is now read-only.

Commit c3cd304

Browse files
[water_http] updating water_http version and add cached queries test to the default config (#10641)
* Add water-http framework * benchmark_config.json update * updating water_http version to release one * updating water_http release version * version update * fix water_http errors in update queries and update water_http version * adding cached-queries to default test * [water-http] remove unnecessary buffers * add maintainers list into benchmark_config.json
1 parent 7f801cd commit c3cd304

7 files changed

Lines changed: 150 additions & 33 deletions

File tree

.gitignore

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
installs
22
node_modules/
33
.travis.bak
4-
4+
t.bat
55
# Added for pedestal framework test
66
.lein-deps-sum
77

frameworks/Rust/water-http/Cargo.toml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,12 @@
11
[package]
22
name = "water-http"
33
version = "0.1.0"
4-
edition = "2018"
4+
edition = "2024"
55

66
[dependencies]
77
askama = "0.14.0"
88
tokio = { version = "1.47.1", features = ["full"] }
9-
water_http = { version = "3.2.1-beta.14" ,optional = true , features = ["use_only_http1"]}
9+
water_http = { features = ["use_io_uring","use_only_http1"],optional = true , version = "3.4.2-beta.4" }
1010
smallvec = "1.15.1"
1111
nanorand = "0.8.0"
1212
tokio-postgres = "0.7.15"

frameworks/Rust/water-http/benchmark_config.json

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,17 @@
11

22
{
33
"framework": "water-http",
4+
"maintainers" : ["HassanSharara"],
45
"tests": [
56
{
6-
"default": {
7+
"default": {
78
"json_url": "/json",
89
"plaintext_url": "/plaintext",
910
"fortune_url": "/fortunes",
1011
"db_url": "/db",
1112
"query_url": "/queries?q=",
1213
"update_url": "/updates?q=",
14+
"cached_query_url": "/cached-queries?q=",
1315
"port": 8080,
1416
"approach": "Realistic",
1517
"classification": "Fullstack",
@@ -24,5 +26,5 @@
2426
"display_name": "water_http"
2527
}
2628
}
27-
]
29+
]
2830
}

frameworks/Rust/water-http/src/cached.rs

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -170,9 +170,9 @@ pub struct ThreadSharedStruct{
170170
impl ThreadSharedStruct {
171171

172172
#[inline(always)]
173-
pub fn get_value(id:i32)->&'static i32{
173+
pub fn get_value(id:i32)->Option<&'static i32>{
174174
let map = unsafe {CACHED_VALUES.as_ref().unwrap().get(&id)} ;
175-
map.unwrap()
175+
map
176176
}
177177
pub fn get_cached_queries(&self,num:usize)->&[u8]{
178178
let buf = unsafe{&mut *(self.writing_buffer.get())};
@@ -181,8 +181,11 @@ impl ThreadSharedStruct {
181181
let mut writer = BytesMuteWriter(buf);
182182
let mut rn = self.rng.clone();
183183
for _ in 0..num {
184-
let rd: i32 = (rn.generate::<u32>() & 0x3FFF) as i32 % 10_000 + 1;
185-
let v = Self::get_value(rd);
184+
let rd = (rn.generate::<u32>() % 10_000 ) as i32;
185+
let v = match Self::get_value(rd) {
186+
None => {continue}
187+
Some(c) => {c}
188+
};
186189
writer.extend_from_slice(br"{");
187190
_ = write!(writer, r#""id":{},"randomnumber":{}"#, rd, v);
188191
writer.extend_from_slice(br"},");

frameworks/Rust/water-http/src/db.rs

Lines changed: 55 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,19 @@
11
#![cfg(any(feature = "db",feature = "all"))]
2-
use std::{borrow::Cow, io};
2+
use std::{borrow::Cow, io, ptr};
33
use std::fmt::Arguments;
44
use std::io::Write;
55
use std::mem::MaybeUninit;
66
use std::rc::Rc;
77
use std::cell::UnsafeCell;
8+
use std::collections::HashMap;
89
use bytes::Buf;
910
use nanorand::{Rng, WyRand};
10-
use tokio_postgres::{connect, Client, Statement, NoTls};
11+
use tokio_postgres::{connect, Client, Statement, NoTls, Error};
1112
use tokio_postgres::types::private::BytesMut;
1213
use crate::models::{Fortune, FortuneTemplate, World};
1314
use sonic_rs::prelude::WriteExt;
1415
use yarte::TemplateBytesTrait;
16+
pub static mut CACHED_VALUES:Option<HashMap<i32,i32>> = None;
1517

1618
/// Database connection pool with thread-local RNG
1719
pub struct DbConnectionPool {
@@ -60,8 +62,6 @@ impl DbConnectionPool {
6062
/// Reusable buffer pool per connection
6163
struct BufferPool {
6264
body: BytesMut,
63-
worlds: Vec<World>,
64-
numbers: Vec<i32>,
6565
fortunes: Vec<Fortune>,
6666
fortune_output: Vec<u8>,
6767
}
@@ -70,8 +70,6 @@ impl BufferPool {
7070
fn new() -> Self {
7171
Self {
7272
body: BytesMut::with_capacity(4096),
73-
worlds: Vec::with_capacity(501),
74-
numbers: Vec::with_capacity(501),
7573

7674
fortunes: Vec::with_capacity(501),
7775
fortune_output: Vec::with_capacity(4096),
@@ -130,7 +128,7 @@ impl PgConnection {
130128
} /// Connect to the database
131129
132130
#[inline(always)]
133-
pub fn generate_update_values_stmt(batch_size: usize) -> String {
131+
pub fn generate_update_values_stmt(batch_size: usize) -> String {
134132

135133
let mut sql = String::from("UPDATE world SET randomNumber = w.r FROM (VALUES ");
136134

@@ -156,38 +154,34 @@ impl PgConnection {
156154
/// Get a single random world - optimized with buffer reuse
157155
#[inline]
158156
pub async fn get_world(&self) -> &[u8] {
159-
let rd = (self.rang.clone().generate::<u32>() % 10_000 + 1) as i32;
157+
let rd = (self.rang.clone().generate::<u32>() % 10_000 ) as i32;
160158
let row = self.cl.query_one(&self.world, &[&rd]).await.unwrap();
161-
162159
let buffers = self.buffers();
163160
buffers.body.clear();
164-
165161
sonic_rs::to_writer(
166162
BytesMuteWriter(&mut buffers.body),
167163
&World {
168164
id: row.get(0),
169165
randomnumber: row.get(1),
170166
},
171167
).unwrap();
172-
173168
buffers.body.chunk()
174169
}
175170

176171
/// Get multiple random worlds - optimized with buffer reuse
177172
pub async fn get_worlds(&self, num: usize) -> &[u8] {
178173
let buffers = self.buffers();
179-
buffers.worlds.clear();
180-
let mut rn = self.rang.clone();
174+
let mut worlds = Vec::with_capacity(num);
181175
for _ in 0..num {
182-
let id: i32 = (rn.generate::<u32>() & 0x3FFF) as i32 % 10_000 + 1;
176+
let id = (self.rang.clone().generate::<u32>() % 10_000 ) as i32;
183177
let row = self.cl.query_one(&self.world, &[&id]).await.unwrap();
184-
buffers.worlds.push(World {
178+
worlds.push(World {
185179
id: row.get(0),
186180
randomnumber: row.get(1),
187181
});
188182
}
189183
buffers.body.clear();
190-
sonic_rs::to_writer(BytesMuteWriter(&mut buffers.body), &buffers.worlds).unwrap();
184+
sonic_rs::to_writer(BytesMuteWriter(&mut buffers.body), &worlds).unwrap();
191185
buffers.body.chunk()
192186
}
193187
/// Update worlds in batch - optimized with buffer reuse
@@ -215,23 +209,24 @@ impl PgConnection {
215209
futures.extend(ids.iter().map(|x| async move {self.cl.query_one(&self.world,&[&x]).await}));
216210
futures_util::future::join_all(futures).await;
217211
ids.sort_unstable();
218-
buffers.worlds.clear();
212+
let mut worlds = Vec::with_capacity(num);
213+
let mut numbers = Vec::with_capacity(num);
219214
for index in 0..num {
220215
let s_id = (rng.generate::<u32>() % 10_000 + 1 ) as i32;
221-
buffers.worlds.push(World{
216+
worlds.push(World{
222217
id:ids[index],
223218
randomnumber:s_id
224219
});
225-
buffers.numbers.push(s_id);
220+
numbers.push(s_id);
226221
}
227222
buffers.body.clear();
228223
for index in 0..num {
229224
params.push(&ids[index]);
230-
params.push(&buffers.numbers[index]);
225+
params.push(&numbers[index]);
231226
}
232227

233228
_=self.cl.execute(&self.updates[num - 1], &params).await.unwrap();
234-
sonic_rs::to_writer(BytesMuteWriter(&mut buffers.body), &buffers.worlds).unwrap();
229+
sonic_rs::to_writer(BytesMuteWriter(&mut buffers.body), &worlds).unwrap();
235230
buffers.body.chunk()
236231
}
237232

@@ -264,11 +259,50 @@ impl PgConnection {
264259
// Return reference to buffer - zero-copy!
265260
Ok(&buffers.fortune_output)
266261
}
262+
263+
264+
pub fn get_cached_queries(&self,num:usize)->&[u8]{
265+
let buf = self.buffers();
266+
let buf = &mut buf.body;
267+
buf.clear();
268+
buf.extend_from_slice(br#"["#);
269+
let mut writer = BytesMuteWriter(buf);
270+
for _ in 0..num {
271+
let rd = (self.rang.clone().generate::<u32>() % 10_000 ) as i32;
272+
let v = match self.get_world_id_for_cache(rd){
273+
None => {continue}
274+
Some(e)=>{e}
275+
};
276+
writer.extend_from_slice(br"{");
277+
_ = write!(writer, r#""id":{},"randomnumber":{}"#, rd, v);
278+
writer.extend_from_slice(br"},");
279+
}
280+
if buf.len() >1 {buf.truncate(buf.len() - 1);}
281+
buf.extend_from_slice(b"]");
282+
return &buf[..]
283+
}
284+
285+
fn get_world_id_for_cache(&self, id: i32) -> Option<&i32> {
286+
unsafe {
287+
let ptr = ptr::addr_of!(CACHED_VALUES);
288+
289+
match &*ptr {
290+
Some(map) => map.get(&id),
291+
None => None,
292+
}
293+
}
294+
}
267295
}
268296

269297
/// Zero-copy writer for BytesMut
270298
pub struct BytesMuteWriter<'a>(pub &'a mut BytesMut);
299+
impl BytesMuteWriter<'_> {
271300

301+
#[inline(always)]
302+
pub fn extend_from_slice(&mut self,data:&[u8]){
303+
self.0.extend_from_slice(data);
304+
}
305+
}
272306
impl Write for BytesMuteWriter<'_> {
273307
#[inline(always)]
274308
fn write(&mut self, src: &[u8]) -> Result<usize, io::Error> {

frameworks/Rust/water-http/src/server.rs

Lines changed: 69 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,18 @@
1-
1+
use std::collections::HashMap;
22
use std::pin::Pin;
3+
use std::ptr;
34
use std::rc::Rc;
5+
use tokio::task::LocalSet;
46
use water_http::{InitControllersRoot, RunServer, WaterController};
5-
use water_http::server::ServerConfigurations;
6-
use crate::db::{DbConnectionPool};
7+
use water_http::http::{HttpSender, ResponseData};
8+
use water_http::server::{HttpContext, ServerConfigurations};
9+
use crate::db::{CACHED_VALUES, DbConnectionPool};
710
InitControllersRoot! {
811
name:ROOT,
912
holder_type:MainType,
1013
shared_type:SharedType,
14+
headers_length:6,
15+
queries_length:3
1116
}
1217

1318
pub struct ThreadSharedStruct{
@@ -22,6 +27,43 @@ pub type SharedType = Rc<ThreadSharedStruct>;
2227

2328
pub fn run_server(){
2429

30+
31+
_= std::thread::spawn(
32+
||{
33+
let rt = tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap();
34+
rt.block_on(async move {
35+
const URL:&'static str = "postgres://benchmarkdbuser:benchmarkdbpass@tfb-database/hello_world";
36+
// const URL:&'static str = "postgres://postgres:root@localhost:5432/techmpower";
37+
38+
let mut pool = DbConnectionPool{
39+
connections:Vec::with_capacity( 1
40+
),
41+
next:0.into(),
42+
// rt:tokio::runtime::Builder::new_multi_thread().enable_all().worker_threads(cpu_nums).build().unwrap()
43+
};
44+
45+
let local_set = LocalSet::new();
46+
47+
_= local_set.run_until(async move {
48+
tokio::task::spawn_local(async move {
49+
pool.fill_pool(URL, 1).await;
50+
let connection = pool.get_connection();
51+
let statement = connection.cl.prepare("SELECT id,randomnumber FROM World").await.unwrap();
52+
let res = connection.cl.query(&statement,&[]).await.unwrap();
53+
let mut map = HashMap::new();
54+
for row in res {
55+
map.insert(row.get(0),row.get(1));
56+
}
57+
unsafe {
58+
let ptr = ptr::addr_of_mut!(CACHED_VALUES);
59+
ptr.write(Some(map));
60+
}
61+
}).await
62+
}).await;
63+
64+
});
65+
}
66+
).join();
2567
let cpu_nums = num_cpus::get();
2668

2769

@@ -71,6 +113,9 @@ const JSON_RESPONSE:&'static [u8] = br#"{"message":"Hello, World!"}"#;
71113
#[cfg(any(feature = "json_plaintext",feature = "all"))]
72114
const P:&'static [u8] = br#"Hello, World!"#;
73115

116+
117+
118+
74119
#[cfg(feature = "all")]
75120
WaterController! {
76121
holder -> super::MainType,
@@ -151,6 +196,27 @@ WaterController! {
151196
}
152197

153198

199+
GET -> "cached-queries" -> cached(context)async {
200+
let q = context
201+
.get_from_path_query("q")
202+
.and_then(|v| v.parse::<usize>().ok()) // safely parse
203+
.unwrap_or(1) // default to 1 if missing or invalid
204+
.clamp(1, 500);
205+
206+
let connection:Shared = context.thread_shared_struct.clone().unwrap().clone();
207+
let connection = connection.pg_connection.get_connection();
208+
let data = connection.get_cached_queries(q);
209+
let mut sender= context.sender();
210+
sender.set_header_ef("Content-Type","application/json");
211+
sender.set_header_ef("Server","water");
212+
let date = httpdate::fmt_http_date(std::time::SystemTime::now());
213+
sender.set_header_ef("Date",date);
214+
_= sender.send_data_as_final_response(
215+
http::ResponseData::Slice(data)
216+
).await;
217+
}
218+
219+
154220
GET -> fortunes -> ft (context){
155221

156222
let connection:Shared = context.thread_shared_struct.clone().unwrap().clone();
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
FROM rust:1.93
2+
3+
RUN apt-get update -yqq && apt-get install -yqq cmake g++
4+
5+
WORKDIR /water
6+
COPY . .
7+
RUN cargo clean
8+
RUN RUSTFLAGS="-C target-cpu=native" cargo build --release --bin cache --features cache
9+
10+
EXPOSE 8080
11+
12+
CMD ./target/release/cache

0 commit comments

Comments
 (0)