@@ -99,7 +99,7 @@ public void recover(boolean concurrently) {
9999 } else {
100100 for (ConcurrentMap <Integer , ConsumeQueueInterface > maps : this .consumeQueueTable .values ()) {
101101 for (ConsumeQueueInterface logic : maps .values ()) {
102- this .recover (logic );
102+ logic .recover ();
103103 }
104104 }
105105 }
@@ -111,7 +111,7 @@ public void recover(boolean concurrently) {
111111 * from it.
112112 */
113113 @ Override
114- public Long getDispatchFromPhyOffset (boolean recoverNormally ) throws RocksDBException {
114+ public Long getDispatchFromPhyOffset (boolean recoverNormally ) {
115115 if (recoverNormally ) {
116116 return getMaxPhyOffsetInConsumeQueue ();
117117 } else {
@@ -127,7 +127,7 @@ public Long getDispatchFromPhyOffset(boolean recoverNormally) throws RocksDBExce
127127 public boolean recoverConcurrently () {
128128 int count = 0 ;
129129 for (ConcurrentMap <Integer , ConsumeQueueInterface > maps : this .consumeQueueTable .values ()) {
130- count += maps .values (). size ();
130+ count += maps .size ();
131131 }
132132 final CountDownLatch countDownLatch = new CountDownLatch (count );
133133 BlockingQueue <Runnable > recoverQueue = new LinkedBlockingQueue <>();
@@ -206,15 +206,6 @@ public long getOffsetInQueueByTime(String topic, int queueId, long timestamp, Bo
206206 return 0 ;
207207 }
208208
209- private FileQueueLifeCycle getLifeCycle (String topic , int queueId ) {
210- return findOrCreateConsumeQueue (topic , queueId );
211- }
212-
213- public boolean load (ConsumeQueueInterface consumeQueue ) {
214- FileQueueLifeCycle fileQueueLifeCycle = getLifeCycle (consumeQueue .getTopic (), consumeQueue .getQueueId ());
215- return fileQueueLifeCycle .load ();
216- }
217-
218209 private boolean loadConsumeQueues (String storePath , CQType cqType ) {
219210 File dirLogic = new File (storePath );
220211 File [] fileTopicList = dirLogic .listFiles ();
@@ -237,7 +228,7 @@ private boolean loadConsumeQueues(String storePath, CQType cqType) {
237228
238229 ConsumeQueueInterface logic = createConsumeQueueByType (cqType , topic , queueId , storePath );
239230 this .putConsumeQueue (topic , queueId , logic );
240- if (!this .load (logic )) {
231+ if (!logic .load ()) {
241232 return false ;
242233 }
243234 }
@@ -291,11 +282,6 @@ private ExecutorService buildExecutorService(BlockingQueue<Runnable> blockingQue
291282 new ThreadFactoryImpl (threadNamePrefix ));
292283 }
293284
294- public void recover (ConsumeQueueInterface consumeQueue ) {
295- FileQueueLifeCycle fileQueueLifeCycle = getLifeCycle (consumeQueue .getTopic (), consumeQueue .getQueueId ());
296- fileQueueLifeCycle .recover ();
297- }
298-
299285 @ Override
300286 public long getMaxPhyOffsetInConsumeQueue () {
301287 long maxPhysicOffset = -1L ;
@@ -319,71 +305,110 @@ public long getMinOffsetInQueue(String topic, int queueId) {
319305 return -1 ;
320306 }
321307
322- public void checkSelf (ConsumeQueueInterface consumeQueue ) {
323- FileQueueLifeCycle fileQueueLifeCycle = getLifeCycle (consumeQueue .getTopic (), consumeQueue .getQueueId ());
324- fileQueueLifeCycle .checkSelf ();
325- }
326-
327308 @ Override
328309 public void checkSelf () {
329310 for (Map .Entry <String , ConcurrentMap <Integer , ConsumeQueueInterface >> topicEntry : this .consumeQueueTable .entrySet ()) {
330311 for (Map .Entry <Integer , ConsumeQueueInterface > cqEntry : topicEntry .getValue ().entrySet ()) {
331- this . checkSelf ( cqEntry .getValue ());
312+ cqEntry .getValue (). checkSelf ( );
332313 }
333314 }
334315 }
335316
336- public boolean flush (ConsumeQueueInterface consumeQueue , int flushLeastPages ) {
337- FileQueueLifeCycle fileQueueLifeCycle = getLifeCycle (consumeQueue .getTopic (), consumeQueue .getQueueId ());
338- return fileQueueLifeCycle .flush (flushLeastPages );
339- }
340-
341317 public void flush () throws StoreException {
342318 for (Map .Entry <String , ConcurrentMap <Integer , ConsumeQueueInterface >> topicEntry : this .consumeQueueTable .entrySet ()) {
343319 for (Map .Entry <Integer , ConsumeQueueInterface > cqEntry : topicEntry .getValue ().entrySet ()) {
344- flush ( cqEntry .getValue (), 0 );
320+ cqEntry .getValue (). flush ( 0 );
345321 }
346322 }
347323 }
348324
349325 @ Override
350326 public void destroy (ConsumeQueueInterface consumeQueue ) {
351- FileQueueLifeCycle fileQueueLifeCycle = getLifeCycle (consumeQueue .getTopic (), consumeQueue .getQueueId ());
352- fileQueueLifeCycle .destroy ();
327+ consumeQueue .destroy ();
353328 if (MixAll .isLmq (consumeQueue .getTopic ())) {
354329 lmqCounter .decrementAndGet ();
355330 }
356331 }
357332
333+ /**
334+ * @deprecated Use {@link ConsumeQueueInterface#load()} directly instead.
335+ */
336+ @ Deprecated
337+ public boolean load (ConsumeQueueInterface consumeQueue ) {
338+ return consumeQueue .load ();
339+ }
340+
341+ /**
342+ * @deprecated Use {@link ConsumeQueueInterface#recover()} directly instead.
343+ */
344+ @ Deprecated
345+ public void recover (ConsumeQueueInterface consumeQueue ) {
346+ consumeQueue .recover ();
347+ }
348+
349+ /**
350+ * @deprecated Use {@link ConsumeQueueInterface#checkSelf()} directly instead.
351+ */
352+ @ Deprecated
353+ public void checkSelf (ConsumeQueueInterface consumeQueue ) {
354+ consumeQueue .checkSelf ();
355+ }
356+
357+ /**
358+ * @deprecated Use {@link ConsumeQueueInterface#flush(int)} directly instead.
359+ */
360+ @ Deprecated
361+ public boolean flush (ConsumeQueueInterface consumeQueue , int flushLeastPages ) {
362+ return consumeQueue .flush (flushLeastPages );
363+ }
364+
365+ /**
366+ * @deprecated Use {@link ConsumeQueueInterface#deleteExpiredFile(long)} directly instead.
367+ */
368+ @ Deprecated
358369 public int deleteExpiredFile (ConsumeQueueInterface consumeQueue , long minCommitLogPos ) {
359- FileQueueLifeCycle fileQueueLifeCycle = getLifeCycle (consumeQueue .getTopic (), consumeQueue .getQueueId ());
360- return fileQueueLifeCycle .deleteExpiredFile (minCommitLogPos );
370+ return consumeQueue .deleteExpiredFile (minCommitLogPos );
361371 }
362372
373+ /**
374+ * @deprecated Use {@link ConsumeQueueInterface#truncateDirtyLogicFiles(long)} directly instead.
375+ */
376+ @ Deprecated
363377 public void truncateDirtyLogicFiles (ConsumeQueueInterface consumeQueue , long phyOffset ) {
364- FileQueueLifeCycle fileQueueLifeCycle = getLifeCycle (consumeQueue .getTopic (), consumeQueue .getQueueId ());
365- fileQueueLifeCycle .truncateDirtyLogicFiles (phyOffset );
378+ consumeQueue .truncateDirtyLogicFiles (phyOffset );
366379 }
367380
381+ /**
382+ * @deprecated Use {@link ConsumeQueueInterface#swapMap(int, long, long)} directly instead.
383+ */
384+ @ Deprecated
368385 public void swapMap (ConsumeQueueInterface consumeQueue , int reserveNum , long forceSwapIntervalMs ,
369386 long normalSwapIntervalMs ) {
370- FileQueueLifeCycle fileQueueLifeCycle = getLifeCycle (consumeQueue .getTopic (), consumeQueue .getQueueId ());
371- fileQueueLifeCycle .swapMap (reserveNum , forceSwapIntervalMs , normalSwapIntervalMs );
387+ consumeQueue .swapMap (reserveNum , forceSwapIntervalMs , normalSwapIntervalMs );
372388 }
373389
390+ /**
391+ * @deprecated Use {@link ConsumeQueueInterface#cleanSwappedMap(long)} directly instead.
392+ */
393+ @ Deprecated
374394 public void cleanSwappedMap (ConsumeQueueInterface consumeQueue , long forceCleanSwapIntervalMs ) {
375- FileQueueLifeCycle fileQueueLifeCycle = getLifeCycle (consumeQueue .getTopic (), consumeQueue .getQueueId ());
376- fileQueueLifeCycle .cleanSwappedMap (forceCleanSwapIntervalMs );
395+ consumeQueue .cleanSwappedMap (forceCleanSwapIntervalMs );
377396 }
378397
398+ /**
399+ * @deprecated Use {@link ConsumeQueueInterface#isFirstFileAvailable()} directly instead.
400+ */
401+ @ Deprecated
379402 public boolean isFirstFileAvailable (ConsumeQueueInterface consumeQueue ) {
380- FileQueueLifeCycle fileQueueLifeCycle = getLifeCycle (consumeQueue .getTopic (), consumeQueue .getQueueId ());
381- return fileQueueLifeCycle .isFirstFileAvailable ();
403+ return consumeQueue .isFirstFileAvailable ();
382404 }
383405
406+ /**
407+ * @deprecated Use {@link ConsumeQueueInterface#isFirstFileExist()} directly instead.
408+ */
409+ @ Deprecated
384410 public boolean isFirstFileExist (ConsumeQueueInterface consumeQueue ) {
385- FileQueueLifeCycle fileQueueLifeCycle = getLifeCycle (consumeQueue .getTopic (), consumeQueue .getQueueId ());
386- return fileQueueLifeCycle .isFirstFileExist ();
411+ return consumeQueue .isFirstFileExist ();
387412 }
388413
389414 @ Override
@@ -604,7 +629,7 @@ public void truncateDirty(long offsetToTruncate) {
604629 log .warn ("maxPhyOffsetOfConsumeQueue({}) >= processOffset({}), truncate dirty logic files" , maxPhyOffsetOfConsumeQueue , offsetToTruncate );
605630 for (ConcurrentMap <Integer , ConsumeQueueInterface > maps : this .consumeQueueTable .values ()) {
606631 for (ConsumeQueueInterface logic : maps .values ()) {
607- this .truncateDirtyLogicFiles (logic , offsetToTruncate );
632+ logic .truncateDirtyLogicFiles (offsetToTruncate );
608633 }
609634 }
610635 }
@@ -667,7 +692,7 @@ private void doFlush(int retryTimes) {
667692 for (ConsumeQueueInterface cq : maps .values ()) {
668693 boolean result = false ;
669694 for (int i = 0 ; i < retryTimes && !result ; i ++) {
670- result = flush (cq , flushConsumeQueueLeastPages );
695+ result = cq . flush (flushConsumeQueueLeastPages );
671696 }
672697 }
673698 }
@@ -736,7 +761,7 @@ private boolean needCorrect(ConsumeQueueInterface logic, long minPhyOffset, long
736761 return false ;
737762 }
738763 // If first exist and not available, it means first file may destroy failed, delete it.
739- if (isFirstFileExist (logic ) && !isFirstFileAvailable (logic )) {
764+ if (logic . isFirstFileExist () && !logic . isFirstFileAvailable ()) {
740765 log .error ("CorrectLogicOffsetService.needCorrect. first file not available, trigger correct." +
741766 " topic:{}, queue:{}, maxPhyOffset in queue:{}, minPhyOffset " +
742767 "in commit log:{}, minOffset in queue:{}, maxOffset in queue:{}, cqType:{}"
@@ -821,7 +846,7 @@ private void correctLogicMinOffset() {
821846 }
822847
823848 private void doCorrect (ConsumeQueueInterface logic , long minPhyOffset ) {
824- deleteExpiredFile (logic , minPhyOffset );
849+ logic . deleteExpiredFile (minPhyOffset );
825850 int sleepIntervalWhenCorrectMinOffset = messageStoreConfig .getCorrectLogicMinOffsetSleepInterval ();
826851 if (sleepIntervalWhenCorrectMinOffset > 0 ) {
827852 try {
@@ -859,7 +884,7 @@ protected void deleteExpiredFiles() {
859884
860885 for (ConcurrentMap <Integer , ConsumeQueueInterface > maps : consumeQueueTable .values ()) {
861886 for (ConsumeQueueInterface logic : maps .values ()) {
862- int deleteCount = deleteExpiredFile (logic , minOffset );
887+ int deleteCount = logic . deleteExpiredFile (minOffset );
863888 if (deleteCount > 0 && deleteLogicsFilesInterval > 0 ) {
864889 try {
865890 Thread .sleep (deleteLogicsFilesInterval );
0 commit comments