Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,8 @@ private <T> boolean importSingleFileItem(
dataStore.getTempFileFromInputStream(wrapper.getStream(), data.getFile().getName(), null);
MediaType mimeType =
Optional.ofNullable(MediaType.parse(data.getFileMimeType())).orElse(OCTET_STREAM);
Request request =

Request.Builder builder =
new Request.Builder()
.url(endpoint)
.addHeader("Authorization", format("Bearer %s", authData.getToken()))
Expand All @@ -71,8 +72,13 @@ private <T> boolean importSingleFileItem(
.setType(MULTIPART_RELATED)
.addPart(RequestBody.create(JSON, om.writeValueAsBytes(data.getJsonData())))
.addPart(MultipartBody.create(mimeType, tempFile))
.build())
.build();
.build());

if (this.recurringJobId.isPresent()) {
builder.addHeader("X-DTP-Recurring-Job-Id", this.recurringJobId.get().toString());
}

Request request = builder.build();

try (Response response = client.newCall(request).execute()) {
return parseResponse(response);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ public String toString() {
ObjectMapper om = new ObjectMapper();
Map<UUID, OAuthTokenManager> jobTokenManagerMap = new HashMap<>();
protected final String exportService;
protected final Optional<UUID> recurringJobId;

static final MediaType JSON = MediaType.parse("application/json");

Expand All @@ -94,6 +95,7 @@ public GenericImporter(
this.endpoint = endpoint;
this.containerSerializer = containerSerializer;
this.exportService = JobMetadata.getExportService();
this.recurringJobId = JobMetadata.getRecurringJobId();
configureObjectMapper(om);
}

Expand Down Expand Up @@ -162,14 +164,18 @@ boolean parseResponse(Response response) throws IOException, InvalidTokenExcepti
boolean importSingleItem(UUID jobId, TokensAndUrlAuthData authData, ImportableData<R> dataItem)
throws IOException, InvalidTokenException, DestinationMemoryFullException {

Request request =
new Request.Builder()
.url(endpoint)
.addHeader("Authorization", format("Bearer %s", authData.getToken()))
.addHeader("X-DTP-Export-Service", this.exportService)
.addHeader("X-DTP-Job-Id", jobId.toString())
.post(RequestBody.create(JSON, om.writeValueAsBytes(dataItem.getJsonData())))
.build();
Request.Builder builder = new Request.Builder()
.url(endpoint)
.addHeader("Authorization", format("Bearer %s", authData.getToken()))
.addHeader("X-DTP-Export-Service", this.exportService)
.addHeader("X-DTP-Job-Id", jobId.toString())
.post(RequestBody.create(JSON, om.writeValueAsBytes(dataItem.getJsonData())));

if (this.recurringJobId.isPresent()) {
builder.addHeader("X-DTP-Recurring-Job-Id", this.recurringJobId.get().toString());
}

Request request = builder.build();

try (Response response = client.newCall(request).execute()) {
return parseResponse(response);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Optional;
import java.util.UUID;
import okhttp3.Headers;
import okhttp3.mockwebserver.MockResponse;
Expand Down Expand Up @@ -42,6 +43,7 @@ public InputStreamWrapper getStream(UUID jobId, String key) throws IOException {
}
};
private static final UUID MOCK_JOB_ID = UUID.fromString("123e4567-e89b-12d3-a456-426614174000");
private static final UUID MOCK_RECURRING_JOB_ID = UUID.fromString("123e4567-e89b-12d3-a456-426614174001");
private static final String MOCK_EXPORT_SERVICE = "mockExportService";
private MockedStatic<JobMetadata> jobMetadataMock;

Expand Down Expand Up @@ -220,4 +222,43 @@ public void testGenericFileImporterPassingJobMetadata() throws Exception {
assertEquals(MOCK_JOB_ID.toString(), request.getHeader("X-DTP-Job-Id"));
assertTrue(executor.getErrors().isEmpty());
}


@Test
public void testGenericFileImporterPassingJobMetadataRecurringJob() throws Exception {
jobMetadataMock.when(JobMetadata::getRecurringJobId).thenReturn(Optional.of(MOCK_RECURRING_JOB_ID));

GenericFileImporter<IdOnlyContainerResource, String> importer =
new GenericFileImporter<>(
container ->
Arrays.asList(
new ImportableFileData<>(
new CachedDownloadableItem(container.getId(), container.getId()),
"video/mp4",
new GenericPayload<>(container.getId(), "schemasource"),
container.getId(),
container.getId())),
new AppCredentials("key", "secret"),
webServer.url("/id").url(),
dataStore,
monitor);
InMemoryIdempotentImportExecutor executor = new InMemoryIdempotentImportExecutor(monitor);
webServer.enqueue(new MockResponse().setResponseCode(201).setBody("OK"));

importer.importItem(
MOCK_JOB_ID,
executor,
new TokensAndUrlAuthData(
"accessToken", "refreshToken", webServer.url("/refresh").toString()),
new IdOnlyContainerResource("id"));

assertEquals(1, webServer.getRequestCount());

RecordedRequest request = webServer.takeRequest();
assertEquals("POST", request.getMethod());
assertEquals(this.MOCK_EXPORT_SERVICE, request.getHeader("X-DTP-Export-Service"));
assertEquals(MOCK_JOB_ID.toString(), request.getHeader("X-DTP-Job-Id"));
assertEquals(MOCK_RECURRING_JOB_ID.toString(), request.getHeader("X-DTP-Recurring-Job-Id"));
assertTrue(executor.getErrors().isEmpty());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Optional;
import java.util.UUID;
import okhttp3.mockwebserver.Dispatcher;
import okhttp3.mockwebserver.MockResponse;
Expand Down Expand Up @@ -41,6 +42,7 @@ public class GenericImporterTest {
@Parameter public String importerClass;
private MockWebServer webServer;
private static final UUID MOCK_JOB_ID = UUID.fromString("123e4567-e89b-12d3-a456-426614174000");
private static final UUID MOCK_RECURRING_JOB_ID = UUID.fromString("123e4567-e89b-12d3-a456-426614174001");
private static final String MOCK_EXPORT_SERVICE = "mockExportService";
private MockedStatic<JobMetadata> jobMetadataMock;

Expand Down Expand Up @@ -391,4 +393,37 @@ public void testGenericImporterPassingJobMetadata() throws Exception {
assertEquals(MOCK_JOB_ID.toString(), request.getHeader("X-DTP-Job-Id"));
assertTrue(executor.getErrors().isEmpty());
}

@Test
public void testGenericImporterPassingJobMetadataRecurringJob() throws Exception {
jobMetadataMock.when(JobMetadata::getRecurringJobId).thenReturn(Optional.of(MOCK_RECURRING_JOB_ID));

InMemoryIdempotentImportExecutor executor = new InMemoryIdempotentImportExecutor(monitor);
GenericImporter<IdOnlyContainerResource, String> importer =
getImporter(
importerClass,
container ->
List.of(
new ImportableData<>(
new GenericPayload<>(container.getId(), "schemasource"),
container.getId(),
container.getId())));
webServer.setDispatcher(getDispatcher());

importer.importItem(
MOCK_JOB_ID,
executor,
new TokensAndUrlAuthData(
"accessToken", "refreshToken", webServer.url("/refresh").toString()),
new IdOnlyContainerResource("id"));

assertEquals(1, webServer.getRequestCount());
RecordedRequest request = webServer.takeRequest();
assertEquals("POST", request.getMethod());
assertEquals(this.MOCK_EXPORT_SERVICE, request.getHeader("X-DTP-Export-Service"));
assertEquals(MOCK_JOB_ID.toString(), request.getHeader("X-DTP-Job-Id"));
assertEquals(MOCK_RECURRING_JOB_ID.toString(), request.getHeader("X-DTP-Recurring-Job-Id"));
assertTrue(executor.getErrors().isEmpty());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import java.time.Instant;
import java.util.Map;
import java.util.TimeZone;
import java.util.UUID;
import javax.annotation.Nullable;
import org.datatransferproject.types.common.models.DataVertical;

Expand Down Expand Up @@ -38,6 +39,7 @@ public abstract class PortabilityJob {
private static final String EXPORT_ENCRYPTED_INITIAL_AUTH_DATA =
"EXPORT_ENCRYPTED_INITIAL_AUTH_DATA";
private static final String JOB_STATE = "JOB_STATE";
private static final String RECURRING_JOB_ID = "RECURRING_JOB_ID";
private static final String TRANSFER_MODE = "TRANSFER_MODE";
private static final String FAILURE_REASON = "FAILURE_REASON";
private static final String NUMBER_OF_FAILED_FILES_KEY = "NUM_FAILED_FILES";
Expand Down Expand Up @@ -110,6 +112,11 @@ public static PortabilityJob fromMap(Map<String, Object> properties) {
? DataVertical.fromDataType((String) properties.get(DATA_TYPE_KEY))
: null;

UUID recurringJobId =
properties.containsKey(RECURRING_JOB_ID)
? UUID.fromString((String) properties.get(RECURRING_JOB_ID))
: null;

return PortabilityJob.builder()
.setState(state)
.setExportService((String) properties.get(EXPORT_SERVICE_KEY))
Expand All @@ -135,6 +142,7 @@ public static PortabilityJob fromMap(Map<String, Object> properties) {
.setUserLocale(userLocale)
.setUserAlias(userAlias)
.setTransferMode(transferMode)
.setRecurringJobId(recurringJobId)
.build();
}

Expand Down Expand Up @@ -197,6 +205,10 @@ private static void isSet(String... strings) {
@JsonProperty("transferMode")
public abstract TransferMode transferMode();

@Nullable
@JsonProperty("recurringJobId")
public abstract UUID recurringJobId();

public abstract PortabilityJob.Builder toBuilder();

public Map<String, Object> toMap() {
Expand Down Expand Up @@ -263,6 +275,10 @@ public Map<String, Object> toMap() {
builder.put(TRANSFER_MODE, transferMode().toString());
}

if (null != recurringJobId()) {
builder.put(RECURRING_JOB_ID, recurringJobId());
}

return builder.build();
}

Expand Down Expand Up @@ -361,6 +377,10 @@ public Builder setAndValidateJobAuthorization(JobAuthorization jobAuthorization)
@JsonProperty("transferMode")
public abstract Builder setTransferMode(TransferMode transferMode);

@JsonInclude(JsonInclude.Include.NON_NULL)
@JsonProperty("recurringJobId")
public abstract Builder setRecurringJobId(@Nullable UUID recurringJobId);

// For internal use only; clients should use setAndValidateJobAuthorization
protected abstract Builder setJobAuthorization(JobAuthorization jobAuthorization);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@
import com.google.common.base.Preconditions;
import com.google.common.base.Stopwatch;

import javax.annotation.Nullable;

import java.util.Optional;
import java.util.UUID;
import org.datatransferproject.types.common.models.DataVertical;

Expand All @@ -34,12 +37,14 @@
public final class JobMetadata {
private static byte[] encodedPrivateKey = null;
private static UUID jobId = null;
private static UUID recurringJobId = null;
private static DataVertical dataType = null;
private static String exportService = null;
private static String importService = null;
private static Stopwatch stopWatch = null;

public static boolean isInitialized() {
// recurringJobId can be null and that's ok
return (jobId != null
&& encodedPrivateKey != null
&& dataType != null
Expand All @@ -64,9 +69,28 @@ static void init(
stopWatch = initStopWatch;
}

static void init(
UUID initJobId,
@Nullable UUID initRecurringJobId,
byte[] initEncodedPrivateKey,
DataVertical initDataType,
String initExportService,
String initImportService,
Stopwatch initStopWatch) {
Preconditions.checkState(!isInitialized(), "JobMetadata cannot be initialized twice");
jobId = initJobId;
recurringJobId = initRecurringJobId;
encodedPrivateKey = initEncodedPrivateKey;
dataType = initDataType;
exportService = initExportService;
importService = initImportService;
stopWatch = initStopWatch;
}

// TODO: remove this
static synchronized void reset() {
jobId = null;
recurringJobId = null;
encodedPrivateKey = null;
dataType = null;
exportService = null;
Expand All @@ -84,6 +108,11 @@ public static UUID getJobId() {
return jobId;
}

public static Optional<UUID> getRecurringJobId() {
Preconditions.checkState(isInitialized(), "JobMetadata must be initialized");
return Optional.ofNullable(recurringJobId);
}

public static DataVertical getDataType() {
Preconditions.checkState(isInitialized(), "JobMetadata must be initialized");
return dataType;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,7 @@ private boolean tryToClaimJob(UUID jobId, WorkerKeyPair keyPair) {

JobMetadata.init(
Comment thread
alexeyqu marked this conversation as resolved.
jobId,
existingJob.recurringJobId(),
keyPair.getEncodedPrivateKey(),
existingJob.transferDataType(),
existingJob.exportService(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

package org.datatransferproject.transfer;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq;
Expand Down Expand Up @@ -52,6 +54,7 @@
public class JobProcessorTest {

private UUID jobId;
private UUID recurringJobId;
private ExportInformation exportInfo;
private AuthData exportAuthData;
private AuthData importAuthData;
Expand Down Expand Up @@ -96,6 +99,7 @@ public TestJobProcessor(JobStore jobStore,
public void setUp() throws JsonProcessingException {
importAuthData = exportAuthData = Mockito.mock(AuthData.class);
jobId = UUID.randomUUID();
recurringJobId = UUID.randomUUID();
exportInfo = Mockito.mock(ExportInformation.class);
copier = Mockito.mock(InMemoryDataCopier.class);
importSignalHandlerProvider = (Provider<SignalHandler>) Mockito.mock(Provider.class);
Expand Down Expand Up @@ -213,6 +217,9 @@ public void processJobCopiesSuccessfullyWithTransferSignalDisabled() throws Copy
"",
"",
Stopwatch.createUnstarted());

assertTrue(JobMetadata.getRecurringJobId().isEmpty());

Mockito.doThrow(new CopyException("error", new Exception())).when(copier)
.copy(importAuthData, exportAuthData, jobId, Optional.of(exportInfo));
processor.processJob();
Expand All @@ -226,4 +233,29 @@ public void processJobCopiesSuccessfullyWithTransferSignalDisabled() throws Copy
Mockito.verify(exportSignalHandler, Mockito.never())
.sendSignal(any(SignalRequest.class), eq(exportAuthData), any(Monitor.class));
}

@Test
public void testInitJobMetadataWithRecurringId() throws CopyException,
IOException, RetryException {
processor = Mockito.spy(
new TestJobProcessor(jobStore,
copier,
objectMapper,
decryptionService,
Boolean.FALSE,
importSignalHandlerProvider,
exportSignalHandlerProvider));

JobMetadata.init(
jobId,
recurringJobId,
"".getBytes(),
DataVertical.BLOBS,
"",
"",
Stopwatch.createUnstarted());

assertEquals(jobId, JobMetadata.getJobId());
assertEquals(recurringJobId, JobMetadata.getRecurringJobId().get());
}
}