Skip to content

Commit 91ba0b7

Browse files
committed
Fix a bug of collection schema cache for V1 and V2
Signed-off-by: yhmo <yihua.mo@zilliz.com>
1 parent a650fd4 commit 91ba0b7

6 files changed

Lines changed: 331 additions & 101 deletions

File tree

docker-compose.yml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ services:
3232

3333
standalone:
3434
container_name: milvus-javasdk-test-standalone
35-
image: milvusdb/milvus:master-20250610-9439eaef-amd64
35+
image: milvusdb/milvus:master-20250706-d0976450
3636
command: ["milvus", "run", "standalone"]
3737
environment:
3838
ETCD_ENDPOINTS: etcd:2379
@@ -77,7 +77,7 @@ services:
7777

7878
standaloneslave:
7979
container_name: milvus-javasdk-test-slave-standalone
80-
image: milvusdb/milvus:master-20250610-9439eaef-amd64
80+
image: milvusdb/milvus:master-20250706-d0976450
8181
command: ["milvus", "run", "standalone"]
8282
environment:
8383
ETCD_ENDPOINTS: etcdslave:2379

sdk-core/src/main/java/io/milvus/client/AbstractMilvusGrpcClient.java

Lines changed: 137 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,9 @@
4747
import io.milvus.param.resourcegroup.*;
4848
import io.milvus.param.role.*;
4949
import io.milvus.response.*;
50+
import io.milvus.v2.service.collection.response.DescribeCollectionResp;
51+
import io.milvus.v2.service.vector.request.InsertReq;
52+
import io.milvus.v2.utils.DataUtils;
5053
import lombok.NonNull;
5154
import org.apache.commons.collections4.CollectionUtils;
5255
import org.apache.commons.lang3.StringUtils;
@@ -79,10 +82,10 @@ public abstract class AbstractMilvusGrpcClient implements MilvusClient {
7982
* If the cache doesn't have the collection info, call describeCollection() and cache it.
8083
* If insert/upsert get server error, remove the cached collection info.
8184
*/
82-
private DescribeCollectionResponse getCollectionInfo(String databaseName, String collectionName) {
85+
private DescribeCollectionResponse getCollectionInfo(String databaseName, String collectionName, boolean forceUpdate) {
8386
String key = combineCacheKey(databaseName, collectionName);
8487
DescribeCollectionResponse info = cacheCollectionInfo.get(key);
85-
if (info == null) {
88+
if (info == null || forceUpdate) {
8689
String msg = String.format("Fail to describe collection '%s'", collectionName);
8790
DescribeCollectionRequest.Builder builder = DescribeCollectionRequest.newBuilder()
8891
.setCollectionName(collectionName);
@@ -119,10 +122,14 @@ private String combineCacheKey(String databaseName, String collectionName) {
119122
private void cleanCacheIfFailed(Status status, String databaseName, String collectionName) {
120123
if ((status.getCode() != 0 && status.getCode() != 8) ||
121124
(!status.getErrorCode().equals(ErrorCode.Success) && status.getErrorCode() != ErrorCode.RateLimit)) {
122-
cacheCollectionInfo.remove(combineCacheKey(databaseName, collectionName));
125+
removeCollectionCache(databaseName, collectionName);
123126
}
124127
}
125128

129+
private void removeCollectionCache(String databaseName, String collectionName) {
130+
cacheCollectionInfo.remove(combineCacheKey(databaseName, collectionName));
131+
}
132+
126133
private void waitForLoadingCollection(String databaseName, String collectionName, List<String> partitionNames,
127134
long waitingInterval, long timeout) throws IllegalResponseException {
128135
long tsBegin = System.currentTimeMillis();
@@ -637,19 +644,21 @@ public R<RpcStatus> dropCollection(@NonNull DropCollectionParam requestParam) {
637644
}
638645

639646
logDebug(requestParam.toString());
640-
String title = String.format("DropCollectionRequest collectionName:%s", requestParam.getCollectionName());
647+
String dbName = requestParam.getDatabaseName();
648+
String collectionName = requestParam.getCollectionName();
649+
String title = String.format("DropCollectionRequest collectionName:%s", collectionName);
641650

642651
try {
643652
DropCollectionRequest.Builder builder = DropCollectionRequest.newBuilder()
644-
.setCollectionName(requestParam.getCollectionName());
645-
if (StringUtils.isNotEmpty(requestParam.getDatabaseName())) {
646-
builder.setDbName(requestParam.getDatabaseName());
653+
.setCollectionName(collectionName);
654+
if (StringUtils.isNotEmpty(dbName)) {
655+
builder.setDbName(dbName);
647656
}
648657
DropCollectionRequest dropCollectionRequest = builder.build();
649658

650659
Status response = blockingStub().dropCollection(dropCollectionRequest);
651660
handleResponse(title, response);
652-
cacheCollectionInfo.remove(combineCacheKey(requestParam.getDatabaseName(), requestParam.getCollectionName()));
661+
removeCollectionCache(dbName, collectionName);
653662
return R.success(new RpcStatus(RpcStatus.SUCCESS_MSG));
654663
} catch (StatusRuntimeException e) {
655664
logError("{} RPC failed! Exception:{}", title, e);
@@ -1587,30 +1596,60 @@ public R<MutationResult> delete(@NonNull DeleteParam requestParam) {
15871596
}
15881597
}
15891598

1599+
private InsertRequest buildInsertRequest(InsertParam requestParam, DescribeCollectionResponse descResp) {
1600+
DescCollResponseWrapper wrapper = new DescCollResponseWrapper(descResp);
1601+
ParamUtils.InsertBuilderWrapper builderWraper = new ParamUtils.InsertBuilderWrapper(requestParam, wrapper);
1602+
InsertRequest rpcRequest = builderWraper.buildInsertRequest();
1603+
return rpcRequest.toBuilder().setSchemaTimestamp(descResp.getUpdateTimestamp()).build();
1604+
}
1605+
15901606
@Override
15911607
public R<MutationResult> insert(@NonNull InsertParam requestParam) {
15921608
if (!clientIsReady()) {
15931609
return R.failed(new ClientNotConnectedException("Client rpc channel is not ready"));
15941610
}
15951611

15961612
logDebug(requestParam.toString());
1597-
String title = String.format("InsertRequest collectionName:%s", requestParam.getCollectionName());
1613+
String dbName = requestParam.getDatabaseName();
1614+
String collectionName = requestParam.getCollectionName();
1615+
String title = String.format("InsertRequest collectionName:%s", collectionName);
15981616

15991617
try {
1600-
DescribeCollectionResponse descResp = getCollectionInfo(requestParam.getDatabaseName(),
1601-
requestParam.getCollectionName());
1602-
DescCollResponseWrapper wrapper = new DescCollResponseWrapper(descResp);
1603-
ParamUtils.InsertBuilderWrapper builderWraper = new ParamUtils.InsertBuilderWrapper(requestParam, wrapper);
1604-
MutationResult response = blockingStub().insert(builderWraper.buildInsertRequest());
1605-
cleanCacheIfFailed(response.getStatus(), requestParam.getDatabaseName(), requestParam.getCollectionName());
1618+
DescribeCollectionResponse descResp = getCollectionInfo(dbName, collectionName, false);
1619+
1620+
// To handle this bug: https://github.com/milvus-io/milvus/issues/41688
1621+
// if the collection is already recreated, some schema might be changed, the buildInsertRequest()
1622+
// could not convert the InsertRequest with the old collectionDesc, we need to update the
1623+
// collectionDesc and call buildInsertRequest() again.
1624+
InsertRequest rpcRequest;
1625+
try {
1626+
rpcRequest = buildInsertRequest(requestParam, descResp);
1627+
} catch (Exception ignored) {
1628+
descResp = getCollectionInfo(dbName, collectionName, true);
1629+
rpcRequest = buildInsertRequest(requestParam, descResp);
1630+
}
1631+
1632+
// If there are multiple clients, the client_A repeatedly do insert, the client_B changes
1633+
// the collection schema. The server might return a special error code "SchemaMismatch".
1634+
// If the client_A gets this special error code, it needs to update the collectionDesc and
1635+
// call insert() again.
1636+
MutationResult response = blockingStub().insert(rpcRequest);
1637+
if (response.getStatus().getErrorCode() == io.milvus.grpc.ErrorCode.SchemaMismatch) {
1638+
getCollectionInfo(dbName, collectionName, true);
1639+
return this.insert(requestParam);
1640+
}
1641+
1642+
// if illegal data, server fails to process insert, else succeed
1643+
cleanCacheIfFailed(response.getStatus(), dbName, collectionName);
16061644
handleResponse(title, response.getStatus());
1607-
GTsDict.getInstance().updateCollectionTs(requestParam.getCollectionName(), response.getTimestamp());
1645+
GTsDict.getInstance().updateCollectionTs(collectionName, response.getTimestamp());
16081646
return R.success(response);
16091647
} catch (StatusRuntimeException e) {
16101648
logError("{} RPC failed! Exception:{}", title, e);
16111649
return R.failed(e);
16121650
} catch (Exception e) {
16131651
logError("{} failed! Exception:{}", title, e);
1652+
removeCollectionCache(dbName, collectionName);
16141653
return R.failed(e);
16151654
}
16161655
}
@@ -1624,23 +1663,35 @@ public ListenableFuture<R<MutationResult>> insertAsync(InsertParam requestParam)
16241663
}
16251664

16261665
logDebug(requestParam.toString());
1627-
String title = String.format("InsertAsyncRequest collectionName:%s", requestParam.getCollectionName());
1666+
String dbName = requestParam.getDatabaseName();
1667+
String collectionName = requestParam.getCollectionName();
1668+
String title = String.format("InsertAsyncRequest collectionName:%s", collectionName);
16281669

1629-
DescribeCollectionResponse descResp = getCollectionInfo(requestParam.getDatabaseName(),
1630-
requestParam.getCollectionName());
1631-
DescCollResponseWrapper wrapper = new DescCollResponseWrapper(descResp);
1632-
ParamUtils.InsertBuilderWrapper builderWraper = new ParamUtils.InsertBuilderWrapper(requestParam, wrapper);
1633-
ListenableFuture<MutationResult> response = futureStub().insert(builderWraper.buildInsertRequest());
1670+
DescribeCollectionResponse descResp = getCollectionInfo(dbName, collectionName, false);
1671+
1672+
// To handle this bug: https://github.com/milvus-io/milvus/issues/41688
1673+
// if the collection is already recreated, some schema might be changed, the buildInsertRequest()
1674+
// could not convert the InsertRequest with the old collectionDesc, we need to update the
1675+
// collectionDesc and call buildInsertRequest() again.
1676+
InsertRequest rpcRequest;
1677+
try {
1678+
rpcRequest = buildInsertRequest(requestParam, descResp);
1679+
} catch (Exception ignored) {
1680+
descResp = getCollectionInfo(dbName, collectionName, true);
1681+
rpcRequest = buildInsertRequest(requestParam, descResp);
1682+
}
1683+
ListenableFuture<MutationResult> response = futureStub().insert(rpcRequest);
16341684

16351685
Futures.addCallback(
16361686
response,
16371687
new FutureCallback<MutationResult>() {
16381688
@Override
16391689
public void onSuccess(MutationResult result) {
1640-
cleanCacheIfFailed(result.getStatus(), requestParam.getDatabaseName(), requestParam.getCollectionName());
1690+
// if illegal data, server fails to process insert, else succeed
1691+
cleanCacheIfFailed(result.getStatus(), dbName, collectionName);
16411692
if (result.getStatus().getErrorCode() == ErrorCode.Success) {
16421693
logDebug("{} successfully!", title);
1643-
GTsDict.getInstance().updateCollectionTs(requestParam.getCollectionName(), result.getTimestamp());
1694+
GTsDict.getInstance().updateCollectionTs(collectionName, result.getTimestamp());
16441695
} else {
16451696
logError("{} failed:\n{}", title, result.getStatus().getReason());
16461697
}
@@ -1666,30 +1717,60 @@ public void onFailure(@Nonnull Throwable t) {
16661717
return Futures.transform(response, transformFunc::apply, MoreExecutors.directExecutor());
16671718
}
16681719

1720+
private UpsertRequest buildUpsertRequest(UpsertParam requestParam, DescribeCollectionResponse descResp) {
1721+
DescCollResponseWrapper wrapper = new DescCollResponseWrapper(descResp);
1722+
ParamUtils.InsertBuilderWrapper builderWraper = new ParamUtils.InsertBuilderWrapper(requestParam, wrapper);
1723+
UpsertRequest rpcRequest = builderWraper.buildUpsertRequest();
1724+
return rpcRequest.toBuilder().setSchemaTimestamp(descResp.getUpdateTimestamp()).build();
1725+
}
1726+
16691727
@Override
16701728
public R<MutationResult> upsert(UpsertParam requestParam) {
16711729
if (!clientIsReady()) {
16721730
return R.failed(new ClientNotConnectedException("Client rpc channel is not ready"));
16731731
}
16741732

16751733
logDebug(requestParam.toString());
1676-
String title = String.format("UpsertRequest collectionName:%s", requestParam.getCollectionName());
1734+
String dbName = requestParam.getDatabaseName();
1735+
String collectionName = requestParam.getCollectionName();
1736+
String title = String.format("UpsertRequest collectionName:%s", collectionName);
16771737

16781738
try {
1679-
DescribeCollectionResponse descResp = getCollectionInfo(requestParam.getDatabaseName(),
1680-
requestParam.getCollectionName());
1681-
DescCollResponseWrapper wrapper = new DescCollResponseWrapper(descResp);
1682-
ParamUtils.InsertBuilderWrapper builderWraper = new ParamUtils.InsertBuilderWrapper(requestParam, wrapper);
1683-
MutationResult response = blockingStub().upsert(builderWraper.buildUpsertRequest());
1684-
cleanCacheIfFailed(response.getStatus(), requestParam.getDatabaseName(), requestParam.getCollectionName());
1739+
DescribeCollectionResponse descResp = getCollectionInfo(dbName, collectionName, false);
1740+
1741+
// To handle this bug: https://github.com/milvus-io/milvus/issues/41688
1742+
// if the collection is already recreated, some schema might be changed, the buildUpsertRequest()
1743+
// could not convert the UpsertRequest with the old collectionDesc, we need to update the
1744+
// collectionDesc and call buildUpsertRequest() again.
1745+
UpsertRequest rpcRequest;
1746+
try {
1747+
rpcRequest = buildUpsertRequest(requestParam, descResp);
1748+
} catch (Exception ignored) {
1749+
descResp = getCollectionInfo(dbName, collectionName, true);
1750+
rpcRequest = buildUpsertRequest(requestParam, descResp);
1751+
}
1752+
1753+
// If there are multiple clients, the client_A repeatedly do upsert, the client_B changes
1754+
// the collection schema. The server might return a special error code "SchemaMismatch".
1755+
// If the client_A gets this special error code, it needs to update the collectionDesc and
1756+
// call upsert() again.
1757+
MutationResult response = blockingStub().upsert(rpcRequest);
1758+
if (response.getStatus().getErrorCode() == io.milvus.grpc.ErrorCode.SchemaMismatch) {
1759+
getCollectionInfo(dbName, collectionName, true);
1760+
return this.upsert(requestParam);
1761+
}
1762+
1763+
// if illegal data, server fails to process upsert, else succeed
1764+
cleanCacheIfFailed(response.getStatus(), dbName, collectionName);
16851765
handleResponse(title, response.getStatus());
1686-
GTsDict.getInstance().updateCollectionTs(requestParam.getCollectionName(), response.getTimestamp());
1766+
GTsDict.getInstance().updateCollectionTs(collectionName, response.getTimestamp());
16871767
return R.success(response);
16881768
} catch (StatusRuntimeException e) {
16891769
logError("{} RPC failed! Exception:{}", title, e);
16901770
return R.failed(e);
16911771
} catch (Exception e) {
16921772
logError("{} failed! Exception:{}", title, e);
1773+
removeCollectionCache(dbName, collectionName);
16931774
return R.failed(e);
16941775
}
16951776
}
@@ -1702,23 +1783,35 @@ public ListenableFuture<R<MutationResult>> upsertAsync(UpsertParam requestParam)
17021783
}
17031784

17041785
logDebug(requestParam.toString());
1705-
String title = String.format("UpsertAsyncRequest collectionName:%s", requestParam.getCollectionName());
1786+
String dbName = requestParam.getDatabaseName();
1787+
String collectionName = requestParam.getCollectionName();
1788+
String title = String.format("UpsertAsyncRequest collectionName:%s", collectionName);
17061789

1707-
DescribeCollectionResponse descResp = getCollectionInfo(requestParam.getDatabaseName(),
1708-
requestParam.getCollectionName());
1709-
DescCollResponseWrapper wrapper = new DescCollResponseWrapper(descResp);
1710-
ParamUtils.InsertBuilderWrapper builderWraper = new ParamUtils.InsertBuilderWrapper(requestParam, wrapper);
1711-
ListenableFuture<MutationResult> response = futureStub().upsert(builderWraper.buildUpsertRequest());
1790+
DescribeCollectionResponse descResp = getCollectionInfo(dbName, collectionName, false);
1791+
1792+
// To handle this bug: https://github.com/milvus-io/milvus/issues/41688
1793+
// if the collection is already recreated, some schema might be changed, the buildInsertRequest()
1794+
// could not convert the InsertRequest with the old collectionDesc, we need to update the
1795+
// collectionDesc and call buildInsertRequest() again.
1796+
UpsertRequest rpcRequest;
1797+
try {
1798+
rpcRequest = buildUpsertRequest(requestParam, descResp);
1799+
} catch (Exception ignored) {
1800+
descResp = getCollectionInfo(dbName, collectionName, true);
1801+
rpcRequest = buildUpsertRequest(requestParam, descResp);
1802+
}
1803+
ListenableFuture<MutationResult> response = futureStub().upsert(rpcRequest);
17121804

17131805
Futures.addCallback(
17141806
response,
17151807
new FutureCallback<MutationResult>() {
17161808
@Override
17171809
public void onSuccess(MutationResult result) {
1718-
cleanCacheIfFailed(result.getStatus(), requestParam.getDatabaseName(), requestParam.getCollectionName());
1810+
// if illegal data, server fails to process upsert, else succeed
1811+
cleanCacheIfFailed(result.getStatus(), dbName, collectionName);
17191812
if (result.getStatus().getErrorCode() == ErrorCode.Success) {
17201813
logDebug("{} successfully!", title);
1721-
GTsDict.getInstance().updateCollectionTs(requestParam.getCollectionName(), result.getTimestamp());
1814+
GTsDict.getInstance().updateCollectionTs(collectionName, result.getTimestamp());
17221815
} else {
17231816
logError("{} failed:\n{}", title, result.getStatus().getReason());
17241817
}
@@ -3161,15 +3254,16 @@ public R<DeleteResponse> delete(DeleteIdsParam requestParam) {
31613254
return R.failed(new ClientNotConnectedException("Client rpc channel is not ready"));
31623255
}
31633256
logDebug(requestParam.toString());
3164-
String title = String.format("DeleteIdsRequest collectionName:%s", requestParam.getCollectionName());
3257+
String collectionName = requestParam.getCollectionName();
3258+
String title = String.format("DeleteIdsRequest collectionName:%s", collectionName);
31653259

31663260
try {
3167-
DescribeCollectionResponse descResp = getCollectionInfo("", requestParam.getCollectionName());
3261+
DescribeCollectionResponse descResp = getCollectionInfo("", collectionName, false);
31683262
DescCollResponseWrapper wrapper = new DescCollResponseWrapper(descResp);
31693263

31703264
String expr = VectorUtils.convertPksExpr(requestParam.getPrimaryIds(), wrapper);
31713265
DeleteParam deleteParam = DeleteParam.newBuilder()
3172-
.withCollectionName(requestParam.getCollectionName())
3266+
.withCollectionName(collectionName)
31733267
.withPartitionName(requestParam.getPartitionName())
31743268
.withExpr(expr)
31753269
.build();

0 commit comments

Comments
 (0)