Skip to content

Commit d9622c6

Browse files
authored
Detect is_replaying using history (#598)
* Detect is_replaying using history
1 parent f5e6fbb commit d9622c6

2 files changed

Lines changed: 214 additions & 5 deletions

File tree

azure/durable_functions/models/TaskOrchestrationExecutor.py

Lines changed: 51 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,11 @@ def initialize(self):
4747
self.exception: Optional[Exception] = None
4848
self.orchestrator_returned: bool = False
4949

50+
# History-based replay detection: tracks whether we are currently
51+
# processing old (replayed) events or new events in the current episode.
52+
# This is used as an additional signal for backends that don't set IsPlayed.
53+
self._is_processing_new_events: bool = False
54+
5055
def execute(self, context: DurableOrchestrationContext,
5156
history: List[HistoryEvent], fn) -> str:
5257
"""Execute an orchestration via its history to evaluate Tasks and replay events.
@@ -80,16 +85,28 @@ def execute(self, context: DurableOrchestrationContext,
8085
+ "https://github.com/Azure/azure-functions-durable-python/issues."
8186
raise Exception(err_message)
8287

83-
# Set initial is_replaing state.
88+
# Pre-scan history to find the start of new (non-replayed) events.
89+
# The last OrchestratorStarted event marks the boundary: events before
90+
# it are old/replayed, events from it onwards are new.
91+
# This provides a reliable replay signal for backends that don't set IsPlayed.
92+
self._new_events_start_index = self._find_new_events_start_index(history)
93+
94+
# Set initial is_replaying state.
95+
# Combine the is_played field with the history-based signal:
96+
# we are replaying if is_played says so OR if we haven't reached new events yet.
8497
execution_started_event = history[1]
85-
self.current_task.is_played = execution_started_event.is_played
98+
history_is_replaying = self._new_events_start_index > 0
99+
self.current_task.is_played = execution_started_event.is_played or history_is_replaying
86100

87101
# If user code is a generator, then it uses `yield` statements (the DF API)
88102
# and so we iterate through the DF history, generating tasks and populating
89103
# them with values when the history provides them
90104
if isinstance(evaluated_user_code, GeneratorType):
91105
self.generator = evaluated_user_code
92-
for event in history:
106+
for index, event in enumerate(history):
107+
# Update whether we've crossed into the new events portion of the history.
108+
if index >= self._new_events_start_index:
109+
self._is_processing_new_events = True
93110
self.process_event(event)
94111
if self.has_execution_completed:
95112
break
@@ -209,8 +226,12 @@ def parse_history_event(directive_result):
209226
# generate exception
210227
new_value = Exception(f"{event.Reason} \n {event.Details}")
211228

212-
# with a yielded task now evaluated, we can try to resume the user code
213-
task.set_is_played(event._is_played)
229+
# With a yielded task now evaluated, we can try to resume the user code.
230+
# Combine the event's is_played field with the history-based signal:
231+
# a task is considered "played" (replayed) if either is_played is set
232+
# OR we are still processing old events (not yet in the new events section).
233+
is_played = event._is_played or not self._is_processing_new_events
234+
task.set_is_played(is_played)
214235
task.set_value(is_error=not is_success, value=new_value)
215236

216237
def resume_user_code(self):
@@ -254,6 +275,31 @@ def resume_user_code(self):
254275
# until a new/not-previously-yielded task is encountered
255276
self.resume_user_code()
256277

278+
def _find_new_events_start_index(self, history: List[HistoryEvent]) -> int:
    """Locate the history position at which the current episode begins.

    Every event whose type is OrchestratorStarted is a candidate; the one
    occurring latest in `history` is treated as the boundary between old
    (replayed) events and new events.

    Parameters
    ----------
    history : List[HistoryEvent]
        The orchestration history.

    Returns
    -------
    int
        The index of the last OrchestratorStarted event in `history`,
        or -1 when the history contains no such event.
    """
    # Indices are produced in increasing order, so the largest match is
    # also the last one; `default` covers a history without any match.
    started_positions = (
        position
        for position, entry in enumerate(history)
        if entry.event_type == HistoryEventType.ORCHESTRATOR_STARTED
    )
    return max(started_positions, default=-1)
302+
257303
def _mark_as_scheduled(self, task: TaskBase):
258304
if isinstance(task, CompoundTask):
259305
for task in task.children:
Lines changed: 163 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,163 @@
1+
"""Tests that the is_replaying flag is correctly determined from event history structure
2+
alone, without relying on the is_played field. This covers Durable backends that never
3+
set IsPlayed on history events.
4+
"""
5+
6+
from azure.durable_functions.models.ReplaySchema import ReplaySchema
7+
from tests.test_utils.ContextBuilder import ContextBuilder
8+
from .orchestrator_test_utils import get_orchestration_property
9+
from azure.durable_functions.models.OrchestratorState import OrchestratorState
10+
from azure.durable_functions.constants import DATETIME_STRING_FORMAT
11+
from datetime import datetime, timedelta, timezone
12+
13+
14+
def generator_function(context):
    """Orchestrator that yields three timers, one after the other.

    The timers fire 30, 60 and 90 seconds after a fixed UTC start time.
    """
    start = datetime.strptime(
        "2020-07-23T21:56:54.936700Z", DATETIME_STRING_FORMAT
    ).replace(tzinfo=timezone.utc)

    # Each timer is scheduled 30 seconds later than the previous one.
    for step in (1, 2, 3):
        yield context.create_timer(start + timedelta(seconds=30 * step))
23+
24+
25+
def add_timer_fired_events_without_is_played(context_builder: ContextBuilder, id_: int, timestamp: str):
    """Append one full timer episode whose TimerFired event has is_played=False.

    Events are added in this order: TimerCreated, OrchestratorCompleted,
    OrchestratorStarted, TimerFired. Passing is_played=False on the fired
    event mimics a backend that never sets IsPlayed.
    """
    # The builder returns the fire time it recorded for the created timer;
    # the matching TimerFired event must carry the same value.
    recorded_fire_at: str = context_builder.add_timer_created_event(id_, timestamp)

    # Episode boundary: the scheduling episode ends and a new one begins.
    context_builder.add_orchestrator_completed_event()
    context_builder.add_orchestrator_started_event()

    context_builder.add_timer_fired_event(
        id_=id_,
        fire_at=recorded_fire_at,
        is_played=False,
    )
35+
36+
37+
def add_activity_completed_events_without_is_played(
        context_builder: ContextBuilder, name: str, id_: int, result: str):
    """Append one full activity episode whose TaskCompleted event has is_played=False.

    Events are added in this order: TaskScheduled, OrchestratorCompleted,
    OrchestratorStarted, TaskCompleted. Passing is_played=False on the
    completed event mimics a backend that never sets IsPlayed.
    """
    context_builder.add_task_scheduled_event(name, id_)

    # Episode boundary: the scheduling episode ends and a new one begins.
    context_builder.add_orchestrator_completed_event()
    context_builder.add_orchestrator_started_event()

    context_builder.add_task_completed_event(
        id_=id_,
        result=result,
        is_played=False,
    )
48+
49+
# ---------- Tests verifying replaying=True for mid-replay tasks ----------
50+
51+
class IsReplayingTracker:
    """Collects the is_replaying values seen at each yield point of an orchestration."""

    def __init__(self):
        # Starts empty; one entry is appended per observed yield point.
        self.values_at_yield = list()
56+
57+
58+
def generator_function_tracking_replay(context):
    """Orchestrator that logs is_replaying each time it resumes from a yield.

    Calls the "Hello" activity for Tokyo, Seattle and London in turn. After
    every activity result is received, the current `context.is_replaying`
    value is appended to `context._tracker.values_at_yield`. Returns the
    three activity results as a list, in call order.
    """
    tracker = context._tracker
    results = []

    for city in ("Tokyo", "Seattle", "London"):
        outcome = yield context.call_activity("Hello", city)
        # Record the flag only after the activity result has been delivered.
        tracker.values_at_yield.append(context.is_replaying)
        results.append(outcome)

    return results
72+
73+
74+
def test_hello_cities_is_replaying_mid_execution_without_is_played():
    """Verify is_replaying is True for old events and False for new events,
    even when is_played is never set.

    The history contains two completed "Hello" activities, both added with
    is_played=False. The orchestrator records `context.is_replaying` each
    time it resumes after an activity result.
    """
    from azure.durable_functions.models import DurableOrchestrationContext
    from azure.durable_functions.orchestrator import Orchestrator

    tracker = IsReplayingTracker()

    context_builder = ContextBuilder("", is_replaying=False)
    for id_, result in enumerate(('"Hello Tokyo!"', '"Hello Seattle!"')):
        add_activity_completed_events_without_is_played(
            context_builder, "Hello", id_, result)

    context = DurableOrchestrationContext.from_json(context_builder.to_json_string())
    context._tracker = tracker  # type: ignore

    Orchestrator(generator_function_tracking_replay).handle(context)

    # First activity (old episode): replaying.
    # Second activity (new episode): not replaying.
    # Comparing the whole list also pins the number of recorded yield
    # points, so an unexpected extra or missing resume fails the test.
    assert tracker.values_at_yield == [True, False]
99+
100+
101+
def test_hello_cities_is_replaying_completed_without_is_played():
    """Verify intermediate is_replaying states when all three activities are completed.

    The history contains three completed "Hello" activities, all added with
    is_played=False. The orchestrator records `context.is_replaying` each
    time it resumes after an activity result.
    """
    from azure.durable_functions.models import DurableOrchestrationContext
    from azure.durable_functions.orchestrator import Orchestrator

    tracker = IsReplayingTracker()

    context_builder = ContextBuilder("", is_replaying=False)
    completed_results = ('"Hello Tokyo!"', '"Hello Seattle!"', '"Hello London!"')
    for id_, result in enumerate(completed_results):
        add_activity_completed_events_without_is_played(
            context_builder, "Hello", id_, result)

    context = DurableOrchestrationContext.from_json(context_builder.to_json_string())
    context._tracker = tracker  # type: ignore

    Orchestrator(generator_function_tracking_replay).handle(context)

    # First and second activities (old episodes): replaying.
    # Third activity (new episode): not replaying.
    # Comparing the whole list also pins the number of recorded yield
    # points, so an unexpected extra or missing resume fails the test.
    assert tracker.values_at_yield == [True, True, False]
128+
129+
130+
def test_is_played_does_not_affect_is_replaying_behavior():
    """Verify that history-based detection produces the same is_replaying result
    as the is_played-based detection for one replayed timer event.

    Two histories with the same two timer episodes are built: one where the
    first TimerFired event has is_played=True and the second has
    is_played=False, and one where both have is_played=False. Both runs
    must end with is_replaying equal to False.
    """
    base_time = datetime.strptime("2020-07-23T21:56:54.9367Z", DATETIME_STRING_FORMAT)
    fire_at_str_1, fire_at_str_2 = (
        (base_time + timedelta(seconds=offset)).strftime(DATETIME_STRING_FORMAT)
        for offset in (30, 60)
    )

    # Traditional backend: sets is_played=True on old events, False on new events
    traditional_backend_ctx = ContextBuilder("")
    timer_episodes = ((0, fire_at_str_1, True), (1, fire_at_str_2, False))
    for id_, fire_at_str, is_played in timer_episodes:
        scheduled_fire_at: str = traditional_backend_ctx.add_timer_created_event(
            id_, fire_at_str)
        traditional_backend_ctx.add_orchestrator_completed_event()
        traditional_backend_ctx.add_orchestrator_started_event()
        traditional_backend_ctx.add_timer_fired_event(
            id_=id_, fire_at=scheduled_fire_at, is_played=is_played)

    result_traditional = get_orchestration_property(
        traditional_backend_ctx, generator_function, "durable_context")

    # Backend that never sets is_played (always False), relies on history structure
    history_based_ctx = ContextBuilder("", is_replaying=False)
    add_timer_fired_events_without_is_played(history_based_ctx, 0, fire_at_str_1)
    add_timer_fired_events_without_is_played(history_based_ctx, 1, fire_at_str_2)

    result_history_based = get_orchestration_property(
        history_based_ctx, generator_function, "durable_context")

    # Both approaches should agree on the final is_replaying state. Listing
    # the two values side by side makes a failure show which backend differs.
    final_states = [result_traditional.is_replaying, result_history_based.is_replaying]
    assert final_states == [False, False]

0 commit comments

Comments
 (0)