Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,11 @@
import java.util.Date;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import okhttp3.FormBody;
import okhttp3.MediaType;
import okhttp3.MultipartBody;
Expand Down Expand Up @@ -271,60 +276,192 @@ public Map<String, Object> createVideo(VideoModel video, UUID jobId)
/**
 * Uploads the video as a sequence of fixed-size chunks read from {@code inputStream}.
 *
 * <p>A background producer reads ahead into pooled buffers while this thread uploads,
 * overlapping network read and upload latency. Memory is bounded by the two pooled
 * 50MB buffers (~100MB total) regardless of video size.
 *
 * @param video the video being imported (name, mime type, data id)
 * @param jobId job identifier used for logging and upload correlation
 * @param inputStream source stream of the video bytes; not closed here
 * @return the number of chunks actually uploaded
 * @throws CopyExceptionWithFailureReason if a chunk upload is rejected by the service
 * @throws IOException if the stream fails, the producer reports an error, or the
 *     upload is interrupted (interrupt status is restored before wrapping)
 */
private int uploadVideoChunks(VideoModel video, UUID jobId, InputStream inputStream)
    throws CopyExceptionWithFailureReason, IOException {
  final int CHUNK_SIZE = 50 * 1024 * 1024; // 50MB
  // Pool of 2 reusable buffers caps memory at ~100MB regardless of video size.
  BlockingQueue<byte[]> bufferPool = new LinkedBlockingQueue<>(2);
  bufferPool.add(new byte[CHUNK_SIZE]);
  bufferPool.add(new byte[CHUNK_SIZE]);

  // Chunks flow producer -> consumer through this queue. The producer enqueues a
  // VideoChunk with null data to signal end-of-stream, or one wrapping a Throwable on error.
  BlockingQueue<VideoChunk> chunkQueue = new LinkedBlockingQueue<>();
  AtomicBoolean consumerAborted = new AtomicBoolean(false);

  // Start a producer thread to read the input stream ahead of the uploads.
  CompletableFuture<Void> producerFuture =
      CompletableFuture.runAsync(
          () ->
              produceVideoChunks(
                  video,
                  jobId,
                  inputStream,
                  bufferPool,
                  chunkQueue,
                  consumerAborted,
                  CHUNK_SIZE));

  try {
    return consumeAndUploadVideoChunks(video, jobId, bufferPool, chunkQueue);
  } catch (InterruptedException e) {
    Thread.currentThread().interrupt(); // restore interrupt status before wrapping
    throw new IOException("Video upload interrupted", e);
  } finally {
    // The flag is the real stop signal: the producer polls bufferPool with a timeout and
    // re-checks it, so it exits promptly. CompletableFuture.cancel(true) does NOT interrupt
    // a runAsync task that is already running; it is only a best-effort cancellation.
    consumerAborted.set(true);
    producerFuture.cancel(true);
  }
}

/**
 * Producer loop: reads the input stream into pooled buffers and enqueues chunks for the
 * consumer. Runs on a background thread started by {@code uploadVideoChunks}.
 *
 * <p>Termination protocol on {@code chunkQueue}: a {@code VideoChunk} with null data marks
 * end-of-stream; a {@code VideoChunk} wrapping a {@code Throwable} reports a failure. Never
 * throws — all failures are forwarded to the consumer via the error sentinel.
 *
 * @param video video being imported (used for log messages only here)
 * @param jobId job identifier for logging
 * @param inputStream source of video bytes
 * @param bufferPool pool of reusable byte buffers, each at least {@code chunkSize} long
 * @param chunkQueue unbounded queue delivering chunks to the consumer
 * @param consumerAborted set by the consumer to request early shutdown
 * @param chunkSize maximum bytes to read per chunk
 */
private void produceVideoChunks(
    VideoModel video,
    UUID jobId,
    InputStream inputStream,
    BlockingQueue<byte[]> bufferPool,
    BlockingQueue<VideoChunk> chunkQueue,
    AtomicBoolean consumerAborted,
    int chunkSize) {
  try {
    int index = 0;
    while (!consumerAborted.get()) {
      // Borrow a buffer; time out periodically so an aborting consumer is noticed
      // even when no buffer ever comes back.
      byte[] chunkData = bufferPool.poll(500, TimeUnit.MILLISECONDS);
      if (chunkData == null) {
        continue;
      }

      // ByteStreams.read fills as much of the buffer as possible; it returns fewer than
      // chunkSize bytes only at end of stream, and 0 once the stream is exhausted.
      int bytesRead = ByteStreams.read(inputStream, chunkData, 0, chunkSize);
      if (bytesRead == 0) {
        bufferPool.add(chunkData); // Return unused buffer
        chunkQueue.add(new VideoChunk(null, 0, -1)); // end-of-stream sentinel
        break;
      }

      final int fIndex = index;
      monitor.info(
          () ->
              String.format(
                  "[SynologyImporter] fetched video chunk, video name: [%s], chunk index: [%d],"
                      + " chunk size: [%d].",
                  video.getName(), fIndex, bytesRead),
          jobId);

      chunkQueue.add(new VideoChunk(chunkData, bytesRead, index++));

      monitor.info(
          () ->
              String.format(
                  "[SynologyImporter] video chunk put into queue, video name: [%s], chunk index:"
                      + " [%d], chunk size: [%d].",
                  video.getName(), fIndex, bytesRead),
          jobId);
    }
  } catch (Exception e) {
    // Forward the failure to the consumer; throwing here would be lost on the async thread.
    chunkQueue.add(new VideoChunk(e));
  }
}

private int consumeAndUploadVideoChunks(
VideoModel video,
UUID jobId,
BlockingQueue<byte[]> bufferPool,
BlockingQueue<VideoChunk> chunkQueue)
throws CopyExceptionWithFailureReason, IOException, InterruptedException {
int actualChunkCount = 0;
while (true) {
int bytesRead = ByteStreams.read(inputStream, chunkData, 0, CHUNK_SIZE);
if (bytesRead == 0) {
break;
VideoChunk chunk = chunkQueue.take();
if (chunk.error != null) {
throw new IOException("Error reading video stream", chunk.error);
}
if (chunk.data == null) {
final int finalActualChunkCount = actualChunkCount;
monitor.info(
() ->
String.format(
"[SynologyImporter] finished reading video stream, total chunks: [%d].",
finalActualChunkCount),
jobId);
return actualChunkCount;
}

RequestBody fileBody =
new RequestBody() {
@Override
public MediaType contentType() {
return MediaType.parse(video.getMimeType());
}
monitor.info(
() ->
String.format(
"[SynologyImporter] get video chunk, video name: [%s], chunk index: [%d], chunk"
+ " size: [%d].",
video.getName(), chunk.index, chunk.size),
jobId);

@Override
public long contentLength() {
return bytesRead;
}
try {
uploadVideoChunk(video, jobId, chunk);
} finally {
bufferPool.add(chunk.data);
}
actualChunkCount++;
}
}

@Override
public void writeTo(BufferedSink sink) throws IOException {
// write the actual bytes read for the last chunk, which may be smaller than
// CHUNK_SIZE
sink.write(chunkData, 0, bytesRead);
}
};
private void uploadVideoChunk(VideoModel video, UUID jobId, VideoChunk chunk)
throws CopyExceptionWithFailureReason, IOException {
final int bytesRead = chunk.size;
final byte[] data = chunk.data;

RequestBodyGenerator bodyGenerator =
() -> {
RequestBody fileBody =
new RequestBody() {
@Override
public MediaType contentType() {
return MediaType.parse(video.getMimeType());
}

MultipartBody.Builder builder =
new MultipartBody.Builder()
@Override
public long contentLength() {
return bytesRead;
}

@Override
public void writeTo(BufferedSink sink) throws IOException {
sink.write(data, 0, bytesRead);
}
};

return new MultipartBody.Builder()
.setType(MultipartBody.FORM)
.addFormDataPart("item_id", video.getDataId())
.addFormDataPart("index", String.valueOf(actualChunkCount))
.addFormDataPart("index", String.valueOf(chunk.index))
.addFormDataPart("job_id", jobId.toString())
.addFormDataPart("service", exportingService)
.addFormDataPart("file_size", String.valueOf(bytesRead))
.addFormDataPart("file", video.getName(), fileBody);
.addFormDataPart("file", video.getName(), fileBody)
.build();
};

RequestBody requestBody = builder.build();
monitor.info(
() ->
String.format(
"[SynologyImporter] uploading video chunk, video name: [%s], chunk index: [%d],"
+ " chunk size: [%d].",
video.getName(), chunk.index, bytesRead),
jobId);
sendPostRequest(c2Api.getChunkUploadItem(), bodyGenerator, jobId);
}

final String chunkInfo =
String.format(
"[SynologyImporter] uploading video chunk, video name: [%s], chunk index: [%d], chunk"
+ " size: [%d].",
video.getName(), actualChunkCount, bytesRead);
monitor.info(() -> chunkInfo, jobId);
sendPostRequest(c2Api.getChunkUploadItem(), () -> requestBody, jobId);
private static class VideoChunk {
final byte[] data;
final int size;
final int index;
final Throwable error;

VideoChunk(byte[] data, int size, int index) {
this.data = data;
this.size = size;
this.index = index;
this.error = null;
}

actualChunkCount++;
VideoChunk(Throwable error) {
this.data = null;
this.size = 0;
this.index = -1;
this.error = error;
}
return actualChunkCount;
}

private Map<String, Object> completeVideoUpload(VideoModel video, UUID jobId, int totalChunks)
Expand Down
Loading