Skip to content

Commit c58d993

Browse files
authored
runtime: Instrument with tracing (#6981)
## Summary This is so that tracing subscribers, can correctly attribute information to eg. opentelemetry distributed tracing IDs. ## API Changes None ## Testing Wrote a test, but didn't check it in because it would require the `std` feature of tracing and make tracing-subscriber a dependency. Happy to add the test if those things are ok. ``` #[tokio::test] async fn test_spawn_propagates_tracing_span() { install_global_subscriber(); let handle = TokioRuntime::current(); let span = tracing::info_span!("parent_span"); let parent_id = span.id(); assert!(parent_id.is_some(), "subscriber must be active"); let _enter = span.enter(); // spawn: async future is instrumented with the parent span let spawned_id = handle.spawn(async { tracing::Span::current().id() }).await; assert_eq!(spawned_id, parent_id, "spawn should propagate span"); // spawn_cpu: closure enters the parent span let cpu_id = handle.spawn_cpu(|| tracing::Span::current().id()).await; assert_eq!(cpu_id, parent_id, "spawn_cpu should propagate span"); // spawn_blocking: closure enters the parent span on the blocking thread let blocking_id = handle .spawn_blocking(|| tracing::Span::current().id()) .await; assert_eq!( blocking_id, parent_id, "spawn_blocking should propagate span" ); } ``` Signed-off-by: Frederic Branczyk <fbranczyk@gmail.com>
1 parent bf5f390 commit c58d993

1 file changed

Lines changed: 7 additions & 0 deletions

File tree

vortex-io/src/runtime/handle.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ use std::task::Poll;
99
use std::task::ready;
1010

1111
use futures::FutureExt;
12+
use tracing::Instrument;
1213
use vortex_error::vortex_panic;
1314

1415
use crate::runtime::AbortHandleRef;
@@ -65,11 +66,13 @@ impl Handle {
6566
R: Send + 'static,
6667
{
6768
let (send, recv) = oneshot::channel();
69+
let span = tracing::Span::current();
6870
let abort_handle = self.runtime().spawn(
6971
async move {
7072
// Task::detach allows the receiver to be dropped, so we ignore send errors.
7173
drop(send.send(f.await));
7274
}
75+
.instrument(span)
7376
.boxed(),
7477
);
7578
Task {
@@ -103,7 +106,9 @@ impl Handle {
103106
R: Send + 'static,
104107
{
105108
let (send, recv) = oneshot::channel();
109+
let span = tracing::Span::current();
106110
let abort_handle = self.runtime().spawn_cpu(Box::new(move || {
111+
let _guard = span.enter();
107112
// Optimistically avoid the work if the result won't be used.
108113
if !send.is_closed() {
109114
// Task::detach allows the receiver to be dropped, so we ignore send errors.
@@ -123,7 +128,9 @@ impl Handle {
123128
R: Send + 'static,
124129
{
125130
let (send, recv) = oneshot::channel();
131+
let span = tracing::Span::current();
126132
let abort_handle = self.runtime().spawn_blocking_io(Box::new(move || {
133+
let _guard = span.enter();
127134
// Optimistically avoid the work if the result won't be used.
128135
if !send.is_closed() {
129136
// Task::detach allows the receiver to be dropped, so we ignore send errors.

0 commit comments

Comments
 (0)