Skip to content

Commit 6680d6f

Browse files
authored
Pipe: Fixed the bug that the Disruptor may not clear event references and may wait a long time after a pipe is closed (#17549)
* fix * fix
1 parent 554ac26 commit 6680d6f

6 files changed

Lines changed: 257 additions & 28 deletions

File tree

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/PipeDataRegionAssigner.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -256,7 +256,7 @@ public void invalidateCache() {
256256
matcher.invalidateCache();
257257
}
258258

259-
public boolean notMoreExtractorNeededToBeAssigned() {
259+
public boolean notMoreSourceNeededToBeAssigned() {
260260
return matcher.getRegisterCount() == 0;
261261
}
262262

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/disruptor/BatchEventProcessor.java

Lines changed: 41 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,6 @@ public void halt() {
6969

7070
@Override
7171
public void run() {
72-
T event = null;
7372
long nextSequence = sequence.get() + 1L;
7473

7574
while (running) {
@@ -78,29 +77,59 @@ public void run() {
7877
final long availableSequence = sequenceBarrier.waitFor(nextSequence);
7978

8079
// Batch process all available events
81-
while (nextSequence <= availableSequence) {
82-
event = ringBuffer.get(nextSequence);
83-
eventHandler.onEvent(event, nextSequence, nextSequence == availableSequence);
84-
nextSequence++;
85-
}
86-
87-
// Update sequence
88-
sequence.set(availableSequence);
80+
nextSequence = processAvailableEvents(nextSequence, availableSequence);
8981

9082
} catch (final InterruptedException ex) {
91-
Thread.currentThread().interrupt();
92-
LOGGER.info("Processor interrupted");
83+
if (running) {
84+
Thread.currentThread().interrupt();
85+
LOGGER.info("Processor interrupted");
86+
}
9387
break;
9488
} catch (final Throwable ex) {
95-
exceptionHandler.handleEventException(ex, nextSequence, event);
89+
exceptionHandler.handleEventException(ex, nextSequence, ringBuffer.get(nextSequence));
9690
sequence.set(nextSequence);
9791
nextSequence++;
9892
}
9993
}
10094

95+
if (!running) {
96+
drainRemainingPublishedEvents(nextSequence);
97+
}
10198
LOGGER.info("Processor stopped");
10299
}
103100

101+
private long processAvailableEvents(long nextSequence, long availableSequence) throws Throwable {
102+
while (nextSequence <= availableSequence) {
103+
final T event = ringBuffer.get(nextSequence);
104+
eventHandler.onEvent(event, nextSequence, nextSequence == availableSequence);
105+
nextSequence++;
106+
}
107+
108+
sequence.set(availableSequence);
109+
return nextSequence;
110+
}
111+
112+
private void drainRemainingPublishedEvents(long nextSequence) {
113+
final long availableSequence = sequenceBarrier.getCursor();
114+
if (availableSequence < nextSequence) {
115+
return;
116+
}
117+
118+
final long highestPublishedSequence =
119+
sequenceBarrier.getHighestPublishedSequence(nextSequence, availableSequence);
120+
while (nextSequence <= highestPublishedSequence) {
121+
final T event = ringBuffer.get(nextSequence);
122+
try {
123+
eventHandler.onEvent(event, nextSequence, nextSequence == highestPublishedSequence);
124+
} catch (final Throwable ex) {
125+
exceptionHandler.handleEventException(ex, nextSequence, event);
126+
} finally {
127+
sequence.set(nextSequence);
128+
}
129+
nextSequence++;
130+
}
131+
}
132+
104133
private static class DefaultExceptionHandler<T> implements ExceptionHandler<T> {
105134
@Override
106135
public void handleEventException(Throwable ex, long sequence, T event) {

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/disruptor/Disruptor.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -122,11 +122,15 @@ public void shutdown() {
122122

123123
if (processorThread != null) {
124124
try {
125+
processorThread.interrupt();
125126
processorThread.join(5000);
126127
} catch (InterruptedException e) {
127128
Thread.currentThread().interrupt();
128129
LOGGER.warn("Interrupted waiting for processor to stop");
129130
}
131+
if (processorThread.isAlive()) {
132+
LOGGER.warn("Timed out waiting for processor to stop");
133+
}
130134
}
131135

132136
started = false;

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/disruptor/SequenceBarrier.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,4 +75,8 @@ public long waitFor(long sequence) throws InterruptedException {
7575
public long getCursor() {
7676
return sequencer.getCursor().get();
7777
}
78+
79+
public long getHighestPublishedSequence(long lowerBound, long availableSequence) {
80+
return sequencer.getHighestPublishedSequence(lowerBound, availableSequence);
81+
}
7882
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/listener/PipeInsertionDataNodeListener.java

Lines changed: 24 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -72,25 +72,34 @@ public synchronized void startListenAndAssign(
7272

7373
public synchronized void stopListenAndAssign(
7474
final int dataRegionId, final PipeRealtimeDataRegionSource extractor) {
75-
final PipeDataRegionAssigner assigner = dataRegionId2Assigner.get(dataRegionId);
76-
if (assigner == null) {
77-
return;
78-
}
75+
PipeDataRegionAssigner assignerToClose = null;
7976

80-
assigner.stopAssignTo(extractor);
77+
synchronized (this) {
78+
final PipeDataRegionAssigner assigner = dataRegionId2Assigner.get(dataRegionId);
79+
if (assigner == null) {
80+
return;
81+
}
8182

82-
if (extractor.isNeedListenToTsFile()) {
83-
listenToTsFileExtractorCount.decrementAndGet();
84-
}
85-
if (extractor.isNeedListenToInsertNode()) {
86-
listenToInsertNodeExtractorCount.decrementAndGet();
83+
assigner.stopAssignTo(extractor);
84+
85+
if (extractor.isNeedListenToTsFile()) {
86+
listenToTsFileExtractorCount.decrementAndGet();
87+
}
88+
if (extractor.isNeedListenToInsertNode()) {
89+
listenToInsertNodeExtractorCount.decrementAndGet();
90+
}
91+
92+
if (assigner.notMoreSourceNeededToBeAssigned()) {
93+
// The removed assigner is the same as the one referenced by the variable `assigner`
94+
dataRegionId2Assigner.remove(dataRegionId);
95+
// This will help to release the memory occupied by the assigner
96+
assignerToClose = assigner;
97+
}
8798
}
8899

89-
if (assigner.notMoreExtractorNeededToBeAssigned()) {
90-
// The removed assigner will is the same as the one referenced by the variable `assigner`
91-
dataRegionId2Assigner.remove(dataRegionId);
92-
// This will help to release the memory occupied by the assigner
93-
assigner.close();
100+
if (assignerToClose != null) {
101+
// Closing the disruptor may block for a while, so keep it out of the global listener lock.
102+
assignerToClose.close();
94103
}
95104
}
96105

Lines changed: 183 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,183 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.iotdb.db.pipe.source.dataregion.realtime.disruptor;
21+
22+
import org.junit.Assert;
23+
import org.junit.Test;
24+
25+
import java.util.concurrent.CountDownLatch;
26+
import java.util.concurrent.ThreadFactory;
27+
import java.util.concurrent.TimeUnit;
28+
import java.util.concurrent.atomic.AtomicInteger;
29+
import java.util.concurrent.atomic.AtomicReference;
30+
31+
public class DisruptorShutdownTest {
32+
33+
@Test
34+
public void testBatchEventProcessorDrainsPublishedEventsOnShutdownInterrupt() throws Exception {
35+
final RingBuffer<TestEvent> ringBuffer = RingBuffer.createMultiProducer(TestEvent::new, 32);
36+
ringBuffer.publishEvent((event, sequence, value) -> event.value = value, 1);
37+
38+
final TestSequenceBarrier barrier = new TestSequenceBarrier(0L);
39+
final AtomicInteger handledEventCount = new AtomicInteger();
40+
final BatchEventProcessor<TestEvent> processor =
41+
new BatchEventProcessor<>(
42+
ringBuffer,
43+
barrier,
44+
(event, sequence, endOfBatch) -> handledEventCount.incrementAndGet());
45+
46+
final Thread processorThread = new Thread(processor, "pipe-batch-event-processor-test");
47+
processorThread.start();
48+
49+
Assert.assertTrue(barrier.awaitWaitForCall());
50+
processor.halt();
51+
barrier.interruptWait();
52+
53+
processorThread.join(TimeUnit.SECONDS.toMillis(5));
54+
55+
Assert.assertFalse(processorThread.isAlive());
56+
Assert.assertEquals(1, handledEventCount.get());
57+
Assert.assertEquals(0L, processor.getSequence().get());
58+
}
59+
60+
@Test
61+
public void testBatchEventProcessorDrainsEventsPublishedAfterCurrentBatchWhenHalting()
62+
throws Exception {
63+
final RingBuffer<TestEvent> ringBuffer = RingBuffer.createMultiProducer(TestEvent::new, 32);
64+
ringBuffer.publishEvent((event, sequence, value) -> event.value = value, 1);
65+
ringBuffer.publishEvent((event, sequence, value) -> event.value = value, 2);
66+
67+
final SnapshotSequenceBarrier barrier = new SnapshotSequenceBarrier(0L, 1L);
68+
final AtomicInteger handledEventCount = new AtomicInteger();
69+
final AtomicReference<BatchEventProcessor<TestEvent>> processorReference =
70+
new AtomicReference<>();
71+
final BatchEventProcessor<TestEvent> processor =
72+
new BatchEventProcessor<>(
73+
ringBuffer,
74+
barrier,
75+
(event, sequence, endOfBatch) -> {
76+
handledEventCount.incrementAndGet();
77+
if (event.value == 1) {
78+
processorReference.get().halt();
79+
}
80+
});
81+
processorReference.set(processor);
82+
83+
final Thread processorThread =
84+
new Thread(processor, "pipe-batch-event-processor-snapshot-test");
85+
processorThread.start();
86+
processorThread.join(TimeUnit.SECONDS.toMillis(5));
87+
88+
Assert.assertFalse(processorThread.isAlive());
89+
Assert.assertEquals(2, handledEventCount.get());
90+
Assert.assertEquals(1L, processor.getSequence().get());
91+
}
92+
93+
@Test
94+
public void testDisruptorShutdownInterruptsWaitingProcessor() throws Exception {
95+
final AtomicReference<Thread> processorThreadReference = new AtomicReference<>();
96+
final ThreadFactory threadFactory =
97+
runnable -> {
98+
final Thread thread = new Thread(runnable, "pipe-disruptor-shutdown-test");
99+
processorThreadReference.set(thread);
100+
return thread;
101+
};
102+
103+
final Disruptor<TestEvent> disruptor = new Disruptor<>(TestEvent::new, 32, threadFactory);
104+
disruptor.handleEventsWith((event, sequence, endOfBatch) -> {});
105+
disruptor.start();
106+
107+
final Thread processorThread = processorThreadReference.get();
108+
Assert.assertNotNull(processorThread);
109+
110+
TimeUnit.MILLISECONDS.sleep(50);
111+
disruptor.shutdown();
112+
113+
Assert.assertFalse(processorThread.isAlive());
114+
}
115+
116+
private static class TestEvent {
117+
private int value;
118+
}
119+
120+
private static class TestSequenceBarrier extends SequenceBarrier {
121+
122+
private final long cursor;
123+
private final CountDownLatch waitForCalled = new CountDownLatch(1);
124+
private final CountDownLatch interruptWait = new CountDownLatch(1);
125+
126+
private TestSequenceBarrier(final long cursor) {
127+
super(new MultiProducerSequencer(32, new Sequence[0]), new Sequence[0]);
128+
this.cursor = cursor;
129+
}
130+
131+
@Override
132+
public long waitFor(final long sequence) throws InterruptedException {
133+
waitForCalled.countDown();
134+
interruptWait.await();
135+
throw new InterruptedException();
136+
}
137+
138+
@Override
139+
public long getCursor() {
140+
return cursor;
141+
}
142+
143+
@Override
144+
public long getHighestPublishedSequence(final long lowerBound, final long availableSequence) {
145+
return availableSequence;
146+
}
147+
148+
private boolean awaitWaitForCall() throws InterruptedException {
149+
return waitForCalled.await(5, TimeUnit.SECONDS);
150+
}
151+
152+
private void interruptWait() {
153+
interruptWait.countDown();
154+
}
155+
}
156+
157+
private static class SnapshotSequenceBarrier extends SequenceBarrier {
158+
159+
private final long waitForResult;
160+
private final long cursor;
161+
162+
private SnapshotSequenceBarrier(final long waitForResult, final long cursor) {
163+
super(new MultiProducerSequencer(32, new Sequence[0]), new Sequence[0]);
164+
this.waitForResult = waitForResult;
165+
this.cursor = cursor;
166+
}
167+
168+
@Override
169+
public long waitFor(final long sequence) {
170+
return waitForResult;
171+
}
172+
173+
@Override
174+
public long getCursor() {
175+
return cursor;
176+
}
177+
178+
@Override
179+
public long getHighestPublishedSequence(final long lowerBound, final long availableSequence) {
180+
return availableSequence;
181+
}
182+
}
183+
}

0 commit comments

Comments
 (0)