diff --git a/extensions/data-transfer/portability-data-transfer-synology/src/main/java/org/datatransferproject/datatransfer/synology/service/SynologyDTPService.java b/extensions/data-transfer/portability-data-transfer-synology/src/main/java/org/datatransferproject/datatransfer/synology/service/SynologyDTPService.java index e946e2fda..d69202585 100644 --- a/extensions/data-transfer/portability-data-transfer-synology/src/main/java/org/datatransferproject/datatransfer/synology/service/SynologyDTPService.java +++ b/extensions/data-transfer/portability-data-transfer-synology/src/main/java/org/datatransferproject/datatransfer/synology/service/SynologyDTPService.java @@ -20,11 +20,9 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Strings; -import com.google.common.io.ByteStreams; import java.io.IOException; -import java.io.InputStream; -import java.net.MalformedURLException; import java.net.URL; +import java.net.URLConnection; import java.util.Date; import java.util.Map; import java.util.UUID; @@ -35,15 +33,19 @@ import okhttp3.Request; import okhttp3.RequestBody; import okhttp3.Response; +import okio.BufferedSink; +import okio.Okio; +import okio.Source; import org.datatransferproject.api.launcher.Monitor; import org.datatransferproject.datatransfer.synology.models.C2Api; import org.datatransferproject.datatransfer.synology.models.ServiceConfig; import org.datatransferproject.datatransfer.synology.utils.ServiceConfigParser; import org.datatransferproject.spi.cloud.storage.JobStore; +import org.datatransferproject.spi.cloud.storage.TemporaryPerJobDataStore.InputStreamWrapper; import org.datatransferproject.spi.transfer.types.CopyExceptionWithFailureReason; import org.datatransferproject.spi.transfer.types.DestinationMemoryFullException; +import org.datatransferproject.spi.transfer.types.InvalidTokenException; import org.datatransferproject.spi.transfer.types.NoNasInAccountException; -import org.datatransferproject.spi.transfer.types.UploadErrorException; import org.datatransferproject.spi.transfer.types.signals.JobLifeCycle; import org.datatransferproject.types.common.models.media.MediaAlbum; import org.datatransferproject.types.common.models.photos.PhotoModel; @@ -61,6 +63,11 @@ public class SynologyDTPService { C2Api c2Api; ServiceConfig.Retry retryConfig; + @FunctionalInterface + protected interface RequestBodyGenerator { + RequestBody get() throws CopyExceptionWithFailureReason, IOException; + } + /** * Constructs a new {@code SynologyDTPService} instance. * @@ -89,17 +96,6 @@ public SynologyDTPService( this.client = client; } - /** - * getInputStream - * - * @param url - * @return an InputStream Instance - * @throws IOException - */ - protected InputStream getInputStream(String url) throws IOException { - return new URL(url).openStream(); - } - /** * Creates album. * @@ -108,15 +104,38 @@ protected InputStream getInputStream(String url) throws IOException { * @return a map of shape {"data": {"album_id": }} */ public Map createAlbum(MediaAlbum album, UUID jobId) - throws CopyExceptionWithFailureReason { + throws CopyExceptionWithFailureReason, IOException { FormBody.Builder builder = new FormBody.Builder().add("title", album.getName()); builder.add("album_id", album.getId()); builder.add("job_id", jobId.toString()); builder.add("service", exportingService); monitor.info(() -> "[SynologyImporter] Creating album", album.getName(), jobId); + RequestBody requestBody = builder.build(); return (Map) - sendPostRequest(c2Api.getCreateAlbum(), builder.build(), jobId).get("data"); + sendPostRequest(c2Api.getCreateAlbum(), () -> requestBody, jobId).get("data"); + } + + /** + * get InputStreamWrapper for media file, it can be from temp store or from fetchable url + * + * @param jobId the job ID + * @param fetchableUrl the url to fetch media file, can be null if the file is in temp store + * @return an InputStreamWrapper instance + * @throws CopyExceptionWithFailureReason + */ + @VisibleForTesting + protected InputStreamWrapper getMediaInputStreamWrapper( + UUID jobId, String fetchableUrl, boolean isInTempStore) throws IOException { + if (isInTempStore) { + return jobStore.getStream(jobId, fetchableUrl); + } else if (fetchableUrl != null) { + URL url = new URL(fetchableUrl); + URLConnection connection = url.openConnection(); + return new InputStreamWrapper(connection.getInputStream(), connection.getContentLengthLong()); + } + + throw new IllegalArgumentException("fetchableUrl is null and isInTempStore is false"); } /** @@ -127,50 +146,78 @@ public Map createAlbum(MediaAlbum album, UUID jobId) * @return a map of shape {"data": {"item_id": }} */ public Map createPhoto(PhotoModel photo, UUID jobId) - throws CopyExceptionWithFailureReason { - byte[] imageBytes; - try { - InputStream inputStream = null; - if (photo.isInTempStore()) { - inputStream = jobStore.getStream(jobId, photo.getFetchableUrl()).getStream(); - } else if (photo.getFetchableUrl() != null) { - inputStream = getInputStream(photo.getFetchableUrl()); - } else { - monitor.severe(() -> "[SynologyImporter] Can't get inputStream for a photo"); - return null; - } - imageBytes = ByteStreams.toByteArray(inputStream); - } catch (MalformedURLException e) { - throw new UploadErrorException("Failed to create url for photo", e); - } catch (IOException e) { - throw new UploadErrorException("Failed to create input stream for photo", e); - } - - RequestBody fileBody = RequestBody.create(MediaType.parse(photo.getMimeType()), imageBytes); - - MultipartBody.Builder builder = - new MultipartBody.Builder() - .setType(MultipartBody.FORM) - .addFormDataPart("file", photo.getTitle(), fileBody) - .addFormDataPart("item_id", photo.getDataId()) - .addFormDataPart("title", photo.getTitle()) - .addFormDataPart("job_id", jobId.toString()) - .addFormDataPart("service", exportingService); + throws CopyExceptionWithFailureReason, IOException { + monitor.info( + () -> + String.format( + "[SynologyImporter] starts creating photo, dataId: [%s], name: [%s].", + photo.getDataId(), photo.getTitle()), + jobId); + + RequestBodyGenerator bodyGenerator = + () -> { + // Due to InputStream may not repeatable, we need to open it inside the generator function + // to make sure it can be read when retrying. + InputStreamWrapper inputStreamWrapper = + getMediaInputStreamWrapper(jobId, photo.getFetchableUrl(), photo.isInTempStore()); + + RequestBody fileBody = + new RequestBody() { + private boolean isConsumed = false; + + @Override + public MediaType contentType() { + return MediaType.parse(photo.getMimeType()); + } + + @Override + public long contentLength() { + return inputStreamWrapper.getBytes(); + } + + @Override + public void writeTo(BufferedSink sink) throws IOException { + if (isConsumed) { + throw new IOException("InputStream has already been consumed"); + } + isConsumed = true; + try (Source source = Okio.source(inputStreamWrapper.getStream())) { + sink.writeAll(source); + } + } + }; + + MultipartBody.Builder builder = + new MultipartBody.Builder() + .setType(MultipartBody.FORM) + .addFormDataPart("file", photo.getTitle(), fileBody) + .addFormDataPart("item_id", photo.getDataId()) + .addFormDataPart("title", photo.getTitle()) + .addFormDataPart("job_id", jobId.toString()) + .addFormDataPart("service", exportingService); + + String imageDescription = photo.getDescription(); + if (!Strings.isNullOrEmpty(imageDescription)) { + builder.addFormDataPart("description", imageDescription); + } + Date imageUploadedTime = photo.getUploadedTime(); + if (imageUploadedTime != null) { + long timestampInSeconds = imageUploadedTime.getTime() / 1000; + builder.addFormDataPart("uploaded_time", String.valueOf(timestampInSeconds)); + } - String imageDescription = photo.getDescription(); - if (!Strings.isNullOrEmpty(imageDescription)) { - builder.addFormDataPart("description", imageDescription); - } - Date imageUploadedTime = photo.getUploadedTime(); - if (imageUploadedTime != null) { - long timestampInSeconds = imageUploadedTime.getTime() / 1000; - builder.addFormDataPart("uploaded_time", String.valueOf(timestampInSeconds)); - } + return builder.build(); + }; @SuppressWarnings("unchecked") Map responseData = (Map) - sendPostRequest(c2Api.getUploadItem(), builder.build(), jobId).get("data"); + sendPostRequest(c2Api.getUploadItem(), bodyGenerator, jobId).get("data"); + monitor.info( + () -> + String.format( + "[SynologyImporter] photo created successfully, name: [%s].", photo.getTitle()), + jobId); return responseData; } @@ -182,50 +229,78 @@ public Map createPhoto(PhotoModel photo, UUID jobId) * @return a map of shape {"data": {"item_id": }} */ public Map createVideo(VideoModel video, UUID jobId) - throws CopyExceptionWithFailureReason { - byte[] videoBytes; - try { - InputStream inputStream = null; - if (video.isInTempStore()) { - inputStream = jobStore.getStream(jobId, video.getFetchableUrl()).getStream(); - } else if (video.getFetchableUrl() != null) { - inputStream = getInputStream(video.getFetchableUrl()); - } else { - monitor.severe(() -> "[SynologyImporter] Can't get inputStream for a video"); - return null; - } - - videoBytes = ByteStreams.toByteArray(inputStream); - } catch (MalformedURLException e) { - throw new UploadErrorException("Failed to create url for video", e); - } catch (IOException e) { - throw new UploadErrorException("Failed to create input stream for video", e); - } + throws CopyExceptionWithFailureReason, IOException { + monitor.info( + () -> + String.format( + "[SynologyImporter] starts creating video, dataId: [%s], name: [%s].", + video.getDataId(), video.getName()), + jobId); + + RequestBodyGenerator bodyGenerator = + () -> { + // Due to InputStream may not repeatable, we need to open it inside the generator function + // to make sure it can be read when retrying. + InputStreamWrapper inputStreamWrapper = + getMediaInputStreamWrapper(jobId, video.getFetchableUrl(), video.isInTempStore()); + + RequestBody fileBody = + new RequestBody() { + private boolean isConsumed = false; + + @Override + public MediaType contentType() { + return MediaType.parse(video.getMimeType()); + } + + @Override + public long contentLength() { + return inputStreamWrapper.getBytes(); + } + + @Override + public void writeTo(BufferedSink sink) throws IOException { + if (isConsumed) { + throw new IOException("InputStream has already been consumed"); + } + isConsumed = true; + try (Source source = Okio.source(inputStreamWrapper.getStream())) { + sink.writeAll(source); + } + } + }; + + MultipartBody.Builder builder = + new MultipartBody.Builder() + .setType(MultipartBody.FORM) + .addFormDataPart("file", video.getName(), fileBody) + .addFormDataPart("item_id", video.getDataId()) + .addFormDataPart("title", video.getName()) + .addFormDataPart("job_id", jobId.toString()) + .addFormDataPart("service", exportingService); + + String imageDescription = video.getDescription(); + if (!Strings.isNullOrEmpty(imageDescription)) { + builder.addFormDataPart("description", imageDescription); + } + Date videoUploadedTime = video.getUploadedTime(); + if (videoUploadedTime != null) { + long timestampInSeconds = videoUploadedTime.getTime() / 1000; + builder.addFormDataPart("uploaded_time", String.valueOf(timestampInSeconds)); + } - RequestBody fileBody = RequestBody.create(MediaType.parse(video.getMimeType()), videoBytes); - MultipartBody.Builder builder = - new MultipartBody.Builder() - .setType(MultipartBody.FORM) - .addFormDataPart("file", video.getName(), fileBody) - .addFormDataPart("item_id", video.getDataId()) - .addFormDataPart("title", video.getName()) - .addFormDataPart("job_id", jobId.toString()) - .addFormDataPart("service", exportingService); - - String imageDescription = video.getDescription(); - if (!Strings.isNullOrEmpty(imageDescription)) { - builder.addFormDataPart("description", imageDescription); - } - Date videoUploadedTime = video.getUploadedTime(); - if (videoUploadedTime != null) { - long timestampInSeconds = videoUploadedTime.getTime() / 1000; - builder.addFormDataPart("uploaded_time", String.valueOf(timestampInSeconds)); - } + return builder.build(); + }; @SuppressWarnings("unchecked") Map responseData = (Map) - sendPostRequest(c2Api.getUploadItem(), builder.build(), jobId).get("data"); + sendPostRequest(c2Api.getUploadItem(), bodyGenerator, jobId, 300).get("data"); + monitor.info( + () -> + String.format( + "[SynologyImporter] video created successfully, name: [%s].", video.getName()), + jobId); return responseData; } @@ -237,14 +312,15 @@ public Map createVideo(VideoModel video, UUID jobId) * @return a map of shape {"success": } */ public Map addItemToAlbum(String albumId, String itemId, UUID jobId) - throws CopyExceptionWithFailureReason { + throws CopyExceptionWithFailureReason, IOException { FormBody.Builder builder = new FormBody.Builder() .add("job_id", jobId.toString()) .add("service", exportingService) .add("album_id", albumId) .add("item_id", itemId); - return sendPostRequest(c2Api.getAddItemToAlbum(), builder.build(), jobId); + RequestBody requestBody = builder.build(); + return sendPostRequest(c2Api.getAddItemToAlbum(), () -> requestBody, jobId); } /** @@ -255,7 +331,7 @@ public Map addItemToAlbum(String albumId, String itemId, UUID jo * @return a map of shape {"success": } */ public Map sendJobSignal(JobLifeCycle jobStatus, UUID jobId) - throws CopyExceptionWithFailureReason { + throws CopyExceptionWithFailureReason, IOException { FormBody.Builder builder = new FormBody.Builder() .add("job_id", jobId.toString()) @@ -265,7 +341,8 @@ public Map sendJobSignal(JobLifeCycle jobStatus, UUID jobId) builder.add("end_reason", jobStatus.endReason().name()); } - return sendPostRequest(c2Api.getSignalJob(), builder.build(), jobId); + RequestBody requestBody = builder.build(); + return sendPostRequest(c2Api.getSignalJob(), () -> requestBody, jobId); } @VisibleForTesting @@ -304,20 +381,51 @@ protected void throwExceptionIfNoQuota(Response response) throws CopyExceptionWi } @VisibleForTesting - protected Map sendPostRequest(String url, RequestBody body, UUID jobId) - throws CopyExceptionWithFailureReason { + protected Map sendPostRequest( + String url, RequestBodyGenerator bodyGenerator, UUID jobId) + throws CopyExceptionWithFailureReason, IOException { + return sendPostRequest(url, bodyGenerator, jobId, -1); + } + + /* + * @param url the URL to send the POST request to + * @param bodyGenerator a generator function that produces the request body, it will be called for each retry attempt + * @param jobId the job ID + * @param timeoutInSeconds the timeout for the request in seconds, -1 means do not modify the default timeout of OkHttpClient + */ + @VisibleForTesting + protected Map sendPostRequest( + String url, RequestBodyGenerator bodyGenerator, UUID jobId, int timeoutInSeconds) + throws CopyExceptionWithFailureReason, IOException { boolean triedRefreshToken = false; - Request.Builder requestBuilder = new Request.Builder().url(url).post(body); Exception lastException = null; for (int retry = retryConfig.getMaxAttempts(); retry > 0; retry--) { Response response = null; try { + RequestBody body = bodyGenerator.get(); + Request.Builder requestBuilder = new Request.Builder().url(url).post(body); requestBuilder.header("Authorization", "Bearer " + tokenManager.getAccessToken(jobId)); - response = client.newCall(requestBuilder.build()).execute(); + if (timeoutInSeconds < 0) { + // Use default timeout of OkHttpClient + response = client.newCall(requestBuilder.build()).execute(); + } else { + response = + client + .newBuilder() + .readTimeout(timeoutInSeconds, java.util.concurrent.TimeUnit.SECONDS) + .build() + .newCall(requestBuilder.build()) + .execute(); + } if (!response.isSuccessful()) { int code = response.code(); - if (code == 401 && !triedRefreshToken) { + if (code == 401) { + if (triedRefreshToken) { + throw new InvalidTokenException( + "Synology access token is invalid even after refresh", + new IOException("SynologyDTPService get http 401 unauthorized")); + } triedRefreshToken = true; if (tokenManager.refreshToken(jobId, client, objectMapper)) { continue; @@ -367,7 +475,7 @@ protected Map sendPostRequest(String url, RequestBody body, UUID lastException = e; } } - throw new UploadErrorException( + throw new IOException( String.format( "Failed to send POST request after %d retries: %s", retryConfig.getMaxAttempts(), lastException.getMessage()), diff --git a/extensions/data-transfer/portability-data-transfer-synology/src/main/java/org/datatransferproject/datatransfer/synology/uploader/SynologyUploader.java b/extensions/data-transfer/portability-data-transfer-synology/src/main/java/org/datatransferproject/datatransfer/synology/uploader/SynologyUploader.java index eb3b343f3..de0277c66 100644 --- a/extensions/data-transfer/portability-data-transfer-synology/src/main/java/org/datatransferproject/datatransfer/synology/uploader/SynologyUploader.java +++ b/extensions/data-transfer/portability-data-transfer-synology/src/main/java/org/datatransferproject/datatransfer/synology/uploader/SynologyUploader.java @@ -17,6 +17,7 @@ package org.datatransferproject.datatransfer.synology.uploader; +import java.io.IOException; import java.util.Collection; import java.util.Map; import java.util.UUID; @@ -27,8 +28,6 @@ import org.datatransferproject.datatransfer.synology.utils.SynologyMediaAlbumBinder; import org.datatransferproject.spi.transfer.idempotentexecutor.IdempotentImportExecutor; import org.datatransferproject.spi.transfer.types.CopyExceptionWithFailureReason; -import org.datatransferproject.spi.transfer.types.UploadErrorException; -import org.datatransferproject.types.common.DownloadableItem; import org.datatransferproject.types.common.ImportableItem; import org.datatransferproject.types.common.models.media.MediaAlbum; import org.datatransferproject.types.common.models.photos.PhotoAlbum; @@ -72,10 +71,13 @@ public SynologyUploader( */ public void importAlbums(Collection albums, UUID jobId) throws CopyExceptionWithFailureReason { + monitor.info( + () -> String.format("[SynologyImporter] starts importing albums, size: %d.", albums.size()), + jobId); if (albums.isEmpty()) { return; } - monitor.info(() -> "[SynologyImporter] starts importing albums", jobId); + for (ImportableItem album : albums) { try { MediaAlbum mediaAlbum; @@ -86,9 +88,13 @@ public void importAlbums(Collection albums, UUID jobId } else if (album instanceof VideoAlbum) { mediaAlbum = MediaAlbum.videoToMediaAlbum(((VideoAlbum) album)); } else { - throw new UploadErrorException( - "cannot convert to MediaAlbum", - new IllegalArgumentException("Unsupported ImportableItem type: " + album.getClass())); + monitor.severe( + () -> + String.format( + "[SynologyImporter] unsupported album type: %s, album name: %s.", + album.getClass(), album.getName()), + jobId); + continue; } String newAlbumId = importItemWithCache( @@ -104,10 +110,9 @@ public void importAlbums(Collection albums, UUID jobId throw e; } catch (Exception e) { monitor.severe(e::toString, jobId); - throw new UploadErrorException("Failed to import albums", e); } } - monitor.info(() -> "[SynologyImporter] imported albums successfully", jobId); + monitor.info(() -> "[SynologyImporter] ended importing albums.", jobId); } /** @@ -118,17 +123,26 @@ public void importAlbums(Collection albums, UUID jobId */ public void importPhotos(Collection photos, UUID jobId) throws CopyExceptionWithFailureReason { + monitor.info( + () -> String.format("[SynologyImporter] starts importing photos, size: %d.", photos.size()), + jobId); if (photos.isEmpty()) { return; } - monitor.info(() -> "[SynologyImporter] starts importing photos", jobId); + monitor.info(() -> "[SynologyImporter] starts importing photos.", jobId); for (PhotoModel photo : photos) { + monitor.info( + () -> + String.format( + "[SynologyImporter] starts importing photo, dataId: [%s], name: [%s]", + photo.getDataId(), photo.getName()), + jobId); try { String newPhotoId = - importDownloadableItemWithCache( + importItemWithCache( photo, jobId, - PhotoModel::getAlbumId, + "item_id", synologyDTPService::createPhoto, PhotoModel::getDataId, PhotoModel::getName); @@ -138,10 +152,9 @@ public void importPhotos(Collection photos, UUID jobId) throw e; } catch (Exception e) { monitor.severe(e::toString, jobId); - throw new UploadErrorException("Failed to import photos", e); } } - monitor.info(() -> "[SynologyImporter] imported photos successfully", jobId); + monitor.info(() -> "[SynologyImporter] ended importing photos.", jobId); } /** @@ -152,17 +165,26 @@ public void importPhotos(Collection photos, UUID jobId) */ public void importVideos(Collection videos, UUID jobId) throws CopyExceptionWithFailureReason { + monitor.info( + () -> String.format("[SynologyImporter] starts importing videos, size: %d", videos.size()), + jobId); if (videos.isEmpty()) { return; } monitor.info(() -> "[SynologyImporter] starts importing videos", jobId); for (VideoModel video : videos) { + monitor.info( + () -> + String.format( + "[SynologyImporter] starts importing video, dataId: [%s], name: [%s]", + video.getDataId(), video.getName()), + jobId); try { String newVideoId = - importDownloadableItemWithCache( + importItemWithCache( video, jobId, - VideoModel::getAlbumId, + "item_id", synologyDTPService::createVideo, VideoModel::getDataId, VideoModel::getName); @@ -172,46 +194,14 @@ public void importVideos(Collection videos, UUID jobId) throw e; } catch (Exception e) { monitor.severe(e::toString, jobId); - throw new UploadErrorException("Failed to import videos", e); } } - monitor.info(() -> "[SynologyImporter] imported videos successfully", jobId); + monitor.info(() -> "[SynologyImporter] ended importing videos.", jobId); } @FunctionalInterface private interface SynologyBiFunction { - R apply(T t, U u) throws CopyExceptionWithFailureReason; - } - - /** - * Imports a item - * - * @param item the item - * @return the item ID - */ - private String importDownloadableItemWithCache( - T item, - UUID jobId, - Function albumIdFunction, - SynologyBiFunction> createFunction, - Function dataIdFunction, - Function nameFunction) - throws CopyExceptionWithFailureReason { - String newItemId = null; - try { - newItemId = - importItemWithCache(item, jobId, "item_id", createFunction, dataIdFunction, nameFunction); - } catch (CopyExceptionWithFailureReason e) { - throw e; - } catch (Exception e) { - throw new UploadErrorException( - String.format( - "Failed to import item [%s], name [%s]", - dataIdFunction.apply(item), nameFunction.apply(item)), - e); - } - - return newItemId; + R apply(T t, U u) throws CopyExceptionWithFailureReason, IOException; } private String importItemWithCache( @@ -250,16 +240,22 @@ private void addItemToAlbum(String albumId, String itemId, UUID jobId) (Boolean) synologyDTPService.addItemToAlbum(albumId, itemId, jobId).get("success")); if (Boolean.FALSE.equals(createResult)) { - throw new UploadErrorException( - String.format( - "Unsuccessful result from adding item [%s] to album [%s]", itemId, albumId), - null); + monitor.severe( + () -> + String.format( + "[SynologyImporter] failed to add item to album, albumId: [%s], itemId: [%s].", + albumId, itemId)); } } catch (CopyExceptionWithFailureReason e) { throw e; } catch (Exception e) { - throw new UploadErrorException( - String.format("Failed to add item [%s] to album [%s]", itemId, albumId), e); + monitor.severe( + () -> + String.format( + "[SynologyImporter] failed to add item to album, albumId: [%s], itemId: [%s].", + albumId, itemId), + e); + return; } monitor.info( () -> "[SynologyImporter] added item to album successfully", diff --git a/extensions/data-transfer/portability-data-transfer-synology/src/test/java/org/datatransferproject/datatransfer/synology/service/SynologyDTPServiceOOMTest.java b/extensions/data-transfer/portability-data-transfer-synology/src/test/java/org/datatransferproject/datatransfer/synology/service/SynologyDTPServiceOOMTest.java new file mode 100644 index 000000000..48b5510cd --- /dev/null +++ b/extensions/data-transfer/portability-data-transfer-synology/src/test/java/org/datatransferproject/datatransfer/synology/service/SynologyDTPServiceOOMTest.java @@ -0,0 +1,227 @@ +/* + * Copyright 2025 The Data Transfer Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.datatransferproject.datatransfer.synology.service; + +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.when; + +import java.io.IOException; +import java.io.InputStream; +import java.util.Map; +import java.util.UUID; +import okhttp3.OkHttpClient; +import okhttp3.RequestBody; +import okio.Buffer; +import okio.Okio; +import okio.Sink; +import okio.Timeout; +import org.datatransferproject.api.launcher.Monitor; +import org.datatransferproject.datatransfer.synology.utils.TestConfigs; +import org.datatransferproject.spi.cloud.storage.JobStore; +import org.datatransferproject.spi.cloud.storage.TemporaryPerJobDataStore.InputStreamWrapper; +import org.datatransferproject.spi.transfer.types.InvalidTokenException; +import org.datatransferproject.types.common.models.photos.PhotoModel; +import org.datatransferproject.types.common.models.videos.VideoModel; +import org.datatransferproject.types.transfer.serviceconfig.TransferServiceConfig; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.ArgumentCaptor; +import org.mockito.Captor; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.junit.jupiter.MockitoExtension; + +@ExtendWith(MockitoExtension.class) +public class SynologyDTPServiceOOMTest { + private final String exportingService = "mockService"; + private final UUID jobId = UUID.randomUUID(); + private SynologyDTPService dtpService; + @Mock private Monitor monitor; + @Mock private TransferServiceConfig transferServiceConfig; + @Mock private JobStore jobStore; + @Mock private SynologyOAuthTokenManager tokenManager; + @Captor private ArgumentCaptor requestBodyCaptor; + @Mock private OkHttpClient client; + + // Helper class for OOM test + private static class FakeLargeInputStream extends InputStream { + private final long size; + private long bytesRead = 0; + private final byte[] singleByte = new byte[] {'a'}; + + public FakeLargeInputStream(long size) { + this.size = size; + } + + @Override + public int read() { + if (bytesRead >= size) { + return -1; // End of stream + } + bytesRead++; + return singleByte[0]; + } + + @Override + public int read(byte[] b, int off, int len) { + if (b == null) { + throw new NullPointerException(); + } else if (off < 0 || len < 0 || len > b.length - off) { + throw new IndexOutOfBoundsException(); + } else if (len == 0) { + return 0; + } + + if (bytesRead >= size) { + return -1; + } + long remaining = size - bytesRead; + int toRead = (int) Math.min(len, remaining); + + // Don't bother filling the array, we are just simulating reading + // Arrays.fill(b, off, off + toRead, singleByte[0]); + + bytesRead += toRead; + return toRead; + } + + @Override + public int available() { + long remaining = size - bytesRead; + return remaining > Integer.MAX_VALUE ? Integer.MAX_VALUE : (int) remaining; + } + } + + private static class BlackholeSink implements Sink { + @Override + public void write(Buffer source, long byteCount) throws IOException { + source.skip(byteCount); + } + + @Override + public void flush() throws IOException {} + + @Override + public Timeout timeout() { + return Timeout.NONE; + } + + @Override + public void close() throws IOException {} + } + + @BeforeEach + public void setUp() throws InvalidTokenException { + when(transferServiceConfig.getServiceConfig()) + .thenReturn(TestConfigs.createServiceConfigJson()); + dtpService = + new SynologyDTPService( + monitor, transferServiceConfig, exportingService, jobStore, tokenManager, client); + } + + @Test + public void createPhoto_withLargeFile_shouldStreamData() throws Exception { + // setup + long oneGB = 1024L * 1024L * 1024L; + InputStream fakeInputStream = new FakeLargeInputStream(oneGB); + InputStreamWrapper streamWrapper = new InputStreamWrapper(fakeInputStream, oneGB); + + PhotoModel photo = + new PhotoModel( + "large-photo", "large-photo-url", "desc", "image/jpeg", "photo-id", null, true); + + when(jobStore.getStream(jobId, "large-photo-url")).thenReturn(streamWrapper); + + SynologyDTPService spyService = Mockito.spy(dtpService); + doReturn(Map.of("success", true, "data", Map.of("item_id", "photo-id"))) + .when(spyService) + .sendPostRequest(anyString(), requestBodyCaptor.capture(), any()); + + // act + spyService.createPhoto(photo, jobId); + + // Get the generated request body + RequestBody requestBody = requestBodyCaptor.getValue().get(); + + // assert + // The content length is not asserted to be equal to 1GB because it includes multipart + // boundaries and headers, + // making it larger than the raw file size. The main purpose of this test is to ensure that + // the file is streamed without causing an OutOfMemoryError. + assertTrue(requestBody.contentLength() > oneGB); + + // Simulate writing the body to a sink that discards the data. + // This will throw OutOfMemoryError if the whole stream is loaded into memory. + okio.BufferedSink discardingSink = Okio.buffer(new BlackholeSink()); + + // The MultipartBody will try to read the stream to write it. + // If it buffers the whole 1GB in memory, this will OOM. + requestBody.writeTo(discardingSink); + discardingSink.flush(); + + // If we reach here, it means we streamed the data without OOM. + } + + @Test + public void createVideo_withLargeFile_shouldStreamData() throws Exception { + // setup + long oneGB = 1024L * 1024L * 1024L; + InputStream fakeInputStream = new FakeLargeInputStream(oneGB); + InputStreamWrapper streamWrapper = new InputStreamWrapper(fakeInputStream, oneGB); + + VideoModel video = + new VideoModel( + "large-video", "large-video-url", "desc", "video/mp4", "video-id", null, true, null); + + when(jobStore.getStream(jobId, "large-video-url")).thenReturn(streamWrapper); + + SynologyDTPService spyService = Mockito.spy(dtpService); + doReturn(Map.of("success", true, "data", Map.of("item_id", "video-id"))) + .when(spyService) + .sendPostRequest(anyString(), requestBodyCaptor.capture(), any(), anyInt()); + + // act + spyService.createVideo(video, jobId); + + // Get the generated request body + RequestBody requestBody = requestBodyCaptor.getValue().get(); + + // assert + // The content length is not asserted to be equal to 1GB because it includes multipart + // boundaries and headers, + // making it larger than the raw file size. The main purpose of this test is to ensure that + // the file is streamed without causing an OutOfMemoryError. + assertTrue(requestBody.contentLength() > oneGB); + + // Simulate writing the body to a sink that discards the data. + // This will throw OutOfMemoryError if the whole stream is loaded into memory. + okio.BufferedSink discardingSink = Okio.buffer(new BlackholeSink()); + + // The MultipartBody will try to read the stream to write it. + // If it buffers the whole 1GB in memory, this will OOM. + requestBody.writeTo(discardingSink); + discardingSink.flush(); + + // If we reach here, it means we streamed the data without OOM. + } +} diff --git a/extensions/data-transfer/portability-data-transfer-synology/src/test/java/org/datatransferproject/datatransfer/synology/service/SynologyDTPServiceTest.java b/extensions/data-transfer/portability-data-transfer-synology/src/test/java/org/datatransferproject/datatransfer/synology/service/SynologyDTPServiceTest.java index ba2925a65..cb6757905 100644 --- a/extensions/data-transfer/portability-data-transfer-synology/src/test/java/org/datatransferproject/datatransfer/synology/service/SynologyDTPServiceTest.java +++ b/extensions/data-transfer/portability-data-transfer-synology/src/test/java/org/datatransferproject/datatransfer/synology/service/SynologyDTPServiceTest.java @@ -24,14 +24,26 @@ import static org.junit.jupiter.api.Assertions.assertNotEquals; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; -import static org.mockito.Mockito.*; +import static org.mockito.Mockito.any; +import static org.mockito.Mockito.anyInt; +import static org.mockito.Mockito.anyString; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.eq; +import static org.mockito.Mockito.lenient; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import java.io.ByteArrayInputStream; import java.io.IOException; import java.io.InputStream; -import java.net.MalformedURLException; +import java.nio.file.Files; +import java.nio.file.Path; import java.util.Date; import java.util.HashMap; import java.util.Map; @@ -48,6 +60,7 @@ import okhttp3.ResponseBody; import okio.Buffer; import org.datatransferproject.api.launcher.Monitor; +import org.datatransferproject.datatransfer.synology.service.SynologyDTPService.RequestBodyGenerator; import org.datatransferproject.datatransfer.synology.utils.TestConfigs; import org.datatransferproject.spi.cloud.storage.JobStore; import org.datatransferproject.spi.cloud.storage.TemporaryPerJobDataStore.InputStreamWrapper; @@ -84,7 +97,7 @@ public class SynologyDTPServiceTest { @Mock protected TransferServiceConfig transferServiceConfig; @Mock protected JobStore jobStore; @Mock protected SynologyOAuthTokenManager tokenManager; - @Captor ArgumentCaptor requestBodyCaptor; + @Captor ArgumentCaptor requestBodyCaptor; @Mock private OkHttpClient client; @BeforeEach @@ -105,16 +118,19 @@ public class AddItemToAlbum { private final String itemId = "testItem"; @Test - public void shouldSendPostRequestWithCorrectFormBody() throws CopyExceptionWithFailureReason { + public void shouldSendPostRequestWithCorrectFormBody() + throws CopyExceptionWithFailureReason, IOException { SynologyDTPService spyService = Mockito.spy(dtpService); - doReturn(Map.of("success", true)).when(spyService).sendPostRequest(anyString(), any(), any()); + doReturn(Map.of("success", true)) + .when(spyService) + .sendPostRequest(anyString(), any(RequestBodyGenerator.class), any()); Map result = spyService.addItemToAlbum(albumId, itemId, jobId); verify(spyService).sendPostRequest(anyString(), requestBodyCaptor.capture(), any()); assertEquals(result.get("success"), true); - RequestBody capturedBody = requestBodyCaptor.getValue(); + RequestBody capturedBody = requestBodyCaptor.getValue().get(); assertTrue(capturedBody instanceof FormBody); FormBody formBody = (FormBody) capturedBody; @@ -131,12 +147,12 @@ public void shouldSendPostRequestWithCorrectFormBody() throws CopyExceptionWithF @Test public void shouldThrowExceptionIfSendPostRequestFailed() - throws CopyExceptionWithFailureReason { + throws CopyExceptionWithFailureReason, IOException { SynologyDTPService spyService = Mockito.spy(dtpService); doThrow(new UploadErrorException("MockException", null)) .when(spyService) - .sendPostRequest(anyString(), any(), any()); + .sendPostRequest(anyString(), any(RequestBodyGenerator.class), any()); assertThrows( UploadErrorException.class, @@ -152,20 +168,21 @@ public class CreateAlbum { private final MediaAlbum album = new MediaAlbum(albumId, albumName, ""); @Test - public void shouldSendPostRequestWithCorrectFormBody() throws CopyExceptionWithFailureReason { + public void shouldSendPostRequestWithCorrectFormBody() + throws CopyExceptionWithFailureReason, IOException { SynologyDTPService spyService = Mockito.spy(dtpService); Map dataMap = Map.of("album_id", albumId); doReturn(Map.of("success", true, "data", dataMap)) .when(spyService) - .sendPostRequest(anyString(), any(), any()); + .sendPostRequest(anyString(), any(RequestBodyGenerator.class), any()); Map result = spyService.createAlbum(album, jobId); verify(spyService).sendPostRequest(anyString(), requestBodyCaptor.capture(), any()); assertEquals(result.get("success"), null); assertEquals(result.get("album_id"), albumId); - RequestBody capturedBody = requestBodyCaptor.getValue(); + RequestBody capturedBody = requestBodyCaptor.getValue().get(); assertTrue(capturedBody instanceof FormBody); FormBody formBody = (FormBody) capturedBody; @@ -182,12 +199,12 @@ public void shouldSendPostRequestWithCorrectFormBody() throws CopyExceptionWithF @Test public void shouldThrowExceptionIfSendPostRequestFailed() - throws CopyExceptionWithFailureReason { + throws CopyExceptionWithFailureReason, IOException { SynologyDTPService spyService = Mockito.spy(dtpService); doThrow(new UploadErrorException("MockException", null)) .when(spyService) - .sendPostRequest(anyString(), any(), any()); + .sendPostRequest(anyString(), any(RequestBodyGenerator.class), any()); assertThrows( UploadErrorException.class, () -> spyService.createAlbum(album, jobId), "MockException"); @@ -224,108 +241,6 @@ public Stream provideMediaObjectsNotInTempStore() { new VideoModel(itemName, fetchUrl, description, "format", itemId, null, false, null)); } - @ParameterizedTest(name = "shouldGetStreamFromJobStoreIfMediaItemIsInTempStore [{index}] {0}") - @MethodSource("provideMediaObjectsInTempStore") - public void shouldGetStreamFromJobStoreIfMediaItemIsInTempStore(DownloadableFile item) - throws IOException, CopyExceptionWithFailureReason { - byte[] mockImage = new byte[] {1, 2, 3}; - InputStream mockInputStream = new ByteArrayInputStream(mockImage); - InputStreamWrapper streamWrapper = mock(InputStreamWrapper.class); - SynologyDTPService spyService = Mockito.spy(dtpService); - - when(jobStore.getStream(jobId, fetchUrl)).thenReturn(streamWrapper); - when(streamWrapper.getStream()).thenReturn(mockInputStream); - doReturn(mock(Map.class)).when(spyService).sendPostRequest(anyString(), any(), any()); - - if (item instanceof PhotoModel) { - spyService.createPhoto((PhotoModel) item, jobId); - } else if (item instanceof VideoModel) { - spyService.createVideo((VideoModel) item, jobId); - } - - verify(spyService).sendPostRequest(anyString(), any(), any()); - verify(streamWrapper).getStream(); - verify(spyService, never()).getInputStream(fetchUrl); - } - - @ParameterizedTest(name = "shouldGetStreamFromUrlIfPhotoIsNotInTempStore [{index}] {0}") - @MethodSource("provideMediaObjectsNotInTempStore") - public void shouldGetStreamFromUrlIfPhotoIsNotInTempStore(DownloadableFile item) - throws IOException, CopyExceptionWithFailureReason { - byte[] mockImage = new byte[] {1, 2, 3}; - SynologyDTPService spyService = Mockito.spy(dtpService); - InputStream mockInputStream = new ByteArrayInputStream(mockImage); - - doReturn(mockInputStream).when(spyService).getInputStream(fetchUrl); - doReturn(mock(Map.class)).when(spyService).sendPostRequest(anyString(), any(), any()); - - if (item instanceof PhotoModel) { - spyService.createPhoto((PhotoModel) item, jobId); - } else if (item instanceof VideoModel) { - spyService.createVideo((VideoModel) item, jobId); - } - - verify(spyService).sendPostRequest(anyString(), any(), any()); - verify(spyService).getInputStream(fetchUrl); - verifyNoInteractions(jobStore); - } - - // VideoModel.contentUrl can't be null - @Test - public void shouldReturnNullWhenPhotoIsNotInTempStoreAndHasNoUrl() - throws CopyExceptionWithFailureReason { - PhotoModel photo = - new PhotoModel(itemName, null, description, "mediaType", itemId, null, false); - SynologyDTPService spyService = Mockito.spy(dtpService); - assertEquals(null, spyService.createPhoto((PhotoModel) photo, jobId)); - verify(spyService, never()).sendPostRequest(anyString(), any(), any()); - } - - @ParameterizedTest(name = "shouldThrowExceptionIfNewURLFailed [{index}] {0}") - @MethodSource("provideMediaObjectsNotInTempStore") - public void shouldThrowExceptionIfNewURLFailed(DownloadableFile item) - throws IOException, CopyExceptionWithFailureReason { - SynologyDTPService spyService = Mockito.spy(dtpService); - doThrow(new MalformedURLException("Failed to create url for photo")) - .when(spyService) - .getInputStream(fetchUrl); - - if (item instanceof PhotoModel) { - assertThrows( - UploadErrorException.class, - () -> spyService.createPhoto((PhotoModel) item, jobId), - "Failed to create url for photo"); - } else if (item instanceof VideoModel) { - assertThrows( - UploadErrorException.class, - () -> spyService.createVideo((VideoModel) item, jobId), - "Failed to create url for video"); - } - - verify(spyService, never()).sendPostRequest(anyString(), any(), any()); - } - - @ParameterizedTest(name = "shouldThrowExceptionIfFailedToGetStream [{index}] {0}") - @MethodSource("provideMediaObjectsInTempStore") - public void shouldThrowExceptionIfFailedToGetStream(DownloadableFile item) - throws IOException, CopyExceptionWithFailureReason { - SynologyDTPService spyService = Mockito.spy(dtpService); - when(jobStore.getStream(jobId, fetchUrl)).thenThrow(new IOException("Failed to get stream")); - - if (item instanceof PhotoModel) { - assertThrows( - UploadErrorException.class, - () -> spyService.createPhoto((PhotoModel) item, jobId), - "Failed to create input stream for photo"); - } else if (item instanceof VideoModel) { - assertThrows( - UploadErrorException.class, - () -> spyService.createVideo((VideoModel) item, jobId), - "Failed to create input stream for video"); - } - verify(spyService, never()).sendPostRequest(anyString(), any(), any()); - } - @ParameterizedTest( name = "shouldSendPostRequestWithCorrectFormBodyWithDescriptionAndUploadedTime [{index}] {0}") @@ -340,22 +255,26 @@ public void shouldSendPostRequestWithCorrectFormBodyWithDescriptionAndUploadedTi when(jobStore.getStream(jobId, fetchUrl)).thenReturn(streamWrapper); when(streamWrapper.getStream()).thenReturn(mockInputStream); - doReturn(Map.of("success", true, "data", dataMap)) - .when(spyService) - .sendPostRequest(anyString(), any(), any()); - Map result = new HashMap(); if (item instanceof PhotoModel) { + doReturn(Map.of("success", true, "data", dataMap)) + .when(spyService) + .sendPostRequest(anyString(), any(RequestBodyGenerator.class), any()); result = spyService.createPhoto((PhotoModel) item, jobId); + verify(spyService).sendPostRequest(anyString(), requestBodyCaptor.capture(), any()); } else if (item instanceof VideoModel) { + doReturn(Map.of("success", true, "data", dataMap)) + .when(spyService) + .sendPostRequest(anyString(), any(RequestBodyGenerator.class), any(), anyInt()); result = spyService.createVideo((VideoModel) item, jobId); + verify(spyService) + .sendPostRequest(anyString(), requestBodyCaptor.capture(), any(), anyInt()); } - verify(spyService).sendPostRequest(anyString(), requestBodyCaptor.capture(), any()); assertEquals(result.get("item_id"), itemId); assertEquals(result.get("success"), null); - RequestBody capturedBody = requestBodyCaptor.getValue(); + RequestBody capturedBody = requestBodyCaptor.getValue().get(); MultipartBody multipartBody = (MultipartBody) capturedBody; Map multipartFormAnswer = @@ -401,22 +320,26 @@ public void shouldSendPostRequestWithCorrectFormBodyWithoutDescription(Downloada when(jobStore.getStream(jobId, fetchUrl)).thenReturn(streamWrapper); when(streamWrapper.getStream()).thenReturn(mockInputStream); - doReturn(Map.of("success", true, "data", dataMap)) - .when(spyService) - .sendPostRequest(anyString(), any(), any()); - Map result = new HashMap(); if (item instanceof PhotoModel) { + doReturn(Map.of("success", true, "data", dataMap)) + .when(spyService) + .sendPostRequest(anyString(), any(RequestBodyGenerator.class), any()); result = spyService.createPhoto((PhotoModel) item, jobId); + verify(spyService).sendPostRequest(anyString(), requestBodyCaptor.capture(), any()); } else if (item instanceof VideoModel) { + doReturn(Map.of("success", true, "data", dataMap)) + .when(spyService) + .sendPostRequest(anyString(), any(RequestBodyGenerator.class), any(), anyInt()); result = spyService.createVideo((VideoModel) item, jobId); + verify(spyService) + .sendPostRequest(anyString(), requestBodyCaptor.capture(), any(), anyInt()); } - verify(spyService).sendPostRequest(anyString(), requestBodyCaptor.capture(), any()); assertEquals(result.get("item_id"), itemId); assertEquals(result.get("success"), null); - RequestBody capturedBody = requestBodyCaptor.getValue(); + RequestBody capturedBody = requestBodyCaptor.getValue().get(); MultipartBody multipartBody = (MultipartBody) capturedBody; Map multipartFormAnswer = @@ -446,27 +369,61 @@ public void shouldSendPostRequestWithCorrectFormBodyWithoutDescription(Downloada } } - @ParameterizedTest(name = "shouldThrowExceptionIfSendPostRequestFailed [{index}] {0}") + @ParameterizedTest(name = "shouldThrowExceptionIfInputStreamIsConsumed [{index}] {0}") @MethodSource("provideMediaObjectsInTempStore") - public void shouldThrowExceptionIfSendPostRequestFailed(DownloadableFile item) + public void shouldThrowExceptionIfInputStreamIsConsumed(DownloadableFile item) throws IOException, CopyExceptionWithFailureReason { byte[] mockImage = new byte[] {1, 2, 3}; InputStream mockInputStream = new ByteArrayInputStream(mockImage); InputStreamWrapper streamWrapper = mock(InputStreamWrapper.class); SynologyDTPService spyService = Mockito.spy(dtpService); + Map dataMap = Map.of("item_id", itemId); when(jobStore.getStream(jobId, fetchUrl)).thenReturn(streamWrapper); when(streamWrapper.getStream()).thenReturn(mockInputStream); - doThrow(new UploadErrorException("MockException", null)) - .when(spyService) - .sendPostRequest(anyString(), any(), any()); + if (item instanceof PhotoModel) { + doReturn(Map.of("success", true, "data", dataMap)) + .when(spyService) + .sendPostRequest(anyString(), any(RequestBodyGenerator.class), any()); + spyService.createPhoto((PhotoModel) item, jobId); + verify(spyService).sendPostRequest(anyString(), requestBodyCaptor.capture(), any()); + } else if (item instanceof VideoModel) { + doReturn(Map.of("success", true, "data", dataMap)) + .when(spyService) + .sendPostRequest(anyString(), any(RequestBodyGenerator.class), any(), anyInt()); + spyService.createVideo((VideoModel) item, jobId); + verify(spyService) + .sendPostRequest(anyString(), requestBodyCaptor.capture(), any(), anyInt()); + } + + RequestBody capturedBody = requestBodyCaptor.getValue().get(); + Buffer buffer = new Buffer(); + capturedBody.writeTo(buffer); + IOException exception = assertThrows(IOException.class, () -> capturedBody.writeTo(buffer)); + assertTrue(exception.getMessage().contains("InputStream has already been consumed")); + } + + @ParameterizedTest(name = "shouldThrowExceptionIfSendPostRequestFailed [{index}] {0}") + @MethodSource("provideMediaObjectsInTempStore") + public void shouldThrowExceptionIfSendPostRequestFailed(DownloadableFile item) + throws IOException, CopyExceptionWithFailureReason { + byte[] mockImage = new byte[] {1, 2, 3}; + InputStream mockInputStream = new ByteArrayInputStream(mockImage); + InputStreamWrapper streamWrapper = mock(InputStreamWrapper.class); + SynologyDTPService spyService = Mockito.spy(dtpService); if (item instanceof PhotoModel) { + doThrow(new UploadErrorException("MockException", null)) + .when(spyService) + .sendPostRequest(anyString(), any(RequestBodyGenerator.class), any()); assertThrows( UploadErrorException.class, () -> spyService.createPhoto((PhotoModel) item, jobId), "MockException"); } else if (item instanceof VideoModel) { + doThrow(new UploadErrorException("MockException", null)) + .when(spyService) + .sendPostRequest(anyString(), any(RequestBodyGenerator.class), any(), anyInt()); assertThrows( UploadErrorException.class, () -> spyService.createVideo((VideoModel) item, jobId), @@ -497,7 +454,7 @@ public void shouldRetryIfGotError() throws IOException, CopyExceptionWithFailure when(mockCall.execute()).thenReturn(mockResponseFail).thenReturn(mockResponseSuccess); Map result = - dtpService.sendPostRequest(TestConfigs.TEST_C2_BASE_URL, requestBody, jobId); + dtpService.sendPostRequest(TestConfigs.TEST_C2_BASE_URL, () -> requestBody, jobId); assertEquals(Map.of("success", true), result); @@ -518,9 +475,10 @@ public void shouldThrowExceptionIfReachMaxRetries() throws IOException { when(mockCall.execute()).thenReturn(mockResponseFail); assertThrows( - UploadErrorException.class, - () -> dtpService.sendPostRequest(TestConfigs.TEST_C2_BASE_URL, requestBody, jobId), - String.format("Failed to send POST request %d times", TestConfigs.TEST_MAX_ATTEMPTS)); + IOException.class, + () -> dtpService.sendPostRequest(TestConfigs.TEST_C2_BASE_URL, () -> requestBody, jobId), + String.format( + "Failed to send POST request after %d retries", TestConfigs.TEST_MAX_ATTEMPTS)); verify(tokenManager, never()) .refreshToken(any(UUID.class), eq(client), any(ObjectMapper.class)); verify(client, times(TestConfigs.TEST_MAX_ATTEMPTS)).newCall(any(Request.class)); @@ -547,7 +505,7 @@ public void shouldRefreshTokenAndRetryIfGotUnauthorizedHttpError() .thenReturn(true); Map result = - dtpService.sendPostRequest(TestConfigs.TEST_C2_BASE_URL, requestBody, jobId); + dtpService.sendPostRequest(TestConfigs.TEST_C2_BASE_URL, () -> requestBody, jobId); assertEquals(Map.of("success", true), result); @@ -571,13 +529,12 @@ public void shouldInvokeRefreshTokenOnlyOnceIfGotUnauthorizedMultipleTimes() .thenReturn(false); assertThrows( - UploadErrorException.class, - () -> dtpService.sendPostRequest(TestConfigs.TEST_C2_BASE_URL, requestBody, jobId), - String.format("Failed to send POST request %d times", TestConfigs.TEST_MAX_ATTEMPTS)); + InvalidTokenException.class, + () -> dtpService.sendPostRequest(TestConfigs.TEST_C2_BASE_URL, () -> requestBody, jobId)); verify(tokenManager, times(1)) .refreshToken(any(UUID.class), eq(client), any(ObjectMapper.class)); - verify(client, times(TestConfigs.TEST_MAX_ATTEMPTS)).newCall(any(Request.class)); + verify(client, times(2)).newCall(any(Request.class)); } @Test @@ -594,9 +551,10 @@ public void shouldThrowExceptionIfParseResponseFailed() throws IOException { .thenThrow(new IOException("Error when call response.body.string()")); assertThrows( - UploadErrorException.class, - () -> dtpService.sendPostRequest(TestConfigs.TEST_C2_BASE_URL, requestBody, jobId), - String.format("Failed to send POST request %d times", TestConfigs.TEST_MAX_ATTEMPTS)); + IOException.class, + () -> dtpService.sendPostRequest(TestConfigs.TEST_C2_BASE_URL, () -> requestBody, jobId), + String.format( + "Failed to send POST request after %d retries", TestConfigs.TEST_MAX_ATTEMPTS)); verify(tokenManager, never()) .refreshToken(any(UUID.class), eq(client), any(ObjectMapper.class)); verify(client, times(TestConfigs.TEST_MAX_ATTEMPTS)).newCall(any(Request.class)); @@ -616,7 +574,7 @@ public void shouldReturnResponseData() when(mockResponseBody.string()).thenReturn("{\"success\": true}"); Map result = - dtpService.sendPostRequest(TestConfigs.TEST_C2_BASE_URL, requestBody, jobId); + dtpService.sendPostRequest(TestConfigs.TEST_C2_BASE_URL, () -> requestBody, jobId); assertEquals(Map.of("success", true), result); verify(tokenManager, never()) .refreshToken(any(UUID.class), eq(client), any(ObjectMapper.class)); @@ -636,7 +594,7 @@ public void shouldThrowExceptionIfGot413() throws IOException { assertThrows( DestinationMemoryFullException.class, - () -> dtpService.sendPostRequest(TestConfigs.TEST_C2_BASE_URL, requestBody, jobId)); + () -> dtpService.sendPostRequest(TestConfigs.TEST_C2_BASE_URL, () -> requestBody, jobId)); } @Test @@ -660,7 +618,7 @@ public void shouldCallCheckUnprocessableContentIfGot422() assertThrows( NoNasInAccountException.class, - () -> spyService.sendPostRequest(TestConfigs.TEST_C2_BASE_URL, requestBody, jobId)); + () -> spyService.sendPostRequest(TestConfigs.TEST_C2_BASE_URL, () -> requestBody, jobId)); } } @@ -715,4 +673,59 @@ public void shouldNotThrowExceptionIfBodyIsInvalidJson() throws CopyExceptionWit dtpService.throwExceptionIfNoQuota(response); } } + + @Nested + public class GetMediaInputStreamWrapperTest { + @Test + public void shouldGetStreamFromJobStore() throws CopyExceptionWithFailureReason, IOException { + // setup + String fetchableUrl = "some_key"; + InputStream inputStream = new ByteArrayInputStream("test data".getBytes()); + long size = 100L; + InputStreamWrapper expected = new InputStreamWrapper(inputStream, size); + when(jobStore.getStream(jobId, fetchableUrl)).thenReturn(expected); + + // act + InputStreamWrapper result = dtpService.getMediaInputStreamWrapper(jobId, fetchableUrl, true); + + // assert + assertEquals(expected, result); + } + + @Test + public void shouldGetStreamFromUrl() throws Exception { + // setup + Path tempFile = Files.createTempFile("test", ".txt"); + tempFile.toFile().deleteOnExit(); + byte[] testData = "test data".getBytes(); + Files.write(tempFile, testData); + String fetchableUrl = tempFile.toUri().toURL().toString(); + + // act + InputStreamWrapper result = dtpService.getMediaInputStreamWrapper(jobId, fetchableUrl, false); + + // assert + assertEquals(testData.length, result.getBytes()); + try (InputStream is = result.getStream()) { + byte[] bytes = new byte[is.available()]; + is.read(bytes); + assertArrayEquals(testData, bytes); + } + } + + @Test + public void shouldThrowExceptionWhenNoSource() { + assertThrows( + IllegalArgumentException.class, + () -> dtpService.getMediaInputStreamWrapper(jobId, null, false)); + } + + @Test + public void shouldThrowExceptionForMalformedUrl() { + String malformedUrl = "this is not a url"; + assertThrows( + IOException.class, + () -> dtpService.getMediaInputStreamWrapper(jobId, malformedUrl, false)); + } + } } diff --git a/extensions/data-transfer/portability-data-transfer-synology/src/test/java/org/datatransferproject/datatransfer/synology/signals/SynologySignalHandlerTest.java b/extensions/data-transfer/portability-data-transfer-synology/src/test/java/org/datatransferproject/datatransfer/synology/signals/SynologySignalHandlerTest.java index 12f31498c..116d56e36 100644 --- a/extensions/data-transfer/portability-data-transfer-synology/src/test/java/org/datatransferproject/datatransfer/synology/signals/SynologySignalHandlerTest.java +++ b/extensions/data-transfer/portability-data-transfer-synology/src/test/java/org/datatransferproject/datatransfer/synology/signals/SynologySignalHandlerTest.java @@ -30,13 +30,11 @@ import org.datatransferproject.datatransfer.synology.service.SynologyDTPService; import org.datatransferproject.datatransfer.synology.service.SynologyOAuthTokenManager; import org.datatransferproject.spi.transfer.provider.SignalRequest; -import org.datatransferproject.spi.transfer.types.CopyExceptionWithFailureReason; import org.datatransferproject.spi.transfer.types.signals.JobLifeCycle; import org.datatransferproject.spi.transfer.types.signals.JobLifeCycle.EndReason; import org.datatransferproject.spi.transfer.types.signals.JobLifeCycle.State; import org.datatransferproject.types.common.models.DataVertical; import org.datatransferproject.types.transfer.auth.TokensAndUrlAuthData; -import org.datatransferproject.types.transfer.retry.RetryException; import org.datatransferproject.types.transfer.retry.RetryMapping; import org.datatransferproject.types.transfer.retry.RetryStrategy; import org.datatransferproject.types.transfer.retry.RetryStrategyLibrary; @@ -75,7 +73,7 @@ public void setUp() { } @Test - public void testSendSignal() throws RetryException, CopyExceptionWithFailureReason { + public void testSendSignal() throws Exception { JobLifeCycle jobStatus = JobLifeCycle.builder() .setState(State.ENDED) @@ -100,7 +98,7 @@ public void testSendSignal() throws RetryException, CopyExceptionWithFailureReas } @Test - public void testSendSignalRetry() throws RetryException, CopyExceptionWithFailureReason { + public void testSendSignalRetry() throws Exception { JobLifeCycle jobStatus = JobLifeCycle.builder() .setState(State.ENDED) diff --git a/extensions/data-transfer/portability-data-transfer-synology/src/test/java/org/datatransferproject/datatransfer/synology/uploader/SynologyUploaderTest.java b/extensions/data-transfer/portability-data-transfer-synology/src/test/java/org/datatransferproject/datatransfer/synology/uploader/SynologyUploaderTest.java index 42962d3f3..d27def76d 100644 --- a/extensions/data-transfer/portability-data-transfer-synology/src/test/java/org/datatransferproject/datatransfer/synology/uploader/SynologyUploaderTest.java +++ b/extensions/data-transfer/portability-data-transfer-synology/src/test/java/org/datatransferproject/datatransfer/synology/uploader/SynologyUploaderTest.java @@ -115,7 +115,7 @@ public void shouldImportAlbums() throws Exception { } @Test - public void shouldThrowExceptionIfCreateAlbumFails() throws CopyExceptionWithFailureReason { + public void shouldThrowExceptionIfCreateAlbumFails() throws Exception { SynologyUploader spyUploader = Mockito.spy(new SynologyUploader(executor, monitor, synologyDTPService)); List albums = List.of(new MediaAlbum("1", "album1", "desc")); @@ -156,7 +156,7 @@ private Stream provideMediaItems() { } @BeforeEach - public void setUp() throws CopyExceptionWithFailureReason { + public void setUp() throws Exception { lenient() .when(synologyDTPService.addItemToAlbum(any(), any(), any())) .thenReturn(Map.of("success", true)); @@ -190,7 +190,7 @@ public void setUp() throws CopyExceptionWithFailureReason { @ParameterizedTest(name = "shouldImportMediaItemsWhenAlbumBeforeItem [{index}] {0}") @MethodSource("provideMediaItems") public void shouldImportMediaItemsWhenAlbumBeforeItem( - List mediaItems) throws CopyExceptionWithFailureReason { + List mediaItems) throws Exception { SynologyUploader uploader = new SynologyUploader(executor, monitor, synologyDTPService); List albums = List.of(new MediaAlbum(albumId, "album1", "desc")); @@ -268,12 +268,11 @@ public void shouldImportMediaItemsWithCache(List med } } - @ParameterizedTest(name = "shouldThrowExceptionIfCreateMediaItemNotSuccess [{index}] {0}") + @ParameterizedTest(name = "shouldNotThrowExceptionIfCreateMediaItemNotSuccess [{index}] {0}") @MethodSource("provideMediaItems") - public void shouldThrowExceptionIfCreateMediaItemNotSuccess( - List mediaItems) throws CopyExceptionWithFailureReason { - SynologyUploader spyUploader = - Mockito.spy(new SynologyUploader(executor, monitor, synologyDTPService)); + public void shouldNotThrowExceptionIfCreateMediaItemNotSuccess( + List mediaItems) throws Exception { + SynologyUploader uploader = new SynologyUploader(executor, monitor, synologyDTPService); List albums = List.of(new MediaAlbum("1", "album1", "desc")); if (mediaItems.get(0) instanceof PhotoModel) { @@ -282,27 +281,19 @@ public void shouldThrowExceptionIfCreateMediaItemNotSuccess( when(synologyDTPService.createVideo(any(), any())).thenReturn(Map.of("success", false)); } - spyUploader.importAlbums(albums, mockJobId); + uploader.importAlbums(albums, mockJobId); if (mediaItems.get(0) instanceof PhotoModel) { - Exception e = - assertThrows( - CopyExceptionWithFailureReason.class, - () -> spyUploader.importPhotos((List) mediaItems, mockJobId)); - assertTrue(containsMessage(e, "Failed to import item")); + uploader.importPhotos((List) mediaItems, mockJobId); } else if (mediaItems.get(0) instanceof VideoModel) { - Exception e = - assertThrows( - CopyExceptionWithFailureReason.class, - () -> spyUploader.importVideos((List) mediaItems, mockJobId)); - assertTrue(containsMessage(e, "Failed to import item")); + uploader.importVideos((List) mediaItems, mockJobId); } } @ParameterizedTest(name = "shouldThrowExceptionIfAddItemToAlbumFails [{index}] {0}") @MethodSource("provideMediaItems") public void shouldThrowExceptionIfAddItemToAlbumFails( - List mediaItems) throws CopyExceptionWithFailureReason { + List mediaItems) throws Exception { SynologyUploader spyUploader = Mockito.spy(new SynologyUploader(executor, monitor, synologyDTPService)); List albums = List.of(new MediaAlbum("1", "album1", "desc")); @@ -328,31 +319,21 @@ public void shouldThrowExceptionIfAddItemToAlbumFails( } } - @ParameterizedTest(name = "shouldThrowExceptionIfAddItemToAlbumNotSuccess [{index}] {0}") + @ParameterizedTest(name = "shouldNotThrowExceptionIfAddItemToAlbumNotSuccess [{index}] {0}") @MethodSource("provideMediaItems") - public void shouldThrowExceptionIfAddItemToAlbumNotSuccess( - List mediaItems) throws CopyExceptionWithFailureReason { - SynologyUploader spyUploader = - Mockito.spy(new SynologyUploader(executor, monitor, synologyDTPService)); + public void shouldNotThrowExceptionIfAddItemToAlbumNotSuccess( + List mediaItems) throws Exception { + SynologyUploader uploader = new SynologyUploader(executor, monitor, synologyDTPService); List albums = List.of(new MediaAlbum("1", "album1", "desc")); - String expectedMessage = "Unsuccessful"; when(synologyDTPService.addItemToAlbum(any(), any(), any())) .thenReturn(Map.of("success", false)); - spyUploader.importAlbums(albums, mockJobId); + uploader.importAlbums(albums, mockJobId); if (mediaItems.get(0) instanceof PhotoModel) { - Exception e = - assertThrows( - CopyExceptionWithFailureReason.class, - () -> spyUploader.importPhotos((List) mediaItems, mockJobId)); - assertTrue(containsMessage(e, expectedMessage)); + uploader.importPhotos((List) mediaItems, mockJobId); } else if (mediaItems.get(0) instanceof VideoModel) { - Exception e = - assertThrows( - CopyExceptionWithFailureReason.class, - () -> spyUploader.importVideos((List) mediaItems, mockJobId)); - assertTrue(containsMessage(e, expectedMessage)); + uploader.importVideos((List) mediaItems, mockJobId); } } }