Skip to content

Commit fc6913b

Browse files
authored
[To dev/1.3] Pipe: Fix the stuck state caused by unfair lock in Sink start phase (#16100) (#16106)
* Pipe: Fix the stuck state caused by unfair lock in Sink start phase * fix (cherry picked from commit 2df3c45)
1 parent 6c27518 commit fc6913b

4 files changed

Lines changed: 48 additions & 43 deletions

File tree

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -228,11 +228,8 @@ public void discardEventsOfPipe(final String pipeNameToDrop, int regionId) {
228228
// Try to remove the events as much as possible
229229
inputPendingQueue.discardEventsOfPipe(pipeNameToDrop, regionId);
230230

231-
highPriorityLockTaskCount.incrementAndGet();
232231
try {
233-
synchronized (highPriorityLockTaskCount) {
234-
highPriorityLockTaskCount.notifyAll();
235-
}
232+
increaseHighPriorityTaskCount();
236233

237234
// synchronized to use the lastEvent & lastExceptionEvent
238235
synchronized (this) {
@@ -271,7 +268,7 @@ public void discardEventsOfPipe(final String pipeNameToDrop, int regionId) {
271268
}
272269
}
273270
} finally {
274-
highPriorityLockTaskCount.decrementAndGet();
271+
decreaseHighPriorityTaskCount();
275272
}
276273

277274
if (outputPipeConnector instanceof IoTDBSink) {

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskLifeCycle.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -123,7 +123,12 @@ public synchronized void start() {
123123
}
124124

125125
if (runningTaskCount == 0) {
126-
executor.start(subtask.getTaskID());
126+
try {
127+
subtask.increaseHighPriorityTaskCount();
128+
executor.start(subtask.getTaskID());
129+
} finally {
130+
subtask.decreaseHighPriorityTaskCount();
131+
}
127132
}
128133

129134
runningTaskCount++;

iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeAbstractSinkSubtask.java

Lines changed: 2 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -170,14 +170,8 @@ private boolean onPipeConnectionException(final Throwable throwable) {
170170
MAX_RETRY_TIMES,
171171
e);
172172
try {
173-
synchronized (highPriorityLockTaskCount) {
174-
// The wait operation will release the highPriorityLockTaskCount lock, so there will be
175-
// no deadlock.
176-
if (highPriorityLockTaskCount.get() == 0) {
177-
highPriorityLockTaskCount.wait(
178-
retry * PipeConfig.getInstance().getPipeConnectorRetryIntervalMs());
179-
}
180-
}
173+
sleepIfNoHighPriorityTask(
174+
retry * PipeConfig.getInstance().getPipeConnectorRetryIntervalMs());
181175
} catch (final InterruptedException interruptedException) {
182176
LOGGER.info(
183177
"Interrupted while sleeping, will retry to handshake with the target system.",
@@ -254,17 +248,4 @@ protected synchronized void clearReferenceCountAndReleaseLastExceptionEvent() {
254248
lastExceptionEvent = null;
255249
}
256250
}
257-
258-
private void preScheduleLowPriorityTask(int maxRetries) {
259-
while (highPriorityLockTaskCount.get() != 0L && maxRetries-- > 0) {
260-
try {
261-
// Introduce a short delay to avoid CPU spinning
262-
Thread.sleep(10);
263-
} catch (InterruptedException e) {
264-
Thread.currentThread().interrupt();
265-
LOGGER.warn("Interrupted while waiting for the high priority lock task.", e);
266-
break;
267-
}
268-
}
269-
}
270251
}

iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeReportableSubtask.java

Lines changed: 38 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -90,14 +90,8 @@ private void onEnrichedEventFailure(final Throwable throwable) {
9090
throwable.getMessage(),
9191
throwable);
9292
try {
93-
synchronized (highPriorityLockTaskCount) {
94-
// The wait operation will release the highPriorityLockTaskCount lock, so there will be
95-
// no deadlock.
96-
if (highPriorityLockTaskCount.get() == 0) {
97-
highPriorityLockTaskCount.wait(
98-
retryCount.get() * PipeConfig.getInstance().getPipeConnectorRetryIntervalMs());
99-
}
100-
}
93+
sleepIfNoHighPriorityTask(
94+
retryCount.get() * PipeConfig.getInstance().getPipeConnectorRetryIntervalMs());
10195
} catch (final InterruptedException e) {
10296
LOGGER.warn(
10397
"Interrupted when retrying to execute subtask {} (creation time: {}, simple class: {})",
@@ -164,14 +158,8 @@ private void onNonEnrichedEventFailure(final Throwable throwable) {
164158
throwable.getMessage(),
165159
throwable);
166160
try {
167-
synchronized (highPriorityLockTaskCount) {
168-
// The wait operation will release the highPriorityLockTaskCount lock, so there will be
169-
// no deadlock.
170-
if (highPriorityLockTaskCount.get() == 0) {
171-
highPriorityLockTaskCount.wait(
172-
retryCount.get() * PipeConfig.getInstance().getPipeConnectorRetryIntervalMs());
173-
}
174-
}
161+
sleepIfNoHighPriorityTask(
162+
retryCount.get() * PipeConfig.getInstance().getPipeConnectorRetryIntervalMs());
175163
} catch (final InterruptedException e) {
176164
LOGGER.warn(
177165
"Interrupted when retrying to execute subtask {} (creation time: {}, simple class: {})",
@@ -183,4 +171,38 @@ private void onNonEnrichedEventFailure(final Throwable throwable) {
183171

184172
submitSelf();
185173
}
174+
175+
protected void preScheduleLowPriorityTask(int maxRetries) {
176+
while (highPriorityLockTaskCount.get() != 0L && maxRetries-- > 0) {
177+
try {
178+
// Introduce a short delay to avoid CPU spinning
179+
Thread.sleep(10);
180+
} catch (InterruptedException e) {
181+
Thread.currentThread().interrupt();
182+
LOGGER.warn("Interrupted while waiting for the high priority lock task.", e);
183+
break;
184+
}
185+
}
186+
}
187+
188+
protected void sleepIfNoHighPriorityTask(long sleepMillis) throws InterruptedException {
189+
synchronized (highPriorityLockTaskCount) {
190+
// The wait operation will release the highPriorityLockTaskCount lock, so there will be
191+
// no deadlock.
192+
if (highPriorityLockTaskCount.get() > 0) {
193+
highPriorityLockTaskCount.wait(sleepMillis);
194+
}
195+
}
196+
}
197+
198+
/**
 * Registers one high-priority task and wakes every thread waiting on the
 * {@code highPriorityLockTaskCount} monitor.
 *
 * <p>The counter is incremented first and {@code notifyAll()} is issued afterwards while holding
 * the monitor, so a woken waiter observes the updated count. Each call is expected to be paired
 * with {@link #decreaseHighPriorityTaskCount()}.
 */
public void increaseHighPriorityTaskCount() {
199+
highPriorityLockTaskCount.incrementAndGet();
200+
// notifyAll() must be called while owning the monitor of the object being notified.
synchronized (highPriorityLockTaskCount) {
201+
highPriorityLockTaskCount.notifyAll();
202+
}
203+
}
204+
205+
/**
 * Unregisters one high-priority task by decrementing {@code highPriorityLockTaskCount}.
 *
 * <p>No notification is issued here; the method only updates the counter.
 */
public void decreaseHighPriorityTaskCount() {
206+
highPriorityLockTaskCount.decrementAndGet();
207+
}
186208
}

0 commit comments

Comments
 (0)