Skip to content

Commit 9925209

Browse files
committed
feat: [US-054] - [Inspector: WebSocket protocol with BARE-encoded versioned messages]
1 parent 04296e9 commit 9925209

7 files changed

Lines changed: 1341 additions & 29 deletions

File tree

rivetkit-rust/packages/rivetkit-core/src/inspector/mod.rs

Lines changed: 101 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,29 +1,47 @@
11
use anyhow::Result;
22
use futures::future::BoxFuture;
33
use std::sync::Arc;
4+
use std::sync::Weak;
45
use std::sync::atomic::{AtomicU32, AtomicU64, AtomicUsize, Ordering};
56

7+
pub(crate) mod protocol;
8+
69
type WorkflowHistoryCallback =
710
Arc<dyn Fn() -> BoxFuture<'static, Result<Option<Vec<u8>>>> + Send + Sync>;
811
type WorkflowReplayCallback = Arc<
912
dyn Fn(Option<String>) -> BoxFuture<'static, Result<Option<Vec<u8>>>> + Send + Sync,
1013
>;
14+
type InspectorListener = Arc<dyn Fn(InspectorSignal) + Send + Sync>;
1115

1216
#[derive(Clone, Debug, Default)]
1317
pub struct Inspector(Arc<InspectorInner>);
1418

15-
#[derive(Default)]
1619
struct InspectorInner {
1720
state_revision: AtomicU64,
1821
connections_revision: AtomicU64,
1922
queue_revision: AtomicU64,
2023
active_connections: AtomicU32,
2124
queue_size: AtomicU32,
2225
connected_clients: AtomicUsize,
26+
next_listener_id: AtomicU64,
27+
listeners: std::sync::RwLock<Vec<(u64, InspectorListener)>>,
2328
get_workflow_history: Option<WorkflowHistoryCallback>,
2429
replay_workflow: Option<WorkflowReplayCallback>,
2530
}
2631

32+
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
33+
pub(crate) enum InspectorSignal {
34+
StateUpdated,
35+
ConnectionsUpdated,
36+
QueueUpdated,
37+
WorkflowHistoryUpdated,
38+
}
39+
40+
pub(crate) struct InspectorSubscription {
41+
inspector: Weak<InspectorInner>,
42+
listener_id: u64,
43+
}
44+
2745
impl std::fmt::Debug for InspectorInner {
2846
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2947
f.debug_struct("InspectorInner")
@@ -48,6 +66,42 @@ impl std::fmt::Debug for InspectorInner {
4866
}
4967
}
5068

69+
impl Default for InspectorInner {
70+
fn default() -> Self {
71+
Self {
72+
state_revision: AtomicU64::new(0),
73+
connections_revision: AtomicU64::new(0),
74+
queue_revision: AtomicU64::new(0),
75+
active_connections: AtomicU32::new(0),
76+
queue_size: AtomicU32::new(0),
77+
connected_clients: AtomicUsize::new(0),
78+
next_listener_id: AtomicU64::new(1),
79+
listeners: std::sync::RwLock::new(Vec::new()),
80+
get_workflow_history: None,
81+
replay_workflow: None,
82+
}
83+
}
84+
}
85+
86+
impl Drop for InspectorSubscription {
87+
fn drop(&mut self) {
88+
let Some(inspector) = self.inspector.upgrade() else {
89+
return;
90+
};
91+
let connected_clients = {
92+
let mut listeners = match inspector.listeners.write() {
93+
Ok(listeners) => listeners,
94+
Err(poisoned) => poisoned.into_inner(),
95+
};
96+
listeners.retain(|(listener_id, _)| *listener_id != self.listener_id);
97+
listeners.len()
98+
};
99+
inspector
100+
.connected_clients
101+
.store(connected_clients, Ordering::SeqCst);
102+
}
103+
}
104+
51105
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
52106
pub struct InspectorSnapshot {
53107
pub state_revision: u64,
@@ -103,8 +157,27 @@ impl Inspector {
103157
callback(entry_id).await
104158
}
105159

160+
pub(crate) fn subscribe(&self, listener: InspectorListener) -> InspectorSubscription {
161+
let listener_id = self.0.next_listener_id.fetch_add(1, Ordering::SeqCst);
162+
let connected_clients = {
163+
let mut listeners = match self.0.listeners.write() {
164+
Ok(listeners) => listeners,
165+
Err(poisoned) => poisoned.into_inner(),
166+
};
167+
listeners.push((listener_id, listener));
168+
listeners.len()
169+
};
170+
self.set_connected_clients(connected_clients);
171+
172+
InspectorSubscription {
173+
inspector: Arc::downgrade(&self.0),
174+
listener_id,
175+
}
176+
}
177+
106178
pub(crate) fn record_state_updated(&self) {
107179
self.0.state_revision.fetch_add(1, Ordering::SeqCst);
180+
self.notify(InspectorSignal::StateUpdated);
108181
}
109182

110183
pub(crate) fn record_connections_updated(&self, active_connections: u32) {
@@ -116,11 +189,17 @@ impl Inspector {
116189
.0
117190
.connections_revision
118191
.fetch_add(1, Ordering::SeqCst);
192+
self.notify(InspectorSignal::ConnectionsUpdated);
119193
}
120194

121195
pub(crate) fn record_queue_updated(&self, queue_size: u32) {
122196
self.0.queue_size.store(queue_size, Ordering::SeqCst);
123197
self.0.queue_revision.fetch_add(1, Ordering::SeqCst);
198+
self.notify(InspectorSignal::QueueUpdated);
199+
}
200+
201+
pub(crate) fn record_workflow_history_updated(&self) {
202+
self.notify(InspectorSignal::WorkflowHistoryUpdated);
124203
}
125204

126205
#[allow(dead_code)]
@@ -130,6 +209,27 @@ impl Inspector {
130209
.connected_clients
131210
.store(connected_clients, Ordering::SeqCst);
132211
}
212+
213+
fn notify(&self, signal: InspectorSignal) {
214+
if self.0.connected_clients.load(Ordering::SeqCst) == 0 {
215+
return;
216+
}
217+
218+
let listeners = {
219+
let listeners = match self.0.listeners.read() {
220+
Ok(listeners) => listeners,
221+
Err(poisoned) => poisoned.into_inner(),
222+
};
223+
listeners
224+
.iter()
225+
.map(|(_, listener)| listener.clone())
226+
.collect::<Vec<_>>()
227+
};
228+
229+
for listener in listeners {
230+
listener(signal);
231+
}
232+
}
133233
}
134234

135235
#[cfg(test)]

0 commit comments

Comments
 (0)