Skip to content

Commit 4567da7

Browse files
committed
feat(truapi-server): add wire and chain infrastructure
1 parent a5e9e7c commit 4567da7

10 files changed

Lines changed: 3683 additions & 3 deletions

File tree

rust/crates/truapi-server/src/chain_runtime.rs

Lines changed: 1592 additions & 0 deletions
Large diffs are not rendered by default.
Lines changed: 272 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,272 @@
1+
//! Request dispatcher.
2+
//!
3+
//! Routes incoming frames to the appropriate trait method based on the
4+
//! numeric wire discriminant. The handler set is registered by the
5+
//! auto-generated [`crate::generated::dispatcher::register`] function; this
6+
//! module provides the framework that owns the registration tables and the
7+
//! routing logic.
8+
9+
use std::collections::{HashMap, HashSet};
10+
use std::sync::Arc;
11+
12+
use futures::future::LocalBoxFuture;
13+
use tracing::instrument;
14+
15+
use crate::frame::{Payload, ProtocolMessage};
16+
use crate::generated::wire_table::{RequestFrameIds, SubscriptionFrameIds};
17+
use crate::subscription::{Spawner, SubscriptionManager, SubscriptionStream};
18+
use crate::transport::Transport;
19+
20+
/// A handler for a request-response method. The returned future is not
21+
/// required to be `Send` because the truapi trait uses `async fn`, whose
22+
/// auto-Send-ness is not guaranteed. The `request_id` is the per-frame
23+
/// identifier; handlers thread it into the `CallContext` so trait methods
24+
/// can correlate logs/cancellation with the originating request. On the
25+
/// error path handlers return the complete SCALE-encoded response payload.
26+
pub type RequestHandler =
27+
Arc<dyn Fn(String, Vec<u8>) -> LocalBoxFuture<'static, Result<Vec<u8>, Vec<u8>>> + Send + Sync>;
28+
29+
/// A handler for a subscription method. On the error path the handler returns
30+
/// the complete SCALE-encoded `_interrupt` payload.
31+
pub type SubscriptionHandler = Arc<
32+
dyn Fn(String, Vec<u8>) -> LocalBoxFuture<'static, Result<SubscriptionStream, Vec<u8>>>
33+
+ Send
34+
+ Sync,
35+
>;
36+
37+
/// A registered request handler plus the discriminants it replies on.
38+
pub struct RequestEntry {
39+
ids: RequestFrameIds,
40+
handler: RequestHandler,
41+
}
42+
43+
/// A registered subscription handler plus the discriminants its frames carry.
44+
pub struct SubscriptionEntry {
45+
ids: SubscriptionFrameIds,
46+
handler: SubscriptionHandler,
47+
}
48+
49+
/// Routes incoming protocol messages to registered handlers, keyed on the
50+
/// numeric wire discriminant.
51+
pub struct Dispatcher {
52+
by_request: HashMap<u8, RequestEntry>,
53+
by_start: HashMap<u8, SubscriptionEntry>,
54+
stop_ids: HashSet<u8>,
55+
subscriptions: SubscriptionManager,
56+
}
57+
58+
impl Dispatcher {
59+
/// Construct a dispatcher whose subscriptions are driven on `spawner`.
60+
pub fn new(spawner: Spawner) -> Self {
61+
Self {
62+
by_request: HashMap::new(),
63+
by_start: HashMap::new(),
64+
stop_ids: HashSet::new(),
65+
subscriptions: SubscriptionManager::new(spawner),
66+
}
67+
}
68+
69+
/// Register a request-response handler, keyed on `ids.request_id`. Returns
70+
/// the previously registered entry if any; callers (the generated
71+
/// `dispatcher::register`) should treat `Some` as a programming error
72+
/// since each request id must own exactly one handler.
73+
pub fn on_request<F>(&mut self, ids: RequestFrameIds, handler: F) -> Option<RequestEntry>
74+
where
75+
F: Fn(String, Vec<u8>) -> LocalBoxFuture<'static, Result<Vec<u8>, Vec<u8>>>
76+
+ Send
77+
+ Sync
78+
+ 'static,
79+
{
80+
self.by_request.insert(
81+
ids.request_id,
82+
RequestEntry {
83+
ids,
84+
handler: Arc::new(handler),
85+
},
86+
)
87+
}
88+
89+
/// Register a subscription handler, keyed on `ids.start_id`, and record
90+
/// `ids.stop_id` so a matching `_stop` frame tears the subscription down.
91+
/// Returns the previously registered entry if any.
92+
pub fn on_subscription<F>(
93+
&mut self,
94+
ids: SubscriptionFrameIds,
95+
handler: F,
96+
) -> Option<SubscriptionEntry>
97+
where
98+
F: Fn(String, Vec<u8>) -> LocalBoxFuture<'static, Result<SubscriptionStream, Vec<u8>>>
99+
+ Send
100+
+ Sync
101+
+ 'static,
102+
{
103+
self.stop_ids.insert(ids.stop_id);
104+
self.by_start.insert(
105+
ids.start_id,
106+
SubscriptionEntry {
107+
ids,
108+
handler: Arc::new(handler),
109+
},
110+
)
111+
}
112+
113+
/// Process an incoming protocol message, sending any responses or
114+
/// subscription frames through `transport`. A discriminant with no
115+
/// registered handler is dropped.
116+
#[instrument(skip_all, fields(runtime.method = "dispatcher.dispatch"))]
117+
pub async fn dispatch(&self, message: ProtocolMessage, transport: Arc<dyn Transport>) {
118+
let id = message.payload.id;
119+
120+
if let Some(entry) = self.by_request.get(&id) {
121+
let request_id = message.request_id.clone();
122+
let value = (entry.handler)(request_id, message.payload.value)
123+
.await
124+
.unwrap_or_else(|value| value);
125+
transport.send(ProtocolMessage {
126+
request_id: message.request_id,
127+
payload: Payload {
128+
id: entry.ids.response_id,
129+
value,
130+
},
131+
});
132+
} else if let Some(entry) = self.by_start.get(&id) {
133+
// Reserve the slot before awaiting the handler so a `_stop`
134+
// arriving while the handler resolves cancels the pending
135+
// subscription instead of racing the registration.
136+
let token = self.subscriptions.reserve(message.request_id.clone());
137+
let request_id = message.request_id.clone();
138+
match (entry.handler)(request_id, message.payload.value).await {
139+
Ok(stream) => {
140+
self.subscriptions.activate(
141+
token,
142+
entry.ids.receive_id,
143+
entry.ids.interrupt_id,
144+
stream,
145+
transport,
146+
);
147+
}
148+
Err(err_bytes) => {
149+
self.subscriptions.cancel_reservation(token);
150+
transport.send(ProtocolMessage {
151+
request_id: message.request_id,
152+
payload: Payload {
153+
id: entry.ids.interrupt_id,
154+
value: err_bytes,
155+
},
156+
});
157+
}
158+
}
159+
} else if self.stop_ids.contains(&id) {
160+
self.subscriptions.handle_stop(&message.request_id);
161+
}
162+
// Unknown discriminant: drop. Response / receive / interrupt frames are
163+
// handled by the client side and never registered here.
164+
}
165+
}
166+
167+
#[cfg(test)]
168+
mod tests {
169+
use super::*;
170+
use std::sync::Mutex;
171+
172+
fn test_spawner() -> Spawner {
173+
#[cfg(not(target_arch = "wasm32"))]
174+
{
175+
crate::subscription::thread_per_subscription_spawner()
176+
}
177+
#[cfg(target_arch = "wasm32")]
178+
{
179+
Arc::new(futures::executor::block_on)
180+
}
181+
}
182+
183+
#[derive(Default)]
184+
struct RecordingTransport {
185+
sent: Mutex<Vec<ProtocolMessage>>,
186+
}
187+
188+
impl RecordingTransport {
189+
fn sent(&self) -> Vec<ProtocolMessage> {
190+
self.sent.lock().unwrap().clone()
191+
}
192+
}
193+
194+
impl Transport for RecordingTransport {
195+
fn send(&self, message: ProtocolMessage) {
196+
self.sent.lock().unwrap().push(message);
197+
}
198+
fn on_message(
199+
&self,
200+
_handler: Box<dyn Fn(ProtocolMessage) + Send + Sync>,
201+
) -> Box<dyn FnOnce()> {
202+
Box::new(|| {})
203+
}
204+
}
205+
206+
fn make_frame(id: u8, value: Vec<u8>) -> ProtocolMessage {
207+
ProtocolMessage {
208+
request_id: "p:1".into(),
209+
payload: Payload { id, value },
210+
}
211+
}
212+
213+
/// A frame whose discriminant has no registered handler is dropped: no
214+
/// response, no interrupt. (In production `register` registers every wire
215+
/// method, so this only happens for malformed or client-bound ids.)
216+
#[test]
217+
fn dispatch_unregistered_id_sends_nothing() {
218+
let dispatcher = Dispatcher::new(test_spawner());
219+
let transport = Arc::new(RecordingTransport::default());
220+
let transport_dyn: Arc<dyn Transport> = transport.clone();
221+
let frame = make_frame(250, Vec::new());
222+
futures::executor::block_on(dispatcher.dispatch(frame, transport_dyn));
223+
assert!(
224+
transport.sent().is_empty(),
225+
"an unregistered discriminant must produce no frame"
226+
);
227+
}
228+
229+
/// A handler error already owns the complete response payload. The
230+
/// dispatcher only routes it to the registered response id.
231+
#[test]
232+
fn dispatch_request_handler_error_emits_response_payload() {
233+
let mut dispatcher = Dispatcher::new(test_spawner());
234+
let ids = RequestFrameIds {
235+
request_id: 200,
236+
response_id: 201,
237+
};
238+
dispatcher.on_request(ids, |_request_id, _bytes| {
239+
Box::pin(async move { Err(vec![9, 8, 7]) })
240+
});
241+
let transport = Arc::new(RecordingTransport::default());
242+
let frame = make_frame(200, Vec::new());
243+
futures::executor::block_on(dispatcher.dispatch(frame, transport.clone()));
244+
let sent = transport.sent();
245+
assert_eq!(sent.len(), 1, "exactly one response expected");
246+
assert_eq!(sent[0].payload.id, 201);
247+
assert_eq!(sent[0].payload.value, vec![9, 8, 7]);
248+
}
249+
250+
/// Registering two handlers under the same key must not silently
251+
/// overwrite. The contract chosen here is "loud": `on_request`
252+
/// returns the previous handler, so callers can detect collisions.
253+
#[test]
254+
fn register_request_twice_returns_previous_handler() {
255+
let mut dispatcher = Dispatcher::new(test_spawner());
256+
let ids = RequestFrameIds {
257+
request_id: 200,
258+
response_id: 201,
259+
};
260+
let prev = dispatcher.on_request(ids, |_request_id, _bytes| {
261+
Box::pin(async move { Ok(Vec::new()) })
262+
});
263+
assert!(prev.is_none(), "first registration has no predecessor");
264+
let prev = dispatcher.on_request(ids, |_request_id, _bytes| {
265+
Box::pin(async move { Ok(Vec::new()) })
266+
});
267+
assert!(
268+
prev.is_some(),
269+
"second registration must return the previous handler"
270+
);
271+
}
272+
}

0 commit comments

Comments
 (0)