Skip to content

Commit 3c80bcc

Browse files
authored
fix: preserve Go scope stacks across OS threads (NVIDIA#100)
#### Overview

Fix intermittent Windows Go test failures caused by Go goroutines moving between OS threads while NeMo Flow scope state is stored in Rust thread-local fallback storage.

- [x] I confirm this contribution is my own work, or I have the right to submit it under this project's license.
- [x] I searched existing issues and open pull requests, and this does not duplicate existing work.

#### Details

- Add Rust runtime helpers to capture and restore the current thread-local scope stack binding.
- Expose the capture/restore helpers through the FFI API and generated header.
- Update Go `ScopeStack.Run` to restore the previous Rust thread-local binding before releasing the locked OS thread.
- Run the flaky ATOF Go lifecycle test inside an explicit `ScopeStack.Run` so `PushScope`, `EmitEvent`, and `PopScope` stay on one OS thread.
- Extend Go coverage to assert `ScopeStackActive()` is restored after `Run`.

#### Where should the reviewer start?

Start with `go/nemo_flow/nemo_flow.go` for the Go runtime behavior, then review `crates/core/src/api/runtime/scope_stack.rs` and `crates/ffi/src/api/scope_stack.rs` for the capture/restore bridge. The regression coverage is in `go/nemo_flow/atof_test.go` and `go/nemo_flow/context_test.go`.

#### Related Issues: (use one of the action keywords Closes / Fixes / Resolves / Relates to)

- Relates to: none

## Summary by CodeRabbit

* **New Features**
  * Added scope-stack binding capture and restoration functionality for managing thread-local state preservation.
  * Extended FFI support for thread scope stack operations.
* **Tests**
  * Updated scope-stack lifecycle tests to use the new binding management approach.

[![Review Change Stack](https://storage.googleapis.com/coderabbit_public_assets/review-stack-in-coderabbit-ui.svg)](https://app.coderabbit.ai/change-stack/NVIDIA/NeMo-Flow/pull/100)

Authors:
  - Will Killian (https://github.com/willkill07)

Approvers:
  - David Gardner (https://github.com/dagardner-nv)

URL: NVIDIA#100
1 parent 14485f5 commit 3c80bcc

9 files changed

Lines changed: 156 additions & 15 deletions

File tree

crates/core/src/api/runtime.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,9 @@ pub use callbacks::{
1717
};
1818
pub use global::global_context;
1919
pub use scope_stack::{
20-
ScopeStack, ScopeStackHandle, TASK_SCOPE_STACK, create_scope_stack, current_scope_stack,
21-
propagate_scope_to_thread, scope_stack_active, set_thread_scope_stack, sync_thread_scope_stack,
22-
task_scope_push, task_scope_remove, task_scope_top,
20+
ScopeStack, ScopeStackHandle, TASK_SCOPE_STACK, ThreadScopeStackBinding,
21+
capture_thread_scope_stack, create_scope_stack, current_scope_stack, propagate_scope_to_thread,
22+
restore_thread_scope_stack, scope_stack_active, set_thread_scope_stack,
23+
sync_thread_scope_stack, task_scope_push, task_scope_remove, task_scope_top,
2324
};
2425
pub use state::NemoFlowContextState;

crates/core/src/api/runtime/scope_stack.rs

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -236,6 +236,16 @@ impl Default for ScopeStack {
236236
/// concurrent readers.
237237
pub type ScopeStackHandle = Arc<RwLock<ScopeStack>>;
238238

239+
/// Captured thread-local scope stack binding.
240+
///
241+
/// This preserves both the visible scope stack handle and whether it was
242+
/// explicitly installed on the current thread.
243+
#[derive(Clone)]
244+
pub struct ThreadScopeStackBinding {
245+
stack: ScopeStackHandle,
246+
explicit: bool,
247+
}
248+
239249
/// Create a new scope stack handle with an implicit root scope.
240250
///
241251
/// The returned handle wraps a freshly initialized [`ScopeStack`] inside an
@@ -297,6 +307,23 @@ pub fn set_thread_scope_stack(handle: ScopeStackHandle) {
297307
THREAD_SCOPE_STACK_EXPLICIT.with(|flag| flag.set(true));
298308
}
299309

310+
/// Capture the current thread-local scope stack binding.
311+
///
312+
/// This is intended for foreign runtimes that temporarily bind a scope stack to
313+
/// an OS thread and need to restore the exact previous state before releasing
314+
/// that thread back to their scheduler.
315+
pub fn capture_thread_scope_stack() -> ThreadScopeStackBinding {
316+
let stack = THREAD_SCOPE_STACK.with(|stack| stack.borrow().clone());
317+
let explicit = THREAD_SCOPE_STACK_EXPLICIT.with(|flag| flag.get());
318+
ThreadScopeStackBinding { stack, explicit }
319+
}
320+
321+
/// Restore a previously captured thread-local scope stack binding.
322+
pub fn restore_thread_scope_stack(binding: ThreadScopeStackBinding) {
323+
THREAD_SCOPE_STACK.with(|stack| *stack.borrow_mut() = binding.stack);
324+
THREAD_SCOPE_STACK_EXPLICIT.with(|flag| flag.set(binding.explicit));
325+
}
326+
300327
/// Synchronize the thread-local scope stack without marking it explicit.
301328
///
302329
/// This updates the thread-local slot used by native runtime code while

crates/ffi/nemo_flow.h

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -184,6 +184,11 @@ typedef struct FfiScopeStack FfiScopeStack;
184184
*/
185185
typedef struct FfiStream FfiStream;
186186

187+
/**
188+
* Opaque handle to a captured thread-local scope stack binding.
189+
*/
190+
typedef struct FfiThreadScopeStackBinding FfiThreadScopeStackBinding;
191+
187192
/**
188193
* Opaque handle representing an active tool call.
189194
*/
@@ -1770,6 +1775,29 @@ NemoFlowStatus nemo_flow_scope_stack_create(struct FfiScopeStack **out);
17701775
*/
17711776
NemoFlowStatus nemo_flow_scope_stack_set_thread(const struct FfiScopeStack *stack);
17721777

1778+
/**
1779+
* Capture the current thread-local scope stack binding.
1780+
*
1781+
* The returned binding must be restored with
1782+
* `nemo_flow_scope_stack_restore_thread`.
1783+
*
1784+
* # Parameters
1785+
* - `out`: On success, receives a heap-allocated binding handle.
1786+
*
1787+
* # Safety
1788+
* `out` must be a valid, non-null pointer.
1789+
*/
1790+
NemoFlowStatus nemo_flow_scope_stack_capture_thread(struct FfiThreadScopeStackBinding **out);
1791+
1792+
/**
1793+
* Restore and free a captured thread-local scope stack binding.
1794+
*
1795+
* # Safety
1796+
* `binding` must be a valid pointer returned by
1797+
* `nemo_flow_scope_stack_capture_thread` that has not already been restored; this call frees it.
1798+
*/
1799+
NemoFlowStatus nemo_flow_scope_stack_restore_thread(struct FfiThreadScopeStackBinding *binding);
1800+
17731801
/**
17741802
* Returns whether the current execution context has an explicitly-initialized
17751803
* scope stack.

crates/ffi/src/api/mod.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -36,8 +36,8 @@ use crate::error::{
3636
};
3737
use crate::types::{
3838
FfiAtifExporter, FfiAtofExporter, FfiCodecHandle, FfiLLMHandle, FfiOpenInferenceSubscriber,
39-
FfiOpenTelemetrySubscriber, FfiPluginContext, FfiScopeHandle, FfiScopeStack, FfiToolHandle,
40-
NemoFlowScopeType,
39+
FfiOpenTelemetrySubscriber, FfiPluginContext, FfiScopeHandle, FfiScopeStack,
40+
FfiThreadScopeStackBinding, FfiToolHandle, NemoFlowScopeType,
4141
};
4242
pub use crate::types::{nemo_flow_openinference_subscriber_free, nemo_flow_otel_subscriber_free};
4343
use libc::c_char;
@@ -46,8 +46,8 @@ use nemo_flow::api::llm::{LlmAttributes, LlmRequest};
4646
use nemo_flow::api::registry as core_registry_api;
4747
use nemo_flow::api::runtime::{LlmExecutionNextFn, LlmStreamExecutionNextFn, ToolExecutionNextFn};
4848
use nemo_flow::api::runtime::{
49-
TASK_SCOPE_STACK, create_scope_stack, current_scope_stack, scope_stack_active,
50-
set_thread_scope_stack,
49+
TASK_SCOPE_STACK, capture_thread_scope_stack, create_scope_stack, current_scope_stack,
50+
restore_thread_scope_stack, scope_stack_active, set_thread_scope_stack,
5151
};
5252
use nemo_flow::api::scope as core_scope_api;
5353
use nemo_flow::api::scope::ScopeAttributes;

crates/ffi/src/api/scope_stack.rs

Lines changed: 45 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,8 @@
22
// SPDX-License-Identifier: Apache-2.0
33

44
use super::{
5-
FfiScopeStack, NemoFlowStatus, clear_last_error, create_scope_stack, scope_stack_active,
5+
FfiScopeStack, FfiThreadScopeStackBinding, NemoFlowStatus, capture_thread_scope_stack,
6+
clear_last_error, create_scope_stack, restore_thread_scope_stack, scope_stack_active,
67
set_last_error, set_thread_scope_stack,
78
};
89

@@ -75,6 +76,49 @@ pub unsafe extern "C" fn nemo_flow_scope_stack_set_thread(
7576
NemoFlowStatus::Ok
7677
}
7778

79+
/// Capture the current thread-local scope stack binding.
80+
///
81+
/// The returned binding must be restored with
82+
/// `nemo_flow_scope_stack_restore_thread`.
83+
///
84+
/// # Parameters
85+
/// - `out`: On success, receives a heap-allocated binding handle.
86+
///
87+
/// # Safety
88+
/// `out` must be a valid, non-null pointer.
89+
#[unsafe(no_mangle)]
90+
pub unsafe extern "C" fn nemo_flow_scope_stack_capture_thread(
91+
out: *mut *mut FfiThreadScopeStackBinding,
92+
) -> NemoFlowStatus {
93+
clear_last_error();
94+
if out.is_null() {
95+
set_last_error("out pointer is null");
96+
return NemoFlowStatus::NullPointer;
97+
}
98+
let binding = capture_thread_scope_stack();
99+
unsafe { *out = Box::into_raw(Box::new(FfiThreadScopeStackBinding(binding))) };
100+
NemoFlowStatus::Ok
101+
}
102+
103+
/// Restore and free a captured thread-local scope stack binding.
104+
///
105+
/// # Safety
106+
/// `binding` must be a valid pointer returned by
107+
/// `nemo_flow_scope_stack_capture_thread` that has not already been restored; this call frees it.
108+
#[unsafe(no_mangle)]
109+
pub unsafe extern "C" fn nemo_flow_scope_stack_restore_thread(
110+
binding: *mut FfiThreadScopeStackBinding,
111+
) -> NemoFlowStatus {
112+
clear_last_error();
113+
if binding.is_null() {
114+
set_last_error("binding pointer is null");
115+
return NemoFlowStatus::NullPointer;
116+
}
117+
let binding = unsafe { Box::from_raw(binding) };
118+
restore_thread_scope_stack(binding.0);
119+
NemoFlowStatus::Ok
120+
}
121+
78122
/// Returns whether the current execution context has an explicitly-initialized
79123
/// scope stack.
80124
///

crates/ffi/src/types/mod.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111
//! with their corresponding `nemo_flow_*_free` function.
1212
1313
use libc::c_char;
14-
use nemo_flow::api::runtime::ScopeStackHandle;
14+
use nemo_flow::api::runtime::{ScopeStackHandle, ThreadScopeStackBinding};
1515
use nemo_flow::plugin::PluginRegistrationContext;
1616
use serde_json::Value as Json;
1717

@@ -48,6 +48,8 @@ pub struct FfiLLMRequest(pub LlmRequest);
4848
pub struct FfiEvent(pub Event);
4949
/// Opaque handle to an isolated scope stack for per-request/per-task isolation.
5050
pub struct FfiScopeStack(pub ScopeStackHandle);
51+
/// Opaque handle to a captured thread-local scope stack binding.
52+
pub struct FfiThreadScopeStackBinding(pub ThreadScopeStackBinding);
5153
/// Opaque ATIF exporter handle.
5254
pub struct FfiAtifExporter(pub nemo_flow::observability::atif::AtifExporter);
5355
/// Opaque ATOF JSONL exporter handle.

go/nemo_flow/atof_test.go

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -44,10 +44,23 @@ func TestAtofExporterLifecycleWritesRawJSONL(t *testing.T) {
4444

4545
name := "go_atof_" + time.Now().Format("150405.000000")
4646
requireNoError(t, exporter.Register(name), "Register failed")
47-
handle, err := PushScope("atof_scope", ScopeTypeAgent, WithInput(json.RawMessage(`{"scope":true}`)))
48-
requireNoError(t, err, "PushScope failed")
49-
requireNoError(t, EmitEvent("atof_mark", WithEventParent(handle), WithEventData(json.RawMessage(`{"step":1}`))), "EmitEvent failed")
50-
requireNoError(t, PopScope(handle, WithOutput(json.RawMessage(`{"done":true}`))), "PopScope failed")
47+
stack, err := NewScopeStack()
48+
requireNoError(t, err, "NewScopeStack failed")
49+
defer stack.Close()
50+
var runErr error
51+
stack.Run(func() {
52+
handle, err := PushScope("atof_scope", ScopeTypeAgent, WithInput(json.RawMessage(`{"scope":true}`)))
53+
if err != nil {
54+
runErr = err
55+
return
56+
}
57+
if err := EmitEvent("atof_mark", WithEventParent(handle), WithEventData(json.RawMessage(`{"step":1}`))); err != nil {
58+
runErr = err
59+
return
60+
}
61+
runErr = PopScope(handle, WithOutput(json.RawMessage(`{"done":true}`)))
62+
})
63+
requireNoError(t, runErr, "scope lifecycle failed")
5164
requireNoError(t, exporter.Deregister(name), "Deregister failed")
5265
requireNoError(t, exporter.Deregister(name), "repeated Deregister should be safe")
5366
requireNoError(t, exporter.ForceFlush(), "ForceFlush failed")

go/nemo_flow/context_test.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ package nemo_flow
66
import (
77
"encoding/json"
88
"fmt"
9+
"runtime"
910
"sync"
1011
"testing"
1112
)
@@ -123,13 +124,21 @@ func TestScopeStackActiveInsideRun(t *testing.T) {
123124
defer stack.Close()
124125

125126
var active bool
127+
runtime.LockOSThread()
128+
defer runtime.UnlockOSThread()
129+
if ScopeStackActive() {
130+
t.Fatal("expected ScopeStackActive() to be false before Run")
131+
}
126132
stack.Run(func() {
127133
active = ScopeStackActive()
128134
})
129135

130136
if !active {
131137
t.Error("expected ScopeStackActive() to be true inside Run")
132138
}
139+
if ScopeStackActive() {
140+
t.Error("expected ScopeStackActive() to be restored after Run")
141+
}
133142
}
134143

135144
func TestScopeStackRunIsolation(t *testing.T) {

go/nemo_flow/nemo_flow.go

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ package nemo_flow
3131
3232
typedef struct FfiScopeHandle FfiScopeHandle;
3333
typedef struct FfiScopeStack FfiScopeStack;
34+
typedef struct FfiThreadScopeStackBinding FfiThreadScopeStackBinding;
3435
typedef struct FfiToolHandle FfiToolHandle;
3536
typedef struct FfiLLMHandle FfiLLMHandle;
3637
typedef struct FfiLLMRequest FfiLLMRequest;
@@ -211,6 +212,8 @@ extern void nemo_flow_string_free(char* ptr);
211212
// Scope stack isolation
212213
extern int32_t nemo_flow_scope_stack_create(FfiScopeStack** out);
213214
extern int32_t nemo_flow_scope_stack_set_thread(const FfiScopeStack* stack);
215+
extern int32_t nemo_flow_scope_stack_capture_thread(FfiThreadScopeStackBinding** out);
216+
extern int32_t nemo_flow_scope_stack_restore_thread(FfiThreadScopeStackBinding* binding);
214217
extern _Bool nemo_flow_scope_stack_active(void);
215218
extern void nemo_flow_scope_stack_free(FfiScopeStack* ptr);
216219
@@ -1500,8 +1503,22 @@ func (s *ScopeStack) Close() {
15001503
// This is the canonical way to propagate a scope stack to a worker goroutine.
15011504
func (s *ScopeStack) Run(fn func()) {
15021505
runtime.LockOSThread()
1503-
defer runtime.UnlockOSThread()
1504-
C.nemo_flow_scope_stack_set_thread(s.ptr)
1506+
var binding *C.FfiThreadScopeStackBinding
1507+
if err := checkStatus(C.nemo_flow_scope_stack_capture_thread(&binding)); err != nil {
1508+
runtime.UnlockOSThread()
1509+
panic(err)
1510+
}
1511+
defer func() {
1512+
status := C.nemo_flow_scope_stack_restore_thread(binding)
1513+
if err := checkStatus(status); err != nil {
1514+
runtime.UnlockOSThread()
1515+
panic(err)
1516+
}
1517+
runtime.UnlockOSThread()
1518+
}()
1519+
if err := checkStatus(C.nemo_flow_scope_stack_set_thread(s.ptr)); err != nil {
1520+
panic(err)
1521+
}
15051522
fn()
15061523
}
15071524

0 commit comments

Comments
 (0)