Skip to content

Commit afea667

Browse files
authored
Refine the schema cache machinery and add test cases (#1456)
Signed-off-by: yhmo <yihua.mo@zilliz.com>
1 parent 6ac2927 commit afea667

15 files changed

Lines changed: 427 additions & 165 deletions

File tree

sdk-core/src/main/java/io/milvus/client/AbstractMilvusGrpcClient.java

Lines changed: 53 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -47,9 +47,6 @@
4747
import io.milvus.param.resourcegroup.*;
4848
import io.milvus.param.role.*;
4949
import io.milvus.response.*;
50-
import io.milvus.v2.service.collection.response.DescribeCollectionResp;
51-
import io.milvus.v2.service.vector.request.InsertReq;
52-
import io.milvus.v2.utils.DataUtils;
5350
import lombok.NonNull;
5451
import org.apache.commons.collections4.CollectionUtils;
5552
import org.apache.commons.lang3.StringUtils;
@@ -68,22 +65,31 @@ public abstract class AbstractMilvusGrpcClient implements MilvusClient {
6865
protected static final Logger logger = LoggerFactory.getLogger(AbstractMilvusGrpcClient.class);
6966
protected LogLevel logLevel = LogLevel.Info;
7067

71-
private ConcurrentHashMap<String, DescribeCollectionResponse> cacheCollectionInfo = new ConcurrentHashMap<>();
68+
protected ConcurrentHashMap<String, DescribeCollectionResponse> cacheCollectionInfo = new ConcurrentHashMap<>();
7269

7370
protected abstract MilvusServiceGrpc.MilvusServiceBlockingStub blockingStub();
7471

7572
protected abstract MilvusServiceGrpc.MilvusServiceFutureStub futureStub();
7673

7774
protected abstract boolean clientIsReady();
7875

76+
protected abstract String currentDbName();
77+
78+
private String actualDbName(String overwriteName) {
79+
if (StringUtils.isNotEmpty(overwriteName)) {
80+
return overwriteName;
81+
}
82+
return currentDbName();
83+
}
84+
7985
/**
8086
* This method is for insert/upsert requests to reduce the rpc call of describeCollection()
8187
* Always try to get the collection info from cache.
8288
* If the cache doesn't have the collection info, call describeCollection() and cache it.
8389
* If insert/upsert get server error, remove the cached collection info.
8490
*/
8591
private DescribeCollectionResponse getCollectionInfo(String databaseName, String collectionName, boolean forceUpdate) {
86-
String key = combineCacheKey(databaseName, collectionName);
92+
String key = GTsDict.CombineCollectionName(actualDbName(databaseName), collectionName);
8793
DescribeCollectionResponse info = cacheCollectionInfo.get(key);
8894
if (info == null || forceUpdate) {
8995
String msg = String.format("Fail to describe collection '%s'", collectionName);
@@ -104,17 +110,6 @@ private DescribeCollectionResponse getCollectionInfo(String databaseName, String
104110
return info;
105111
}
106112

107-
private String combineCacheKey(String databaseName, String collectionName) {
108-
if (collectionName == null || StringUtils.isBlank(collectionName)) {
109-
throw new ParamException("Collection name is empty, not able to get collection info.");
110-
}
111-
String key = collectionName;
112-
if (StringUtils.isNotEmpty(databaseName)) {
113-
key = String.format("%s|%s", databaseName, collectionName);
114-
}
115-
return key;
116-
}
117-
118113
/**
119114
* insert/upsert return an error, but is not a RateLimit error,
120115
* clean the cache so that the next insert will call describeCollection() to get the latest info.
@@ -127,7 +122,8 @@ private void cleanCacheIfFailed(Status status, String databaseName, String colle
127122
}
128123

129124
private void removeCollectionCache(String databaseName, String collectionName) {
130-
cacheCollectionInfo.remove(combineCacheKey(databaseName, collectionName));
125+
String key = GTsDict.CombineCollectionName(actualDbName(databaseName), collectionName);
126+
cacheCollectionInfo.remove(key);
131127
}
132128

133129
private void waitForLoadingCollection(String databaseName, String collectionName, List<String> partitionNames,
@@ -658,7 +654,13 @@ public R<RpcStatus> dropCollection(@NonNull DropCollectionParam requestParam) {
658654

659655
Status response = blockingStub().dropCollection(dropCollectionRequest);
660656
handleResponse(title, response);
657+
658+
// remove the collection schema cache
661659
removeCollectionCache(dbName, collectionName);
660+
661+
// remove the last write timestamp for this collection
662+
String key = GTsDict.CombineCollectionName(actualDbName(dbName), collectionName);
663+
GTsDict.getInstance().removeCollectionTs(key);
662664
return R.success(new RpcStatus(RpcStatus.SUCCESS_MSG));
663665
} catch (StatusRuntimeException e) {
664666
logError("{} RPC failed! Exception:{}", title, e);
@@ -1570,22 +1572,27 @@ public R<MutationResult> delete(@NonNull DeleteParam requestParam) {
15701572
}
15711573

15721574
logDebug(requestParam.toString());
1573-
String title = String.format("DeleteRequest collectionName:%s", requestParam.getCollectionName());
1575+
String dbName = requestParam.getDatabaseName();
1576+
String collectionName = requestParam.getCollectionName();
1577+
String title = String.format("DeleteRequest collectionName:%s", collectionName);
15741578

15751579
try {
15761580
DeleteRequest.Builder builder = DeleteRequest.newBuilder()
15771581
.setBase(MsgBase.newBuilder().setMsgType(MsgType.Delete).build())
1578-
.setCollectionName(requestParam.getCollectionName())
1582+
.setCollectionName(collectionName)
15791583
.setPartitionName(requestParam.getPartitionName())
15801584
.setExpr(requestParam.getExpr());
15811585

1582-
if (StringUtils.isNotEmpty(requestParam.getDatabaseName())) {
1583-
builder.setDbName(requestParam.getDatabaseName());
1586+
if (StringUtils.isNotEmpty(dbName)) {
1587+
builder.setDbName(dbName);
15841588
}
15851589

15861590
MutationResult response = blockingStub().delete(builder.build());
15871591
handleResponse(title, response.getStatus());
1588-
GTsDict.getInstance().updateCollectionTs(requestParam.getCollectionName(), response.getTimestamp());
1592+
1593+
// update the last write timestamp for SESSION consistency
1594+
String key = GTsDict.CombineCollectionName(actualDbName(dbName), collectionName);
1595+
GTsDict.getInstance().updateCollectionTs(key, response.getTimestamp());
15891596
return R.success(response);
15901597
} catch (StatusRuntimeException e) {
15911598
logError("{} RPC failed! Exception:{}", title, e);
@@ -1639,10 +1646,14 @@ public R<MutationResult> insert(@NonNull InsertParam requestParam) {
16391646
return this.insert(requestParam);
16401647
}
16411648

1642-
// if illegal data, server fails to process insert, else succeed
1649+
// if illegal data, server fails to process insert, , clean the schema cache
1650+
// so that the next call of dml can update the cache
16431651
cleanCacheIfFailed(response.getStatus(), dbName, collectionName);
16441652
handleResponse(title, response.getStatus());
1645-
GTsDict.getInstance().updateCollectionTs(collectionName, response.getTimestamp());
1653+
1654+
// update the last write timestamp for SESSION consistency
1655+
String key = GTsDict.CombineCollectionName(actualDbName(dbName), collectionName);
1656+
GTsDict.getInstance().updateCollectionTs(key, response.getTimestamp());
16461657
return R.success(response);
16471658
} catch (StatusRuntimeException e) {
16481659
logError("{} RPC failed! Exception:{}", title, e);
@@ -1687,11 +1698,15 @@ public ListenableFuture<R<MutationResult>> insertAsync(InsertParam requestParam)
16871698
new FutureCallback<MutationResult>() {
16881699
@Override
16891700
public void onSuccess(MutationResult result) {
1690-
// if illegal data, server fails to process insert, else succeed
1701+
// if illegal data, server fails to process insert, clean the schema cache
1702+
// so that the next call of dml can update the cache
16911703
cleanCacheIfFailed(result.getStatus(), dbName, collectionName);
16921704
if (result.getStatus().getErrorCode() == ErrorCode.Success) {
16931705
logDebug("{} successfully!", title);
1694-
GTsDict.getInstance().updateCollectionTs(collectionName, result.getTimestamp());
1706+
1707+
// update the last write timestamp for SESSION consistency
1708+
String key = GTsDict.CombineCollectionName(actualDbName(dbName), collectionName);
1709+
GTsDict.getInstance().updateCollectionTs(key, result.getTimestamp());
16951710
} else {
16961711
logError("{} failed:\n{}", title, result.getStatus().getReason());
16971712
}
@@ -1760,10 +1775,14 @@ public R<MutationResult> upsert(UpsertParam requestParam) {
17601775
return this.upsert(requestParam);
17611776
}
17621777

1763-
// if illegal data, server fails to process upsert, else succeed
1778+
// if illegal data, server fails to process upsert, clean the schema cache
1779+
// so that the next call of dml can update the cache
17641780
cleanCacheIfFailed(response.getStatus(), dbName, collectionName);
17651781
handleResponse(title, response.getStatus());
1766-
GTsDict.getInstance().updateCollectionTs(collectionName, response.getTimestamp());
1782+
1783+
// update the last write timestamp for SESSION consistency
1784+
String key = GTsDict.CombineCollectionName(actualDbName(dbName), collectionName);
1785+
GTsDict.getInstance().updateCollectionTs(key, response.getTimestamp());
17671786
return R.success(response);
17681787
} catch (StatusRuntimeException e) {
17691788
logError("{} RPC failed! Exception:{}", title, e);
@@ -1807,11 +1826,15 @@ public ListenableFuture<R<MutationResult>> upsertAsync(UpsertParam requestParam)
18071826
new FutureCallback<MutationResult>() {
18081827
@Override
18091828
public void onSuccess(MutationResult result) {
1810-
// if illegal data, server fails to process upsert, else succeed
1829+
// if illegal data, server fails to process upsert, clean the schema cache
1830+
// so that the next call of dml can update the cache
18111831
cleanCacheIfFailed(result.getStatus(), dbName, collectionName);
18121832
if (result.getStatus().getErrorCode() == ErrorCode.Success) {
18131833
logDebug("{} successfully!", title);
1814-
GTsDict.getInstance().updateCollectionTs(collectionName, result.getTimestamp());
1834+
1835+
// update the last write timestamp for SESSION consistency
1836+
String key = GTsDict.CombineCollectionName(actualDbName(dbName), collectionName);
1837+
GTsDict.getInstance().updateCollectionTs(key, result.getTimestamp());
18151838
} else {
18161839
logError("{} failed:\n{}", title, result.getStatus().getReason());
18171840
}

sdk-core/src/main/java/io/milvus/client/MilvusServiceClient.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,13 +73,15 @@ public class MilvusServiceClient extends AbstractMilvusGrpcClient {
7373
private final long rpcDeadlineMs;
7474
private long timeoutMs = 0;
7575
private RetryParam retryParam = RetryParam.newBuilder().build();
76+
private String currentDatabaseName;
7677

7778
public MilvusServiceClient(@NonNull ConnectParam connectParam) {
7879
this.rpcDeadlineMs = connectParam.getRpcDeadlineMs();
7980

8081
Metadata metadata = new Metadata();
8182
metadata.put(Metadata.Key.of("authorization", Metadata.ASCII_STRING_MARSHALLER), connectParam.getAuthorization());
8283
if (StringUtils.isNotEmpty(connectParam.getDatabaseName())) {
84+
currentDatabaseName = connectParam.getDatabaseName();
8385
metadata.put(Metadata.Key.of("dbname", Metadata.ASCII_STRING_MARSHALLER), connectParam.getDatabaseName());
8486
}
8587

@@ -201,6 +203,7 @@ protected MilvusServiceClient(MilvusServiceClient src) {
201203
this.timeoutMs = src.timeoutMs;
202204
this.logLevel = src.logLevel;
203205
this.retryParam = src.retryParam;
206+
this.currentDatabaseName = src.currentDatabaseName;
204207
}
205208

206209
@Override
@@ -222,6 +225,11 @@ public boolean clientIsReady() {
222225
return channel != null && !channel.isShutdown() && !channel.isTerminated();
223226
}
224227

228+
@Override
229+
protected String currentDbName() {
230+
return currentDatabaseName;
231+
}
232+
225233
@Override
226234
public void close(long maxWaitSeconds) throws InterruptedException {
227235
channel.shutdownNow();

sdk-core/src/main/java/io/milvus/common/utils/GTsDict.java

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,9 @@
1919

2020
package io.milvus.common.utils;
2121

22+
import io.milvus.exception.ParamException;
23+
import org.apache.commons.lang3.StringUtils;
24+
2225
import java.util.concurrent.ConcurrentHashMap;
2326
import java.util.concurrent.ConcurrentMap;
2427

@@ -37,6 +40,16 @@ public static GTsDict getInstance() {
3740
return TS_DICT;
3841
}
3942

43+
public static String CombineCollectionName(String databaseName, String collectionName) {
44+
if (collectionName == null || StringUtils.isBlank(collectionName)) {
45+
throw new ParamException("Collection name is empty, not able to get collection info.");
46+
}
47+
if (StringUtils.isEmpty(databaseName)) {
48+
databaseName = "default";
49+
}
50+
return String.format("%s_%s", databaseName, collectionName);
51+
}
52+
4053
private ConcurrentMap<String, Long> tsDict = new ConcurrentHashMap<>();
4154

4255
public void updateCollectionTs(String collectionName, long ts) {
@@ -49,4 +62,12 @@ public void updateCollectionTs(String collectionName, long ts) {
4962
public Long getCollectionTs(String collectionName) {
5063
return tsDict.get(collectionName);
5164
}
65+
66+
public void removeCollectionTs(String collectionName) {
67+
tsDict.remove(collectionName);
68+
}
69+
70+
public void cleanAllCollectionTs() {
71+
tsDict.clear();
72+
}
5273
}

sdk-core/src/main/java/io/milvus/v2/client/ConnectConfig.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,16 +23,14 @@
2323
import lombok.Builder;
2424
import lombok.Data;
2525
import lombok.NonNull;
26-
import lombok.experimental.SuperBuilder;
2726
import org.apache.commons.lang3.StringUtils;
2827

2928
import javax.net.ssl.SSLContext;
30-
import java.net.URI;
3129
import java.util.concurrent.TimeUnit;
3230
import java.util.regex.Pattern;
3331

3432
@Data
35-
@SuperBuilder
33+
@Builder
3634
public class ConnectConfig {
3735
@NonNull
3836
private String uri;

sdk-core/src/main/java/io/milvus/v2/client/MilvusClientV2.java

Lines changed: 33 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -83,8 +83,24 @@ public class MilvusClientV2 {
8383
public MilvusClientV2(ConnectConfig connectConfig) {
8484
if (connectConfig != null) {
8585
connect(connectConfig);
86+
87+
initServices(connectConfig.getDbName());
88+
8689
}
8790
}
91+
92+
private void initServices(String dbName) {
93+
this.databaseService.setCurrentDbName(dbName);
94+
this.collectionService.setCurrentDbName(dbName);
95+
this.indexService.setCurrentDbName(dbName);
96+
this.vectorService.setCurrentDbName(dbName);
97+
this.vectorService.cleanCollectionCache();
98+
this.partitionService.setCurrentDbName(dbName);
99+
this.rbacService.setCurrentDbName(dbName);
100+
this.rgroupService.setCurrentDbName(dbName);
101+
this.utilityService.setCurrentDbName(dbName);
102+
}
103+
88104
/**
89105
* connect to Milvus server
90106
*
@@ -159,6 +175,22 @@ public void retryConfig(RetryConfig retryConfig) {
159175
rpcUtils.retryConfig(retryConfig);
160176
}
161177

178+
public MilvusClientV2 withRetry(RetryConfig retryConfig) {
179+
rpcUtils.retryConfig(retryConfig);
180+
return this;
181+
}
182+
183+
public MilvusClientV2 withTimeout(long timeout, TimeUnit timeoutUnit) {
184+
// the unit of rpcDeadlineMs is millisecond
185+
// if the input timeout value is zero, rpcDeadlineMs is zero
186+
// if the input timeout value is not zero and less than 1ms, it will be treated as 1ms
187+
// if the input timeout value is larger than 1ms, it will be converted to an integer ms value
188+
long nn = timeoutUnit.toNanos(timeout);
189+
long ms = (nn == 0) ? 0 : (nn < 1000000 ? 1 : nn/1000000);
190+
connectConfig.setRpcDeadlineMs(ms);
191+
return this;
192+
}
193+
162194
/////////////////////////////////////////////////////////////////////////////////////////////
163195
// Database Operations
164196
/////////////////////////////////////////////////////////////////////////////////////////////
@@ -170,10 +202,10 @@ public void useDatabase(@NonNull String dbName) throws InterruptedException {
170202
// check if database exists
171203
clientUtils.checkDatabaseExist(this.getRpcStub(), dbName);
172204
try {
173-
this.vectorService.cleanCollectionCache();
174205
this.connectConfig.setDbName(dbName);
175206
this.close(3);
176207
this.connect(this.connectConfig);
208+
this.initServices(dbName);
177209
} catch (InterruptedException e){
178210
logger.error("close connect error");
179211
throw new RuntimeException(e);

sdk-core/src/main/java/io/milvus/v2/service/BaseService.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import io.milvus.v2.utils.DataUtils;
2929
import io.milvus.v2.utils.RpcUtils;
3030
import io.milvus.v2.utils.VectorUtils;
31+
import org.apache.commons.lang3.StringUtils;
3132
import org.slf4j.Logger;
3233
import org.slf4j.LoggerFactory;
3334

@@ -37,6 +38,18 @@ public class BaseService {
3738
public DataUtils dataUtils = new DataUtils();
3839
public VectorUtils vectorUtils = new VectorUtils();
3940
public ConvertUtils convertUtils = new ConvertUtils();
41+
private String currentDbName;
42+
43+
public void setCurrentDbName(String dbName) {
44+
currentDbName = dbName;
45+
}
46+
47+
protected String actualDbName(String overwriteName) {
48+
if (StringUtils.isNotEmpty(overwriteName)) {
49+
return overwriteName;
50+
}
51+
return currentDbName;
52+
}
4053

4154
protected void checkCollectionExist(MilvusServiceGrpc.MilvusServiceBlockingStub blockingStub, String collectionName) {
4255
HasCollectionRequest request = HasCollectionRequest.newBuilder().setCollectionName(collectionName).build();

0 commit comments

Comments
 (0)