Skip to content

Commit b9c2124

Browse files
[To dev/1.3] Pipe: Fix and improve async tsfile transfer error handling and logging (avoid client connection leak) (#16008) (#16125)
* Pipe: Fix and improve async tsfile transfer error handling and logging Refactored IoTDBDataRegionAsyncConnector to handle exceptions during asynchronous tsfile transfer more gracefully. Now logs warnings instead of errors, invokes onError on the handler, and provides more context in log messages. Added getTsFile() method to PipeTransferTsFileHandler for better logging, and ensured memoryBlock is set to null after closing to prevent potential resource leaks. * fix * fix * bald-logger --------- Co-authored-by: Steve Yurong Su <rong@apache.org>
1 parent 7372101 commit b9c2124

3 files changed

Lines changed: 57 additions & 23 deletions

File tree

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java

Lines changed: 15 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -409,8 +409,7 @@ private boolean transferWithoutCheck(final TsFileInsertionEvent tsFileInsertionE
409409
}
410410
}
411411

412-
private void transfer(final PipeTransferTsFileHandler pipeTransferTsFileHandler)
413-
throws Exception {
412+
private void transfer(final PipeTransferTsFileHandler pipeTransferTsFileHandler) {
414413
transferTsFileCounter.incrementAndGet();
415414
CompletableFuture<Void> completableFuture =
416415
CompletableFuture.supplyAsync(
@@ -432,13 +431,20 @@ private void transfer(final PipeTransferTsFileHandler pipeTransferTsFileHandler)
432431
if (PipeConfig.getInstance().isTransferTsFileSync() || !isRealtimeFirst) {
433432
try {
434433
completableFuture.get();
435-
} catch (InterruptedException e) {
436-
Thread.currentThread().interrupt();
437-
LOGGER.error("Transfer tsfile event asynchronously was interrupted.", e);
438-
throw new PipeException("Transfer tsfile event asynchronously was interrupted.", e);
439-
} catch (Exception e) {
440-
LOGGER.error("Failed to transfer tsfile event asynchronously.", e);
441-
throw e;
434+
} catch (final Exception e) {
435+
if (e instanceof InterruptedException) {
436+
Thread.currentThread().interrupt();
437+
LOGGER.warn(
438+
"Transfer tsfile event {} asynchronously was interrupted.",
439+
pipeTransferTsFileHandler.getTsFile(),
440+
e);
441+
}
442+
443+
pipeTransferTsFileHandler.onError(e);
444+
LOGGER.warn(
445+
"Failed to transfer tsfile event {} asynchronously.",
446+
pipeTransferTsFileHandler.getTsFile(),
447+
e);
442448
}
443449
}
444450
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTrackableHandler.java

Lines changed: 28 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,11 +26,17 @@
2626

2727
import org.apache.thrift.TException;
2828
import org.apache.thrift.async.AsyncMethodCallback;
29+
import org.slf4j.Logger;
30+
import org.slf4j.LoggerFactory;
31+
32+
import java.util.Objects;
2933

3034
public abstract class PipeTransferTrackableHandler
3135
implements AsyncMethodCallback<TPipeTransferResp>, AutoCloseable {
36+
private static final Logger LOGGER = LoggerFactory.getLogger(PipeTransferTsFileHandler.class);
3237

3338
protected final IoTDBDataRegionAsyncSink connector;
39+
protected volatile AsyncPipeDataTransferServiceClient client;
3440

3541
public PipeTransferTrackableHandler(final IoTDBDataRegionAsyncSink connector) {
3642
this.connector = connector;
@@ -77,13 +83,21 @@ public void onError(final Exception exception) {
7783
protected boolean tryTransfer(
7884
final AsyncPipeDataTransferServiceClient client, final TPipeTransferReq req)
7985
throws TException {
86+
if (Objects.isNull(this.client)) {
87+
this.client = client;
88+
}
8089
// track handler before checking if connector is closed
8190
connector.trackHandler(this);
8291
if (connector.isClosed()) {
8392
clearEventsReferenceCount();
8493
connector.eliminateHandler(this);
8594
client.setShouldReturnSelf(true);
86-
client.returnSelf();
95+
try {
96+
client.returnSelf();
97+
} catch (final IllegalStateException e) {
98+
LOGGER.info(
99+
"Illegal state when return the client to object pool, maybe the pool is already cleared. Will ignore.");
100+
}
87101
return false;
88102
}
89103
doTransfer(client, req);
@@ -106,6 +120,18 @@ protected abstract void doTransfer(
106120

107121
@Override
108122
public void close() {
109-
// do nothing
123+
if (Objects.isNull(client)) {
124+
return;
125+
}
126+
try {
127+
client.close();
128+
client.invalidateAll();
129+
} catch (final Exception e) {
130+
LOGGER.warn(
131+
"Failed to close or invalidate client when connector is closed. Client: {}, Exception: {}",
132+
client,
133+
e.getMessage(),
134+
e);
135+
}
110136
}
111137
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTsFileHandler.java

Lines changed: 14 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,6 @@ public class PipeTransferTsFileHandler extends PipeTransferTrackableHandler {
8888
private final AtomicBoolean isSealSignalSent;
8989

9090
private IoTDBDataNodeAsyncClientManager clientManager;
91-
private volatile AsyncPipeDataTransferServiceClient client;
9291

9392
public PipeTransferTsFileHandler(
9493
final IoTDBDataRegionAsyncSink connector,
@@ -130,6 +129,10 @@ public PipeTransferTsFileHandler(
130129
isSealSignalSent = new AtomicBoolean(false);
131130
}
132131

132+
public File getTsFile() {
133+
return tsFile;
134+
}
135+
133136
public void transfer(
134137
final IoTDBDataNodeAsyncClientManager clientManager,
135138
final AsyncPipeDataTransferServiceClient client)
@@ -413,19 +416,16 @@ private void returnClientIfNecessary() {
413416
}
414417

415418
if (connector.isClosed()) {
416-
try {
417-
client.close();
418-
client.invalidateAll();
419-
} catch (final Exception e) {
420-
LOGGER.warn(
421-
"Failed to close or invalidate client when connector is closed. Client: {}, Exception: {}",
422-
client,
423-
e.getMessage(),
424-
e);
425-
}
419+
close();
426420
}
421+
427422
client.setShouldReturnSelf(true);
428-
client.returnSelf();
423+
try {
424+
client.returnSelf();
425+
} catch (final IllegalStateException e) {
426+
LOGGER.info(
427+
"Illegal state when return the client to object pool, maybe the pool is already cleared. Will ignore.");
428+
}
429429
client = null;
430430
}
431431

@@ -452,8 +452,10 @@ public void clearEventsReferenceCount() {
452452
@Override
453453
public void close() {
454454
super.close();
455+
455456
if (memoryBlock != null) {
456457
memoryBlock.close();
458+
memoryBlock = null;
457459
}
458460
}
459461

0 commit comments

Comments
 (0)