Skip to content

Commit 2ccbe51

Browse files
committed
Refine the schema cache machinery and add test cases
Signed-off-by: yhmo <yihua.mo@zilliz.com>
1 parent 45f5477 commit 2ccbe51

6 files changed

Lines changed: 203 additions & 95 deletions

File tree

sdk-core/src/main/java/io/milvus/client/AbstractMilvusGrpcClient.java

Lines changed: 29 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -68,22 +68,31 @@ public abstract class AbstractMilvusGrpcClient implements MilvusClient {
6868
protected static final Logger logger = LoggerFactory.getLogger(AbstractMilvusGrpcClient.class);
6969
protected LogLevel logLevel = LogLevel.Info;
7070

71-
private ConcurrentHashMap<String, DescribeCollectionResponse> cacheCollectionInfo = new ConcurrentHashMap<>();
71+
protected ConcurrentHashMap<String, DescribeCollectionResponse> cacheCollectionInfo = new ConcurrentHashMap<>();
7272

7373
protected abstract MilvusServiceGrpc.MilvusServiceBlockingStub blockingStub();
7474

7575
protected abstract MilvusServiceGrpc.MilvusServiceFutureStub futureStub();
7676

7777
protected abstract boolean clientIsReady();
7878

79+
protected abstract String currentDbName();
80+
81+
private String actualDbName(String overwriteName) {
82+
if (StringUtils.isNotEmpty(overwriteName)) {
83+
return overwriteName;
84+
}
85+
return currentDbName();
86+
}
87+
7988
/**
8089
* This method is for insert/upsert requests to reduce the rpc call of describeCollection()
8190
* Always try to get the collection info from cache.
8291
* If the cache doesn't have the collection info, call describeCollection() and cache it.
8392
* If insert/upsert get server error, remove the cached collection info.
8493
*/
8594
private DescribeCollectionResponse getCollectionInfo(String databaseName, String collectionName, boolean forceUpdate) {
86-
String key = combineCacheKey(databaseName, collectionName);
95+
String key = GTsDict.CombineCollectionName(actualDbName(databaseName), collectionName);
8796
DescribeCollectionResponse info = cacheCollectionInfo.get(key);
8897
if (info == null || forceUpdate) {
8998
String msg = String.format("Fail to describe collection '%s'", collectionName);
@@ -104,17 +113,6 @@ private DescribeCollectionResponse getCollectionInfo(String databaseName, String
104113
return info;
105114
}
106115

107-
private String combineCacheKey(String databaseName, String collectionName) {
108-
if (collectionName == null || StringUtils.isBlank(collectionName)) {
109-
throw new ParamException("Collection name is empty, not able to get collection info.");
110-
}
111-
String key = collectionName;
112-
if (StringUtils.isNotEmpty(databaseName)) {
113-
key = String.format("%s|%s", databaseName, collectionName);
114-
}
115-
return key;
116-
}
117-
118116
/**
119117
* insert/upsert return an error, but is not a RateLimit error,
120118
* clean the cache so that the next insert will call describeCollection() to get the latest info.
@@ -127,7 +125,8 @@ private void cleanCacheIfFailed(Status status, String databaseName, String colle
127125
}
128126

129127
private void removeCollectionCache(String databaseName, String collectionName) {
130-
cacheCollectionInfo.remove(combineCacheKey(databaseName, collectionName));
128+
String key = GTsDict.CombineCollectionName(actualDbName(databaseName), collectionName);
129+
cacheCollectionInfo.remove(key);
131130
}
132131

133132
private void waitForLoadingCollection(String databaseName, String collectionName, List<String> partitionNames,
@@ -1570,22 +1569,25 @@ public R<MutationResult> delete(@NonNull DeleteParam requestParam) {
15701569
}
15711570

15721571
logDebug(requestParam.toString());
1573-
String title = String.format("DeleteRequest collectionName:%s", requestParam.getCollectionName());
1572+
String dbName = requestParam.getDatabaseName();
1573+
String collectionName = requestParam.getCollectionName();
1574+
String title = String.format("DeleteRequest collectionName:%s", collectionName);
15741575

15751576
try {
15761577
DeleteRequest.Builder builder = DeleteRequest.newBuilder()
15771578
.setBase(MsgBase.newBuilder().setMsgType(MsgType.Delete).build())
1578-
.setCollectionName(requestParam.getCollectionName())
1579+
.setCollectionName(collectionName)
15791580
.setPartitionName(requestParam.getPartitionName())
15801581
.setExpr(requestParam.getExpr());
15811582

1582-
if (StringUtils.isNotEmpty(requestParam.getDatabaseName())) {
1583-
builder.setDbName(requestParam.getDatabaseName());
1583+
if (StringUtils.isNotEmpty(dbName)) {
1584+
builder.setDbName(dbName);
15841585
}
15851586

15861587
MutationResult response = blockingStub().delete(builder.build());
15871588
handleResponse(title, response.getStatus());
1588-
GTsDict.getInstance().updateCollectionTs(requestParam.getCollectionName(), response.getTimestamp());
1589+
String key = GTsDict.CombineCollectionName(actualDbName(dbName), collectionName);
1590+
GTsDict.getInstance().updateCollectionTs(key, response.getTimestamp());
15891591
return R.success(response);
15901592
} catch (StatusRuntimeException e) {
15911593
logError("{} RPC failed! Exception:{}", title, e);
@@ -1642,7 +1644,8 @@ public R<MutationResult> insert(@NonNull InsertParam requestParam) {
16421644
// if illegal data, server fails to process insert, else succeed
16431645
cleanCacheIfFailed(response.getStatus(), dbName, collectionName);
16441646
handleResponse(title, response.getStatus());
1645-
GTsDict.getInstance().updateCollectionTs(collectionName, response.getTimestamp());
1647+
String key = GTsDict.CombineCollectionName(actualDbName(dbName), collectionName);
1648+
GTsDict.getInstance().updateCollectionTs(key, response.getTimestamp());
16461649
return R.success(response);
16471650
} catch (StatusRuntimeException e) {
16481651
logError("{} RPC failed! Exception:{}", title, e);
@@ -1691,7 +1694,8 @@ public void onSuccess(MutationResult result) {
16911694
cleanCacheIfFailed(result.getStatus(), dbName, collectionName);
16921695
if (result.getStatus().getErrorCode() == ErrorCode.Success) {
16931696
logDebug("{} successfully!", title);
1694-
GTsDict.getInstance().updateCollectionTs(collectionName, result.getTimestamp());
1697+
String key = GTsDict.CombineCollectionName(actualDbName(dbName), collectionName);
1698+
GTsDict.getInstance().updateCollectionTs(key, result.getTimestamp());
16951699
} else {
16961700
logError("{} failed:\n{}", title, result.getStatus().getReason());
16971701
}
@@ -1763,7 +1767,8 @@ public R<MutationResult> upsert(UpsertParam requestParam) {
17631767
// if illegal data, server fails to process upsert, else succeed
17641768
cleanCacheIfFailed(response.getStatus(), dbName, collectionName);
17651769
handleResponse(title, response.getStatus());
1766-
GTsDict.getInstance().updateCollectionTs(collectionName, response.getTimestamp());
1770+
String key = GTsDict.CombineCollectionName(actualDbName(dbName), collectionName);
1771+
GTsDict.getInstance().updateCollectionTs(key, response.getTimestamp());
17671772
return R.success(response);
17681773
} catch (StatusRuntimeException e) {
17691774
logError("{} RPC failed! Exception:{}", title, e);
@@ -1811,7 +1816,8 @@ public void onSuccess(MutationResult result) {
18111816
cleanCacheIfFailed(result.getStatus(), dbName, collectionName);
18121817
if (result.getStatus().getErrorCode() == ErrorCode.Success) {
18131818
logDebug("{} successfully!", title);
1814-
GTsDict.getInstance().updateCollectionTs(collectionName, result.getTimestamp());
1819+
String key = GTsDict.CombineCollectionName(actualDbName(dbName), collectionName);
1820+
GTsDict.getInstance().updateCollectionTs(key, result.getTimestamp());
18151821
} else {
18161822
logError("{} failed:\n{}", title, result.getStatus().getReason());
18171823
}

sdk-core/src/main/java/io/milvus/client/MilvusServiceClient.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,13 +73,15 @@ public class MilvusServiceClient extends AbstractMilvusGrpcClient {
7373
private final long rpcDeadlineMs;
7474
private long timeoutMs = 0;
7575
private RetryParam retryParam = RetryParam.newBuilder().build();
76+
private String currentDatabaseName;
7677

7778
public MilvusServiceClient(@NonNull ConnectParam connectParam) {
7879
this.rpcDeadlineMs = connectParam.getRpcDeadlineMs();
7980

8081
Metadata metadata = new Metadata();
8182
metadata.put(Metadata.Key.of("authorization", Metadata.ASCII_STRING_MARSHALLER), connectParam.getAuthorization());
8283
if (StringUtils.isNotEmpty(connectParam.getDatabaseName())) {
84+
currentDatabaseName = connectParam.getDatabaseName();
8385
metadata.put(Metadata.Key.of("dbname", Metadata.ASCII_STRING_MARSHALLER), connectParam.getDatabaseName());
8486
}
8587

@@ -201,6 +203,7 @@ protected MilvusServiceClient(MilvusServiceClient src) {
201203
this.timeoutMs = src.timeoutMs;
202204
this.logLevel = src.logLevel;
203205
this.retryParam = src.retryParam;
206+
this.currentDatabaseName = src.currentDatabaseName;
204207
}
205208

206209
@Override
@@ -222,6 +225,11 @@ public boolean clientIsReady() {
222225
return channel != null && !channel.isShutdown() && !channel.isTerminated();
223226
}
224227

228+
@Override
229+
protected String currentDbName() {
230+
return currentDatabaseName;
231+
}
232+
225233
@Override
226234
public void close(long maxWaitSeconds) throws InterruptedException {
227235
channel.shutdownNow();

sdk-core/src/main/java/io/milvus/common/utils/GTsDict.java

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,9 @@
1919

2020
package io.milvus.common.utils;
2121

22+
import io.milvus.exception.ParamException;
23+
import org.apache.commons.lang3.StringUtils;
24+
2225
import java.util.concurrent.ConcurrentHashMap;
2326
import java.util.concurrent.ConcurrentMap;
2427

@@ -37,6 +40,16 @@ public static GTsDict getInstance() {
3740
return TS_DICT;
3841
}
3942

43+
public static String CombineCollectionName(String databaseName, String collectionName) {
44+
if (collectionName == null || StringUtils.isBlank(collectionName)) {
45+
throw new ParamException("Collection name is empty, not able to get collection info.");
46+
}
47+
if (StringUtils.isEmpty(databaseName)) {
48+
databaseName = "default";
49+
}
50+
return String.format("%s_%s", databaseName, collectionName);
51+
}
52+
4053
private ConcurrentMap<String, Long> tsDict = new ConcurrentHashMap<>();
4154

4255
public void updateCollectionTs(String collectionName, long ts) {
@@ -49,4 +62,12 @@ public void updateCollectionTs(String collectionName, long ts) {
4962
public Long getCollectionTs(String collectionName) {
5063
return tsDict.get(collectionName);
5164
}
65+
66+
public void removeCollectionTs(String collectionName) {
67+
tsDict.remove(collectionName);
68+
}
69+
70+
public void cleanAllCollectionTs() {
71+
tsDict.clear();
72+
}
5273
}

sdk-core/src/main/java/io/milvus/v2/client/MilvusClientV2.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,7 @@ public class MilvusClientV2 {
8383
public MilvusClientV2(ConnectConfig connectConfig) {
8484
if (connectConfig != null) {
8585
connect(connectConfig);
86+
this.vectorService.cleanCollectionCache(connectConfig.getDbName());
8687
}
8788
}
8889
/**
@@ -170,7 +171,7 @@ public void useDatabase(@NonNull String dbName) throws InterruptedException {
170171
// check if database exists
171172
clientUtils.checkDatabaseExist(this.getRpcStub(), dbName);
172173
try {
173-
this.vectorService.cleanCollectionCache();
174+
this.vectorService.cleanCollectionCache(dbName);
174175
this.connectConfig.setDbName(dbName);
175176
this.close(3);
176177
this.connect(this.connectConfig);

sdk-core/src/main/java/io/milvus/v2/service/vector/VectorService.java

Lines changed: 24 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,15 @@ public class VectorService extends BaseService {
5050
public CollectionService collectionService = new CollectionService();
5151
private ConcurrentHashMap<String, DescribeCollectionResponse> cacheCollectionInfo = new ConcurrentHashMap<>();
5252

53+
private String currentDbName;
54+
55+
private String actualDbName(String overwriteName) {
56+
if (StringUtils.isNotEmpty(overwriteName)) {
57+
return overwriteName;
58+
}
59+
return currentDbName;
60+
}
61+
5362
private DescribeCollectionResponse describeCollection(MilvusServiceGrpc.MilvusServiceBlockingStub blockingStub,
5463
String databaseName, String collectionName) {
5564
String msg = String.format("Fail to describe collection '%s'", collectionName);
@@ -74,7 +83,7 @@ private DescribeCollectionResponse describeCollection(MilvusServiceGrpc.MilvusSe
7483
*/
7584
private DescribeCollectionResponse getCollectionInfo(MilvusServiceGrpc.MilvusServiceBlockingStub blockingStub,
7685
String databaseName, String collectionName, boolean forceUpdate) {
77-
String key = combineCacheKey(databaseName, collectionName);
86+
String key = GTsDict.CombineCollectionName(actualDbName(databaseName), collectionName);
7887
DescribeCollectionResponse info = cacheCollectionInfo.get(key);
7988
if (info == null || forceUpdate) {
8089
info = describeCollection(blockingStub, databaseName, collectionName);
@@ -84,21 +93,11 @@ private DescribeCollectionResponse getCollectionInfo(MilvusServiceGrpc.MilvusSer
8493
return info;
8594
}
8695

87-
public void cleanCollectionCache() {
96+
public void cleanCollectionCache(String dbName) {
97+
currentDbName = dbName;
8898
cacheCollectionInfo.clear();
8999
}
90100

91-
private String combineCacheKey(String databaseName, String collectionName) {
92-
if (collectionName == null || StringUtils.isBlank(collectionName)) {
93-
throw new ParamException("Collection name is empty, not able to get collection info.");
94-
}
95-
String key = collectionName;
96-
if (StringUtils.isNotEmpty(databaseName)) {
97-
key = String.format("%s|%s", databaseName, collectionName);
98-
}
99-
return key;
100-
}
101-
102101
/**
103102
* insert/upsert return an error, but is not a RateLimit error,
104103
* clean the cache so that the next insert will call describeCollection() to get the latest info.
@@ -112,7 +111,8 @@ private void cleanCacheIfFailed(Status status, String databaseName, String colle
112111
}
113112

114113
private void removeCollectionCache(String databaseName, String collectionName) {
115-
cacheCollectionInfo.remove(combineCacheKey(databaseName, collectionName));
114+
String key = GTsDict.CombineCollectionName(actualDbName(databaseName), collectionName);
115+
cacheCollectionInfo.remove(key);
116116
}
117117

118118
private InsertRequest buildInsertRequest(InsertReq request, DescribeCollectionResponse descResp) {
@@ -154,7 +154,8 @@ public InsertResp insert(MilvusServiceGrpc.MilvusServiceBlockingStub blockingStu
154154
// if illegal data, server fails to process insert, else succeed
155155
cleanCacheIfFailed(response.getStatus(), "", collectionName);
156156
rpcUtils.handleResponse(title, response.getStatus());
157-
GTsDict.getInstance().updateCollectionTs(collectionName, response.getTimestamp());
157+
String key = GTsDict.CombineCollectionName(actualDbName(""), collectionName);
158+
GTsDict.getInstance().updateCollectionTs(key, response.getTimestamp());
158159

159160
if (response.getIDs().hasIntId()) {
160161
List<Object> ids = new ArrayList<>(response.getIDs().getIntId().getDataList());
@@ -210,7 +211,8 @@ public UpsertResp upsert(MilvusServiceGrpc.MilvusServiceBlockingStub blockingStu
210211
// if illegal data, server fails to process upsert, else succeed
211212
cleanCacheIfFailed(response.getStatus(), "", collectionName);
212213
rpcUtils.handleResponse(title, response.getStatus());
213-
GTsDict.getInstance().updateCollectionTs(collectionName, response.getTimestamp());
214+
String key = GTsDict.CombineCollectionName(actualDbName(""), collectionName);
215+
GTsDict.getInstance().updateCollectionTs(key, response.getTimestamp());
214216
return UpsertResp.builder()
215217
.upsertCnt(response.getUpsertCnt())
216218
.build();
@@ -297,19 +299,20 @@ public SearchIteratorV2 searchIteratorV2(MilvusServiceGrpc.MilvusServiceBlocking
297299
}
298300

299301
public DeleteResp delete(MilvusServiceGrpc.MilvusServiceBlockingStub blockingStub, DeleteReq request) {
300-
String title = String.format("DeleteRequest collectionName:%s", request.getCollectionName());
302+
String collectionName = request.getCollectionName();
303+
String title = String.format("DeleteRequest collectionName:%s", collectionName);
301304

302305
if (request.getFilter() != null && request.getIds() != null) {
303306
throw new MilvusClientException(ErrorCode.INVALID_PARAMS, "filter and ids can't be set at the same time");
304307
}
305308

306309
if (request.getFilter() == null) {
307-
DescribeCollectionResponse descResp = getCollectionInfo(blockingStub, "", request.getCollectionName(), false);
310+
DescribeCollectionResponse descResp = getCollectionInfo(blockingStub, "", collectionName, false);
308311
DescribeCollectionResp respR = convertUtils.convertDescCollectionResp(descResp);
309312
request.setFilter(vectorUtils.getExprById(respR.getPrimaryFieldName(), request.getIds()));
310313
}
311314
DeleteRequest.Builder builder = DeleteRequest.newBuilder()
312-
.setCollectionName(request.getCollectionName())
315+
.setCollectionName(collectionName)
313316
.setPartitionName(request.getPartitionName())
314317
.setExpr(request.getFilter());
315318
if (request.getFilter() != null && !request.getFilter().isEmpty()) {
@@ -320,7 +323,8 @@ public DeleteResp delete(MilvusServiceGrpc.MilvusServiceBlockingStub blockingStu
320323
}
321324
MutationResult response = blockingStub.delete(builder.build());
322325
rpcUtils.handleResponse(title, response.getStatus());
323-
GTsDict.getInstance().updateCollectionTs(request.getCollectionName(), response.getTimestamp());
326+
String key = GTsDict.CombineCollectionName(actualDbName(""), collectionName);
327+
GTsDict.getInstance().updateCollectionTs(key, response.getTimestamp());
324328
return DeleteResp.builder()
325329
.deleteCnt(response.getDeleteCnt())
326330
.build();

0 commit comments

Comments
 (0)