Skip to content

Commit d7d7dd9

Browse files
authored
Merge branch 'master' into jsdt/row-eq
2 parents 09ad71c + c871610 commit d7d7dd9

10 files changed

Lines changed: 302 additions & 68 deletions

File tree

crates/client-api/src/routes/subscribe.rs

Lines changed: 105 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -182,6 +182,7 @@ where
182182
}
183183

184184
const LIVELINESS_TIMEOUT: Duration = Duration::from_secs(60);
185+
const SEND_TIMEOUT: Duration = Duration::from_secs(5);
185186

186187
async fn ws_client_actor(client: ClientConnection, ws: WebSocketStream, sendrx: MeteredReceiver<SerializableMessage>) {
187188
// ensure that even if this task gets cancelled, we always cleanup the connection
@@ -266,12 +267,12 @@ async fn ws_client_actor_inner(
266267
Some(Ok(m)) => Item::Message(ClientMessage::from_message(m)),
267268
Some(Err(error)) => {
268269
log::warn!("Websocket receive error: {}", error);
269-
continue;
270+
break;
270271
}
271272
// the client sent us a close frame
272273
None => {
273-
break
274-
},
274+
break;
275+
}
275276
},
276277

277278
// If we have an outgoing message to send, send it off.
@@ -311,11 +312,29 @@ async fn ws_client_actor_inner(
311312
// now we flush all the messages to the socket
312313
(ws.flush().await, msg_buffer)
313314
};
315+
// Build a future that both times out and drives the send.
316+
//
317+
// Note that if flushing cannot immediately complete for whatever reason,
318+
// it will wait without polling the other futures in the `select!` arms.
319+
// Among other things, this means our liveness tick will not be polled.
320+
//
321+
// To avoid waiting indefinitely, we wrap the send in a timeout.
322+
// A timeout is treated as an unresponsive client and we drop the connection.
323+
let send_all = tokio::time::timeout(SEND_TIMEOUT, send_all);
314324
// Flush the websocket while continuing to poll the `handle_queue`,
315325
// to avoid deadlocks or delays due to enqueued futures holding resources.
316326
let send_all = also_poll(send_all, make_progress(&mut current_message));
317327
let t1 = Instant::now();
318-
let (send_all_result, buf) = send_all.await;
328+
let (send_all_result, buf) = match send_all.await {
329+
Ok((send_all_result, buf)) => {
330+
(send_all_result, buf)
331+
}
332+
Err(e) => {
333+
// Our send timed out; drop client without trying to send them a Close
334+
log::warn!("send_all timed out: {e}");
335+
break;
336+
}
337+
};
319338
msg_buffer = buf;
320339
if let Err(error) = send_all_result {
321340
log::warn!("Websocket send error: {error}")
@@ -335,13 +354,33 @@ async fn ws_client_actor_inner(
335354
Err(NoSuchModule) => {
336355
// Send a close frame while continuing to poll the `handle_queue`,
337356
// to avoid deadlocks or delays due to enqueued futures holding resources.
338-
let close = also_poll(
339-
ws.close(Some(CloseFrame { code: CloseCode::Away, reason: "module exited".into() })),
340-
make_progress(&mut current_message),
341-
);
342-
if let Err(e) = close.await {
343-
log::warn!("error closing: {e:#}")
344-
}
357+
let close = ws.close(Some(CloseFrame { code: CloseCode::Away, reason: "module exited".into() }));
358+
// Wrap the close in a timeout
359+
let close = tokio::time::timeout(SEND_TIMEOUT, close);
360+
match also_poll(close, make_progress(&mut current_message)).await {
361+
Ok(Err(e)) => {
362+
log::warn!("error closing websocket: {e:#}")
363+
}
364+
Err(e) => {
365+
// Our send timed out; drop client without trying to send them a Close.
366+
//
367+
// Is it correct to break if a reducer is still in progress?
368+
// Answer: Yes it is.
369+
//
370+
// If a reducer is currently being executed,
371+
// we are waiting for the `current_message` future to complete.
372+
// When we break, the task completes and this future is dropped.
373+
//
374+
// Notably though the reducer itself will run to completion,
375+
// however when it tries to notify this task that it is done,
376+
// it will encounter a closed sender in `JobThread::run`,
377+
// dropping the value that it's trying to send.
378+
// In particular it will not throw an error or panic.
379+
log::warn!("websocket close timed out: {e}");
380+
break;
381+
}
382+
_ => {}
383+
};
345384
closed = true;
346385
}
347386
}
@@ -352,10 +391,29 @@ async fn ws_client_actor_inner(
352391
_ = liveness_check_interval.tick() => {
353392
// If we received a pong at some point, send a fresh ping.
354393
if mem::take(&mut got_pong) {
394+
// Build a future that both times out and drives the send.
395+
//
396+
// Note that if the send cannot immediately complete for whatever reason,
397+
// it will wait without polling the other futures in the `select!` arms.
398+
// Among other things, this means we won't poll the websocket for a Close frame.
399+
//
400+
// To avoid waiting indefinitely, we wrap the ping in a timeout.
401+
// A timeout is treated as an unresponsive client and we drop the connection.
402+
let ping = ws.send(WsMessage::Ping(Bytes::new()));
403+
let ping_with_timeout = tokio::time::timeout(SEND_TIMEOUT, ping);
404+
355405
// Send a ping message while continuing to poll the `handle_queue`,
356406
// to avoid deadlocks or delays due to enqueued futures holding resources.
357-
if let Err(e) = also_poll(ws.send(WsMessage::Ping(Bytes::new())), make_progress(&mut current_message)).await {
358-
log::warn!("error sending ping: {e:#}");
407+
match also_poll(ping_with_timeout, make_progress(&mut current_message)).await {
408+
Ok(Err(e)) => {
409+
log::warn!("error sending ping: {e:#}");
410+
}
411+
Err(e) => {
412+
// Our ping timed out; drop them without trying to send them a Close
413+
log::warn!("ping timed out after: {e}");
414+
break;
415+
}
416+
_ => {}
359417
}
360418
continue;
361419
} else {
@@ -380,13 +438,22 @@ async fn ws_client_actor_inner(
380438
Item::HandleResult(res) => {
381439
if let Err(e) = res {
382440
if let MessageHandleError::Execution(err) = e {
383-
log::error!("{err:#}");
441+
log::error!("reducer execution error: {err:#}");
384442
// Serialize the message and keep a handle to the buffer.
385443
let (msg_alloc, msg_data) = serialize(msg_buffer, err, client.config);
386444

387-
// Buffer the message without necessarily sending it.
388-
if let Err(error) = ws.send(datamsg_to_wsmsg(msg_data)).await {
389-
log::warn!("Websocket send error: {error}")
445+
let send = async { ws.send(datamsg_to_wsmsg(msg_data)).await };
446+
let send = tokio::time::timeout(SEND_TIMEOUT, send);
447+
448+
match send.await {
449+
Ok(Err(error)) => {
450+
log::warn!("Websocket send error: {error}")
451+
}
452+
Err(error) => {
453+
log::warn!("send timed out after: {error}");
454+
break;
455+
}
456+
_ => {}
390457
}
391458

392459
// At this point,
@@ -399,16 +466,23 @@ async fn ws_client_actor_inner(
399466

400467
continue;
401468
}
402-
log::debug!("Client caused error on text message: {}", e);
403-
if let Err(e) = ws
404-
.close(Some(CloseFrame {
405-
code: CloseCode::Error,
406-
reason: format!("{e:#}").into(),
407-
}))
408-
.await
409-
{
410-
log::warn!("error closing websocket: {e:#}")
411-
};
469+
log::warn!("Client caused error on text message: {}", e);
470+
let close = ws.close(Some(CloseFrame {
471+
code: CloseCode::Error,
472+
reason: format!("{e:#}").into(),
473+
}));
474+
475+
// Wrap the close in a timeout
476+
match tokio::time::timeout(SEND_TIMEOUT, close).await {
477+
Ok(Err(e)) => {
478+
log::warn!("error closing websocket: {e:#}")
479+
}
480+
Err(e) => {
481+
log::warn!("send timed out after: {e}");
482+
break;
483+
}
484+
_ => {}
485+
}
412486
}
413487
}
414488
Item::Message(ClientMessage::Ping(_message)) => {
@@ -439,6 +513,10 @@ async fn ws_client_actor_inner(
439513
.with_label_values(&addr)
440514
.inc();
441515
}
516+
517+
// Can't we just break out of the loop here?
518+
// Not if we want tungstenite to send a close frame back to the client.
519+
// That will only happen once `ws.next()` returns `None`.
442520
closed = true;
443521
}
444522
}

crates/core/src/db/mod.rs

Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,14 @@
1+
use std::sync::Arc;
2+
3+
use enum_map::EnumMap;
4+
use tokio::sync::mpsc;
5+
6+
use crate::{
7+
db::datastore::{locking_tx_datastore::datastore::TxMetrics, traits::TxData},
8+
execution_context::WorkloadType,
9+
subscription::ExecutionCounters,
10+
};
11+
112
pub mod datastore;
213
pub mod db_metrics;
314
pub mod relational_db;
@@ -22,3 +33,86 @@ pub struct Config {
2233
/// Specifies the page pool max size in bytes.
2334
pub page_pool_max_size: Option<usize>,
2435
}
36+
37+
/// A message that is processed by the [`spawn_tx_metrics_recorder`] actor.
38+
/// We use a separate task to record metrics to avoid blocking transactions.
39+
pub struct MetricsMessage {
40+
/// The reducer that produced these metrics.
41+
reducer: String,
42+
/// Metrics from a mutable transaction.
43+
metrics_for_writer: Option<TxMetrics>,
44+
/// Metrics from a read-only transaction.
45+
/// A message may have metrics for both types of transactions,
46+
/// because metrics for a reducer and its subscription updates are recorded together.
47+
metrics_for_reader: Option<TxMetrics>,
48+
/// The row updates for a mutable transaction.
49+
/// Needed for insert and delete counters.
50+
tx_data: Option<Arc<TxData>>,
51+
/// Cached metrics counters for each workload type.
52+
counters: Arc<EnumMap<WorkloadType, ExecutionCounters>>,
53+
}
54+
55+
/// The handle used to send work to the tx metrics recorder.
56+
#[derive(Clone)]
57+
pub struct MetricsRecorderQueue {
58+
tx: mpsc::UnboundedSender<MetricsMessage>,
59+
}
60+
61+
impl MetricsRecorderQueue {
62+
pub fn send_metrics(
63+
&self,
64+
reducer: String,
65+
metrics_for_writer: Option<TxMetrics>,
66+
metrics_for_reader: Option<TxMetrics>,
67+
tx_data: Option<Arc<TxData>>,
68+
counters: Arc<EnumMap<WorkloadType, ExecutionCounters>>,
69+
) {
70+
if let Err(err) = self.tx.send(MetricsMessage {
71+
reducer,
72+
metrics_for_writer,
73+
metrics_for_reader,
74+
tx_data,
75+
counters,
76+
}) {
77+
log::warn!("failed to send metrics: {err}");
78+
}
79+
}
80+
}
81+
82+
/// Spawns a task for recording transaction metrics.
83+
/// Returns the handle for pushing metrics to the recorder.
84+
pub fn spawn_tx_metrics_recorder() -> (MetricsRecorderQueue, tokio::task::AbortHandle) {
85+
let (tx, mut rx) = mpsc::unbounded_channel();
86+
let abort_handle = tokio::spawn(async move {
87+
while let Some(MetricsMessage {
88+
reducer,
89+
metrics_for_writer,
90+
metrics_for_reader,
91+
tx_data,
92+
counters,
93+
}) = rx.recv().await
94+
{
95+
if let Some(tx_metrics) = metrics_for_writer {
96+
tx_metrics.report(
97+
// If row updates are present,
98+
// they will always belong to the writer transaction.
99+
tx_data.as_deref(),
100+
&reducer,
101+
|wl| &counters[wl],
102+
);
103+
}
104+
if let Some(tx_metrics) = metrics_for_reader {
105+
tx_metrics.report(
106+
// If row updates are present,
107+
// they will never belong to the reader transaction.
108+
// Passing row updates here will most likely panic.
109+
None,
110+
&reducer,
111+
|wl| &counters[wl],
112+
);
113+
}
114+
}
115+
})
116+
.abort_handle();
117+
(MetricsRecorderQueue { tx }, abort_handle)
118+
}

0 commit comments

Comments
 (0)