Skip to content

Commit 735238a

Browse files
junmuz and claude
authored and committed
Add lastCommittedSnapshotId metric and read actual snapshot ID post-commit
Read the actual latest snapshot ID from SnapshotManager after commit instead of computing it from a pre-commit baseline, avoiding race conditions with concurrent writers. Also adds additional commit and source reader metrics.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent: 4a4907d; commit: 735238a

8 files changed

Lines changed: 57 additions & 13 deletions

File tree

paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java

Lines changed: 10 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -360,14 +360,16 @@ public int commit(ManifestCommittable committable, boolean checkAppendFiles) {
360360
tableName,
361361
commitDuration);
362362
if (this.commitMetrics != null) {
363+
Long latestSnapshotId = snapshotManager.latestSnapshotId();
363364
reportCommit(
364365
changes.appendTableFiles,
365366
changes.appendChangelog,
366367
changes.compactTableFiles,
367368
changes.compactChangelog,
368369
commitDuration,
369370
generatedSnapshot,
370-
attempts);
371+
attempts,
372+
latestSnapshotId == null ? 0L : latestSnapshotId);
371373
}
372374
}
373375
return generatedSnapshot;
@@ -380,7 +382,8 @@ private void reportCommit(
380382
List<ManifestEntry> compactChangelogFiles,
381383
long commitDuration,
382384
int generatedSnapshots,
383-
int attempts) {
385+
int attempts,
386+
long lastCommittedSnapshotId) {
384387
CommitStats commitStats =
385388
new CommitStats(
386389
appendTableFiles,
@@ -389,7 +392,8 @@ private void reportCommit(
389392
compactChangelogFiles,
390393
commitDuration,
391394
generatedSnapshots,
392-
attempts);
395+
attempts,
396+
lastCommittedSnapshotId);
393397
commitMetrics.reportCommit(commitStats);
394398
}
395399

@@ -524,14 +528,16 @@ public int overwritePartition(
524528
long commitDuration = (System.nanoTime() - started) / 1_000_000;
525529
LOG.info("Finished overwrite to table {}, duration {} ms", tableName, commitDuration);
526530
if (this.commitMetrics != null) {
531+
Long latestSnapshotId = snapshotManager.latestSnapshotId();
527532
reportCommit(
528533
changes.appendTableFiles,
529534
emptyList(),
530535
changes.compactTableFiles,
531536
emptyList(),
532537
commitDuration,
533538
generatedSnapshot,
534-
attempts);
539+
attempts,
540+
latestSnapshotId == null ? 0L : latestSnapshotId);
535541
}
536542
}
537543
return generatedSnapshot;

paimon-core/src/main/java/org/apache/paimon/operation/metrics/CommitMetrics.java

Lines changed: 4 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -74,6 +74,7 @@ public MetricGroup getMetricGroup() {
7474

7575
public static final String LAST_COMPACTION_INPUT_FILE_SIZE = "lastCompactionInputFileSize";
7676
public static final String LAST_COMPACTION_OUTPUT_FILE_SIZE = "lastCompactionOutputFileSize";
77+
public static final String LAST_COMMITTED_SNAPSHOT_ID = "lastCommittedSnapshotId";
7778

7879
private void registerGenericCommitMetrics() {
7980
metricGroup.gauge(
@@ -126,6 +127,9 @@ private void registerGenericCommitMetrics() {
126127
metricGroup.gauge(
127128
LAST_COMPACTION_OUTPUT_FILE_SIZE,
128129
() -> latestCommit == null ? 0L : latestCommit.getCompactionOutputFileSize());
130+
metricGroup.gauge(
131+
LAST_COMMITTED_SNAPSHOT_ID,
132+
() -> latestCommit == null ? 0L : latestCommit.getLastCommittedSnapshotId());
129133
}
130134

131135
public void reportCommit(CommitStats commitStats) {

paimon-core/src/main/java/org/apache/paimon/operation/metrics/CommitStats.java

Lines changed: 9 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -53,6 +53,7 @@ public class CommitStats {
5353
private final long generatedSnapshots;
5454
private final long numPartitionsWritten;
5555
private final long numBucketsWritten;
56+
private final long lastCommittedSnapshotId;
5657

5758
public CommitStats(
5859
List<ManifestEntry> appendTableFiles,
@@ -61,7 +62,8 @@ public CommitStats(
6162
List<ManifestEntry> compactChangelogFiles,
6263
long commitDuration,
6364
int generatedSnapshots,
64-
int attempts) {
65+
int attempts,
66+
long lastCommittedSnapshotId) {
6567
List<ManifestEntry> addedTableFiles =
6668
appendTableFiles.stream()
6769
.filter(f -> FileKind.ADD.equals(f.kind()))
@@ -110,6 +112,7 @@ public CommitStats(
110112
this.duration = commitDuration;
111113
this.generatedSnapshots = generatedSnapshots;
112114
this.attempts = attempts;
115+
this.lastCommittedSnapshotId = lastCommittedSnapshotId;
113116
}
114117

115118
@VisibleForTesting
@@ -236,4 +239,9 @@ public long getCompactionInputFileSize() {
236239
public long getCompactionOutputFileSize() {
237240
return compactionOutputFileSize;
238241
}
242+
243+
@VisibleForTesting
244+
protected long getLastCommittedSnapshotId() {
245+
return lastCommittedSnapshotId;
246+
}
239247
}

paimon-core/src/test/java/org/apache/paimon/operation/metrics/CommitMetricsTest.java

Lines changed: 11 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -100,6 +100,10 @@ public void testMetricsAreUpdated() {
100100
registeredGenericMetrics.get(
101101
CommitMetrics.LAST_CHANGELOG_RECORDS_COMMIT_COMPACTED);
102102

103+
Gauge<Long> lastCommittedSnapshotId =
104+
(Gauge<Long>)
105+
registeredGenericMetrics.get(CommitMetrics.LAST_COMMITTED_SNAPSHOT_ID);
106+
103107
assertThat(lastCommitDuration.getValue()).isEqualTo(0);
104108
assertThat(commitDuration.getCount()).isEqualTo(0);
105109
assertThat(commitDuration.getStatistics().size()).isEqualTo(0);
@@ -117,6 +121,7 @@ public void testMetricsAreUpdated() {
117121
assertThat(lastChangelogRecordsAppended.getValue()).isEqualTo(0);
118122
assertThat(lastDeltaRecordsCompacted.getValue()).isEqualTo(0);
119123
assertThat(lastChangelogRecordsCompacted.getValue()).isEqualTo(0);
124+
assertThat(lastCommittedSnapshotId.getValue()).isEqualTo(0);
120125

121126
// report once
122127
reportOnce(commitMetrics);
@@ -145,6 +150,7 @@ public void testMetricsAreUpdated() {
145150
assertThat(lastChangelogRecordsAppended.getValue()).isEqualTo(503);
146151
assertThat(lastDeltaRecordsCompacted.getValue()).isEqualTo(613);
147152
assertThat(lastChangelogRecordsCompacted.getValue()).isEqualTo(512);
153+
assertThat(lastCommittedSnapshotId.getValue()).isEqualTo(42);
148154

149155
// report again
150156
reportAgain(commitMetrics);
@@ -173,6 +179,7 @@ public void testMetricsAreUpdated() {
173179
assertThat(lastChangelogRecordsAppended.getValue()).isEqualTo(213);
174180
assertThat(lastDeltaRecordsCompacted.getValue()).isEqualTo(506);
175181
assertThat(lastChangelogRecordsCompacted.getValue()).isEqualTo(601);
182+
assertThat(lastCommittedSnapshotId.getValue()).isEqualTo(99);
176183
}
177184

178185
private void reportOnce(CommitMetrics commitMetrics) {
@@ -199,7 +206,8 @@ private void reportOnce(CommitMetrics commitMetrics) {
199206
compactChangelogFiles,
200207
200,
201208
2,
202-
1);
209+
1,
210+
42L);
203211

204212
commitMetrics.reportCommit(commitStats);
205213
}
@@ -228,7 +236,8 @@ private void reportAgain(CommitMetrics commitMetrics) {
228236
compactChangelogFiles,
229237
500,
230238
1,
231-
2);
239+
2,
240+
99L);
232241

233242
commitMetrics.reportCommit(commitStats);
234243
}

paimon-core/src/test/java/org/apache/paimon/operation/metrics/CommitStatsTest.java

Lines changed: 9 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -78,7 +78,8 @@ public void testFailedAppendSnapshot() {
7878
Collections.emptyList(),
7979
0,
8080
0,
81-
1);
81+
1,
82+
0L);
8283
assertThat(commitStats.getTableFilesAdded()).isEqualTo(0);
8384
assertThat(commitStats.getTableFilesDeleted()).isEqualTo(0);
8485
assertThat(commitStats.getTableFilesAppended()).isEqualTo(0);
@@ -94,6 +95,7 @@ public void testFailedAppendSnapshot() {
9495
assertThat(commitStats.getNumBucketsWritten()).isEqualTo(0);
9596
assertThat(commitStats.getDuration()).isEqualTo(0);
9697
assertThat(commitStats.getAttempts()).isEqualTo(1);
98+
assertThat(commitStats.getLastCommittedSnapshotId()).isEqualTo(0);
9799
}
98100

99101
@Test
@@ -106,7 +108,8 @@ public void testFailedCompactSnapshot() {
106108
Collections.emptyList(),
107109
3000,
108110
1,
109-
2);
111+
2,
112+
5L);
110113
assertThat(commitStats.getTableFilesAdded()).isEqualTo(2);
111114
assertThat(commitStats.getTableFilesDeleted()).isEqualTo(0);
112115
assertThat(commitStats.getTableFilesAppended()).isEqualTo(2);
@@ -122,6 +125,7 @@ public void testFailedCompactSnapshot() {
122125
assertThat(commitStats.getNumBucketsWritten()).isEqualTo(2);
123126
assertThat(commitStats.getDuration()).isEqualTo(3000);
124127
assertThat(commitStats.getAttempts()).isEqualTo(2);
128+
assertThat(commitStats.getLastCommittedSnapshotId()).isEqualTo(5);
125129
}
126130

127131
@Test
@@ -134,7 +138,8 @@ public void testSucceedAllSnapshot() {
134138
compactChangelogFiles,
135139
3000,
136140
2,
137-
2);
141+
2,
142+
10L);
138143
assertThat(commitStats.getTableFilesAdded()).isEqualTo(4);
139144
assertThat(commitStats.getTableFilesDeleted()).isEqualTo(1);
140145
assertThat(commitStats.getTableFilesAppended()).isEqualTo(2);
@@ -150,5 +155,6 @@ public void testSucceedAllSnapshot() {
150155
assertThat(commitStats.getNumBucketsWritten()).isEqualTo(3);
151156
assertThat(commitStats.getDuration()).isEqualTo(3000);
152157
assertThat(commitStats.getAttempts()).isEqualTo(2);
158+
assertThat(commitStats.getLastCommittedSnapshotId()).isEqualTo(10);
153159
}
154160
}

paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ConsumerProgressCalculator.java

Lines changed: 10 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -37,6 +37,8 @@ public class ConsumerProgressCalculator {
3737

3838
private final Map<Integer, Long> consumingSnapshotPerReader;
3939

40+
private long currentConsumerId = 0L;
41+
4042
public ConsumerProgressCalculator(int parallelism) {
4143
this.minNextSnapshotPerCheckpoint = new TreeMap<>();
4244
this.assignedSnapshotPerReader = new HashMap<>(parallelism);
@@ -59,8 +61,10 @@ public void notifySnapshotState(
5961
int parallelism) {
6062
computeMinNextSnapshotId(readersAwaitingSplit, unassignedCalculationFunction, parallelism)
6163
.ifPresent(
62-
minNextSnapshotId ->
63-
minNextSnapshotPerCheckpoint.put(checkpointId, minNextSnapshotId));
64+
minNextSnapshotId -> {
65+
minNextSnapshotPerCheckpoint.put(checkpointId, minNextSnapshotId);
66+
currentConsumerId = minNextSnapshotId;
67+
});
6468
}
6569

6670
public OptionalLong notifyCheckpointComplete(long checkpointId) {
@@ -71,6 +75,10 @@ public OptionalLong notifyCheckpointComplete(long checkpointId) {
7175
return max;
7276
}
7377

78+
public long getCurrentConsumerId() {
79+
return currentConsumerId;
80+
}
81+
7482
/** Calculate the minimum snapshot currently being consumed by all readers. */
7583
private Optional<Long> computeMinNextSnapshotId(
7684
Set<Integer> readersAwaitingSplit,

paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumerator.java

Lines changed: 4 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -100,6 +100,8 @@ public class ContinuousFileSplitEnumerator
100100
*/
101101
public static final String SOURCE_PARALLELISM_UPPER_BOUND = "sourceParallelismUpperBound";
102102

103+
public static final String CURRENT_CONSUMER_ID = "currentConsumerId";
104+
103105
public ContinuousFileSplitEnumerator(
104106
SplitEnumeratorContext<FileStoreSourceSplit> context,
105107
Collection<FileStoreSourceSplit> remainSplits,
@@ -154,6 +156,8 @@ private void registerMetrics() {
154156
try {
155157
context.metricGroup()
156158
.gauge(SOURCE_PARALLELISM_UPPER_BOUND, () -> sourceParallelismUpperBound);
159+
context.metricGroup()
160+
.gauge(CURRENT_CONSUMER_ID, consumerProgressCalculator::getCurrentConsumerId);
157161
} catch (Exception e) {
158162
LOG.warn("Failed to register enumerator metrics.", e);
159163
}

paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/metrics/FileStoreSourceReaderMetrics.java

Lines changed: 0 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -27,7 +27,6 @@ public class FileStoreSourceReaderMetrics {
2727

2828
private long latestFileCreationTime = UNDEFINED;
2929
private long lastSplitUpdateTime = UNDEFINED;
30-
3130
public static final long UNDEFINED = -1L;
3231
public static final long ACTIVE = Long.MAX_VALUE;
3332

0 commit comments

Comments
 (0)