Skip to content
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion plugboard/_zmq/zmq_proxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import typing as _t

from pydantic import BaseModel, Field, ValidationError
import uvloop
Comment thread
toby-coleman marked this conversation as resolved.
Outdated
import zmq
import zmq.asyncio

Expand Down Expand Up @@ -162,7 +163,7 @@ def _start_proxy(
def _run_process(self) -> None:
"""Entry point for the child process."""
try:
asyncio.run(self._run())
uvloop.run(self._run())
Comment thread
toby-coleman marked this conversation as resolved.
Outdated
finally: # pragma: no cover
self._close()

Expand Down
6 changes: 4 additions & 2 deletions plugboard/utils/async_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
from concurrent.futures import ThreadPoolExecutor
import typing as _t

import uvloop
Comment thread
toby-coleman marked this conversation as resolved.
Outdated


async def gather_except(*coros: _t.Coroutine) -> list[_t.Any]:
"""Attempts to gather the given coroutines, raising any exceptions."""
Expand All @@ -16,7 +18,7 @@ async def gather_except(*coros: _t.Coroutine) -> list[_t.Any]:

def _run_coro_in_thread(coro: _t.Coroutine, timeout: _t.Optional[float] = None) -> _t.Any:
def _target() -> _t.Any:
return asyncio.run(coro)
return uvloop.run(coro)
Comment thread
toby-coleman marked this conversation as resolved.
Outdated

with ThreadPoolExecutor() as pool:
future = pool.submit(_target)
Expand All @@ -34,7 +36,7 @@ def run_coro_sync(coro: _t.Coroutine, timeout: _t.Optional[float] = None) -> _t.
loop = asyncio.get_running_loop()
except RuntimeError: # pragma: no cover
# No loop running in current thread
return asyncio.run(coro)
return uvloop.run(coro)
Comment thread
toby-coleman marked this conversation as resolved.
Outdated

if loop.is_running():
# Run coroutine in new thread with dedicated event loop.
Expand Down
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ dependencies = [
"structlog>=25.1.0,<26",
"that-depends>=3.4.1,<4",
"typer>=0.12,<1",
"uvloop>=0.21.0",
Comment thread
toby-coleman marked this conversation as resolved.
Outdated
]

[project.optional-dependencies]
Expand Down
8 changes: 8 additions & 0 deletions tests/conftest.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
"""Configuration for the test suite."""

from abc import ABC, abstractmethod
from asyncio.events import BaseDefaultEventLoopPolicy
Comment thread
toby-coleman marked this conversation as resolved.
import multiprocessing
import os
import typing as _t
Expand All @@ -11,6 +12,7 @@
import pytest_cases
import ray
from that_depends import ContextScopes, container_context
import uvloop
Comment thread
toby-coleman marked this conversation as resolved.

from plugboard.component import Component, IOController as IO
from plugboard.component.io_controller import IOStreamClosedError
Expand All @@ -19,6 +21,12 @@
from plugboard.utils.settings import Settings


@pytest.fixture(scope="session")
def event_loop_policy() -> BaseDefaultEventLoopPolicy:
"""Set uvloop as the event loop policy for the test session."""
return uvloop.EventLoopPolicy()
Comment thread
toby-coleman marked this conversation as resolved.


@pytest.fixture(scope="session", autouse=True)
def mp_set_start_method() -> None:
"""Set the start method for multiprocessing to 'spawn'."""
Expand Down
6 changes: 3 additions & 3 deletions tests/integration/test_state_backend_multiprocess.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
"""Tests for the `StateBackend` classes that permit multiprocessing."""

import asyncio
import typing as _t
Comment thread
toby-coleman marked this conversation as resolved.

import pytest
import pytest_asyncio
from ray.util.multiprocessing import Pool
import uvloop
Comment thread
toby-coleman marked this conversation as resolved.

from plugboard.component import Component, IOController
from plugboard.connector import Connector, ZMQConnector
Expand Down Expand Up @@ -139,7 +139,7 @@ async def _inner() -> None:
await comp.init()
print("Component initialised.")

asyncio.run(_inner())
uvloop.run(_inner())
Comment thread
toby-coleman marked this conversation as resolved.

# At the end of `Component.init` the component upserts itself into the state
# backend, so we expect the state backend to have up to date component data afterwards
Expand All @@ -161,7 +161,7 @@ def upsert_connector(conn: Connector) -> None:
async def _inner() -> None:
await state_backend.upsert_connector(conn)

asyncio.run(_inner())
uvloop.run(_inner())
Comment thread
toby-coleman marked this conversation as resolved.

mp_processes = []
with Pool(2) as pool:
Expand Down
22 changes: 22 additions & 0 deletions uv.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading