forked from microsoft/durabletask-python
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathentity_state_shim.py
More file actions
68 lines (50 loc) · 2.27 KB
/
Copy pathentity_state_shim.py
File metadata and controls
68 lines (50 loc) · 2.27 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.
from typing import Any, TypeVar, overload
import durabletask.internal.orchestrator_service_pb2 as pb
# Type variable for the state type a caller requests via StateShim.get_state.
TState = TypeVar("TState")
class StateShim:
    """Hold a state value and a list of operation actions with checkpointing.

    The shim tracks a *current* state and a *checkpoint* state, together with
    a list of operation actions and the length that list had at the last
    checkpoint. ``commit`` advances the checkpoint to the current values,
    ``rollback`` discards everything recorded since the last checkpoint, and
    ``reset`` clears the shim entirely.

    Note: the checkpoint stores a reference to the state object, not a copy,
    so in-place mutation of a mutable state object is not undone by
    ``rollback``.
    """

    def __init__(self, start_state: Any):
        """Initialise the shim with ``start_state`` as both current and checkpoint state."""
        self._current_state: Any = start_state
        self._checkpoint_state: Any = start_state
        self._operation_actions: list[pb.OperationAction] = []
        # Number of leading entries of ``_operation_actions`` that are committed.
        self._actions_checkpoint_state: int = 0

    @overload
    def get_state(self, intended_type: type[TState], default: TState) -> TState:
        ...

    @overload
    def get_state(self, intended_type: type[TState]) -> TState | None:
        ...

    @overload
    def get_state(self, intended_type: None = None, default: Any = None) -> Any:
        ...

    def get_state(
        self,
        intended_type: type[TState] | None = None,
        default: TState | None = None,
    ) -> TState | Any | None:
        """Return the current state, optionally converted to ``intended_type``.

        Args:
            intended_type: Type the caller wants the state as. When omitted,
                the state is returned unchanged.
            default: Value returned when the current state is ``None``.

        Returns:
            ``default`` (which may itself be ``None``) when the current state
            is ``None``. Otherwise the state as-is if no type was requested or
            it is already an instance of ``intended_type``, else the result of
            calling ``intended_type(state)``.

        Raises:
            TypeError: If calling ``intended_type(state)`` fails for a
                non-``None`` state.
        """
        state = self._current_state
        if state is None:
            # Never hand ``None`` to the constructor: ``str(None)`` would give
            # the string "None" and ``int(None)`` would raise, whereas the
            # typed signature promises ``None`` when there is no state.
            return default

        if intended_type is None or isinstance(state, intended_type):
            return state

        try:
            return intended_type(state)  # type: ignore[call-arg]
        except Exception as ex:
            raise TypeError(
                f"Could not convert state of type '{type(state).__name__}' to '{intended_type.__name__}'"
            ) from ex

    def set_state(self, state: Any) -> None:
        """Replace the current state; the checkpoint is left untouched."""
        self._current_state = state

    def add_operation_action(self, action: pb.OperationAction) -> None:
        """Append ``action`` to the action list (uncommitted until ``commit``)."""
        self._operation_actions.append(action)

    def get_operation_actions(self) -> list[pb.OperationAction]:
        """Return a new list of the actions recorded up to the last commit.

        Actions added after the most recent ``commit`` are not included.
        """
        return self._operation_actions[:self._actions_checkpoint_state]

    def commit(self) -> None:
        """Make the current state and all recorded actions the new checkpoint."""
        self._checkpoint_state = self._current_state
        self._actions_checkpoint_state = len(self._operation_actions)

    def rollback(self) -> None:
        """Restore the checkpoint state and drop actions added since the last commit."""
        self._current_state = self._checkpoint_state
        self._operation_actions = self._operation_actions[:self._actions_checkpoint_state]

    def reset(self) -> None:
        """Clear both states and all actions, committed or not."""
        self._current_state = None
        self._checkpoint_state = None
        self._operation_actions = []
        self._actions_checkpoint_state = 0