Skip to content

Commit 7ed8262

Browse files
authored
feat: pass export service and jobid as HTTP headers in generic importer (#1473)
Add 2 HTTP headers to `GenericImporter` and `GenericFileImporter`: - `X-DTP-Export-Service`, passing the `ExportService` from the `JobMetadata`, i.e. the source of data; - `X-DTP-Job-Id`, passing the current job UUID as String. This will help the receiving service in building the internal logic (i.e. if they want to know which service is sending the data). TODO: - [x] this PR: figure out tests (what is the currently recommended workflow with testing?) - [ ] next PR: add an optional `recurringJobId` field to `PortabilityJob` (populated from `*JobStore`) and `JobMetadata` -- to be sent for recurring transfers only via `X-DTP-Recurring-Job-Id`
1 parent 914a3db commit 7ed8262

4 files changed

Lines changed: 93 additions & 0 deletions

File tree

extensions/data-transfer/portability-data-transfer-generic/src/main/java/org/datatransferproject/datatransfer/generic/GenericFileImporter.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,8 @@ private <T> boolean importSingleFileItem(
6464
new Request.Builder()
6565
.url(endpoint)
6666
.addHeader("Authorization", format("Bearer %s", authData.getToken()))
67+
.addHeader("X-DTP-Export-Service", this.exportService)
68+
.addHeader("X-DTP-Job-Id", jobId.toString())
6769
.post(
6870
new MultipartBody.Builder()
6971
.setType(MULTIPART_RELATED)

extensions/data-transfer/portability-data-transfer-generic/src/main/java/org/datatransferproject/datatransfer/generic/GenericImporter.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import org.datatransferproject.spi.transfer.provider.Importer;
3434
import org.datatransferproject.spi.transfer.types.DestinationMemoryFullException;
3535
import org.datatransferproject.spi.transfer.types.InvalidTokenException;
36+
import org.datatransferproject.transfer.JobMetadata;
3637
import org.datatransferproject.types.common.models.ContainerResource;
3738
import org.datatransferproject.types.transfer.auth.AppCredentials;
3839
import org.datatransferproject.types.transfer.auth.TokensAndUrlAuthData;
@@ -79,6 +80,7 @@ public String toString() {
7980
OkHttpClient client = new OkHttpClient();
8081
ObjectMapper om = new ObjectMapper();
8182
Map<UUID, OAuthTokenManager> jobTokenManagerMap = new HashMap<>();
83+
protected final String exportService;
8284

8385
static final MediaType JSON = MediaType.parse("application/json");
8486

@@ -91,6 +93,7 @@ public GenericImporter(
9193
this.appCredentials = appCredentials;
9294
this.endpoint = endpoint;
9395
this.containerSerializer = containerSerializer;
96+
this.exportService = JobMetadata.getExportService();
9497
configureObjectMapper(om);
9598
}
9699

@@ -163,6 +166,8 @@ boolean importSingleItem(UUID jobId, TokensAndUrlAuthData authData, ImportableDa
163166
new Request.Builder()
164167
.url(endpoint)
165168
.addHeader("Authorization", format("Bearer %s", authData.getToken()))
169+
.addHeader("X-DTP-Export-Service", this.exportService)
170+
.addHeader("X-DTP-Job-Id", jobId.toString())
166171
.post(RequestBody.create(JSON, om.writeValueAsBytes(dataItem.getJsonData())))
167172
.build();
168173

extensions/data-transfer/portability-data-transfer-generic/src/test/java/org/datatransferproject/datatransfer/generic/GenericFileImporterTest.java

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,12 +21,15 @@
2121
import org.datatransferproject.api.launcher.Monitor;
2222
import org.datatransferproject.spi.cloud.storage.TemporaryPerJobDataStore;
2323
import org.datatransferproject.spi.transfer.idempotentexecutor.InMemoryIdempotentImportExecutor;
24+
import org.datatransferproject.transfer.JobMetadata;
2425
import org.datatransferproject.types.common.models.IdOnlyContainerResource;
2526
import org.datatransferproject.types.transfer.auth.AppCredentials;
2627
import org.datatransferproject.types.transfer.auth.TokensAndUrlAuthData;
2728
import org.junit.After;
2829
import org.junit.Before;
2930
import org.junit.Test;
31+
import org.mockito.MockedStatic;
32+
import org.mockito.Mockito;
3033

3134
public class GenericFileImporterTest {
3235
private MockWebServer webServer;
@@ -38,16 +41,24 @@ public InputStreamWrapper getStream(UUID jobId, String key) throws IOException {
3841
return new InputStreamWrapper(new ByteArrayInputStream("Hello world".getBytes()));
3942
}
4043
};
44+
private static final UUID MOCK_JOB_ID = UUID.fromString("123e4567-e89b-12d3-a456-426614174000");
45+
private static final String MOCK_EXPORT_SERVICE = "mockExportService";
46+
private MockedStatic<JobMetadata> jobMetadataMock;
4147

4248
@Before
4349
public void setup() throws IOException {
4450
webServer = new MockWebServer();
4551
webServer.start();
52+
53+
jobMetadataMock = Mockito.mockStatic(JobMetadata.class);
54+
jobMetadataMock.when(JobMetadata::getJobId).thenReturn(MOCK_JOB_ID);
55+
jobMetadataMock.when(JobMetadata::getExportService).thenReturn(MOCK_EXPORT_SERVICE);
4656
}
4757

4858
@After
4959
public void teardown() throws IOException {
5060
webServer.shutdown();
61+
jobMetadataMock.close();
5162
}
5263

5364
public static MultipartStream getMultipartStream(RecordedRequest request) {
@@ -174,4 +185,39 @@ public void testGenericFileImporterMixedTypes() throws Exception {
174185
+ " file\",\"schemaSource\":\"schemasource\",\"apiVersion\":\"0.1.0\"}",
175186
jsonRequest.getBody().readUtf8());
176187
}
188+
189+
@Test
190+
public void testGenericFileImporterPassingJobMetadata() throws Exception {
191+
GenericFileImporter<IdOnlyContainerResource, String> importer =
192+
new GenericFileImporter<>(
193+
container ->
194+
Arrays.asList(
195+
new ImportableFileData<>(
196+
new CachedDownloadableItem(container.getId(), container.getId()),
197+
"video/mp4",
198+
new GenericPayload<>(container.getId(), "schemasource"),
199+
container.getId(),
200+
container.getId())),
201+
new AppCredentials("key", "secret"),
202+
webServer.url("/id").url(),
203+
dataStore,
204+
monitor);
205+
InMemoryIdempotentImportExecutor executor = new InMemoryIdempotentImportExecutor(monitor);
206+
webServer.enqueue(new MockResponse().setResponseCode(201).setBody("OK"));
207+
208+
importer.importItem(
209+
MOCK_JOB_ID,
210+
executor,
211+
new TokensAndUrlAuthData(
212+
"accessToken", "refreshToken", webServer.url("/refresh").toString()),
213+
new IdOnlyContainerResource("id"));
214+
215+
assertEquals(1, webServer.getRequestCount());
216+
217+
RecordedRequest request = webServer.takeRequest();
218+
assertEquals("POST", request.getMethod());
219+
assertEquals(this.MOCK_EXPORT_SERVICE, request.getHeader("X-DTP-Export-Service"));
220+
assertEquals(MOCK_JOB_ID.toString(), request.getHeader("X-DTP-Job-Id"));
221+
assertTrue(executor.getErrors().isEmpty());
222+
}
177223
}

extensions/data-transfer/portability-data-transfer-generic/src/test/java/org/datatransferproject/datatransfer/generic/GenericImporterTest.java

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import org.datatransferproject.spi.cloud.storage.TemporaryPerJobDataStore;
2020
import org.datatransferproject.spi.transfer.idempotentexecutor.InMemoryIdempotentImportExecutor;
2121
import org.datatransferproject.spi.transfer.types.DestinationMemoryFullException;
22+
import org.datatransferproject.transfer.JobMetadata;
2223
import org.datatransferproject.types.common.models.IdOnlyContainerResource;
2324
import org.datatransferproject.types.transfer.auth.AppCredentials;
2425
import org.datatransferproject.types.transfer.auth.TokensAndUrlAuthData;
@@ -30,13 +31,18 @@
3031
import org.junit.runners.Parameterized;
3132
import org.junit.runners.Parameterized.Parameter;
3233
import org.junit.runners.Parameterized.Parameters;
34+
import org.mockito.MockedStatic;
35+
import org.mockito.Mockito;
3336

3437
@RunWith(Parameterized.class)
3538
public class GenericImporterTest {
3639
private final Monitor monitor = new Monitor() {};
3740
private final TemporaryPerJobDataStore dataStore = new TemporaryPerJobDataStore() {};
3841
@Parameter public String importerClass;
3942
private MockWebServer webServer;
43+
private static final UUID MOCK_JOB_ID = UUID.fromString("123e4567-e89b-12d3-a456-426614174000");
44+
private static final String MOCK_EXPORT_SERVICE = "mockExportService";
45+
private MockedStatic<JobMetadata> jobMetadataMock;
4046

4147
@Parameters(name = "{0}")
4248
public static Collection<String> strings() {
@@ -52,11 +58,16 @@ static void assertContains(String expected, String actual) {
5258
public void setup() throws IOException {
5359
webServer = new MockWebServer();
5460
webServer.start();
61+
62+
jobMetadataMock = Mockito.mockStatic(JobMetadata.class);
63+
jobMetadataMock.when(JobMetadata::getJobId).thenReturn(MOCK_JOB_ID);
64+
jobMetadataMock.when(JobMetadata::getExportService).thenReturn(MOCK_EXPORT_SERVICE);
5565
}
5666

5767
@After
5868
public void teardown() throws IOException {
5969
webServer.shutdown();
70+
jobMetadataMock.close();
6071
}
6172

6273
<C> GenericImporter<IdOnlyContainerResource, C> getImporter(
@@ -351,4 +362,33 @@ public void testGenericImporterDestinationFull() throws Exception {
351362
assertEquals("itemId", error.title());
352363
assertContains("Generic importer failed with code (413)", error.exception());
353364
}
365+
366+
@Test
367+
public void testGenericImporterPassingJobMetadata() throws Exception {
368+
InMemoryIdempotentImportExecutor executor = new InMemoryIdempotentImportExecutor(monitor);
369+
GenericImporter<IdOnlyContainerResource, String> importer =
370+
getImporter(
371+
importerClass,
372+
container ->
373+
List.of(
374+
new ImportableData<>(
375+
new GenericPayload<>(container.getId(), "schemasource"),
376+
container.getId(),
377+
container.getId())));
378+
webServer.setDispatcher(getDispatcher());
379+
380+
importer.importItem(
381+
MOCK_JOB_ID,
382+
executor,
383+
new TokensAndUrlAuthData(
384+
"accessToken", "refreshToken", webServer.url("/refresh").toString()),
385+
new IdOnlyContainerResource("id"));
386+
387+
assertEquals(1, webServer.getRequestCount());
388+
RecordedRequest request = webServer.takeRequest();
389+
assertEquals("POST", request.getMethod());
390+
assertEquals(this.MOCK_EXPORT_SERVICE, request.getHeader("X-DTP-Export-Service"));
391+
assertEquals(MOCK_JOB_ID.toString(), request.getHeader("X-DTP-Job-Id"));
392+
assertTrue(executor.getErrors().isEmpty());
393+
}
354394
}

0 commit comments

Comments
 (0)