Skip to content

Commit 5709bcd

Browse files
committed
Cleanup
Signed-off-by: Sergio Herrera <627709+seherv@users.noreply.github.com>
1 parent 73add2e commit 5709bcd

2 files changed

Lines changed: 24 additions & 22 deletions

File tree

ext/dapr-ext-workflow/dapr/ext/workflow/_durabletask/worker.py

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -352,6 +352,7 @@ def __init__(
352352
self._interceptors = None
353353

354354
self._async_worker_manager = _AsyncWorkerManager(self._concurrency_options, self._logger)
355+
self._activity_executor = _ActivityExecutor(self._logger)
355356

356357
@property
357358
def concurrency_options(self) -> ConcurrencyOptions:
@@ -1084,8 +1085,7 @@ def _execute_activity(
10841085
instance_id = req.workflowInstance.instanceId
10851086
with self._activity_span(req, instance_id):
10861087
try:
1087-
executor = _ActivityExecutor(self._logger)
1088-
result = executor.execute(
1088+
result = self._activity_executor.execute(
10891089
fn,
10901090
instance_id,
10911091
req.name,
@@ -1114,8 +1114,7 @@ async def _execute_activity_async(
11141114
instance_id = req.workflowInstance.instanceId
11151115
with self._activity_span(req, instance_id):
11161116
try:
1117-
executor = _ActivityExecutor(self._logger)
1118-
result = await executor.execute_async(
1117+
result = await self._activity_executor.execute_async(
11191118
fn,
11201119
instance_id,
11211120
req.name,
@@ -1143,10 +1142,11 @@ async def _execute_activity_async(
11431142
instance_id,
11441143
)
11451144
except RuntimeError as exc:
1146-
# Only swallow if the manager thread pool was shut down (worker is tearing
1147-
# down). The sidecar will re-dispatch the work item once the worker reconnects.
1148-
# Other RuntimeErrors are unexpected and bubble up to the work-item processor.
1149-
if not self._async_worker_manager._shutdown:
1145+
# Swallow only when the thread pool itself is shut down (worker tearing down).
1146+
# Other RuntimeErrors are unexpected and propagate to the work-item processor.
1147+
# The sidecar will re-dispatch this work item once the worker reconnects.
1148+
pool = self._async_worker_manager.thread_pool
1149+
if not getattr(pool, '_shutdown', False):
11501150
raise
11511151
self._logger.warning(
11521152
f"Could not deliver activity response for '{req.name}#{req.taskId}': "

ext/dapr-ext-workflow/dapr/ext/workflow/workflow_runtime.py

Lines changed: 16 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -68,32 +68,34 @@ def _make_activity_wrapper(fn: Activity, logger: Logger) -> ActivityWrapper:
6868
"""
6969
accepts_input, input_model = _model_protocol.resolve_input(fn)
7070

71+
def _call_args(ctx: task.ActivityContext, inp: object | None) -> tuple:
72+
wf_ctx = WorkflowActivityContext(ctx)
73+
if not accepts_input:
74+
return (wf_ctx,)
75+
return (wf_ctx, _coerce_activity_input(inp, input_model))
76+
77+
def _log_failure(ctx: task.ActivityContext, exc: Exception) -> None:
78+
activity_id = getattr(ctx, 'task_id', 'unknown')
79+
logger.warning(f'Activity execution failed - task_id: {activity_id}, error: {exc}')
80+
7181
if _is_async_callable(fn):
7282

7383
async def async_activity_wrapper(
7484
ctx: task.ActivityContext, inp: object | None = None
7585
) -> object:
76-
activity_id = getattr(ctx, 'task_id', 'unknown')
7786
try:
78-
wf_ctx = WorkflowActivityContext(ctx)
79-
if not accepts_input:
80-
return await fn(wf_ctx)
81-
return await fn(wf_ctx, _coerce_activity_input(inp, input_model))
82-
except Exception as e:
83-
logger.warning(f'Activity execution failed - task_id: {activity_id}, error: {e}')
87+
return await fn(*_call_args(ctx, inp))
88+
except Exception as exc:
89+
_log_failure(ctx, exc)
8490
raise
8591

8692
return async_activity_wrapper
8793

8894
def sync_activity_wrapper(ctx: task.ActivityContext, inp: object | None = None) -> object:
89-
activity_id = getattr(ctx, 'task_id', 'unknown')
9095
try:
91-
wf_ctx = WorkflowActivityContext(ctx)
92-
if not accepts_input:
93-
return fn(wf_ctx)
94-
return fn(wf_ctx, _coerce_activity_input(inp, input_model))
95-
except Exception as e:
96-
logger.warning(f'Activity execution failed - task_id: {activity_id}, error: {e}')
96+
return fn(*_call_args(ctx, inp))
97+
except Exception as exc:
98+
_log_failure(ctx, exc)
9799
raise
98100

99101
return sync_activity_wrapper

0 commit comments

Comments
 (0)