Skip to content

Commit 94baa19

Browse files
authored
[refactor] Register storage backend for greater scalability (#103)
## Description This PR fixes issue #102, which I created. ## Changes 1. Create a `bootstrap` subfolder in storage and move the initialization code for the different storage backends into the relevant files within this folder. 2. Add the `StorageBootstrapProvider` class in bootstrap/provider.py for bootstrap code registration. 3. Split the storage instantiation code in `yuanrong_utils.py` and move it to the `bootstrap` folder. --------- Signed-off-by: fy2462 <fy2462@gmail.com>
1 parent 8c9a067 commit 94baa19

7 files changed

Lines changed: 687 additions & 522 deletions

File tree

transfer_queue/interface.py

Lines changed: 14 additions & 122 deletions
Original file line numberDiff line numberDiff line change
@@ -13,13 +13,11 @@
1313
# See the License for the specific language governing permissions and
1414
# limitations under the License.
1515

16-
import math
1716
import os
1817
import subprocess
1918
import time
2019
from importlib import resources
2120
from typing import Any, Callable
22-
from urllib.parse import urlparse
2321

2422
import ray
2523
import torch
@@ -32,13 +30,9 @@
3230
from transfer_queue.metadata import KVBatchMeta
3331
from transfer_queue.sampler import * # noqa: F401
3432
from transfer_queue.sampler import BaseSampler
35-
from transfer_queue.storage.simple_storage import SimpleStorageUnit
36-
from transfer_queue.utils.common import get_placement_group
33+
from transfer_queue.storage.bootstrap import StorageBootstrapProvider
3734
from transfer_queue.utils.logging_utils import get_logger
38-
from transfer_queue.utils.yuanrong_utils import (
39-
cleanup_yuanrong_resources,
40-
initialize_yuanrong_backend,
41-
)
35+
from transfer_queue.utils.yuanrong_utils import cleanup_yuanrong_resources
4236
from transfer_queue.utils.zmq_utils import process_zmq_server_info
4337

4438
logger = get_logger(__name__)
@@ -70,125 +64,23 @@ def _maybe_create_tq_client(conf: DictConfig | None = None) -> TransferQueueClie
7064
return _TQ_CLIENT
7165

7266

73-
# TODO(hz): Adopt registry pattern to manage storage backends for better scalability.
7467
def _maybe_create_tq_storage(conf: DictConfig) -> DictConfig:
    """Bootstrap the configured storage backend once per process.

    The backend named by ``conf.backend.storage_backend`` is looked up in
    ``StorageBootstrapProvider`` and its bootstrap function is called with
    ``conf``. Whatever that function returns is remembered in the module-level
    ``_TQ_STORAGE`` dict under the backend name.

    Args:
        conf: Full TransferQueue configuration; it is handed to the bootstrap
            function unchanged by this function.

    Returns:
        The same ``conf`` object that was passed in.
    """
    global _TQ_STORAGE

    # Bootstrap is attempted only on the first call; later calls are no-ops.
    if _TQ_STORAGE is not None:
        return conf

    _TQ_STORAGE = {}
    backend_name = conf.backend.storage_backend
    provider_fn = StorageBootstrapProvider.get_provider(backend_name)

    if provider_fn is None:
        logger.error(
            f"Storage backend {backend_name} not registered. Please add it to the StorageBootstrapProvider."
        )
        return conf

    backend_resources = provider_fn(conf)
    if backend_resources is None:
        # A bootstrap function may legitimately return None (for example when the
        # backend's auto_init option is disabled), so this is not reported as an error.
        logger.info(
            f"Storage backend {backend_name} returned no resources to manage "
            f"(e.g. auto_init is disabled); nothing will be tracked for it."
        )
    else:
        _TQ_STORAGE[backend_name] = backend_resources

    return conf
19385

19486

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
# Copyright 2025 Huawei Technologies Co., Ltd. All Rights Reserved.
2+
# Copyright 2025 The TransferQueue Team
3+
#
4+
# Licensed under the Apache License, Version 2.0 (the "License");
5+
# you may not use this file except in compliance with the License.
6+
# You may obtain a copy of the License at
7+
#
8+
# http://www.apache.org/licenses/LICENSE-2.0
9+
#
10+
# Unless required by applicable law or agreed to in writing, software
11+
# distributed under the License is distributed on an "AS IS" BASIS,
12+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
# See the License for the specific language governing permissions and
14+
# limitations under the License.
15+
16+
from . import mooncake_bootstrap, simple_storage_bootstrap, yuanrong_bootstrap # noqa: F401, I001
17+
from .provider import StorageBootstrapProvider
18+
19+
__all__ = [
20+
"StorageBootstrapProvider",
21+
]
Lines changed: 127 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,127 @@
1+
# Copyright 2025 Huawei Technologies Co., Ltd. All Rights Reserved.
2+
# Copyright 2025 The TransferQueue Team
3+
#
4+
# Licensed under the Apache License, Version 2.0 (the "License");
5+
# you may not use this file except in compliance with the License.
6+
# You may obtain a copy of the License at
7+
#
8+
# http://www.apache.org/licenses/LICENSE-2.0
9+
#
10+
# Unless required by applicable law or agreed to in writing, software
11+
# distributed under the License is distributed on an "AS IS" BASIS,
12+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
# See the License for the specific language governing permissions and
14+
# limitations under the License.
15+
16+
import os
17+
import subprocess
18+
import time
19+
from urllib.parse import urlparse
20+
21+
from omegaconf import DictConfig
22+
23+
from transfer_queue.storage.bootstrap.provider import StorageBootstrapProvider
24+
from transfer_queue.utils.logging_utils import get_logger
25+
26+
logger = get_logger(__name__)
27+
28+
29+
def _parse_host_port(raw_address: str, option_name: str) -> tuple[str, str]:
    """Split a ``host:port`` address (scheme optional) into ``(host, port)`` strings."""
    # urlparse only fills in hostname/port when the network location follows "//",
    # so a bare "host:port" value gets that prefix added.
    candidate = raw_address if "://" in raw_address else "//" + raw_address
    parsed = urlparse(candidate)

    if not parsed.hostname or parsed.port is None:
        raise ValueError(f"Invalid {option_name} '{raw_address}'. Host and port are required (e.g., host:port).")

    return parsed.hostname, str(parsed.port)


@StorageBootstrapProvider.register_provider("MooncakeStore")
def initialize_mooncake_storage(conf: DictConfig) -> subprocess.Popen | None:
    """
    Initialize Mooncake store backend by launching a local ``mooncake_master`` process.

    Any ``mooncake_master`` process found by ``pgrep`` is killed first, then a new one is
    started with its output redirected to ``/tmp/mooncake_master.log``.

    Args:
        conf (DictConfig): Configuration whose ``backend.MooncakeStore`` section provides
            ``auto_init``, ``metadata_server`` and ``master_server_address``.
    Returns:
        subprocess.Popen | None: Handle of the started ``mooncake_master`` process, or
            ``None`` when ``auto_init`` is disabled.
    Raises:
        ValueError: If ``metadata_server`` or ``master_server_address`` lacks a host or port.
        RuntimeError: If existing ``mooncake_master`` processes cannot be killed, or the new
            process has already exited after the startup wait.
    """
    mooncake_conf = conf.backend.MooncakeStore
    if not mooncake_conf.auto_init:
        return None

    # Validate both addresses before touching any running process, so that a bad
    # configuration does not kill a healthy mooncake_master and then fail.
    metadata_server_host, metadata_server_port = _parse_host_port(mooncake_conf.metadata_server, "metadata_server")
    _, master_server_port = _parse_host_port(mooncake_conf.master_server_address, "master_server_address")

    # Try to kill existing mooncake_master processes before starting a new one to avoid potential conflicts
    check = subprocess.run(["pgrep", "-f", "mooncake_master"], stdout=subprocess.PIPE, text=True)
    if check.returncode == 0:
        pids = check.stdout.strip().replace("\n", ", ")
        logger.info(f"Find existing mooncake_master (PID: {pids}), try to kill first...")

        # Argument-list form avoids a shell and yields the real exit code
        # (os.system returns an encoded wait status instead).
        # NOTE(review): "-f" matches full command lines, so an unrelated process whose
        # arguments contain "mooncake_master" would also be signalled — confirm acceptable.
        kill = subprocess.run(["pkill", "-f", "[m]ooncake_master"])
        if kill.returncode == 0:
            logger.info("Successfully killed existing mooncake_master processes.")
        else:
            raise RuntimeError(f"Failed to kill existing mooncake_master processes (exit code: {kill.returncode}).")

    cmd = [
        "mooncake_master",
        "-client_ttl=30",
        "-default_kv_lease_ttl=999999",
        "-default_kv_soft_pin_ttl=999999",
        "--eviction_high_watermark_ratio=1.0",
        "--eviction_ratio=0.0",
        "--enable_http_metadata_server=true",
        "--allow_evict_soft_pinned_objects=false",
        f"--http_metadata_server_host={metadata_server_host}",
        f"--http_metadata_server_port={metadata_server_port}",
        f"--rpc_port={master_server_port}",
    ]

    log_file_path = "/tmp/mooncake_master.log"
    with open(log_file_path, "w") as log_file:
        process = subprocess.Popen(
            cmd,
            stdout=log_file,
            stderr=subprocess.STDOUT,
            text=True,
            bufsize=1,
            universal_newlines=True,
            start_new_session=True,
        )
        # Give the process a moment to start before checking whether it is still running.
        time.sleep(3)

    if process.poll() is None:
        logger.info(f"mooncake_master started, PID: {process.pid}. Logs are at: {os.path.abspath(log_file_path)}")
        return process

    # The process has already exited: include its log output in the error.
    try:
        with open(log_file_path) as f:
            error_msg = f.read()
    except Exception as e:
        error_msg = f"Failed to read log file: {e}"

    raise RuntimeError(
        f"mooncake_master exited with error. Check {log_file_path} for detailed logs. Output:\n{error_msg}"
    )
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
# Copyright 2025 Huawei Technologies Co., Ltd. All Rights Reserved.
2+
# Copyright 2025 The TransferQueue Team
3+
#
4+
# Licensed under the Apache License, Version 2.0 (the "License");
5+
# you may not use this file except in compliance with the License.
6+
# You may obtain a copy of the License at
7+
#
8+
# http://www.apache.org/licenses/LICENSE-2.0
9+
#
10+
# Unless required by applicable law or agreed to in writing, software
11+
# distributed under the License is distributed on an "AS IS" BASIS,
12+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
# See the License for the specific language governing permissions and
14+
# limitations under the License.
15+
16+
from functools import wraps
17+
from typing import Callable
18+
19+
20+
class StorageBootstrapProvider:
21+
"""Registry for storage backend bootstrap functions."""
22+
23+
_providers: dict[str, Callable] = {}
24+
25+
@classmethod
26+
def register_provider(cls, name: str):
27+
"""Decorator to register storage provider & returns function."""
28+
29+
def decorator(fn):
30+
@wraps(fn)
31+
def wrapper(*args, **kwargs):
32+
return fn(*args, **kwargs)
33+
34+
cls._providers[name.lower()] = wrapper
35+
return wrapper
36+
37+
return decorator
38+
39+
@classmethod
40+
def get_provider(cls, name: str) -> Callable | None:
41+
"""Get storage provider function by name."""
42+
return cls._providers.get(name.lower(), None)

0 commit comments

Comments
 (0)