@@ -4,7 +4,7 @@ use std::{
44 rc:: Rc ,
55 sync:: {
66 Arc ,
7- atomic:: { AtomicI32 , Ordering } ,
7+ atomic:: { AtomicI64 , Ordering } ,
88 } ,
99} ;
1010
@@ -29,8 +29,8 @@ use crate::{Error, StreamReceiver};
2929
3030pub struct RpcConnection < Local : Side , Remote : Side > {
3131 outgoing_tx : UnboundedSender < OutgoingMessage < Local , Remote > > ,
32- pending_responses : Arc < Mutex < HashMap < i32 , PendingResponse > > > ,
33- next_id : AtomicI32 ,
32+ pending_responses : Arc < Mutex < HashMap < Id , PendingResponse > > > ,
33+ next_id : AtomicI64 ,
3434 broadcast : StreamBroadcast ,
3535}
3636
8181 let this = Self {
8282 outgoing_tx,
8383 pending_responses,
84- next_id : AtomicI32 :: new ( 0 ) ,
84+ next_id : AtomicI64 :: new ( 0 ) ,
8585 broadcast,
8686 } ;
8787
@@ -112,8 +112,9 @@ where
112112 ) -> impl Future < Output = Result < Out , Error > > {
113113 let ( tx, rx) = oneshot:: channel ( ) ;
114114 let id = self . next_id . fetch_add ( 1 , Ordering :: SeqCst ) ;
115+ let id = Id :: Number ( id) ;
115116 self . pending_responses . lock ( ) . insert (
116- id,
117+ id. clone ( ) ,
117118 PendingResponse {
118119 deserialize : |value| {
119120 serde_json:: from_str :: < Out > ( value. get ( ) )
@@ -129,7 +130,7 @@ where
129130 if self
130131 . outgoing_tx
131132 . unbounded_send ( OutgoingMessage :: Request {
132- id,
133+ id : id . clone ( ) ,
133134 method : method. into ( ) ,
134135 params,
135136 } )
@@ -153,7 +154,7 @@ where
153154 mut outgoing_rx : UnboundedReceiver < OutgoingMessage < Local , Remote > > ,
154155 mut outgoing_bytes : impl Unpin + AsyncWrite ,
155156 incoming_bytes : impl Unpin + AsyncRead ,
156- pending_responses : Arc < Mutex < HashMap < i32 , PendingResponse > > > ,
157+ pending_responses : Arc < Mutex < HashMap < Id , PendingResponse > > > ,
157158 broadcast : StreamSender ,
158159 ) -> Result < ( ) > {
159160 // TODO: Create nicer abstraction for broadcast
@@ -187,7 +188,7 @@ where
187188 // Request
188189 match Local :: decode_request( method, message. params) {
189190 Ok ( request) => {
190- broadcast. incoming_request( id, method, & request) ;
191+ broadcast. incoming_request( id. clone ( ) , method, & request) ;
191192 incoming_tx. unbounded_send( IncomingMessage :: Request { id, request } ) . ok( ) ;
192193 }
193194 Err ( err) => {
@@ -222,7 +223,7 @@ where
222223 pending_response. respond. send( result) . ok( ) ;
223224 }
224225 } else {
225- log:: error!( "received response for unknown request id: {id}" ) ;
226+ log:: error!( "received response for unknown request id: {id:? }" ) ;
226227 }
227228 } else if let Some ( method) = message. method {
228229 // Notification
@@ -297,31 +298,41 @@ where
297298 }
298299}
299300
301+ /// JSON RPC Request Id
302+ #[ derive( Debug , PartialEq , Clone , Hash , Eq , Deserialize , Serialize , PartialOrd , Ord ) ]
303+ #[ serde( deny_unknown_fields) ]
304+ #[ serde( untagged) ]
305+ pub enum Id {
306+ Null ,
307+ Number ( i64 ) ,
308+ Str ( String ) ,
309+ }
310+
300311#[ derive( Deserialize ) ]
301312struct RawIncomingMessage < ' a > {
302- id : Option < i32 > ,
313+ id : Option < Id > ,
303314 method : Option < & ' a str > ,
304315 params : Option < & ' a RawValue > ,
305316 result : Option < & ' a RawValue > ,
306317 error : Option < Error > ,
307318}
308319
309320enum IncomingMessage < Local : Side > {
310- Request { id : i32 , request : Local :: InRequest } ,
321+ Request { id : Id , request : Local :: InRequest } ,
311322 Notification { notification : Local :: InNotification } ,
312323}
313324
314325#[ derive( Serialize , Deserialize , Clone ) ]
315326#[ serde( untagged) ]
316327pub enum OutgoingMessage < Local : Side , Remote : Side > {
317328 Request {
318- id : i32 ,
329+ id : Id ,
319330 method : Arc < str > ,
320331 #[ serde( skip_serializing_if = "Option::is_none" ) ]
321332 params : Option < Remote :: InRequest > ,
322333 } ,
323334 Response {
324- id : i32 ,
335+ id : Id ,
325336 #[ serde( flatten) ]
326337 result : ResponseResult < Local :: OutResponse > ,
327338 } ,
@@ -400,3 +411,42 @@ pub trait MessageHandler<Local: Side> {
400411 notification : Local :: InNotification ,
401412 ) -> impl Future < Output = Result < ( ) , Error > > ;
402413}
414+
415+ #[ cfg( test) ]
416+ mod tests {
417+ use super :: * ;
418+
419+ use serde_json:: { Number , Value } ;
420+
421+ #[ test]
422+ fn id_deserialization ( ) {
423+ let id = serde_json:: from_value :: < Id > ( Value :: Null ) . unwrap ( ) ;
424+ assert_eq ! ( id, Id :: Null ) ;
425+
426+ let id =
427+ serde_json:: from_value :: < Id > ( Value :: Number ( Number :: from_u128 ( 1 ) . unwrap ( ) ) ) . unwrap ( ) ;
428+ assert_eq ! ( id, Id :: Number ( 1 ) ) ;
429+
430+ let id =
431+ serde_json:: from_value :: < Id > ( Value :: Number ( Number :: from_i128 ( -1 ) . unwrap ( ) ) ) . unwrap ( ) ;
432+ assert_eq ! ( id, Id :: Number ( -1 ) ) ;
433+
434+ let id = serde_json:: from_value :: < Id > ( Value :: String ( "id" . to_owned ( ) ) ) . unwrap ( ) ;
435+ assert_eq ! ( id, Id :: Str ( "id" . to_owned( ) ) ) ;
436+ }
437+
438+ #[ test]
439+ fn id_serialization ( ) {
440+ let id = serde_json:: to_value ( Id :: Null ) . unwrap ( ) ;
441+ assert_eq ! ( id, Value :: Null ) ;
442+
443+ let id = serde_json:: to_value ( Id :: Number ( 1 ) ) . unwrap ( ) ;
444+ assert_eq ! ( id, Value :: Number ( Number :: from_u128( 1 ) . unwrap( ) ) ) ;
445+
446+ let id = serde_json:: to_value ( Id :: Number ( -1 ) ) . unwrap ( ) ;
447+ assert_eq ! ( id, Value :: Number ( Number :: from_i128( -1 ) . unwrap( ) ) ) ;
448+
449+ let id = serde_json:: to_value ( Id :: Str ( "id" . to_owned ( ) ) ) . unwrap ( ) ;
450+ assert_eq ! ( id, Value :: String ( "id" . to_owned( ) ) ) ;
451+ }
452+ }
0 commit comments