Skip to content

Commit 4c44f6d

Browse files
authored
feat(synology): optimize video upload with chunk preloading (#1490)
Improve video upload efficiency by implementing an asynchronous producer-consumer model for chunked uploads. This minimizes idle time between network requests and increases overall throughput for large video files. ## Goal The goal of this change is to optimize video uploads to Synology by pre-fetching the next data chunk while the current one is being uploaded, reducing the total duration of sequential transfers. ## Changes - **Pattern Implementation:** Refactored `uploadVideoChunks` into a producer-consumer model using a dedicated thread for pre-loading data from the input stream. - **Memory Management:** Introduced a `bufferPool` with a fixed size (2 chunks) to ensure predictable memory usage (~100MB) regardless of video size. - **Asynchronous Execution:** Utilized `CompletableFuture` and `LinkedBlockingQueue` for thread-safe coordination between chunk production and upload. - **Robustness:** Added explicit error propagation and resource cleanup (buffers, threads) using `AtomicBoolean` and `finally` blocks. - **Observability:** Enhanced logging with detailed markers for chunk fetching, queueing, and upload progress to facilitate monitoring and debugging. <img width="1492" height="957" alt="image" src="https://github.com/user-attachments/assets/e4a1ee82-374e-498c-9d17-b3751e9d427d" />
1 parent c7dbae7 commit 4c44f6d

1 file changed

Lines changed: 175 additions & 38 deletions

File tree

  • extensions/data-transfer/portability-data-transfer-synology/src/main/java/org/datatransferproject/datatransfer/synology/service

extensions/data-transfer/portability-data-transfer-synology/src/main/java/org/datatransferproject/datatransfer/synology/service/SynologyDTPService.java

Lines changed: 175 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,11 @@
2929
import java.util.Date;
3030
import java.util.Map;
3131
import java.util.UUID;
32+
import java.util.concurrent.BlockingQueue;
33+
import java.util.concurrent.CompletableFuture;
34+
import java.util.concurrent.LinkedBlockingQueue;
35+
import java.util.concurrent.TimeUnit;
36+
import java.util.concurrent.atomic.AtomicBoolean;
3237
import okhttp3.FormBody;
3338
import okhttp3.MediaType;
3439
import okhttp3.MultipartBody;
@@ -271,60 +276,192 @@ public Map<String, Object> createVideo(VideoModel video, UUID jobId)
271276
private int uploadVideoChunks(VideoModel video, UUID jobId, InputStream inputStream)
272277
throws CopyExceptionWithFailureReason, IOException {
273278
final int CHUNK_SIZE = 50 * 1024 * 1024; // 50MB
274-
int actualChunkCount = 0;
279+
// Use a pool of 2 buffers to limit memory usage to ~100MB regardless of video
280+
BlockingQueue<byte[]> bufferPool = new LinkedBlockingQueue<>(2);
281+
bufferPool.add(new byte[CHUNK_SIZE]);
282+
bufferPool.add(new byte[CHUNK_SIZE]);
283+
284+
// Pass video chunks from producer to consumer using a blocking queue
285+
// the producer will put a VideoChunk with null data to indicate the end of stream or an error.
286+
BlockingQueue<VideoChunk> chunkQueue = new LinkedBlockingQueue<>();
287+
AtomicBoolean consumerAborted = new AtomicBoolean(false);
288+
289+
// Start a producer thread to read the input stream.
290+
CompletableFuture<Void> producerFuture =
291+
CompletableFuture.runAsync(
292+
() ->
293+
produceVideoChunks(
294+
video,
295+
jobId,
296+
inputStream,
297+
bufferPool,
298+
chunkQueue,
299+
consumerAborted,
300+
CHUNK_SIZE));
301+
302+
try {
303+
return consumeAndUploadVideoChunks(video, jobId, bufferPool, chunkQueue);
304+
} catch (InterruptedException e) {
305+
Thread.currentThread().interrupt();
306+
throw new IOException("Video upload interrupted", e);
307+
} finally {
308+
consumerAborted.set(true);
309+
producerFuture.cancel(true);
310+
}
311+
}
312+
313+
private void produceVideoChunks(
314+
VideoModel video,
315+
UUID jobId,
316+
InputStream inputStream,
317+
BlockingQueue<byte[]> bufferPool,
318+
BlockingQueue<VideoChunk> chunkQueue,
319+
AtomicBoolean consumerAborted,
320+
int chunkSize) {
321+
try {
322+
int index = 0;
323+
while (!consumerAborted.get()) {
324+
byte[] chunkData = bufferPool.poll(500, TimeUnit.MILLISECONDS);
325+
if (chunkData == null) {
326+
continue;
327+
}
328+
329+
int bytesRead = ByteStreams.read(inputStream, chunkData, 0, chunkSize);
330+
if (bytesRead == 0) {
331+
bufferPool.add(chunkData); // Return unused buffer
332+
chunkQueue.add(new VideoChunk(null, 0, -1));
333+
break;
334+
}
275335

276-
// reuse the same byte array for each chunk to reduce memory usage,
277-
// since we are uploading in sequential, there is no concurrency issue
278-
byte[] chunkData = new byte[CHUNK_SIZE];
336+
final int fIndex = index;
337+
monitor.info(
338+
() ->
339+
String.format(
340+
"[SynologyImporter] fetched video chunk, video name: [%s], chunk index: [%d],"
341+
+ " chunk size: [%d].",
342+
video.getName(), fIndex, bytesRead),
343+
jobId);
344+
345+
chunkQueue.add(new VideoChunk(chunkData, bytesRead, index++));
346+
347+
monitor.info(
348+
() ->
349+
String.format(
350+
"[SynologyImporter] video chunk put into queue, video name: [%s], chunk index:"
351+
+ " [%d], chunk size: [%d].",
352+
video.getName(), fIndex, bytesRead),
353+
jobId);
354+
}
355+
} catch (Exception e) {
356+
chunkQueue.add(new VideoChunk(e));
357+
}
358+
}
359+
360+
private int consumeAndUploadVideoChunks(
361+
VideoModel video,
362+
UUID jobId,
363+
BlockingQueue<byte[]> bufferPool,
364+
BlockingQueue<VideoChunk> chunkQueue)
365+
throws CopyExceptionWithFailureReason, IOException, InterruptedException {
366+
int actualChunkCount = 0;
279367
while (true) {
280-
int bytesRead = ByteStreams.read(inputStream, chunkData, 0, CHUNK_SIZE);
281-
if (bytesRead == 0) {
282-
break;
368+
VideoChunk chunk = chunkQueue.take();
369+
if (chunk.error != null) {
370+
throw new IOException("Error reading video stream", chunk.error);
371+
}
372+
if (chunk.data == null) {
373+
final int finalActualChunkCount = actualChunkCount;
374+
monitor.info(
375+
() ->
376+
String.format(
377+
"[SynologyImporter] finished reading video stream, total chunks: [%d].",
378+
finalActualChunkCount),
379+
jobId);
380+
return actualChunkCount;
283381
}
284382

285-
RequestBody fileBody =
286-
new RequestBody() {
287-
@Override
288-
public MediaType contentType() {
289-
return MediaType.parse(video.getMimeType());
290-
}
383+
monitor.info(
384+
() ->
385+
String.format(
386+
"[SynologyImporter] get video chunk, video name: [%s], chunk index: [%d], chunk"
387+
+ " size: [%d].",
388+
video.getName(), chunk.index, chunk.size),
389+
jobId);
291390

292-
@Override
293-
public long contentLength() {
294-
return bytesRead;
295-
}
391+
try {
392+
uploadVideoChunk(video, jobId, chunk);
393+
} finally {
394+
bufferPool.add(chunk.data);
395+
}
396+
actualChunkCount++;
397+
}
398+
}
296399

297-
@Override
298-
public void writeTo(BufferedSink sink) throws IOException {
299-
// write the actual bytes read for the last chunk, which may be smaller than
300-
// CHUNK_SIZE
301-
sink.write(chunkData, 0, bytesRead);
302-
}
303-
};
400+
private void uploadVideoChunk(VideoModel video, UUID jobId, VideoChunk chunk)
401+
throws CopyExceptionWithFailureReason, IOException {
402+
final int bytesRead = chunk.size;
403+
final byte[] data = chunk.data;
404+
405+
RequestBodyGenerator bodyGenerator =
406+
() -> {
407+
RequestBody fileBody =
408+
new RequestBody() {
409+
@Override
410+
public MediaType contentType() {
411+
return MediaType.parse(video.getMimeType());
412+
}
304413

305-
MultipartBody.Builder builder =
306-
new MultipartBody.Builder()
414+
@Override
415+
public long contentLength() {
416+
return bytesRead;
417+
}
418+
419+
@Override
420+
public void writeTo(BufferedSink sink) throws IOException {
421+
sink.write(data, 0, bytesRead);
422+
}
423+
};
424+
425+
return new MultipartBody.Builder()
307426
.setType(MultipartBody.FORM)
308427
.addFormDataPart("item_id", video.getDataId())
309-
.addFormDataPart("index", String.valueOf(actualChunkCount))
428+
.addFormDataPart("index", String.valueOf(chunk.index))
310429
.addFormDataPart("job_id", jobId.toString())
311430
.addFormDataPart("service", exportingService)
312431
.addFormDataPart("file_size", String.valueOf(bytesRead))
313-
.addFormDataPart("file", video.getName(), fileBody);
432+
.addFormDataPart("file", video.getName(), fileBody)
433+
.build();
434+
};
314435

315-
RequestBody requestBody = builder.build();
436+
monitor.info(
437+
() ->
438+
String.format(
439+
"[SynologyImporter] uploading video chunk, video name: [%s], chunk index: [%d],"
440+
+ " chunk size: [%d].",
441+
video.getName(), chunk.index, bytesRead),
442+
jobId);
443+
sendPostRequest(c2Api.getChunkUploadItem(), bodyGenerator, jobId);
444+
}
316445

317-
final String chunkInfo =
318-
String.format(
319-
"[SynologyImporter] uploading video chunk, video name: [%s], chunk index: [%d], chunk"
320-
+ " size: [%d].",
321-
video.getName(), actualChunkCount, bytesRead);
322-
monitor.info(() -> chunkInfo, jobId);
323-
sendPostRequest(c2Api.getChunkUploadItem(), () -> requestBody, jobId);
446+
private static class VideoChunk {
447+
final byte[] data;
448+
final int size;
449+
final int index;
450+
final Throwable error;
451+
452+
VideoChunk(byte[] data, int size, int index) {
453+
this.data = data;
454+
this.size = size;
455+
this.index = index;
456+
this.error = null;
457+
}
324458

325-
actualChunkCount++;
459+
VideoChunk(Throwable error) {
460+
this.data = null;
461+
this.size = 0;
462+
this.index = -1;
463+
this.error = error;
326464
}
327-
return actualChunkCount;
328465
}
329466

330467
private Map<String, Object> completeVideoUpload(VideoModel video, UUID jobId, int totalChunks)

0 commit comments

Comments
 (0)