Skip to content

Commit c2c33bc

Browse files
authored
Add global cluster client support (#1769)
Signed-off-by: yhmo <yihua.mo@zilliz.com>
1 parent 5bc6ddb commit c2c33bc

9 files changed

Lines changed: 1001 additions & 0 deletions

File tree

sdk-core/src/main/java/io/milvus/v2/client/MilvusClientV2.java

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,8 @@
6262
import io.milvus.v2.service.vector.VectorService;
6363
import io.milvus.v2.service.vector.request.*;
6464
import io.milvus.v2.service.vector.response.*;
65+
import io.milvus.v2.client.globalcluster.GlobalClusterUtils;
66+
import io.milvus.v2.client.globalcluster.GlobalStub;
6567
import io.milvus.v2.utils.ClientUtils;
6668
import io.milvus.v2.utils.RpcUtils;
6769
import org.apache.commons.lang3.StringUtils;
@@ -87,6 +89,7 @@ public class MilvusClientV2 {
8789
private final CDCService cdcService = new CDCService();
8890
private RpcUtils rpcUtils = new RpcUtils();
8991
private ConnectConfig connectConfig;
92+
private GlobalStub globalStub;
9093

9194
/**
9295
* Creates a Milvus client instance.
@@ -125,6 +128,21 @@ private void initServices(String dbName) {
125128
*/
126129
private void connect(ConnectConfig connectConfig) {
127130
this.connectConfig = connectConfig;
131+
132+
// Check if this is a global cluster endpoint
133+
if (GlobalClusterUtils.isGlobalEndpoint(connectConfig.getUri())) {
134+
logger.info("Detected global cluster endpoint: {}", connectConfig.getUri());
135+
this.globalStub = new GlobalStub(connectConfig.getUri(), connectConfig, this::updatePrimaryConnection);
136+
updatePrimaryConnection(this.globalStub.getPrimaryClient());
137+
// Set up the global refresh trigger on RpcUtils for UNAVAILABLE errors
138+
this.rpcUtils.setGlobalRefreshTrigger(() -> {
139+
if (globalStub != null) {
140+
globalStub.triggerRefresh();
141+
}
142+
});
143+
return;
144+
}
145+
128146
if (connectConfig.isEnablePrecheck()) {
129147
clientUtils.validateHostname(connectConfig);
130148
clientUtils.validatePort(connectConfig);
@@ -160,6 +178,11 @@ private void connect(ConnectConfig connectConfig) {
160178
}
161179
}
162180

181+
private synchronized void updatePrimaryConnection(MilvusClientV2 primaryClient) {
182+
this.channel = primaryClient.channel;
183+
this.blockingStub = primaryClient.blockingStub;
184+
}
185+
163186
// The withDeadlineAfter() need to be reset for each RPC call.
164187
// If we set a blockingStub for multiple rpc calls, it eventually will timeout since the timeout is calculated
165188
// begin the first call and end with the last call.
@@ -1200,6 +1223,14 @@ public UpdateReplicateConfigurationResp updateReplicateConfiguration(UpdateRepli
12001223
* @throws InterruptedException throws InterruptedException if the client failed to close connection
12011224
*/
12021225
public void close(long maxWaitSeconds) throws InterruptedException {
1226+
if (globalStub != null) {
1227+
globalStub.close();
1228+
globalStub = null;
1229+
// channel is owned by the inner client, already closed by globalStub.close()
1230+
channel = null;
1231+
blockingStub = null;
1232+
return;
1233+
}
12031234
if (channel != null) {
12041235
channel.shutdownNow();
12051236
channel.awaitTermination(maxWaitSeconds, TimeUnit.SECONDS);
@@ -1219,6 +1250,10 @@ public void close() {
12191250
}
12201251

12211252
public boolean clientIsReady() {
1253+
if (globalStub != null) {
1254+
MilvusClientV2 primaryClient = globalStub.getPrimaryClient();
1255+
return primaryClient != null && primaryClient.clientIsReady();
1256+
}
12221257
return channel != null && !channel.isShutdown() && !channel.isTerminated();
12231258
}
12241259
}
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package io.milvus.v2.client.globalcluster;
21+
22+
public class ClusterCapability {
23+
public static final int READABLE = 0b01;
24+
public static final int WRITABLE = 0b10;
25+
public static final int PRIMARY = READABLE | WRITABLE; // 0b11
26+
27+
private ClusterCapability() {
28+
}
29+
}
Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package io.milvus.v2.client.globalcluster;
21+
22+
public class ClusterInfo {
23+
private final String clusterId;
24+
private final String endpoint;
25+
private final int capability;
26+
27+
public ClusterInfo(String clusterId, String endpoint, int capability) {
28+
this.clusterId = clusterId;
29+
this.endpoint = endpoint;
30+
this.capability = capability;
31+
}
32+
33+
public String getClusterId() {
34+
return clusterId;
35+
}
36+
37+
public String getEndpoint() {
38+
return endpoint;
39+
}
40+
41+
public int getCapability() {
42+
return capability;
43+
}
44+
45+
public boolean isPrimary() {
46+
return (capability & ClusterCapability.WRITABLE) != 0;
47+
}
48+
49+
@Override
50+
public String toString() {
51+
return "ClusterInfo{" +
52+
"clusterId='" + clusterId + '\'' +
53+
", endpoint='" + endpoint + '\'' +
54+
", capability=" + capability +
55+
'}';
56+
}
57+
}
Lines changed: 166 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,166 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package io.milvus.v2.client.globalcluster;
21+
22+
import com.google.gson.JsonArray;
23+
import com.google.gson.JsonElement;
24+
import com.google.gson.JsonObject;
25+
import com.google.gson.JsonParser;
26+
import org.slf4j.Logger;
27+
import org.slf4j.LoggerFactory;
28+
29+
import java.io.BufferedReader;
30+
import java.io.IOException;
31+
import java.io.InputStreamReader;
32+
import java.net.HttpURLConnection;
33+
import java.net.URL;
34+
import java.nio.charset.StandardCharsets;
35+
import java.util.ArrayList;
36+
import java.util.List;
37+
import java.util.concurrent.ThreadLocalRandom;
38+
39+
public class GlobalClusterUtils {
40+
private static final Logger logger = LoggerFactory.getLogger(GlobalClusterUtils.class);
41+
42+
private static final String GLOBAL_CLUSTER_MARKER = "global-cluster";
43+
private static final String TOPOLOGY_PATH = "/global-cluster/topology";
44+
private static final int MAX_RETRIES = 3;
45+
private static final long BASE_BACKOFF_MS = 1000;
46+
private static final long MAX_BACKOFF_MS = 10000;
47+
private static final int REQUEST_TIMEOUT_MS = 10000;
48+
49+
private GlobalClusterUtils() {
50+
}
51+
52+
public static boolean isGlobalEndpoint(String uri) {
53+
if (uri == null) {
54+
return false;
55+
}
56+
return uri.toLowerCase().contains(GLOBAL_CLUSTER_MARKER);
57+
}
58+
59+
public static GlobalTopology fetchTopology(String globalEndpoint, String token) {
60+
String topologyUrl = buildTopologyUrl(globalEndpoint);
61+
62+
Exception lastException = null;
63+
for (int attempt = 1; attempt <= MAX_RETRIES; attempt++) {
64+
try {
65+
String responseBody = doHttpGet(topologyUrl, token);
66+
return parseTopologyResponse(responseBody);
67+
} catch (Exception e) {
68+
lastException = e;
69+
logger.warn("Failed to fetch global topology (attempt {}/{}): {}", attempt, MAX_RETRIES, e.getMessage());
70+
if (attempt < MAX_RETRIES) {
71+
long backoff = calculateBackoff(attempt);
72+
try {
73+
Thread.sleep(backoff);
74+
} catch (InterruptedException ie) {
75+
Thread.currentThread().interrupt();
76+
throw new RuntimeException("Interrupted while fetching global topology", ie);
77+
}
78+
}
79+
}
80+
}
81+
throw new RuntimeException("Failed to fetch global topology after " + MAX_RETRIES + " attempts", lastException);
82+
}
83+
84+
static String buildTopologyUrl(String globalEndpoint) {
85+
// Normalize the URI: ensure HTTPS scheme, trim whitespace, remove trailing slash.
86+
// The host and port are preserved as-is — the topology REST API is expected to be
87+
// served on the same host:port as the global endpoint.
88+
// Example: "https://xxx.global-cluster.yyy.com:443"
89+
// -> "https://xxx.global-cluster.yyy.com:443/global-cluster/topology"
90+
String base = globalEndpoint.trim();
91+
if (!base.startsWith("http://") && !base.startsWith("https://")) {
92+
base = "https://" + base;
93+
}
94+
if (base.endsWith("/")) {
95+
base = base.substring(0, base.length() - 1);
96+
}
97+
// Upgrade http to https — the topology API requires TLS
98+
if (base.startsWith("http://")) {
99+
base = "https://" + base.substring(7);
100+
}
101+
return base + TOPOLOGY_PATH;
102+
}
103+
104+
static String doHttpGet(String urlStr, String token) throws IOException {
105+
URL url = new URL(urlStr);
106+
HttpURLConnection conn = (HttpURLConnection) url.openConnection();
107+
try {
108+
conn.setRequestMethod("GET");
109+
conn.setConnectTimeout(REQUEST_TIMEOUT_MS);
110+
conn.setReadTimeout(REQUEST_TIMEOUT_MS);
111+
conn.setRequestProperty("Accept", "application/json");
112+
if (token != null && !token.isEmpty()) {
113+
conn.setRequestProperty("Authorization", "Bearer " + token);
114+
}
115+
116+
int responseCode = conn.getResponseCode();
117+
if (responseCode != HttpURLConnection.HTTP_OK) {
118+
throw new IOException("HTTP request failed with status code: " + responseCode);
119+
}
120+
121+
StringBuilder response = new StringBuilder();
122+
try (BufferedReader reader = new BufferedReader(
123+
new InputStreamReader(conn.getInputStream(), StandardCharsets.UTF_8))) {
124+
String line;
125+
while ((line = reader.readLine()) != null) {
126+
response.append(line);
127+
}
128+
}
129+
return response.toString();
130+
} finally {
131+
conn.disconnect();
132+
}
133+
}
134+
135+
static GlobalTopology parseTopologyResponse(String responseBody) {
136+
JsonObject root = JsonParser.parseString(responseBody).getAsJsonObject();
137+
int code = root.get("code").getAsInt();
138+
if (code != 0) {
139+
String message = root.has("message") ? root.get("message").getAsString() : "unknown error";
140+
throw new RuntimeException("Global topology API returned error code " + code + ": " + message);
141+
}
142+
143+
JsonObject data = root.getAsJsonObject("data");
144+
long version = data.get("version").getAsLong();
145+
JsonArray clustersArray = data.getAsJsonArray("clusters");
146+
147+
List<ClusterInfo> clusters = new ArrayList<>();
148+
for (JsonElement elem : clustersArray) {
149+
JsonObject clusterObj = elem.getAsJsonObject();
150+
String clusterId = clusterObj.get("clusterId").getAsString();
151+
String endpoint = clusterObj.get("endpoint").getAsString();
152+
int capability = clusterObj.get("capability").getAsInt();
153+
clusters.add(new ClusterInfo(clusterId, endpoint, capability));
154+
}
155+
156+
return new GlobalTopology(version, clusters);
157+
}
158+
159+
private static long calculateBackoff(int attempt) {
160+
long backoff = BASE_BACKOFF_MS * (1L << (attempt - 1)); // exponential: 1s, 2s, 4s...
161+
backoff = Math.min(backoff, MAX_BACKOFF_MS);
162+
// Add 10% jitter
163+
long jitter = (long) (backoff * 0.1 * ThreadLocalRandom.current().nextDouble());
164+
return backoff + jitter;
165+
}
166+
}

0 commit comments

Comments
 (0)