1919from common .utils import bytes_to_str , str_to_bytes
2020from .http_client import HttpClient
2121
22- # TODO: Decide on logic on how the PSRD block size is decided. Does the client decide? Does
23- # the hub decide?
24- _PSRD_BLOCK_SIZE_IN_BYTES = 1000
25- _FAILED_REQUEST_RETRY_DELAY_IN_SECONDS = 1.0
22+ # TODO: Make the following configurable.
23+
24+ _START_REQUEST_PSRD_THRESHOLD = 500
25+ """
26+ Start requesting more PSRD blocks from the hub when the amount of PSRD in the pool falls below this
27+ threshold.
28+ """
29+
30+ _STOP_REQUEST_PSRD_THRESHOLD = 2000
31+ """
32+ Stop requesting more PSRD blocks from the hub when the amount of PSRD in the pool rises above or
33+ equal to this threshold.
34+ """
35+
36+ _GET_PSRD_BLOCK_SIZE = 2000
37+ """
38+ When requesting more PSRD blocks from the hub, request blocks of this size.
39+ """
40+
41+ _GET_PSRD_RETRY_DELAY = 1.0
42+ """
43+ If a get PSRD request to the hub fails, wait this many seconds before retrying.
44+ """
2645
2746
2847class PeerHub :
@@ -34,6 +53,7 @@ class PeerHub:
3453 _http_client : HttpClient
3554 _base_url : str
3655 _startup_task : asyncio .Task | None = None
56+ _request_psrd_task : asyncio .Task | None = None
3757 _registered : bool
3858 _local_pool : Pool
3959 _peer_pool : Pool
@@ -45,6 +65,8 @@ def __init__(self, client, base_url):
4565 self ._base_url = base_url
4666 if self ._base_url .endswith ("/" ):
4767 self ._base_url = self ._base_url [:- 1 ]
68+ self ._startup_task = None
69+ self ._request_psrd_task = None
4870 self ._registered = False
4971 self ._local_pool = Pool (Pool .Owner .LOCAL )
5072 self ._peer_pool = Pool (Pool .Owner .PEER )
@@ -82,7 +104,6 @@ def start(self) -> None:
82104 """
83105 assert self ._startup_task is None
84106 self ._startup_task = asyncio .create_task (self .start_task ())
85- # TODO: Do we need a done_callback? Set start_task to None?
86107
87108 async def start_task (self ) -> None :
88109 """
@@ -91,16 +112,17 @@ async def start_task(self) -> None:
91112 - Request the initial block of Pre-Shared Random Data (PSRD) from the hub (also periodically
92113 retrying if it fails).
93114 """
115+ LOGGER .info (f"Begin start task for peer hub at { self ._base_url } " )
94116 try :
95117 while not await self .attempt_registration ():
96- await asyncio .sleep (_FAILED_REQUEST_RETRY_DELAY_IN_SECONDS )
97- for owner in (Pool .Owner .LOCAL , Pool .Owner .PEER ):
98- while not await self .attempt_request_psrd (owner ):
99- await asyncio .sleep (_FAILED_REQUEST_RETRY_DELAY_IN_SECONDS )
118+ await asyncio .sleep (_GET_PSRD_RETRY_DELAY )
100119 except asyncio .CancelledError :
101- # The task is cancelled when the client is shut down before startup is complete.
102- # TODO: Do we need to do anything here? The un-registration is done elsewhere
103- pass
120+ self ._startup_task = None
121+ LOGGER .info (f"Cancel start task for peer hub at { self ._base_url } " )
122+ return
123+ self .start_request_psrd_task_if_needed ()
124+ self ._startup_task = None
125+ LOGGER .info (f"Finish start task for peer hub at { self ._base_url } " )
104126
105127 async def attempt_registration (self ) -> bool :
106128 """
@@ -125,7 +147,59 @@ async def unregister(self) -> None:
125147 """
126148 Unregister this client from the peer hub.
127149 """
128- # TODO: Implement this
150+ # TODO: Implement this and call it from somewhere
151+
152+ def start_request_psrd_task_if_needed (self ) -> None :
153+ """
154+ Start the request PSRD task if needed.
155+ """
156+ if self ._request_psrd_task is None :
157+ if self .at_least_one_pool_below_start_threshold ():
158+ self ._request_psrd_task = asyncio .create_task (self .request_psrd_task ())
159+
160+ def at_least_one_pool_below_start_threshold (self ) -> bool :
161+ """
162+ Check if the number of bytes available at least one of the pools is below the threshold
163+ for starting to get more PSRD from the hub.
164+ """
165+ return (
166+ self ._local_pool .bytes_available < _START_REQUEST_PSRD_THRESHOLD
167+ or self ._peer_pool .bytes_available < _START_REQUEST_PSRD_THRESHOLD
168+ )
169+
170+ def all_pools_above_stop_threshold (self ) -> bool :
171+ """
172+ Check if the number of bytes available in all of the pools is above the threshold for
173+ stopping to get more PSRD from the hub.
174+ """
175+ return (
176+ self ._local_pool .bytes_available >= _STOP_REQUEST_PSRD_THRESHOLD
177+ and self ._peer_pool .bytes_available >= _STOP_REQUEST_PSRD_THRESHOLD
178+ )
179+
180+ async def request_psrd_task (self ) -> None :
181+ """
182+ Task for requesting Pre-Shared Random Data (PSRD) from the peer hub. This tasks runs as long
183+ as the local pool or the peer pool need more PSRD.
184+ """
185+ LOGGER .info (f"Begin request PSRD task for peer hub { self ._hub_name } " )
186+ try :
187+ while not self .all_pools_above_stop_threshold ():
188+ need_delay = False
189+ if self ._local_pool .bytes_available < _STOP_REQUEST_PSRD_THRESHOLD :
190+ if not await self .attempt_request_psrd (Pool .Owner .LOCAL ):
191+ need_delay = True
192+ if self ._peer_pool .bytes_available < _STOP_REQUEST_PSRD_THRESHOLD :
193+ if not await self .attempt_request_psrd (Pool .Owner .PEER ):
194+ need_delay = True
195+ if need_delay :
196+ await asyncio .sleep (_GET_PSRD_RETRY_DELAY )
197+ except asyncio .CancelledError :
198+ self ._request_psrd_task = None
199+ LOGGER .info (f"Cancel request PSRD task for peer hub { self ._hub_name } " )
200+ return
201+ self ._request_psrd_task = None
202+ LOGGER .info (f"Finish request PSRD task for peer hub { self ._hub_name } " )
129203
130204 async def attempt_request_psrd (self , owner : Pool .Owner ) -> bool :
131205 """
@@ -144,7 +218,7 @@ async def attempt_request_psrd(self, owner: Pool.Owner) -> bool:
144218 params = {
145219 "client_name" : self ._client .name ,
146220 "pool_owner" : pool_owner_str ,
147- "size" : _PSRD_BLOCK_SIZE_IN_BYTES ,
221+ "size" : _GET_PSRD_BLOCK_SIZE ,
148222 }
149223 try :
150224 api_block = await self ._http_client .get (url , params , APIBlock )
0 commit comments