Skip to content

Commit de55623

Browse files
committed
GRPC Actors
Signed-off-by: Albert Callarisa <albert@diagrid.io>
1 parent 51d6e93 commit de55623

22 files changed

Lines changed: 2647 additions & 64 deletions

dapr/actor/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,13 @@
1818
from dapr.actor.id import ActorId
1919
from dapr.actor.runtime.actor import Actor
2020
from dapr.actor.runtime.failure_policy import ActorReminderFailurePolicy
21+
from dapr.actor.runtime.grpc_host import ActorGrpcHost
2122
from dapr.actor.runtime.remindable import Remindable
2223
from dapr.actor.runtime.runtime import ActorRuntime
2324

2425
__all__ = [
2526
'ActorInterface',
27+
'ActorGrpcHost',
2628
'ActorProxy',
2729
'ActorProxyFactory',
2830
'ActorId',
Lines changed: 214 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,214 @@
1+
# -*- coding: utf-8 -*-
2+
3+
"""
4+
Copyright 2026 The Dapr Authors
5+
Licensed under the Apache License, Version 2.0 (the "License");
6+
you may not use this file except in compliance with the License.
7+
You may obtain a copy of the License at
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
Unless required by applicable law or agreed to in writing, software
10+
distributed under the License is distributed on an "AS IS" BASIS,
11+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
See the License for the specific language governing permissions and
13+
limitations under the License.
14+
"""
15+
16+
import base64
17+
import json
18+
from typing import Any, Dict, Mapping, Optional
19+
20+
from google.protobuf import any_pb2, wrappers_pb2
21+
from grpc import StatusCode # type: ignore[attr-defined]
22+
23+
from dapr.actor.runtime.config import (
24+
ActorReentrancyConfig,
25+
ActorRuntimeConfig,
26+
ActorTypeConfig,
27+
)
28+
from dapr.clients.base import DAPR_REENTRANCY_ID_HEADER
29+
from dapr.clients.exceptions import ERROR_CODE_UNKNOWN, DaprInternalError
30+
from dapr.proto import api_v1
31+
32+
CONTENT_TYPE_HEADER = 'content-type'
33+
JSON_CONTENT_TYPE = 'application/json'
34+
35+
36+
def build_initial_request(
37+
config: ActorRuntimeConfig,
38+
) -> api_v1.SubscribeActorEventsRequestInitialAlpha1:
39+
"""Builds the stream registration message from the actor runtime config.
40+
41+
This is the gRPC equivalent of the JSON payload served by the HTTP
42+
``GET /dapr/config`` endpoint. ``actor_scan_interval`` and
43+
``reminders_storage_partitions`` have no counterpart in the proto
44+
contract and are not transmitted.
45+
"""
46+
config_entities = {type_config.actor_type for type_config in config.actor_type_configs}
47+
entities = sorted(config.entities | config_entities)
48+
49+
initial_request = api_v1.SubscribeActorEventsRequestInitialAlpha1(entities=entities)
50+
51+
if config.actor_idle_timeout is not None:
52+
initial_request.actor_idle_timeout.FromTimedelta(config.actor_idle_timeout)
53+
if config.drain_ongoing_call_timeout is not None:
54+
initial_request.drain_ongoing_call_timeout.FromTimedelta(config.drain_ongoing_call_timeout)
55+
if config.drain_rebalanced_actors is not None:
56+
initial_request.drain_rebalanced_actors = config.drain_rebalanced_actors
57+
if config.reentrancy is not None:
58+
initial_request.reentrancy.CopyFrom(_reentrancy_to_proto(config.reentrancy))
59+
60+
for type_config in config.actor_type_configs:
61+
initial_request.entities_config.append(_entity_config_to_proto(type_config))
62+
63+
return initial_request
64+
65+
66+
def _reentrancy_to_proto(reentrancy: ActorReentrancyConfig) -> api_v1.ActorReentrancyConfig:
67+
return api_v1.ActorReentrancyConfig(
68+
enabled=reentrancy.enabled,
69+
max_stack_depth=reentrancy.max_stack_depth,
70+
)
71+
72+
73+
def _entity_config_to_proto(type_config: ActorTypeConfig) -> api_v1.ActorEntityConfig:
74+
proto_config = api_v1.ActorEntityConfig(entities=[type_config.actor_type])
75+
76+
if type_config.actor_idle_timeout is not None:
77+
proto_config.actor_idle_timeout.FromTimedelta(type_config.actor_idle_timeout)
78+
if type_config.drain_ongoing_call_timeout is not None:
79+
proto_config.drain_ongoing_call_timeout.FromTimedelta(
80+
type_config.drain_ongoing_call_timeout
81+
)
82+
if type_config.drain_rebalanced_actors is not None:
83+
proto_config.drain_rebalanced_actors = type_config.drain_rebalanced_actors
84+
if type_config.reentrancy is not None:
85+
proto_config.reentrancy.CopyFrom(_reentrancy_to_proto(type_config.reentrancy))
86+
87+
return proto_config
88+
89+
90+
def extract_reentrancy_id(metadata: Mapping[str, str]) -> Optional[str]:
91+
"""Looks up the reentrancy id header case-insensitively in callback metadata."""
92+
for key, value in metadata.items():
93+
if key.lower() == DAPR_REENTRANCY_ID_HEADER.lower():
94+
return value
95+
return None
96+
97+
98+
def build_reminder_fire_body(
99+
reminder_request: api_v1.SubscribeActorEventsResponseReminderRequestAlpha1,
100+
) -> bytes:
101+
"""Synthesizes the JSON body ``ActorRuntime.fire_reminder`` expects.
102+
103+
Over HTTP, daprd delivers reminder fires as a JSON object with the
104+
registered data embedded verbatim as the ``data`` value (for SDK-registered
105+
reminders, a base64 string). The stream carries the same payload inside a
106+
``google.protobuf.Any``, so the JSON value is embedded unchanged.
107+
"""
108+
body: Dict[str, Any] = {
109+
'dueTime': reminder_request.due_time,
110+
'period': reminder_request.period,
111+
}
112+
data_value = _any_to_json_value(reminder_request)
113+
if data_value is not None:
114+
body['data'] = _parse_json_value(data_value, 'reminder')
115+
return json.dumps(body).encode('utf-8')
116+
117+
118+
def build_timer_fire_body(
119+
timer_request: api_v1.SubscribeActorEventsResponseTimerRequestAlpha1,
120+
) -> bytes:
121+
"""Synthesizes the JSON body ``ActorRuntime.fire_timer`` expects.
122+
123+
Timers registered through ``DaprActorGrpcClient`` arrive base64-wrapped
124+
because daprd JSON-marshals the raw bytes of the unary
125+
``RegisterActorTimer`` request, so the original JSON value is recovered
126+
before embedding (see ``_maybe_unwrap_grpc_registered_value``).
127+
"""
128+
body: Dict[str, Any] = {
129+
'callback': timer_request.callback,
130+
'dueTime': timer_request.due_time,
131+
'period': timer_request.period,
132+
'data': None,
133+
}
134+
data_value = _any_to_json_value(timer_request)
135+
if data_value is not None:
136+
data_value = _maybe_unwrap_grpc_registered_value(data_value)
137+
body['data'] = _parse_json_value(data_value, 'timer')
138+
return json.dumps(body).encode('utf-8')
139+
140+
141+
def _parse_json_value(value: bytes, callback_kind: str) -> Any:
142+
"""Parses a callback data payload, failing with a clear error message."""
143+
try:
144+
return json.loads(value)
145+
except ValueError as error:
146+
raise ValueError(f'{callback_kind} data is not valid JSON: {error}') from error
147+
148+
149+
def _any_to_json_value(callback_request: Any) -> Optional[bytes]:
150+
"""Extracts the raw JSON value bytes from a callback's ``Any`` data field.
151+
152+
daprd stores reminder/timer payloads as a ``google.protobuf.BytesValue``
153+
holding the JSON value registered by the app, and unwraps it the same way
154+
when delivering over HTTP (see Reminder.MarshalJSON in dapr/dapr). An
155+
``Any`` of any other type falls back to its raw value bytes.
156+
"""
157+
if not callback_request.HasField('data'):
158+
return None
159+
160+
data: any_pb2.Any = callback_request.data
161+
if data.Is(wrappers_pb2.BytesValue.DESCRIPTOR):
162+
bytes_value = wrappers_pb2.BytesValue()
163+
data.Unpack(bytes_value)
164+
return bytes_value.value or None
165+
return data.value or None
166+
167+
168+
def _maybe_unwrap_grpc_registered_value(value: bytes) -> bytes:
169+
"""Recovers the original JSON value from a gRPC-registered timer payload.
170+
171+
daprd's unary ``RegisterActorTimer`` JSON-marshals the request's raw
172+
bytes, turning a JSON value ``J`` into the string ``base64(J)``. When the
173+
stored value is a JSON string that base64-decodes to valid JSON, the
174+
decoded form is the original registration payload. HTTP-registered string
175+
payloads that coincidentally satisfy both checks are misdetected; this is
176+
a documented limitation of the alpha transport.
177+
"""
178+
try:
179+
parsed = json.loads(value)
180+
except ValueError:
181+
return value
182+
if not isinstance(parsed, str):
183+
return value
184+
try:
185+
decoded = base64.b64decode(parsed, validate=True)
186+
json.loads(decoded)
187+
except ValueError:
188+
return value
189+
return decoded
190+
191+
192+
def build_invoke_error_payload(exception: Exception) -> bytes:
193+
"""Serializes a handler exception the way the HTTP extensions do.
194+
195+
Matches the 500-response body shape of the FastAPI/Flask actor
196+
extensions so callers observe the same error payload on both transports.
197+
"""
198+
if isinstance(exception, DaprInternalError):
199+
payload = exception.as_json_safe_dict()
200+
else:
201+
payload = {'message': repr(exception), 'errorCode': ERROR_CODE_UNKNOWN}
202+
return json.dumps(payload).encode('utf-8')
203+
204+
205+
def status_code_for_exception(exception: Exception) -> int:
206+
"""Maps a dispatch exception to the gRPC status code for ``request_failed``.
207+
208+
``ValueError`` (unregistered actor type) and ``AttributeError`` (unknown
209+
actor method) map to ``NOT_FOUND``, which daprd treats as a permanent,
210+
non-retryable failure. Everything else maps to ``UNKNOWN``.
211+
"""
212+
if isinstance(exception, (ValueError, AttributeError)):
213+
return StatusCode.NOT_FOUND.value[0]
214+
return StatusCode.UNKNOWN.value[0]

dapr/actor/runtime/config.py

Lines changed: 86 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
"""
1515

1616
from datetime import timedelta
17-
from typing import Any, Dict, List, Optional, Set
17+
from typing import Any, Dict, FrozenSet, List, Optional, Set
1818

1919

2020
class ActorReentrancyConfig:
@@ -31,6 +31,16 @@ def __init__(self, enabled: bool = False, maxStackDepth: int = 32):
3131
self._enabled = enabled
3232
self._maxStackDepth = maxStackDepth
3333

34+
@property
35+
def enabled(self) -> bool:
36+
"""Returns whether reentrancy is enabled."""
37+
return self._enabled
38+
39+
@property
40+
def max_stack_depth(self) -> int:
41+
"""Returns the limit for concurrent reentrant requests to an actor."""
42+
return self._maxStackDepth
43+
3444
def as_dict(self) -> Dict[str, Any]:
3545
"""Returns ActorReentrancyConfig as a dict."""
3646
return {
@@ -81,6 +91,41 @@ def __init__(
8191
self._reentrancy = reentrancy
8292
self._reminders_storage_partitions = reminders_storage_partitions
8393

94+
@property
95+
def actor_type(self) -> str:
96+
"""Returns the actor type this configuration applies to."""
97+
return self._actor_type
98+
99+
@property
100+
def actor_idle_timeout(self) -> Optional[timedelta]:
101+
"""Returns the timeout before deactivating an idle actor."""
102+
return self._actor_idle_timeout
103+
104+
@property
105+
def actor_scan_interval(self) -> Optional[timedelta]:
106+
"""Returns how often to scan for idle actors to deactivate."""
107+
return self._actor_scan_interval
108+
109+
@property
110+
def drain_ongoing_call_timeout(self) -> Optional[timedelta]:
111+
"""Returns the timeout for ongoing calls before actor deactivation."""
112+
return self._drain_ongoing_call_timeout
113+
114+
@property
115+
def drain_rebalanced_actors(self) -> Optional[bool]:
116+
"""Returns whether rebalanced actors are drained."""
117+
return self._drain_rebalanced_actors
118+
119+
@property
120+
def reentrancy(self) -> Optional[ActorReentrancyConfig]:
121+
"""Returns the reentrancy configuration for this actor type."""
122+
return self._reentrancy
123+
124+
@property
125+
def reminders_storage_partitions(self) -> Optional[int]:
126+
"""Returns the number of partitions for reminders storage."""
127+
return self._reminders_storage_partitions
128+
84129
def as_dict(self) -> Dict[str, Any]:
85130
"""Returns ActorTypeConfig as a dict."""
86131

@@ -155,6 +200,46 @@ def __init__(
155200
self._reminders_storage_partitions = reminders_storage_partitions
156201
self._entitiesConfig: List[ActorTypeConfig] = actor_type_configs
157202

203+
@property
204+
def entities(self) -> FrozenSet[str]:
205+
"""Returns a snapshot of the registered actor type names."""
206+
return frozenset(self._entities)
207+
208+
@property
209+
def actor_idle_timeout(self) -> Optional[timedelta]:
210+
"""Returns the default timeout before deactivating an idle actor."""
211+
return self._actor_idle_timeout
212+
213+
@property
214+
def actor_scan_interval(self) -> Optional[timedelta]:
215+
"""Returns how often to scan for idle actors to deactivate."""
216+
return self._actor_scan_interval
217+
218+
@property
219+
def drain_ongoing_call_timeout(self) -> Optional[timedelta]:
220+
"""Returns the timeout for ongoing calls before actor deactivation."""
221+
return self._drain_ongoing_call_timeout
222+
223+
@property
224+
def drain_rebalanced_actors(self) -> Optional[bool]:
225+
"""Returns whether rebalanced actors are drained."""
226+
return self._drain_rebalanced_actors
227+
228+
@property
229+
def reentrancy(self) -> Optional[ActorReentrancyConfig]:
230+
"""Returns the default reentrancy configuration."""
231+
return self._reentrancy
232+
233+
@property
234+
def reminders_storage_partitions(self) -> Optional[int]:
235+
"""Returns the number of partitions for reminders storage."""
236+
return self._reminders_storage_partitions
237+
238+
@property
239+
def actor_type_configs(self) -> List[ActorTypeConfig]:
240+
"""Returns a snapshot of the per-actor-type configurations."""
241+
return list(self._entitiesConfig)
242+
158243
def update_entities(self, entities: List[str]) -> None:
159244
"""Updates actor types in entities property.
160245

0 commit comments

Comments
 (0)