Skip to content

Commit 3f87703

Browse files
committed
update
1 parent 99da4aa commit 3f87703

3 files changed

Lines changed: 32 additions & 31 deletions

File tree

src/asyncio.rs

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -23,12 +23,9 @@ pub(crate) fn empty_context(py: Python<'_>) -> PyResult<&Bound<'_, PyAny>> {
2323
}
2424

2525
#[inline(always)]
26-
pub(crate) fn copy_context(py: Python) -> Py<PyAny> {
27-
// Use the fast FFI path for CPython
28-
// Note: PyPy support would require a different implementation, but robyn targets CPython
29-
let ctx = unsafe {
26+
pub(crate) fn copy_context(py: Python) -> PyResult<Py<PyAny>> {
27+
unsafe {
3028
let ptr = pyo3::ffi::PyContext_CopyCurrent();
31-
Bound::from_owned_ptr(py, ptr)
32-
};
33-
ctx.unbind()
29+
Ok(Bound::from_owned_ptr_or_err(py, ptr)?.unbind())
30+
}
3431
}

src/callbacks.rs

Lines changed: 16 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -53,12 +53,12 @@ impl PyDoneAwaitable {
5353

5454
#[pyclass(frozen, module = "robyn._robyn")]
5555
pub(crate) struct PyErrAwaitable {
56-
result: PyResult<()>,
56+
err: PyErr,
5757
}
5858

5959
impl PyErrAwaitable {
60-
pub(crate) fn new(result: PyResult<()>) -> Self {
61-
Self { result }
60+
pub(crate) fn new(err: PyErr) -> Self {
61+
Self { err }
6262
}
6363
}
6464

@@ -73,7 +73,7 @@ impl PyErrAwaitable {
7373
}
7474

7575
fn __next__(&self, py: Python) -> PyResult<()> {
76-
Err(self.result.as_ref().err().unwrap().clone_ref(py))
76+
Err(self.err.clone_ref(py))
7777
}
7878
}
7979

@@ -230,6 +230,9 @@ impl PyFutureAwaitable {
230230
self.event_loop.clone_ref(py)
231231
}
232232

233+
/// Single-callback optimization: only the most recent callback is stored.
234+
/// This is intentional — this type is internal to the robyn runtime and in
235+
/// practice asyncio registers at most one done-callback per future.
233236
#[pyo3(signature = (cb, context=None))]
234237
fn add_done_callback(
235238
pyself: PyRef<'_, Self>,
@@ -257,11 +260,18 @@ impl PyFutureAwaitable {
257260
Ok(())
258261
}
259262

263+
/// Clears the single stored callback (see `add_done_callback`).
264+
/// The `cb` argument is accepted for asyncio protocol compatibility but
265+
/// is not used for matching — the sole stored callback is always removed.
260266
#[allow(unused)]
261267
fn remove_done_callback(&self, cb: Py<PyAny>) -> i32 {
262268
let mut ack = self.ack.write().unwrap();
263-
*ack = None;
264-
1
269+
if ack.is_some() {
270+
*ack = None;
271+
1
272+
} else {
273+
0
274+
}
265275
}
266276

267277
#[allow(unused)]

src/runtime.rs

Lines changed: 12 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
use futures::FutureExt;
22
use pyo3::{prelude::*, IntoPyObjectExt};
3-
use std::{future::Future, sync::Arc};
3+
use std::{future::Future, sync::Arc, sync::OnceLock};
44
use tokio::{runtime::Builder as RuntimeBuilder, task::JoinHandle};
55

66
#[cfg(unix)]
@@ -151,7 +151,7 @@ pub(crate) fn done_future_into_py(
151151
}
152152

153153
#[inline(always)]
154-
pub(crate) fn err_future_into_py(py: Python, err: PyResult<()>) -> PyResult<Bound<PyAny>> {
154+
pub(crate) fn err_future_into_py(py: Python, err: PyErr) -> PyResult<Bound<PyAny>> {
155155
PyErrAwaitable::new(err).into_bound_py_any(py)
156156
}
157157

@@ -247,32 +247,30 @@ where
247247
Ok(py_fut.into_bound(py))
248248
}
249249

250-
// Wrapper function to replace pyo3_async_runtimes::tokio::future_into_py
251-
// This function matches the signature and converts Result<T, E> to FutureResultToPy
250+
static SHARED_BLOCKING_RUNNER: OnceLock<Arc<BlockingRunner>> = OnceLock::new();
251+
252+
fn shared_blocking_runner() -> Arc<BlockingRunner> {
253+
SHARED_BLOCKING_RUNNER
254+
.get_or_init(|| Arc::new(BlockingRunner::new(1, 30)))
255+
.clone()
256+
}
257+
252258
pub fn future_into_py<F>(py: Python, fut: F) -> PyResult<Bound<PyAny>>
253259
where
254260
F: Future<Output = Result<(), anyhow::Error>> + Send + 'static,
255261
{
256-
// Try to get the tokio runtime handle
257-
// When called from Python code, we might not be in a tokio async context
258-
// So we try try_current() first, and if that fails, fall back to pyo3_async_runtimes
259262
match tokio::runtime::Handle::try_current() {
260263
Ok(rt_handle) => {
261-
// We have a handle, use our optimized implementation
262-
// Get the event loop from Python
263264
let asyncio = py.import("asyncio")?;
264265
let event_loop = asyncio
265-
.call_method0("get_event_loop")
266+
.call_method0("get_running_loop")
266267
.or_else(|_| asyncio.call_method0("new_event_loop"))?;
267268
let event_loop: Py<PyAny> = event_loop.unbind();
268269

269-
// Create a simple blocking runner (1 thread, 30s timeout)
270-
let blocking_runner = Arc::new(BlockingRunner::new(1, 30));
270+
let blocking_runner = shared_blocking_runner();
271271

272-
// Create RuntimeRef
273272
let rt_ref = RuntimeRef::new(rt_handle, blocking_runner, Arc::new(event_loop));
274273

275-
// Convert the future to use FutureResultToPy
276274
let wrapped_fut = async move {
277275
match fut.await {
278276
Ok(()) => FutureResultToPy::None,
@@ -286,13 +284,9 @@ where
286284
}
287285
};
288286

289-
// Use the futlike implementation (better for most cases)
290287
future_into_py_futlike(rt_ref, py, wrapped_fut)
291288
}
292289
Err(_) => {
293-
// Fall back to pyo3_async_runtimes when we can't get the handle
294-
// This happens when called from Python code that's not in a tokio async context
295-
// Convert Result<(), anyhow::Error> to PyResult<()> for pyo3_async_runtimes
296290
let py_fut = fut.map(|result| {
297291
result.map_err(|e| {
298292
PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(format!(

0 commit comments

Comments
 (0)