Skip to content

Commit 3f4ddd2

Browse files
committed
Fix
1 parent 0d1b838 commit 3f4ddd2

15 files changed

Lines changed: 161 additions & 93 deletions

File tree

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/connection/PipeEventCollector.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -238,7 +238,7 @@ private void collectEvent(final Event event) {
238238
enrichedEvent.setRebootTimes(PipeDataNodeAgent.runtime().getRebootTimes());
239239

240240
if (enrichedEvent.getPipeName() != null
241-
&& pendingQueue.isPipeDropped(enrichedEvent.getPipeName(), creationTime, regionId)) {
241+
&& pendingQueue.isEventFromDroppedPipe(enrichedEvent)) {
242242
enrichedEvent.clearReferenceCount(PipeEventCollector.class.getName());
243243
return;
244244
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeRealtimePriorityBlockingQueue.java

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -356,12 +356,15 @@ public void discardAllEvents() {
356356
@Override
357357
public void discardEventsOfPipe(
358358
final String pipeNameToDrop, final long creationTimeToDrop, final int regionId) {
359-
super.discardEventsOfPipe(pipeNameToDrop, creationTimeToDrop, regionId);
359+
discardEventsOfPipe(new CommitterKey(pipeNameToDrop, creationTimeToDrop, regionId, -1));
360+
}
361+
362+
@Override
363+
public void discardEventsOfPipe(final CommitterKey committerKey) {
364+
super.discardEventsOfPipe(committerKey);
360365
tsfileInsertEventDeque.removeIf(
361366
event -> {
362-
if (event instanceof EnrichedEvent
363-
&& isEventFromPipe(
364-
((EnrichedEvent) event), pipeNameToDrop, creationTimeToDrop, regionId)) {
367+
if (event instanceof EnrichedEvent && isEventFromPipe((EnrichedEvent) event, committerKey)) {
365368
if (((EnrichedEvent) event)
366369
.clearReferenceCount(PipeRealtimePriorityBlockingQueue.class.getName())) {
367370
eventCounter.decreaseEventCount(event);

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java

Lines changed: 15 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121

2222
import org.apache.iotdb.commons.exception.pipe.PipeRuntimeException;
2323
import org.apache.iotdb.commons.pipe.agent.task.connection.UnboundedBlockingPendingQueue;
24+
import org.apache.iotdb.commons.pipe.agent.task.progress.CommitterKey;
2425
import org.apache.iotdb.commons.pipe.agent.task.subtask.PipeAbstractSinkSubtask;
2526
import org.apache.iotdb.commons.pipe.config.PipeConfig;
2627
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
@@ -201,10 +202,9 @@ public void close() {
201202
* When a pipe is dropped, the connector maybe reused and will not be closed. So we just discard
202203
* its queued events in the output pipe connector.
203204
*/
204-
public void discardEventsOfPipe(
205-
final String pipeNameToDrop, final long creationTimeToDrop, final int regionId) {
205+
public void discardEventsOfPipe(final CommitterKey committerKey) {
206206
// Try to remove the events as much as possible
207-
inputPendingQueue.discardEventsOfPipe(pipeNameToDrop, creationTimeToDrop, regionId);
207+
inputPendingQueue.discardEventsOfPipe(committerKey);
208208

209209
try {
210210
increaseHighPriorityTaskCount();
@@ -217,9 +217,7 @@ public void discardEventsOfPipe(
217217
// use a new thread to stop all the pipes, we will not encounter deadlock here. Or else we
218218
// will.
219219
if (lastEvent instanceof EnrichedEvent
220-
&& pipeNameToDrop.equals(((EnrichedEvent) lastEvent).getPipeName())
221-
&& creationTimeToDrop == ((EnrichedEvent) lastEvent).getCreationTime()
222-
&& regionId == ((EnrichedEvent) lastEvent).getRegionId()) {
220+
&& isEventFromPipe((EnrichedEvent) lastEvent, committerKey)) {
223221
// Do not clear the last event's reference counts because it may be on transferring
224222
lastEvent = null;
225223
// Submit self to avoid that the lastEvent has been retried "max times" times and has
@@ -241,9 +239,7 @@ public void discardEventsOfPipe(
241239
// clear the lastExceptionEvent. It's safe to potentially clear it twice because we have the
242240
// "nonnull" detection.
243241
if (lastExceptionEvent instanceof EnrichedEvent
244-
&& pipeNameToDrop.equals(((EnrichedEvent) lastExceptionEvent).getPipeName())
245-
&& creationTimeToDrop == ((EnrichedEvent) lastExceptionEvent).getCreationTime()
246-
&& regionId == ((EnrichedEvent) lastExceptionEvent).getRegionId()) {
242+
&& isEventFromPipe((EnrichedEvent) lastExceptionEvent, committerKey)) {
247243
clearReferenceCountAndReleaseLastExceptionEvent();
248244
}
249245
}
@@ -252,11 +248,19 @@ public void discardEventsOfPipe(
252248
}
253249

254250
if (outputPipeSink instanceof PipeConnectorWithEventDiscard) {
255-
((PipeConnectorWithEventDiscard) outputPipeSink)
256-
.discardEventsOfPipe(pipeNameToDrop, creationTimeToDrop, regionId);
251+
((PipeConnectorWithEventDiscard) outputPipeSink).discardEventsOfPipe(committerKey);
257252
}
258253
}
259254

255+
private static boolean isEventFromPipe(
256+
final EnrichedEvent event, final CommitterKey committerKey) {
257+
return committerKey.getPipeName().equals(event.getPipeName())
258+
&& committerKey.getCreationTime() == event.getCreationTime()
259+
&& committerKey.getRegionId() == event.getRegionId()
260+
&& (committerKey.getRestartTimes() < 0
261+
|| committerKey.equals(event.getCommitterKey()));
262+
}
263+
260264
//////////////////////////// APIs provided for metric framework ////////////////////////////
261265

262266
public String getAttributeSortedString() {

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskLifeCycle.java

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
package org.apache.iotdb.db.pipe.agent.task.subtask.sink;
2121

2222
import org.apache.iotdb.commons.pipe.agent.task.connection.UnboundedBlockingPendingQueue;
23+
import org.apache.iotdb.commons.pipe.agent.task.progress.CommitterKey;
2324
import org.apache.iotdb.db.i18n.DataNodePipeMessages;
2425
import org.apache.iotdb.db.pipe.agent.task.execution.PipeSinkSubtaskExecutor;
2526
import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager;
@@ -87,19 +88,17 @@ public synchronized void register() {
8788
* Otherwise, the {@link PipeSinkSubtaskLifeCycle#runningTaskCount} might be inconsistent with the
8889
* {@link PipeSinkSubtaskLifeCycle#registeredTaskCount} because of parallel connector scheduling.
8990
*
90-
* @param pipeNameToDeregister pipe name
91-
* @param regionId region id
91+
* @param committerKey committer key of the pipe task to deregister
9292
* @return {@code true} if the {@link PipeSinkSubtask} is out of life cycle, indicating that the
9393
* {@link PipeSinkSubtask} should never be used again
9494
* @throws IllegalStateException if {@link PipeSinkSubtaskLifeCycle#registeredTaskCount} <= 0
9595
*/
96-
public synchronized boolean deregister(
97-
final String pipeNameToDeregister, final long creationTimeToDeregister, final int regionId) {
96+
public synchronized boolean deregister(final CommitterKey committerKey) {
9897
if (registeredTaskCount <= 0) {
9998
throw new IllegalStateException(DataNodePipeMessages.REGISTEREDTASKCOUNT_0_1);
10099
}
101100

102-
subtask.discardEventsOfPipe(pipeNameToDeregister, creationTimeToDeregister, regionId);
101+
subtask.discardEventsOfPipe(committerKey);
103102

104103
try {
105104
if (registeredTaskCount > 1) {

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskManager.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import org.apache.iotdb.commons.pipe.agent.plugin.builtin.BuiltinPipePlugin;
2424
import org.apache.iotdb.commons.pipe.agent.task.connection.UnboundedBlockingPendingQueue;
2525
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeRuntimeMeta;
26+
import org.apache.iotdb.commons.pipe.agent.task.progress.CommitterKey;
2627
import org.apache.iotdb.commons.pipe.agent.task.progress.PipeEventCommitManager;
2728
import org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant;
2829
import org.apache.iotdb.commons.pipe.config.constant.SystemConstant;
@@ -211,7 +212,10 @@ public synchronized void deregister(
211212
// Shall not be empty
212213
final PipeSinkSubtaskExecutor executor = lifeCycles.get(0).executor;
213214

214-
lifeCycles.removeIf(o -> o.deregister(pipeName, creationTime, regionId));
215+
final CommitterKey committerKey =
216+
PipeEventCommitManager.getInstance().getCommitterKey(pipeName, creationTime, regionId);
217+
218+
lifeCycles.removeIf(o -> o.deregister(committerKey));
215219

216220
if (lifeCycles.isEmpty()) {
217221
attributeSortedString2SubtaskLifeCycleMap.remove(attributeSortedString);

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTabletEventBatch.java

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
package org.apache.iotdb.db.pipe.sink.payload.evolvable.batch;
2121

22+
import org.apache.iotdb.commons.pipe.agent.task.progress.CommitterKey;
2223
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
2324
import org.apache.iotdb.db.i18n.DataNodePipeMessages;
2425
import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager;
@@ -157,18 +158,29 @@ public synchronized void close() {
157158
*/
158159
public synchronized void discardEventsOfPipe(
159160
final String pipeNameToDrop, final long creationTimeToDrop, final int regionId) {
161+
discardEventsOfPipe(new CommitterKey(pipeNameToDrop, creationTimeToDrop, regionId, -1));
162+
}
163+
164+
public synchronized void discardEventsOfPipe(final CommitterKey committerKey) {
160165
events.removeIf(
161166
event -> {
162-
if (pipeNameToDrop.equals(event.getPipeName())
163-
&& creationTimeToDrop == event.getCreationTime()
164-
&& regionId == event.getRegionId()) {
167+
if (isEventFromPipe(event, committerKey)) {
165168
event.clearReferenceCount(IoTDBDataRegionAsyncSink.class.getName());
166169
return true;
167170
}
168171
return false;
169172
});
170173
}
171174

175+
private static boolean isEventFromPipe(
176+
final EnrichedEvent event, final CommitterKey committerKey) {
177+
return committerKey.getPipeName().equals(event.getPipeName())
178+
&& committerKey.getCreationTime() == event.getCreationTime()
179+
&& committerKey.getRegionId() == event.getRegionId()
180+
&& (committerKey.getRestartTimes() < 0
181+
|| committerKey.equals(event.getCommitterKey()));
182+
}
183+
172184
public synchronized void decreaseEventsReferenceCount(
173185
final String holderMessage, final boolean shouldReport) {
174186
events.forEach(event -> event.decreaseReferenceCount(holderMessage, shouldReport));

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTransferBatchReqBuilder.java

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
package org.apache.iotdb.db.pipe.sink.payload.evolvable.batch;
2121

2222
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
23+
import org.apache.iotdb.commons.pipe.agent.task.progress.CommitterKey;
2324
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
2425
import org.apache.iotdb.db.i18n.DataNodePipeMessages;
2526
import org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertionEvent;
@@ -201,10 +202,12 @@ public boolean isEmpty() {
201202

202203
public synchronized void discardEventsOfPipe(
203204
final String pipeNameToDrop, final long creationTimeToDrop, final int regionId) {
204-
defaultBatch.discardEventsOfPipe(pipeNameToDrop, creationTimeToDrop, regionId);
205-
endPointToBatch
206-
.values()
207-
.forEach(batch -> batch.discardEventsOfPipe(pipeNameToDrop, creationTimeToDrop, regionId));
205+
discardEventsOfPipe(new CommitterKey(pipeNameToDrop, creationTimeToDrop, regionId, -1));
206+
}
207+
208+
public synchronized void discardEventsOfPipe(final CommitterKey committerKey) {
209+
defaultBatch.discardEventsOfPipe(committerKey);
210+
endPointToBatch.values().forEach(batch -> batch.discardEventsOfPipe(committerKey));
208211
}
209212

210213
public int size() {

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/airgap/IoTDBDataRegionAirGapSink.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
package org.apache.iotdb.db.pipe.sink.protocol.airgap;
2121

2222
import org.apache.iotdb.common.rpc.thrift.TSStatus;
23+
import org.apache.iotdb.commons.pipe.agent.task.progress.CommitterKey;
2324
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
2425
import org.apache.iotdb.commons.pipe.sink.limiter.TsFileSendRateLimiter;
2526
import org.apache.iotdb.commons.utils.RetryUtils;
@@ -613,8 +614,13 @@ protected byte[] compressIfNeeded(final byte[] reqInBytes) throws IOException {
613614
@Override
614615
public synchronized void discardEventsOfPipe(
615616
final String pipeNameToDrop, final long creationTimeToDrop, final int regionId) {
617+
discardEventsOfPipe(new CommitterKey(pipeNameToDrop, creationTimeToDrop, regionId, -1));
618+
}
619+
620+
@Override
621+
public synchronized void discardEventsOfPipe(final CommitterKey committerKey) {
616622
if (Objects.nonNull(tabletBatchBuilder)) {
617-
tabletBatchBuilder.discardEventsOfPipe(pipeNameToDrop, creationTimeToDrop, regionId);
623+
tabletBatchBuilder.discardEventsOfPipe(committerKey);
618624
}
619625
}
620626

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java

Lines changed: 20 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,8 @@
2323
import org.apache.iotdb.commons.audit.UserEntity;
2424
import org.apache.iotdb.commons.client.ThriftClient;
2525
import org.apache.iotdb.commons.client.async.AsyncPipeDataTransferServiceClient;
26+
import org.apache.iotdb.commons.pipe.agent.task.progress.CommitterKey;
2627
import org.apache.iotdb.commons.pipe.config.PipeConfig;
27-
import org.apache.iotdb.commons.pipe.datastructure.Triple;
2828
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
2929
import org.apache.iotdb.commons.pipe.resource.log.PipeLogger;
3030
import org.apache.iotdb.commons.pipe.sink.protocol.IoTDBSink;
@@ -130,9 +130,7 @@ public class IoTDBDataRegionAsyncSink extends IoTDBSink {
130130
private final Map<PipeTransferTrackableHandler, PipeTransferTrackableHandler> pendingHandlers =
131131
new ConcurrentHashMap<>();
132132

133-
// Pipe name, creation time, region id
134-
private final Set<Triple<String, Long, Integer>> droppedPipeTaskKeys =
135-
ConcurrentHashMap.newKeySet();
133+
private final Set<CommitterKey> droppedPipeTaskKeys = ConcurrentHashMap.newKeySet();
136134

137135
private boolean enableSendTsFileLimit;
138136
private volatile boolean isConnectionException;
@@ -749,16 +747,20 @@ public boolean isEnableSendTsFileLimit() {
749747
@Override
750748
public synchronized void discardEventsOfPipe(
751749
final String pipeNameToDrop, final long creationTimeToDrop, final int regionId) {
752-
droppedPipeTaskKeys.add(new Triple<>(pipeNameToDrop, creationTimeToDrop, regionId));
750+
discardEventsOfPipe(new CommitterKey(pipeNameToDrop, creationTimeToDrop, regionId, -1));
751+
}
752+
753+
@Override
754+
public synchronized void discardEventsOfPipe(final CommitterKey committerKey) {
755+
droppedPipeTaskKeys.add(committerKey);
753756

754757
if (isTabletBatchModeEnabled && Objects.nonNull(tabletBatchBuilder)) {
755-
tabletBatchBuilder.discardEventsOfPipe(pipeNameToDrop, creationTimeToDrop, regionId);
758+
tabletBatchBuilder.discardEventsOfPipe(committerKey);
756759
}
757760
retryEventQueue.removeIf(
758761
event -> {
759762
if (event instanceof EnrichedEvent
760-
&& isDroppedPipe(
761-
(EnrichedEvent) event, pipeNameToDrop, creationTimeToDrop, regionId)) {
763+
&& isDroppedPipe((EnrichedEvent) event, committerKey)) {
762764
((EnrichedEvent) event).clearReferenceCount(IoTDBDataRegionAsyncSink.class.getName());
763765
retryEventQueueEventCounter.decreaseEventCount(event);
764766
return true;
@@ -769,8 +771,7 @@ && isDroppedPipe(
769771
retryTsFileQueue.removeIf(
770772
event -> {
771773
if (event instanceof EnrichedEvent
772-
&& isDroppedPipe(
773-
(EnrichedEvent) event, pipeNameToDrop, creationTimeToDrop, regionId)) {
774+
&& isDroppedPipe((EnrichedEvent) event, committerKey)) {
774775
((EnrichedEvent) event).clearReferenceCount(IoTDBDataRegionAsyncSink.class.getName());
775776
retryEventQueueEventCounter.decreaseEventCount(event);
776777
return true;
@@ -872,18 +873,15 @@ public void setTransferTsFileCounter(AtomicInteger transferTsFileCounter) {
872873
}
873874

874875
private boolean isDroppedPipe(final EnrichedEvent event) {
875-
return droppedPipeTaskKeys.contains(
876-
new Triple<>(event.getPipeName(), event.getCreationTime(), event.getRegionId()));
877-
}
878-
879-
private static boolean isDroppedPipe(
880-
final EnrichedEvent event,
881-
final String pipeNameToDrop,
882-
final long creationTimeToDrop,
883-
final int regionId) {
884-
return pipeNameToDrop.equals(event.getPipeName())
885-
&& creationTimeToDrop == event.getCreationTime()
886-
&& regionId == event.getRegionId();
876+
return droppedPipeTaskKeys.stream().anyMatch(key -> isDroppedPipe(event, key));
877+
}
878+
879+
private static boolean isDroppedPipe(final EnrichedEvent event, final CommitterKey committerKey) {
880+
return committerKey.getPipeName().equals(event.getPipeName())
881+
&& committerKey.getCreationTime() == event.getCreationTime()
882+
&& committerKey.getRegionId() == event.getRegionId()
883+
&& (committerKey.getRestartTimes() < 0
884+
|| committerKey.equals(event.getCommitterKey()));
887885
}
888886

889887
@Override

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/sync/IoTDBDataRegionSyncSink.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121

2222
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
2323
import org.apache.iotdb.common.rpc.thrift.TSStatus;
24+
import org.apache.iotdb.commons.pipe.agent.task.progress.CommitterKey;
2425
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
2526
import org.apache.iotdb.commons.pipe.sink.client.IoTDBSyncClient;
2627
import org.apache.iotdb.commons.pipe.sink.limiter.TsFileSendRateLimiter;
@@ -604,8 +605,13 @@ public TPipeTransferReq compressIfNeeded(final TPipeTransferReq req) throws IOEx
604605
@Override
605606
public synchronized void discardEventsOfPipe(
606607
final String pipeNameToDrop, final long creationTimeToDrop, final int regionId) {
608+
discardEventsOfPipe(new CommitterKey(pipeNameToDrop, creationTimeToDrop, regionId, -1));
609+
}
610+
611+
@Override
612+
public synchronized void discardEventsOfPipe(final CommitterKey committerKey) {
607613
if (Objects.nonNull(tabletBatchBuilder)) {
608-
tabletBatchBuilder.discardEventsOfPipe(pipeNameToDrop, creationTimeToDrop, regionId);
614+
tabletBatchBuilder.discardEventsOfPipe(committerKey);
609615
}
610616
}
611617

0 commit comments

Comments
 (0)