Skip to content

Commit b061ffd

Browse files
authored
[To dev/1.3] Add historical transfer summary logs (#17763)
Backport 18ea0e9 (#17717) to dev/1.3.
1 parent 7721cab commit b061ffd

5 files changed

Lines changed: 246 additions & 0 deletions

File tree

iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeatParser.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -154,11 +154,24 @@ private void parseHeartbeatAndSaveMetaChangeLocally(
154154
if (Boolean.TRUE.equals(isPipeCompletedFromAgent)) {
155155

156156
temporaryMeta.markDataNodeCompleted(nodeId);
157+
LOGGER.info(
158+
"Detected historical pipe completion report from DataNode {} for pipe {}. remainingEventCount: {}, remainingTime: {}, completedDataNodes: {}",
159+
nodeId,
160+
staticMeta.getPipeName(),
161+
pipeHeartbeat.getRemainingEventCount(staticMeta),
162+
pipeHeartbeat.getRemainingTime(staticMeta),
163+
temporaryMeta.getCompletedDataNodeIds());
157164

158165
final Set<Integer> uncompletedDataNodeIds =
159166
configManager.getNodeManager().getRegisteredDataNodeLocations().keySet();
160167
uncompletedDataNodeIds.removeAll(temporaryMeta.getCompletedDataNodeIds());
161168
if (uncompletedDataNodeIds.isEmpty()) {
169+
LOGGER.info(
170+
"All DataNodes reported historical pipe {} completed. globalRemainingEventCount: {}, globalRemainingTime: {}, staticMeta: {}",
171+
staticMeta.getPipeName(),
172+
temporaryMeta.getGlobalRemainingEvents(),
173+
temporaryMeta.getGlobalRemainingTime(),
174+
staticMeta);
162175
pipeTaskInfo.get().removePipeMeta(staticMeta.getPipeName());
163176
LOGGER.info(
164177
"Detected completion of pipe {}, static meta: {}, remove it.",

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/connection/PipeEventCollector.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import org.apache.iotdb.db.pipe.event.common.schema.PipeSchemaRegionWritePlanEvent;
3030
import org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertionEvent;
3131
import org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
32+
import org.apache.iotdb.db.pipe.event.common.terminate.PipeTerminateEvent;
3233
import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
3334
import org.apache.iotdb.db.pipe.source.schemaregion.IoTDBSchemaRegionSource;
3435
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeType;
@@ -131,12 +132,20 @@ private void parseAndCollectEvent(final PipeTsFileInsertionEvent sourceEvent) th
131132

132133
if (skipParsing || !forceTabletFormat && canSkipParsing4TsFileEvent(sourceEvent)) {
133134
collectEvent(sourceEvent);
135+
if (sourceEvent.isGeneratedByHistoricalExtractor()) {
136+
PipeTerminateEvent.markHistoricalTsFileUnsplit(
137+
sourceEvent.getPipeName(), sourceEvent.getCreationTime(), regionId);
138+
}
134139
return;
135140
}
136141

137142
try {
138143
sourceEvent.consumeTabletInsertionEventsWithRetry(
139144
this::collectParsedRawTableEvent, "PipeEventCollector::parseAndCollectEvent");
145+
if (sourceEvent.isGeneratedByHistoricalExtractor()) {
146+
PipeTerminateEvent.markHistoricalTsFileSplit(
147+
sourceEvent.getPipeName(), sourceEvent.getCreationTime(), regionId);
148+
}
140149
} finally {
141150
sourceEvent.close();
142151
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/terminate/PipeTerminateEvent.java

Lines changed: 179 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,9 +32,16 @@
3232
import org.apache.iotdb.db.pipe.agent.task.PipeDataNodeTask;
3333
import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
3434

35+
import org.slf4j.Logger;
36+
import org.slf4j.LoggerFactory;
37+
38+
import java.util.Objects;
3539
import java.util.concurrent.ArrayBlockingQueue;
40+
import java.util.concurrent.ConcurrentHashMap;
41+
import java.util.concurrent.ConcurrentMap;
3642
import java.util.concurrent.ExecutorService;
3743
import java.util.concurrent.TimeUnit;
44+
import java.util.concurrent.atomic.AtomicLong;
3845

3946
/**
4047
* The {@link PipeTerminateEvent} is an {@link EnrichedEvent} that controls the termination of pipe,
@@ -44,6 +51,8 @@
4451
*/
4552
public class PipeTerminateEvent extends EnrichedEvent {
4653

54+
private static final Logger LOGGER = LoggerFactory.getLogger(PipeTerminateEvent.class);
55+
4756
private final int dataRegionId;
4857

4958
private final boolean shouldMark;
@@ -57,6 +66,9 @@ public class PipeTerminateEvent extends EnrichedEvent {
5766
// Do not use call run policy to avoid deadlock
5867
private static final ExecutorService terminateExecutor = createTerminateExecutor();
5968

69+
private static final ConcurrentMap<HistoricalTransferKey, HistoricalTransferSummaryCounter>
70+
HISTORICAL_TRANSFER_SUMMARY_COUNTER_MAP = new ConcurrentHashMap<>();
71+
6072
private static ExecutorService createTerminateExecutor() {
6173
final WrappedThreadPoolExecutor executor =
6274
new WrappedThreadPoolExecutor(
@@ -128,6 +140,18 @@ public boolean mayEventPathsOverlappedWithPattern() {
128140
}
129141

130142
public void markCompleted() {
143+
final HistoricalTransferSummary summary =
144+
snapshotAndClearHistoricalTransferSummary(pipeName, creationTime, dataRegionId);
145+
if (Objects.nonNull(summary)) {
146+
LOGGER.info(
147+
"Pipe {}@{}: terminate event committed for historical transfer. creationTime: {}, shouldMark: {}. {}",
148+
pipeName,
149+
dataRegionId,
150+
creationTime,
151+
shouldMark,
152+
summary.toReportMessage());
153+
}
154+
131155
// To avoid deadlock
132156
if (shouldMark) {
133157
terminateExecutor.submit(
@@ -142,4 +166,159 @@ public String toString() {
142166
+ " - "
143167
+ super.toString();
144168
}
169+
170+
public static void initializeHistoricalTransferSummary(
171+
final String pipeName,
172+
final long creationTime,
173+
final int dataRegionId,
174+
final long extractedHistoricalTsFileCount,
175+
final long extractedHistoricalDeletionCount) {
176+
HISTORICAL_TRANSFER_SUMMARY_COUNTER_MAP
177+
.computeIfAbsent(
178+
new HistoricalTransferKey(pipeName, creationTime, dataRegionId),
179+
ignored -> new HistoricalTransferSummaryCounter())
180+
.initialize(extractedHistoricalTsFileCount, extractedHistoricalDeletionCount);
181+
}
182+
183+
public static void markHistoricalTsFileSkipped(
184+
final String pipeName, final long creationTime, final int dataRegionId) {
185+
getOrCreateHistoricalTransferSummaryCounter(pipeName, creationTime, dataRegionId)
186+
.skippedHistoricalTsFileCount
187+
.incrementAndGet();
188+
}
189+
190+
public static void markHistoricalTsFileSplit(
191+
final String pipeName, final long creationTime, final int dataRegionId) {
192+
getOrCreateHistoricalTransferSummaryCounter(pipeName, creationTime, dataRegionId)
193+
.splitHistoricalTsFileCount
194+
.incrementAndGet();
195+
}
196+
197+
public static void markHistoricalTsFileUnsplit(
198+
final String pipeName, final long creationTime, final int dataRegionId) {
199+
getOrCreateHistoricalTransferSummaryCounter(pipeName, creationTime, dataRegionId)
200+
.unsplitHistoricalTsFileCount
201+
.incrementAndGet();
202+
}
203+
204+
public static HistoricalTransferSummary snapshotHistoricalTransferSummary(
205+
final String pipeName, final long creationTime, final int dataRegionId) {
206+
final HistoricalTransferSummaryCounter counter =
207+
HISTORICAL_TRANSFER_SUMMARY_COUNTER_MAP.get(
208+
new HistoricalTransferKey(pipeName, creationTime, dataRegionId));
209+
return Objects.nonNull(counter) ? counter.snapshot() : null;
210+
}
211+
212+
public static void clearHistoricalTransferSummary(
213+
final String pipeName, final long creationTime, final int dataRegionId) {
214+
HISTORICAL_TRANSFER_SUMMARY_COUNTER_MAP.remove(
215+
new HistoricalTransferKey(pipeName, creationTime, dataRegionId));
216+
}
217+
218+
private static HistoricalTransferSummary snapshotAndClearHistoricalTransferSummary(
219+
final String pipeName, final long creationTime, final int dataRegionId) {
220+
final HistoricalTransferSummaryCounter counter =
221+
HISTORICAL_TRANSFER_SUMMARY_COUNTER_MAP.remove(
222+
new HistoricalTransferKey(pipeName, creationTime, dataRegionId));
223+
return Objects.nonNull(counter) ? counter.snapshot() : null;
224+
}
225+
226+
private static HistoricalTransferSummaryCounter getOrCreateHistoricalTransferSummaryCounter(
227+
final String pipeName, final long creationTime, final int dataRegionId) {
228+
return HISTORICAL_TRANSFER_SUMMARY_COUNTER_MAP.computeIfAbsent(
229+
new HistoricalTransferKey(pipeName, creationTime, dataRegionId),
230+
ignored -> new HistoricalTransferSummaryCounter());
231+
}
232+
233+
public static final class HistoricalTransferSummary {
234+
235+
private final long extractedHistoricalTsFileCount;
236+
private final long skippedHistoricalTsFileCount;
237+
private final long splitHistoricalTsFileCount;
238+
private final long unsplitHistoricalTsFileCount;
239+
private final long extractedHistoricalDeletionCount;
240+
241+
private HistoricalTransferSummary(
242+
final long extractedHistoricalTsFileCount,
243+
final long skippedHistoricalTsFileCount,
244+
final long splitHistoricalTsFileCount,
245+
final long unsplitHistoricalTsFileCount,
246+
final long extractedHistoricalDeletionCount) {
247+
this.extractedHistoricalTsFileCount = extractedHistoricalTsFileCount;
248+
this.skippedHistoricalTsFileCount = skippedHistoricalTsFileCount;
249+
this.splitHistoricalTsFileCount = splitHistoricalTsFileCount;
250+
this.unsplitHistoricalTsFileCount = unsplitHistoricalTsFileCount;
251+
this.extractedHistoricalDeletionCount = extractedHistoricalDeletionCount;
252+
}
253+
254+
public String toReportMessage() {
255+
return String.format(
256+
"historical summary: extractedTsFileCount=%s, skippedTsFileCount=%s, splitTsFileCount=%s, unsplitTsFileCount=%s, deletionCount=%s",
257+
extractedHistoricalTsFileCount,
258+
skippedHistoricalTsFileCount,
259+
splitHistoricalTsFileCount,
260+
unsplitHistoricalTsFileCount,
261+
extractedHistoricalDeletionCount);
262+
}
263+
}
264+
265+
private static final class HistoricalTransferSummaryCounter {
266+
267+
private final AtomicLong extractedHistoricalTsFileCount = new AtomicLong(0);
268+
private final AtomicLong skippedHistoricalTsFileCount = new AtomicLong(0);
269+
private final AtomicLong splitHistoricalTsFileCount = new AtomicLong(0);
270+
private final AtomicLong unsplitHistoricalTsFileCount = new AtomicLong(0);
271+
private final AtomicLong extractedHistoricalDeletionCount = new AtomicLong(0);
272+
273+
private void initialize(
274+
final long extractedHistoricalTsFileCount, final long extractedHistoricalDeletionCount) {
275+
this.extractedHistoricalTsFileCount.set(extractedHistoricalTsFileCount);
276+
this.skippedHistoricalTsFileCount.set(0);
277+
this.splitHistoricalTsFileCount.set(0);
278+
this.unsplitHistoricalTsFileCount.set(0);
279+
this.extractedHistoricalDeletionCount.set(extractedHistoricalDeletionCount);
280+
}
281+
282+
private HistoricalTransferSummary snapshot() {
283+
return new HistoricalTransferSummary(
284+
extractedHistoricalTsFileCount.get(),
285+
skippedHistoricalTsFileCount.get(),
286+
splitHistoricalTsFileCount.get(),
287+
unsplitHistoricalTsFileCount.get(),
288+
extractedHistoricalDeletionCount.get());
289+
}
290+
}
291+
292+
private static final class HistoricalTransferKey {
293+
294+
private final String pipeName;
295+
private final long creationTime;
296+
private final int dataRegionId;
297+
298+
private HistoricalTransferKey(
299+
final String pipeName, final long creationTime, final int dataRegionId) {
300+
this.pipeName = pipeName;
301+
this.creationTime = creationTime;
302+
this.dataRegionId = dataRegionId;
303+
}
304+
305+
@Override
306+
public boolean equals(final Object obj) {
307+
if (this == obj) {
308+
return true;
309+
}
310+
if (!(obj instanceof HistoricalTransferKey)) {
311+
return false;
312+
}
313+
final HistoricalTransferKey that = (HistoricalTransferKey) obj;
314+
return creationTime == that.creationTime
315+
&& dataRegionId == that.dataRegionId
316+
&& Objects.equals(pipeName, that.pipeName);
317+
}
318+
319+
@Override
320+
public int hashCode() {
321+
return Objects.hash(pipeName, creationTime, dataRegionId);
322+
}
323+
}
145324
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileSource.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -441,6 +441,8 @@ && mayTsFileResourceOverlappedWithPattern(resource)))
441441
? Long.compare(o1.getFileStartTime(), o2.getFileStartTime())
442442
: o1.getMaxProgressIndex().topologicalCompareTo(o2.getMaxProgressIndex()));
443443
pendingQueue = new ArrayDeque<>(originalResourceList);
444+
PipeTerminateEvent.initializeHistoricalTransferSummary(
445+
pipeName, creationTime, dataRegionId, filteredTsFileResources.size(), 0);
444446

445447
LOGGER.info(
446448
"Pipe {}@{}: finish to extract historical TsFile, extracted sequence file count {}/{}, "
@@ -537,6 +539,17 @@ public synchronized Event supply() {
537539
final TsFileResource resource = pendingQueue.poll();
538540

539541
if (resource == null) {
542+
final PipeTerminateEvent.HistoricalTransferSummary historicalTransferSummary =
543+
PipeTerminateEvent.snapshotHistoricalTransferSummary(
544+
pipeName, creationTime, dataRegionId);
545+
if (Objects.nonNull(historicalTransferSummary)) {
546+
LOGGER.info(
547+
"Pipe {}@{}: historical source has supplied all events, emitting terminate event. {}",
548+
pipeName,
549+
dataRegionId,
550+
historicalTransferSummary.toReportMessage());
551+
}
552+
540553
final PipeTerminateEvent terminateEvent =
541554
new PipeTerminateEvent(
542555
pipeName,
@@ -632,6 +645,9 @@ public int getPendingQueueSize() {
632645

633646
@Override
634647
public synchronized void close() {
648+
if (!isTerminateSignalSent) {
649+
PipeTerminateEvent.clearHistoricalTransferSummary(pipeName, creationTime, dataRegionId);
650+
}
635651
if (Objects.nonNull(pendingQueue)) {
636652
pendingQueue.forEach(
637653
resource -> {

iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/PipeReceiverStatusHandler.java

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ public class PipeReceiverStatusHandler {
4646
private static final String NO_PERMISSION = "No permission";
4747
private static final String UNCLASSIFIED_EXCEPTION = "Unclassified exception";
4848
private static final String NO_PERMISSION_STR = "No permissions for this operation";
49+
private static final int MAX_RECORD_MESSAGE_LENGTH_IN_LOG = 2048;
4950

5051
private final boolean isRetryAllowedWhenConflictOccurs;
5152
private final long retryMaxMillisWhenConflictOccurs;
@@ -134,6 +135,7 @@ public void handle(
134135
"User conflict exception: will be ignored because retry is not allowed. event: {}. status: {}",
135136
shouldRecordIgnoredDataWhenConflictOccurs ? recordMessage : "not recorded",
136137
status);
138+
logDiscardedUserConflictData("retry is not allowed", recordMessage, status);
137139
return;
138140
}
139141

@@ -147,6 +149,7 @@ public void handle(
147149
"User conflict exception: retry timeout. will be ignored. event: {}. status: {}",
148150
shouldRecordIgnoredDataWhenConflictOccurs ? recordMessage : "not recorded",
149151
status);
152+
logDiscardedUserConflictData("retry timeout", recordMessage, status);
150153
resetExceptionStatus();
151154
return;
152155
}
@@ -252,6 +255,32 @@ private static String getNoPermission(final boolean noPermission) {
252255
return noPermission ? NO_PERMISSION : UNCLASSIFIED_EXCEPTION;
253256
}
254257

258+
private void logDiscardedUserConflictData(
259+
final String reason, final String recordMessage, final TSStatus status) {
260+
if (!LOGGER.isWarnEnabled()) {
261+
return;
262+
}
263+
264+
LOGGER.warn(
265+
"User conflict exception: discarded data info because {}. data: {}. receiver message: {}. status: {}",
266+
reason,
267+
summarizeRecordMessage(recordMessage),
268+
status.getMessage(),
269+
status);
270+
}
271+
272+
private String summarizeRecordMessage(final String recordMessage) {
273+
if (Objects.isNull(recordMessage) || recordMessage.isEmpty()) {
274+
return "<empty>";
275+
}
276+
277+
final String normalizedRecordMessage =
278+
recordMessage.replace('\r', ' ').replace('\n', ' ').trim();
279+
return normalizedRecordMessage.length() <= MAX_RECORD_MESSAGE_LENGTH_IN_LOG
280+
? normalizedRecordMessage
281+
: normalizedRecordMessage.substring(0, MAX_RECORD_MESSAGE_LENGTH_IN_LOG) + "...(truncated)";
282+
}
283+
255284
private void recordExceptionStatusIfNecessary(final String message) {
256285
if (!Objects.equals(exceptionRecordedMessage.get(), message)) {
257286
exceptionFirstEncounteredTime.set(System.currentTimeMillis());

0 commit comments

Comments
 (0)