Skip to content

Commit 9af6f55

Browse files
committed
[feat] Support cross-job actor discovery via explicit namespace
When multiple Ray Jobs share the same Ray cluster, Named Actors are isolated by namespace. Without an explicit namespace, a TQ Controller created by one job is invisible to workers in another job.

This commit adds namespace="transfer_queue" to both:
- ray.get_actor() in _init_from_existing()
- TransferQueueController.options() in init()

This ensures that the TQ Controller is always registered and discovered in the fixed "transfer_queue" namespace, enabling cross-job TQ sharing (e.g., a teacher server job creates TQ, and a trainer job connects to it).

This change is backward-compatible: single-job usage is unaffected since the namespace is consistent between creation and discovery.

Signed-off-by: huniu20 <huniumail@gmail.com>
1 parent 9aefd26 commit 9af6f55

5 files changed

Lines changed: 11 additions & 7 deletions

File tree

recipe/simple_use_case/relax_demo.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -131,7 +131,7 @@ class BaseStageWorker:
131131
def __init__(self, tq_config, tracker, worker_id: int, config: "DemoConfig"):
132132
tq.init(tq_config)
133133
self.tq_client = tq.get_client()
134-
controller = ray.get_actor("TransferQueueController")
134+
controller = ray.get_actor("TransferQueueController", namespace="transfer_queue")
135135
self.cfg = ray.get(controller.get_config.remote())
136136
self.tracker = tracker
137137
self.worker_id = worker_id

tests/e2e/test_kv_interface_e2e.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -154,7 +154,7 @@ def tq_system(ray_init, backend_name):
154154
@pytest.fixture
155155
def controller(tq_system):
156156
"""Get the TransferQueueController actor for direct verification."""
157-
controller = ray.get_actor("TransferQueueController")
157+
controller = ray.get_actor("TransferQueueController", namespace="transfer_queue")
158158
yield controller
159159

160160

transfer_queue/interface.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,7 @@ def _init_from_existing() -> bool:
9393
global _TQ_CONTROLLER
9494
try:
9595
if _TQ_CONTROLLER is None:
96-
_TQ_CONTROLLER = ray.get_actor("TransferQueueController")
96+
_TQ_CONTROLLER = ray.get_actor("TransferQueueController", namespace="transfer_queue")
9797

9898
except ValueError:
9999
logger.info("Called _init_from_existing() but TransferQueueController has not been initialized yet.")
@@ -174,7 +174,9 @@ def init(conf: DictConfig | None = None) -> DictConfig | None:
174174

175175
try:
176176
global _TQ_CONTROLLER
177-
_TQ_CONTROLLER = TransferQueueController.options(name="TransferQueueController").remote( # type: ignore[attr-defined]
177+
_TQ_CONTROLLER = TransferQueueController.options(
178+
name="TransferQueueController", namespace="transfer_queue"
179+
).remote( # type: ignore[attr-defined]
178180
sampler=sampler, polling_mode=final_conf.controller.polling_mode
179181
)
180182
logger.info("TransferQueueController has been created.")

transfer_queue/storage/clients/ray_storage_client.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -57,9 +57,11 @@ def __init__(self, config=None):
5757

5858
# initialize actor
5959
try:
60-
self.storage_actor = ray.get_actor("RayObjectRefStorage")
60+
self.storage_actor = ray.get_actor("RayObjectRefStorage", namespace="transfer_queue")
6161
except ValueError:
62-
self.storage_actor = RayObjectRefStorage.options(name="RayObjectRefStorage", get_if_exists=False).remote()
62+
self.storage_actor = RayObjectRefStorage.options(
63+
name="RayObjectRefStorage", namespace="transfer_queue", get_if_exists=False
64+
).remote()
6365

6466
def put(self, keys: list[str], values: list[Any]) -> list[Any] | None:
6567
"""

tutorial/06_streaming_dataloader.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -184,7 +184,7 @@ def update_worker(
184184
# Step 1: Create StreamingDataset
185185
# This dataset integrates with TransferQueue and handles batch retrieval
186186

187-
controller = ray.get_actor("TransferQueueController")
187+
controller = ray.get_actor("TransferQueueController", namespace="transfer_queue")
188188
config = ray.get(controller.get_config.remote())
189189

190190
dataset = StreamingDataset(

0 commit comments

Comments
 (0)