Skip to content

Commit 7fdfcf1

Browse files
committed
fix(hiroz-py): surface P6 callback thread errors via last_error property
Background callback threads were silently swallowing exceptions via eprintln. Add a shared Arc<Mutex<Option<String>>> that records the most recent error; expose it as ZServer.last_error (resets on read). Also fix a pre-existing broken intra-doc link in lifecycle/node.rs (create_publisher → Self::create_publisher) that was failing cargo doc.
1 parent a9101c6 commit 7fdfcf1

3 files changed

Lines changed: 82 additions & 9 deletions

File tree

crates/hiroz-py/python/hiroz_py/__init__.pyi

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -288,6 +288,8 @@ class ZServer:
288288
def take_request(self) -> tuple[dict[str, Any], Any]: ...
289289
def send_response(self, response: Any, request_id: dict[str, Any]) -> None: ...
290290
def get_type_name(self) -> str: ...
291+
@property
292+
def last_error(self) -> str | None: ...
291293

292294
# ---------------------------------------------------------------------------
293295
# ZActionClient

crates/hiroz-py/src/service.rs

Lines changed: 38 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,8 @@ use hiroz::graph::Graph;
33
use hiroz::service::RequestId;
44
use pyo3::prelude::*;
55
use pyo3::types::PyDict;
6-
use std::sync::Arc;
76
use std::sync::atomic::{AtomicBool, Ordering};
7+
use std::sync::{Arc, Mutex};
88
use std::time::Duration;
99

1010
/// Python wrapper for service client
@@ -90,6 +90,7 @@ struct CallbackServerState {
9090
stop: Arc<AtomicBool>,
9191
handle: Option<std::thread::JoinHandle<()>>,
9292
_server: Arc<dyn RawServer>,
93+
last_error: Arc<Mutex<Option<String>>>,
9394
}
9495

9596
impl Drop for CallbackServerState {
@@ -106,23 +107,27 @@ impl Drop for CallbackServerState {
106107
/// Pull mode (default): `inner` is `Some`; the caller drives `take_request` /
107108
/// `send_response`. Callback mode (P6): `inner` is `None` and a background
108109
/// thread (held in `_callback`) services requests via the user callback.
110+
/// Errors from the callback thread are stored in `last_error` and surfaced via
111+
/// the `last_error` Python property.
109112
#[pyclass(name = "ZServer")]
110113
pub struct PyZServer {
111-
inner: Option<std::sync::Mutex<Box<dyn RawServer>>>,
114+
inner: Option<Mutex<Box<dyn RawServer>>>,
112115
request_type_name: String,
113116
response_type_name: String,
114117
_callback: Option<CallbackServerState>,
118+
last_error: Arc<Mutex<Option<String>>>,
115119
}
116120

117121
impl PyZServer {
118122
pub fn new(inner: Box<dyn RawServer>, service_type: String) -> Self {
119123
let request_type_name = format!("{}_Request", service_type);
120124
let response_type_name = format!("{}_Response", service_type);
121125
Self {
122-
inner: Some(std::sync::Mutex::new(inner)),
126+
inner: Some(Mutex::new(inner)),
123127
request_type_name,
124128
response_type_name,
125129
_callback: None,
130+
last_error: Arc::new(Mutex::new(None)),
126131
}
127132
}
128133

@@ -137,12 +142,14 @@ impl PyZServer {
137142
let response_type_name = format!("{}_Response", service_type);
138143

139144
let stop = Arc::new(AtomicBool::new(false));
145+
let last_error: Arc<Mutex<Option<String>>> = Arc::new(Mutex::new(None));
140146
let handle = spawn_callback_loop(
141147
Arc::clone(&server),
142148
request_type_name.clone(),
143149
response_type_name.clone(),
144150
callback,
145151
Arc::clone(&stop),
152+
Arc::clone(&last_error),
146153
);
147154

148155
Self {
@@ -153,11 +160,13 @@ impl PyZServer {
153160
stop,
154161
handle: Some(handle),
155162
_server: server,
163+
last_error: Arc::clone(&last_error),
156164
}),
165+
last_error,
157166
}
158167
}
159168

160-
fn require_pull(&self) -> PyResult<&std::sync::Mutex<Box<dyn RawServer>>> {
169+
fn require_pull(&self) -> PyResult<&Mutex<Box<dyn RawServer>>> {
161170
self.inner.as_ref().ok_or_else(|| {
162171
pyo3::exceptions::PyRuntimeError::new_err(
163172
"This server runs in callback mode; take_request/send_response are unavailable. \
@@ -174,7 +183,19 @@ fn spawn_callback_loop(
174183
response_type_name: String,
175184
callback: PyObject,
176185
stop: Arc<AtomicBool>,
186+
last_error: Arc<Mutex<Option<String>>>,
177187
) -> std::thread::JoinHandle<()> {
188+
// Helper: record an error both in the shared slot and stderr.
189+
macro_rules! record_error {
190+
($last_error:expr, $msg:literal, $e:expr) => {{
191+
let msg = format!(concat!("hiroz_py: ", $msg, ": {}"), $e);
192+
eprintln!("{}", msg);
193+
if let Ok(mut guard) = $last_error.lock() {
194+
*guard = Some(msg);
195+
}
196+
}};
197+
}
198+
178199
std::thread::spawn(move || {
179200
while !stop.load(Ordering::Relaxed) {
180201
// Poll for a request without holding the GIL.
@@ -188,14 +209,14 @@ fn spawn_callback_loop(
188209
) {
189210
Ok(o) => o,
190211
Err(e) => {
191-
eprintln!("hiroz_py: request deserialize error: {}", e);
212+
record_error!(last_error, "request deserialize error", e);
192213
return;
193214
}
194215
};
195216
let resp_obj = match callback.call1(py, (req_obj,)) {
196217
Ok(o) => o,
197218
Err(e) => {
198-
eprintln!("hiroz_py: service callback error: {}", e);
219+
record_error!(last_error, "service callback error", e);
199220
return;
200221
}
201222
};
@@ -206,18 +227,18 @@ fn spawn_callback_loop(
206227
) {
207228
Ok(b) => b,
208229
Err(e) => {
209-
eprintln!("hiroz_py: response serialize error: {}", e);
230+
record_error!(last_error, "response serialize error", e);
210231
return;
211232
}
212233
};
213234
if let Err(e) = server.send_response_serialized(&resp_bytes, &request_id) {
214-
eprintln!("hiroz_py: send_response error: {}", e);
235+
record_error!(last_error, "send_response error", e);
215236
}
216237
});
217238
}
218239
Ok(None) => std::thread::sleep(Duration::from_millis(2)),
219240
Err(e) => {
220-
eprintln!("hiroz_py: service poll error: {}", e);
241+
record_error!(last_error, "service poll error", e);
221242
std::thread::sleep(Duration::from_millis(50));
222243
}
223244
}
@@ -289,4 +310,12 @@ impl PyZServer {
289310
self.request_type_name, self.response_type_name
290311
)
291312
}
313+
314+
/// The last error raised by the callback thread, or None if no error has
315+
/// occurred. Resets to None when read. Only meaningful in callback mode;
316+
/// always None in pull mode.
317+
#[getter]
318+
fn last_error(&self) -> Option<String> {
319+
self.last_error.lock().ok().and_then(|mut g| g.take())
320+
}
292321
}

crates/hiroz-py/tests/test_rclpy_alignment.py

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -147,6 +147,48 @@ def handle(req):
147147
assert resp.sum == 42
148148

149149

150+
def test_p6_last_error_surfaced_on_callback_exception(ctx):
151+
node = ctx.create_node("p6_err_node").build()
152+
153+
def bad_handle(req):
154+
raise ValueError("intentional callback failure")
155+
156+
server = node.create_server(
157+
"/p6_err_add", example_interfaces.AddTwoInts, callback=bad_handle
158+
)
159+
client = node.create_client("/p6_err_add", example_interfaces.AddTwoInts)
160+
assert client.wait_for_service(timeout=5.0)
161+
162+
# The call will fail from the client side (no response sent).
163+
try:
164+
client.call(example_interfaces.AddTwoInts.Request(a=1, b=2), timeout=1.0)
165+
except Exception:
166+
pass
167+
168+
# Give the background thread a moment to record the error.
169+
deadline = time.time() + 2.0
170+
err = None
171+
while err is None and time.time() < deadline:
172+
err = server.last_error
173+
if err is None:
174+
time.sleep(0.05)
175+
176+
assert err is not None, "last_error should surface the callback exception"
177+
assert "intentional callback failure" in err
178+
179+
180+
def test_p6_last_error_none_when_no_error(ctx):
181+
node = ctx.create_node("p6_ok_node").build()
182+
183+
def handle(req):
184+
return example_interfaces.AddTwoInts.Response(sum=req.a + req.b)
185+
186+
server = node.create_server(
187+
"/p6_ok_add", example_interfaces.AddTwoInts, callback=handle
188+
)
189+
assert server.last_error is None
190+
191+
150192
def test_p1_wait_for_service_timeout_returns_false(ctx):
151193
node = ctx.create_node("p1_wait_to").build()
152194
client = node.create_client("/p1_never", example_interfaces.AddTwoInts)

0 commit comments

Comments
 (0)