|
18 | 18 | import io.optimism.utilities.rpc.Web3jProvider; |
19 | 19 | import io.optimism.utilities.telemetry.Logging; |
20 | 20 | import io.optimism.utilities.telemetry.TracerTaskWrapper; |
21 | | -import io.reactivex.BackpressureStrategy; |
22 | | -import io.reactivex.Flowable; |
23 | 21 | import io.reactivex.disposables.Disposable; |
24 | 22 | import java.math.BigInteger; |
25 | 23 | import java.time.Duration; |
26 | 24 | import java.util.*; |
27 | 25 | import java.util.concurrent.ExecutionException; |
28 | | -import java.util.concurrent.ScheduledThreadPoolExecutor; |
29 | 26 | import java.util.concurrent.StructuredTaskScope; |
30 | | -import java.util.concurrent.TimeUnit; |
31 | 27 | import java.util.stream.Collectors; |
32 | 28 | import org.apache.commons.collections4.CollectionUtils; |
33 | 29 | import org.apache.commons.lang3.StringUtils; |
@@ -159,8 +155,6 @@ public class InnerWatcher extends AbstractExecutionThreadService { |
159 | 155 |
|
160 | 156 | private boolean devnet = false; |
161 | 157 |
|
162 | | - private ScheduledThreadPoolExecutor scheduledExecutorService; |
163 | | - |
164 | 158 | /** |
165 | 159 | * create a InnerWatcher instance. |
166 | 160 | * |
@@ -219,51 +213,23 @@ private void getMetadataFromL2(BigInteger l2StartBlock) { |
219 | 213 | } |
220 | 214 | } |
221 | 215 |
|
222 | | - private Disposable subscribeL1NewHeads() { |
223 | | - if (this.wsProvider != null) { |
224 | | - this.l1HeadListener = this.wsProvider |
225 | | - .newHeadsNotifications() |
226 | | - .subscribe( |
227 | | - notification -> { |
228 | | - NewHead header = notification.getParams().getResult(); |
229 | | - String hash = header.getHash(); |
230 | | - BigInteger number = Numeric.toBigInt(header.getNumber()); |
231 | | - String parentHash = header.getParentHash(); |
232 | | - BigInteger time = Numeric.toBigInt(header.getTimestamp()); |
233 | | - l1Head = new BlockInfo(hash, number, parentHash, time); |
234 | | - }, |
235 | | - t -> { |
236 | | - if (t instanceof WebsocketNotConnectedException) { |
237 | | - this.subscribeL1NewHeads(); |
238 | | - } |
239 | | - }); |
240 | | - } else { |
241 | | - this.scheduledExecutorService = new ScheduledThreadPoolExecutor(1); |
242 | | - this.l1HeadListener = Flowable.create( |
243 | | - (subscriber) -> { |
244 | | - this.scheduledExecutorService.scheduleAtFixedRate( |
245 | | - () -> { |
246 | | - EthBlock.Block block = null; |
247 | | - try { |
248 | | - block = pollBlock( |
249 | | - this.provider, DefaultBlockParameterName.LATEST, false); |
250 | | - } catch (ExecutionException | InterruptedException e) { |
251 | | - LOGGER.warn("error while fetching L1 data for block", e); |
252 | | - } |
253 | | - subscriber.onNext(block); |
254 | | - }, |
255 | | - 0, |
256 | | - 12, |
257 | | - TimeUnit.SECONDS); |
258 | | - }, |
259 | | - BackpressureStrategy.BUFFER) |
260 | | - .subscribe(notification -> { |
261 | | - EthBlock.Block block = (EthBlock.Block) notification; |
262 | | - l1Head = BlockInfo.from(block); |
263 | | - }); |
264 | | - } |
265 | | - |
266 | | - return this.l1HeadListener; |
| 216 | + private void subscribeL1NewHeads() { |
| 217 | + this.l1HeadListener = this.wsProvider |
| 218 | + .newHeadsNotifications() |
| 219 | + .subscribe( |
| 220 | + notification -> { |
| 221 | + NewHead header = notification.getParams().getResult(); |
| 222 | + String hash = header.getHash(); |
| 223 | + BigInteger number = Numeric.toBigInt(header.getNumber()); |
| 224 | + String parentHash = header.getParentHash(); |
| 225 | + BigInteger time = Numeric.toBigInt(header.getTimestamp()); |
| 226 | + l1Head = new BlockInfo(hash, number, parentHash, time); |
| 227 | + }, |
| 228 | + t -> { |
| 229 | + if (t instanceof WebsocketNotConnectedException) { |
| 230 | + this.subscribeL1NewHeads(); |
| 231 | + } |
| 232 | + }); |
267 | 233 | } |
268 | 234 |
|
269 | 235 | /** |
@@ -654,9 +620,6 @@ protected void shutDown() { |
654 | 620 | if (this.wsProvider != null) { |
655 | 621 | this.wsProvider.shutdown(); |
656 | 622 | } |
657 | | - if (this.scheduledExecutorService != null) { |
658 | | - this.scheduledExecutorService.shutdown(); |
659 | | - } |
660 | 623 | } |
661 | 624 |
|
662 | 625 | @Override |
|
0 commit comments