Skip to content

Commit 75531bf

Browse files
authored
GCP supports using stage (#1670)
Signed-off-by: lentitude2tk <xushuang.hu@zilliz.com>
1 parent 820e4bd commit 75531bf

3 files changed

Lines changed: 47 additions & 4 deletions

File tree

sdk-bulkwriter/src/main/java/io/milvus/bulkwriter/StageFileManager.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -137,7 +137,7 @@ public CompletableFuture<UploadFilesResult> uploadFilesAsync(UploadFilesRequest
137137
public void shutdownGracefully() {
138138
executor.shutdown();
139139
try {
140-
if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
140+
if (!executor.awaitTermination(2, TimeUnit.SECONDS)) {
141141
logger.warn("Executor didn't terminate in time, forcing shutdown...");
142142
executor.shutdownNow();
143143
}

sdk-bulkwriter/src/main/java/io/milvus/bulkwriter/common/clientenum/CloudStorage.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,13 @@ public static boolean isTcCloud(String cloudName) {
7171
return tcCloudStorages.stream().anyMatch(e -> e.getCloudName().equalsIgnoreCase(cloudName));
7272
}
7373

74+
public static boolean isGcpCloud(String cloudName) {
75+
List<CloudStorage> gcpCloudStorages = Lists.newArrayList(
76+
CloudStorage.GCP
77+
);
78+
return gcpCloudStorages.stream().anyMatch(e -> e.getCloudName().equalsIgnoreCase(cloudName));
79+
}
80+
7481
public static boolean isAzCloud(String cloudName) {
7582
List<CloudStorage> azCloudStorages = Lists.newArrayList(
7683
CloudStorage.AZ, CloudStorage.AZURE

sdk-bulkwriter/src/main/java/io/milvus/bulkwriter/storage/client/MinioStorageClient.java

Lines changed: 39 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,15 @@
2323
import io.milvus.bulkwriter.common.clientenum.CloudStorage;
2424
import io.milvus.bulkwriter.model.CompleteMultipartUploadOutputModel;
2525
import io.milvus.bulkwriter.storage.StorageClient;
26-
import io.minio.*;
26+
import io.minio.BucketExistsArgs;
27+
import io.minio.MinioAsyncClient;
28+
import io.minio.ObjectWriteResponse;
29+
import io.minio.PutObjectArgs;
30+
import io.minio.S3Base;
31+
import io.minio.StatObjectArgs;
32+
import io.minio.StatObjectResponse;
33+
import io.minio.Xml;
34+
import io.minio.credentials.Provider;
2735
import io.minio.credentials.StaticProvider;
2836
import io.minio.errors.ErrorResponseException;
2937
import io.minio.errors.InsufficientDataException;
@@ -33,7 +41,9 @@
3341
import io.minio.messages.CompleteMultipartUpload;
3442
import io.minio.messages.ErrorResponse;
3543
import io.minio.messages.Part;
44+
import okhttp3.Interceptor;
3645
import okhttp3.OkHttpClient;
46+
import okhttp3.Request;
3747
import org.apache.commons.lang3.StringUtils;
3848
import org.slf4j.Logger;
3949
import org.slf4j.LoggerFactory;
@@ -65,8 +75,14 @@ public static MinioStorageClient getStorageClient(String cloudName,
6575
String region,
6676
OkHttpClient httpClient) {
6777
MinioAsyncClient.Builder minioClientBuilder = MinioAsyncClient.builder()
68-
.endpoint(endpoint)
69-
.credentialsProvider(new StaticProvider(accessKey, secretKey, sessionToken));
78+
.endpoint(endpoint);
79+
80+
if (CloudStorage.isGcpCloud(cloudName) && StringUtils.isNotEmpty(sessionToken)) {
81+
httpClient = buildAuthorizedClient(httpClient, sessionToken);
82+
} else {
83+
Provider credentialsProvider = new StaticProvider(accessKey, secretKey, sessionToken);
84+
minioClientBuilder.credentialsProvider(credentialsProvider);
85+
}
7086

7187
if (StringUtils.isNotEmpty(region)) {
7288
minioClientBuilder.region(region);
@@ -84,6 +100,26 @@ public static MinioStorageClient getStorageClient(String cloudName,
84100
return new MinioStorageClient(minioClient);
85101
}
86102

103+
private static OkHttpClient buildAuthorizedClient(OkHttpClient httpClient, String sessionToken) {
104+
Interceptor authInterceptor = chain -> {
105+
Request original = chain.request();
106+
Request requestWithAuth = original.newBuilder()
107+
.header("Authorization", "Bearer " + sessionToken)
108+
.build();
109+
return chain.proceed(requestWithAuth);
110+
};
111+
112+
if (httpClient != null) {
113+
return httpClient.newBuilder()
114+
.addInterceptor(authInterceptor)
115+
.build();
116+
} else {
117+
return new OkHttpClient.Builder()
118+
.addInterceptor(authInterceptor)
119+
.build();
120+
}
121+
}
122+
87123
public Long getObjectEntity(String bucketName, String objectKey) throws Exception {
88124
StatObjectArgs statObjectArgs = StatObjectArgs.builder()
89125
.bucket(bucketName)

0 commit comments

Comments
 (0)