From 60c35f4dbe4b6297718edb3704f190f9690b458c Mon Sep 17 00:00:00 2001 From: Alex Kulikov Date: Thu, 4 Sep 2025 11:48:54 +0100 Subject: [PATCH 1/3] feat: add recurringJobId to JobMetadata and PortabilityJob --- .../spi/cloud/types/PortabilityJob.java | 20 +++++++++++++++++++ .../transfer/JobMetadata.java | 13 ++++++++++++ .../transfer/JobPollingService.java | 1 + .../transfer/JobProcessorTest.java | 3 +++ 4 files changed, 37 insertions(+) diff --git a/portability-spi-cloud/src/main/java/org/datatransferproject/spi/cloud/types/PortabilityJob.java b/portability-spi-cloud/src/main/java/org/datatransferproject/spi/cloud/types/PortabilityJob.java index ef4814460..b96e11111 100644 --- a/portability-spi-cloud/src/main/java/org/datatransferproject/spi/cloud/types/PortabilityJob.java +++ b/portability-spi-cloud/src/main/java/org/datatransferproject/spi/cloud/types/PortabilityJob.java @@ -11,6 +11,7 @@ import java.time.Instant; import java.util.Map; import java.util.TimeZone; +import java.util.UUID; import javax.annotation.Nullable; import org.datatransferproject.types.common.models.DataVertical; @@ -38,6 +39,7 @@ public abstract class PortabilityJob { private static final String EXPORT_ENCRYPTED_INITIAL_AUTH_DATA = "EXPORT_ENCRYPTED_INITIAL_AUTH_DATA"; private static final String JOB_STATE = "JOB_STATE"; + private static final String RECURRING_JOB_ID = "RECURRING_JOB_ID"; private static final String TRANSFER_MODE = "TRANSFER_MODE"; private static final String FAILURE_REASON = "FAILURE_REASON"; private static final String NUMBER_OF_FAILED_FILES_KEY = "NUM_FAILED_FILES"; @@ -110,6 +112,11 @@ public static PortabilityJob fromMap(Map properties) { ? DataVertical.fromDataType((String) properties.get(DATA_TYPE_KEY)) : null; + UUID recurringJobId = + properties.containsKey(RECURRING_JOB_ID) + ? UUID.fromString((String) properties.get(RECURRING_JOB_ID)) + : null; + return PortabilityJob.builder() .setState(state) .setExportService((String) properties.get(EXPORT_SERVICE_KEY)) @@ -135,6 +142,7 @@ public static PortabilityJob fromMap(Map properties) { .setUserLocale(userLocale) .setUserAlias(userAlias) .setTransferMode(transferMode) + .setRecurringJobId(recurringJobId) .build(); } @@ -197,6 +205,10 @@ private static void isSet(String... strings) { @JsonProperty("transferMode") public abstract TransferMode transferMode(); + @Nullable + @JsonProperty("recurringJobId") + public abstract UUID recurringJobId(); + public abstract PortabilityJob.Builder toBuilder(); public Map toMap() { @@ -263,6 +275,10 @@ public Map toMap() { builder.put(TRANSFER_MODE, transferMode().toString()); } + if (null != recurringJobId()) { + builder.put(RECURRING_JOB_ID, recurringJobId()); + } + return builder.build(); } @@ -361,6 +377,10 @@ public Builder setAndValidateJobAuthorization(JobAuthorization jobAuthorization) @JsonProperty("transferMode") public abstract Builder setTransferMode(TransferMode transferMode); + @JsonInclude(JsonInclude.Include.NON_NULL) + @JsonProperty("recurringJobId") + public abstract Builder setRecurringJobId(@Nullable UUID recurringJobId); + // For internal use only; clients should use setAndValidateJobAuthorization protected abstract Builder setJobAuthorization(JobAuthorization jobAuthorization); } diff --git a/portability-transfer/src/main/java/org/datatransferproject/transfer/JobMetadata.java b/portability-transfer/src/main/java/org/datatransferproject/transfer/JobMetadata.java index a8cd41c07..a58aa5a50 100644 --- a/portability-transfer/src/main/java/org/datatransferproject/transfer/JobMetadata.java +++ b/portability-transfer/src/main/java/org/datatransferproject/transfer/JobMetadata.java @@ -18,6 +18,9 @@ import com.google.common.base.Preconditions; import com.google.common.base.Stopwatch; +import javax.annotation.Nullable; + +import java.util.Optional; import java.util.UUID; import org.datatransferproject.types.common.models.DataVertical; @@ -34,12 +37,14 @@ public final class JobMetadata { private static byte[] encodedPrivateKey = null; private static UUID jobId = null; + private static UUID recurringJobId = null; private static DataVertical dataType = null; private static String exportService = null; private static String importService = null; private static Stopwatch stopWatch = null; public static boolean isInitialized() { + // recurringJobId can be null and that's ok return (jobId != null && encodedPrivateKey != null && dataType != null @@ -50,6 +55,7 @@ public static boolean isInitialized() { static void init( UUID initJobId, + @Nullable UUID initRecurringJobId, byte[] initEncodedPrivateKey, DataVertical initDataType, String initExportService, @@ -57,6 +63,7 @@ static void init( Stopwatch initStopWatch) { Preconditions.checkState(!isInitialized(), "JobMetadata cannot be initialized twice"); jobId = initJobId; + recurringJobId = initRecurringJobId; encodedPrivateKey = initEncodedPrivateKey; dataType = initDataType; exportService = initExportService; @@ -67,6 +74,7 @@ static void init( // TODO: remove this static synchronized void reset() { jobId = null; + recurringJobId = null; encodedPrivateKey = null; dataType = null; exportService = null; @@ -84,6 +92,11 @@ public static UUID getJobId() { return jobId; } + public static Optional getRecurringJobId() { + Preconditions.checkState(isInitialized(), "JobMetadata must be initialized"); + return Optional.ofNullable(recurringJobId); + } + public static DataVertical getDataType() { Preconditions.checkState(isInitialized(), "JobMetadata must be initialized"); return dataType; diff --git a/portability-transfer/src/main/java/org/datatransferproject/transfer/JobPollingService.java b/portability-transfer/src/main/java/org/datatransferproject/transfer/JobPollingService.java index 0f906c5ca..ca06937e2 100644 --- a/portability-transfer/src/main/java/org/datatransferproject/transfer/JobPollingService.java +++ b/portability-transfer/src/main/java/org/datatransferproject/transfer/JobPollingService.java @@ -206,6 +206,7 @@ private boolean tryToClaimJob(UUID jobId, WorkerKeyPair keyPair) { JobMetadata.init( jobId, + existingJob.recurringJobId(), keyPair.getEncodedPrivateKey(), existingJob.transferDataType(), existingJob.exportService(), diff --git a/portability-transfer/src/test/java/org/datatransferproject/transfer/JobProcessorTest.java b/portability-transfer/src/test/java/org/datatransferproject/transfer/JobProcessorTest.java index 0f731d654..d15cbaafb 100644 --- a/portability-transfer/src/test/java/org/datatransferproject/transfer/JobProcessorTest.java +++ b/portability-transfer/src/test/java/org/datatransferproject/transfer/JobProcessorTest.java @@ -152,6 +152,7 @@ public void cleanUp() { public void processJobGetsErrorsEvenWhenCopyThrows() throws CopyException, IOException, RetryException { JobMetadata.init( jobId, + null, "".getBytes(), DataVertical.BLOBS, "", @@ -175,6 +176,7 @@ public void processJobGetsErrorsEvenWhenCopyThrows() throws CopyException, IOExc public void processJobCopiesSuccessfully() throws CopyException, IOException, RetryException { JobMetadata.init( jobId, + null, "".getBytes(), DataVertical.BLOBS, "", @@ -208,6 +210,7 @@ public void processJobCopiesSuccessfullyWithTransferSignalDisabled() throws Copy JobMetadata.init( jobId, + null, "".getBytes(), DataVertical.BLOBS, "", From 6675be4bae8d4cf49c2b4f003a1bee388045fcae Mon Sep 17 00:00:00 2001 From: Alex Kulikov Date: Thu, 4 Sep 2025 12:02:53 +0100 Subject: [PATCH 2/3] feat: consume recurringJobId from JobMetadata and pass it to GenericImporter HTTP headers --- .../generic/GenericFileImporter.java | 12 ++++-- .../datatransfer/generic/GenericImporter.java | 22 ++++++---- .../generic/GenericFileImporterTest.java | 41 +++++++++++++++++++ .../generic/GenericImporterTest.java | 35 ++++++++++++++++ 4 files changed, 99 insertions(+), 11 deletions(-) diff --git a/extensions/data-transfer/portability-data-transfer-generic/src/main/java/org/datatransferproject/datatransfer/generic/GenericFileImporter.java b/extensions/data-transfer/portability-data-transfer-generic/src/main/java/org/datatransferproject/datatransfer/generic/GenericFileImporter.java index fb8f0bee0..50133a0ce 100644 --- a/extensions/data-transfer/portability-data-transfer-generic/src/main/java/org/datatransferproject/datatransfer/generic/GenericFileImporter.java +++ b/extensions/data-transfer/portability-data-transfer-generic/src/main/java/org/datatransferproject/datatransfer/generic/GenericFileImporter.java @@ -60,7 +60,8 @@ private boolean importSingleFileItem( dataStore.getTempFileFromInputStream(wrapper.getStream(), data.getFile().getName(), null); MediaType mimeType = Optional.ofNullable(MediaType.parse(data.getFileMimeType())).orElse(OCTET_STREAM); - Request request = + + Request.Builder builder = new Request.Builder() .url(endpoint) .addHeader("Authorization", format("Bearer %s", authData.getToken())) @@ -71,8 +72,13 @@ private boolean importSingleFileItem( .setType(MULTIPART_RELATED) .addPart(RequestBody.create(JSON, om.writeValueAsBytes(data.getJsonData()))) .addPart(MultipartBody.create(mimeType, tempFile)) - .build()) - .build(); + .build()); + + if (this.recurringJobId.isPresent()) { + builder.addHeader("X-DTP-Recurring-Job-Id", this.recurringJobId.get().toString()); + } + + Request request = builder.build(); try (Response response = client.newCall(request).execute()) { return parseResponse(response); diff --git a/extensions/data-transfer/portability-data-transfer-generic/src/main/java/org/datatransferproject/datatransfer/generic/GenericImporter.java b/extensions/data-transfer/portability-data-transfer-generic/src/main/java/org/datatransferproject/datatransfer/generic/GenericImporter.java index cc9986263..146aed172 100644 --- a/extensions/data-transfer/portability-data-transfer-generic/src/main/java/org/datatransferproject/datatransfer/generic/GenericImporter.java +++ b/extensions/data-transfer/portability-data-transfer-generic/src/main/java/org/datatransferproject/datatransfer/generic/GenericImporter.java @@ -81,6 +81,7 @@ public String toString() { ObjectMapper om = new ObjectMapper(); Map jobTokenManagerMap = new HashMap<>(); protected final String exportService; + protected final Optional recurringJobId; static final MediaType JSON = MediaType.parse("application/json"); @@ -94,6 +95,7 @@ public GenericImporter( this.endpoint = endpoint; this.containerSerializer = containerSerializer; this.exportService = JobMetadata.getExportService(); + this.recurringJobId = JobMetadata.getRecurringJobId(); configureObjectMapper(om); } @@ -162,14 +164,18 @@ boolean parseResponse(Response response) throws IOException, InvalidTokenExcepti boolean importSingleItem(UUID jobId, TokensAndUrlAuthData authData, ImportableData dataItem) throws IOException, InvalidTokenException, DestinationMemoryFullException { - Request request = - new Request.Builder() - .url(endpoint) - .addHeader("Authorization", format("Bearer %s", authData.getToken())) - .addHeader("X-DTP-Export-Service", this.exportService) - .addHeader("X-DTP-Job-Id", jobId.toString()) - .post(RequestBody.create(JSON, om.writeValueAsBytes(dataItem.getJsonData()))) - .build(); + Request.Builder builder = new Request.Builder() + .url(endpoint) + .addHeader("Authorization", format("Bearer %s", authData.getToken())) + .addHeader("X-DTP-Export-Service", this.exportService) + .addHeader("X-DTP-Job-Id", jobId.toString()) + .post(RequestBody.create(JSON, om.writeValueAsBytes(dataItem.getJsonData()))); + + if (this.recurringJobId.isPresent()) { + builder.addHeader("X-DTP-Recurring-Job-Id", this.recurringJobId.get().toString()); + } + + Request request = builder.build(); try (Response response = client.newCall(request).execute()) { return parseResponse(response); diff --git a/extensions/data-transfer/portability-data-transfer-generic/src/test/java/org/datatransferproject/datatransfer/generic/GenericFileImporterTest.java b/extensions/data-transfer/portability-data-transfer-generic/src/test/java/org/datatransferproject/datatransfer/generic/GenericFileImporterTest.java index 574ab9f66..9664e8f28 100644 --- a/extensions/data-transfer/portability-data-transfer-generic/src/test/java/org/datatransferproject/datatransfer/generic/GenericFileImporterTest.java +++ b/extensions/data-transfer/portability-data-transfer-generic/src/test/java/org/datatransferproject/datatransfer/generic/GenericFileImporterTest.java @@ -10,6 +10,7 @@ import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.Arrays; +import java.util.Optional; import java.util.UUID; import okhttp3.Headers; import okhttp3.mockwebserver.MockResponse; @@ -42,6 +43,7 @@ public InputStreamWrapper getStream(UUID jobId, String key) throws IOException { } }; private static final UUID MOCK_JOB_ID = UUID.fromString("123e4567-e89b-12d3-a456-426614174000"); + private static final UUID MOCK_RECURRING_JOB_ID = UUID.fromString("123e4567-e89b-12d3-a456-426614174001"); private static final String MOCK_EXPORT_SERVICE = "mockExportService"; private MockedStatic jobMetadataMock; @@ -220,4 +222,43 @@ public void testGenericFileImporterPassingJobMetadata() throws Exception { assertEquals(MOCK_JOB_ID.toString(), request.getHeader("X-DTP-Job-Id")); assertTrue(executor.getErrors().isEmpty()); } + + + @Test + public void testGenericFileImporterPassingJobMetadataRecurringJob() throws Exception { + jobMetadataMock.when(JobMetadata::getRecurringJobId).thenReturn(Optional.of(MOCK_RECURRING_JOB_ID)); + + GenericFileImporter importer = + new GenericFileImporter<>( + container -> + Arrays.asList( + new ImportableFileData<>( + new CachedDownloadableItem(container.getId(), container.getId()), + "video/mp4", + new GenericPayload<>(container.getId(), "schemasource"), + container.getId(), + container.getId())), + new AppCredentials("key", "secret"), + webServer.url("/id").url(), + dataStore, + monitor); + InMemoryIdempotentImportExecutor executor = new InMemoryIdempotentImportExecutor(monitor); + webServer.enqueue(new MockResponse().setResponseCode(201).setBody("OK")); + + importer.importItem( + MOCK_JOB_ID, + executor, + new TokensAndUrlAuthData( + "accessToken", "refreshToken", webServer.url("/refresh").toString()), + new IdOnlyContainerResource("id")); + + assertEquals(1, webServer.getRequestCount()); + + RecordedRequest request = webServer.takeRequest(); + assertEquals("POST", request.getMethod()); + assertEquals(this.MOCK_EXPORT_SERVICE, request.getHeader("X-DTP-Export-Service")); + assertEquals(MOCK_JOB_ID.toString(), request.getHeader("X-DTP-Job-Id")); + assertEquals(MOCK_RECURRING_JOB_ID.toString(), request.getHeader("X-DTP-Recurring-Job-Id")); + assertTrue(executor.getErrors().isEmpty()); + } } diff --git a/extensions/data-transfer/portability-data-transfer-generic/src/test/java/org/datatransferproject/datatransfer/generic/GenericImporterTest.java b/extensions/data-transfer/portability-data-transfer-generic/src/test/java/org/datatransferproject/datatransfer/generic/GenericImporterTest.java index 52189e76f..1380b2ac8 100644 --- a/extensions/data-transfer/portability-data-transfer-generic/src/test/java/org/datatransferproject/datatransfer/generic/GenericImporterTest.java +++ b/extensions/data-transfer/portability-data-transfer-generic/src/test/java/org/datatransferproject/datatransfer/generic/GenericImporterTest.java @@ -10,6 +10,7 @@ import java.util.Arrays; import java.util.Collection; import java.util.List; +import java.util.Optional; import java.util.UUID; import okhttp3.mockwebserver.Dispatcher; import okhttp3.mockwebserver.MockResponse; @@ -41,6 +42,7 @@ public class GenericImporterTest { @Parameter public String importerClass; private MockWebServer webServer; private static final UUID MOCK_JOB_ID = UUID.fromString("123e4567-e89b-12d3-a456-426614174000"); + private static final UUID MOCK_RECURRING_JOB_ID = UUID.fromString("123e4567-e89b-12d3-a456-426614174001"); private static final String MOCK_EXPORT_SERVICE = "mockExportService"; private MockedStatic jobMetadataMock; @@ -391,4 +393,37 @@ public void testGenericImporterPassingJobMetadata() throws Exception { assertEquals(MOCK_JOB_ID.toString(), request.getHeader("X-DTP-Job-Id")); assertTrue(executor.getErrors().isEmpty()); } + + @Test + public void testGenericImporterPassingJobMetadataRecurringJob() throws Exception { + jobMetadataMock.when(JobMetadata::getRecurringJobId).thenReturn(Optional.of(MOCK_RECURRING_JOB_ID)); + + InMemoryIdempotentImportExecutor executor = new InMemoryIdempotentImportExecutor(monitor); + GenericImporter importer = + getImporter( + importerClass, + container -> + List.of( + new ImportableData<>( + new GenericPayload<>(container.getId(), "schemasource"), + container.getId(), + container.getId()))); + webServer.setDispatcher(getDispatcher()); + + importer.importItem( + MOCK_JOB_ID, + executor, + new TokensAndUrlAuthData( + "accessToken", "refreshToken", webServer.url("/refresh").toString()), + new IdOnlyContainerResource("id")); + + assertEquals(1, webServer.getRequestCount()); + RecordedRequest request = webServer.takeRequest(); + assertEquals("POST", request.getMethod()); + assertEquals(this.MOCK_EXPORT_SERVICE, request.getHeader("X-DTP-Export-Service")); + assertEquals(MOCK_JOB_ID.toString(), request.getHeader("X-DTP-Job-Id")); + assertEquals(MOCK_RECURRING_JOB_ID.toString(), request.getHeader("X-DTP-Recurring-Job-Id")); + assertTrue(executor.getErrors().isEmpty()); + } + } From 645f672c8223779912f8cd1d1f293d5c1ef989a4 Mon Sep 17 00:00:00 2001 From: Alex Kulikov Date: Thu, 4 Sep 2025 18:41:58 +0100 Subject: [PATCH 3/3] fix: move JobMetadata.init with recurringJobId to a separate method --- .../transfer/JobMetadata.java | 16 +++++++++ .../transfer/JobProcessorTest.java | 35 +++++++++++++++++-- 2 files changed, 48 insertions(+), 3 deletions(-) diff --git a/portability-transfer/src/main/java/org/datatransferproject/transfer/JobMetadata.java b/portability-transfer/src/main/java/org/datatransferproject/transfer/JobMetadata.java index a58aa5a50..dcc42f500 100644 --- a/portability-transfer/src/main/java/org/datatransferproject/transfer/JobMetadata.java +++ b/portability-transfer/src/main/java/org/datatransferproject/transfer/JobMetadata.java @@ -53,6 +53,22 @@ public static boolean isInitialized() { && stopWatch != null); } + static void init( + UUID initJobId, + byte[] initEncodedPrivateKey, + DataVertical initDataType, + String initExportService, + String initImportService, + Stopwatch initStopWatch) { + Preconditions.checkState(!isInitialized(), "JobMetadata cannot be initialized twice"); + jobId = initJobId; + encodedPrivateKey = initEncodedPrivateKey; + dataType = initDataType; + exportService = initExportService; + importService = initImportService; + stopWatch = initStopWatch; + } + static void init( UUID initJobId, @Nullable UUID initRecurringJobId, diff --git a/portability-transfer/src/test/java/org/datatransferproject/transfer/JobProcessorTest.java b/portability-transfer/src/test/java/org/datatransferproject/transfer/JobProcessorTest.java index d15cbaafb..e1ca1cd70 100644 --- a/portability-transfer/src/test/java/org/datatransferproject/transfer/JobProcessorTest.java +++ b/portability-transfer/src/test/java/org/datatransferproject/transfer/JobProcessorTest.java @@ -16,6 +16,8 @@ package org.datatransferproject.transfer; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.ArgumentMatchers.eq; @@ -52,6 +54,7 @@ public class JobProcessorTest { private UUID jobId; + private UUID recurringJobId; private ExportInformation exportInfo; private AuthData exportAuthData; private AuthData importAuthData; @@ -96,6 +99,7 @@ public TestJobProcessor(JobStore jobStore, public void setUp() throws JsonProcessingException { importAuthData = exportAuthData = Mockito.mock(AuthData.class); jobId = UUID.randomUUID(); + recurringJobId = UUID.randomUUID(); exportInfo = Mockito.mock(ExportInformation.class); copier = Mockito.mock(InMemoryDataCopier.class); importSignalHandlerProvider = (Provider) Mockito.mock(Provider.class); @@ -152,7 +156,6 @@ public void cleanUp() { public void processJobGetsErrorsEvenWhenCopyThrows() throws CopyException, IOException, RetryException { JobMetadata.init( jobId, - null, "".getBytes(), DataVertical.BLOBS, "", @@ -176,7 +179,6 @@ public void processJobGetsErrorsEvenWhenCopyThrows() throws CopyException, IOExc public void processJobCopiesSuccessfully() throws CopyException, IOException, RetryException { JobMetadata.init( jobId, - null, "".getBytes(), DataVertical.BLOBS, "", @@ -210,12 +212,14 @@ public void processJobCopiesSuccessfullyWithTransferSignalDisabled() throws Copy JobMetadata.init( jobId, - null, "".getBytes(), DataVertical.BLOBS, "", "", Stopwatch.createUnstarted()); + + assertTrue(JobMetadata.getRecurringJobId().isEmpty()); + Mockito.doThrow(new CopyException("error", new Exception())).when(copier) .copy(importAuthData, exportAuthData, jobId, Optional.of(exportInfo)); processor.processJob(); @@ -229,4 +233,29 @@ public void processJobCopiesSuccessfullyWithTransferSignalDisabled() throws Copy Mockito.verify(exportSignalHandler, Mockito.never()) .sendSignal(any(SignalRequest.class), eq(exportAuthData), any(Monitor.class)); } + + @Test + public void testInitJobMetadataWithRecurringId() throws CopyException, + IOException, RetryException { + processor = Mockito.spy( + new TestJobProcessor(jobStore, + copier, + objectMapper, + decryptionService, + Boolean.FALSE, + importSignalHandlerProvider, + exportSignalHandlerProvider)); + + JobMetadata.init( + jobId, + recurringJobId, + "".getBytes(), + DataVertical.BLOBS, + "", + "", + Stopwatch.createUnstarted()); + + assertEquals(jobId, JobMetadata.getJobId()); + assertEquals(recurringJobId, JobMetadata.getRecurringJobId().get()); + } }