Skip to content

Commit 77e3bcf

Browse files
authored
default drain_ongoing_call_timeout to None (#1016)
* default drain_ongoing_call_timeout to None ActorRuntimeConfig hardcoded drain_ongoing_call_timeout to 60s. The Dapr runtime's placement dissemination timeout defaults to 30s and drain blocks the placement LOCK -> UPDATE -> UNLOCK round, so a 60s drain stalls the disseminator and resets the placement stream. Match the other SDKs (.NET, Go, Java, JS) by leaving the field unset, so daprd applies its 2s default (api.DefaultOngoingCallTimeout). Omit the field from as_dict() when None so a JSON null is not sent to the runtime. Signed-off-by: joshvanl <me@joshvanl.dev> * Update comment Signed-off-by: joshvanl <me@joshvanl.dev> --------- Signed-off-by: joshvanl <me@joshvanl.dev>
1 parent a32e783 commit 77e3bcf

4 files changed

Lines changed: 26 additions & 13 deletions

File tree

dapr/actor/runtime/config.py

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -117,7 +117,7 @@ def __init__(
117117
self,
118118
actor_idle_timeout: Optional[timedelta] = timedelta(hours=1),
119119
actor_scan_interval: Optional[timedelta] = timedelta(seconds=30),
120-
drain_ongoing_call_timeout: Optional[timedelta] = timedelta(minutes=1),
120+
drain_ongoing_call_timeout: Optional[timedelta] = None,
121121
drain_rebalanced_actors: Optional[bool] = True,
122122
reentrancy: Optional[ActorReentrancyConfig] = None,
123123
reminders_storage_partitions: Optional[int] = None,
@@ -130,9 +130,13 @@ def __init__(
130130
actor_scan_interval (datetime.timedelta): The duration which specifies how often to scan
131131
for actors to deactivate idle actors. Actors that have been idle longer than
132132
actor_idle_timeout will be deactivated.
133-
drain_ongoing_call_timeout (datetime.timedelta): The duration which specifies the
134-
timeout for the current active actor method to finish before actor deactivation.
135-
If there is no current actor method call, this is ignored.
133+
drain_ongoing_call_timeout (Optional[datetime.timedelta]): The duration which
134+
specifies the timeout for the current active actor method to finish before
135+
actor deactivation. If there is no current actor method call, this is
136+
ignored. Defaults to None, which omits the field from the configuration
137+
sent to daprd so the runtime applies its own default. An explicit value
138+
must be shorter than the daprd placement dissemination timeout, otherwise
139+
daprd will clamp it.
136140
drain_rebalanced_actors (bool): If true, Dapr will wait for drain_ongoing_call_timeout
137141
to allow a current actor call to complete before trying to deactivate an actor.
138142
reentrancy (ActorReentrancyConfig): Configure the reentrancy behavior for an actor.
@@ -175,10 +179,12 @@ def as_dict(self) -> Dict[str, Any]:
175179
configDict: Dict[str, Any] = {
176180
'actorIdleTimeout': self._actor_idle_timeout,
177181
'actorScanInterval': self._actor_scan_interval,
178-
'drainOngoingCallTimeout': self._drain_ongoing_call_timeout,
179182
'drainRebalancedActors': self._drain_rebalanced_actors,
180183
}
181184

185+
if self._drain_ongoing_call_timeout is not None:
186+
configDict['drainOngoingCallTimeout'] = self._drain_ongoing_call_timeout
187+
182188
if self._reentrancy:
183189
configDict.update({'reentrancy': self._reentrancy.as_dict()})
184190

tests/actor/test_actor.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ def test_actor_config(self):
6565
self.assertTrue(config._drain_rebalanced_actors)
6666
self.assertEqual(timedelta(hours=1), config._actor_idle_timeout)
6767
self.assertEqual(timedelta(seconds=30), config._actor_scan_interval)
68-
self.assertEqual(timedelta(minutes=1), config._drain_ongoing_call_timeout)
68+
self.assertIsNone(config._drain_ongoing_call_timeout)
6969
self.assertEqual(2, len(config._entities))
7070

7171
# apply new config

tests/actor/test_actor_runtime.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ def test_actor_config(self):
6060
self.assertTrue(config._drain_rebalanced_actors)
6161
self.assertEqual(timedelta(hours=1), config._actor_idle_timeout)
6262
self.assertEqual(timedelta(seconds=30), config._actor_scan_interval)
63-
self.assertEqual(timedelta(minutes=1), config._drain_ongoing_call_timeout)
63+
self.assertIsNone(config._drain_ongoing_call_timeout)
6464
self.assertEqual(3, len(config._entities))
6565

6666
# apply new config

tests/actor/test_actor_runtime_config.py

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -69,13 +69,14 @@ def test_default_config(self):
6969

7070
self.assertEqual(config._actor_idle_timeout, timedelta(seconds=3600))
7171
self.assertEqual(config._actor_scan_interval, timedelta(seconds=30))
72-
self.assertEqual(config._drain_ongoing_call_timeout, timedelta(seconds=60))
72+
self.assertIsNone(config._drain_ongoing_call_timeout)
7373
self.assertEqual(config._drain_rebalanced_actors, True)
7474
self.assertEqual(config._reentrancy, None)
7575
self.assertEqual(config._entities, set())
7676
self.assertEqual(config._entitiesConfig, [])
7777
self.assertNotIn('reentrancy', config.as_dict().keys())
7878
self.assertNotIn('remindersStoragePartitions', config.as_dict().keys())
79+
self.assertNotIn('drainOngoingCallTimeout', config.as_dict().keys())
7980
self.assertEqual(config.as_dict()['entitiesConfig'], [])
8081

8182
def test_default_config_with_reentrancy(self):
@@ -84,7 +85,7 @@ def test_default_config_with_reentrancy(self):
8485

8586
self.assertEqual(config._actor_idle_timeout, timedelta(seconds=3600))
8687
self.assertEqual(config._actor_scan_interval, timedelta(seconds=30))
87-
self.assertEqual(config._drain_ongoing_call_timeout, timedelta(seconds=60))
88+
self.assertIsNone(config._drain_ongoing_call_timeout)
8889
self.assertEqual(config._drain_rebalanced_actors, True)
8990
self.assertEqual(config._reentrancy, reentrancyConfig)
9091
self.assertEqual(config._entities, set())
@@ -110,7 +111,8 @@ def test_config_with_actor_type_config(self):
110111
self.assertEqual(config._actor_scan_interval, timedelta(seconds=30))
111112

112113
d = config.as_dict()
113-
self.assertEqual(config._drain_ongoing_call_timeout, timedelta(seconds=60))
114+
self.assertIsNone(config._drain_ongoing_call_timeout)
115+
self.assertNotIn('drainOngoingCallTimeout', d)
114116
self.assertEqual(d['entitiesConfig'][0]['entities'], ['testactor1'])
115117
self.assertEqual(d['entitiesConfig'][0]['actorScanInterval'], timedelta(seconds=10))
116118
self.assertEqual(d['entitiesConfig'][0]['reentrancy']['enabled'], True)
@@ -130,7 +132,7 @@ def test_update_entities(self):
130132

131133
self.assertEqual(config._actor_idle_timeout, timedelta(seconds=3600))
132134
self.assertEqual(config._actor_scan_interval, timedelta(seconds=30))
133-
self.assertEqual(config._drain_ongoing_call_timeout, timedelta(seconds=60))
135+
self.assertIsNone(config._drain_ongoing_call_timeout)
134136
self.assertEqual(config._drain_rebalanced_actors, True)
135137
self.assertEqual(config._entities, {'actortype1'})
136138
self.assertEqual(config._entitiesConfig, [])
@@ -141,7 +143,7 @@ def test_update_entities_two_types(self):
141143
config.update_entities(['actortype1', 'actortype1'])
142144
self.assertEqual(config._actor_idle_timeout, timedelta(seconds=3600))
143145
self.assertEqual(config._actor_scan_interval, timedelta(seconds=30))
144-
self.assertEqual(config._drain_ongoing_call_timeout, timedelta(seconds=60))
146+
self.assertIsNone(config._drain_ongoing_call_timeout)
145147
self.assertEqual(config._drain_rebalanced_actors, True)
146148
self.assertEqual(config._entities, {'actortype1', 'actortype1'})
147149
self.assertEqual(config._entitiesConfig, [])
@@ -164,12 +166,17 @@ def test_set_reminders_storage_partitions(self):
164166
config = ActorRuntimeConfig(reminders_storage_partitions=12)
165167
self.assertEqual(config._actor_idle_timeout, timedelta(seconds=3600))
166168
self.assertEqual(config._actor_scan_interval, timedelta(seconds=30))
167-
self.assertEqual(config._drain_ongoing_call_timeout, timedelta(seconds=60))
169+
self.assertIsNone(config._drain_ongoing_call_timeout)
168170
self.assertEqual(config._drain_rebalanced_actors, True)
169171
self.assertNotIn('reentrancy', config.as_dict().keys())
170172
self.assertEqual(config._reminders_storage_partitions, 12)
171173
self.assertEqual(config.as_dict()['remindersStoragePartitions'], 12)
172174

175+
def test_explicit_drain_ongoing_call_timeout(self):
176+
config = ActorRuntimeConfig(drain_ongoing_call_timeout=timedelta(seconds=10))
177+
self.assertEqual(config._drain_ongoing_call_timeout, timedelta(seconds=10))
178+
self.assertEqual(config.as_dict()['drainOngoingCallTimeout'], timedelta(seconds=10))
179+
173180

174181
if __name__ == '__main__':
175182
unittest.main()

0 commit comments

Comments
 (0)