@@ -3,64 +3,73 @@ use wtx::{
33 codec:: i64_string,
44 collection:: { ArrayVectorU8 , Vector } ,
55 http:: {
6- Header , HttpRecvParams , KnownHeaderName , ReqResBuffer , StatusCode ,
6+ Header , HttpRecvParams , KnownHeaderName , MsgBufferString , MsgDataMut , StatusCode ,
77 server_framework:: {
8- JsonReply , PathOwned , Router , ServerFrameworkBuilder , State , VerbatimParams , get,
8+ JsonReply , PathOwned , Router , ServerFrameworkBuilder , State , VerbatimParams , get,
99 } ,
1010 } ,
1111 misc:: Wrapper ,
1212 sync:: Arc ,
1313} ;
1414
15+ fn main ( ) {
16+ let dataset = load_dataset ( ) ;
17+ let threads = std:: thread:: available_parallelism ( ) . map ( |el| el. get ( ) ) . unwrap_or ( 1 ) ;
18+ let mut handlers = Vector :: new ( ) ;
19+ for _ in 0 ..threads {
20+ let dataset_thread = dataset. clone ( ) ;
21+ let handle = std:: thread:: spawn ( || {
22+ tokio:: runtime:: Builder :: new_current_thread ( ) . enable_all ( ) . build ( ) . unwrap ( ) . block_on ( async {
23+ let router = Router :: paths ( wtx:: paths!(
24+ ( "/baseline2" , get( endpoint_baseline2) ) ,
25+ ( "/json/{count}" , get( endpoint_json) ) ,
26+ ) ) . unwrap ( ) ;
27+ let _rslt = ServerFrameworkBuilder :: new ( HttpRecvParams :: with_permissive_params ( ) , router)
28+ . with_conn_aux ( move || Ok ( ConnAux { dataset : dataset_thread. clone ( ) } ) )
29+ . tokio (
30+ "0.0.0.0:8082" ,
31+ |_error| { } ,
32+ |_| Ok ( ( ) ) ,
33+ |_stream| Ok ( ( ) ) ,
34+ |_error| { } ,
35+ )
36+ . await ;
37+ } )
38+ } ) ;
39+ handlers. push ( handle) . unwrap ( ) ;
40+ }
41+ for handle in handlers {
42+ handle. join ( ) . unwrap ( ) ;
43+ }
44+ }
45+
1546#[ derive( Clone , wtx:: ConnAux ) ]
1647struct ConnAux {
1748 dataset : Arc < Vector < DatasetItem > > ,
1849}
1950
20- #[ tokio:: main]
21- async fn main ( ) -> wtx:: Result < ( ) > {
22- let dataset = load_dataset ( ) ;
23- let router = Router :: paths ( wtx:: paths!(
24- ( "/baseline2" , get( endpoint_baseline2) ) ,
25- ( "/json/{count}" , get( endpoint_json) ) ,
26- ) ) ?;
27- ServerFrameworkBuilder :: new ( HttpRecvParams :: with_permissive_params ( ) , router)
28- . with_conn_aux ( move || Ok ( ConnAux { dataset : dataset. clone ( ) } ) )
29- . tokio (
30- "0.0.0.0:8082" ,
31- |_error| { } ,
32- |_| Ok ( ( ) ) ,
33- |stream| {
34- stream. set_nodelay ( true ) ?;
35- Ok ( ( ) )
36- } ,
37- |_error| { } ,
38- )
39- . await
40- }
41-
4251async fn endpoint_baseline2 (
43- state : State < ' _ , ConnAux , ( ) , ReqResBuffer > ,
52+ state : State < ' _ , ConnAux , ( ) , MsgBufferString > ,
4453) -> wtx:: Result < VerbatimParams > {
4554 let mut sum: i64 = 0 ;
46- for ( _, value) in state. req . rrd . uri . query_params ( ) {
55+ for ( _, value) in state. req . msg_data . uri . query_params ( ) {
4756 sum = sum. wrapping_add ( value. parse ( ) ?) ;
4857 }
49- state. req . rrd . clear ( ) ;
50- state. req . rrd . body . extend_from_copyable_slice ( i64_string ( sum) . as_bytes ( ) ) ?;
51- state. req . rrd . headers . push_from_iter_many ( [
58+ state. req . msg_data . clear ( ) ;
59+ state. req . msg_data . body . extend_from_copyable_slice ( i64_string ( sum) . as_bytes ( ) ) ?;
60+ state. req . msg_data . headers . push_from_iter_many ( [
5261 Header :: from_name_and_value ( KnownHeaderName :: ContentType . into ( ) , [ "text/plain" ] . into_iter ( ) ) ,
53- Header :: from_name_and_value ( KnownHeaderName :: Server . into ( ) , [ "wtx" ] . into_iter ( ) )
62+ Header :: from_name_and_value ( KnownHeaderName :: Server . into ( ) , [ "wtx" ] . into_iter ( ) ) ,
5463 ] ) ?;
5564 Ok ( VerbatimParams ( StatusCode :: Ok ) )
5665}
5766
5867async fn endpoint_json (
59- state : State < ' _ , ConnAux , ( ) , ReqResBuffer > ,
68+ state : State < ' _ , ConnAux , ( ) , MsgBufferString > ,
6069 PathOwned ( count) : PathOwned < usize > ,
6170) -> wtx:: Result < JsonReply > {
6271 let mut m: f64 = 1.0 ;
63- for ( key, value) in state. req . rrd . uri . query_params ( ) {
72+ for ( key, value) in state. req . msg_data . uri . query_params ( ) {
6473 if key != "m" {
6574 continue ;
6675 }
@@ -69,7 +78,7 @@ async fn endpoint_json(
6978 }
7079 let dataset_len = state. conn_aux . dataset . len ( ) ;
7180 let clamped = if count > dataset_len { dataset_len } else { count } ;
72- state. req . rrd . clear ( ) ;
81+ state. req . msg_data . clear ( ) ;
7382 let items = state. conn_aux . dataset . iter ( ) . take ( clamped) . map ( move |el| {
7483 Ok ( ProcessedItem {
7584 id : el. id ,
@@ -84,9 +93,9 @@ async fn endpoint_json(
8493 } )
8594 } ) ;
8695 let resp = JsonResponse { count : clamped, items : Wrapper ( items) } ;
87- serde_json:: to_writer ( & mut state. req . rrd . body , & resp) . unwrap_or_default ( ) ;
96+ serde_json:: to_writer ( & mut state. req . msg_data . body , & resp) ? ;
8897 let header = Header :: from_name_and_value ( KnownHeaderName :: Server . into ( ) , [ "wtx" ] ) ;
89- state. req . rrd . headers . push_from_iter ( header) ?;
98+ state. req . msg_data . headers . push_from_iter ( header) ?;
9099 Ok ( JsonReply ( StatusCode :: Ok ) )
91100}
92101
0 commit comments