Skip to content

Commit 4745bc8

Browse files
committed
feat(synology): implement streaming for large file uploads
Key changes: - Implemented streaming uploads using Okio and a custom RequestBody. - Introduced RequestBodyGenerator to support retries by re-opening streams. - Refactored stream retrieval into getMediaInputStreamWrapper. - Added SynologyDTPServiceOOMTest to verify 1GB+ uploads without OOM.
1 parent f17ce32 commit 4745bc8

4 files changed

Lines changed: 604 additions & 230 deletions

File tree

extensions/data-transfer/portability-data-transfer-synology/src/main/java/org/datatransferproject/datatransfer/synology/service/SynologyDTPService.java

Lines changed: 203 additions & 93 deletions
Original file line numberDiff line numberDiff line change
@@ -20,11 +20,10 @@
2020
import com.fasterxml.jackson.databind.ObjectMapper;
2121
import com.google.common.annotations.VisibleForTesting;
2222
import com.google.common.base.Strings;
23-
import com.google.common.io.ByteStreams;
2423
import java.io.IOException;
25-
import java.io.InputStream;
2624
import java.net.MalformedURLException;
2725
import java.net.URL;
26+
import java.net.URLConnection;
2827
import java.util.Date;
2928
import java.util.Map;
3029
import java.util.UUID;
@@ -35,11 +34,15 @@
3534
import okhttp3.Request;
3635
import okhttp3.RequestBody;
3736
import okhttp3.Response;
37+
import okio.BufferedSink;
38+
import okio.Okio;
39+
import okio.Source;
3840
import org.datatransferproject.api.launcher.Monitor;
3941
import org.datatransferproject.datatransfer.synology.models.C2Api;
4042
import org.datatransferproject.datatransfer.synology.models.ServiceConfig;
4143
import org.datatransferproject.datatransfer.synology.utils.ServiceConfigParser;
4244
import org.datatransferproject.spi.cloud.storage.JobStore;
45+
import org.datatransferproject.spi.cloud.storage.TemporaryPerJobDataStore.InputStreamWrapper;
4346
import org.datatransferproject.spi.transfer.types.CopyExceptionWithFailureReason;
4447
import org.datatransferproject.spi.transfer.types.DestinationMemoryFullException;
4548
import org.datatransferproject.spi.transfer.types.NoNasInAccountException;
@@ -61,6 +64,11 @@ public class SynologyDTPService {
6164
C2Api c2Api;
6265
ServiceConfig.Retry retryConfig;
6366

67+
@FunctionalInterface
68+
protected interface RequestBodyGenerator {
69+
RequestBody get() throws CopyExceptionWithFailureReason;
70+
}
71+
6472
/**
6573
* Constructs a new {@code SynologyDTPService} instance.
6674
*
@@ -89,17 +97,6 @@ public SynologyDTPService(
8997
this.client = client;
9098
}
9199

92-
/**
93-
* getInputStream
94-
*
95-
* @param url
96-
* @return an InputStream Instance
97-
* @throws IOException
98-
*/
99-
protected InputStream getInputStream(String url) throws IOException {
100-
return new URL(url).openStream();
101-
}
102-
103100
/**
104101
* Creates album.
105102
*
@@ -115,62 +112,124 @@ public Map<String, Object> createAlbum(MediaAlbum album, UUID jobId)
115112
builder.add("service", exportingService);
116113
monitor.info(() -> "[SynologyImporter] Creating album", album.getName(), jobId);
117114

115+
RequestBody requestBody = builder.build();
118116
return (Map<String, Object>)
119-
sendPostRequest(c2Api.getCreateAlbum(), builder.build(), jobId).get("data");
117+
sendPostRequest(c2Api.getCreateAlbum(), () -> requestBody, jobId).get("data");
120118
}
121119

122120
/**
123-
* Creates photo.
121+
* get InputStreamWrapper for media file, it can be from temp store or from fetchable url
124122
*
125-
* @param photo the photo
126123
* @param jobId the job ID
127-
* @return a map of shape {"data": {"item_id": <item_id>}}
124+
* @param fetchableUrl the url to fetch media file, can be null if the file is in temp store
125+
* @return an InputStreamWrapper instance
126+
* @throws CopyExceptionWithFailureReason
128127
*/
129-
public Map<String, Object> createPhoto(PhotoModel photo, UUID jobId)
128+
@VisibleForTesting
129+
protected InputStreamWrapper getMediaInputStreamWrapper(
130+
UUID jobId, String fetchableUrl, boolean isInTempStore)
130131
throws CopyExceptionWithFailureReason {
131-
byte[] imageBytes;
132132
try {
133-
InputStream inputStream = null;
134-
if (photo.isInTempStore()) {
135-
inputStream = jobStore.getStream(jobId, photo.getFetchableUrl()).getStream();
136-
} else if (photo.getFetchableUrl() != null) {
137-
inputStream = getInputStream(photo.getFetchableUrl());
138-
} else {
139-
monitor.severe(() -> "[SynologyImporter] Can't get inputStream for a photo");
140-
return null;
133+
if (isInTempStore) {
134+
return jobStore.getStream(jobId, fetchableUrl);
135+
} else if (fetchableUrl != null) {
136+
URL url = new URL(fetchableUrl);
137+
URLConnection connection = url.openConnection();
138+
return new InputStreamWrapper(
139+
connection.getInputStream(), connection.getContentLengthLong());
141140
}
142-
imageBytes = ByteStreams.toByteArray(inputStream);
143141
} catch (MalformedURLException e) {
144-
throw new UploadErrorException("Failed to create url for photo", e);
142+
throw new UploadErrorException(
143+
String.format("Failed to create url for fetchableUrl [%s]", fetchableUrl), e);
145144
} catch (IOException e) {
146-
throw new UploadErrorException("Failed to create input stream for photo", e);
145+
throw new UploadErrorException(
146+
String.format("Failed to create input stream for fetchableUrl [%s]", fetchableUrl), e);
147147
}
148+
throw new UploadErrorException(
149+
"No valid input stream source for media",
150+
new IOException("fetchableUrl is null and isInTempStore is false"));
151+
}
148152

149-
RequestBody fileBody = RequestBody.create(MediaType.parse(photo.getMimeType()), imageBytes);
150-
151-
MultipartBody.Builder builder =
152-
new MultipartBody.Builder()
153-
.setType(MultipartBody.FORM)
154-
.addFormDataPart("file", photo.getTitle(), fileBody)
155-
.addFormDataPart("item_id", photo.getDataId())
156-
.addFormDataPart("title", photo.getTitle())
157-
.addFormDataPart("job_id", jobId.toString())
158-
.addFormDataPart("service", exportingService);
153+
/**
154+
* Creates photo.
155+
*
156+
* @param photo the photo
157+
* @param jobId the job ID
158+
* @return a map of shape {"data": {"item_id": <item_id>}}
159+
*/
160+
public Map<String, Object> createPhoto(PhotoModel photo, UUID jobId)
161+
throws CopyExceptionWithFailureReason {
162+
monitor.info(
163+
() ->
164+
String.format(
165+
"[SynologyImporter] starts creating photo, dataId: [%s], name: [%s]",
166+
photo.getDataId(), photo.getTitle()),
167+
jobId);
168+
169+
RequestBodyGenerator bodyGenerator =
170+
() -> {
171+
// Due to InputStream may not repeatable, we need to open it inside the generator function
172+
// to make sure it can be read when retrying.
173+
InputStreamWrapper inputStreamWrapper =
174+
getMediaInputStreamWrapper(jobId, photo.getFetchableUrl(), photo.isInTempStore());
175+
176+
RequestBody fileBody =
177+
new RequestBody() {
178+
private boolean isConsumed = false;
179+
180+
@Override
181+
public MediaType contentType() {
182+
return MediaType.parse(photo.getMimeType());
183+
}
184+
185+
@Override
186+
public long contentLength() {
187+
return inputStreamWrapper.getBytes();
188+
}
189+
190+
@Override
191+
public void writeTo(BufferedSink sink) throws IOException {
192+
if (isConsumed) {
193+
throw new IOException("InputStream has already been consumed");
194+
}
195+
isConsumed = true;
196+
try (Source source = Okio.source(inputStreamWrapper.getStream())) {
197+
sink.writeAll(source);
198+
}
199+
}
200+
};
201+
202+
MultipartBody.Builder builder =
203+
new MultipartBody.Builder()
204+
.setType(MultipartBody.FORM)
205+
.addFormDataPart("file", photo.getTitle(), fileBody)
206+
.addFormDataPart("item_id", photo.getDataId())
207+
.addFormDataPart("title", photo.getTitle())
208+
.addFormDataPart("job_id", jobId.toString())
209+
.addFormDataPart("service", exportingService);
210+
211+
String imageDescription = photo.getDescription();
212+
if (!Strings.isNullOrEmpty(imageDescription)) {
213+
builder.addFormDataPart("description", imageDescription);
214+
}
215+
Date imageUploadedTime = photo.getUploadedTime();
216+
if (imageUploadedTime != null) {
217+
long timestampInSeconds = imageUploadedTime.getTime() / 1000;
218+
builder.addFormDataPart("uploaded_time", String.valueOf(timestampInSeconds));
219+
}
159220

160-
String imageDescription = photo.getDescription();
161-
if (!Strings.isNullOrEmpty(imageDescription)) {
162-
builder.addFormDataPart("description", imageDescription);
163-
}
164-
Date imageUploadedTime = photo.getUploadedTime();
165-
if (imageUploadedTime != null) {
166-
long timestampInSeconds = imageUploadedTime.getTime() / 1000;
167-
builder.addFormDataPart("uploaded_time", String.valueOf(timestampInSeconds));
168-
}
221+
return builder.build();
222+
};
169223

170224
@SuppressWarnings("unchecked")
171225
Map<String, Object> responseData =
172226
(Map<String, Object>)
173-
sendPostRequest(c2Api.getUploadItem(), builder.build(), jobId).get("data");
227+
sendPostRequest(c2Api.getUploadItem(), bodyGenerator, jobId).get("data");
228+
monitor.info(
229+
() ->
230+
String.format(
231+
"[SynologyImporter] photo created successfully, name: [%s]", photo.getTitle()),
232+
jobId);
174233
return responseData;
175234
}
176235

@@ -183,49 +242,72 @@ public Map<String, Object> createPhoto(PhotoModel photo, UUID jobId)
183242
*/
184243
public Map<String, Object> createVideo(VideoModel video, UUID jobId)
185244
throws CopyExceptionWithFailureReason {
186-
byte[] videoBytes;
187-
try {
188-
InputStream inputStream = null;
189-
if (video.isInTempStore()) {
190-
inputStream = jobStore.getStream(jobId, video.getFetchableUrl()).getStream();
191-
} else if (video.getFetchableUrl() != null) {
192-
inputStream = getInputStream(video.getFetchableUrl());
193-
} else {
194-
monitor.severe(() -> "[SynologyImporter] Can't get inputStream for a video");
195-
return null;
196-
}
197-
198-
videoBytes = ByteStreams.toByteArray(inputStream);
199-
} catch (MalformedURLException e) {
200-
throw new UploadErrorException("Failed to create url for video", e);
201-
} catch (IOException e) {
202-
throw new UploadErrorException("Failed to create input stream for video", e);
203-
}
245+
monitor.info(
246+
() ->
247+
String.format(
248+
"[SynologyImporter] starts creating video, dataId: [%s], name: [%s]",
249+
video.getDataId(), video.getName()),
250+
jobId);
251+
252+
RequestBodyGenerator bodyGenerator =
253+
() -> {
254+
// Due to InputStream may not repeatable, we need to open it inside the generator function
255+
// to make sure it can be read when retrying.
256+
InputStreamWrapper inputStreamWrapper =
257+
getMediaInputStreamWrapper(jobId, video.getFetchableUrl(), video.isInTempStore());
258+
259+
RequestBody fileBody =
260+
new RequestBody() {
261+
private boolean isConsumed = false;
262+
263+
@Override
264+
public MediaType contentType() {
265+
return MediaType.parse(video.getMimeType());
266+
}
267+
268+
@Override
269+
public long contentLength() {
270+
return inputStreamWrapper.getBytes();
271+
}
272+
273+
@Override
274+
public void writeTo(BufferedSink sink) throws IOException {
275+
if (isConsumed) {
276+
throw new IOException("InputStream has already been consumed");
277+
}
278+
isConsumed = true;
279+
try (Source source = Okio.source(inputStreamWrapper.getStream())) {
280+
sink.writeAll(source);
281+
}
282+
}
283+
};
284+
285+
MultipartBody.Builder builder =
286+
new MultipartBody.Builder()
287+
.setType(MultipartBody.FORM)
288+
.addFormDataPart("file", video.getName(), fileBody)
289+
.addFormDataPart("item_id", video.getDataId())
290+
.addFormDataPart("title", video.getName())
291+
.addFormDataPart("job_id", jobId.toString())
292+
.addFormDataPart("service", exportingService);
293+
294+
String imageDescription = video.getDescription();
295+
if (!Strings.isNullOrEmpty(imageDescription)) {
296+
builder.addFormDataPart("description", imageDescription);
297+
}
298+
Date videoUploadedTime = video.getUploadedTime();
299+
if (videoUploadedTime != null) {
300+
long timestampInSeconds = videoUploadedTime.getTime() / 1000;
301+
builder.addFormDataPart("uploaded_time", String.valueOf(timestampInSeconds));
302+
}
204303

205-
RequestBody fileBody = RequestBody.create(MediaType.parse(video.getMimeType()), videoBytes);
206-
MultipartBody.Builder builder =
207-
new MultipartBody.Builder()
208-
.setType(MultipartBody.FORM)
209-
.addFormDataPart("file", video.getName(), fileBody)
210-
.addFormDataPart("item_id", video.getDataId())
211-
.addFormDataPart("title", video.getName())
212-
.addFormDataPart("job_id", jobId.toString())
213-
.addFormDataPart("service", exportingService);
214-
215-
String imageDescription = video.getDescription();
216-
if (!Strings.isNullOrEmpty(imageDescription)) {
217-
builder.addFormDataPart("description", imageDescription);
218-
}
219-
Date videoUploadedTime = video.getUploadedTime();
220-
if (videoUploadedTime != null) {
221-
long timestampInSeconds = videoUploadedTime.getTime() / 1000;
222-
builder.addFormDataPart("uploaded_time", String.valueOf(timestampInSeconds));
223-
}
304+
return builder.build();
305+
};
224306

225307
@SuppressWarnings("unchecked")
226308
Map<String, Object> responseData =
227309
(Map<String, Object>)
228-
sendPostRequest(c2Api.getUploadItem(), builder.build(), jobId).get("data");
310+
sendPostRequest(c2Api.getUploadItem(), bodyGenerator, jobId, 300).get("data");
229311
return responseData;
230312
}
231313

@@ -244,7 +326,8 @@ public Map<String, Object> addItemToAlbum(String albumId, String itemId, UUID jo
244326
.add("service", exportingService)
245327
.add("album_id", albumId)
246328
.add("item_id", itemId);
247-
return sendPostRequest(c2Api.getAddItemToAlbum(), builder.build(), jobId);
329+
RequestBody requestBody = builder.build();
330+
return sendPostRequest(c2Api.getAddItemToAlbum(), () -> requestBody, jobId);
248331
}
249332

250333
/**
@@ -265,7 +348,8 @@ public Map<String, Object> sendJobSignal(JobLifeCycle jobStatus, UUID jobId)
265348
builder.add("end_reason", jobStatus.endReason().name());
266349
}
267350

268-
return sendPostRequest(c2Api.getSignalJob(), builder.build(), jobId);
351+
RequestBody requestBody = builder.build();
352+
return sendPostRequest(c2Api.getSignalJob(), () -> requestBody, jobId);
269353
}
270354

271355
@VisibleForTesting
@@ -304,17 +388,43 @@ protected void throwExceptionIfNoQuota(Response response) throws CopyExceptionWi
304388
}
305389

306390
@VisibleForTesting
307-
protected Map<String, Object> sendPostRequest(String url, RequestBody body, UUID jobId)
391+
protected Map<String, Object> sendPostRequest(
392+
String url, RequestBodyGenerator bodyGenerator, UUID jobId)
393+
throws CopyExceptionWithFailureReason {
394+
return sendPostRequest(url, bodyGenerator, jobId, -1);
395+
}
396+
397+
/*
398+
* @param url the URL to send the POST request to
399+
* @param bodyGenerator a generator function that produces the request body, it will be called for each retry attempt
400+
* @param jobId the job ID
401+
* @param timeoutInSeconds the timeout for the request in seconds, -1 means do not modify the default timeout of OkHttpClient
402+
*/
403+
@VisibleForTesting
404+
protected Map<String, Object> sendPostRequest(
405+
String url, RequestBodyGenerator bodyGenerator, UUID jobId, int timeoutInSeconds)
308406
throws CopyExceptionWithFailureReason {
309407
boolean triedRefreshToken = false;
310-
Request.Builder requestBuilder = new Request.Builder().url(url).post(body);
311408

312409
Exception lastException = null;
313410
for (int retry = retryConfig.getMaxAttempts(); retry > 0; retry--) {
314411
Response response = null;
315412
try {
413+
RequestBody body = bodyGenerator.get();
414+
Request.Builder requestBuilder = new Request.Builder().url(url).post(body);
316415
requestBuilder.header("Authorization", "Bearer " + tokenManager.getAccessToken(jobId));
317-
response = client.newCall(requestBuilder.build()).execute();
416+
if (timeoutInSeconds < 0) {
417+
// Use default timeout of OkHttpClient
418+
response = client.newCall(requestBuilder.build()).execute();
419+
} else {
420+
response =
421+
client
422+
.newBuilder()
423+
.readTimeout(timeoutInSeconds, java.util.concurrent.TimeUnit.SECONDS)
424+
.build()
425+
.newCall(requestBuilder.build())
426+
.execute();
427+
}
318428
if (!response.isSuccessful()) {
319429
int code = response.code();
320430
if (code == 401 && !triedRefreshToken) {

0 commit comments

Comments
 (0)