Skip to content

Commit e4b1b5a

Browse files
YunchuWangCopilot
andcommitted
Separate sandbox management client from worker transport
Keep worker registration on an internal gRPC client while exposing only declaration management APIs through the public on-demand sandbox client. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
1 parent f5fd86b commit e4b1b5a

3 files changed

Lines changed: 53 additions & 13 deletions

File tree

durabletask-azuremanaged/durabletask/azuremanaged/preview/on_demand_sandbox/client.py

Lines changed: 46 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -233,8 +233,8 @@ def build_on_demand_sandbox_worker_heartbeat(active_activities_count: int) -> pb
233233
active_activities_count=active_activities_count))
234234

235235

236-
class OnDemandSandboxActivitiesClient:
237-
"""Client for Durable Task Scheduler on-demand sandbox activity management operations."""
236+
class _OnDemandSandboxActivitiesGrpcClient:
237+
"""Internal gRPC client for on-demand sandbox activity RPCs."""
238238

239239
def __init__(
240240
self, *,
@@ -266,25 +266,60 @@ def close(self) -> None:
266266
if self._owns_channel:
267267
self._channel.close()
268268

269+
def declare_on_demand_sandbox_activities(
270+
self,
271+
declaration: pb.OnDemandSandboxActivityDeclaration) -> pb.OnDemandSandboxActivityDeclarationResult:
272+
return self._stub.DeclareOnDemandSandboxActivities(declaration)
273+
274+
def remove_on_demand_sandbox_activity_declaration(
275+
self,
276+
worker_profile_id: str) -> pb.RemoveOnDemandSandboxActivityDeclarationResult:
277+
return self._stub.RemoveOnDemandSandboxActivityDeclaration(
278+
pb.RemoveOnDemandSandboxActivityDeclarationRequest(worker_profile_id=worker_profile_id))
279+
280+
def connect_on_demand_sandbox_activity_worker(
281+
self,
282+
messages: Iterable[pb.OnDemandSandboxActivityWorkerMessage]
283+
) -> pb.OnDemandSandboxActivityWorkerSessionResult:
284+
return self._stub.ConnectOnDemandSandboxActivityWorker(messages)
285+
286+
287+
class OnDemandSandboxActivitiesClient:
288+
"""Client for Durable Task Scheduler on-demand sandbox activity management operations."""
289+
290+
def __init__(
291+
self, *,
292+
host_address: str,
293+
taskhub: str,
294+
token_credential: Optional[TokenCredential],
295+
channel: Optional[grpc.Channel] = None,
296+
secure_channel: bool = True,
297+
interceptors: Optional[Sequence[shared.ClientInterceptor]] = None,
298+
channel_options: Optional[GrpcChannelOptions] = None):
299+
self._grpc_client = _OnDemandSandboxActivitiesGrpcClient(
300+
host_address=host_address,
301+
taskhub=taskhub,
302+
token_credential=token_credential,
303+
channel=channel,
304+
secure_channel=secure_channel,
305+
interceptors=interceptors,
306+
channel_options=channel_options)
307+
308+
def close(self) -> None:
309+
self._grpc_client.close()
310+
269311
def enable_on_demand_sandbox_activities(self) -> None:
270312
"""Declare all configured on-demand sandbox worker profiles with Durable Task Scheduler."""
271313
declarations = build_profile_on_demand_sandbox_activity_declarations()
272314
if not declarations:
273315
raise ValueError("No configured on-demand sandbox activities were found.")
274316

275317
for declaration in declarations:
276-
self._stub.DeclareOnDemandSandboxActivities(declaration)
318+
self._grpc_client.declare_on_demand_sandbox_activities(declaration)
277319

278320
def remove_on_demand_sandbox_activity_declaration(self, worker_profile_id: str) -> None:
279321
worker_profile_id = _normalize_required(worker_profile_id, "Worker profile ID is required.")
280-
self._stub.RemoveOnDemandSandboxActivityDeclaration(
281-
pb.RemoveOnDemandSandboxActivityDeclarationRequest(worker_profile_id=worker_profile_id))
282-
283-
def connect_on_demand_sandbox_activity_worker(
284-
self,
285-
messages: Iterable[pb.OnDemandSandboxActivityWorkerMessage]
286-
) -> pb.OnDemandSandboxActivityWorkerSessionResult:
287-
return self._stub.ConnectOnDemandSandboxActivityWorker(messages)
322+
self._grpc_client.remove_on_demand_sandbox_activity_declaration(worker_profile_id)
288323

289324

290325
def _normalize_optional_strings(values: Iterable[str]) -> list[str]:

durabletask-azuremanaged/durabletask/azuremanaged/preview/on_demand_sandbox/worker.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212
from durabletask.azuremanaged.preview.on_demand_sandbox.client import (
1313
DEFAULT_MAX_CONCURRENT_ACTIVITIES,
1414
DEFAULT_WORKER_PROFILE_ID,
15-
OnDemandSandboxActivitiesClient,
15+
_OnDemandSandboxActivitiesGrpcClient,
1616
build_on_demand_sandbox_worker_heartbeat,
1717
build_on_demand_sandbox_worker_start,
1818
resolve_activity_names,
@@ -118,7 +118,7 @@ def _run_on_demand_sandbox_registration_loop(self) -> None:
118118
retry_delay = 1.0
119119
while not self._on_demand_sandbox_registration_stop.is_set():
120120
try:
121-
client = OnDemandSandboxActivitiesClient(
121+
client = _OnDemandSandboxActivitiesGrpcClient(
122122
host_address=self._on_demand_sandbox_host_address,
123123
taskhub=self._on_demand_sandbox_taskhub,
124124
token_credential=self._on_demand_sandbox_token_credential,

tests/durabletask-azuremanaged/test_on_demand_sandbox_extension.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
import durabletask.azuremanaged.preview.on_demand_sandbox.client as sandbox_client
1010
import durabletask.azuremanaged.preview.on_demand_sandbox.worker as sandbox_worker
1111
from durabletask.azuremanaged.preview.on_demand_sandbox import OnDemandSandboxWorker
12+
from durabletask.azuremanaged.preview.on_demand_sandbox import OnDemandSandboxActivitiesClient
1213
from durabletask.azuremanaged.preview.on_demand_sandbox import OnDemandSandboxWorkerProfile
1314
from durabletask.azuremanaged.preview.on_demand_sandbox import OnDemandSandboxWorkerProfileOptions
1415
from durabletask.azuremanaged.preview.on_demand_sandbox import on_demand_sandbox_worker_profile
@@ -280,6 +281,10 @@ def test_on_demand_sandbox_worker_constructor_does_not_expose_runtime_contract()
280281
assert "_execute_activity" not in OnDemandSandboxWorker.__dict__
281282

282283

284+
def test_on_demand_sandbox_activities_client_does_not_expose_worker_registration_rpc() -> None:
285+
assert not hasattr(OnDemandSandboxActivitiesClient, "connect_on_demand_sandbox_activity_worker")
286+
287+
283288
def test_on_demand_sandbox_worker_does_not_own_legacy_wakeup_server(monkeypatch) -> None:
284289
monkeypatch.setenv("DTS_ENDPOINT", "http://localhost:8080")
285290
monkeypatch.setenv("DTS_TASK_HUB", "env-hub")

0 commit comments

Comments
 (0)