Skip to content

Commit 28eb3d5

Browse files
authored
feat(tools): retry rpc calls on server-side error (#2056)
1 parent b808aa2 commit 28eb3d5

1 file changed

Lines changed: 130 additions & 111 deletions

File tree

src/ethereum_spec_tools/sync.py

Lines changed: 130 additions & 111 deletions
Original file line numberDiff line numberDiff line change
@@ -188,6 +188,34 @@ def download(self) -> None:
188188
except Full:
189189
pass
190190

191+
def _make_request(self, req: request.Request) -> Any:
192+
backoff = 1.75
193+
start = time.monotonic()
194+
last_error = None
195+
delay = 1.0
196+
while True:
197+
now = time.monotonic()
198+
elapsed = now - start
199+
remaining = (60.0 * 60.0) - elapsed
200+
201+
if 0.0 >= remaining:
202+
assert last_error is not None
203+
raise last_error
204+
205+
try:
206+
with request.urlopen(req, timeout=60.0) as response:
207+
return json.load(response)
208+
except request.HTTPError as e:
209+
if e.code < 500 or e.code > 599:
210+
raise
211+
logging.warning(
212+
"server-side error during RPC request", exc_info=True
213+
)
214+
last_error = e
215+
216+
time.sleep(min(delay, remaining))
217+
delay *= backoff
218+
191219
def fetch_blocks(
192220
self,
193221
first: Uint,
@@ -243,70 +271,65 @@ def fetch_blocks_debug(
243271
headers=headers,
244272
)
245273

246-
with request.urlopen(post) as response:
247-
replies = json.load(response)
248-
if not isinstance(replies, list):
249-
self.log.error(
250-
"got non-list JSON-RPC response. replies=%r", replies
274+
replies = self._make_request(post)
275+
if not isinstance(replies, list):
276+
self.log.error(
277+
"got non-list JSON-RPC response. replies=%r", replies
278+
)
279+
raise ValueError
280+
281+
block_rlps: Dict[Uint, Union[RpcError, bytes]] = {}
282+
283+
for reply in replies:
284+
try:
285+
reply_id = Uint(int(reply["id"], 0))
286+
except Exception:
287+
self.log.exception("unable to parse RPC id. reply=%r", reply)
288+
raise
289+
290+
if reply_id < first or reply_id >= first + count:
291+
raise Exception("mismatched request id")
292+
293+
if "error" in reply:
294+
block_rlps[reply_id] = RpcError(
295+
reply["error"]["code"],
296+
reply["error"]["message"],
251297
)
252-
raise ValueError
298+
else:
299+
block_rlps[reply_id] = bytes.fromhex(reply["result"][2:])
253300

254-
block_rlps: Dict[Uint, Union[RpcError, bytes]] = {}
301+
if len(block_rlps) != count:
302+
raise Exception(
303+
f"expected {count} blocks but only got {len(block_rlps)}"
304+
)
255305

256-
for reply in replies:
306+
self.log.info("blocks [%d, %d) fetched", first, first + count)
307+
308+
blocks: List[Union[RpcError, Any]] = []
309+
for _, block_rlp in sorted(block_rlps.items()):
310+
if isinstance(block_rlp, RpcError):
311+
blocks.append(block_rlp)
312+
else:
313+
# Unfortunately we have to decode the RLP twice.
314+
decoded_block = rlp.decode(block_rlp)
315+
assert not isinstance(decoded_block, bytes)
316+
assert not isinstance(decoded_block[0], bytes)
317+
assert isinstance(decoded_block[0][11], bytes)
318+
timestamp = U256.from_be_bytes(decoded_block[0][11])
319+
self.advance_block(timestamp)
257320
try:
258-
reply_id = Uint(int(reply["id"], 0))
321+
blocks.append(
322+
rlp.decode_to(self.module("blocks").Block, block_rlp)
323+
)
259324
except Exception:
260325
self.log.exception(
261-
"unable to parse RPC id. reply=%r", reply
326+
"failed to decode block %d with timestamp %d",
327+
self.block_number,
328+
timestamp,
262329
)
263330
raise
264331

265-
if reply_id < first or reply_id >= first + count:
266-
raise Exception("mismatched request id")
267-
268-
if "error" in reply:
269-
block_rlps[reply_id] = RpcError(
270-
reply["error"]["code"],
271-
reply["error"]["message"],
272-
)
273-
else:
274-
block_rlps[reply_id] = bytes.fromhex(reply["result"][2:])
275-
276-
if len(block_rlps) != count:
277-
raise Exception(
278-
f"expected {count} blocks but only got {len(block_rlps)}"
279-
)
280-
281-
self.log.info("blocks [%d, %d) fetched", first, first + count)
282-
283-
blocks: List[Union[RpcError, Any]] = []
284-
for _, block_rlp in sorted(block_rlps.items()):
285-
if isinstance(block_rlp, RpcError):
286-
blocks.append(block_rlp)
287-
else:
288-
# Unfortunately we have to decode the RLP twice.
289-
decoded_block = rlp.decode(block_rlp)
290-
assert not isinstance(decoded_block, bytes)
291-
assert not isinstance(decoded_block[0], bytes)
292-
assert isinstance(decoded_block[0][11], bytes)
293-
timestamp = U256.from_be_bytes(decoded_block[0][11])
294-
self.advance_block(timestamp)
295-
try:
296-
blocks.append(
297-
rlp.decode_to(
298-
self.module("blocks").Block, block_rlp
299-
)
300-
)
301-
except Exception:
302-
self.log.exception(
303-
"failed to decode block %d with timestamp %d",
304-
self.block_number,
305-
timestamp,
306-
)
307-
raise
308-
309-
return blocks
332+
return blocks
310333

311334
def load_transaction(self, t: Any) -> Any:
312335
"""
@@ -438,44 +461,41 @@ def fetch_blocks_eth(
438461
headers=headers,
439462
)
440463

441-
with request.urlopen(post) as response:
442-
replies = json.load(response)
443-
block_jsons: Dict[Uint, Any] = {}
444-
ommers_needed: Dict[Uint, int] = {}
445-
blocks: Dict[Uint, Union[Any, RpcError]] = {}
464+
replies = self._make_request(post)
465+
block_jsons: Dict[Uint, Any] = {}
466+
ommers_needed: Dict[Uint, int] = {}
467+
blocks: Dict[Uint, Union[Any, RpcError]] = {}
446468

447-
for reply in replies:
448-
reply_id = Uint(int(reply["id"], 0))
469+
for reply in replies:
470+
reply_id = Uint(int(reply["id"], 0))
449471

450-
if reply_id < first or reply_id >= first + count:
451-
raise Exception("mismatched request id")
472+
if reply_id < first or reply_id >= first + count:
473+
raise Exception("mismatched request id")
452474

453-
if "error" in reply:
454-
blocks[reply_id] = RpcError(
455-
reply["error"]["code"],
456-
reply["error"]["message"],
457-
)
458-
else:
459-
res = reply["result"]
460-
if res is None:
461-
from time import sleep
475+
if "error" in reply:
476+
blocks[reply_id] = RpcError(
477+
reply["error"]["code"],
478+
reply["error"]["message"],
479+
)
480+
else:
481+
res = reply["result"]
482+
if res is None:
483+
from time import sleep
462484

463-
sleep(12)
464-
break
485+
sleep(12)
486+
break
465487

466-
block_jsons[reply_id] = res
467-
ommers_needed[reply_id] = len(res["uncles"])
488+
block_jsons[reply_id] = res
489+
ommers_needed[reply_id] = len(res["uncles"])
468490

469-
ommers = self.fetch_ommers(ommers_needed)
470-
for id in block_jsons: # noqa A001
471-
self.advance_block(hex_to_u256(block_jsons[id]["timestamp"]))
472-
blocks[id] = self.make_block(
473-
block_jsons[id], ommers.get(id, ())
474-
)
491+
ommers = self.fetch_ommers(ommers_needed)
492+
for id in block_jsons: # noqa A001
493+
self.advance_block(hex_to_u256(block_jsons[id]["timestamp"]))
494+
blocks[id] = self.make_block(block_jsons[id], ommers.get(id, ()))
475495

476-
self.log.info("blocks [%d, %d) fetched", first, first + count)
496+
self.log.info("blocks [%d, %d) fetched", first, first + count)
477497

478-
return [v for (_, v) in sorted(blocks.items())]
498+
return [v for (_, v) in sorted(blocks.items())]
479499

480500
def fetch_ommers(self, ommers_needed: Dict[Uint, int]) -> Dict[Uint, Any]:
481501
"""
@@ -519,37 +539,36 @@ def fetch_ommers(self, ommers_needed: Dict[Uint, int]) -> Dict[Uint, Any]:
519539
headers=headers,
520540
)
521541

522-
with request.urlopen(post) as response:
523-
replies = json.load(response)
524-
ommers: Dict[Uint, Dict[Uint, Any]] = {}
542+
replies = self._make_request(post)
543+
ommers: Dict[Uint, Dict[Uint, Any]] = {}
525544

526-
twenty = Uint(20)
527-
for reply in replies:
528-
reply_id = Uint(int(reply["id"], 0))
545+
twenty = Uint(20)
546+
for reply in replies:
547+
reply_id = Uint(int(reply["id"], 0))
529548

530-
if reply_id // twenty not in ommers:
531-
ommers[reply_id // twenty] = {}
549+
if reply_id // twenty not in ommers:
550+
ommers[reply_id // twenty] = {}
532551

533-
if "error" in reply:
534-
raise RpcError(
535-
reply["error"]["code"],
536-
reply["error"]["message"],
537-
)
538-
else:
539-
ommers[reply_id // twenty][reply_id % twenty] = (
540-
self.make_header(reply["result"])
541-
)
552+
if "error" in reply:
553+
raise RpcError(
554+
reply["error"]["code"],
555+
reply["error"]["message"],
556+
)
557+
else:
558+
ommers[reply_id // twenty][reply_id % twenty] = (
559+
self.make_header(reply["result"])
560+
)
542561

543-
self.log.info(
544-
"ommers [%d, %d] fetched",
545-
min(ommers_needed),
546-
max(ommers_needed),
547-
)
562+
self.log.info(
563+
"ommers [%d, %d] fetched",
564+
min(ommers_needed),
565+
max(ommers_needed),
566+
)
548567

549-
return {
550-
k: tuple(x for (_, x) in sorted(v.items()))
551-
for (k, v) in ommers.items()
552-
}
568+
return {
569+
k: tuple(x for (_, x) in sorted(v.items()))
570+
for (k, v) in ommers.items()
571+
}
553572

554573
def make_header(self, json: Any) -> Any:
555574
"""

0 commit comments

Comments
 (0)