Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import org.datatransferproject.datatransfer.synology.photos.SynologyPhotosImporter;
import org.datatransferproject.datatransfer.synology.service.SynologyDTPService;
import org.datatransferproject.datatransfer.synology.service.SynologyOAuthTokenManager;
import org.datatransferproject.datatransfer.synology.signals.SynologySignalHandler;
import org.datatransferproject.datatransfer.synology.uploader.SynologyUploader;
import org.datatransferproject.datatransfer.synology.videos.SynologyVideosImporter;
import org.datatransferproject.spi.cloud.storage.AppCredentialStore;
Expand All @@ -23,6 +24,7 @@
import org.datatransferproject.spi.transfer.idempotentexecutor.IdempotentImportExecutorExtension;
import org.datatransferproject.spi.transfer.provider.Exporter;
import org.datatransferproject.spi.transfer.provider.Importer;
import org.datatransferproject.spi.transfer.provider.SignalHandler;
import org.datatransferproject.transfer.JobMetadata;
import org.datatransferproject.types.common.models.DataVertical;
import org.datatransferproject.types.transfer.auth.AppCredentials;
Expand All @@ -36,6 +38,7 @@ public class SynologyTransferExtension implements TransferExtension {
private boolean initialized = false;

private ImmutableMap<DataVertical, Importer> importerMap;
private SynologySignalHandler signalHandler;

@Override
public String getServiceId() {
Expand Down Expand Up @@ -64,6 +67,11 @@ private String getExtensionSecret() {
return importerMap.get(transferDataType);
}

@Override
public SignalHandler<?> getSignalHandler() {
return signalHandler;
}

@Override
public void initialize(ExtensionContext context) {
if (initialized) {
Expand Down Expand Up @@ -113,6 +121,9 @@ public void initialize(ExtensionContext context) {
importerBuilder.put(
VIDEOS, new SynologyVideosImporter(monitor, tokenManager, synologyUploader));
importerMap = importerBuilder.build();
signalHandler =
new SynologySignalHandler(
tokenManager, synologyDTPService, context.getSetting("retryLibrary", null));

monitor.info(() -> "Initializing SynologyTransferExtension");
initialized = true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ public class C2Api extends ServiceConfig.Service {
private final String createAlbum;
private final String uploadItem;
private final String addItemToAlbum;
private final String signalJob;

@JsonCreator
public C2Api(@JsonProperty("baseUrl") String baseUrl, @JsonProperty("apiPath") ApiPath apiPath) {
Expand All @@ -38,6 +39,7 @@ public C2Api(@JsonProperty("baseUrl") String baseUrl, @JsonProperty("apiPath") A
this.createAlbum = UrlUtils.join(baseUrl, apiPath.getCreateAlbumPath());
this.uploadItem = UrlUtils.join(baseUrl, apiPath.getUploadItemPath());
this.addItemToAlbum = UrlUtils.join(baseUrl, apiPath.getAddItemToAlbumPath());
this.signalJob = UrlUtils.join(baseUrl, apiPath.getSignalJobPath());
}

public ApiPath getApiPath() {
Expand All @@ -56,19 +58,26 @@ public String getAddItemToAlbum() {
return addItemToAlbum;
}

public String getSignalJob() {
return signalJob;
}

public static class ApiPath {
private final String createAlbumPath;
private final String uploadItemPath;
private final String addItemToAlbumPath;
private final String signalJobPath;

@JsonCreator
public ApiPath(
@JsonProperty("createAlbum") String createAlbumPath,
@JsonProperty("uploadItem") String uploadItemPath,
@JsonProperty("addItemToAlbum") String addItemToAlbumPath) {
@JsonProperty("addItemToAlbum") String addItemToAlbumPath,
@JsonProperty("signalJob") String signalJobPath) {
this.createAlbumPath = createAlbumPath;
this.uploadItemPath = uploadItemPath;
this.addItemToAlbumPath = addItemToAlbumPath;
this.signalJobPath = signalJobPath;
}

public String getCreateAlbumPath() {
Expand All @@ -82,5 +91,9 @@ public String getUploadItemPath() {
public String getAddItemToAlbumPath() {
return addItemToAlbumPath;
}

public String getSignalJobPath() {
return signalJobPath;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import org.datatransferproject.spi.transfer.types.DestinationMemoryFullException;
import org.datatransferproject.spi.transfer.types.NoNasInAccountException;
import org.datatransferproject.spi.transfer.types.UploadErrorException;
import org.datatransferproject.spi.transfer.types.signals.JobLifeCycle;
import org.datatransferproject.types.common.models.media.MediaAlbum;
import org.datatransferproject.types.common.models.photos.PhotoModel;
import org.datatransferproject.types.common.models.videos.VideoModel;
Expand Down Expand Up @@ -246,6 +247,27 @@ public Map<String, Object> addItemToAlbum(String albumId, String itemId, UUID jo
return sendPostRequest(c2Api.getAddItemToAlbum(), builder.build(), jobId);
}

/**
* Updates job status.
*
* @param jobStatus the job status
* @param jobId the job ID
* @return a map of shape {"success": <bool>}
*/
public Map<String, Object> sendJobSignal(JobLifeCycle jobStatus, UUID jobId)
throws CopyExceptionWithFailureReason {
FormBody.Builder builder =
new FormBody.Builder()
.add("job_id", jobId.toString())
.add("service", exportingService)
.add("state", jobStatus.state().name());
if (jobStatus.endReason() != null) {
builder.add("end_reason", jobStatus.endReason().name());
}

return sendPostRequest(c2Api.getSignalJob(), builder.build(), jobId);
}

@VisibleForTesting
protected void throwExceptionIfNoQuota(Response response) throws CopyExceptionWithFailureReason {
String errorCode = "";
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
/*
* Copyright 2026 The Data Transfer Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package org.datatransferproject.datatransfer.synology.signals;

import java.time.Clock;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.Callable;
import org.datatransferproject.api.launcher.Monitor;
import org.datatransferproject.datatransfer.synology.service.SynologyDTPService;
import org.datatransferproject.datatransfer.synology.service.SynologyOAuthTokenManager;
import org.datatransferproject.spi.transfer.provider.SignalHandler;
import org.datatransferproject.spi.transfer.provider.SignalRequest;
import org.datatransferproject.types.transfer.auth.AuthData;
import org.datatransferproject.types.transfer.auth.TokensAndUrlAuthData;
import org.datatransferproject.types.transfer.retry.RetryException;
import org.datatransferproject.types.transfer.retry.RetryStrategyLibrary;
import org.datatransferproject.types.transfer.retry.RetryingCallable;

/** A {@link SignalHandler} for Synology. */
public class SynologySignalHandler implements SignalHandler<TokensAndUrlAuthData> {
private final SynologyOAuthTokenManager tokenManager;
private final SynologyDTPService synologyDTPService;
private final RetryStrategyLibrary retryStrategyLibrary;

/**
* Constructs a new {@code SynologySignalHandler} instance.
*
* @param synologyDTPService the Synology DTP service
* @param tokenManager the Synology OAuth token manager
* @param retryStrategyLibrary the retry strategy library for handling transient failures
*/
public SynologySignalHandler(
SynologyOAuthTokenManager tokenManager,
SynologyDTPService synologyDTPService,
RetryStrategyLibrary retryStrategyLibrary) {
this.tokenManager = tokenManager;
this.synologyDTPService = synologyDTPService;
this.retryStrategyLibrary = retryStrategyLibrary;
}

@Override
public void sendSignal(SignalRequest signalRequest, AuthData authData, Monitor monitor)
throws RetryException {
Objects.requireNonNull(signalRequest, "signalRequest cannot be null");
Objects.requireNonNull(authData, "authData cannot be null");
Objects.requireNonNull(monitor, "monitor cannot be null");

UUID uuidJobId = UUID.fromString(signalRequest.jobId());
tokenManager.addAuthDataIfNotExist(uuidJobId, (TokensAndUrlAuthData) authData);

monitor.info(
() ->
String.format(
"[SynologySignalHandler] Received signal for jobId: %s, jobStatus.state: %s,"
+ " jobStatus.endReason: %s",
signalRequest.jobId(),
signalRequest.jobStatus().state(),
signalRequest.jobStatus().endReason()));

Callable<Void> sendJobSignalCallable =
() -> {
Boolean success =
(Boolean)
this.synologyDTPService
.sendJobSignal(signalRequest.jobStatus(), uuidJobId)
.get("success");

if (success) {
monitor.debug(
() ->
String.format(
"[SynologySignalHandler] Successfully sent signal for jobId: %s,"
+ " jobStatus.state: %s, jobStatus.endReason: %s",
signalRequest.jobId(),
signalRequest.jobStatus().state(),
signalRequest.jobStatus().endReason()));
return null;
}

throw new RuntimeException(
String.format(
"Failed to send signal for jobId: %s. Response with success=false.",
signalRequest.jobId()));
};

RetryingCallable<Void> retryingSendJobSignalCallable =
new RetryingCallable<>(
sendJobSignalCallable, retryStrategyLibrary, Clock.systemUTC(), monitor);

try {
retryingSendJobSignalCallable.call();
} catch (Throwable e) {
throw e;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,4 @@ serviceConfig:
createAlbum: "/import/album"
uploadItem: "/import/item"
addItemToAlbum: "/import/album/item"
signalJob: "/import/job/signal"
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
/*
* Copyright 2026 The Data Transfer Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package org.datatransferproject.datatransfer.synology.signals;

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import java.util.Collections;
import java.util.Map;
import java.util.UUID;
import org.datatransferproject.api.launcher.Monitor;
import org.datatransferproject.datatransfer.synology.service.SynologyDTPService;
import org.datatransferproject.datatransfer.synology.service.SynologyOAuthTokenManager;
import org.datatransferproject.spi.transfer.provider.SignalRequest;
import org.datatransferproject.spi.transfer.types.CopyExceptionWithFailureReason;
import org.datatransferproject.spi.transfer.types.signals.JobLifeCycle;
import org.datatransferproject.spi.transfer.types.signals.JobLifeCycle.EndReason;
import org.datatransferproject.spi.transfer.types.signals.JobLifeCycle.State;
import org.datatransferproject.types.common.models.DataVertical;
import org.datatransferproject.types.transfer.auth.TokensAndUrlAuthData;
import org.datatransferproject.types.transfer.retry.RetryException;
import org.datatransferproject.types.transfer.retry.RetryMapping;
import org.datatransferproject.types.transfer.retry.RetryStrategy;
import org.datatransferproject.types.transfer.retry.RetryStrategyLibrary;
import org.datatransferproject.types.transfer.retry.UniformRetryStrategy;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

public class SynologySignalHandlerTest {

private SynologyOAuthTokenManager tokenManager;
private RetryStrategyLibrary retryStrategyLibrary;
private Monitor monitor;
private String jobId;
private SynologySignalHandler signalHandler;
private SynologyDTPService synologyDTPService;
private TokensAndUrlAuthData authData;

@BeforeEach
public void setUp() {
jobId = UUID.randomUUID().toString();

tokenManager = mock(SynologyOAuthTokenManager.class);
RetryStrategy retryStrategy = new UniformRetryStrategy(3, 1, "test");
RetryMapping retryMapping =
new RetryMapping(
new String[] {RuntimeException.class.getName()}, new String[] {}, retryStrategy);
retryStrategyLibrary =
new RetryStrategyLibrary(
Collections.singletonList(retryMapping), new UniformRetryStrategy(1, 1, "default"));
monitor = mock(Monitor.class);
synologyDTPService = mock(SynologyDTPService.class);
authData = mock(TokensAndUrlAuthData.class);

signalHandler =
new SynologySignalHandler(tokenManager, synologyDTPService, retryStrategyLibrary);
}

@Test
public void testSendSignal() throws RetryException, CopyExceptionWithFailureReason {
JobLifeCycle jobStatus =
JobLifeCycle.builder()
.setState(State.ENDED)
.setEndReason(EndReason.SUCCESSFULLY_COMPLETED)
.build();

SignalRequest signalRequest =
SignalRequest.builder()
.setJobId(jobId)
.setJobStatus(jobStatus)
.setExportingService("EXPORT_SERVICE")
.setImportingService("IMPORT_SERVICE")
.setDataType(DataVertical.MAIL.getDataType())
.build();

when(synologyDTPService.sendJobSignal(any(JobLifeCycle.class), any(UUID.class)))
.thenReturn(Map.of("success", true));

signalHandler.sendSignal(signalRequest, authData, monitor);

verify(synologyDTPService, times(1)).sendJobSignal(any(JobLifeCycle.class), any(UUID.class));
}

@Test
public void testSendSignalRetry() throws RetryException, CopyExceptionWithFailureReason {
JobLifeCycle jobStatus =
JobLifeCycle.builder()
.setState(State.ENDED)
.setEndReason(EndReason.SUCCESSFULLY_COMPLETED)
.build();

SignalRequest signalRequest =
SignalRequest.builder()
.setJobId(jobId)
.setJobStatus(jobStatus)
.setExportingService("EXPORT_SERVICE")
.setImportingService("IMPORT_SERVICE")
.setDataType(DataVertical.MAIL.getDataType())
.build();

when(synologyDTPService.sendJobSignal(any(JobLifeCycle.class), any(UUID.class)))
.thenThrow(new RuntimeException("Failed to send signal"))
.thenReturn(Map.of("success", true));

signalHandler.sendSignal(signalRequest, authData, monitor);

verify(synologyDTPService, times(2)).sendJobSignal(any(JobLifeCycle.class), any(UUID.class));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,16 @@ public class TestConfigs {
public static final String TEST_CREATE_ALBUM_PATH = "/create";
public static final String TEST_UPLOAD_ITEM_PATH = "/upload";
public static final String TEST_ADD_ITEM_TO_ALBUM_PATH = "/add";
public static final String TEST_SIGNAL_JOB_PATH = "/signal";
public static final int TEST_MAX_ATTEMPTS = 5;

public static ServiceConfig createServiceConfig() {
C2Api.ApiPath apiPath =
new C2Api.ApiPath(
TEST_CREATE_ALBUM_PATH, TEST_UPLOAD_ITEM_PATH, TEST_ADD_ITEM_TO_ALBUM_PATH);
TEST_CREATE_ALBUM_PATH,
TEST_UPLOAD_ITEM_PATH,
TEST_ADD_ITEM_TO_ALBUM_PATH,
TEST_SIGNAL_JOB_PATH);

C2Api c2Api = new C2Api(TEST_C2_BASE_URL, apiPath);

Expand Down
Loading