-
Notifications
You must be signed in to change notification settings - Fork 64
Expand file tree
/
Copy pathentity.py
More file actions
129 lines (103 loc) · 4.41 KB
/
Copy pathentity.py
File metadata and controls
129 lines (103 loc) · 4.41 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
from .models import DurableEntityContext
from .models.entities import OperationResult, EntityState
from datetime import datetime, timezone
from typing import Callable, Any, List, Dict
class InternalEntityException(Exception):
"""Framework-internal Exception class (for internal use only)."""
pass
class Entity:
"""Durable Entity Class.
Responsible for executing the user-defined entity function.
"""
def __init__(self, entity_func: Callable[[DurableEntityContext], None]):
"""Create a new entity for the user-defined entity.
Responsible for executing the user-defined entity function
Parameters
----------
entity_func: Callable[[DurableEntityContext], Generator[Any, Any, Any]]
The user defined entity function
"""
self.fn: Callable[[DurableEntityContext], None] = entity_func
def handle(self, context: DurableEntityContext, batch: List[Dict[str, Any]]) -> str:
"""Handle the execution of the user-defined entity function.
Loops over the batch, which serves to specify inputs to the entity,
and collects results and generates a final state, which are returned.
Parameters
----------
context: DurableEntityContext
The entity context of the entity, which the user interacts with as their Durable API
Returns
-------
str
A JSON-formatted string representing the output state, results, and exceptions for the
entity execution.
"""
response = EntityState(results=[], signals=[])
for operation_data in batch:
result: Any = None
is_error: bool = False
start_time: datetime = datetime.now(timezone.utc)
try:
# populate context
operation = operation_data["name"]
if operation is None:
message = "Durable Functions Internal Error:"\
"Entity operation was missing a name field"
raise InternalEntityException(message)
context._operation = operation
context._input = operation_data["input"]
self.fn(context)
result = context._result
except InternalEntityException as e:
raise e
except Exception as e:
is_error = True
result = str(e)
duration: int = self._elapsed_milliseconds_since(start_time)
operation_result = OperationResult(
is_error=is_error,
duration=duration,
execution_start_time_ms=int(start_time.timestamp() * 1000),
result=result
)
response.results.append(operation_result)
response.state = context._state
response.entity_exists = context._exists
return response.to_json_string()
@classmethod
def create(cls, fn: Callable[[DurableEntityContext], None]) -> Callable[[Any], str]:
"""Create an instance of the entity class.
Parameters
----------
fn (Callable[[DurableEntityContext], None]): [description]
Returns
-------
Callable[[Any], str]
Handle function of the newly created entity client
"""
def handle(context) -> str:
# It is not clear when the context JSON would be found
# inside a "body"-key, but this pattern matches the
# orchestrator implementation, so we keep it for safety.
context_body = getattr(context, "body", None)
if context_body is None:
context_body = context
ctx, batch = DurableEntityContext.from_json(context_body)
return Entity(fn).handle(ctx, batch)
handle.entity_function = fn
return handle
def _elapsed_milliseconds_since(self, start_time: datetime) -> int:
"""Calculate the elapsed time, in milliseconds, from the start_time to the present.
Parameters
----------
start_time: datetime
The timestamp of when the entity began processing a batched request.
Returns
-------
int
The time, in millseconds, from start_time to now
"""
end_time = datetime.now(timezone.utc)
time_diff = end_time - start_time
elapsed_time = int(time_diff.total_seconds() * 1000)
return elapsed_time