Skip to content

Commit 9f1ce66

Browse files
heyong4725oceanusxivclaude
committed
feat(api): add node.timestamp() accessor for the node's HLC (rescue of #1789)
Rescues @oceanusxiv's #1789 against current main. The PR sat for 18 days with no reviews and trivially-clean cherry-pick against `main`, so this is a near-verbatim re-application plus expanded docstrings and a regression test. Why === `DoraNode` already maintains an internal `uhlc::HLC` clock that stamps every outgoing message, but user code had no way to read that clock directly. Without this method, a node that wants to measure per-event processing latency has to fall back to `std::time::SystemTime::now()` — a different physical clock from the HLC, so subtracting that from `event.metadata.timestamp` is meaningless across daemons. Three concrete use cases motivated #1789: 1. `latency = node.timestamp() - event.metadata.timestamp` measured against the same clock dora's data plane uses. 2. Correlating log lines / external observations with the HLC. 3. Profiling node processing time without introducing a second clock source. What's added ============ * `apis/rust/node/src/node/mod.rs` — `DoraNode::timestamp() -> uhlc::Timestamp`. Single-line body: `self.clock.new_timestamp()`. Docstring spells out the "use this against `event.metadata.timestamp`, NOT against `SystemTime::now()`" rule. * `apis/python/node/src/lib.rs` — `Node.timestamp() -> datetime.datetime`. Converts HLC physical time to a UTC-aware `datetime`. The Python docstring explicitly notes the resolution caveats (microsecond precision, HLC logical counter dropped) and points strict-ordering callers at the Rust API. * `apis/python/operator/src/lib.rs` — promote `datetime_module()` from private to `pub` so the node crate can reuse the cached `PyModule::import("datetime")` instead of re-importing on every call. Same cache pattern dora already uses for its own outgoing metadata conversions in the operator crate. * `apis/python/node/dora/__init__.pyi` — type stub for `timestamp()`. What was added on top of #1789 ============================== * Expanded both docstrings to call out the load-bearing "use the HLC, not SystemTime" guidance and the Python resolution caveats. * New regression test `node::tests::timestamp_uses_node_clock_and_is_monotonic` that asserts two `timestamp()` calls share an HLC ID (i.e., they read the *node's* clock, not a freshly-created one) AND are strictly monotonic. Without this test, a refactor that gives `timestamp()` its own clock would silently break the latency-measurement use case in the docstring. Verification ============ cargo fmt --all -- --check cargo clippy --all --exclude dora-{node-api,operator-api,ros2-bridge}-python -- -D warnings cargo test -p dora-node-api --lib (83/83 pass; the new test runs in 0.11s) cargo check --examples Co-authored-by: oceanusxiv <oceanusxiv@users.noreply.github.com> Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent 99fb105 commit 9f1ce66

4 files changed

Lines changed: 94 additions & 2 deletions

File tree

apis/python/node/dora/__init__.pyi

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import datetime
12
import typing
23

34
import pyarrow
@@ -149,6 +150,10 @@ class Node:
149150
150151
Returns 0 on the first run, 1 after the first restart, etc."""
151152

153+
def timestamp(self) -> "datetime.datetime":
154+
"""Returns the current timestamp from the node's Hybrid Logical Clock
155+
as a UTC datetime object."""
156+
152157
def merge_external_events(self, subscription: dora.Ros2Subscription) -> None:
153158
"""Merge an external event stream with dora main loop.
154159
This currently only work with ROS2."""

apis/python/node/src/lib.rs

Lines changed: 42 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,9 @@ use arrow::pyarrow::{FromPyArrow, ToPyArrow};
88
use dora_node_api::dora_core::config::{DataId, NodeId};
99
use dora_node_api::merged::{MergeExternalSend, MergedEvent};
1010
use dora_node_api::{DataflowId, DoraNode, EventStream, TryRecvError, init_tracing};
11-
use dora_operator_api_python::{DelayedCleanup, NodeCleanupHandle, PyEvent, pydict_to_metadata};
11+
use dora_operator_api_python::{
12+
DelayedCleanup, NodeCleanupHandle, PyEvent, datetime_module, pydict_to_metadata,
13+
};
1214
use dora_ros2_bridge_python::Ros2Subscription;
1315
use eyre::{Context, ContextCompat};
1416

@@ -688,6 +690,45 @@ impl Node {
688690
self.node.get_mut().restart_count()
689691
}
690692

693+
/// Returns the current timestamp from the node's Hybrid Logical Clock
694+
/// as a UTC datetime object.
695+
///
696+
/// Use this against ``event["metadata"]["timestamp"]`` from an INPUT
697+
/// event to compute per-event processing latency against the same
698+
/// clock dora uses on its data plane.
699+
///
700+
/// **Resolution & ordering caveats** — Python ``datetime`` is
701+
/// microsecond-resolution and only carries the HLC's physical
702+
/// component; its logical counter is dropped. Two messages produced
703+
/// in the same HLC microsecond will compare equal here even though
704+
/// the underlying HLC distinguishes them. For strict ordering use
705+
/// the Rust API (``DoraNode::timestamp() -> uhlc::Timestamp``).
706+
///
707+
/// **Threading** — like every other ``Node`` method, this acquires
708+
/// the node's internal mutex via ``try_lock`` and will panic if
709+
/// another thread is mid-call into the same ``Node`` instance. The
710+
/// Python GIL serialises in-process calls, so this is only
711+
/// reachable if a caller releases the GIL (e.g. via
712+
/// ``allow_threads``) and another thread re-enters the same Node.
713+
/// Don't do that.
714+
///
715+
/// :rtype: datetime.datetime
716+
pub fn timestamp(&self, py: Python) -> eyre::Result<Py<PyAny>> {
717+
let ts = self.node.get_mut().timestamp();
718+
let system_time = ts.get_time().to_system_time();
719+
let duration_since_epoch = system_time
720+
.duration_since(std::time::UNIX_EPOCH)
721+
.context("Failed to calculate duration since epoch")?;
722+
let total_seconds = duration_since_epoch.as_secs() as f64
723+
+ duration_since_epoch.subsec_micros() as f64 / 1_000_000.0;
724+
let dt_module = datetime_module(py).context("Failed to import datetime module")?;
725+
let datetime_class = dt_module.getattr("datetime")?;
726+
let utc_timezone = dt_module.getattr("timezone")?.getattr("utc")?;
727+
let py_datetime =
728+
datetime_class.call_method1("fromtimestamp", (total_seconds, utc_timezone))?;
729+
Ok(py_datetime.unbind())
730+
}
731+
691732
/// Merge an external event stream with dora main loop.
692733
/// This currently only work with ROS2.
693734
///

apis/python/operator/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ use std::time::UNIX_EPOCH;
2222
/// Cached Python `datetime` module to avoid repeated `PyModule::import` on the hot path.
2323
static DATETIME_MODULE: PyOnceLock<Py<PyModule>> = PyOnceLock::new();
2424

25-
fn datetime_module<'py>(py: Python<'py>) -> PyResult<&'py Bound<'py, PyModule>> {
25+
pub fn datetime_module<'py>(py: Python<'py>) -> PyResult<&'py Bound<'py, PyModule>> {
2626
Ok(DATETIME_MODULE
2727
.get_or_try_init(py, || PyModule::import(py, "datetime").map(|m| m.unbind()))?
2828
.bind(py))

apis/rust/node/src/node/mod.rs

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1149,6 +1149,20 @@ impl DoraNode {
11491149
self.restart_count
11501150
}
11511151

1152+
/// Returns the current timestamp from the node's Hybrid Logical Clock.
1153+
///
1154+
/// This generates a new HLC timestamp, which combines the physical
1155+
/// wall-clock time with a logical counter to ensure uniqueness and
1156+
/// monotonicity even across nodes. The HLC is the same clock dora
1157+
/// stamps every outgoing message with, so this is the right value
1158+
/// to subtract from an input event's `metadata.timestamp` when
1159+
/// measuring per-event processing latency — using
1160+
/// `std::time::SystemTime::now()` instead would mix two unrelated
1161+
/// clocks and give meaningless results across daemons.
1162+
pub fn timestamp(&self) -> uhlc::Timestamp {
1163+
self.clock.new_timestamp()
1164+
}
1165+
11521166
/// Send a structured log message.
11531167
///
11541168
/// Outputs a JSONL line to stdout that the daemon parses automatically.
@@ -1691,6 +1705,38 @@ mod tests {
16911705
uuid::Uuid::parse_str(&id).expect("should be valid UUID");
16921706
}
16931707

1708+
/// `DoraNode::timestamp()` must read from the SAME HLC the node
1709+
/// uses to stamp outgoing messages. If a refactor accidentally
1710+
/// gives `timestamp()` its own clock, the latency-measurement use
1711+
/// case in the docstring silently breaks (subtracting against an
1712+
/// `event.metadata.timestamp` from the data plane would mix two
1713+
/// unrelated HLCs). Guard by asserting two calls share an HLC ID
1714+
/// and that the second reads strictly later than the first.
1715+
///
1716+
/// The strict `t2 > t1` assertion holds by HLC construction: if
1717+
/// the wall clock advanced between calls, the physical component
1718+
/// strictly increases; if not, the logical counter bumps. The
1719+
/// lexicographic ordering on `uhlc::Timestamp` puts `t2` strictly
1720+
/// after `t1` in either case, so this assertion does not flake on
1721+
/// fast machines whose OS clock rounds both calls to the same tick.
1722+
#[test]
1723+
fn timestamp_uses_node_clock_and_is_monotonic() {
1724+
let (node, events, _rx) = test_node();
1725+
let t1 = node.timestamp();
1726+
let t2 = node.timestamp();
1727+
assert_eq!(
1728+
t1.get_id(),
1729+
t2.get_id(),
1730+
"two timestamp() calls must come from the same HLC instance",
1731+
);
1732+
assert!(
1733+
t2 > t1,
1734+
"HLC timestamps must be strictly monotonic: {t1:?} >= {t2:?}"
1735+
);
1736+
drop(node);
1737+
drop(events);
1738+
}
1739+
16941740
/// Helper: create a minimal test node with a channel output.
16951741
fn test_node() -> (
16961742
DoraNode,

0 commit comments

Comments
 (0)