Skip to content

Commit 08324fc

Browse files
junmuzclaude
authored andcommitted
Improve commit metrics, remove currentConsumerId, and add missing docs
- Fix lastCommittedSnapshotId to default to -1 and read actual snapshot ID post-commit instead of querying snapshotManager - Remove currentConsumerId metric from ConsumerProgressCalculator and ContinuousFileSplitEnumerator as lastScannedSnapshotId already exists - Add lastScannedSnapshotId and lastCommittedSnapshotId to metrics docs Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent 735238a commit 08324fc

File tree

7 files changed

+20
-22
lines changed

7 files changed

+20
-22
lines changed

docs/content/maintenance/metrics.md

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,11 @@ Below is lists of Paimon built-in metrics. They are summarized into types of sca
6262
<td>Histogram</td>
6363
<td>Distributions of the time taken by the last few scans.</td>
6464
</tr>
65+
<tr>
66+
<td>lastScannedSnapshotId</td>
67+
<td>Gauge</td>
68+
<td>The snapshot ID scanned in the last scan. 0 if no scan has occurred.</td>
69+
</tr>
6570
<tr>
6671
<td>lastScannedManifests</td>
6772
<td>Gauge</td>
@@ -181,6 +186,11 @@ Below is lists of Paimon built-in metrics. They are summarized into types of sca
181186
<td>Gauge</td>
182187
<td>Total size of the output files for the last compaction.</td>
183188
</tr>
189+
<tr>
190+
<td>lastCommittedSnapshotId</td>
191+
<td>Gauge</td>
192+
<td>The snapshot ID created by the last commit. -1 if no commit has occurred.</td>
193+
</tr>
184194
</tbody>
185195
</table>
186196

paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -155,6 +155,7 @@ public class FileStoreCommitImpl implements FileStoreCommit {
155155
private boolean ignoreEmptyCommit;
156156
private CommitMetrics commitMetrics;
157157
private boolean appendCommitCheckConflict = false;
158+
private long lastCommittedSnapshotId = -1L;
158159

159160
public FileStoreCommitImpl(
160161
SnapshotCommit snapshotCommit,
@@ -360,7 +361,6 @@ public int commit(ManifestCommittable committable, boolean checkAppendFiles) {
360361
tableName,
361362
commitDuration);
362363
if (this.commitMetrics != null) {
363-
Long latestSnapshotId = snapshotManager.latestSnapshotId();
364364
reportCommit(
365365
changes.appendTableFiles,
366366
changes.appendChangelog,
@@ -369,7 +369,7 @@ public int commit(ManifestCommittable committable, boolean checkAppendFiles) {
369369
commitDuration,
370370
generatedSnapshot,
371371
attempts,
372-
latestSnapshotId == null ? 0L : latestSnapshotId);
372+
lastCommittedSnapshotId);
373373
}
374374
}
375375
return generatedSnapshot;
@@ -528,7 +528,6 @@ public int overwritePartition(
528528
long commitDuration = (System.nanoTime() - started) / 1_000_000;
529529
LOG.info("Finished overwrite to table {}, duration {} ms", tableName, commitDuration);
530530
if (this.commitMetrics != null) {
531-
Long latestSnapshotId = snapshotManager.latestSnapshotId();
532531
reportCommit(
533532
changes.appendTableFiles,
534533
emptyList(),
@@ -537,7 +536,7 @@ public int overwritePartition(
537536
commitDuration,
538537
generatedSnapshot,
539538
attempts,
540-
latestSnapshotId == null ? 0L : latestSnapshotId);
539+
lastCommittedSnapshotId);
541540
}
542541
}
543542
return generatedSnapshot;
@@ -1044,6 +1043,7 @@ CommitResult tryCommitOnce(
10441043
if (strictModeChecker != null) {
10451044
strictModeChecker.update(newSnapshotId);
10461045
}
1046+
lastCommittedSnapshotId = newSnapshotId;
10471047
CommitCallback.Context context =
10481048
new CommitCallback.Context(
10491049
finalBaseFiles, finalDeltaFiles, indexFiles, newSnapshot, identifier);

paimon-core/src/main/java/org/apache/paimon/operation/metrics/CommitMetrics.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -129,7 +129,7 @@ private void registerGenericCommitMetrics() {
129129
() -> latestCommit == null ? 0L : latestCommit.getCompactionOutputFileSize());
130130
metricGroup.gauge(
131131
LAST_COMMITTED_SNAPSHOT_ID,
132-
() -> latestCommit == null ? 0L : latestCommit.getLastCommittedSnapshotId());
132+
() -> latestCommit == null ? -1L : latestCommit.getLastCommittedSnapshotId());
133133
}
134134

135135
public void reportCommit(CommitStats commitStats) {

paimon-core/src/test/java/org/apache/paimon/operation/metrics/CommitMetricsTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -121,7 +121,7 @@ public void testMetricsAreUpdated() {
121121
assertThat(lastChangelogRecordsAppended.getValue()).isEqualTo(0);
122122
assertThat(lastDeltaRecordsCompacted.getValue()).isEqualTo(0);
123123
assertThat(lastChangelogRecordsCompacted.getValue()).isEqualTo(0);
124-
assertThat(lastCommittedSnapshotId.getValue()).isEqualTo(0);
124+
assertThat(lastCommittedSnapshotId.getValue()).isEqualTo(-1);
125125

126126
// report once
127127
reportOnce(commitMetrics);

paimon-core/src/test/java/org/apache/paimon/operation/metrics/CommitStatsTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,7 @@ public void testFailedAppendSnapshot() {
7979
0,
8080
0,
8181
1,
82-
0L);
82+
-1L);
8383
assertThat(commitStats.getTableFilesAdded()).isEqualTo(0);
8484
assertThat(commitStats.getTableFilesDeleted()).isEqualTo(0);
8585
assertThat(commitStats.getTableFilesAppended()).isEqualTo(0);
@@ -95,7 +95,7 @@ public void testFailedAppendSnapshot() {
9595
assertThat(commitStats.getNumBucketsWritten()).isEqualTo(0);
9696
assertThat(commitStats.getDuration()).isEqualTo(0);
9797
assertThat(commitStats.getAttempts()).isEqualTo(1);
98-
assertThat(commitStats.getLastCommittedSnapshotId()).isEqualTo(0);
98+
assertThat(commitStats.getLastCommittedSnapshotId()).isEqualTo(-1);
9999
}
100100

101101
@Test

paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ConsumerProgressCalculator.java

Lines changed: 2 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -37,8 +37,6 @@ public class ConsumerProgressCalculator {
3737

3838
private final Map<Integer, Long> consumingSnapshotPerReader;
3939

40-
private long currentConsumerId = 0L;
41-
4240
public ConsumerProgressCalculator(int parallelism) {
4341
this.minNextSnapshotPerCheckpoint = new TreeMap<>();
4442
this.assignedSnapshotPerReader = new HashMap<>(parallelism);
@@ -61,10 +59,8 @@ public void notifySnapshotState(
6159
int parallelism) {
6260
computeMinNextSnapshotId(readersAwaitingSplit, unassignedCalculationFunction, parallelism)
6361
.ifPresent(
64-
minNextSnapshotId -> {
65-
minNextSnapshotPerCheckpoint.put(checkpointId, minNextSnapshotId);
66-
currentConsumerId = minNextSnapshotId;
67-
});
62+
minNextSnapshotId ->
63+
minNextSnapshotPerCheckpoint.put(checkpointId, minNextSnapshotId));
6864
}
6965

7066
public OptionalLong notifyCheckpointComplete(long checkpointId) {
@@ -75,10 +71,6 @@ public OptionalLong notifyCheckpointComplete(long checkpointId) {
7571
return max;
7672
}
7773

78-
public long getCurrentConsumerId() {
79-
return currentConsumerId;
80-
}
81-
8274
/** Calculate the minimum snapshot currently being consumed by all readers. */
8375
private Optional<Long> computeMinNextSnapshotId(
8476
Set<Integer> readersAwaitingSplit,

paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumerator.java

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -100,8 +100,6 @@ public class ContinuousFileSplitEnumerator
100100
*/
101101
public static final String SOURCE_PARALLELISM_UPPER_BOUND = "sourceParallelismUpperBound";
102102

103-
public static final String CURRENT_CONSUMER_ID = "currentConsumerId";
104-
105103
public ContinuousFileSplitEnumerator(
106104
SplitEnumeratorContext<FileStoreSourceSplit> context,
107105
Collection<FileStoreSourceSplit> remainSplits,
@@ -156,8 +154,6 @@ private void registerMetrics() {
156154
try {
157155
context.metricGroup()
158156
.gauge(SOURCE_PARALLELISM_UPPER_BOUND, () -> sourceParallelismUpperBound);
159-
context.metricGroup()
160-
.gauge(CURRENT_CONSUMER_ID, consumerProgressCalculator::getCurrentConsumerId);
161157
} catch (Exception e) {
162158
LOG.warn("Failed to register enumerator metrics.", e);
163159
}

0 commit comments

Comments
 (0)