Skip to content

Commit d971214

Browse files
committed
modify simple storage
1 parent 8d54689 commit d971214

1 file changed

Lines changed: 36 additions & 57 deletions

File tree

transfer_queue/storage/simple_storage.py

Lines changed: 36 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
import os
1717
import time
1818
import weakref
19-
from threading import Event, Thread
19+
from threading import Event
2020
from typing import Any
2121
from uuid import uuid4
2222

@@ -31,9 +31,8 @@
3131
ZMQMessage,
3232
ZMQRequestType,
3333
ZMQServerInfo,
34+
ZMQServerTransport,
3435
create_zmq_socket,
35-
format_zmq_address,
36-
get_free_port,
3736
get_node_ip_address,
3837
)
3938

@@ -159,84 +158,60 @@ def __init__(self, storage_unit_size: int):
159158
# Shutdown event for graceful termination
160159
self._shutdown_event = Event()
161160

162-
# Placeholder for zmq_context, proxy_thread and worker_threads
163-
self.zmq_context: zmq.Context | None = None
164-
self.put_get_socket: zmq.Socket | None = None
165-
self.proxy_thread: Thread | None = None
166-
self.worker_thread: Thread | None = None
161+
# Placeholder for worker_socket (DEALER, not managed by transport)
162+
self.worker_socket: zmq.Socket | None = None
167163

168-
self._init_zmq_socket()
164+
self._init_zmq_transport()
169165
self._start_process_put_get()
170166

171167
# Register finalizer for graceful cleanup when garbage collected
172168
self._finalizer = weakref.finalize(
173169
self,
174170
self._shutdown_resources,
175171
self._shutdown_event,
176-
self.worker_thread,
177-
self.proxy_thread,
178-
self.zmq_context,
179-
self.put_get_socket,
172+
self._transport,
173+
self.worker_socket,
180174
)
181175

182-
def _init_zmq_socket(self) -> None:
176+
def _init_zmq_transport(self) -> None:
183177
"""
184-
Initialize ZMQ socket connections between storage unit and controller/clients:
178+
Initialize ZMQ transport layer and socket connections:
185179
- put_get_socket (ROUTER): Handle put/get requests from clients.
186180
- worker_socket (DEALER): Backend socket for worker communication.
187181
"""
188-
self.zmq_context = zmq.Context()
189-
self._node_ip = get_node_ip_address()
190-
191-
# Frontend: ROUTER for receiving client requests
192-
self.put_get_socket = create_zmq_socket(self.zmq_context, zmq.ROUTER, self._node_ip)
193-
194-
while True:
195-
try:
196-
self._put_get_socket_port = get_free_port(ip=self._node_ip)
197-
self.put_get_socket.bind(format_zmq_address(self._node_ip, self._put_get_socket_port))
198-
break
199-
except zmq.ZMQError:
200-
logger.warning(f"[{self.storage_unit_id}]: Try to bind ZMQ sockets failed, retrying...")
201-
continue
182+
self._transport = ZMQServerTransport(node_ip=get_node_ip_address())
183+
self._transport.create_router_socket("put_get_socket")
202184

203185
# Backend: DEALER for worker communication (connected via zmq.proxy)
204-
self.worker_socket = create_zmq_socket(self.zmq_context, zmq.DEALER, self._node_ip)
186+
self.worker_socket = create_zmq_socket(self._transport.zmq_ctx, zmq.DEALER, self._transport.node_ip)
205187
self.worker_socket.bind(self._inproc_addr)
206188

207-
self.zmq_server_info = ZMQServerInfo(
189+
self.zmq_server_info = self._transport.build_server_info(
208190
role=Role.STORAGE,
209-
id=str(self.storage_unit_id),
210-
ip=self._node_ip,
211-
ports={"put_get_socket": self._put_get_socket_port},
191+
server_id=str(self.storage_unit_id),
212192
)
213193

214194
def _start_process_put_get(self) -> None:
215195
"""Start worker threads and ZMQ proxy for handling requests."""
216-
217196
# Start worker thread
218-
self.worker_thread = Thread(
197+
self._transport.start_daemon_thread(
219198
target=self._worker_routine,
220199
name=f"StorageUnitWorkerThread-{self.storage_unit_id}",
221-
daemon=True,
222200
)
223-
self.worker_thread.start()
224-
225201
time.sleep(0.5) # make sure worker thread is ready before zmq.proxy forwarding messages
226202

227203
# Start proxy thread (ROUTER <-> DEALER)
228-
self.proxy_thread = Thread(
204+
self._transport.start_daemon_thread(
229205
target=self._proxy_routine,
230206
name=f"StorageUnitProxyThread-{self.storage_unit_id}",
231-
daemon=True,
232207
)
233-
self.proxy_thread.start()
234208

235209
def _proxy_routine(self) -> None:
236210
"""ZMQ proxy for message forwarding between frontend ROUTER and backend DEALER."""
211+
put_get_socket = self._transport.get_socket("put_get_socket")
237212
logger.info(f"[{self.storage_unit_id}]: start ZMQ proxy...")
238213
try:
239-
zmq.proxy(self.put_get_socket, self.worker_socket)
214+
zmq.proxy(put_get_socket, self.worker_socket)
240215
except zmq.ContextTerminated:
241216
logger.info(f"[{self.storage_unit_id}]: ZMQ Proxy stopped gracefully (Context Terminated)")
242217
except Exception as e:
@@ -248,7 +223,7 @@ def _proxy_routine(self) -> None:
248223
def _worker_routine(self) -> None:
249224
"""Worker thread for processing requests."""
250225

251-
worker_socket = create_zmq_socket(self.zmq_context, zmq.DEALER, self._node_ip)
226+
worker_socket = create_zmq_socket(self._transport.zmq_ctx, zmq.DEALER, self._transport.node_ip)
252227
worker_socket.connect(self._inproc_addr)
253228

254229
poller = zmq.Poller()
@@ -481,30 +456,34 @@ def _handle_clear(self, data_parts: ZMQMessage) -> ZMQMessage:
481456
@staticmethod
482457
def _shutdown_resources(
483458
shutdown_event: Event,
484-
worker_thread: Thread | None,
485-
proxy_thread: Thread | None,
486-
zmq_context: zmq.Context | None,
487-
put_get_socket: zmq.Socket | None,
459+
transport: ZMQServerTransport | None,
460+
worker_socket: zmq.Socket | None,
488461
) -> None:
489462
"""Clean up resources on garbage collection."""
490463
logger.info("Shutting down SimpleStorageUnit resources...")
491464

492465
# Signal all threads to stop
493466
shutdown_event.set()
494467

495-
# Terminate put_get_socket
496-
if put_get_socket:
497-
put_get_socket.close(linger=0)
468+
# Close worker socket
469+
if worker_socket:
470+
worker_socket.close(linger=0)
471+
472+
# Close put_get_socket
473+
if transport:
474+
put_get_socket = transport.get_socket("put_get_socket")
475+
if put_get_socket:
476+
put_get_socket.close(linger=0)
498477

499478
# Terminate ZMQ context to unblock proxy and workers
500-
if zmq_context:
501-
zmq_context.term()
479+
if transport and transport.zmq_ctx:
480+
transport.zmq_ctx.term()
502481

503482
# Wait for threads to finish (with timeout)
504-
if worker_thread and worker_thread.is_alive():
505-
worker_thread.join(timeout=5)
506-
if proxy_thread and proxy_thread.is_alive():
507-
proxy_thread.join(timeout=5)
483+
if transport:
484+
for t in transport.threads:
485+
if t.is_alive():
486+
t.join(timeout=5)
508487

509488
logger.info("SimpleStorageUnit resources shutdown complete.")
510489

0 commit comments

Comments
 (0)