Skip to content

Commit 7aeb395

Browse files
Add current_application_duration_ms to cluster state download stats in node stats API; add UT for test coverage (#20922)
Signed-off-by: Ayushi Arya <ayuaryak@amazon.com>
1 parent ac2b2fd commit 7aeb395

9 files changed

Lines changed: 194 additions & 13 deletions

File tree

server/src/main/java/org/opensearch/cluster/service/ClusterApplier.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,14 @@ public interface ClusterApplier {
6464
*/
6565
void onNewClusterState(String source, Supplier<ClusterState> clusterStateSupplier, ClusterApplyListener listener);
6666

67+
/**
68+
* Returns the duration in milliseconds of the currently running cluster state application,
69+
* or 0 if no application is in progress.
* <p>
* This default implementation always returns 0; appliers that track application timing
* override it.
*
* @return elapsed milliseconds of the in-progress application, or 0 when idle
70+
*/
71+
default long getCurrentApplicationDurationMs() {
72+
return 0;
73+
}
74+
6775
/**
6876
* Listener for results of cluster state application
6977
*

server/src/main/java/org/opensearch/cluster/service/ClusterApplierService.java

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -129,6 +129,10 @@ public class ClusterApplierService extends AbstractLifecycleComponent implements
129129

130130
private final ClusterManagerMetrics clusterManagerMetrics;
131131

132+
// Application duration tracking: System.nanoTime() captured when runTask starts applying, NOT_RUNNING while idle
133+
private static final long NOT_RUNNING = -1L;
134+
private volatile long applicationStartTimeNanos = NOT_RUNNING;
135+
132136
public ClusterApplierService(String nodeName, Settings settings, ClusterSettings clusterSettings, ThreadPool threadPool) {
133137
this(nodeName, settings, clusterSettings, threadPool, new ClusterManagerMetrics(NoopMetricsRegistry.INSTANCE));
134138
}
@@ -468,7 +472,7 @@ private void runTask(UpdateTask task) {
468472
logger.debug("processing [{}]: ignoring, cluster applier service not started", task.source);
469473
return;
470474
}
471-
475+
this.applicationStartTimeNanos = System.nanoTime();
472476
logger.debug("processing [{}]: execute", task.source);
473477
final ClusterState previousClusterState = state.get();
474478

@@ -492,6 +496,7 @@ private void runTask(UpdateTask task) {
492496
e
493497
);
494498
warnAboutSlowTaskIfNeeded(executionTime, task.source, stopWatch);
499+
this.applicationStartTimeNanos = NOT_RUNNING;
495500
task.listener.onFailure(task.source, e);
496501
return;
497502
}
@@ -500,6 +505,7 @@ private void runTask(UpdateTask task) {
500505
TimeValue executionTime = TimeValue.timeValueMillis(Math.max(0, currentTimeInMillis() - startTimeMS));
501506
logger.debug("processing [{}]: took [{}] no change in cluster state", task.source, executionTime);
502507
warnAboutSlowTaskIfNeeded(executionTime, task.source, stopWatch);
508+
this.applicationStartTimeNanos = NOT_RUNNING;
503509
task.listener.onSuccess(task.source);
504510
} else {
505511
if (logger.isTraceEnabled()) {
@@ -524,6 +530,7 @@ private void runTask(UpdateTask task) {
524530
newClusterState.stateUUID()
525531
);
526532
warnAboutSlowTaskIfNeeded(executionTime, task.source, stopWatch);
533+
this.applicationStartTimeNanos = NOT_RUNNING;
527534
// Then we call the ClusterApplyListener of the task
528535
task.listener.onSuccess(task.source);
529536
} catch (Exception e) {
@@ -555,6 +562,7 @@ private void runTask(UpdateTask task) {
555562
// failing to apply a cluster state with an exception indicates a bug in validation or in one of the appliers; if we
556563
// continue we will retry with the same cluster state but that might not help.
557564
assert applicationMayFail();
565+
this.applicationStartTimeNanos = NOT_RUNNING;
558566
task.listener.onFailure(task.source, e);
559567
}
560568
}
@@ -785,6 +793,19 @@ protected boolean applicationMayFail() {
785793
return false;
786794
}
787795

796+
/**
797+
* Returns the duration in milliseconds of the currently running cluster state application,
798+
* or 0 if no application is in progress.
799+
*/
800+
@Override
801+
public long getCurrentApplicationDurationMs() {
802+
long startNanos = this.applicationStartTimeNanos;
803+
if (startNanos == NOT_RUNNING) {
804+
return 0;
805+
}
806+
return TimeValue.nsecToMSec(System.nanoTime() - startNanos);
807+
}
808+
788809
/**
789810
* Pre-commit State of the cluster-applier
790811
* @return ClusterState

server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -224,6 +224,7 @@ public static RemoteClusterStateValidationMode parseString(String mode) {
224224
private final Supplier<RepositoriesService> repositoriesService;
225225
private final Settings settings;
226226
private final LongSupplier relativeTimeNanosSupplier;
227+
private final LongSupplier applicationDurationMsSupplier;
227228
private final ThreadPool threadpool;
228229
private final List<IndexMetadataUploadListener> indexMetadataUploadListeners;
229230
private BlobStoreRepository blobStoreRepository;
@@ -269,13 +270,15 @@ public RemoteClusterStateService(
269270
LongSupplier relativeTimeNanosSupplier,
270271
ThreadPool threadPool,
271272
List<IndexMetadataUploadListener> indexMetadataUploadListeners,
272-
NamedWriteableRegistry namedWriteableRegistry
273+
NamedWriteableRegistry namedWriteableRegistry,
274+
LongSupplier applicationDurationMsSupplier
273275
) {
274276
assert isRemoteClusterStateConfigured(settings) : "Remote cluster state is not configured";
275277
this.nodeId = nodeId;
276278
this.repositoriesService = repositoriesService;
277279
this.settings = settings;
278280
this.relativeTimeNanosSupplier = relativeTimeNanosSupplier;
281+
this.applicationDurationMsSupplier = applicationDurationMsSupplier;
279282
this.threadpool = threadPool;
280283
clusterSettings = clusterService.getClusterSettings();
281284
this.slowWriteLoggingThreshold = clusterSettings.get(SLOW_WRITE_LOGGING_THRESHOLD);
@@ -2115,11 +2118,15 @@ public PersistedStateStats getUploadStats() {
21152118
}
21162119

21172120
/**
 * Returns the full-download stats after refreshing their current_application_duration_ms
 * value from applicationDurationMsSupplier.
 * NOTE(review): the setter is applied to the object handed back by remoteStateStats (not a copy),
 * and the cast assumes that object is always a RemoteDownloadStats — confirm both are intended.
 */
public PersistedStateStats getFullDownloadStats() {
2118-
return remoteStateStats.getRemoteFullDownloadStats();
2121+
RemoteDownloadStats stats = (RemoteDownloadStats) remoteStateStats.getRemoteFullDownloadStats();
2122+
stats.setCurrentApplicationDurationMs(applicationDurationMsSupplier.getAsLong());
2123+
return stats;
21192124
}
21202125

21212126
/**
 * Returns the diff-download stats after refreshing their current_application_duration_ms
 * value from applicationDurationMsSupplier.
 * NOTE(review): the setter is applied to the object handed back by remoteStateStats (not a copy),
 * and the cast assumes that object is always a RemoteDownloadStats — confirm both are intended.
 */
public PersistedStateStats getDiffDownloadStats() {
2122-
return remoteStateStats.getRemoteDiffDownloadStats();
2127+
RemoteDownloadStats stats = (RemoteDownloadStats) remoteStateStats.getRemoteDiffDownloadStats();
2128+
stats.setCurrentApplicationDurationMs(applicationDurationMsSupplier.getAsLong());
2129+
return stats;
21232130
}
21242131

21252132
public void fullDownloadFailed() {

server/src/main/java/org/opensearch/gateway/remote/RemoteDownloadStats.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,11 +22,14 @@ public class RemoteDownloadStats extends PersistedStateStats {
2222
private AtomicLong checksumValidationFailedCount = new AtomicLong(0);
2323
public static final String INCOMING_PUBLICATION_FAILED_COUNT = "incoming_publication_failed_count";
2424
private AtomicLong incomingPublicationFailedCount = new AtomicLong(0);
25+
static final String CURRENT_APPLICATION_DURATION_MS = "current_application_duration_ms";
26+
private AtomicLong currentApplicationDurationMs = new AtomicLong(0);
2527

2628
/**
 * Creates download stats with the given name and registers this class's three counters
 * (checksum validation failures, incoming publication failures, current application duration)
 * as extended fields under their respective key constants.
 *
 * @param statsName name passed through to the PersistedStateStats constructor
 */
public RemoteDownloadStats(String statsName) {
2729
super(statsName);
2830
addToExtendedFields(CHECKSUM_VALIDATION_FAILED_COUNT, checksumValidationFailedCount);
2931
addToExtendedFields(INCOMING_PUBLICATION_FAILED_COUNT, incomingPublicationFailedCount);
32+
addToExtendedFields(CURRENT_APPLICATION_DURATION_MS, currentApplicationDurationMs);
3033
}
3134

3235
public void checksumValidationFailedCount() {
@@ -44,4 +47,12 @@ public void incomingPublicationFailedCount() {
4447
public long getIncomingPublicationFailedCount() {
4548
return incomingPublicationFailedCount.get();
4649
}
50+
51+
/**
 * Overwrites the stored current-application duration with the supplied value.
 *
 * @param durationMs duration in milliseconds to store
 */
public void setCurrentApplicationDurationMs(long durationMs) {
52+
currentApplicationDurationMs.set(durationMs);
53+
}
54+
55+
/**
 * Returns the value most recently stored by setCurrentApplicationDurationMs (initially 0).
 *
 * @return the stored duration in milliseconds
 */
public long getCurrentApplicationDurationMs() {
56+
return currentApplicationDurationMs.get();
57+
}
4758
}

server/src/main/java/org/opensearch/node/Node.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -891,7 +891,9 @@ protected Node(final Environment initialEnvironment, Collection<PluginInfo> clas
891891
threadPool::preciseRelativeTimeInNanos,
892892
threadPool,
893893
List.of(remoteIndexPathUploader),
894-
namedWriteableRegistry
894+
namedWriteableRegistry,
895+
// Supplier for current cluster state application duration in ms (0 if idle)
896+
() -> clusterService.getClusterApplierService().getCurrentApplicationDurationMs()
895897
);
896898
remoteClusterStateCleanupManager = remoteClusterStateService.getCleanupManager();
897899
} else {

server/src/test/java/org/opensearch/cluster/coordination/PersistedStateStatsTests.java

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,12 @@
88

99
package org.opensearch.cluster.coordination;
1010

11+
import org.opensearch.common.io.stream.BytesStreamOutput;
12+
import org.opensearch.core.common.io.stream.StreamInput;
1113
import org.opensearch.test.OpenSearchTestCase;
1214
import org.junit.Before;
1315

16+
import java.io.IOException;
1417
import java.util.concurrent.atomic.AtomicLong;
1518

1619
public class PersistedStateStatsTests extends OpenSearchTestCase {
@@ -59,4 +62,63 @@ public void testAddMultipleFields() {
5962
assertEquals(42, persistedStateStats.getExtendedFields().get(fieldName1).get());
6063
assertEquals(84, persistedStateStats.getExtendedFields().get(fieldName2).get());
6164
}
65+
66+
public void testBwcSerializationWithExtraExtendedField() throws IOException {
67+
// Simulate NEW node writing stats with 3 extended fields
68+
PersistedStateStats newNodeStats = new PersistedStateStats("test_download");
69+
newNodeStats.addToExtendedFields("checksum_validation_failed_count", new AtomicLong(0));
70+
newNodeStats.addToExtendedFields("incoming_publication_failed_count", new AtomicLong(1));
71+
newNodeStats.addToExtendedFields("current_application_duration_ms", new AtomicLong(42));
72+
73+
// Serialize
74+
BytesStreamOutput out = new BytesStreamOutput();
75+
newNodeStats.writeTo(out);
76+
77+
// Deserialize as generic PersistedStateStats (simulates OLD node reading)
78+
StreamInput in = out.bytes().streamInput();
79+
PersistedStateStats oldNodeDeserialized = new PersistedStateStats(in);
80+
81+
// Old node reads all 3 fields successfully — no bytes left unread
82+
assertEquals(3, oldNodeDeserialized.getExtendedFields().size());
83+
assertEquals(42, oldNodeDeserialized.getExtendedFields().get("current_application_duration_ms").get());
84+
85+
// Simulate OLD node writing stats with only 2 extended fields
86+
PersistedStateStats oldNodeStats = new PersistedStateStats("test_download");
87+
oldNodeStats.addToExtendedFields("checksum_validation_failed_count", new AtomicLong(0));
88+
oldNodeStats.addToExtendedFields("incoming_publication_failed_count", new AtomicLong(1));
89+
// Note: NO current_application_duration_ms
90+
91+
BytesStreamOutput out2 = new BytesStreamOutput();
92+
oldNodeStats.writeTo(out2);
93+
94+
StreamInput in2 = out2.bytes().streamInput();
95+
PersistedStateStats newNodeDeserialized = new PersistedStateStats(in2);
96+
97+
// New node reads only 2 fields — current_application_duration_ms simply absent
98+
assertEquals(2, newNodeDeserialized.getExtendedFields().size());
99+
assertFalse(newNodeDeserialized.getExtendedFields().containsKey("current_application_duration_ms"));
100+
}
101+
102+
// serialization with extendedFields
103+
public void testSerializationRoundTripWithExtendedFields() throws IOException {
104+
PersistedStateStats original = new PersistedStateStats("test_download");
105+
original.stateSucceeded();
106+
original.stateTook(100);
107+
original.addToExtendedFields("current_application_duration_ms", new AtomicLong(5000));
108+
109+
// Serialize
110+
BytesStreamOutput out = new BytesStreamOutput();
111+
original.writeTo(out);
112+
113+
// Deserialize
114+
StreamInput in = out.bytes().streamInput();
115+
PersistedStateStats deserialized = new PersistedStateStats(in);
116+
117+
assertEquals("test_download", deserialized.getStatsName());
118+
assertEquals(1, deserialized.getSuccessCount());
119+
assertEquals(100, deserialized.getTotalTimeInMillis());
120+
assertTrue(deserialized.getExtendedFields().containsKey("current_application_duration_ms"));
121+
assertEquals(5000, deserialized.getExtendedFields().get("current_application_duration_ms").get());
122+
}
123+
62124
}

server/src/test/java/org/opensearch/cluster/service/ClusterApplierServiceTests.java

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -677,6 +677,68 @@ public void onFailure(String source, Exception e) {
677677
verifyNoInteractions(listenerslatencyHistogram);
678678
}
679679

680+
// Tests for current-application duration tracking (getCurrentApplicationDurationMs)
681+
682+
public void testGetCurrentApplicationDurationMsReturnsZeroWhenIdle() {
683+
// No task running → should return 0
684+
assertEquals(0, clusterApplierService.getCurrentApplicationDurationMs());
685+
}
686+
687+
public void testGetCurrentApplicationDurationMsDuringApplication() throws Exception {
688+
689+
CountDownLatch taskStarted = new CountDownLatch(1);
690+
CountDownLatch taskCanFinish = new CountDownLatch(1);
691+
CountDownLatch taskDone = new CountDownLatch(1);
692+
693+
clusterApplierService.onNewClusterState("test", () -> {
694+
taskStarted.countDown();
695+
try {
696+
taskCanFinish.await();
697+
} catch (InterruptedException e) {
698+
Thread.currentThread().interrupt();
699+
}
700+
return ClusterState.builder(clusterApplierService.state()).build();
701+
}, new ClusterApplier.ClusterApplyListener() {
702+
@Override
703+
public void onSuccess(String source) {
704+
taskDone.countDown();
705+
}
706+
707+
@Override
708+
public void onFailure(String source, Exception e) {
709+
taskDone.countDown();
710+
}
711+
});
712+
713+
taskStarted.await(); // Wait for task to start
714+
long duration = clusterApplierService.getCurrentApplicationDurationMs();
715+
assertTrue("Duration should be >= 0 during application, got: " + duration, duration >= 0);
716+
taskCanFinish.countDown(); // Let task finish
717+
taskDone.await(); // Wait for task to fully complete to avoid interference with other tests
718+
}
719+
720+
public void testGetCurrentApplicationDurationMsResetsAfterCompletion() throws Exception {
721+
CountDownLatch taskDone = new CountDownLatch(1);
722+
723+
clusterApplierService.onNewClusterState(
724+
"test",
725+
() -> ClusterState.builder(clusterApplierService.state()).build(),
726+
new ClusterApplier.ClusterApplyListener() {
727+
@Override
728+
public void onSuccess(String source) {
729+
taskDone.countDown();
730+
}
731+
732+
@Override
733+
public void onFailure(String source, Exception e) {
734+
taskDone.countDown();
735+
}
736+
}
737+
);
738+
taskDone.await();
739+
assertEquals(0, clusterApplierService.getCurrentApplicationDurationMs());
740+
}
741+
680742
static class TimedClusterApplierService extends ClusterApplierService {
681743

682744
final ClusterSettings clusterSettings;

server/src/test/java/org/opensearch/gateway/GatewayMetaStatePersistedStateTests.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -524,7 +524,8 @@ public void testDataOnlyNodePersistence() throws Exception {
524524
DefaultRemoteStoreSettings.INSTANCE
525525
)
526526
),
527-
writableRegistry()
527+
writableRegistry(),
528+
() -> 0L
528529
);
529530
} else {
530531
return null;

server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java

Lines changed: 14 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -263,7 +263,8 @@ public void setup() {
263263
DefaultRemoteStoreSettings.INSTANCE
264264
)
265265
),
266-
namedWriteableRegistry
266+
namedWriteableRegistry,
267+
() -> 0L
267268
);
268269
}
269270

@@ -305,7 +306,8 @@ public void testFailInitializationWhenRemoteStateDisabled() {
305306
DefaultRemoteStoreSettings.INSTANCE
306307
)
307308
),
308-
writableRegistry()
309+
writableRegistry(),
310+
() -> 0L
309311
)
310312
);
311313
}
@@ -383,7 +385,8 @@ public void testWriteFullMetadataSuccessPublicationEnabled() throws IOException
383385
DefaultRemoteStoreSettings.INSTANCE
384386
)
385387
),
386-
writableRegistry()
388+
writableRegistry(),
389+
() -> 0L
387390
);
388391
assertTrue(remoteClusterStateService.isRemotePublicationEnabled());
389392
final ClusterState clusterState = generateClusterStateWithOneIndex().nodes(nodesWithLocalNodeClusterManager())
@@ -757,7 +760,8 @@ public void testWriteIncrementalMetadataSuccessWhenPublicationEnabled() throws I
757760
DefaultRemoteStoreSettings.INSTANCE
758761
)
759762
),
760-
writableRegistry()
763+
writableRegistry(),
764+
() -> 0L
761765
);
762766
assertTrue(remoteClusterStateService.isRemotePublicationEnabled());
763767
final ClusterState clusterState = generateClusterStateWithOneIndex().nodes(nodesWithLocalNodeClusterManager()).build();
@@ -2798,7 +2802,8 @@ public void testRemoteRoutingTableInitializedWhenEnabled() {
27982802
DefaultRemoteStoreSettings.INSTANCE
27992803
)
28002804
),
2801-
writableRegistry()
2805+
writableRegistry(),
2806+
() -> 0L
28022807
);
28032808
assertTrue(remoteClusterStateService.getRemoteRoutingTableService() instanceof InternalRemoteRoutingTableService);
28042809
}
@@ -3070,7 +3075,8 @@ private void initializeRoutingTable() {
30703075
DefaultRemoteStoreSettings.INSTANCE
30713076
)
30723077
),
3073-
writableRegistry()
3078+
writableRegistry(),
3079+
() -> 0L
30743080
);
30753081
}
30763082

@@ -3100,7 +3106,8 @@ private void initializeWithChecksumEnabled(RemoteClusterStateService.RemoteClust
31003106
DefaultRemoteStoreSettings.INSTANCE
31013107
)
31023108
),
3103-
writableRegistry()
3109+
writableRegistry(),
3110+
() -> 0L
31043111
);
31053112
}
31063113

0 commit comments

Comments
 (0)