|
1 | | -package org.cloudfoundry.multiapps.controller.persistence.services; |
2 | | - |
3 | | -import java.io.IOException; |
4 | | -import java.io.InputStream; |
5 | | -import java.net.MalformedURLException; |
6 | | -import java.net.URL; |
7 | | -import java.time.LocalDateTime; |
8 | | -import java.util.List; |
9 | | -import java.util.Map; |
10 | | -import java.util.Set; |
11 | | -import java.util.function.Predicate; |
12 | | -import java.util.stream.Collectors; |
13 | | - |
14 | | -import com.azure.core.http.HttpClient; |
15 | | -import com.azure.core.http.okhttp.OkHttpAsyncHttpClientBuilder; |
16 | | -import com.azure.core.http.policy.ExponentialBackoffOptions; |
17 | | -import com.azure.core.http.policy.RetryOptions; |
18 | | -import com.azure.storage.blob.BlobClient; |
19 | | -import com.azure.storage.blob.BlobContainerClient; |
20 | | -import com.azure.storage.blob.BlobServiceClient; |
21 | | -import com.azure.storage.blob.BlobServiceClientBuilder; |
22 | | -import com.azure.storage.blob.models.BlobItem; |
23 | | -import com.azure.storage.blob.models.BlobListDetails; |
24 | | -import com.azure.storage.blob.models.BlobRange; |
25 | | -import com.azure.storage.blob.models.BlobStorageException; |
26 | | -import com.azure.storage.blob.models.ListBlobsOptions; |
27 | | -import com.azure.storage.blob.models.ParallelTransferOptions; |
28 | | -import com.azure.storage.blob.options.BlobParallelUploadOptions; |
29 | | -import org.cloudfoundry.multiapps.controller.persistence.Messages; |
30 | | -import org.cloudfoundry.multiapps.controller.persistence.model.FileEntry; |
31 | | -import org.cloudfoundry.multiapps.controller.persistence.util.ObjectStoreConstants; |
32 | | -import org.cloudfoundry.multiapps.controller.persistence.util.ObjectStoreFilter; |
33 | | -import org.cloudfoundry.multiapps.controller.persistence.util.ObjectStoreMapper; |
34 | | - |
35 | | -public class AzureObjectStoreFileStorage extends ObjectStoreFileStorage { |
36 | | - |
37 | | - private static final String SAS_TOKEN = "sas_token"; |
38 | | - private static final String CONTAINER_NAME = "container_name"; |
39 | | - private static final String CONTAINER_URI = "container_uri"; |
40 | | - private static final long MAX_SINGLE_UPLOAD_SIZE = 15L * 1024 * 1024; // 10MB |
41 | | - private static final long BLOCK_SIZE = 15L * 1024 * 1024; // 10MB |
42 | | - private static final int MAX_CONCURRENCY = 30; |
43 | | - private final HttpClient httpClient; |
44 | | - private final BlobContainerClient containerClient; |
45 | | - |
46 | | - public AzureObjectStoreFileStorage(Map<String, Object> credentials) { |
47 | | - this.containerClient = createContainerClient(credentials); |
48 | | - this.httpClient = new OkHttpAsyncHttpClientBuilder().build(); |
49 | | - } |
50 | | - |
51 | | - @Override |
52 | | - public void addFile(FileEntry fileEntry, InputStream content) throws FileStorageException { |
53 | | - BlobClient blobClient = containerClient.getBlobClient(fileEntry.getId()); |
54 | | - try { |
55 | | - ParallelTransferOptions pto = new ParallelTransferOptions().setMaxSingleUploadSizeLong(MAX_SINGLE_UPLOAD_SIZE) |
56 | | - .setMaxConcurrency(MAX_CONCURRENCY) |
57 | | - .setBlockSizeLong(BLOCK_SIZE); |
58 | | - |
59 | | - BlobParallelUploadOptions blobParallelUploadOptions = new BlobParallelUploadOptions(content); |
60 | | - blobParallelUploadOptions.setParallelTransferOptions(pto); |
61 | | - blobParallelUploadOptions.setMetadata(ObjectStoreMapper.createFileEntryMetadata(fileEntry)); |
62 | | - |
63 | | - blobClient.uploadWithResponse(blobParallelUploadOptions, |
64 | | - ObjectStoreConstants.AZURE_OBJECT_STORE_TOTAL_TIMEOUT_CONFIG_IN_MINUTES, null); |
65 | | - } catch (BlobStorageException e) { |
66 | | - throw new FileStorageException(e); |
67 | | - } |
68 | | - } |
69 | | - |
70 | | - @Override |
71 | | - public List<FileEntry> getFileEntriesWithoutContent(List<FileEntry> fileEntries) throws FileStorageException { |
72 | | - Set<String> existingFiles = getAllEntriesNames(); |
73 | | - return fileEntries.stream() |
74 | | - .filter(fileEntry -> !existingFiles.contains(fileEntry.getId())) |
75 | | - .toList(); |
76 | | - } |
77 | | - |
78 | | - @Override |
79 | | - protected boolean existsInObjectStore(FileEntry fileEntry) { |
80 | | - return containerClient.getBlobClient(fileEntry.getId()) |
81 | | - .exists(); |
82 | | - } |
83 | | - |
84 | | - @Override |
85 | | - public void deleteFile(String id, String space) throws FileStorageException { |
86 | | - BlobClient blobClient = containerClient.getBlobClient(id); |
87 | | - try { |
88 | | - blobClient.deleteIfExists(); |
89 | | - } catch (BlobStorageException e) { |
90 | | - throw new FileStorageException(e); |
91 | | - } |
92 | | - } |
93 | | - |
94 | | - @Override |
95 | | - public void deleteFilesBySpaceIds(List<String> spaceIds) throws FileStorageException { |
96 | | - removeBlobsByFilter(blob -> ObjectStoreFilter.filterBySpaceIds(blob.getMetadata(), spaceIds)); |
97 | | - } |
98 | | - |
99 | | - @Override |
100 | | - public void deleteFilesBySpaceAndNamespace(String space, String namespace) { |
101 | | - removeBlobsByFilter(blob -> ObjectStoreFilter.filterBySpaceAndNamespace(blob.getMetadata(), space, namespace)); |
102 | | - } |
103 | | - |
104 | | - @Override |
105 | | - public int deleteFilesModifiedBefore(LocalDateTime modificationTime) throws FileStorageException { |
106 | | - return removeBlobsByFilter( |
107 | | - blob -> ObjectStoreFilter.filterByModificationTime(blob.getMetadata(), blob.getName(), modificationTime)); |
108 | | - } |
109 | | - |
110 | | - @Override |
111 | | - public <T> T processFileContent(String space, String id, FileContentProcessor<T> fileContentProcessor) throws FileStorageException { |
112 | | - FileEntry fileEntry = ObjectStoreMapper.createFileEntry(space, id); |
113 | | - try (InputStream inputStream = openBlobInputStream(fileEntry)) { |
114 | | - return fileContentProcessor.process(inputStream); |
115 | | - } catch (Exception e) { |
116 | | - throw new FileStorageException(e); |
117 | | - } |
118 | | - } |
119 | | - |
120 | | - private InputStream openBlobInputStream(FileEntry fileEntry) throws FileStorageException { |
121 | | - BlobClient blobClient = containerClient.getBlobClient(fileEntry.getId()); |
122 | | - try { |
123 | | - return blobClient.openInputStream(); |
124 | | - } catch (BlobStorageException e) { |
125 | | - throw new FileStorageException(e); |
126 | | - } |
127 | | - } |
128 | | - |
129 | | - @Override |
130 | | - public InputStream openInputStream(String space, String id) throws FileStorageException { |
131 | | - FileEntry fileEntry = ObjectStoreMapper.createFileEntry(space, id); |
132 | | - return openBlobInputStream(fileEntry); |
133 | | - } |
134 | | - |
135 | | - @Override |
136 | | - public void testConnection() { |
137 | | - containerClient.getBlobClient("test"); |
138 | | - } |
139 | | - |
140 | | - @Override |
141 | | - public void deleteFilesByIds(List<String> fileIds) throws FileStorageException { |
142 | | - removeBlobsByFilter(blob -> fileIds.contains(blob.getName())); |
143 | | - } |
144 | | - |
145 | | - @Override |
146 | | - public <T> T processArchiveEntryContent(FileContentToProcess fileContentToProcess, FileContentProcessor<T> fileContentProcessor) |
147 | | - throws FileStorageException { |
148 | | - FileEntry fileEntry = ObjectStoreMapper.createFileEntry(fileContentToProcess.getSpaceGuid(), fileContentToProcess.getGuid()); |
149 | | - BlobClient blobClient = containerClient.getBlobClient(fileEntry.getId()); |
150 | | - long contentSize = fileContentToProcess.getEndOffset() - fileContentToProcess.getStartOffset(); |
151 | | - BlobRange blobRange = new BlobRange(fileContentToProcess.getStartOffset(), contentSize); |
152 | | - |
153 | | - try { |
154 | | - return fileContentProcessor.process(blobClient.openInputStream(blobRange, null)); |
155 | | - } catch (IOException e) { |
156 | | - throw new FileStorageException(e); |
157 | | - } |
158 | | - } |
159 | | - |
160 | | - protected BlobContainerClient createContainerClient(Map<String, Object> credentials) { |
161 | | - BlobServiceClient serviceClient = new BlobServiceClientBuilder().endpoint(getContainerUriEndpoint(credentials)) |
162 | | - .retryOptions(createRetryOptions()) |
163 | | - .httpClient(httpClient) |
164 | | - .sasToken((String) credentials.get(SAS_TOKEN)) |
165 | | - .buildClient(); |
166 | | - |
167 | | - return serviceClient.getBlobContainerClient((String) credentials.get(CONTAINER_NAME)); |
168 | | - } |
169 | | - |
170 | | - public String getContainerUriEndpoint(Map<String, Object> credentials) { |
171 | | - if (!credentials.containsKey(CONTAINER_URI)) { |
172 | | - throw new IllegalStateException(Messages.MISSING_CONTAINER_URI_IN_THE_CREDENTIALS); |
173 | | - } |
174 | | - try { |
175 | | - URL containerUri = new URL((String) credentials.get(CONTAINER_URI)); |
176 | | - return new URL(containerUri.getProtocol(), containerUri.getHost(), containerUri.getPort(), "").toString(); |
177 | | - } catch (MalformedURLException e) { |
178 | | - throw new IllegalStateException(Messages.CANNOT_PARSE_CONTAINER_URI_OF_OBJECT_STORE, e); |
179 | | - } |
180 | | - } |
181 | | - |
182 | | - private RetryOptions createRetryOptions() { |
183 | | - ExponentialBackoffOptions exponentialBackoffOptions = new ExponentialBackoffOptions().setBaseDelay( |
184 | | - ObjectStoreConstants.OBJECT_STORE_INITIAL_RETRY_DELAY_CONFIG_IN_MILLIS) |
185 | | - .setMaxDelay( |
186 | | - ObjectStoreConstants.OBJECT_STORE_MAX_RETRY_DELAY_CONFIG_IN_SECONDS) |
187 | | - .setMaxRetries( |
188 | | - ObjectStoreConstants.OBJECT_STORE_MAX_ATTEMPTS_CONFIG); |
189 | | - |
190 | | - return new RetryOptions(exponentialBackoffOptions); |
191 | | - } |
192 | | - |
193 | | - private int removeBlobsByFilter(Predicate<? super BlobItem> filter) { |
194 | | - Set<String> blobNames = getEntryNames(filter); |
195 | | - int deletedBlobsResult = 0; |
196 | | - |
197 | | - if (blobNames.isEmpty()) { |
198 | | - return deletedBlobsResult; |
199 | | - } |
200 | | - for (String blobName : blobNames) { |
201 | | - BlobClient blobClient = containerClient.getBlobClient(blobName); |
202 | | - if (blobClient.deleteIfExists()) { |
203 | | - deletedBlobsResult++; |
204 | | - } |
205 | | - } |
206 | | - |
207 | | - return deletedBlobsResult; |
208 | | - } |
209 | | - |
210 | | - protected Set<String> getEntryNames(Predicate<? super BlobItem> filter) { |
211 | | - BlobListDetails blobListDetails = new BlobListDetails().setRetrieveMetadata(true); |
212 | | - ListBlobsOptions listBlobsOptions = new ListBlobsOptions().setDetails(blobListDetails); |
213 | | - |
214 | | - return containerClient.listBlobs(listBlobsOptions, ObjectStoreConstants.AZURE_OBJECT_STORE_TOTAL_TIMEOUT_CONFIG_IN_MINUTES) |
215 | | - .stream() |
216 | | - .filter(filter) |
217 | | - .map(BlobItem::getName) |
218 | | - .collect(Collectors.toSet()); |
219 | | - } |
220 | | - |
221 | | - public Set<String> getAllEntriesNames() { |
222 | | - ListBlobsOptions listOptions = new ListBlobsOptions(); |
223 | | - return containerClient.listBlobs(listOptions, ObjectStoreConstants.AZURE_OBJECT_STORE_TOTAL_TIMEOUT_CONFIG_IN_MINUTES) |
224 | | - .stream() |
225 | | - .map(BlobItem::getName) |
226 | | - .collect(Collectors.toSet()); |
227 | | - } |
228 | | -} |
0 commit comments