-
Notifications
You must be signed in to change notification settings - Fork 19
Expand file tree
/
Copy pathUploadObjectObserver.java
More file actions
131 lines (120 loc) · 5.45 KB
/
UploadObjectObserver.java
File metadata and controls
131 lines (120 loc) · 5.45 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
// Copyright Amazon.com Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
package software.amazon.encryption.s3.internal;
import org.apache.commons.logging.LogFactory;
import software.amazon.awssdk.core.async.AsyncRequestBody;
import software.amazon.awssdk.services.s3.S3AsyncClient;
import software.amazon.awssdk.services.s3.model.CompleteMultipartUploadResponse;
import software.amazon.awssdk.services.s3.model.CompletedPart;
import software.amazon.awssdk.services.s3.model.CreateMultipartUploadResponse;
import software.amazon.awssdk.services.s3.model.PutObjectRequest;
import software.amazon.awssdk.services.s3.model.SdkPartType;
import software.amazon.awssdk.services.s3.model.UploadPartRequest;
import software.amazon.awssdk.services.s3.model.UploadPartResponse;
import software.amazon.encryption.s3.S3EncryptionClient;
import software.amazon.encryption.s3.S3EncryptionClientException;
import java.io.File;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
public class UploadObjectObserver {
private final List<Future<Map<Integer, UploadPartResponse>>> futures = new ArrayList<>();
private PutObjectRequest request;
private String uploadId;
private S3AsyncClient s3AsyncClient;
private S3EncryptionClient s3EncryptionClient;
private ExecutorService es;
public UploadObjectObserver init(PutObjectRequest req,
S3AsyncClient s3AsyncClient, S3EncryptionClient s3EncryptionClient, ExecutorService es) {
this.request = req;
this.s3AsyncClient = s3AsyncClient;
this.s3EncryptionClient = s3EncryptionClient;
this.es = es;
return this;
}
public String onUploadCreation(PutObjectRequest req) {
CreateMultipartUploadResponse res =
s3EncryptionClient.createMultipartUpload(ConvertSDKRequests.convert(req));
return this.uploadId = res.uploadId();
}
public void onPartCreate(PartCreationEvent event) {
final File part = event.getPart();
final UploadPartRequest reqUploadPart =
newUploadPartRequest(event);
final OnFileDelete fileDeleteObserver = event.getFileDeleteObserver();
futures.add(es.submit(new Callable<Map<Integer, UploadPartResponse>>() {
@Override
public Map<Integer, UploadPartResponse> call() {
// Upload the ciphertext directly via the non-encrypting
// s3 client
try {
AsyncRequestBody noRetriesBody = new NoRetriesAsyncRequestBody(AsyncRequestBody.fromFile(part));
return uploadPart(reqUploadPart, noRetriesBody);
} catch (CompletionException e) {
// Unwrap completion exception
throw new S3EncryptionClientException(e.getCause().getMessage(), e.getCause());
} finally {
// clean up part already uploaded
if (!part.delete()) {
LogFactory.getLog(getClass()).debug(
"Ignoring failure to delete file " + part
+ " which has already been uploaded");
} else {
if (fileDeleteObserver != null)
fileDeleteObserver.onFileDelete(null);
}
}
}
}));
}
public CompleteMultipartUploadResponse onCompletion(List<CompletedPart> partETags) {
return s3EncryptionClient.completeMultipartUpload(builder -> builder
.bucket(request.bucket())
.key(request.key())
.uploadId(uploadId)
.multipartUpload(partBuilder -> partBuilder.parts(partETags)));
}
public void onAbort() {
for (Future<?> future : futures()) {
future.cancel(true);
}
if (uploadId != null) {
try {
s3EncryptionClient.abortMultipartUpload(builder -> builder.bucket(request.bucket())
.key(request.key())
.uploadId(uploadId));
} catch (Exception e) {
LogFactory.getLog(getClass())
.debug("Failed to abort multi-part upload: " + uploadId, e);
}
}
}
protected UploadPartRequest newUploadPartRequest(PartCreationEvent event) {
final SdkPartType partType;
if (event.isLastPart()) {
partType = SdkPartType.LAST;
} else {
partType = SdkPartType.DEFAULT;
}
return UploadPartRequest.builder()
.bucket(request.bucket())
.key(request.key())
.partNumber(event.getPartNumber())
.sdkPartType(partType)
.uploadId(uploadId)
.build();
}
protected Map<Integer, UploadPartResponse> uploadPart(UploadPartRequest reqUploadPart, AsyncRequestBody requestBody) {
// Upload the ciphertext directly via the non-encrypting
// s3 client
return Collections.singletonMap(reqUploadPart.partNumber(), s3AsyncClient.uploadPart(reqUploadPart, requestBody).join());
}
public List<Future<Map<Integer, UploadPartResponse>>> futures() {
return futures;
}
}