1818import io .optimism .utilities .rpc .Web3jProvider ;
1919import io .optimism .utilities .telemetry .Logging ;
2020import io .optimism .utilities .telemetry .TracerTaskWrapper ;
21+ import io .reactivex .BackpressureStrategy ;
22+ import io .reactivex .Flowable ;
2123import io .reactivex .disposables .Disposable ;
2224import java .math .BigInteger ;
2325import java .time .Duration ;
2426import java .util .*;
2527import java .util .concurrent .ExecutionException ;
28+ import java .util .concurrent .ScheduledThreadPoolExecutor ;
2629import java .util .concurrent .StructuredTaskScope ;
30+ import java .util .concurrent .TimeUnit ;
2731import java .util .stream .Collectors ;
2832import org .apache .commons .collections4 .CollectionUtils ;
2933import org .apache .commons .lang3 .StringUtils ;
4549import org .web3j .protocol .core .methods .response .EthBlock ;
4650import org .web3j .protocol .core .methods .response .EthLog ;
4751import org .web3j .protocol .core .methods .response .EthLog .LogObject ;
48- import org .web3j .protocol .core .methods .response .EthLog .LogResult ;
4952import org .web3j .protocol .websocket .events .NewHead ;
5053import org .web3j .tuples .generated .Tuple2 ;
5154import org .web3j .tuples .generated .Tuple3 ;
@@ -85,7 +88,7 @@ public class InnerWatcher extends AbstractExecutionThreadService {
8588 */
8689 private final Web3j provider ;
8790
88- private final Web3j wsProvider ;
91+ private Web3j wsProvider ;
8992
9093 /**
9194 * Beacon blob fetcher to fetch the beacon blob from the L1 beacon endpoint.
@@ -156,6 +159,8 @@ public class InnerWatcher extends AbstractExecutionThreadService {
156159
157160 private boolean devnet = false ;
158161
162+ private ScheduledThreadPoolExecutor scheduledExecutorService ;
163+
159164 /**
160165 * create a InnerWatcher instance.
161166 *
@@ -168,7 +173,9 @@ public InnerWatcher(
168173 Config config , MessagePassingQueue <BlockUpdate > queue , BigInteger l1StartBlock , BigInteger l2StartBlock ) {
169174 this .config = config ;
170175 this .provider = Web3jProvider .createClient (config .l1RpcUrl ());
171- this .wsProvider = Web3jProvider .createClient (config .l1WsRpcUrl ());
176+ if (StringUtils .isNotEmpty (config .l1WsRpcUrl ())) {
177+ this .wsProvider = Web3jProvider .createClient (config .l1WsRpcUrl ());
178+ }
172179 this .beaconFetcher = new BeaconBlobFetcher (config .l1BeaconUrl (), config .l1BeaconArchiverUrl ());
173180 this .l2StartBlock = l2StartBlock ;
174181 this .devnet = config .devnet () != null && config .devnet ();
@@ -213,22 +220,49 @@ private void getMetadataFromL2(BigInteger l2StartBlock) {
213220 }
214221
215222 private Disposable subscribeL1NewHeads () {
216- this .l1HeadListener = this .wsProvider
217- .newHeadsNotifications ()
218- .subscribe (
219- notification -> {
220- NewHead header = notification .getParams ().getResult ();
221- String hash = header .getHash ();
222- BigInteger number = Numeric .toBigInt (header .getNumber ());
223- String parentHash = header .getParentHash ();
224- BigInteger time = Numeric .toBigInt (header .getTimestamp ());
225- l1Head = new BlockInfo (hash , number , parentHash , time );
226- },
227- t -> {
228- if (t instanceof WebsocketNotConnectedException ) {
229- this .subscribeL1NewHeads ();
230- }
231- });
223+ if (this .wsProvider != null ) {
224+ this .l1HeadListener = this .wsProvider
225+ .newHeadsNotifications ()
226+ .subscribe (
227+ notification -> {
228+ NewHead header = notification .getParams ().getResult ();
229+ String hash = header .getHash ();
230+ BigInteger number = Numeric .toBigInt (header .getNumber ());
231+ String parentHash = header .getParentHash ();
232+ BigInteger time = Numeric .toBigInt (header .getTimestamp ());
233+ l1Head = new BlockInfo (hash , number , parentHash , time );
234+ },
235+ t -> {
236+ if (t instanceof WebsocketNotConnectedException ) {
237+ this .subscribeL1NewHeads ();
238+ }
239+ });
240+ } else {
241+ this .scheduledExecutorService = new ScheduledThreadPoolExecutor (1 );
242+ this .l1HeadListener = Flowable .create (
243+ (subscriber ) -> {
244+ this .scheduledExecutorService .scheduleAtFixedRate (
245+ () -> {
246+ EthBlock .Block block = null ;
247+ try {
248+ block = pollBlock (
249+ this .provider , DefaultBlockParameterName .LATEST , false );
250+ } catch (ExecutionException | InterruptedException e ) {
251+ LOGGER .warn ("error while fetching L1 data for block" , e );
252+ }
253+ subscriber .onNext (block );
254+ },
255+ 0 ,
256+ 12 ,
257+ TimeUnit .SECONDS );
258+ },
259+ BackpressureStrategy .BUFFER )
260+ .subscribe (notification -> {
261+ EthBlock .Block block = (EthBlock .Block ) notification ;
262+ l1Head = BlockInfo .from (block );
263+ });
264+ }
265+
232266 return this .l1HeadListener ;
233267 }
234268
@@ -381,30 +415,45 @@ private void putBlockUpdate(final BlockUpdate update) {
381415
382416 private void updateSystemConfig (BlockInfo l1BlockInfo ) throws ExecutionException , InterruptedException {
383417 BigInteger preLastUpdateBlock = this .systemConfigUpdate .component1 ();
384- if (preLastUpdateBlock .compareTo (this .currentBlock ) < 0 ) {
385- BigInteger toBlock = preLastUpdateBlock .add (BigInteger .valueOf (1000L ));
418+ if (preLastUpdateBlock .compareTo (this .currentBlock ) <= 0 ) {
419+ BigInteger fromBlock = preLastUpdateBlock .equals (BigInteger .ZERO )
420+ ? BigInteger .ZERO
421+ : preLastUpdateBlock .add (BigInteger .ONE );
422+ BigInteger toBlock = preLastUpdateBlock .add (BigInteger .valueOf (100L ));
386423 LOGGER .debug (
387424 "will get system update eth log: fromBlock={} -> toBlock={}; contract={}" ,
388- preLastUpdateBlock . add ( BigInteger . ONE ) ,
425+ fromBlock ,
389426 toBlock ,
390427 InnerWatcher .this .config .chainConfig ().systemConfigContract ());
391428 EthLog updates = this .getLog (
392- preLastUpdateBlock . add ( BigInteger . ONE ) ,
429+ fromBlock ,
393430 toBlock ,
394431 InnerWatcher .this .config .chainConfig ().systemConfigContract (),
395432 CONFIG_UPDATE_TOPIC );
396433
397434 if (updates .getLogs ().isEmpty ()) {
398435 this .systemConfigUpdate = new Tuple2 <>(toBlock , null );
399436 } else {
400- LogResult <?> update = updates .getLogs ().getFirst ();
401- BigInteger updateBlock = ((LogObject ) update ).getBlockNumber ();
402- SystemConfigUpdate configUpdate = SystemConfigUpdate .tryFrom ((LogObject ) update );
403- if (updateBlock == null ) {
437+ BigInteger updateBlockNum = ((LogObject ) updates .getLogs ().getFirst ()).getBlockNumber ();
438+ SystemConfig updatedConfig = this .systemConfig ;
439+ boolean updated = false ;
440+ for (int i = 0 ; i < updates .getLogs ().size (); i ++) {
441+ LogObject update = (LogObject ) updates .getLogs ().get (i );
442+ BigInteger updateBlock = update .getBlockNumber ();
443+ if (updateBlock == null ) {
444+ break ;
445+ }
446+ if (!updateBlock .equals (updateBlockNum )) {
447+ break ;
448+ }
449+ SystemConfigUpdate configUpdate = SystemConfigUpdate .tryFrom (update );
450+ updatedConfig = parseSystemConfigUpdate (updatedConfig , l1BlockInfo , configUpdate );
451+ updated = true ;
452+ }
453+ if (!updated ) {
404454 this .systemConfigUpdate = new Tuple2 <>(toBlock , null );
405455 } else {
406- SystemConfig updateSystemConfig = parseSystemConfigUpdate (l1BlockInfo , configUpdate );
407- this .systemConfigUpdate = new Tuple2 <>(updateBlock , updateSystemConfig );
456+ this .systemConfigUpdate = new Tuple2 <>(updateBlockNum , updatedConfig );
408457 }
409458 }
410459 }
@@ -417,46 +466,47 @@ private void updateSystemConfig(BlockInfo l1BlockInfo) throws ExecutionException
417466 }
418467 }
419468
420- private Config .SystemConfig parseSystemConfigUpdate (BlockInfo l1BlockInfo , SystemConfigUpdate configUpdate ) {
469+ private Config .SystemConfig parseSystemConfigUpdate (
470+ SystemConfig lastSystemConfig , BlockInfo l1BlockInfo , SystemConfigUpdate configUpdate ) {
421471 Config .SystemConfig updateSystemConfig = null ;
422472 if (configUpdate instanceof SystemConfigUpdate .BatchSender ) {
423473 updateSystemConfig = new Config .SystemConfig (
424474 ((SystemConfigUpdate .BatchSender ) configUpdate ).getAddress (),
425- this . systemConfig .gasLimit (),
426- this . systemConfig .l1FeeOverhead (),
427- this . systemConfig .l1FeeScalar (),
428- this . systemConfig .unsafeBlockSigner ());
475+ lastSystemConfig .gasLimit (),
476+ lastSystemConfig .l1FeeOverhead (),
477+ lastSystemConfig .l1FeeScalar (),
478+ lastSystemConfig .unsafeBlockSigner ());
429479 } else if (configUpdate instanceof SystemConfigUpdate .Fees ) {
430480 var ecotoneTime = this .config .chainConfig ().ecotoneTime ();
431481 if (ecotoneTime .compareTo (BigInteger .ZERO ) > 0
432482 && l1BlockInfo .timestamp ().compareTo (ecotoneTime ) >= 0 ) {
433483 updateSystemConfig = new Config .SystemConfig (
434- this . systemConfig .batchSender (),
435- this . systemConfig .gasLimit (),
484+ lastSystemConfig .batchSender (),
485+ lastSystemConfig .gasLimit (),
436486 BigInteger .ZERO ,
437487 ((SystemConfigUpdate .Fees ) configUpdate ).getFeeScalar (),
438- this . systemConfig .unsafeBlockSigner ());
488+ lastSystemConfig .unsafeBlockSigner ());
439489 } else {
440490 updateSystemConfig = new Config .SystemConfig (
441- this . systemConfig .batchSender (),
442- this . systemConfig .gasLimit (),
491+ lastSystemConfig .batchSender (),
492+ lastSystemConfig .gasLimit (),
443493 ((SystemConfigUpdate .Fees ) configUpdate ).getFeeOverhead (),
444494 ((SystemConfigUpdate .Fees ) configUpdate ).getFeeScalar (),
445- this . systemConfig .unsafeBlockSigner ());
495+ lastSystemConfig .unsafeBlockSigner ());
446496 }
447497 } else if (configUpdate instanceof SystemConfigUpdate .GasLimit ) {
448498 updateSystemConfig = new Config .SystemConfig (
449- this . systemConfig .batchSender (),
499+ lastSystemConfig .batchSender (),
450500 ((SystemConfigUpdate .GasLimit ) configUpdate ).getGas (),
451- this . systemConfig .l1FeeOverhead (),
452- this . systemConfig .l1FeeScalar (),
453- this . systemConfig .unsafeBlockSigner ());
501+ lastSystemConfig .l1FeeOverhead (),
502+ lastSystemConfig .l1FeeScalar (),
503+ lastSystemConfig .unsafeBlockSigner ());
454504 } else if (configUpdate instanceof SystemConfigUpdate .UnsafeBlockSigner ) {
455505 updateSystemConfig = new Config .SystemConfig (
456- this . systemConfig .batchSender (),
457- this . systemConfig .gasLimit (),
458- this . systemConfig .l1FeeOverhead (),
459- this . systemConfig .l1FeeScalar (),
506+ lastSystemConfig .batchSender (),
507+ lastSystemConfig .gasLimit (),
508+ lastSystemConfig .l1FeeOverhead (),
509+ lastSystemConfig .l1FeeScalar (),
460510 ((SystemConfigUpdate .UnsafeBlockSigner ) configUpdate ).getAddress ());
461511 }
462512 return updateSystemConfig ;
@@ -601,6 +651,12 @@ protected void shutDown() {
601651 if (!this .l1HeadListener .isDisposed ()) {
602652 this .l1HeadListener .dispose ();
603653 }
654+ if (this .wsProvider != null ) {
655+ this .wsProvider .shutdown ();
656+ }
657+ if (this .scheduledExecutorService != null ) {
658+ this .scheduledExecutorService .shutdown ();
659+ }
604660 }
605661
606662 @ Override
0 commit comments