Skip to content

Commit f17ce32

Browse files
authored
feat: add SynologySignalHandler to Synology transfer extension (#1484)
Implemented `SynologySignalHandler` to manage job lifecycle signals (e.g., job started or completion) within the Synology transfer extension. This ensures that the Synology C2 API is notified of the final state of a transfer job, allowing for better synchronization. ## Changes - New Signal Handler: Added `SynologySignalHandler` to process and retry job lifecycle signals using the RetryingCallable framework. - Service Integration: Added `sendJobSignal` to `SynologyDTPService` to handle the actual HTTP POST request to the Synology C2 API. - Extension Update: Modified `SynologyTransferExtension` to register the new SignalHandler during initialization. - Configuration & Models: - Updated `synology.yaml` to include the new `/import/job/signal` endpoint. - Updated `C2Api` and its `ApiPath` inner class to support the new signal path. - Testing: - Added `SynologySignalHandlerTest` to verify successful signal transmission and the retry mechanism on failure. - Updated `TestConfigs` to include the signal path for existing tests.
1 parent 56e2386 commit f17ce32

7 files changed

Lines changed: 292 additions & 2 deletions

File tree

extensions/data-transfer/portability-data-transfer-synology/src/main/java/org/datatransferproject/datatransfer/synology/SynologyTransferExtension.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
import org.datatransferproject.datatransfer.synology.photos.SynologyPhotosImporter;
1515
import org.datatransferproject.datatransfer.synology.service.SynologyDTPService;
1616
import org.datatransferproject.datatransfer.synology.service.SynologyOAuthTokenManager;
17+
import org.datatransferproject.datatransfer.synology.signals.SynologySignalHandler;
1718
import org.datatransferproject.datatransfer.synology.uploader.SynologyUploader;
1819
import org.datatransferproject.datatransfer.synology.videos.SynologyVideosImporter;
1920
import org.datatransferproject.spi.cloud.storage.AppCredentialStore;
@@ -23,6 +24,7 @@
2324
import org.datatransferproject.spi.transfer.idempotentexecutor.IdempotentImportExecutorExtension;
2425
import org.datatransferproject.spi.transfer.provider.Exporter;
2526
import org.datatransferproject.spi.transfer.provider.Importer;
27+
import org.datatransferproject.spi.transfer.provider.SignalHandler;
2628
import org.datatransferproject.transfer.JobMetadata;
2729
import org.datatransferproject.types.common.models.DataVertical;
2830
import org.datatransferproject.types.transfer.auth.AppCredentials;
@@ -36,6 +38,7 @@ public class SynologyTransferExtension implements TransferExtension {
3638
private boolean initialized = false;
3739

3840
private ImmutableMap<DataVertical, Importer> importerMap;
41+
private SynologySignalHandler signalHandler;
3942

4043
@Override
4144
public String getServiceId() {
@@ -64,6 +67,11 @@ private String getExtensionSecret() {
6467
return importerMap.get(transferDataType);
6568
}
6669

70+
@Override
71+
public SignalHandler<?> getSignalHandler() {
72+
return signalHandler;
73+
}
74+
6775
@Override
6876
public void initialize(ExtensionContext context) {
6977
if (initialized) {
@@ -113,6 +121,9 @@ public void initialize(ExtensionContext context) {
113121
importerBuilder.put(
114122
VIDEOS, new SynologyVideosImporter(monitor, tokenManager, synologyUploader));
115123
importerMap = importerBuilder.build();
124+
signalHandler =
125+
new SynologySignalHandler(
126+
tokenManager, synologyDTPService, context.getSetting("retryLibrary", null));
116127

117128
monitor.info(() -> "Initializing SynologyTransferExtension");
118129
initialized = true;

extensions/data-transfer/portability-data-transfer-synology/src/main/java/org/datatransferproject/datatransfer/synology/models/C2Api.java

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ public class C2Api extends ServiceConfig.Service {
2828
private final String createAlbum;
2929
private final String uploadItem;
3030
private final String addItemToAlbum;
31+
private final String signalJob;
3132

3233
@JsonCreator
3334
public C2Api(@JsonProperty("baseUrl") String baseUrl, @JsonProperty("apiPath") ApiPath apiPath) {
@@ -38,6 +39,7 @@ public C2Api(@JsonProperty("baseUrl") String baseUrl, @JsonProperty("apiPath") A
3839
this.createAlbum = UrlUtils.join(baseUrl, apiPath.getCreateAlbumPath());
3940
this.uploadItem = UrlUtils.join(baseUrl, apiPath.getUploadItemPath());
4041
this.addItemToAlbum = UrlUtils.join(baseUrl, apiPath.getAddItemToAlbumPath());
42+
this.signalJob = UrlUtils.join(baseUrl, apiPath.getSignalJobPath());
4143
}
4244

4345
public ApiPath getApiPath() {
@@ -56,19 +58,26 @@ public String getAddItemToAlbum() {
5658
return addItemToAlbum;
5759
}
5860

61+
public String getSignalJob() {
62+
return signalJob;
63+
}
64+
5965
public static class ApiPath {
6066
private final String createAlbumPath;
6167
private final String uploadItemPath;
6268
private final String addItemToAlbumPath;
69+
private final String signalJobPath;
6370

6471
@JsonCreator
6572
public ApiPath(
6673
@JsonProperty("createAlbum") String createAlbumPath,
6774
@JsonProperty("uploadItem") String uploadItemPath,
68-
@JsonProperty("addItemToAlbum") String addItemToAlbumPath) {
75+
@JsonProperty("addItemToAlbum") String addItemToAlbumPath,
76+
@JsonProperty("signalJob") String signalJobPath) {
6977
this.createAlbumPath = createAlbumPath;
7078
this.uploadItemPath = uploadItemPath;
7179
this.addItemToAlbumPath = addItemToAlbumPath;
80+
this.signalJobPath = signalJobPath;
7281
}
7382

7483
public String getCreateAlbumPath() {
@@ -82,5 +91,9 @@ public String getUploadItemPath() {
8291
public String getAddItemToAlbumPath() {
8392
return addItemToAlbumPath;
8493
}
94+
95+
public String getSignalJobPath() {
96+
return signalJobPath;
97+
}
8598
}
8699
}

extensions/data-transfer/portability-data-transfer-synology/src/main/java/org/datatransferproject/datatransfer/synology/service/SynologyDTPService.java

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@
4444
import org.datatransferproject.spi.transfer.types.DestinationMemoryFullException;
4545
import org.datatransferproject.spi.transfer.types.NoNasInAccountException;
4646
import org.datatransferproject.spi.transfer.types.UploadErrorException;
47+
import org.datatransferproject.spi.transfer.types.signals.JobLifeCycle;
4748
import org.datatransferproject.types.common.models.media.MediaAlbum;
4849
import org.datatransferproject.types.common.models.photos.PhotoModel;
4950
import org.datatransferproject.types.common.models.videos.VideoModel;
@@ -246,6 +247,27 @@ public Map<String, Object> addItemToAlbum(String albumId, String itemId, UUID jo
246247
return sendPostRequest(c2Api.getAddItemToAlbum(), builder.build(), jobId);
247248
}
248249

250+
/**
251+
* Updates job status.
252+
*
253+
* @param jobStatus the job status
254+
* @param jobId the job ID
255+
* @return a map of shape {"success": <bool>}
256+
*/
257+
public Map<String, Object> sendJobSignal(JobLifeCycle jobStatus, UUID jobId)
258+
throws CopyExceptionWithFailureReason {
259+
FormBody.Builder builder =
260+
new FormBody.Builder()
261+
.add("job_id", jobId.toString())
262+
.add("service", exportingService)
263+
.add("state", jobStatus.state().name());
264+
if (jobStatus.endReason() != null) {
265+
builder.add("end_reason", jobStatus.endReason().name());
266+
}
267+
268+
return sendPostRequest(c2Api.getSignalJob(), builder.build(), jobId);
269+
}
270+
249271
@VisibleForTesting
250272
protected void throwExceptionIfNoQuota(Response response) throws CopyExceptionWithFailureReason {
251273
String errorCode = "";
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,112 @@
1+
/*
2+
* Copyright 2026 The Data Transfer Project Authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*
16+
*/
17+
18+
package org.datatransferproject.datatransfer.synology.signals;
19+
20+
import java.time.Clock;
21+
import java.util.Objects;
22+
import java.util.UUID;
23+
import java.util.concurrent.Callable;
24+
import org.datatransferproject.api.launcher.Monitor;
25+
import org.datatransferproject.datatransfer.synology.service.SynologyDTPService;
26+
import org.datatransferproject.datatransfer.synology.service.SynologyOAuthTokenManager;
27+
import org.datatransferproject.spi.transfer.provider.SignalHandler;
28+
import org.datatransferproject.spi.transfer.provider.SignalRequest;
29+
import org.datatransferproject.types.transfer.auth.AuthData;
30+
import org.datatransferproject.types.transfer.auth.TokensAndUrlAuthData;
31+
import org.datatransferproject.types.transfer.retry.RetryException;
32+
import org.datatransferproject.types.transfer.retry.RetryStrategyLibrary;
33+
import org.datatransferproject.types.transfer.retry.RetryingCallable;
34+
35+
/** A {@link SignalHandler} for Synology. */
36+
public class SynologySignalHandler implements SignalHandler<TokensAndUrlAuthData> {
37+
private final SynologyOAuthTokenManager tokenManager;
38+
private final SynologyDTPService synologyDTPService;
39+
private final RetryStrategyLibrary retryStrategyLibrary;
40+
41+
/**
42+
* Constructs a new {@code SynologySignalHandler} instance.
43+
*
44+
* @param synologyDTPService the Synology DTP service
45+
* @param tokenManager the Synology OAuth token manager
46+
* @param retryStrategyLibrary the retry strategy library for handling transient failures
47+
*/
48+
public SynologySignalHandler(
49+
SynologyOAuthTokenManager tokenManager,
50+
SynologyDTPService synologyDTPService,
51+
RetryStrategyLibrary retryStrategyLibrary) {
52+
this.tokenManager = tokenManager;
53+
this.synologyDTPService = synologyDTPService;
54+
this.retryStrategyLibrary = retryStrategyLibrary;
55+
}
56+
57+
@Override
58+
public void sendSignal(SignalRequest signalRequest, AuthData authData, Monitor monitor)
59+
throws RetryException {
60+
Objects.requireNonNull(signalRequest, "signalRequest cannot be null");
61+
Objects.requireNonNull(authData, "authData cannot be null");
62+
Objects.requireNonNull(monitor, "monitor cannot be null");
63+
64+
UUID uuidJobId = UUID.fromString(signalRequest.jobId());
65+
tokenManager.addAuthDataIfNotExist(uuidJobId, (TokensAndUrlAuthData) authData);
66+
67+
monitor.info(
68+
() ->
69+
String.format(
70+
"[SynologySignalHandler] Received signal for jobId: %s, jobStatus.state: %s,"
71+
+ " jobStatus.endReason: %s",
72+
signalRequest.jobId(),
73+
signalRequest.jobStatus().state(),
74+
signalRequest.jobStatus().endReason()));
75+
76+
Callable<Void> sendJobSignalCallable =
77+
() -> {
78+
Boolean success =
79+
(Boolean)
80+
this.synologyDTPService
81+
.sendJobSignal(signalRequest.jobStatus(), uuidJobId)
82+
.get("success");
83+
84+
if (success) {
85+
monitor.debug(
86+
() ->
87+
String.format(
88+
"[SynologySignalHandler] Successfully sent signal for jobId: %s,"
89+
+ " jobStatus.state: %s, jobStatus.endReason: %s",
90+
signalRequest.jobId(),
91+
signalRequest.jobStatus().state(),
92+
signalRequest.jobStatus().endReason()));
93+
return null;
94+
}
95+
96+
throw new RuntimeException(
97+
String.format(
98+
"Failed to send signal for jobId: %s. Response with success=false.",
99+
signalRequest.jobId()));
100+
};
101+
102+
RetryingCallable<Void> retryingSendJobSignalCallable =
103+
new RetryingCallable<>(
104+
sendJobSignalCallable, retryStrategyLibrary, Clock.systemUTC(), monitor);
105+
106+
try {
107+
retryingSendJobSignalCallable.call();
108+
} catch (Throwable e) {
109+
throw e;
110+
}
111+
}
112+
}

extensions/data-transfer/portability-data-transfer-synology/src/main/resources/config/synology.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,3 +11,4 @@ serviceConfig:
1111
createAlbum: "/import/album"
1212
uploadItem: "/import/item"
1313
addItemToAlbum: "/import/album/item"
14+
signalJob: "/import/job/signal"
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,127 @@
1+
/*
2+
* Copyright 2026 The Data Transfer Project Authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*
16+
*/
17+
18+
package org.datatransferproject.datatransfer.synology.signals;
19+
20+
import static org.mockito.ArgumentMatchers.any;
21+
import static org.mockito.Mockito.mock;
22+
import static org.mockito.Mockito.times;
23+
import static org.mockito.Mockito.verify;
24+
import static org.mockito.Mockito.when;
25+
26+
import java.util.Collections;
27+
import java.util.Map;
28+
import java.util.UUID;
29+
import org.datatransferproject.api.launcher.Monitor;
30+
import org.datatransferproject.datatransfer.synology.service.SynologyDTPService;
31+
import org.datatransferproject.datatransfer.synology.service.SynologyOAuthTokenManager;
32+
import org.datatransferproject.spi.transfer.provider.SignalRequest;
33+
import org.datatransferproject.spi.transfer.types.CopyExceptionWithFailureReason;
34+
import org.datatransferproject.spi.transfer.types.signals.JobLifeCycle;
35+
import org.datatransferproject.spi.transfer.types.signals.JobLifeCycle.EndReason;
36+
import org.datatransferproject.spi.transfer.types.signals.JobLifeCycle.State;
37+
import org.datatransferproject.types.common.models.DataVertical;
38+
import org.datatransferproject.types.transfer.auth.TokensAndUrlAuthData;
39+
import org.datatransferproject.types.transfer.retry.RetryException;
40+
import org.datatransferproject.types.transfer.retry.RetryMapping;
41+
import org.datatransferproject.types.transfer.retry.RetryStrategy;
42+
import org.datatransferproject.types.transfer.retry.RetryStrategyLibrary;
43+
import org.datatransferproject.types.transfer.retry.UniformRetryStrategy;
44+
import org.junit.jupiter.api.BeforeEach;
45+
import org.junit.jupiter.api.Test;
46+
47+
public class SynologySignalHandlerTest {
48+
49+
private SynologyOAuthTokenManager tokenManager;
50+
private RetryStrategyLibrary retryStrategyLibrary;
51+
private Monitor monitor;
52+
private String jobId;
53+
private SynologySignalHandler signalHandler;
54+
private SynologyDTPService synologyDTPService;
55+
private TokensAndUrlAuthData authData;
56+
57+
@BeforeEach
58+
public void setUp() {
59+
jobId = UUID.randomUUID().toString();
60+
61+
tokenManager = mock(SynologyOAuthTokenManager.class);
62+
RetryStrategy retryStrategy = new UniformRetryStrategy(3, 1, "test");
63+
RetryMapping retryMapping =
64+
new RetryMapping(
65+
new String[] {RuntimeException.class.getName()}, new String[] {}, retryStrategy);
66+
retryStrategyLibrary =
67+
new RetryStrategyLibrary(
68+
Collections.singletonList(retryMapping), new UniformRetryStrategy(1, 1, "default"));
69+
monitor = mock(Monitor.class);
70+
synologyDTPService = mock(SynologyDTPService.class);
71+
authData = mock(TokensAndUrlAuthData.class);
72+
73+
signalHandler =
74+
new SynologySignalHandler(tokenManager, synologyDTPService, retryStrategyLibrary);
75+
}
76+
77+
@Test
78+
public void testSendSignal() throws RetryException, CopyExceptionWithFailureReason {
79+
JobLifeCycle jobStatus =
80+
JobLifeCycle.builder()
81+
.setState(State.ENDED)
82+
.setEndReason(EndReason.SUCCESSFULLY_COMPLETED)
83+
.build();
84+
85+
SignalRequest signalRequest =
86+
SignalRequest.builder()
87+
.setJobId(jobId)
88+
.setJobStatus(jobStatus)
89+
.setExportingService("EXPORT_SERVICE")
90+
.setImportingService("IMPORT_SERVICE")
91+
.setDataType(DataVertical.MAIL.getDataType())
92+
.build();
93+
94+
when(synologyDTPService.sendJobSignal(any(JobLifeCycle.class), any(UUID.class)))
95+
.thenReturn(Map.of("success", true));
96+
97+
signalHandler.sendSignal(signalRequest, authData, monitor);
98+
99+
verify(synologyDTPService, times(1)).sendJobSignal(any(JobLifeCycle.class), any(UUID.class));
100+
}
101+
102+
@Test
103+
public void testSendSignalRetry() throws RetryException, CopyExceptionWithFailureReason {
104+
JobLifeCycle jobStatus =
105+
JobLifeCycle.builder()
106+
.setState(State.ENDED)
107+
.setEndReason(EndReason.SUCCESSFULLY_COMPLETED)
108+
.build();
109+
110+
SignalRequest signalRequest =
111+
SignalRequest.builder()
112+
.setJobId(jobId)
113+
.setJobStatus(jobStatus)
114+
.setExportingService("EXPORT_SERVICE")
115+
.setImportingService("IMPORT_SERVICE")
116+
.setDataType(DataVertical.MAIL.getDataType())
117+
.build();
118+
119+
when(synologyDTPService.sendJobSignal(any(JobLifeCycle.class), any(UUID.class)))
120+
.thenThrow(new RuntimeException("Failed to send signal"))
121+
.thenReturn(Map.of("success", true));
122+
123+
signalHandler.sendSignal(signalRequest, authData, monitor);
124+
125+
verify(synologyDTPService, times(2)).sendJobSignal(any(JobLifeCycle.class), any(UUID.class));
126+
}
127+
}

extensions/data-transfer/portability-data-transfer-synology/src/test/java/org/datatransferproject/datatransfer/synology/utils/TestConfigs.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,12 +30,16 @@ public class TestConfigs {
3030
public static final String TEST_CREATE_ALBUM_PATH = "/create";
3131
public static final String TEST_UPLOAD_ITEM_PATH = "/upload";
3232
public static final String TEST_ADD_ITEM_TO_ALBUM_PATH = "/add";
33+
public static final String TEST_SIGNAL_JOB_PATH = "/signal";
3334
public static final int TEST_MAX_ATTEMPTS = 5;
3435

3536
public static ServiceConfig createServiceConfig() {
3637
C2Api.ApiPath apiPath =
3738
new C2Api.ApiPath(
38-
TEST_CREATE_ALBUM_PATH, TEST_UPLOAD_ITEM_PATH, TEST_ADD_ITEM_TO_ALBUM_PATH);
39+
TEST_CREATE_ALBUM_PATH,
40+
TEST_UPLOAD_ITEM_PATH,
41+
TEST_ADD_ITEM_TO_ALBUM_PATH,
42+
TEST_SIGNAL_JOB_PATH);
3943

4044
C2Api c2Api = new C2Api(TEST_C2_BASE_URL, apiPath);
4145

0 commit comments

Comments
 (0)