From 2a17d179d24d185e3ec55c06f8309ca3a63983e1 Mon Sep 17 00:00:00 2001 From: wangxiaojing Date: Wed, 1 Apr 2026 17:22:14 +0800 Subject: [PATCH 1/2] Add IP Address of Subtask Running Checkpoint to Flink WebU --- .../src/app/interfaces/job-checkpoint.ts | 1 + .../job-checkpoints-subtask.component.html | 2 + .../job-checkpoints-subtask.component.ts | 23 +++-- .../DefaultCheckpointStatsTracker.java | 3 +- .../runtime/checkpoint/PendingCheckpoint.java | 12 ++- .../runtime/checkpoint/SubtaskStateStats.java | 13 ++- ...TaskCheckpointStatisticDetailsHandler.java | 3 +- .../SubtaskCheckpointStatistics.java | 19 ++++- .../checkpoint/CompletedCheckpointTest.java | 2 +- .../DefaultCheckpointStatsTrackerTest.java | 12 +-- .../PendingCheckpointStatsTest.java | 3 +- .../checkpoint/PendingCheckpointTest.java | 84 +++++++++++++++++++ .../checkpoint/SubtaskStateStatsTest.java | 4 +- .../checkpoint/TaskStateStatsTest.java | 6 +- ...pointStatisticsWithSubtaskDetailsTest.java | 3 +- 15 files changed, 164 insertions(+), 26 deletions(-) diff --git a/flink-runtime-web/web-dashboard/src/app/interfaces/job-checkpoint.ts b/flink-runtime-web/web-dashboard/src/app/interfaces/job-checkpoint.ts index 5979a7d0cd064..0f4f50961c8bb 100644 --- a/flink-runtime-web/web-dashboard/src/app/interfaces/job-checkpoint.ts +++ b/flink-runtime-web/web-dashboard/src/app/interfaces/job-checkpoint.ts @@ -188,6 +188,7 @@ export interface CompletedSubTaskCheckpointStatistics { start_delay: number; unaligned_checkpoint: boolean; aborted: boolean; + ip: string | null; } export interface PendingSubTaskCheckpointStatistics {} diff --git a/flink-runtime-web/web-dashboard/src/app/pages/job/checkpoints/subtask/job-checkpoints-subtask.component.html b/flink-runtime-web/web-dashboard/src/app/pages/job/checkpoints/subtask/job-checkpoints-subtask.component.html index 2647ae83a2ac0..9c4f730cff346 100644 --- a/flink-runtime-web/web-dashboard/src/app/pages/job/checkpoints/subtask/job-checkpoints-subtask.component.html +++ b/flink-runtime-web/web-dashboard/src/app/pages/job/checkpoints/subtask/job-checkpoints-subtask.component.html @@ -144,6 +144,7 @@ ID + IP Address Acknowledged End to End Duration @@ -166,6 +167,7 @@ {{ subTask['index'] }} + {{ subTask['ip'] || '-' }} {{ subTask['ack_timestamp'] | date: 'yyyy-MM-dd HH:mm:ss.SSS' }} diff --git a/flink-runtime-web/web-dashboard/src/app/pages/job/checkpoints/subtask/job-checkpoints-subtask.component.ts b/flink-runtime-web/web-dashboard/src/app/pages/job/checkpoints/subtask/job-checkpoints-subtask.component.ts index d80d1003111e7..10ae807026b91 100644 --- a/flink-runtime-web/web-dashboard/src/app/pages/job/checkpoints/subtask/job-checkpoints-subtask.component.ts +++ b/flink-runtime-web/web-dashboard/src/app/pages/job/checkpoints/subtask/job-checkpoints-subtask.component.ts @@ -48,13 +48,13 @@ import { NzTableModule, NzTableSortFn } from 'ng-zorro-antd/table'; import { JobLocalService } from '../../job-local.service'; function createSortFn( - selector: (item: CompletedSubTaskCheckpointStatistics) => number | boolean + selector: (item: CompletedSubTaskCheckpointStatistics) => number | boolean | string | null ): NzTableSortFn { - // FIXME This type-asserts that pre / next are a specific subtype. - return (pre, next) => - selector(pre as CompletedSubTaskCheckpointStatistics) > selector(next as CompletedSubTaskCheckpointStatistics) - ? 1 - : -1; + return (pre, next) => { + const a = selector(pre as CompletedSubTaskCheckpointStatistics) ?? ''; + const b = selector(next as CompletedSubTaskCheckpointStatistics) ?? ''; + return a > b ? 1 : -1; + }; } @Component({ @@ -77,6 +77,17 @@ export class JobCheckpointsSubtaskComponent implements OnInit, OnChanges, OnDest public mapOfSubtask: Map = new Map(); public readonly sortAckTimestampFn = createSortFn(item => item.ack_timestamp); + public readonly sortIPAddressFn: NzTableSortFn = (a, b) => { + const ipA = (a as CompletedSubTaskCheckpointStatistics).ip ?? ''; + const ipB = (b as CompletedSubTaskCheckpointStatistics).ip ?? ''; + const normalize = (ip: string): string => + ip + .split('.') + .map(seg => seg.padStart(3, '0')) + .join('.'); + + return normalize(ipA) > normalize(ipB) ? 1 : normalize(ipA) < normalize(ipB) ? -1 : 0; + }; public readonly sortEndToEndDurationFn = createSortFn(item => item.end_to_end_duration); public readonly sortCheckpointedSizeFn = createSortFn(item => item.checkpointed_size); public readonly sortStateSizeFn = createSortFn(item => item.state_size); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/DefaultCheckpointStatsTracker.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/DefaultCheckpointStatsTracker.java index d9c63d9d52f43..9552877392c19 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/DefaultCheckpointStatsTracker.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/DefaultCheckpointStatsTracker.java @@ -567,7 +567,8 @@ public void reportIncompleteStats( metrics.getAlignmentDurationNanos() / 1_000_000, metrics.getCheckpointStartDelayNanos() / 1_000_000, metrics.getUnalignedCheckpoint(), - false)); + false, + null)); dirty = true; } } finally { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java index c1b428c201309..cc8e780d30c0d 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java @@ -32,6 +32,7 @@ import org.apache.flink.runtime.state.StateObject; import org.apache.flink.runtime.state.StateUtil; import org.apache.flink.runtime.state.memory.ByteStreamStateHandle; +import org.apache.flink.runtime.taskmanager.TaskManagerLocation; import org.apache.flink.util.CollectionUtil; import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.Preconditions; @@ -427,6 +428,14 @@ public TaskAcknowledgeResult acknowledgeTask( long checkpointStartDelayMillis = metrics.getCheckpointStartDelayNanos() / 1_000_000; + String taskManagerIp = null; + if (vertex.getCurrentExecutionAttempt() != null) { + TaskManagerLocation location = vertex.getCurrentAssignedResourceLocation(); + if (location != null) { + taskManagerIp = location.address().getHostAddress(); + } + } + SubtaskStateStats subtaskStateStats = new SubtaskStateStats( vertex.getParallelSubtaskIndex(), @@ -440,7 +449,8 @@ public TaskAcknowledgeResult acknowledgeTask( alignmentDurationMillis, checkpointStartDelayMillis, metrics.getUnalignedCheckpoint(), - true); + true, + taskManagerIp); LOG.trace( "Checkpoint {} stats for {}: size={}Kb, duration={}ms, sync part={}ms, async part={}ms", diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SubtaskStateStats.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SubtaskStateStats.java index 3908dbbcd3588..8c82d689e3fb6 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SubtaskStateStats.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SubtaskStateStats.java @@ -66,8 +66,10 @@ public class SubtaskStateStats implements Serializable { /** Is the checkpoint completed by this subtask. */ private final boolean completed; + private final String ip; + SubtaskStateStats(int subtaskIndex, long ackTimestamp) { - this(subtaskIndex, ackTimestamp, 0, 0, 0, 0, 0, 0, 0, 0, false, true); + this(subtaskIndex, ackTimestamp, 0, 0, 0, 0, 0, 0, 0, 0, false, true, null); } SubtaskStateStats( @@ -82,8 +84,8 @@ public class SubtaskStateStats implements Serializable { long alignmentDuration, long checkpointStartDelay, boolean unalignedCheckpoint, - boolean completed) { - + boolean completed, + String ip) { checkArgument(subtaskIndex >= 0, "Negative subtask index"); this.subtaskIndex = subtaskIndex; checkArgument(checkpointedSize >= 0, "Negative incremental state size"); @@ -99,6 +101,7 @@ public class SubtaskStateStats implements Serializable { this.checkpointStartDelay = checkpointStartDelay; this.unalignedCheckpoint = unalignedCheckpoint; this.completed = completed; + this.ip = ip; } public int getSubtaskIndex() { @@ -194,4 +197,8 @@ public boolean getUnalignedCheckpoint() { public boolean isCompleted() { return completed; } + + public String getIp() { + return ip; + } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/TaskCheckpointStatisticDetailsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/TaskCheckpointStatisticDetailsHandler.java index 473c3f52c920e..b49707c426561 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/TaskCheckpointStatisticDetailsHandler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/TaskCheckpointStatisticDetailsHandler.java @@ -220,7 +220,8 @@ private static List createSubtaskCheckpointStatisti subtask.getAlignmentDuration()), subtask.getCheckpointStartDelay(), subtask.getUnalignedCheckpoint(), - !subtask.isCompleted())); + !subtask.isCompleted(), + subtask.getIp())); } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/SubtaskCheckpointStatistics.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/SubtaskCheckpointStatistics.java index 613916eb3d762..deb0f0f765f69 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/SubtaskCheckpointStatistics.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/SubtaskCheckpointStatistics.java @@ -111,6 +111,8 @@ public static final class CompletedSubtaskCheckpointStatistics public static final String FIELD_NAME_CHECKPOINTED_SIZE = "checkpointed_size"; + public static final String FIELD_NAME_IP_ADDRESS = "ip"; + /** * The accurate name of this field should be 'checkpointed_data_size', keep it as before to * not break backwards compatibility for old web UI. @@ -156,6 +158,9 @@ public static final class CompletedSubtaskCheckpointStatistics @JsonProperty(FIELD_NAME_ABORTED) private final boolean aborted; + @JsonProperty(value = FIELD_NAME_IP_ADDRESS, required = false) + private final String ip; + @JsonCreator public CompletedSubtaskCheckpointStatistics( @JsonProperty(FIELD_NAME_INDEX) int index, @@ -167,7 +172,8 @@ public CompletedSubtaskCheckpointStatistics( @JsonProperty(FIELD_NAME_ALIGNMENT) CheckpointAlignment alignment, @JsonProperty(FIELD_NAME_START_DELAY) long startDelay, @JsonProperty(FIELD_NAME_UNALIGNED_CHECKPOINT) boolean unalignedCheckpoint, - @JsonProperty(FIELD_NAME_ABORTED) boolean aborted) { + @JsonProperty(FIELD_NAME_ABORTED) boolean aborted, + @JsonProperty(FIELD_NAME_IP_ADDRESS) String ip) { super(index, "completed"); this.ackTimestamp = ackTimestamp; this.duration = duration; @@ -178,6 +184,7 @@ public CompletedSubtaskCheckpointStatistics( this.startDelay = startDelay; this.unalignedCheckpoint = unalignedCheckpoint; this.aborted = aborted; + this.ip = ip; } public long getAckTimestamp() { @@ -216,6 +223,10 @@ public boolean isAborted() { return aborted; } + public String getIp() { + return ip; + } + @Override public boolean equals(Object o) { if (this == o) { @@ -233,7 +244,8 @@ public boolean equals(Object o) { && Objects.equals(alignment, that.alignment) && startDelay == that.startDelay && unalignedCheckpoint == that.unalignedCheckpoint - && aborted == that.aborted; + && aborted == that.aborted + && Objects.equals(ip, that.ip); } @Override @@ -247,7 +259,8 @@ public int hashCode() { alignment, startDelay, unalignedCheckpoint, - aborted); + aborted, + ip); } /** Duration of the checkpoint. */ diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointTest.java index 8fe8dc19ed27f..463e7f4a02325 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointTest.java @@ -415,7 +415,7 @@ void testIsJavaSerializable() throws Exception { 44L, true, new SubtaskStateStats( - 123, 213123, 123123, 123123, 0, 0, 0, 0, 0, 0, false, true), + 123, 213123, 123123, 123123, 0, 0, 0, 0, 0, 0, false, true, null), null); CompletedCheckpointStats copy = CommonTestUtils.createCopySerializable(completed); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/DefaultCheckpointStatsTrackerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/DefaultCheckpointStatsTrackerTest.java index b3bbe4b3d9b02..fc051e4ef9825 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/DefaultCheckpointStatsTrackerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/DefaultCheckpointStatsTrackerTest.java @@ -673,13 +673,14 @@ public void addEvent(EventBuilder eventBuilder) { subtasksByVertex); pending.reportSubtaskStats( - jobVertexID0, new SubtaskStateStats(0, 1, 2, 3, 24, 5, 6, 7, 28, 9, false, true)); + jobVertexID0, + new SubtaskStateStats(0, 1, 2, 3, 24, 5, 6, 7, 28, 9, false, true, null)); pending.reportSubtaskStats( jobVertexID0, - new SubtaskStateStats(1, 12, 13, 14, 15, 16, 17, 18, 19, 20, false, true)); + new SubtaskStateStats(1, 12, 13, 14, 15, 16, 17, 18, 19, 20, false, true, null)); pending.reportSubtaskStats( jobVertexID1, - new SubtaskStateStats(0, 21, 22, 23, 4, 25, 26, 27, 8, 29, true, true)); + new SubtaskStateStats(0, 21, 22, 23, 4, 25, 26, 27, 8, 29, true, true, null)); // Complete checkpoint => new snapshot tracker.reportCompletedCheckpoint(pending.toCompletedCheckpointStats(null, 1984)); return reportedSpansOut; @@ -984,7 +985,8 @@ public > G gauge(String name, G gauge) { ignored, ignored, false, - true); + true, + null); assertThat(pending.reportSubtaskStats(jobVertexID, subtaskStats)).isTrue(); @@ -1071,7 +1073,7 @@ private SubtaskStateStats createSubtaskStats(int index) { } private SubtaskStateStats createSubtaskStats(int index, boolean unaligned) { - return new SubtaskStateStats(index, 0, 0, 0, 0, 0, 0, 0, 0, 0, unaligned, true); + return new SubtaskStateStats(index, 0, 0, 0, 0, 0, 0, 0, 0, 0, unaligned, true, null); } private void reportRestoredCheckpoint( diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointStatsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointStatsTest.java index e55cbb6fca748..9181cee3363dc 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointStatsTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointStatsTest.java @@ -284,6 +284,7 @@ private SubtaskStateStats createSubtaskStats(int index, boolean unalignedCheckpo Integer.MAX_VALUE + (long) index, Integer.MAX_VALUE + (long) index, unalignedCheckpoint, - true); + true, + null); } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java index 4dbc22dc34a90..54972ef746049 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java @@ -28,10 +28,12 @@ import org.apache.flink.runtime.checkpoint.CheckpointCoordinatorTestingUtils.StringSerializer; import org.apache.flink.runtime.checkpoint.PendingCheckpoint.TaskAcknowledgeResult; import org.apache.flink.runtime.checkpoint.hooks.MasterHooks; +import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.executiongraph.Execution; import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; import org.apache.flink.runtime.executiongraph.ExecutionJobVertex; import org.apache.flink.runtime.executiongraph.ExecutionVertex; +import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobgraph.OperatorID; import org.apache.flink.runtime.operators.coordination.OperatorInfo; import org.apache.flink.runtime.operators.coordination.TestingOperatorInfo; @@ -39,6 +41,7 @@ import org.apache.flink.runtime.state.TestingStreamStateHandle; import org.apache.flink.runtime.state.filesystem.FsCheckpointStorageLocation; import org.apache.flink.runtime.state.memory.ByteStreamStateHandle; +import org.apache.flink.runtime.taskmanager.TaskManagerLocation; import org.apache.flink.testutils.junit.utils.TempDirUtils; import org.apache.flink.util.concurrent.Executors; @@ -49,11 +52,13 @@ import java.io.IOException; import java.lang.reflect.Field; +import java.net.InetAddress; import java.util.ArrayDeque; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.Collections; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Queue; @@ -509,6 +514,85 @@ void testReportTaskFinishedOperators() throws IOException { .contains(ACK_TASKS.get(0).getVertex()); } + @Test + void testAcknowledgeTaskCapturesTaskManagerIp() throws Exception { + final String expectedIp = "10.0.0.1"; + final ExecutionAttemptID ipTestAttemptId = createExecutionAttemptId(); + final JobVertexID jobVertexId = new JobVertexID(); + + ExecutionJobVertex ejv = mock(ExecutionJobVertex.class); + when(ejv.getOperatorIDs()) + .thenReturn(Collections.singletonList(OperatorIDPair.generatedIDOnly(OPERATOR_ID))); + + TaskManagerLocation location = + new TaskManagerLocation( + ResourceID.generate(), InetAddress.getByName(expectedIp), 6121); + + Execution currentAttempt = mock(Execution.class); + ExecutionVertex vertex = mock(ExecutionVertex.class); + when(vertex.getMaxParallelism()).thenReturn(MAX_PARALLELISM); + when(vertex.getTotalNumberOfParallelSubtasks()).thenReturn(PARALLELISM); + when(vertex.getJobVertex()).thenReturn(ejv); + when(vertex.getJobvertexId()).thenReturn(jobVertexId); + when(vertex.getTaskNameWithSubtaskIndex()).thenReturn("test-task (0/1)"); + when(vertex.getCurrentExecutionAttempt()).thenReturn(currentAttempt); + when(vertex.getCurrentAssignedResourceLocation()).thenReturn(location); + + Execution execution = mock(Execution.class); + when(execution.getAttemptId()).thenReturn(ipTestAttemptId); + when(execution.getVertex()).thenReturn(vertex); + + Map taskStatsCounts = new HashMap<>(); + taskStatsCounts.put(jobVertexId, PARALLELISM); + PendingCheckpointStats pendingStats = + new PendingCheckpointStats( + 0, + 1, + CheckpointProperties.forCheckpoint( + CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION), + taskStatsCounts); + + CheckpointPlan plan = + new DefaultCheckpointPlan( + Collections.emptyList(), + Collections.singletonList(execution), + Collections.singletonList(vertex), + Collections.emptyList(), + Collections.emptyList(), + true); + + final Path checkpointDir = new Path(TempDirUtils.newFolder(tmpFolder).toURI()); + final FsCheckpointStorageLocation storageLocation = + new FsCheckpointStorageLocation( + LocalFileSystem.getSharedInstance(), + checkpointDir, + checkpointDir, + checkpointDir, + CheckpointStorageLocationReference.getDefault(), + 1024, + 4096); + + PendingCheckpoint checkpoint = + new PendingCheckpoint( + new JobID(), + 0, + 1, + plan, + Collections.emptyList(), + Collections.emptyList(), + CheckpointProperties.forCheckpoint( + CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION), + new CompletableFuture<>(), + pendingStats, + new CompletableFuture<>()); + checkpoint.setCheckpointTargetLocation(storageLocation); + + checkpoint.acknowledgeTask(ipTestAttemptId, null, new CheckpointMetrics()); + + assertThat(pendingStats.getLatestAcknowledgedSubtaskStats()).isNotNull(); + assertThat(pendingStats.getLatestAcknowledgedSubtaskStats().getIp()).isEqualTo(expectedIp); + } + // ------------------------------------------------------------------------ private PendingCheckpoint createPendingCheckpoint(CheckpointProperties props) diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/SubtaskStateStatsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/SubtaskStateStatsTest.java index 30970fcd9ee00..8ffb750b08f77 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/SubtaskStateStatsTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/SubtaskStateStatsTest.java @@ -52,7 +52,8 @@ public void test(boolean serialize) throws Exception { Integer.MAX_VALUE + 6L, Integer.MAX_VALUE + 7L, false, - true); + true, + "10.0.0.1"); stats = serialize ? CommonTestUtils.createCopySerializable(stats) : stats; @@ -63,6 +64,7 @@ public void test(boolean serialize) throws Exception { assertThat(stats.getAsyncCheckpointDuration()).isEqualTo(Integer.MAX_VALUE + 4L); assertThat(stats.getAlignmentDuration()).isEqualTo(Integer.MAX_VALUE + 6L); assertThat(stats.getCheckpointStartDelay()).isEqualTo(Integer.MAX_VALUE + 7L); + assertThat(stats.getIp()).isEqualTo("10.0.0.1"); // Check duration helper long ackTimestamp = stats.getAckTimestamp(); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/TaskStateStatsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/TaskStateStatsTest.java index e754e33117423..5a1f86dd44109 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/TaskStateStatsTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/TaskStateStatsTest.java @@ -74,7 +74,8 @@ private void test(boolean serialize) throws Exception { rand.nextInt(128), rand.nextInt(128), false, - true); + true, + null); stateSize += subtasks[i].getStateSize(); processedData += subtasks[i].getProcessedData(); @@ -94,7 +95,8 @@ private void test(boolean serialize) throws Exception { assertThat( taskStats.reportSubtaskStats( - new SubtaskStateStats(0, 0, 0, 0, 0, 0, 0, 0, 0, 0, false, true))) + new SubtaskStateStats( + 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, false, true, null))) .isFalse(); taskStats = serialize ? CommonTestUtils.createCopySerializable(taskStats) : taskStats; diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/checkpoints/TaskCheckpointStatisticsWithSubtaskDetailsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/checkpoints/TaskCheckpointStatisticsWithSubtaskDetailsTest.java index 89b8acfa52c81..67d8020d8c811 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/checkpoints/TaskCheckpointStatisticsWithSubtaskDetailsTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/checkpoints/TaskCheckpointStatisticsWithSubtaskDetailsTest.java @@ -73,7 +73,8 @@ protected TaskCheckpointStatisticsWithSubtaskDetails getTestResponseInstance() .CheckpointAlignment(2L, 4L, 5L, 3L), 42L, true, - false)); + false, + "192.168.1.100")); return new TaskCheckpointStatisticsWithSubtaskDetails( 4L, From f498514f21420fbac54b054736c928878114d4a6 Mon Sep 17 00:00:00 2001 From: wangxiaojing Date: Mon, 20 Apr 2026 17:34:53 +0800 Subject: [PATCH 2/2] [webui] Show subtask endpoint in checkpoint statistics via existing vertex API Per reviewer feedback, avoid modifying the REST API or backend code. Instead, fetch subtask endpoint info from the existing /jobs/{jobid}/vertices/{vertexid} API (already called by the component) and display the endpoint field (hostname:port) directly. - Revert all backend changes (SubtaskStateStats, PendingCheckpoint, SubtaskCheckpointStatistics, TaskCheckpointStatisticDetailsHandler, DefaultCheckpointStatsTracker and related tests) - Remove ip field from CompletedSubTaskCheckpointStatistics interface - Use mapOfSubtask.get(index)?.endpoint from the vertex subtask API response instead of a new ip field from the checkpoint API - Rename column "IP Address" to "Endpoint" and sort function accordingly --- .../src/app/interfaces/job-checkpoint.ts | 1 - .../job-checkpoints-subtask.component.html | 4 +- .../job-checkpoints-subtask.component.ts | 14 +--- .../DefaultCheckpointStatsTracker.java | 3 +- .../runtime/checkpoint/PendingCheckpoint.java | 12 +-- .../runtime/checkpoint/SubtaskStateStats.java | 13 +-- ...TaskCheckpointStatisticDetailsHandler.java | 3 +- .../SubtaskCheckpointStatistics.java | 19 +---- .../checkpoint/CompletedCheckpointTest.java | 2 +- .../DefaultCheckpointStatsTrackerTest.java | 12 ++- .../PendingCheckpointStatsTest.java | 3 +- .../checkpoint/PendingCheckpointTest.java | 84 ------------------- .../checkpoint/SubtaskStateStatsTest.java | 4 +- .../checkpoint/TaskStateStatsTest.java | 6 +- ...pointStatisticsWithSubtaskDetailsTest.java | 3 +- 15 files changed, 26 insertions(+), 157 deletions(-) diff --git a/flink-runtime-web/web-dashboard/src/app/interfaces/job-checkpoint.ts b/flink-runtime-web/web-dashboard/src/app/interfaces/job-checkpoint.ts index 0f4f50961c8bb..5979a7d0cd064 100644 --- a/flink-runtime-web/web-dashboard/src/app/interfaces/job-checkpoint.ts +++ b/flink-runtime-web/web-dashboard/src/app/interfaces/job-checkpoint.ts @@ -188,7 +188,6 @@ export interface CompletedSubTaskCheckpointStatistics { start_delay: number; unaligned_checkpoint: boolean; aborted: boolean; - ip: string | null; } export interface PendingSubTaskCheckpointStatistics {} diff --git a/flink-runtime-web/web-dashboard/src/app/pages/job/checkpoints/subtask/job-checkpoints-subtask.component.html b/flink-runtime-web/web-dashboard/src/app/pages/job/checkpoints/subtask/job-checkpoints-subtask.component.html index 9c4f730cff346..d65e3c2a82a6d 100644 --- a/flink-runtime-web/web-dashboard/src/app/pages/job/checkpoints/subtask/job-checkpoints-subtask.component.html +++ b/flink-runtime-web/web-dashboard/src/app/pages/job/checkpoints/subtask/job-checkpoints-subtask.component.html @@ -144,7 +144,7 @@ ID - IP Address + Endpoint Acknowledged End to End Duration @@ -167,7 +167,7 @@ {{ subTask['index'] }} - {{ subTask['ip'] || '-' }} + {{ mapOfSubtask.get(subTask['index'])?.endpoint || '-' }} {{ subTask['ack_timestamp'] | date: 'yyyy-MM-dd HH:mm:ss.SSS' }} diff --git a/flink-runtime-web/web-dashboard/src/app/pages/job/checkpoints/subtask/job-checkpoints-subtask.component.ts b/flink-runtime-web/web-dashboard/src/app/pages/job/checkpoints/subtask/job-checkpoints-subtask.component.ts index 10ae807026b91..1913cfa6c1685 100644 --- a/flink-runtime-web/web-dashboard/src/app/pages/job/checkpoints/subtask/job-checkpoints-subtask.component.ts +++ b/flink-runtime-web/web-dashboard/src/app/pages/job/checkpoints/subtask/job-checkpoints-subtask.component.ts @@ -77,16 +77,10 @@ export class JobCheckpointsSubtaskComponent implements OnInit, OnChanges, OnDest public mapOfSubtask: Map = new Map(); public readonly sortAckTimestampFn = createSortFn(item => item.ack_timestamp); - public readonly sortIPAddressFn: NzTableSortFn = (a, b) => { - const ipA = (a as CompletedSubTaskCheckpointStatistics).ip ?? ''; - const ipB = (b as CompletedSubTaskCheckpointStatistics).ip ?? ''; - const normalize = (ip: string): string => - ip - .split('.') - .map(seg => seg.padStart(3, '0')) - .join('.'); - - return normalize(ipA) > normalize(ipB) ? 1 : normalize(ipA) < normalize(ipB) ? -1 : 0; + public readonly sortEndpointFn: NzTableSortFn = (a, b) => { + const endpointA = this.mapOfSubtask.get(a['index'])?.endpoint ?? ''; + const endpointB = this.mapOfSubtask.get(b['index'])?.endpoint ?? ''; + return endpointA > endpointB ? 1 : endpointA < endpointB ? -1 : 0; }; public readonly sortEndToEndDurationFn = createSortFn(item => item.end_to_end_duration); public readonly sortCheckpointedSizeFn = createSortFn(item => item.checkpointed_size); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/DefaultCheckpointStatsTracker.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/DefaultCheckpointStatsTracker.java index 9552877392c19..d9c63d9d52f43 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/DefaultCheckpointStatsTracker.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/DefaultCheckpointStatsTracker.java @@ -567,8 +567,7 @@ public void reportIncompleteStats( metrics.getAlignmentDurationNanos() / 1_000_000, metrics.getCheckpointStartDelayNanos() / 1_000_000, metrics.getUnalignedCheckpoint(), - false, - null)); + false)); dirty = true; } } finally { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java index cc8e780d30c0d..c1b428c201309 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java @@ -32,7 +32,6 @@ import org.apache.flink.runtime.state.StateObject; import org.apache.flink.runtime.state.StateUtil; import org.apache.flink.runtime.state.memory.ByteStreamStateHandle; -import org.apache.flink.runtime.taskmanager.TaskManagerLocation; import org.apache.flink.util.CollectionUtil; import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.Preconditions; @@ -428,14 +427,6 @@ public TaskAcknowledgeResult acknowledgeTask( long checkpointStartDelayMillis = metrics.getCheckpointStartDelayNanos() / 1_000_000; - String taskManagerIp = null; - if (vertex.getCurrentExecutionAttempt() != null) { - TaskManagerLocation location = vertex.getCurrentAssignedResourceLocation(); - if (location != null) { - taskManagerIp = location.address().getHostAddress(); - } - } - SubtaskStateStats subtaskStateStats = new SubtaskStateStats( vertex.getParallelSubtaskIndex(), @@ -449,8 +440,7 @@ public TaskAcknowledgeResult acknowledgeTask( alignmentDurationMillis, checkpointStartDelayMillis, metrics.getUnalignedCheckpoint(), - true, - taskManagerIp); + true); LOG.trace( "Checkpoint {} stats for {}: size={}Kb, duration={}ms, sync part={}ms, async part={}ms", diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SubtaskStateStats.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SubtaskStateStats.java index 8c82d689e3fb6..3908dbbcd3588 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SubtaskStateStats.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SubtaskStateStats.java @@ -66,10 +66,8 @@ public class SubtaskStateStats implements Serializable { /** Is the checkpoint completed by this subtask. */ private final boolean completed; - private final String ip; - SubtaskStateStats(int subtaskIndex, long ackTimestamp) { - this(subtaskIndex, ackTimestamp, 0, 0, 0, 0, 0, 0, 0, 0, false, true, null); + this(subtaskIndex, ackTimestamp, 0, 0, 0, 0, 0, 0, 0, 0, false, true); } SubtaskStateStats( @@ -84,8 +82,8 @@ public class SubtaskStateStats implements Serializable { long alignmentDuration, long checkpointStartDelay, boolean unalignedCheckpoint, - boolean completed, - String ip) { + boolean completed) { + checkArgument(subtaskIndex >= 0, "Negative subtask index"); this.subtaskIndex = subtaskIndex; checkArgument(checkpointedSize >= 0, "Negative incremental state size"); @@ -101,7 +99,6 @@ public class SubtaskStateStats implements Serializable { this.checkpointStartDelay = checkpointStartDelay; this.unalignedCheckpoint = unalignedCheckpoint; this.completed = completed; - this.ip = ip; } public int getSubtaskIndex() { @@ -197,8 +194,4 @@ public boolean getUnalignedCheckpoint() { public boolean isCompleted() { return completed; } - - public String getIp() { - return ip; - } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/TaskCheckpointStatisticDetailsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/TaskCheckpointStatisticDetailsHandler.java index b49707c426561..473c3f52c920e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/TaskCheckpointStatisticDetailsHandler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/TaskCheckpointStatisticDetailsHandler.java @@ -220,8 +220,7 @@ private static List createSubtaskCheckpointStatisti subtask.getAlignmentDuration()), subtask.getCheckpointStartDelay(), subtask.getUnalignedCheckpoint(), - !subtask.isCompleted(), - subtask.getIp())); + !subtask.isCompleted())); } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/SubtaskCheckpointStatistics.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/SubtaskCheckpointStatistics.java index deb0f0f765f69..613916eb3d762 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/SubtaskCheckpointStatistics.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/SubtaskCheckpointStatistics.java @@ -111,8 +111,6 @@ public static final class CompletedSubtaskCheckpointStatistics public static final String FIELD_NAME_CHECKPOINTED_SIZE = "checkpointed_size"; - public static final String FIELD_NAME_IP_ADDRESS = "ip"; - /** * The accurate name of this field should be 'checkpointed_data_size', keep it as before to * not break backwards compatibility for old web UI. @@ -158,9 +156,6 @@ public static final class CompletedSubtaskCheckpointStatistics @JsonProperty(FIELD_NAME_ABORTED) private final boolean aborted; - @JsonProperty(value = FIELD_NAME_IP_ADDRESS, required = false) - private final String ip; - @JsonCreator public CompletedSubtaskCheckpointStatistics( @JsonProperty(FIELD_NAME_INDEX) int index, @@ -172,8 +167,7 @@ public CompletedSubtaskCheckpointStatistics( @JsonProperty(FIELD_NAME_ALIGNMENT) CheckpointAlignment alignment, @JsonProperty(FIELD_NAME_START_DELAY) long startDelay, @JsonProperty(FIELD_NAME_UNALIGNED_CHECKPOINT) boolean unalignedCheckpoint, - @JsonProperty(FIELD_NAME_ABORTED) boolean aborted, - @JsonProperty(FIELD_NAME_IP_ADDRESS) String ip) { + @JsonProperty(FIELD_NAME_ABORTED) boolean aborted) { super(index, "completed"); this.ackTimestamp = ackTimestamp; this.duration = duration; @@ -184,7 +178,6 @@ public CompletedSubtaskCheckpointStatistics( this.startDelay = startDelay; this.unalignedCheckpoint = unalignedCheckpoint; this.aborted = aborted; - this.ip = ip; } public long getAckTimestamp() { @@ -223,10 +216,6 @@ public boolean isAborted() { return aborted; } - public String getIp() { - return ip; - } - @Override public boolean equals(Object o) { if (this == o) { @@ -244,8 +233,7 @@ public boolean equals(Object o) { && Objects.equals(alignment, that.alignment) && startDelay == that.startDelay && unalignedCheckpoint == that.unalignedCheckpoint - && aborted == that.aborted - && Objects.equals(ip, that.ip); + && aborted == that.aborted; } @Override @@ -259,8 +247,7 @@ public int hashCode() { alignment, startDelay, unalignedCheckpoint, - aborted, - ip); + aborted); } /** Duration of the checkpoint. */ diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointTest.java index 463e7f4a02325..8fe8dc19ed27f 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointTest.java @@ -415,7 +415,7 @@ void testIsJavaSerializable() throws Exception { 44L, true, new SubtaskStateStats( - 123, 213123, 123123, 123123, 0, 0, 0, 0, 0, 0, false, true, null), + 123, 213123, 123123, 123123, 0, 0, 0, 0, 0, 0, false, true), null); CompletedCheckpointStats copy = CommonTestUtils.createCopySerializable(completed); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/DefaultCheckpointStatsTrackerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/DefaultCheckpointStatsTrackerTest.java index fc051e4ef9825..b3bbe4b3d9b02 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/DefaultCheckpointStatsTrackerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/DefaultCheckpointStatsTrackerTest.java @@ -673,14 +673,13 @@ public void addEvent(EventBuilder eventBuilder) { subtasksByVertex); pending.reportSubtaskStats( - jobVertexID0, - new SubtaskStateStats(0, 1, 2, 3, 24, 5, 6, 7, 28, 9, false, true, null)); + jobVertexID0, new SubtaskStateStats(0, 1, 2, 3, 24, 5, 6, 7, 28, 9, false, true)); pending.reportSubtaskStats( jobVertexID0, - new SubtaskStateStats(1, 12, 13, 14, 15, 16, 17, 18, 19, 20, false, true, null)); + new SubtaskStateStats(1, 12, 13, 14, 15, 16, 17, 18, 19, 20, false, true)); pending.reportSubtaskStats( jobVertexID1, - new SubtaskStateStats(0, 21, 22, 23, 4, 25, 26, 27, 8, 29, true, true, null)); + new SubtaskStateStats(0, 21, 22, 23, 4, 25, 26, 27, 8, 29, true, true)); // Complete checkpoint => new snapshot tracker.reportCompletedCheckpoint(pending.toCompletedCheckpointStats(null, 1984)); return reportedSpansOut; @@ -985,8 +984,7 @@ public > G gauge(String name, G gauge) { ignored, ignored, false, - true, - null); + true); assertThat(pending.reportSubtaskStats(jobVertexID, subtaskStats)).isTrue(); @@ -1073,7 +1071,7 @@ private SubtaskStateStats createSubtaskStats(int index) { } private SubtaskStateStats createSubtaskStats(int index, boolean unaligned) { - return new SubtaskStateStats(index, 0, 0, 0, 0, 0, 0, 0, 0, 0, unaligned, true, null); + return new SubtaskStateStats(index, 0, 0, 0, 0, 0, 0, 0, 0, 0, unaligned, true); } private void reportRestoredCheckpoint( diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointStatsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointStatsTest.java index 9181cee3363dc..e55cbb6fca748 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointStatsTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointStatsTest.java @@ -284,7 +284,6 @@ private SubtaskStateStats createSubtaskStats(int index, boolean unalignedCheckpo Integer.MAX_VALUE + (long) index, Integer.MAX_VALUE + (long) index, unalignedCheckpoint, - true, - null); + true); } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java index 54972ef746049..4dbc22dc34a90 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java @@ -28,12 +28,10 @@ import org.apache.flink.runtime.checkpoint.CheckpointCoordinatorTestingUtils.StringSerializer; import org.apache.flink.runtime.checkpoint.PendingCheckpoint.TaskAcknowledgeResult; import org.apache.flink.runtime.checkpoint.hooks.MasterHooks; -import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.executiongraph.Execution; import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; import org.apache.flink.runtime.executiongraph.ExecutionJobVertex; import org.apache.flink.runtime.executiongraph.ExecutionVertex; -import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobgraph.OperatorID; import org.apache.flink.runtime.operators.coordination.OperatorInfo; import org.apache.flink.runtime.operators.coordination.TestingOperatorInfo; @@ -41,7 +39,6 @@ import org.apache.flink.runtime.state.TestingStreamStateHandle; import org.apache.flink.runtime.state.filesystem.FsCheckpointStorageLocation; import org.apache.flink.runtime.state.memory.ByteStreamStateHandle; -import org.apache.flink.runtime.taskmanager.TaskManagerLocation; import org.apache.flink.testutils.junit.utils.TempDirUtils; import org.apache.flink.util.concurrent.Executors; @@ -52,13 +49,11 @@ import java.io.IOException; import java.lang.reflect.Field; -import java.net.InetAddress; import java.util.ArrayDeque; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.Collections; -import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Queue; @@ -514,85 +509,6 @@ void testReportTaskFinishedOperators() throws IOException { .contains(ACK_TASKS.get(0).getVertex()); } - @Test - void testAcknowledgeTaskCapturesTaskManagerIp() throws Exception { - final String expectedIp = "10.0.0.1"; - final ExecutionAttemptID ipTestAttemptId = createExecutionAttemptId(); - final JobVertexID jobVertexId = new JobVertexID(); - - ExecutionJobVertex ejv = mock(ExecutionJobVertex.class); - when(ejv.getOperatorIDs()) - .thenReturn(Collections.singletonList(OperatorIDPair.generatedIDOnly(OPERATOR_ID))); - - TaskManagerLocation location = - new TaskManagerLocation( - ResourceID.generate(), InetAddress.getByName(expectedIp), 6121); - - Execution currentAttempt = mock(Execution.class); - ExecutionVertex vertex = mock(ExecutionVertex.class); - when(vertex.getMaxParallelism()).thenReturn(MAX_PARALLELISM); - when(vertex.getTotalNumberOfParallelSubtasks()).thenReturn(PARALLELISM); - when(vertex.getJobVertex()).thenReturn(ejv); - when(vertex.getJobvertexId()).thenReturn(jobVertexId); - when(vertex.getTaskNameWithSubtaskIndex()).thenReturn("test-task (0/1)"); - when(vertex.getCurrentExecutionAttempt()).thenReturn(currentAttempt); - when(vertex.getCurrentAssignedResourceLocation()).thenReturn(location); - - Execution execution = mock(Execution.class); - when(execution.getAttemptId()).thenReturn(ipTestAttemptId); - when(execution.getVertex()).thenReturn(vertex); - - Map taskStatsCounts = new HashMap<>(); - taskStatsCounts.put(jobVertexId, PARALLELISM); - PendingCheckpointStats pendingStats = - new PendingCheckpointStats( - 0, - 1, - CheckpointProperties.forCheckpoint( - CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION), - taskStatsCounts); - - CheckpointPlan plan = - new DefaultCheckpointPlan( - Collections.emptyList(), - Collections.singletonList(execution), - Collections.singletonList(vertex), - Collections.emptyList(), - Collections.emptyList(), - true); - - final Path checkpointDir = new Path(TempDirUtils.newFolder(tmpFolder).toURI()); - final FsCheckpointStorageLocation storageLocation = - new FsCheckpointStorageLocation( - LocalFileSystem.getSharedInstance(), - checkpointDir, - checkpointDir, - checkpointDir, - CheckpointStorageLocationReference.getDefault(), - 1024, - 4096); - - PendingCheckpoint checkpoint = - new PendingCheckpoint( - new JobID(), - 0, - 1, - plan, - Collections.emptyList(), - Collections.emptyList(), - CheckpointProperties.forCheckpoint( - CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION), - new CompletableFuture<>(), - pendingStats, - new CompletableFuture<>()); - checkpoint.setCheckpointTargetLocation(storageLocation); - - checkpoint.acknowledgeTask(ipTestAttemptId, null, new CheckpointMetrics()); - - assertThat(pendingStats.getLatestAcknowledgedSubtaskStats()).isNotNull(); - assertThat(pendingStats.getLatestAcknowledgedSubtaskStats().getIp()).isEqualTo(expectedIp); - } - // ------------------------------------------------------------------------ private PendingCheckpoint createPendingCheckpoint(CheckpointProperties props) diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/SubtaskStateStatsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/SubtaskStateStatsTest.java index 8ffb750b08f77..30970fcd9ee00 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/SubtaskStateStatsTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/SubtaskStateStatsTest.java @@ -52,8 +52,7 @@ public void test(boolean serialize) throws Exception { Integer.MAX_VALUE + 6L, Integer.MAX_VALUE + 7L, false, - true, - "10.0.0.1"); + true); stats = serialize ? CommonTestUtils.createCopySerializable(stats) : stats; @@ -64,7 +63,6 @@ public void test(boolean serialize) throws Exception { assertThat(stats.getAsyncCheckpointDuration()).isEqualTo(Integer.MAX_VALUE + 4L); assertThat(stats.getAlignmentDuration()).isEqualTo(Integer.MAX_VALUE + 6L); assertThat(stats.getCheckpointStartDelay()).isEqualTo(Integer.MAX_VALUE + 7L); - assertThat(stats.getIp()).isEqualTo("10.0.0.1"); // Check duration helper long ackTimestamp = stats.getAckTimestamp(); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/TaskStateStatsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/TaskStateStatsTest.java index 5a1f86dd44109..e754e33117423 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/TaskStateStatsTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/TaskStateStatsTest.java @@ -74,8 +74,7 @@ private void test(boolean serialize) throws Exception { rand.nextInt(128), rand.nextInt(128), false, - true, - null); + true); stateSize += subtasks[i].getStateSize(); processedData += subtasks[i].getProcessedData(); @@ -95,8 +94,7 @@ private void test(boolean serialize) throws Exception { assertThat( taskStats.reportSubtaskStats( - new SubtaskStateStats( - 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, false, true, null))) + new SubtaskStateStats(0, 0, 0, 0, 0, 0, 0, 0, 0, 0, false, true))) .isFalse(); taskStats = serialize ? CommonTestUtils.createCopySerializable(taskStats) : taskStats; diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/checkpoints/TaskCheckpointStatisticsWithSubtaskDetailsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/checkpoints/TaskCheckpointStatisticsWithSubtaskDetailsTest.java index 67d8020d8c811..89b8acfa52c81 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/checkpoints/TaskCheckpointStatisticsWithSubtaskDetailsTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/checkpoints/TaskCheckpointStatisticsWithSubtaskDetailsTest.java @@ -73,8 +73,7 @@ protected TaskCheckpointStatisticsWithSubtaskDetails getTestResponseInstance() .CheckpointAlignment(2L, 4L, 5L, 3L), 42L, true, - false, - "192.168.1.100")); + false)); return new TaskCheckpointStatisticsWithSubtaskDetails( 4L,