Skip to content

Commit 7e454ac

Browse files
Apply suggestions from code review
Co-authored-by: Mazdak Farrokhzad <twingoow@gmail.com> Signed-off-by: joshua-spacetime <josh@clockworklabs.io>
1 parent 6563a75 commit 7e454ac

1 file changed

Lines changed: 22 additions & 2 deletions

File tree

crates/core/src/subscription/module_subscription_manager.rs

Lines changed: 22 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1289,26 +1289,46 @@ impl SubscriptionManager {
12891289
}
12901290
}
12911291

1292+
/// An instruction sent after a transaction, successful or not, has finished,
1293+
/// to record the metrics of that transaction.
1294+
///
1295+
/// Recording metrics is done in a separate worker ([`spawn_metrics_recorder`])
1296+
/// to unblock reducer threads so that they can process the next call faster.
12921297
struct MetricsMessage {
1298+
/// The name of the reducer, when run in a reducer context.
1299+
/// When run in a non-reducer transaction, this will be the empty string.
12931300
reducer: String,
1301+
/// The metrics for the mutable part of a transaction,
1302+
/// e.g., when a reducer is being processed.
12941303
metrics_for_writer: Option<TxMetrics>,
1304+
/// The metrics for the read-only part of a transaction.
1305+
/// For a reducer call, this would be the part after committing
1306+
/// and where the tx has been downgraded to read-only
1307+
/// to e.g., process subscription updates.
12951308
metrics_for_reader: Option<TxMetrics>,
1309+
/// For a successful mutable transaction,
1310+
/// the info on committed changes.
12961311
tx_data: Arc<Option<TxData>>,
1312+
/// Cached metrics counters for each workload type.
12971313
counters: Arc<EnumMap<WorkloadType, ExecutionCounters>>,
12981314
}
12991315

1316+
/// The handle used to send work to the tx metrics recorder.
13001317
#[derive(Clone)]
13011318
pub struct MetricsRecorderQueue {
13021319
tx: mpsc::UnboundedSender<MetricsMessage>,
13031320
}
13041321

13051322
impl MetricsRecorderQueue {
1323+
/// Instructs the tx metrics recorder to record some tx metrics.
1324+
///
1325+
/// See `MetricsMessage` for more details on the parameters.
13061326
pub fn send_metrics(
13071327
&self,
13081328
reducer: String,
13091329
metrics_for_writer: Option<TxMetrics>,
13101330
metrics_for_reader: Option<TxMetrics>,
1311-
tx_data: Arc<Option<TxData>>,
1331+
tx_data: Option<Arc<TxData>>,
13121332
counters: Arc<EnumMap<WorkloadType, ExecutionCounters>>,
13131333
) {
13141334
if let Err(err) = self.tx.send(MetricsMessage {
@@ -1338,7 +1358,7 @@ pub fn spawn_metrics_recorder() -> MetricsRecorderQueue {
13381358
tx_metrics.report(tx_data.as_ref().as_ref(), &reducer, |wl| &counters[wl]);
13391359
}
13401360
if let Some(tx_metrics) = metrics_for_reader {
1341-
tx_metrics.report(tx_data.as_ref().as_ref(), &reducer, |wl| &counters[wl]);
1361+
tx_metrics.report(None, &reducer, |wl| &counters[wl]);
13421362
}
13431363
}
13441364
});

0 commit comments

Comments
 (0)