Skip to content

Commit b72ebd1

Browse files
feat(java): Add support for cluster management and node management commands (#5503)
* feat(java): Add support for cluster management and node management commands Signed-off-by: prashanna-frsh <prashanna.rajendran@freshworks.com> * refactor(java): amend feedbacks Signed-off-by: prashanna-frsh <prashanna.rajendran@freshworks.com> * refactor(java): fix CI failures Signed-off-by: prashanna-frsh <prashanna.rajendran@freshworks.com> * refactor(java): fix integration tests Signed-off-by: prashanna-frsh <prashanna.rajendran@freshworks.com> * refactor(java): fix failed tests Signed-off-by: prashanna-frsh <prashanna.rajendran@freshworks.com> * refactor(java): format fix Signed-off-by: prashanna-frsh <prashanna.rajendran@freshworks.com> * refactor(java): fix failed test Signed-off-by: prashanna-frsh <prashanna.rajendran@freshworks.com> * refactor(java): fix failing integration test Signed-off-by: prashanna-frsh <prashanna.rajendran@freshworks.com> * refactor(java): amend feedbacks Signed-off-by: prashanna-frsh <prashanna.rajendran@freshworks.com> * refactor(java): fix failing tests Signed-off-by: prashanna-frsh <prashanna.rajendran@freshworks.com> * refactor(java): fix failing tests Signed-off-by: prashanna-frsh <prashanna.rajendran@freshworks.com> --------- Signed-off-by: prashanna-frsh <prashanna.rajendran@freshworks.com> Signed-off-by: Thomas Zhou <54688146+xShinnRyuu@users.noreply.github.com> Co-authored-by: Thomas Zhou <54688146+xShinnRyuu@users.noreply.github.com>
1 parent 7d3579c commit b72ebd1

16 files changed

Lines changed: 1770 additions & 33 deletions

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
* CORE: Fix Tokio runtime blocking in cluster async code by replacing std::sync locks with async-safe alternatives ([#5450](https://github.com/valkey-io/valkey-glide/issues/5450))
1717
* Core: Maintain throughput during cluster failover by making reconnection non-blocking ([#4990](https://github.com/valkey-io/valkey-glide/issues/4990))
1818
* FFI: Add OpenTelemetry DB semantic convention attributes to FFI path ([#5596](https://github.com/valkey-io/valkey-glide/issues/5596))
19+
* JAVA: Add cluster management commands (CLUSTER MEET, CLUSTER FORGET, CLUSTER REPLICATE, CLUSTER REPLICAS, CLUSTER COUNT-FAILURE-REPORTS, CLUSTER FAILOVER, CLUSTER SETSLOT, CLUSTER BUMPEPOCH, CLUSTER SET-CONFIG-EPOCH, CLUSTER FLUSHSLOTS, CLUSTER RESET, READONLY, READWRITE, ASKING, CLUSTER SAVECONFIG, CLUSTER GETKEYSINSLOT) ([#5503](https://github.com/valkey-io/valkey-glide/pull/5503))
1920

2021
#### Fixes
2122
* CORE: Skip compression/decompression code paths when compression is not configured to eliminate per-command overhead ([#5644](https://github.com/valkey-io/valkey-glide/pull/5644))

java/client/src/main/java/glide/api/BaseClient.java

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -339,7 +339,6 @@
339339
import java.util.function.Function;
340340
import java.util.stream.Collectors;
341341
import lombok.NonNull;
342-
import lombok.RequiredArgsConstructor;
343342
import org.apache.commons.lang3.ArrayUtils;
344343
import response.ResponseOuterClass.ConstantResponse;
345344
import response.ResponseOuterClass.Response;
@@ -437,12 +436,22 @@ protected BaseClient(ClientBuilder builder) {
437436
}
438437

439438
/** Auxiliary builder which wraps all fields */
440-
@RequiredArgsConstructor
441439
protected static class ClientBuilder {
442440
private final ConnectionManager connectionManager;
443441
private final CommandManager commandManager;
444442
private final MessageHandler messageHandler;
445443
private final Optional<BaseSubscriptionConfiguration> subscriptionConfiguration;
444+
445+
protected ClientBuilder(
446+
ConnectionManager connectionManager,
447+
CommandManager commandManager,
448+
MessageHandler messageHandler,
449+
Optional<BaseSubscriptionConfiguration> subscriptionConfiguration) {
450+
this.connectionManager = connectionManager;
451+
this.commandManager = commandManager;
452+
this.messageHandler = messageHandler;
453+
this.subscriptionConfiguration = subscriptionConfiguration;
454+
}
446455
}
447456

448457
/**

java/client/src/main/java/glide/api/GlideClusterClient.java

Lines changed: 184 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,33 @@
11
/** Copyright Valkey GLIDE Project Contributors - SPDX Identifier: Apache-2.0 */
22
package glide.api;
33

4+
import static command_request.CommandRequestOuterClass.RequestType.Asking;
45
import static command_request.CommandRequestOuterClass.RequestType.ClientGetName;
56
import static command_request.CommandRequestOuterClass.RequestType.ClientId;
67
import static command_request.CommandRequestOuterClass.RequestType.ClusterAddSlots;
78
import static command_request.CommandRequestOuterClass.RequestType.ClusterAddSlotsRange;
9+
import static command_request.CommandRequestOuterClass.RequestType.ClusterBumpEpoch;
10+
import static command_request.CommandRequestOuterClass.RequestType.ClusterCountFailureReports;
811
import static command_request.CommandRequestOuterClass.RequestType.ClusterCountKeysInSlot;
912
import static command_request.CommandRequestOuterClass.RequestType.ClusterDelSlots;
1013
import static command_request.CommandRequestOuterClass.RequestType.ClusterDelSlotsRange;
14+
import static command_request.CommandRequestOuterClass.RequestType.ClusterFailover;
15+
import static command_request.CommandRequestOuterClass.RequestType.ClusterFlushSlots;
16+
import static command_request.CommandRequestOuterClass.RequestType.ClusterForget;
1117
import static command_request.CommandRequestOuterClass.RequestType.ClusterGetKeysInSlot;
1218
import static command_request.CommandRequestOuterClass.RequestType.ClusterInfo;
1319
import static command_request.CommandRequestOuterClass.RequestType.ClusterKeySlot;
1420
import static command_request.CommandRequestOuterClass.RequestType.ClusterLinks;
21+
import static command_request.CommandRequestOuterClass.RequestType.ClusterMeet;
1522
import static command_request.CommandRequestOuterClass.RequestType.ClusterMyId;
1623
import static command_request.CommandRequestOuterClass.RequestType.ClusterMyShardId;
1724
import static command_request.CommandRequestOuterClass.RequestType.ClusterNodes;
25+
import static command_request.CommandRequestOuterClass.RequestType.ClusterReplicas;
26+
import static command_request.CommandRequestOuterClass.RequestType.ClusterReplicate;
27+
import static command_request.CommandRequestOuterClass.RequestType.ClusterReset;
28+
import static command_request.CommandRequestOuterClass.RequestType.ClusterSaveConfig;
29+
import static command_request.CommandRequestOuterClass.RequestType.ClusterSetConfigEpoch;
30+
import static command_request.CommandRequestOuterClass.RequestType.ClusterSetslot;
1831
import static command_request.CommandRequestOuterClass.RequestType.ClusterShards;
1932
import static command_request.CommandRequestOuterClass.RequestType.ConfigGet;
2033
import static command_request.CommandRequestOuterClass.RequestType.ConfigResetStat;
@@ -43,6 +56,8 @@
4356
import static command_request.CommandRequestOuterClass.RequestType.PubSubShardChannels;
4457
import static command_request.CommandRequestOuterClass.RequestType.PubSubShardNumSub;
4558
import static command_request.CommandRequestOuterClass.RequestType.RandomKey;
59+
import static command_request.CommandRequestOuterClass.RequestType.ReadOnly;
60+
import static command_request.CommandRequestOuterClass.RequestType.ReadWrite;
4661
import static command_request.CommandRequestOuterClass.RequestType.SPublish;
4762
import static command_request.CommandRequestOuterClass.RequestType.SSubscribe;
4863
import static command_request.CommandRequestOuterClass.RequestType.SSubscribeBlocking;
@@ -66,9 +81,13 @@
6681
import static glide.utils.ArrayTransformUtils.concatenateArrays;
6782
import static glide.utils.ArrayTransformUtils.convertMapToKeyValueStringArray;
6883

84+
import glide.api.commands.ClusterAdminCommands;
6985
import glide.api.commands.ClusterManagementClusterCommands;
86+
import glide.api.commands.ClusterOperationsCommands;
87+
import glide.api.commands.ConnectionControlCommands;
7088
import glide.api.commands.ConnectionManagementClusterCommands;
7189
import glide.api.commands.GenericClusterCommands;
90+
import glide.api.commands.NodeManagementCommands;
7291
import glide.api.commands.PubSubClusterCommands;
7392
import glide.api.commands.ScriptingAndFunctionsClusterCommands;
7493
import glide.api.commands.ServerManagementClusterCommands;
@@ -84,6 +103,9 @@
84103
import glide.api.models.commands.ScriptArgOptions;
85104
import glide.api.models.commands.ScriptArgOptionsGlideString;
86105
import glide.api.models.commands.batch.ClusterBatchOptions;
106+
import glide.api.models.commands.cluster.ClusterFailoverOptions;
107+
import glide.api.models.commands.cluster.ClusterResetOptions;
108+
import glide.api.models.commands.cluster.ClusterSetSlotOptions;
87109
import glide.api.models.commands.function.FunctionRestorePolicy;
88110
import glide.api.models.commands.scan.ClusterScanCursor;
89111
import glide.api.models.commands.scan.ScanOptions;
@@ -122,12 +144,16 @@
122144
* Documentation</a>.
123145
*/
124146
public class GlideClusterClient extends BaseClient
125-
implements ConnectionManagementClusterCommands,
147+
implements ClusterAdminCommands,
148+
ClusterOperationsCommands,
149+
ConnectionControlCommands,
150+
ConnectionManagementClusterCommands,
126151
GenericClusterCommands,
127-
ServerManagementClusterCommands,
152+
NodeManagementCommands,
153+
PubSubClusterCommands,
128154
ScriptingAndFunctionsClusterCommands,
155+
ServerManagementClusterCommands,
129156
TransactionsClusterCommands,
130-
PubSubClusterCommands,
131157
ClusterManagementClusterCommands {
132158

133159
/** Constructor using ClientParams from BaseClient. */
@@ -1728,6 +1754,161 @@ public CompletableFuture<Void> sunsubscribe(int timeoutMs) {
17281754
});
17291755
}
17301756

1757+
@Override
1758+
public CompletableFuture<String> clusterMeet(@NonNull String host, long port) {
1759+
return commandManager.submitNewCommand(
1760+
ClusterMeet, new String[] {host, Long.toString(port)}, this::handleStringResponse);
1761+
}
1762+
1763+
@Override
1764+
public CompletableFuture<ClusterValue<String>> clusterMeet(
1765+
@NonNull String host, long port, @NonNull Route route) {
1766+
return commandManager.submitNewCommand(
1767+
ClusterMeet,
1768+
new String[] {host, Long.toString(port)},
1769+
route,
1770+
response ->
1771+
route instanceof SingleNodeRoute
1772+
? ClusterValue.of(handleStringResponse(response))
1773+
: ClusterValue.of(handleMapResponse(response)));
1774+
}
1775+
1776+
@Override
1777+
public CompletableFuture<String> clusterForget(@NonNull String nodeId) {
1778+
return commandManager.submitNewCommand(
1779+
ClusterForget, new String[] {nodeId}, this::handleStringResponse);
1780+
}
1781+
1782+
@Override
1783+
public CompletableFuture<String> clusterReplicate(@NonNull String nodeId) {
1784+
return commandManager.submitNewCommand(
1785+
ClusterReplicate, new String[] {nodeId}, this::handleStringResponse);
1786+
}
1787+
1788+
@Override
1789+
public CompletableFuture<String[]> clusterReplicas(@NonNull String nodeId) {
1790+
return commandManager.submitNewCommand(
1791+
ClusterReplicas,
1792+
new String[] {nodeId},
1793+
response -> castArray(handleArrayResponse(response), String.class));
1794+
}
1795+
1796+
@Override
1797+
public CompletableFuture<ClusterValue<String[]>> clusterReplicas(
1798+
@NonNull String nodeId, @NonNull Route route) {
1799+
return commandManager.submitNewCommand(
1800+
ClusterReplicas,
1801+
new String[] {nodeId},
1802+
route,
1803+
response ->
1804+
route instanceof SingleNodeRoute
1805+
? ClusterValue.ofSingleValue(castArray(handleArrayResponse(response), String.class))
1806+
: ClusterValue.ofMultiValue(
1807+
castMapOfArrays(handleMapResponse(response), String.class)));
1808+
}
1809+
1810+
@Override
1811+
public CompletableFuture<Long> clusterCountFailureReports(@NonNull String nodeId) {
1812+
return commandManager.submitNewCommand(
1813+
ClusterCountFailureReports, new String[] {nodeId}, this::handleLongResponse);
1814+
}
1815+
1816+
@Override
1817+
public CompletableFuture<ClusterValue<Long>> clusterCountFailureReports(
1818+
@NonNull String nodeId, @NonNull Route route) {
1819+
return commandManager.submitNewCommand(
1820+
ClusterCountFailureReports,
1821+
new String[] {nodeId},
1822+
route,
1823+
response ->
1824+
route instanceof SingleNodeRoute
1825+
? ClusterValue.of(handleLongResponse(response))
1826+
: ClusterValue.of(handleMapResponse(response)));
1827+
}
1828+
1829+
@Override
1830+
public CompletableFuture<String> clusterFailover() {
1831+
return commandManager.submitNewCommand(
1832+
ClusterFailover, new String[0], this::handleStringResponse);
1833+
}
1834+
1835+
@Override
1836+
public CompletableFuture<String> clusterFailover(@NonNull ClusterFailoverOptions options) {
1837+
return commandManager.submitNewCommand(
1838+
ClusterFailover, options.toArgs(), this::handleStringResponse);
1839+
}
1840+
1841+
@Override
1842+
public CompletableFuture<String> clusterSetSlot(
1843+
long slot, @NonNull ClusterSetSlotOptions options) {
1844+
String[] args = concatenateArrays(new String[] {Long.toString(slot)}, options.toArgs());
1845+
return commandManager.submitNewCommand(ClusterSetslot, args, this::handleStringResponse);
1846+
}
1847+
1848+
@Override
1849+
public CompletableFuture<String> clusterBumpEpoch() {
1850+
return commandManager.submitNewCommand(
1851+
ClusterBumpEpoch, new String[0], this::handleStringResponse);
1852+
}
1853+
1854+
@Override
1855+
public CompletableFuture<String> clusterSetConfigEpoch(long configEpoch) {
1856+
return commandManager.submitNewCommand(
1857+
ClusterSetConfigEpoch,
1858+
new String[] {Long.toString(configEpoch)},
1859+
this::handleStringResponse);
1860+
}
1861+
1862+
@Override
1863+
public CompletableFuture<String> clusterFlushSlots() {
1864+
return commandManager.submitNewCommand(
1865+
ClusterFlushSlots, new String[0], this::handleStringResponse);
1866+
}
1867+
1868+
@Override
1869+
public CompletableFuture<String> clusterReset() {
1870+
return commandManager.submitNewCommand(ClusterReset, new String[0], this::handleStringResponse);
1871+
}
1872+
1873+
@Override
1874+
public CompletableFuture<String> clusterReset(@NonNull ClusterResetOptions options) {
1875+
return commandManager.submitNewCommand(
1876+
ClusterReset, options.toArgs(), this::handleStringResponse);
1877+
}
1878+
1879+
@Override
1880+
public CompletableFuture<String> readonly() {
1881+
return commandManager.submitNewCommand(ReadOnly, new String[0], this::handleStringResponse);
1882+
}
1883+
1884+
@Override
1885+
public CompletableFuture<String> readwrite() {
1886+
return commandManager.submitNewCommand(ReadWrite, new String[0], this::handleStringResponse);
1887+
}
1888+
1889+
@Override
1890+
public CompletableFuture<String> asking() {
1891+
return commandManager.submitNewCommand(Asking, new String[0], this::handleStringResponse);
1892+
}
1893+
1894+
@Override
1895+
public CompletableFuture<String> clusterSaveConfig() {
1896+
return commandManager.submitNewCommand(
1897+
ClusterSaveConfig, new String[0], this::handleStringResponse);
1898+
}
1899+
1900+
@Override
1901+
public CompletableFuture<ClusterValue<String>> clusterSaveConfig(@NonNull Route route) {
1902+
return commandManager.submitNewCommand(
1903+
ClusterSaveConfig,
1904+
new String[0],
1905+
route,
1906+
response ->
1907+
route instanceof SingleNodeRoute
1908+
? ClusterValue.of(handleStringResponse(response))
1909+
: ClusterValue.of(handleMapResponse(response)));
1910+
}
1911+
17311912
@Override
17321913
public CompletableFuture<String> clusterInfo() {
17331914
return commandManager.submitNewCommand(ClusterInfo, new String[0], this::handleStringResponse);
Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
/** Copyright Valkey GLIDE Project Contributors - SPDX Identifier: Apache-2.0 */
2+
package glide.api.commands;
3+
4+
import glide.api.models.ClusterValue;
5+
import glide.api.models.configuration.RequestRoutingConfiguration.Route;
6+
import java.util.concurrent.CompletableFuture;
7+
8+
/**
9+
* Supports commands for cluster administration including configuration persistence and key
10+
* inspection during slot migration.
11+
*
12+
* @see <a href="https://valkey.io/commands/?group=cluster">Cluster Commands</a>
13+
*/
14+
public interface ClusterAdminCommands {
15+
16+
/**
17+
* Saves the cluster configuration to disk. This forces the node to persist its current cluster
18+
* configuration (including node mappings, slot assignments, etc.) to the cluster configuration
19+
* file on disk.<br>
20+
* The command will be routed to a random node in the cluster.
21+
*
22+
* @see <a href="https://valkey.io/commands/cluster-saveconfig/">valkey.io</a> for details.
23+
* @return <code>OK</code> if the configuration was successfully saved.
24+
* @example
25+
* <pre>{@code
26+
* String result = clusterClient.clusterSaveConfig().get();
27+
* assert result.equals("OK");
28+
* }</pre>
29+
*/
30+
CompletableFuture<String> clusterSaveConfig();
31+
32+
/**
33+
* Saves the cluster configuration to disk. This forces the node(s) to persist their current
34+
* cluster configuration (including node mappings, slot assignments, etc.) to the cluster
35+
* configuration file on disk.
36+
*
37+
* @see <a href="https://valkey.io/commands/cluster-saveconfig/">valkey.io</a> for details.
38+
* @param route Specifies the routing configuration for the command. The client will route the
39+
* command to the nodes defined by <code>route</code>.
40+
* @return <code>OK</code> if the configuration was successfully saved. When specifying a <code>
41+
* route</code> other than a single node, it returns a <code>Map{@literal <String, String>}
42+
* </code> with each address as the key and its corresponding result.
43+
* @example
44+
* <pre>{@code
45+
* ClusterValue<String> result = clusterClient.clusterSaveConfig(ALL_PRIMARIES).get();
46+
* // Command sent to all primary nodes, expecting MultiValue result.
47+
* for (Map.Entry<String, String> entry : result.getMultiValue().entrySet()) {
48+
* System.out.println("Node [" + entry.getKey() + "]: " + entry.getValue());
49+
* }
50+
* }</pre>
51+
*/
52+
CompletableFuture<ClusterValue<String>> clusterSaveConfig(Route route);
53+
54+
/**
55+
* Returns an array of keys stored in the specified hash slot. This command is useful for
56+
* inspecting the contents of a slot, particularly during slot migration or resharding operations.
57+
* <br>
58+
* The command will be routed to the node that owns the specified slot.
59+
*
60+
* @see <a href="https://valkey.io/commands/cluster-getkeysinslot/">valkey.io</a> for details.
61+
* @param slot The hash slot number to query (0-16383).
62+
* @param count The maximum number of keys to return from the slot.
63+
* @return An array of keys stored in the specified slot. The array may contain fewer keys than
64+
* <code>count</code> if the slot has fewer keys.
65+
* @example
66+
* <pre>{@code
67+
* // Get up to 10 keys from slot 1234
68+
* String[] keys = clusterClient.clusterGetKeysInSlot(1234, 10).get();
69+
* for (String key : keys) {
70+
* System.out.println("Key in slot 1234: " + key);
71+
* }
72+
* }</pre>
73+
*/
74+
CompletableFuture<String[]> clusterGetKeysInSlot(long slot, long count);
75+
}

0 commit comments

Comments
 (0)