diff --git a/temporalio/bridge/src/worker.rs b/temporalio/bridge/src/worker.rs index ba68f880a..ce0103fac 100644 --- a/temporalio/bridge/src/worker.rs +++ b/temporalio/bridge/src/worker.rs @@ -147,6 +147,7 @@ pub struct TunerHolder { workflow_slot_supplier: SlotSupplier, activity_slot_supplier: SlotSupplier, local_activity_slot_supplier: SlotSupplier, + nexus_slot_supplier: SlotSupplier, } #[derive(FromPyObject)] @@ -745,10 +746,17 @@ fn convert_tuner_holder( } else { None }; + let maybe_nexus_resource_opts = + if let SlotSupplier::ResourceBased(ref ss) = holder.nexus_slot_supplier { + Some(&ss.tuner_config) + } else { + None + }; let all_resource_opts = [ maybe_wf_resource_opts, maybe_act_resource_opts, maybe_local_act_resource_opts, + maybe_nexus_resource_opts, ]; let mut set_resource_opts = all_resource_opts.iter().flatten(); let first = set_resource_opts.next(); @@ -784,6 +792,10 @@ fn convert_tuner_holder( )?) .local_activity_slot_options(convert_slot_supplier( holder.local_activity_slot_supplier, + task_locals.clone(), + )?) + .nexus_slot_options(convert_slot_supplier( + holder.nexus_slot_supplier, task_locals, )?); Ok(options diff --git a/temporalio/bridge/worker.py b/temporalio/bridge/worker.py index 9b2abed8e..6fff9878c 100644 --- a/temporalio/bridge/worker.py +++ b/temporalio/bridge/worker.py @@ -168,6 +168,7 @@ class TunerHolder: workflow_slot_supplier: SlotSupplier activity_slot_supplier: SlotSupplier local_activity_slot_supplier: SlotSupplier + nexus_slot_supplier: SlotSupplier class Worker: diff --git a/temporalio/worker/__init__.py b/temporalio/worker/__init__.py index 08686dcb3..1d7b2558e 100644 --- a/temporalio/worker/__init__.py +++ b/temporalio/worker/__init__.py @@ -33,6 +33,7 @@ CustomSlotSupplier, FixedSizeSlotSupplier, LocalActivitySlotInfo, + NexusSlotInfo, ResourceBasedSlotConfig, ResourceBasedSlotSupplier, ResourceBasedTunerConfig, @@ -117,4 +118,5 @@ "SlotReleaseContext", "SlotReserveContext", "WorkflowSlotInfo", + "NexusSlotInfo", ] diff --git a/temporalio/worker/_replayer.py b/temporalio/worker/_replayer.py index 240429bf7..664602219 100644 --- a/temporalio/worker/_replayer.py +++ b/temporalio/worker/_replayer.py @@ -264,6 +264,9 @@ def on_eviction_hook( local_activity_slot_supplier=temporalio.bridge.worker.FixedSizeSlotSupplier( 1 ), + nexus_slot_supplier=temporalio.bridge.worker.FixedSizeSlotSupplier( + 1 + ), ), nonsticky_to_sticky_poll_ratio=1, no_remote_activities=True, diff --git a/temporalio/worker/_tuning.py b/temporalio/worker/_tuning.py index b74f79d5e..775c92348 100644 --- a/temporalio/worker/_tuning.py +++ b/temporalio/worker/_tuning.py @@ -1,3 +1,5 @@ +from __future__ import annotations + import asyncio import logging from abc import ABC, abstractmethod @@ -10,7 +12,7 @@ import temporalio.bridge.worker from temporalio.common import WorkerDeploymentVersion -_DEFAULT_RESOURCE_ACTIVITY_MAX = 500 +_DEFAULT_RESOURCE_SLOTS_MAX = 500 logger = logging.getLogger(__name__) @@ -150,7 +152,22 @@ class LocalActivitySlotInfo(Protocol): activity_type: str -SlotInfo: TypeAlias = Union[WorkflowSlotInfo, ActivitySlotInfo, LocalActivitySlotInfo] +# WARNING: This must match Rust worker::NexusSlotInfo +@runtime_checkable +class NexusSlotInfo(Protocol): + """Info about a nexus task slot usage. + + .. warning:: + Custom slot suppliers are currently experimental. + """ + + service: str + operation: str + + +SlotInfo: TypeAlias = Union[ + WorkflowSlotInfo, ActivitySlotInfo, LocalActivitySlotInfo, NexusSlotInfo +] # WARNING: This must match Rust worker::SlotMarkUsedCtx @@ -303,13 +320,14 @@ def mark_slot_used(self, ctx: SlotMarkUsedContext) -> None: def _to_bridge_slot_supplier( - slot_supplier: SlotSupplier, kind: Literal["workflow", "activity", "local_activity"] + slot_supplier: SlotSupplier, + kind: Literal["workflow", "activity", "local_activity", "nexus"], ) -> temporalio.bridge.worker.SlotSupplier: if isinstance(slot_supplier, FixedSizeSlotSupplier): return temporalio.bridge.worker.FixedSizeSlotSupplier(slot_supplier.num_slots) elif isinstance(slot_supplier, ResourceBasedSlotSupplier): min_slots = 5 if kind == "workflow" else 1 - max_slots = _DEFAULT_RESOURCE_ACTIVITY_MAX + max_slots = _DEFAULT_RESOURCE_SLOTS_MAX ramp_throttle = ( timedelta(seconds=0) if kind == "workflow" else timedelta(milliseconds=50) ) @@ -347,7 +365,8 @@ def create_resource_based( workflow_config: Optional[ResourceBasedSlotConfig] = None, activity_config: Optional[ResourceBasedSlotConfig] = None, local_activity_config: Optional[ResourceBasedSlotConfig] = None, - ) -> "WorkerTuner": + nexus_config: Optional[ResourceBasedSlotConfig] = None, + ) -> WorkerTuner: """Create a resource-based tuner with the provided options.""" resource_cfg = ResourceBasedTunerConfig(target_memory_usage, target_cpu_usage) wf = ResourceBasedSlotSupplier( @@ -359,26 +378,35 @@ def create_resource_based( local_act = ResourceBasedSlotSupplier( local_activity_config or ResourceBasedSlotConfig(), resource_cfg ) + nexus = ResourceBasedSlotSupplier( + nexus_config or ResourceBasedSlotConfig(), resource_cfg + ) return _CompositeTuner( wf, act, local_act, + nexus, ) @staticmethod def create_fixed( *, - workflow_slots: Optional[int], - activity_slots: Optional[int], - local_activity_slots: Optional[int], - ) -> "WorkerTuner": - """Create a fixed-size tuner with the provided number of slots. Any unspecified slots will default to 100.""" + workflow_slots: Optional[int] = None, + activity_slots: Optional[int] = None, + local_activity_slots: Optional[int] = None, + nexus_slots: Optional[int] = None, + ) -> WorkerTuner: + """Create a fixed-size tuner with the provided number of slots. + + Any unspecified slot numbers will default to 100. + """ return _CompositeTuner( FixedSizeSlotSupplier(workflow_slots if workflow_slots else 100), FixedSizeSlotSupplier(activity_slots if activity_slots else 100), FixedSizeSlotSupplier( local_activity_slots if local_activity_slots else 100 ), + FixedSizeSlotSupplier(nexus_slots if nexus_slots else 100), ) @staticmethod @@ -387,12 +415,14 @@ def create_composite( workflow_supplier: SlotSupplier, activity_supplier: SlotSupplier, local_activity_supplier: SlotSupplier, - ) -> "WorkerTuner": + nexus_supplier: SlotSupplier, + ) -> WorkerTuner: """Create a tuner composed of the provided slot suppliers.""" return _CompositeTuner( workflow_supplier, activity_supplier, local_activity_supplier, + nexus_supplier, ) @abstractmethod @@ -407,6 +437,10 @@ def _get_activity_task_slot_supplier(self) -> SlotSupplier: def _get_local_activity_task_slot_supplier(self) -> SlotSupplier: raise NotImplementedError + @abstractmethod + def _get_nexus_slot_supplier(self) -> SlotSupplier: + raise NotImplementedError + def _to_bridge_tuner(self) -> temporalio.bridge.worker.TunerHolder: return temporalio.bridge.worker.TunerHolder( _to_bridge_slot_supplier( @@ -418,14 +452,25 @@ def _to_bridge_tuner(self) -> temporalio.bridge.worker.TunerHolder: _to_bridge_slot_supplier( self._get_local_activity_task_slot_supplier(), "local_activity" ), + _to_bridge_slot_supplier(self._get_nexus_slot_supplier(), "nexus"), ) def _get_activities_max(self) -> Optional[int]: - ss = self._get_activity_task_slot_supplier() - if isinstance(ss, FixedSizeSlotSupplier): - return ss.num_slots - elif isinstance(ss, ResourceBasedSlotSupplier): - return ss.slot_config.maximum_slots or _DEFAULT_RESOURCE_ACTIVITY_MAX + return WorkerTuner._get_slot_supplier_max( + self._get_activity_task_slot_supplier() + ) + + def _get_nexus_tasks_max(self) -> Optional[int]: + return WorkerTuner._get_slot_supplier_max(self._get_nexus_slot_supplier()) + + @staticmethod + def _get_slot_supplier_max(slot_supplier: SlotSupplier) -> Optional[int]: + if isinstance(slot_supplier, FixedSizeSlotSupplier): + return slot_supplier.num_slots + elif isinstance(slot_supplier, ResourceBasedSlotSupplier): + return ( + slot_supplier.slot_config.maximum_slots or _DEFAULT_RESOURCE_SLOTS_MAX + ) return None @@ -436,6 +481,7 @@ class _CompositeTuner(WorkerTuner): workflow_slot_supplier: SlotSupplier activity_slot_supplier: SlotSupplier local_activity_slot_supplier: SlotSupplier + nexus_slot_supplier: SlotSupplier def _get_workflow_task_slot_supplier(self) -> SlotSupplier: return self.workflow_slot_supplier @@ -445,3 +491,6 @@ def _get_activity_task_slot_supplier(self) -> SlotSupplier: def _get_local_activity_task_slot_supplier(self) -> SlotSupplier: return self.local_activity_slot_supplier + + def _get_nexus_slot_supplier(self) -> SlotSupplier: + return self.nexus_slot_supplier diff --git a/temporalio/worker/_worker.py b/temporalio/worker/_worker.py index f93848496..dea516da8 100644 --- a/temporalio/worker/_worker.py +++ b/temporalio/worker/_worker.py @@ -119,6 +119,7 @@ def __init__( max_concurrent_workflow_tasks: Optional[int] = None, max_concurrent_activities: Optional[int] = None, max_concurrent_local_activities: Optional[int] = None, + max_concurrent_nexus_tasks: Optional[int] = None, tuner: Optional[WorkerTuner] = None, max_concurrent_workflow_task_polls: Optional[int] = None, nonsticky_to_sticky_poll_ratio: float = 0.2, @@ -214,13 +215,16 @@ def __init__( max_concurrent_workflow_tasks: Maximum allowed number of workflow tasks that will ever be given to this worker at one time. Mutually exclusive with ``tuner``. Must be set to at least two if ``max_cached_workflows`` is nonzero. - max_concurrent_activities: Maximum number of activity tasks that - will ever be given to the activity worker concurrently. Mutually exclusive with ``tuner``. + max_concurrent_activities: Maximum number of activity tasks that will ever be given to + the activity worker concurrently. Mutually exclusive with ``tuner``. max_concurrent_local_activities: Maximum number of local activity - tasks that will ever be given to the activity worker concurrently. Mutually exclusive with ``tuner``. + tasks that will ever be given to the activity worker concurrently. Mutually + exclusive with ``tuner``. + max_concurrent_nexus_tasks: Maximum number of Nexus tasks that will ever be given to + the Nexus worker concurrently. Mutually exclusive with ``tuner``. tuner: Provide a custom :py:class:`WorkerTuner`. Mutually exclusive with the - ``max_concurrent_workflow_tasks``, ``max_concurrent_activities``, and - ``max_concurrent_local_activities`` arguments. + ``max_concurrent_workflow_tasks``, ``max_concurrent_activities``, + ``max_concurrent_local_activities``, and ``max_concurrent_nexus_tasks`` arguments. Defaults to fixed-size 100 slots for each slot kind if unset and none of the max_* arguments are provided. @@ -337,6 +341,7 @@ def __init__( max_concurrent_workflow_tasks=max_concurrent_workflow_tasks, max_concurrent_activities=max_concurrent_activities, max_concurrent_local_activities=max_concurrent_local_activities, + max_concurrent_nexus_tasks=max_concurrent_nexus_tasks, tuner=tuner, max_concurrent_workflow_task_polls=max_concurrent_workflow_task_polls, nonsticky_to_sticky_poll_ratio=nonsticky_to_sticky_poll_ratio, @@ -387,7 +392,6 @@ def _init_from_config(self, client: temporalio.client.Client, config: WorkerConf """ self._config = config - # TODO(nexus-preview): max_concurrent_nexus_tasks / tuner support if not ( config["activities"] or config["nexus_service_handlers"] @@ -430,22 +434,7 @@ def _init_from_config(self, client: temporalio.client.Client, config: WorkerConf bridge_client.config.runtime or temporalio.runtime.Runtime.default() ) if config["activities"]: - # Issue warning here if executor max_workers is lower than max - # concurrent activities. We do this here instead of in - # _ActivityWorker so the stack level is predictable. - max_workers = getattr(config["activity_executor"], "_max_workers", None) - concurrent_activities = config["max_concurrent_activities"] - if config["tuner"] and config["tuner"]._get_activities_max(): - concurrent_activities = config["tuner"]._get_activities_max() - if isinstance(max_workers, int) and max_workers < ( - concurrent_activities or 0 - ): - warnings.warn( - f"Worker max_concurrent_activities is {concurrent_activities} " - + f"but activity_executor's max_workers is only {max_workers}", - stacklevel=2, - ) - + _warn_if_activity_executor_max_workers_is_inconsistent(config) self._activity_worker = _ActivityWorker( bridge_worker=lambda: self._bridge_worker, task_queue=config["task_queue"], @@ -462,6 +451,7 @@ def _init_from_config(self, client: temporalio.client.Client, config: WorkerConf ) self._nexus_worker: Optional[_NexusWorker] = None if config["nexus_service_handlers"]: + _warn_if_nexus_task_executor_max_workers_is_inconsistent(config) self._nexus_worker = _NexusWorker( bridge_worker=lambda: self._bridge_worker, client=config["client"], @@ -522,16 +512,19 @@ def check_activity(activity): config["max_concurrent_workflow_tasks"] or config["max_concurrent_activities"] or config["max_concurrent_local_activities"] + or config["max_concurrent_nexus_tasks"] ): raise ValueError( "Cannot specify max_concurrent_workflow_tasks, max_concurrent_activities, " - "or max_concurrent_local_activities when also specifying tuner" + "max_concurrent_local_activities, or max_concurrent_nexus_tasks when also " + "specifying tuner" ) else: tuner = WorkerTuner.create_fixed( workflow_slots=config["max_concurrent_workflow_tasks"], activity_slots=config["max_concurrent_activities"], local_activity_slots=config["max_concurrent_local_activities"], + nexus_slots=config["max_concurrent_nexus_tasks"], ) bridge_tuner = tuner._to_bridge_tuner() @@ -790,8 +783,6 @@ async def raise_on_shutdown(): if self._nexus_worker: await self._nexus_worker.wait_all_completed() - # TODO(nexus-preview): check that we do all appropriate things for nexus worker that we do for activity worker - # Do final shutdown try: await self._bridge_worker.finalize_shutdown() @@ -883,6 +874,7 @@ class WorkerConfig(TypedDict, total=False): max_concurrent_workflow_tasks: Optional[int] max_concurrent_activities: Optional[int] max_concurrent_local_activities: Optional[int] + max_concurrent_nexus_tasks: Optional[int] tuner: Optional[WorkerTuner] max_concurrent_workflow_task_polls: Optional[int] nonsticky_to_sticky_poll_ratio: float @@ -907,6 +899,36 @@ class WorkerConfig(TypedDict, total=False): nexus_task_poller_behavior: PollerBehavior +def _warn_if_activity_executor_max_workers_is_inconsistent( + config: WorkerConfig, +) -> None: + max_workers = getattr(config["activity_executor"], "_max_workers", None) + concurrent_activities = config["max_concurrent_activities"] + if config["tuner"] and config["tuner"]._get_activities_max(): + concurrent_activities = config["tuner"]._get_activities_max() + if isinstance(max_workers, int) and max_workers < (concurrent_activities or 0): + warnings.warn( + f"Worker max_concurrent_activities is {concurrent_activities} " + + f"but activity_executor's max_workers is only {max_workers}", + stacklevel=3, + ) + + +def _warn_if_nexus_task_executor_max_workers_is_inconsistent( + config: WorkerConfig, +) -> None: + max_workers = getattr(config["nexus_task_executor"], "_max_workers", None) + concurrent_nexus_tasks = config["max_concurrent_nexus_tasks"] + if config["tuner"] and config["tuner"]._get_nexus_tasks_max(): + concurrent_nexus_tasks = config["tuner"]._get_nexus_tasks_max() + if isinstance(max_workers, int) and max_workers < (concurrent_nexus_tasks or 0): + warnings.warn( + f"Worker max_concurrent_nexus_tasks is {concurrent_nexus_tasks} " + + f"but nexus_task_executor's max_workers is only {max_workers}", + stacklevel=3, + ) + + @dataclass class WorkerDeploymentConfig: """Options for configuring the Worker Versioning feature. diff --git a/tests/worker/test_worker.py b/tests/worker/test_worker.py index 3e1a1c8f7..e19325835 100644 --- a/tests/worker/test_worker.py +++ b/tests/worker/test_worker.py @@ -7,8 +7,10 @@ from typing import Any, Awaitable, Callable, Optional, Sequence from urllib.request import urlopen +import nexusrpc + import temporalio.api.enums.v1 -import temporalio.client +import temporalio.nexus import temporalio.worker._worker from temporalio import activity, workflow from temporalio.api.workflowservice.v1 import ( @@ -33,6 +35,7 @@ CustomSlotSupplier, FixedSizeSlotSupplier, LocalActivitySlotInfo, + NexusSlotInfo, PollerBehaviorAutoscaling, ResourceBasedSlotConfig, ResourceBasedSlotSupplier, @@ -42,7 +45,6 @@ SlotReleaseContext, SlotReserveContext, Worker, - WorkerConfig, WorkerDeploymentConfig, WorkerDeploymentVersion, WorkerTuner, @@ -55,6 +57,7 @@ new_worker, worker_versioning_enabled, ) +from tests.helpers.nexus import create_nexus_endpoint, make_nexus_endpoint_name # Passing through because Python 3.9 has an import bug at # https://github.com/python/cpython/issues/91351 @@ -81,6 +84,21 @@ async def run(self) -> None: raise NotImplementedError +@nexusrpc.handler.service_handler +class NeverRunService: + @nexusrpc.handler.sync_operation + async def never_run_operation( + self, _ctx: nexusrpc.handler.StartOperationContext, _input: None + ) -> None: + raise NotImplementedError + + @temporalio.nexus.workflow_run_operation + async def never_run_workflow_run_operation( + self, _ctx: temporalio.nexus.WorkflowRunOperationContext, _input: None + ) -> temporalio.nexus.WorkflowHandle[None]: + raise NotImplementedError + + async def test_worker_fatal_error_run(client: Client): # Run worker with injected workflow poll error worker = create_worker(client) @@ -317,6 +335,7 @@ async def test_can_run_composite_tuner_worker(client: Client, env: WorkflowEnvir ), resource_based_options, ), + nexus_supplier=FixedSizeSlotSupplier(10), ) async with new_worker( client, @@ -354,7 +373,7 @@ async def test_cant_specify_max_concurrent_and_tuner( assert "when also specifying tuner" in str(err.value) -async def test_warns_when_workers_too_lot(client: Client, env: WorkflowEnvironment): +async def test_warns_when_workers_too_low(client: Client, env: WorkflowEnvironment): tuner = WorkerTuner.create_resource_based( target_memory_usage=0.5, target_cpu_usage=0.5, @@ -372,9 +391,63 @@ async def test_warns_when_workers_too_lot(client: Client, env: WorkflowEnvironme activity_executor=executor, ): pass + with concurrent.futures.ThreadPoolExecutor() as executor: + with pytest.warns( + UserWarning, + match="Worker max_concurrent_nexus_tasks is 500 but nexus_task_executor's max_workers is only", + ): + async with new_worker( + client, + WaitOnSignalWorkflow, + nexus_service_handlers=[NeverRunService()], + tuner=tuner, + nexus_task_executor=executor, + ): + pass + + +@nexusrpc.handler.service_handler +class SayHelloService: + @nexusrpc.handler.sync_operation + async def say_hello( + self, _ctx: nexusrpc.handler.StartOperationContext, name: str + ) -> str: + return f"Hello, {name}!" + + +@workflow.defn +class CustomSlotSupplierWorkflow: + def __init__(self) -> None: + self._last_signal = "" + + @workflow.run + async def run(self) -> None: + await workflow.wait_condition(lambda: self._last_signal == "finish") + await workflow.execute_activity( + say_hello, + "hi", + versioning_intent=VersioningIntent.DEFAULT, + start_to_close_timeout=timedelta(seconds=5), + ) + nexus_client = workflow.create_nexus_client( + endpoint=make_nexus_endpoint_name(workflow.info().task_queue), + service=SayHelloService, + ) + await nexus_client.execute_operation( + SayHelloService.say_hello, + "hi", + ) + + @workflow.signal + def my_signal(self, value: str) -> None: + self._last_signal = value + workflow.logger.info(f"Signal: {value}") async def test_custom_slot_supplier(client: Client, env: WorkflowEnvironment): + if env.supports_time_skipping: + pytest.skip("Nexus tests don't work under Java test server") + class MyPermit(SlotPermit): def __init__(self, pnum: int): super().__init__() @@ -413,6 +486,8 @@ def mark_slot_used(self, ctx: SlotMarkUsedContext) -> None: self.seen_used_slot_kinds.add("a") elif isinstance(ctx.slot_info, LocalActivitySlotInfo): self.seen_used_slot_kinds.add("la") + elif isinstance(ctx.slot_info, NexusSlotInfo): + self.seen_used_slot_kinds.add("nx") self.used += 1 def release_slot(self, ctx: SlotReleaseContext) -> None: @@ -439,21 +514,26 @@ def reserve_asserts(self, ctx: SlotReserveContext) -> None: ss = MySlotSupplier() tuner = WorkerTuner.create_composite( - workflow_supplier=ss, activity_supplier=ss, local_activity_supplier=ss + workflow_supplier=ss, + activity_supplier=ss, + local_activity_supplier=ss, + nexus_supplier=ss, ) async with new_worker( client, - WaitOnSignalWorkflow, + CustomSlotSupplierWorkflow, activities=[say_hello], + nexus_service_handlers=[SayHelloService()], tuner=tuner, identity="myworker", ) as w: + await create_nexus_endpoint(w.task_queue, client) wf1 = await client.start_workflow( - WaitOnSignalWorkflow.run, + CustomSlotSupplierWorkflow.run, id=f"custom-slot-supplier-{uuid.uuid4()}", task_queue=w.task_queue, ) - await wf1.signal(WaitOnSignalWorkflow.my_signal, "finish") + await wf1.signal(CustomSlotSupplierWorkflow.my_signal, "finish") await wf1.result() # We can't use reserve number directly because there is a technically possible race @@ -461,11 +541,10 @@ def reserve_asserts(self, ctx: SlotReserveContext) -> None: # This isn't solvable without redoing a chunk of pyo3-asyncio. So we only check # that the permits passed to release line up. assert ss.highest_seen_reserve_on_release == ss.releases - # Two workflow tasks, one activity - assert ss.used == 3 + assert ss.used == 5 assert ss.seen_sticky_kinds == {True, False} - assert ss.seen_slot_kinds == {"workflow", "activity", "local-activity"} - assert ss.seen_used_slot_kinds == {"wf", "a"} + assert ss.seen_slot_kinds == {"workflow", "activity", "local-activity", "nexus"} + assert ss.seen_used_slot_kinds == {"wf", "a", "nx"} assert ss.seen_release_info_empty assert ss.seen_release_info_nonempty @@ -501,7 +580,10 @@ def release_slot(self, ctx: SlotReleaseContext) -> None: ss = ThrowingSlotSupplier() tuner = WorkerTuner.create_composite( - workflow_supplier=ss, activity_supplier=ss, local_activity_supplier=ss + workflow_supplier=ss, + activity_supplier=ss, + local_activity_supplier=ss, + nexus_supplier=ss, ) async with new_worker( client, @@ -537,7 +619,10 @@ def release_slot(self, ctx: SlotReleaseContext) -> None: ss = BlockingSlotSupplier() tuner = WorkerTuner.create_composite( - workflow_supplier=ss, activity_supplier=ss, local_activity_supplier=ss + workflow_supplier=ss, + activity_supplier=ss, + local_activity_supplier=ss, + nexus_supplier=ss, ) async with new_worker( client, @@ -1134,6 +1219,7 @@ def create_worker( task_queue=f"task-queue-{uuid.uuid4()}", activities=[never_run_activity], workflows=[NeverRunWorkflow], + nexus_service_handlers=[NeverRunService()], on_fatal_error=on_fatal_error, )