Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@
@FixMethodOrder()
class PipelinesWithAcksIT {
private static final Logger LOG = LoggerFactory.getLogger(PipelinesWithAcksIT.class);
private static final int NUM_RECORDS = 100;
private static final int WAIT_TIME_MS = 60000;
private static final String IN_MEMORY_IDENTIFIER = "PipelinesWithAcksIT";
private static final String SIMPLE_PIPELINE_CONFIGURATION_UNDER_TEST = "acknowledgements/simple-test.yaml";
private static final String TWO_PIPELINES_CONFIGURATION_UNDER_TEST = "acknowledgements/two-pipelines-test.yaml";
Expand Down Expand Up @@ -71,7 +73,7 @@ void simple_pipeline_with_single_record() {
final int numRecords = 1;
inMemorySourceAccessor.submit(IN_MEMORY_IDENTIFIER, numRecords);

await().atMost(40000, TimeUnit.MILLISECONDS)
await().atMost(WAIT_TIME_MS, TimeUnit.MILLISECONDS)
.untilAsserted(() -> {
List<Record<Event>> outputRecords = inMemorySinkAccessor.get(IN_MEMORY_IDENTIFIER);
assertThat(outputRecords, not(empty()));
Expand All @@ -84,44 +86,41 @@ void simple_pipeline_with_single_record() {
@Test
void simple_pipeline_with_multiple_records() {
setUp(SIMPLE_PIPELINE_CONFIGURATION_UNDER_TEST);
final int numRecords = 100;
inMemorySourceAccessor.submit(IN_MEMORY_IDENTIFIER, numRecords);
inMemorySourceAccessor.submit(IN_MEMORY_IDENTIFIER, NUM_RECORDS);

await().atMost(40000, TimeUnit.MILLISECONDS)
await().atMost(WAIT_TIME_MS, TimeUnit.MILLISECONDS)
.untilAsserted(() -> {
List<Record<Event>> outputRecords = inMemorySinkAccessor.get(IN_MEMORY_IDENTIFIER);
assertThat(outputRecords, not(empty()));
assertThat(outputRecords.size(), equalTo(numRecords));
assertThat(outputRecords.size(), equalTo(NUM_RECORDS));
});
assertTrue(inMemorySourceAccessor.getAckReceived());
}

@Test
void two_pipelines_with_multiple_records() {
setUp(TWO_PIPELINES_CONFIGURATION_UNDER_TEST);
final int numRecords = 100;
inMemorySourceAccessor.submit(IN_MEMORY_IDENTIFIER, numRecords);
inMemorySourceAccessor.submit(IN_MEMORY_IDENTIFIER, NUM_RECORDS);

await().atMost(40000, TimeUnit.MILLISECONDS)
await().atMost(WAIT_TIME_MS, TimeUnit.MILLISECONDS)
.untilAsserted(() -> {
List<Record<Event>> outputRecords = inMemorySinkAccessor.get(IN_MEMORY_IDENTIFIER);
assertThat(outputRecords, not(empty()));
assertThat(outputRecords.size(), equalTo(numRecords));
assertThat(outputRecords.size(), equalTo(NUM_RECORDS));
});
assertTrue(inMemorySourceAccessor.getAckReceived());
}

@Test
void three_pipelines_with_multiple_records() {
setUp(THREE_PIPELINES_CONFIGURATION_UNDER_TEST);
final int numRecords = 100;
inMemorySourceAccessor.submit(IN_MEMORY_IDENTIFIER, numRecords);
inMemorySourceAccessor.submit(IN_MEMORY_IDENTIFIER, NUM_RECORDS);

await().atMost(40000, TimeUnit.MILLISECONDS)
await().atMost(WAIT_TIME_MS, TimeUnit.MILLISECONDS)
.untilAsserted(() -> {
List<Record<Event>> outputRecords = inMemorySinkAccessor.get(IN_MEMORY_IDENTIFIER);
assertThat(outputRecords, not(empty()));
assertThat(outputRecords.size(), equalTo(numRecords));
assertThat(outputRecords.size(), equalTo(NUM_RECORDS));
});
assertTrue(inMemorySourceAccessor.getAckReceived());
}
Expand All @@ -132,7 +131,7 @@ void three_pipelines_with_all_unrouted_records() {
final int numRecords = 2;
inMemorySourceAccessor.submitWithStatus(IN_MEMORY_IDENTIFIER, numRecords);

await().atMost(40000, TimeUnit.MILLISECONDS)
await().atMost(WAIT_TIME_MS, TimeUnit.MILLISECONDS)
.untilAsserted(() -> {
assertNotNull(inMemorySourceAccessor);
assertNotNull(inMemorySourceAccessor.getAckReceived());
Expand All @@ -145,106 +144,99 @@ void three_pipelines_with_all_unrouted_records() {
@Test
void three_pipelines_with_route_and_multiple_records() {
setUp(THREE_PIPELINES_WITH_ROUTE_CONFIGURATION_UNDER_TEST);
final int numRecords = 100;
inMemorySourceAccessor.submitWithStatus(IN_MEMORY_IDENTIFIER, numRecords);
inMemorySourceAccessor.submitWithStatus(IN_MEMORY_IDENTIFIER, NUM_RECORDS);

await().atMost(40000, TimeUnit.MILLISECONDS)
await().atMost(WAIT_TIME_MS, TimeUnit.MILLISECONDS)
.untilAsserted(() -> {
List<Record<Event>> outputRecords = inMemorySinkAccessor.get(IN_MEMORY_IDENTIFIER);
assertThat(outputRecords, not(empty()));
assertThat(outputRecords.size(), lessThanOrEqualTo(numRecords));
assertThat(outputRecords.size(), lessThanOrEqualTo(NUM_RECORDS));
});
assertThat(inMemorySourceAccessor.getAckReceived(), equalTo(true));
}

@Test
void three_pipelines_with_default_route_and_multiple_records() {
setUp(THREE_PIPELINES_WITH_DEFAULT_ROUTE_CONFIGURATION_UNDER_TEST);
final int numRecords = 10;

inMemorySourceAccessor.submitWithStatus(IN_MEMORY_IDENTIFIER, numRecords);
inMemorySourceAccessor.submitWithStatus(IN_MEMORY_IDENTIFIER, NUM_RECORDS);

await().atMost(40000, TimeUnit.MILLISECONDS)
await().atMost(WAIT_TIME_MS, TimeUnit.MILLISECONDS)
.untilAsserted(() -> {
List<Record<Event>> outputRecords = inMemorySinkAccessor.get(IN_MEMORY_IDENTIFIER);
assertThat(outputRecords, not(empty()));
assertThat(outputRecords.size(), equalTo(2*numRecords));
assertThat(outputRecords.size(), equalTo(2*NUM_RECORDS));
});
assertTrue(inMemorySourceAccessor.getAckReceived());
}

@Test
void two_parallel_pipelines_multiple_records() {
setUp(TWO_PARALLEL_PIPELINES_CONFIGURATION_UNDER_TEST);
final int numRecords = 100;
inMemorySourceAccessor.submit(IN_MEMORY_IDENTIFIER, numRecords);
inMemorySourceAccessor.submit(IN_MEMORY_IDENTIFIER, NUM_RECORDS);

await().atMost(40000, TimeUnit.MILLISECONDS)
await().atMost(WAIT_TIME_MS, TimeUnit.MILLISECONDS)
.untilAsserted(() -> {
List<Record<Event>> outputRecords = inMemorySinkAccessor.get(IN_MEMORY_IDENTIFIER);
assertThat(outputRecords, not(empty()));
assertThat(outputRecords.size(), equalTo(2*numRecords));
assertThat(outputRecords.size(), equalTo(2*NUM_RECORDS));
});
assertTrue(inMemorySourceAccessor.getAckReceived());
}

@Test
void three_pipelines_multi_sink_multiple_records() {
setUp(THREE_PIPELINES_MULTI_SINK_CONFIGURATION_UNDER_TEST);
final int numRecords = 100;
inMemorySourceAccessor.submit(IN_MEMORY_IDENTIFIER, numRecords);
inMemorySourceAccessor.submit(IN_MEMORY_IDENTIFIER, NUM_RECORDS);

await().atMost(40000, TimeUnit.MILLISECONDS)
await().atMost(WAIT_TIME_MS, TimeUnit.MILLISECONDS)
.untilAsserted(() -> {
List<Record<Event>> outputRecords = inMemorySinkAccessor.get(IN_MEMORY_IDENTIFIER);
assertThat(outputRecords, not(empty()));
assertThat(outputRecords.size(), equalTo(3*numRecords));
assertThat(outputRecords.size(), equalTo(3*NUM_RECORDS));
});
assertTrue(inMemorySourceAccessor.getAckReceived());
}

@Test
void one_pipeline_three_sinks_multiple_records() {
setUp(ONE_PIPELINE_THREE_SINKS_CONFIGURATION_UNDER_TEST);
final int numRecords = 100;
inMemorySourceAccessor.submit(IN_MEMORY_IDENTIFIER, numRecords);
inMemorySourceAccessor.submit(IN_MEMORY_IDENTIFIER, NUM_RECORDS);

await().atMost(40000, TimeUnit.MILLISECONDS)
await().atMost(WAIT_TIME_MS, TimeUnit.MILLISECONDS)
.untilAsserted(() -> {
List<Record<Event>> outputRecords = inMemorySinkAccessor.get(IN_MEMORY_IDENTIFIER);
assertThat(outputRecords, not(empty()));
assertThat(outputRecords.size(), equalTo(3*numRecords));
assertThat(outputRecords.size(), equalTo(3*NUM_RECORDS));
});
assertTrue(inMemorySourceAccessor.getAckReceived());
}

@Test
void one_pipeline_ack_expiry_multiple_records() {
setUp(ONE_PIPELINE_ACK_EXPIRY_CONFIGURATION_UNDER_TEST);
final int numRecords = 100;
inMemorySourceAccessor.submit(IN_MEMORY_IDENTIFIER, numRecords);
inMemorySourceAccessor.submit(IN_MEMORY_IDENTIFIER, NUM_RECORDS);

await().atMost(40000, TimeUnit.MILLISECONDS)
await().atMost(WAIT_TIME_MS, TimeUnit.MILLISECONDS)
.untilAsserted(() -> {
List<Record<Event>> outputRecords = inMemorySinkAccessor.get(IN_MEMORY_IDENTIFIER);
assertThat(outputRecords, not(empty()));
assertThat(outputRecords.size(), equalTo(numRecords));
assertThat(outputRecords.size(), equalTo(NUM_RECORDS));
});
assertThat(inMemorySourceAccessor.getAckReceived(), equalTo(null));
}

@Test
void one_pipeline_three_sinks_negative_ack_multiple_records() {
setUp(ONE_PIPELINE_THREE_SINKS_CONFIGURATION_UNDER_TEST);
final int numRecords = 100;
inMemorySourceAccessor.submit(IN_MEMORY_IDENTIFIER, numRecords);
inMemorySourceAccessor.submit(IN_MEMORY_IDENTIFIER, NUM_RECORDS);
inMemorySinkAccessor.setResult(false);

await().atMost(40000, TimeUnit.MILLISECONDS)
await().atMost(WAIT_TIME_MS, TimeUnit.MILLISECONDS)
.untilAsserted(() -> {
List<Record<Event>> outputRecords = inMemorySinkAccessor.get(IN_MEMORY_IDENTIFIER);
assertThat(outputRecords, not(empty()));
assertThat(outputRecords.size(), equalTo(3*numRecords));
assertThat(outputRecords.size(), equalTo(3*NUM_RECORDS));
});
assertFalse(inMemorySourceAccessor.getAckReceived());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,42 +126,42 @@ void test_default_route() {

inMemorySourceAccessor.submit(TESTING_KEY, allEvents);

await().atMost(2, TimeUnit.SECONDS)
await().atMost(30, TimeUnit.SECONDS)
.untilAsserted(() -> {
assertThat(inMemorySinkAccessor.get(ALPHA_SOURCE_KEY), not(empty()));
assertThat(inMemorySinkAccessor.get(BETA_SOURCE_KEY), not(empty()));
assertThat(inMemorySinkAccessor.get(ALL_SOURCE_KEY), not(empty()));
assertThat(inMemorySinkAccessor.get(ALPHA_DEFAULT_SOURCE_KEY), not(empty()));
assertThat(inMemorySinkAccessor.get(ALPHA_BETA_GAMMA_SOURCE_KEY), not(empty()));
assertThat(inMemorySinkAccessor.get(DEFAULT_SOURCE_KEY), not(empty()));
});

final List<Record<Event>> actualAlphaRecords = inMemorySinkAccessor.get(ALPHA_SOURCE_KEY);
final List<Record<Event>> actualAlphaRecords = inMemorySinkAccessor.get(ALPHA_SOURCE_KEY);

assertThat(actualAlphaRecords.size(), equalTo(alphaEvents.size()));
assertThat(actualAlphaRecords.size(), equalTo(alphaEvents.size()));

assertThat(actualAlphaRecords, containsInAnyOrder(allEvents.stream()
.filter(alphaEvents::contains).toArray()));
assertThat(actualAlphaRecords, containsInAnyOrder(allEvents.stream()
.filter(alphaEvents::contains).toArray()));

final List<Record<Event>> actualBetaRecords = inMemorySinkAccessor.get(BETA_SOURCE_KEY);
final List<Record<Event>> actualBetaRecords = inMemorySinkAccessor.get(BETA_SOURCE_KEY);

assertThat(actualBetaRecords.size(), equalTo(betaEvents.size()));
assertThat(actualBetaRecords.size(), equalTo(betaEvents.size()));

assertThat(actualBetaRecords, containsInAnyOrder(allEvents.stream()
.filter(betaEvents::contains).toArray()));
assertThat(actualBetaRecords, containsInAnyOrder(allEvents.stream()
.filter(betaEvents::contains).toArray()));

final List<Record<Event>> actualDefaultRecords = inMemorySinkAccessor.get(DEFAULT_SOURCE_KEY);
final List<Record<Event>> actualDefaultRecords = inMemorySinkAccessor.get(DEFAULT_SOURCE_KEY);

assertThat(actualDefaultRecords.size(), equalTo(defaultEvents.size()));
assertThat(actualDefaultRecords, containsInAnyOrder(allEvents.stream()
.filter(defaultEvents::contains).toArray()));
assertThat(actualDefaultRecords.size(), equalTo(defaultEvents.size()));
assertThat(actualDefaultRecords, containsInAnyOrder(allEvents.stream()
.filter(defaultEvents::contains).toArray()));

final List<Record<Event>> actualAlphaDefaultRecords = new ArrayList<>();
actualAlphaDefaultRecords.addAll(actualAlphaRecords);
actualAlphaDefaultRecords.addAll(actualDefaultRecords);
assertThat(actualAlphaDefaultRecords.size(), equalTo(defaultEvents.size()+alphaEvents.size()));
assertThat(actualAlphaDefaultRecords, containsInAnyOrder(allEvents.stream()
.filter(event -> defaultEvents.contains(event) || alphaEvents.contains(event)).toArray()));
final List<Record<Event>> actualAlphaDefaultRecords = new ArrayList<>();
actualAlphaDefaultRecords.addAll(actualAlphaRecords);
actualAlphaDefaultRecords.addAll(actualDefaultRecords);
assertThat(actualAlphaDefaultRecords.size(), equalTo(defaultEvents.size()+alphaEvents.size()));
assertThat(actualAlphaDefaultRecords, containsInAnyOrder(allEvents.stream()
.filter(event -> defaultEvents.contains(event) || alphaEvents.contains(event)).toArray()));
});

}

Expand Down
Loading