Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
180 changes: 137 additions & 43 deletions sdk-core/src/main/java/io/milvus/client/AbstractMilvusGrpcClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,9 @@
import io.milvus.param.resourcegroup.*;
import io.milvus.param.role.*;
import io.milvus.response.*;
import io.milvus.v2.service.collection.response.DescribeCollectionResp;
import io.milvus.v2.service.vector.request.InsertReq;
import io.milvus.v2.utils.DataUtils;
import lombok.NonNull;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
Expand Down Expand Up @@ -79,10 +82,10 @@ public abstract class AbstractMilvusGrpcClient implements MilvusClient {
* If the cache doesn't have the collection info, call describeCollection() and cache it.
* If insert/upsert get server error, remove the cached collection info.
*/
private DescribeCollectionResponse getCollectionInfo(String databaseName, String collectionName) {
private DescribeCollectionResponse getCollectionInfo(String databaseName, String collectionName, boolean forceUpdate) {
String key = combineCacheKey(databaseName, collectionName);
DescribeCollectionResponse info = cacheCollectionInfo.get(key);
if (info == null) {
if (info == null || forceUpdate) {
String msg = String.format("Fail to describe collection '%s'", collectionName);
DescribeCollectionRequest.Builder builder = DescribeCollectionRequest.newBuilder()
.setCollectionName(collectionName);
Expand Down Expand Up @@ -119,10 +122,14 @@ private String combineCacheKey(String databaseName, String collectionName) {
private void cleanCacheIfFailed(Status status, String databaseName, String collectionName) {
if ((status.getCode() != 0 && status.getCode() != 8) ||
(!status.getErrorCode().equals(ErrorCode.Success) && status.getErrorCode() != ErrorCode.RateLimit)) {
cacheCollectionInfo.remove(combineCacheKey(databaseName, collectionName));
removeCollectionCache(databaseName, collectionName);
}
}

private void removeCollectionCache(String databaseName, String collectionName) {
cacheCollectionInfo.remove(combineCacheKey(databaseName, collectionName));
}

private void waitForLoadingCollection(String databaseName, String collectionName, List<String> partitionNames,
long waitingInterval, long timeout) throws IllegalResponseException {
long tsBegin = System.currentTimeMillis();
Expand Down Expand Up @@ -637,19 +644,21 @@ public R<RpcStatus> dropCollection(@NonNull DropCollectionParam requestParam) {
}

logDebug(requestParam.toString());
String title = String.format("DropCollectionRequest collectionName:%s", requestParam.getCollectionName());
String dbName = requestParam.getDatabaseName();
String collectionName = requestParam.getCollectionName();
String title = String.format("DropCollectionRequest collectionName:%s", collectionName);

try {
DropCollectionRequest.Builder builder = DropCollectionRequest.newBuilder()
.setCollectionName(requestParam.getCollectionName());
if (StringUtils.isNotEmpty(requestParam.getDatabaseName())) {
builder.setDbName(requestParam.getDatabaseName());
.setCollectionName(collectionName);
if (StringUtils.isNotEmpty(dbName)) {
builder.setDbName(dbName);
}
DropCollectionRequest dropCollectionRequest = builder.build();

Status response = blockingStub().dropCollection(dropCollectionRequest);
handleResponse(title, response);
cacheCollectionInfo.remove(combineCacheKey(requestParam.getDatabaseName(), requestParam.getCollectionName()));
removeCollectionCache(dbName, collectionName);
return R.success(new RpcStatus(RpcStatus.SUCCESS_MSG));
} catch (StatusRuntimeException e) {
logError("{} RPC failed! Exception:{}", title, e);
Expand Down Expand Up @@ -1587,30 +1596,60 @@ public R<MutationResult> delete(@NonNull DeleteParam requestParam) {
}
}

private InsertRequest buildInsertRequest(InsertParam requestParam, DescribeCollectionResponse descResp) {
DescCollResponseWrapper wrapper = new DescCollResponseWrapper(descResp);
ParamUtils.InsertBuilderWrapper builderWraper = new ParamUtils.InsertBuilderWrapper(requestParam, wrapper);
InsertRequest rpcRequest = builderWraper.buildInsertRequest();
return rpcRequest.toBuilder().setSchemaTimestamp(descResp.getUpdateTimestamp()).build();
}

@Override
public R<MutationResult> insert(@NonNull InsertParam requestParam) {
if (!clientIsReady()) {
return R.failed(new ClientNotConnectedException("Client rpc channel is not ready"));
}

logDebug(requestParam.toString());
String title = String.format("InsertRequest collectionName:%s", requestParam.getCollectionName());
String dbName = requestParam.getDatabaseName();
String collectionName = requestParam.getCollectionName();
String title = String.format("InsertRequest collectionName:%s", collectionName);

try {
DescribeCollectionResponse descResp = getCollectionInfo(requestParam.getDatabaseName(),
requestParam.getCollectionName());
DescCollResponseWrapper wrapper = new DescCollResponseWrapper(descResp);
ParamUtils.InsertBuilderWrapper builderWraper = new ParamUtils.InsertBuilderWrapper(requestParam, wrapper);
MutationResult response = blockingStub().insert(builderWraper.buildInsertRequest());
cleanCacheIfFailed(response.getStatus(), requestParam.getDatabaseName(), requestParam.getCollectionName());
DescribeCollectionResponse descResp = getCollectionInfo(dbName, collectionName, false);

// To handle this bug: https://github.com/milvus-io/milvus/issues/41688
// if the collection is already recreated, some schema might be changed, the buildInsertRequest()
// could not convert the InsertRequest with the old collectionDesc, we need to update the
// collectionDesc and call buildInsertRequest() again.
InsertRequest rpcRequest;
try {
rpcRequest = buildInsertRequest(requestParam, descResp);
} catch (Exception ignored) {
descResp = getCollectionInfo(dbName, collectionName, true);
rpcRequest = buildInsertRequest(requestParam, descResp);
}

// If there are multiple clients, the client_A repeatedly do insert, the client_B changes
// the collection schema. The server might return a special error code "SchemaMismatch".
// If the client_A gets this special error code, it needs to update the collectionDesc and
// call insert() again.
MutationResult response = blockingStub().insert(rpcRequest);
if (response.getStatus().getErrorCode() == io.milvus.grpc.ErrorCode.SchemaMismatch) {
getCollectionInfo(dbName, collectionName, true);
return this.insert(requestParam);
}

// if illegal data, server fails to process insert, else succeed
cleanCacheIfFailed(response.getStatus(), dbName, collectionName);
handleResponse(title, response.getStatus());
GTsDict.getInstance().updateCollectionTs(requestParam.getCollectionName(), response.getTimestamp());
GTsDict.getInstance().updateCollectionTs(collectionName, response.getTimestamp());
return R.success(response);
} catch (StatusRuntimeException e) {
logError("{} RPC failed! Exception:{}", title, e);
return R.failed(e);
} catch (Exception e) {
logError("{} failed! Exception:{}", title, e);
removeCollectionCache(dbName, collectionName);
return R.failed(e);
}
}
Expand All @@ -1624,23 +1663,35 @@ public ListenableFuture<R<MutationResult>> insertAsync(InsertParam requestParam)
}

logDebug(requestParam.toString());
String title = String.format("InsertAsyncRequest collectionName:%s", requestParam.getCollectionName());
String dbName = requestParam.getDatabaseName();
String collectionName = requestParam.getCollectionName();
String title = String.format("InsertAsyncRequest collectionName:%s", collectionName);

DescribeCollectionResponse descResp = getCollectionInfo(requestParam.getDatabaseName(),
requestParam.getCollectionName());
DescCollResponseWrapper wrapper = new DescCollResponseWrapper(descResp);
ParamUtils.InsertBuilderWrapper builderWraper = new ParamUtils.InsertBuilderWrapper(requestParam, wrapper);
ListenableFuture<MutationResult> response = futureStub().insert(builderWraper.buildInsertRequest());
DescribeCollectionResponse descResp = getCollectionInfo(dbName, collectionName, false);

// To handle this bug: https://github.com/milvus-io/milvus/issues/41688
// if the collection is already recreated, some schema might be changed, the buildInsertRequest()
// could not convert the InsertRequest with the old collectionDesc, we need to update the
// collectionDesc and call buildInsertRequest() again.
InsertRequest rpcRequest;
try {
rpcRequest = buildInsertRequest(requestParam, descResp);
} catch (Exception ignored) {
descResp = getCollectionInfo(dbName, collectionName, true);
rpcRequest = buildInsertRequest(requestParam, descResp);
}
ListenableFuture<MutationResult> response = futureStub().insert(rpcRequest);

Futures.addCallback(
response,
new FutureCallback<MutationResult>() {
@Override
public void onSuccess(MutationResult result) {
cleanCacheIfFailed(result.getStatus(), requestParam.getDatabaseName(), requestParam.getCollectionName());
// if illegal data, server fails to process insert, else succeed
cleanCacheIfFailed(result.getStatus(), dbName, collectionName);
if (result.getStatus().getErrorCode() == ErrorCode.Success) {
logDebug("{} successfully!", title);
GTsDict.getInstance().updateCollectionTs(requestParam.getCollectionName(), result.getTimestamp());
GTsDict.getInstance().updateCollectionTs(collectionName, result.getTimestamp());
} else {
logError("{} failed:\n{}", title, result.getStatus().getReason());
}
Expand All @@ -1666,30 +1717,60 @@ public void onFailure(@Nonnull Throwable t) {
return Futures.transform(response, transformFunc::apply, MoreExecutors.directExecutor());
}

private UpsertRequest buildUpsertRequest(UpsertParam requestParam, DescribeCollectionResponse descResp) {
DescCollResponseWrapper wrapper = new DescCollResponseWrapper(descResp);
ParamUtils.InsertBuilderWrapper builderWraper = new ParamUtils.InsertBuilderWrapper(requestParam, wrapper);
UpsertRequest rpcRequest = builderWraper.buildUpsertRequest();
return rpcRequest.toBuilder().setSchemaTimestamp(descResp.getUpdateTimestamp()).build();
}

@Override
public R<MutationResult> upsert(UpsertParam requestParam) {
if (!clientIsReady()) {
return R.failed(new ClientNotConnectedException("Client rpc channel is not ready"));
}

logDebug(requestParam.toString());
String title = String.format("UpsertRequest collectionName:%s", requestParam.getCollectionName());
String dbName = requestParam.getDatabaseName();
String collectionName = requestParam.getCollectionName();
String title = String.format("UpsertRequest collectionName:%s", collectionName);

try {
DescribeCollectionResponse descResp = getCollectionInfo(requestParam.getDatabaseName(),
requestParam.getCollectionName());
DescCollResponseWrapper wrapper = new DescCollResponseWrapper(descResp);
ParamUtils.InsertBuilderWrapper builderWraper = new ParamUtils.InsertBuilderWrapper(requestParam, wrapper);
MutationResult response = blockingStub().upsert(builderWraper.buildUpsertRequest());
cleanCacheIfFailed(response.getStatus(), requestParam.getDatabaseName(), requestParam.getCollectionName());
DescribeCollectionResponse descResp = getCollectionInfo(dbName, collectionName, false);

// To handle this bug: https://github.com/milvus-io/milvus/issues/41688
// if the collection is already recreated, some schema might be changed, the buildUpsertRequest()
// could not convert the UpsertRequest with the old collectionDesc, we need to update the
// collectionDesc and call buildUpsertRequest() again.
UpsertRequest rpcRequest;
try {
rpcRequest = buildUpsertRequest(requestParam, descResp);
} catch (Exception ignored) {
descResp = getCollectionInfo(dbName, collectionName, true);
rpcRequest = buildUpsertRequest(requestParam, descResp);
}

// If there are multiple clients, the client_A repeatedly do upsert, the client_B changes
// the collection schema. The server might return a special error code "SchemaMismatch".
// If the client_A gets this special error code, it needs to update the collectionDesc and
// call upsert() again.
MutationResult response = blockingStub().upsert(rpcRequest);
if (response.getStatus().getErrorCode() == io.milvus.grpc.ErrorCode.SchemaMismatch) {
getCollectionInfo(dbName, collectionName, true);
return this.upsert(requestParam);
}

// if illegal data, server fails to process upsert, else succeed
cleanCacheIfFailed(response.getStatus(), dbName, collectionName);
handleResponse(title, response.getStatus());
GTsDict.getInstance().updateCollectionTs(requestParam.getCollectionName(), response.getTimestamp());
GTsDict.getInstance().updateCollectionTs(collectionName, response.getTimestamp());
return R.success(response);
} catch (StatusRuntimeException e) {
logError("{} RPC failed! Exception:{}", title, e);
return R.failed(e);
} catch (Exception e) {
logError("{} failed! Exception:{}", title, e);
removeCollectionCache(dbName, collectionName);
return R.failed(e);
}
}
Expand All @@ -1702,23 +1783,35 @@ public ListenableFuture<R<MutationResult>> upsertAsync(UpsertParam requestParam)
}

logDebug(requestParam.toString());
String title = String.format("UpsertAsyncRequest collectionName:%s", requestParam.getCollectionName());
String dbName = requestParam.getDatabaseName();
String collectionName = requestParam.getCollectionName();
String title = String.format("UpsertAsyncRequest collectionName:%s", collectionName);

DescribeCollectionResponse descResp = getCollectionInfo(requestParam.getDatabaseName(),
requestParam.getCollectionName());
DescCollResponseWrapper wrapper = new DescCollResponseWrapper(descResp);
ParamUtils.InsertBuilderWrapper builderWraper = new ParamUtils.InsertBuilderWrapper(requestParam, wrapper);
ListenableFuture<MutationResult> response = futureStub().upsert(builderWraper.buildUpsertRequest());
DescribeCollectionResponse descResp = getCollectionInfo(dbName, collectionName, false);

// To handle this bug: https://github.com/milvus-io/milvus/issues/41688
// if the collection is already recreated, some schema might be changed, the buildInsertRequest()
// could not convert the InsertRequest with the old collectionDesc, we need to update the
// collectionDesc and call buildInsertRequest() again.
UpsertRequest rpcRequest;
try {
rpcRequest = buildUpsertRequest(requestParam, descResp);
} catch (Exception ignored) {
descResp = getCollectionInfo(dbName, collectionName, true);
rpcRequest = buildUpsertRequest(requestParam, descResp);
}
ListenableFuture<MutationResult> response = futureStub().upsert(rpcRequest);

Futures.addCallback(
response,
new FutureCallback<MutationResult>() {
@Override
public void onSuccess(MutationResult result) {
cleanCacheIfFailed(result.getStatus(), requestParam.getDatabaseName(), requestParam.getCollectionName());
// if illegal data, server fails to process upsert, else succeed
cleanCacheIfFailed(result.getStatus(), dbName, collectionName);
if (result.getStatus().getErrorCode() == ErrorCode.Success) {
logDebug("{} successfully!", title);
GTsDict.getInstance().updateCollectionTs(requestParam.getCollectionName(), result.getTimestamp());
GTsDict.getInstance().updateCollectionTs(collectionName, result.getTimestamp());
} else {
logError("{} failed:\n{}", title, result.getStatus().getReason());
}
Expand Down Expand Up @@ -3161,15 +3254,16 @@ public R<DeleteResponse> delete(DeleteIdsParam requestParam) {
return R.failed(new ClientNotConnectedException("Client rpc channel is not ready"));
}
logDebug(requestParam.toString());
String title = String.format("DeleteIdsRequest collectionName:%s", requestParam.getCollectionName());
String collectionName = requestParam.getCollectionName();
String title = String.format("DeleteIdsRequest collectionName:%s", collectionName);

try {
DescribeCollectionResponse descResp = getCollectionInfo("", requestParam.getCollectionName());
DescribeCollectionResponse descResp = getCollectionInfo("", collectionName, false);
DescCollResponseWrapper wrapper = new DescCollResponseWrapper(descResp);

String expr = VectorUtils.convertPksExpr(requestParam.getPrimaryIds(), wrapper);
DeleteParam deleteParam = DeleteParam.newBuilder()
.withCollectionName(requestParam.getCollectionName())
.withCollectionName(collectionName)
.withPartitionName(requestParam.getPartitionName())
.withExpr(expr)
.build();
Expand Down
Loading
Loading