-
Notifications
You must be signed in to change notification settings - Fork 6
Expand file tree
/
Copy pathexecution.py
More file actions
368 lines (310 loc) · 12.9 KB
/
execution.py
File metadata and controls
368 lines (310 loc) · 12.9 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
"""Execution components for Saga transactions."""
import copy
import dataclasses
import logging
import typing
from cqrs.container.protocol import Container
from cqrs.saga.fallback import Fallback
from cqrs.saga.models import ContextT, SagaContext
from cqrs.saga.step import SagaStepHandler, SagaStepResult
from cqrs.saga.storage.enums import SagaStepStatus
from cqrs.saga.storage.protocol import ISagaStorage, SagaStorageRun
logger = logging.getLogger("cqrs.saga")
class SagaStateManager:
"""Manages saga state in storage."""
def __init__(
self,
saga_id: typing.Any,
storage: ISagaStorage | SagaStorageRun,
) -> None:
"""
Create a SagaStateManager bound to a specific saga identifier and storage backend.
Parameters:
saga_id: Identifier for the saga instance.
storage: Storage backend implementing ISagaStorage or SagaStorageRun used to persist saga state and history.
"""
self._saga_id = saga_id
self._storage = storage
async def create_saga(
self,
saga_name: str,
context: SagaContext,
) -> None:
"""Create a new saga in storage."""
await self._storage.create_saga(
self._saga_id,
saga_name,
context.to_dict(),
)
async def update_status(self, status: typing.Any) -> None:
"""Update saga status."""
await self._storage.update_status(self._saga_id, status)
async def update_context(self, context: SagaContext) -> None:
"""Update saga context."""
await self._storage.update_context(
self._saga_id,
context.to_dict(),
)
async def log_step(
self,
step_name: str,
action: typing.Literal["act", "compensate"],
status: SagaStepStatus,
error: str | None = None,
) -> None:
"""Log step execution."""
await self._storage.log_step(
self._saga_id,
step_name,
action,
status,
details=error,
)
class SagaRecoveryManager:
"""Manages saga recovery from storage."""
def __init__(
self,
saga_id: typing.Any,
storage: ISagaStorage | SagaStorageRun,
container: Container,
saga_steps: list[type[SagaStepHandler] | Fallback],
) -> None:
"""
Construct a SagaRecoveryManager that holds the identifiers, storage, DI container, and configured saga steps required to reconstruct a saga's execution state.
Parameters:
saga_id: Identifier for the saga instance (e.g., UUID or other unique value).
storage: Persistence backend implementing saga history operations (ISagaStorage or SagaStorageRun).
container: Dependency injection container used to resolve step handler instances.
saga_steps: Ordered list of saga step types or Fallback wrappers that define the saga's execution sequence.
"""
self._saga_id = saga_id
self._storage = storage
self._container = container
self._saga_steps = saga_steps
async def load_completed_step_names(self) -> set[str]:
"""
Return the names of saga steps that completed their primary ("act") action.
Returns:
set[str]: Step names recorded with status `SagaStepStatus.COMPLETED` and action `"act"`.
"""
history = await self._storage.get_step_history(self._saga_id)
return {e.step_name for e in history if e.status == SagaStepStatus.COMPLETED and e.action == "act"}
async def reconstruct_completed_steps(
self,
completed_step_names: set[str],
) -> list[SagaStepHandler[SagaContext, typing.Any]]:
"""
Reconstructs and returns the resolved step handler instances corresponding to the completed steps, preserving saga execution order.
Parameters:
completed_step_names (set[str]): Names of steps that completed the "act" action.
Returns:
list[SagaStepHandler[SagaContext, typing.Any]]: Resolved step handler instances in execution order. For Fallback wrappers, the primary handler is chosen if its name appears in completed_step_names; otherwise the fallback handler is chosen when present.
"""
completed_steps: list[SagaStepHandler[SagaContext, typing.Any]] = []
for step_item in self._saga_steps:
# Handle Fallback wrapper
if isinstance(step_item, Fallback):
# Check both primary and fallback step names
primary_name = step_item.step.__name__
fallback_name = step_item.fallback.__name__
if primary_name in completed_step_names:
step = await self._container.resolve(step_item.step)
completed_steps.append(step)
elif fallback_name in completed_step_names:
step = await self._container.resolve(step_item.fallback)
completed_steps.append(step)
else:
# Regular step
step_name = step_item.__name__
if step_name in completed_step_names:
step = await self._container.resolve(step_item)
completed_steps.append(step)
return completed_steps
class SagaStepExecutor(typing.Generic[ContextT]):
"""Executes regular saga steps."""
def __init__(
self,
context: ContextT,
container: Container,
state_manager: SagaStateManager,
) -> None:
"""
Initialize step executor.
Args:
context: Saga context
container: DI container for resolving step handlers
state_manager: State manager for logging and updates
"""
self._context = context
self._container = container
self._state_manager = state_manager
async def execute_step(
self,
step_type: type[SagaStepHandler],
step_name: str,
) -> SagaStepResult[ContextT, typing.Any]:
"""
Execute a regular saga step.
Args:
step_type: Type of the step handler
step_name: Name of the step (for logging)
Returns:
Result of step execution
"""
# Resolve step handler from DI container
step = await self._container.resolve(step_type)
# Log step start
await self._state_manager.log_step(
step_name,
"act",
SagaStepStatus.STARTED,
)
# Execute step
step_result = await step.act(self._context)
# Update context and log completion
await self._state_manager.update_context(self._context)
await self._state_manager.log_step(
step_name,
"act",
SagaStepStatus.COMPLETED,
)
return step_result
class FallbackStepExecutor(typing.Generic[ContextT]):
"""Executes Fallback wrapper steps with context snapshot/restore."""
def __init__(
self,
context: ContextT,
container: Container,
state_manager: SagaStateManager,
) -> None:
"""
Initialize fallback step executor.
Args:
context: Saga context
container: DI container for resolving step handlers
state_manager: State manager for logging and updates
"""
self._context = context
self._container = container
self._state_manager = state_manager
async def execute_fallback_step(
self,
fallback_wrapper: Fallback,
completed_step_names: set[str],
) -> tuple[SagaStepResult[ContextT, typing.Any] | None, SagaStepHandler | None]:
"""
Execute a Fallback step with context snapshot/restore mechanism.
Args:
fallback_wrapper: The Fallback instance containing step and fallback
completed_step_names: Set of completed step names for idempotency check
Returns:
Step result if executed, None if skipped (already completed)
Raises:
Exception: If both primary and fallback steps fail
"""
primary_step_name = fallback_wrapper.step.__name__
fallback_step_name = fallback_wrapper.fallback.__name__
# Idempotency: Check if either primary or fallback is already completed
if primary_step_name in completed_step_names:
logger.debug(
f"Skipping already completed Fallback primary step: {primary_step_name}",
)
return None, None
if fallback_step_name in completed_step_names:
logger.debug(
f"Skipping already completed Fallback fallback step: {fallback_step_name}",
)
return None, None
# Resolve step handlers
primary_step = await self._container.resolve(fallback_wrapper.step)
fallback_step = await self._container.resolve(fallback_wrapper.fallback)
# Create context snapshot before executing primary step
context_snapshot = copy.deepcopy(self._context.to_dict())
# Try to execute primary step
try:
await self._state_manager.log_step(
primary_step_name,
"act",
SagaStepStatus.STARTED,
)
# Execute primary step with circuit breaker if present
if fallback_wrapper.circuit_breaker is not None:
step_result = await fallback_wrapper.circuit_breaker.call(
step_type=fallback_wrapper.step,
func=primary_step.act,
context=self._context,
)
else:
step_result = await primary_step.act(self._context)
# Primary step succeeded
await self._state_manager.update_context(self._context)
await self._state_manager.log_step(
primary_step_name,
"act",
SagaStepStatus.COMPLETED,
)
return step_result, primary_step
except Exception as primary_error:
should_fallback = False
# 1. Check Circuit Breaker
if (
fallback_wrapper.circuit_breaker is not None
and fallback_wrapper.circuit_breaker.is_circuit_breaker_error(
primary_error,
)
):
logger.warning(
f"Circuit breaker open for step '{primary_step_name}'. "
f"Switching to fallback '{fallback_step_name}'.",
)
should_fallback = True
# 2. Check failure_exceptions if defined
elif fallback_wrapper.failure_exceptions:
if isinstance(primary_error, fallback_wrapper.failure_exceptions):
should_fallback = True
# 3. If no specific exceptions defined, catch all
else:
should_fallback = True
if should_fallback:
# Log warning but DO NOT log FAILED status for primary step
logger.warning(
f"Primary step '{primary_step_name}' failed: {primary_error}. "
f"Switching to fallback '{fallback_step_name}'.",
)
# Restore context from snapshot
restored_context = self._context.__class__.from_dict(context_snapshot)
# Copy all fields from restored context to the existing one
for field in dataclasses.fields(self._context):
setattr(
self._context,
field.name,
getattr(restored_context, field.name),
)
# Execute fallback step
try:
await self._state_manager.log_step(
fallback_step_name,
"act",
SagaStepStatus.STARTED,
)
step_result = await fallback_step.act(self._context)
# Fallback succeeded
await self._state_manager.update_context(self._context)
await self._state_manager.log_step(
fallback_step_name,
"act",
SagaStepStatus.COMPLETED,
)
return step_result, fallback_step
except Exception as fallback_error:
# Fallback also failed - saga fails
await self._state_manager.log_step(
fallback_step_name,
"act",
SagaStepStatus.FAILED,
str(fallback_error),
)
raise fallback_error
else:
# Should not fallback, re-raise original error
raise primary_error