Skip to content

Commit b1dd7ea

Browse files
committed
fix
1 parent 3f4ddd2 commit b1dd7ea

6 files changed

Lines changed: 9 additions & 12 deletions

File tree

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeRealtimePriorityBlockingQueue.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -364,7 +364,8 @@ public void discardEventsOfPipe(final CommitterKey committerKey) {
364364
super.discardEventsOfPipe(committerKey);
365365
tsfileInsertEventDeque.removeIf(
366366
event -> {
367-
if (event instanceof EnrichedEvent && isEventFromPipe((EnrichedEvent) event, committerKey)) {
367+
if (event instanceof EnrichedEvent
368+
&& isEventFromPipe((EnrichedEvent) event, committerKey)) {
368369
if (((EnrichedEvent) event)
369370
.clearReferenceCount(PipeRealtimePriorityBlockingQueue.class.getName())) {
370371
eventCounter.decreaseEventCount(event);

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -257,8 +257,7 @@ private static boolean isEventFromPipe(
257257
return committerKey.getPipeName().equals(event.getPipeName())
258258
&& committerKey.getCreationTime() == event.getCreationTime()
259259
&& committerKey.getRegionId() == event.getRegionId()
260-
&& (committerKey.getRestartTimes() < 0
261-
|| committerKey.equals(event.getCommitterKey()));
260+
&& (committerKey.getRestartTimes() < 0 || committerKey.equals(event.getCommitterKey()));
262261
}
263262

264263
//////////////////////////// APIs provided for metric framework ////////////////////////////

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTabletEventBatch.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -177,8 +177,7 @@ private static boolean isEventFromPipe(
177177
return committerKey.getPipeName().equals(event.getPipeName())
178178
&& committerKey.getCreationTime() == event.getCreationTime()
179179
&& committerKey.getRegionId() == event.getRegionId()
180-
&& (committerKey.getRestartTimes() < 0
181-
|| committerKey.equals(event.getCommitterKey()));
180+
&& (committerKey.getRestartTimes() < 0 || committerKey.equals(event.getCommitterKey()));
182181
}
183182

184183
public synchronized void decreaseEventsReferenceCount(

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -880,8 +880,7 @@ private static boolean isDroppedPipe(final EnrichedEvent event, final CommitterK
880880
return committerKey.getPipeName().equals(event.getPipeName())
881881
&& committerKey.getCreationTime() == event.getCreationTime()
882882
&& committerKey.getRegionId() == event.getRegionId()
883-
&& (committerKey.getRestartTimes() < 0
884-
|| committerKey.equals(event.getCommitterKey()));
883+
&& (committerKey.getRestartTimes() < 0 || committerKey.equals(event.getCommitterKey()));
885884
}
886885

887886
@Override

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/websocket/WebSocketConnectorServer.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -529,8 +529,7 @@ private static boolean isEventFromPipe(
529529
return committerKey.getPipeName().equals(event.getPipeName())
530530
&& committerKey.getCreationTime() == event.getCreationTime()
531531
&& committerKey.getRegionId() == event.getRegionId()
532-
&& (committerKey.getRestartTimes() < 0
533-
|| committerKey.equals(event.getCommitterKey()));
532+
&& (committerKey.getRestartTimes() < 0 || committerKey.equals(event.getCommitterKey()));
534533
}
535534

536535
private boolean isQueueAvailable(

iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/connection/BlockingPendingQueue.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -144,7 +144,8 @@ public void discardEventsOfPipe(final CommitterKey committerKey) {
144144
droppedPipeTaskKeys.add(committerKey);
145145
pendingQueue.removeIf(
146146
event -> {
147-
if (event instanceof EnrichedEvent && isEventFromPipe((EnrichedEvent) event, committerKey)) {
147+
if (event instanceof EnrichedEvent
148+
&& isEventFromPipe((EnrichedEvent) event, committerKey)) {
148149
if (((EnrichedEvent) event).clearReferenceCount(BlockingPendingQueue.class.getName())) {
149150
eventCounter.decreaseEventCount(event);
150151
}
@@ -197,8 +198,7 @@ protected static boolean isEventFromPipe(
197198
return committerKey.getPipeName().equals(event.getPipeName())
198199
&& committerKey.getCreationTime() == event.getCreationTime()
199200
&& committerKey.getRegionId() == event.getRegionId()
200-
&& (committerKey.getRestartTimes() < 0
201-
|| committerKey.equals(event.getCommitterKey()));
201+
&& (committerKey.getRestartTimes() < 0 || committerKey.equals(event.getCommitterKey()));
202202
}
203203

204204
protected boolean isEventFromDroppedPipe(final E event) {

0 commit comments

Comments
 (0)