forked from dapr/python-sdk
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathactor.py
More file actions
154 lines (134 loc) · 6.44 KB
/
Copy pathactor.py
File metadata and controls
154 lines (134 loc) · 6.44 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
# -*- coding: utf-8 -*-
"""
Copyright 2023 The Dapr Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
from typing import Any, List, Optional, Type
from fastapi import APIRouter, FastAPI, Request, Response, status # type: ignore
from fastapi.logger import logger
from fastapi.responses import JSONResponse
from dapr.actor import Actor, ActorRuntime
from dapr.clients.exceptions import ERROR_CODE_UNKNOWN, DaprInternalError
from dapr.serializers import DefaultJSONSerializer
# Media type that _wrap_response applies by default when the payload is raw bytes.
# NOTE(review): the parameter is spelled '; utf-8' rather than '; charset=utf-8'
# -- confirm whether any consumer depends on this exact value before changing it.
DEFAULT_CONTENT_TYPE = 'application/json; utf-8'
# Name of the request header read by the actor method route; its value is
# forwarded to ActorRuntime.dispatch as the reentrancy id.
DAPR_REENTRANCY_ID_HEADER = 'Dapr-Reentrancy-Id'
def _wrap_response(
    status_code: int,
    msg: Any,
    error_code: Optional[str] = None,
    content_type: Optional[str] = DEFAULT_CONTENT_TYPE,
):
    """Build the HTTP response object returned by the actor routes.

    Args:
        status_code: HTTP status code to set on the response.
        msg: Payload to send. A ``str`` is wrapped as ``{'message': msg}``,
            ``bytes`` are sent as the raw body, and any other value is passed
            to ``JSONResponse`` as its content.
        error_code: Added to the JSON body under ``errorCode``, but only when
            ``msg`` is a ``str`` and ``status_code`` is outside the 2xx range.
        content_type: Media type for the response; used for ``bytes``
            payloads only.

    Returns:
        A ``JSONResponse`` for ``str`` and non-bytes payloads, or a plain
        ``Response`` for ``bytes`` payloads.
    """
    if isinstance(msg, str):
        response_obj = {
            'message': msg,
        }
        is_success = 200 <= status_code < 300
        if not is_success and error_code:
            response_obj['errorCode'] = error_code
        return JSONResponse(content=response_obj, status_code=status_code)

    if isinstance(msg, bytes):
        # Forward status_code explicitly: without it Response falls back to
        # 200 and a bytes payload could never carry any other status.
        return Response(content=msg, status_code=status_code, media_type=content_type)

    return JSONResponse(content=msg, status_code=status_code)
class DaprActor(object):
    """Mount the Dapr actor HTTP routes on a FastAPI application.

    Constructing an instance creates an ``APIRouter``, registers the health,
    config, deactivation, method, timer and reminder routes on it, and
    includes that router in the supplied app. Every route delegates the real
    work to ``ActorRuntime`` and converts the outcome into an HTTP response
    with ``_wrap_response``.
    """

    def __init__(self, app: FastAPI, router_tags: Optional[List[str]] = ['Actor']):
        """Create the actor router and attach it to ``app``.

        Args:
            app: FastAPI application that receives the actor routes.
            router_tags: Tags applied to every route registered here; ``None``
                is passed through to the route decorators unchanged.
        """
        # router_tags should be added to all magic Dapr Actor methods implemented here.
        # Store a copy so the shared default list (or a caller-owned list) is
        # never aliased by this instance.
        self._router_tags = None if router_tags is None else list(router_tags)
        self._router = APIRouter()
        self._dapr_serializer = DefaultJSONSerializer()
        self.init_routes(self._router)
        app.include_router(self._router)

    def init_routes(self, router: APIRouter):
        """Register all actor routes on ``router``.

        The handlers are documented with comments rather than docstrings so
        that the descriptions FastAPI derives from handler docstrings stay
        unchanged.

        Args:
            router: Router that the route handlers are attached to.
        """

        # Health probe: always answers with a fixed OK payload.
        @router.get('/healthz', tags=self._router_tags)
        async def healthz():
            return {'status': 'ok'}

        # Returns the serialized actor configuration from ActorRuntime.
        @router.get('/dapr/config', tags=self._router_tags)
        async def dapr_config():
            serialized = self._dapr_serializer.serialize(ActorRuntime.get_actor_config())
            return _wrap_response(status.HTTP_200_OK, serialized)

        # Deactivates one actor instance. Any failure becomes a 500 response:
        # DaprInternalError is reported through its own JSON-safe dict, every
        # other exception through repr() plus ERROR_CODE_UNKNOWN.
        @router.delete('/actors/{actor_type_name}/{actor_id}', tags=self._router_tags)
        async def actor_deactivation(actor_type_name: str, actor_id: str):
            try:
                await ActorRuntime.deactivate(actor_type_name, actor_id)
            except DaprInternalError as ex:
                return _wrap_response(status.HTTP_500_INTERNAL_SERVER_ERROR, ex.as_json_safe_dict())
            except Exception as ex:
                return _wrap_response(
                    status.HTTP_500_INTERNAL_SERVER_ERROR, repr(ex), ERROR_CODE_UNKNOWN
                )

            msg = f'deactivated actor: {actor_type_name}.{actor_id}'
            logger.debug(msg)
            return _wrap_response(status.HTTP_200_OK, msg)

        # Invokes an actor method with the raw request body and the optional
        # reentrancy id header; the dispatch result is returned to the caller.
        @router.put(
            '/actors/{actor_type_name}/{actor_id}/method/{method_name}', tags=self._router_tags
        )
        async def actor_method(
            actor_type_name: str, actor_id: str, method_name: str, request: Request
        ):
            try:
                # Read raw bytes from request stream
                req_body = await request.body()
                reentrancy_id = request.headers.get(DAPR_REENTRANCY_ID_HEADER)
                result = await ActorRuntime.dispatch(
                    actor_type_name, actor_id, method_name, req_body, reentrancy_id
                )
            except DaprInternalError as ex:
                return _wrap_response(status.HTTP_500_INTERNAL_SERVER_ERROR, ex.as_json_safe_dict())
            except Exception as ex:
                return _wrap_response(
                    status.HTTP_500_INTERNAL_SERVER_ERROR, repr(ex), ERROR_CODE_UNKNOWN
                )

            msg = f'called method. actor: {actor_type_name}.{actor_id}, method: {method_name}'
            logger.debug(msg)
            return _wrap_response(status.HTTP_200_OK, result)

        # Fires a named timer on an actor, forwarding the raw request body.
        @router.put(
            '/actors/{actor_type_name}/{actor_id}/method/timer/{timer_name}', tags=self._router_tags
        )
        async def actor_timer(
            actor_type_name: str, actor_id: str, timer_name: str, request: Request
        ):
            try:
                # Read raw bytes from request stream
                req_body = await request.body()
                await ActorRuntime.fire_timer(actor_type_name, actor_id, timer_name, req_body)
            except DaprInternalError as ex:
                return _wrap_response(status.HTTP_500_INTERNAL_SERVER_ERROR, ex.as_json_safe_dict())
            except Exception as ex:
                return _wrap_response(
                    status.HTTP_500_INTERNAL_SERVER_ERROR, repr(ex), ERROR_CODE_UNKNOWN
                )

            msg = f'called timer. actor: {actor_type_name}.{actor_id}, timer: {timer_name}'
            logger.debug(msg)
            return _wrap_response(status.HTTP_200_OK, msg)

        # Fires a named reminder on an actor, forwarding the raw request body.
        @router.put(
            '/actors/{actor_type_name}/{actor_id}/method/remind/{reminder_name}',
            tags=self._router_tags,
        )
        async def actor_reminder(
            actor_type_name: str, actor_id: str, reminder_name: str, request: Request
        ):
            try:
                # Read raw bytes from request stream
                req_body = await request.body()
                await ActorRuntime.fire_reminder(actor_type_name, actor_id, reminder_name, req_body)
            except DaprInternalError as ex:
                return _wrap_response(status.HTTP_500_INTERNAL_SERVER_ERROR, ex.as_json_safe_dict())
            except Exception as ex:
                return _wrap_response(
                    status.HTTP_500_INTERNAL_SERVER_ERROR, repr(ex), ERROR_CODE_UNKNOWN
                )

            msg = f'called reminder. actor: {actor_type_name}.{actor_id}, reminder: {reminder_name}'
            logger.debug(msg)
            return _wrap_response(status.HTTP_200_OK, msg)

    async def register_actor(self, actor: Type[Actor], **kwargs) -> None:
        """Register an actor class with ``ActorRuntime``.

        Args:
            actor: The actor class (not an instance) to register.
            **kwargs: Forwarded unchanged to ``ActorRuntime.register_actor``.
        """
        await ActorRuntime.register_actor(actor, **kwargs)
        # `actor` is the class object itself, so its own __name__ is the actor
        # type name; actor.__class__.__name__ would be the metaclass name.
        logger.debug(f'registered actor: {actor.__name__}')