Skip to content

Commit 79c84e1

Browse files
committed
fix: address rest of copilot feedback
Signed-off-by: Samantha Coyle <sam@diagrid.io>
1 parent ed0c3fe commit 79c84e1

2 files changed

Lines changed: 23 additions & 6 deletions

File tree

durabletask/task.py

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -158,7 +158,7 @@ def call_sub_orchestrator(
158158
"""
159159
pass
160160

161-
# TOOD: Add a timeout parameter, which allows the task to be canceled if the event is
161+
# TODO: Add a timeout parameter, which allows the task to be canceled if the event is
162162
# not received within the specified timeout. This requires support for task cancellation.
163163
@abstractmethod
164164
def wait_for_external_event(self, name: str) -> Task:
@@ -434,7 +434,22 @@ def compute_next_delay(self) -> Optional[timedelta]:
434434
else:
435435
backoff_coefficient = self._retry_policy.backoff_coefficient
436436

437-
if datetime.utcnow() < retry_expiration:
437+
# Compute a deterministic "logical now" based on start time and accumulated delays,
438+
# rather than wall-clock time, to avoid non-determinism during replay.
439+
total_elapsed_seconds = 0.0
440+
for i in range(1, self._attempt_count):
441+
attempt_delay = (
442+
math.pow(backoff_coefficient, i - 1)
443+
* self._retry_policy.first_retry_interval.total_seconds()
444+
)
445+
if self._retry_policy.max_retry_interval is not None:
446+
attempt_delay = min(
447+
attempt_delay,
448+
self._retry_policy.max_retry_interval.total_seconds(),
449+
)
450+
total_elapsed_seconds += attempt_delay
451+
logical_now = self._start_time + timedelta(seconds=total_elapsed_seconds)
452+
if logical_now < retry_expiration:
438453
next_delay_f = (
439454
math.pow(backoff_coefficient, self._attempt_count - 1)
440455
* self._retry_policy.first_retry_interval.total_seconds()

durabletask_tests/durabletask/test_orchestration_e2e_async.py

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -293,8 +293,10 @@ def orchestrator(ctx: task.OrchestrationContext, _):
293293

294294

295295
async def test_terminate_recursive():
296+
child_instance_id = 'child-instance-id'
297+
296298
def root(ctx: task.OrchestrationContext, _):
297-
result = yield ctx.call_sub_orchestrator(child)
299+
result = yield ctx.call_sub_orchestrator(child, instance_id=child_instance_id)
298300
return result
299301

300302
def child(ctx: task.OrchestrationContext, _):
@@ -320,9 +322,9 @@ def child(ctx: task.OrchestrationContext, _):
320322
assert state.runtime_status == OrchestrationStatus.TERMINATED
321323

322324
# Verify that child orchestration is also terminated
323-
await client.wait_for_orchestration_completion(id, timeout=30)
324-
assert state is not None
325-
assert state.runtime_status == OrchestrationStatus.TERMINATED
325+
child_state = await client.wait_for_orchestration_completion(child_instance_id, timeout=30)
326+
assert child_state is not None
327+
assert child_state.runtime_status == OrchestrationStatus.TERMINATED
326328

327329
await client.purge_orchestration(id)
328330
state = await client.get_orchestration_state(id)

0 commit comments

Comments
 (0)