Skip to content

Commit 44f89aa

Browse files
committed
tck consitent projection
1 parent ce4d24e commit 44f89aa

5 files changed

Lines changed: 107 additions & 6 deletions

File tree

thoth-jooq/src/test/java/fr/maif/eventsourcing/impl/JooqKafkaTckImplementation.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
import java.util.concurrent.Executors;
1717

1818
import fr.maif.eventsourcing.Projection;
19+
import fr.maif.eventsourcing.datastore.TestConsistentProjection;
1920
import fr.maif.eventsourcing.datastore.TestProjection;
2021
import org.apache.kafka.clients.consumer.ConsumerConfig;
2122
import org.apache.kafka.clients.consumer.ConsumerRecord;
@@ -101,6 +102,7 @@ public void initClass() {
101102
postgres.start();
102103
kafka = new KafkaContainer();
103104
kafka.start();
105+
consistentProjection = new TestConsistentProjection(actorSystem,kafka.getBootstrapServers(),eventFormat,dataSource);
104106
}
105107

106108

@@ -238,6 +240,11 @@ public Integer readProjection() {
238240
return ((TestProjection)this.testProjection).getCount();
239241
}
240242

243+
@Override
244+
public Integer readConsistentProjection() {
245+
return consistentProjection.getCount();
246+
}
247+
241248
private static Optional<Long> getEndOffsetIfNotReached(String topic, String kafkaServers, String groupId) {
242249
Properties properties = new Properties();
243250
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServers);

thoth-tck/src/main/java/fr/maif/eventsourcing/datastore/DataStoreVerification.java

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ public abstract class DataStoreVerification<TxCtx> implements DataStoreVerificat
4545
public abstract EventProcessor<String, TestState, TestCommand, TestEvent, TxCtx, Tuple0, Tuple0, Tuple0> eventProcessor(String topic);
4646
public abstract String kafkaBootstrapUrl();
4747

48+
protected TestConsistentProjection consistentProjection;
4849
@Override
4950
@Test
5051
public void required_submitValidSingleEventCommandMustWriteEventInDataStore() {
@@ -236,6 +237,33 @@ public void required_eventShouldNotBeConsumedByProjectionEvenIfDataBaseIsBroken
236237
restartDatabase();
237238
}
238239

240+
@Override
241+
public void required_eventShouldBeConsumedByConsistentProjectionWhenEverythingIsAlright() {
242+
243+
String topic = randomKafkaTopic();
244+
consistentProjection.init(topic);
245+
EventProcessor<String, TestState, TestCommand, TestEvent, TxCtx, Tuple0, Tuple0, Tuple0> eventProcessor = eventProcessor(topic);
246+
submitValidCommand(eventProcessor, "1");
247+
sleep();
248+
249+
cleanup(eventProcessor);
250+
assertThat(readConsistentProjection()).isEqualTo(1);
251+
}
252+
253+
@Override
254+
public void required_eventShouldBeConsumedByConsistentProjectionEvenIfBrokerIsDownAtFirst() {
255+
String topic = randomKafkaTopic();
256+
consistentProjection.init(topic);
257+
EventProcessor<String, TestState, TestCommand, TestEvent, TxCtx, Tuple0, Tuple0, Tuple0> eventProcessor = eventProcessor(topic);
258+
shutdownBroker();
259+
submitValidCommand(eventProcessor, "1");
260+
sleep();
261+
restartBroker();
262+
sleep();
263+
cleanup(eventProcessor);
264+
assertThat(readConsistentProjection()).isEqualTo(1);
265+
}
266+
239267
private void sleep() {
240268
try {
241269
Thread.sleep(10000);

thoth-tck/src/main/java/fr/maif/eventsourcing/datastore/DataStoreVerificationRules.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ public interface DataStoreVerificationRules<Ste extends State, Evt extends Event
2020
void submitDeleteCommand(EventProcessor<String, TestState, TestCommand, TestEvent, TxCtx, Tuple0, Tuple0, Tuple0> eventProcessor, String id);
2121
List<EventEnvelope<TestEvent, Tuple0, Tuple0>> readPublishedEvents(String kafkaBootstrapUrl, String topic);
2222
Integer readProjection();
23+
Integer readConsistentProjection();
2324
void shutdownBroker();
2425
void restartBroker();
2526
void shutdownDatabase();
@@ -43,7 +44,8 @@ public interface DataStoreVerificationRules<Ste extends State, Evt extends Event
4344
void required_eventShouldBeConsumedByProjectionEvenIfBrokerIsDownAtFirst();
4445
void required_eventShouldNotBeConsumedByProjectionEvenIfDataBaseIsBroken();
4546

46-
47+
void required_eventShouldBeConsumedByConsistentProjectionWhenEverythingIsAlright();
48+
void required_eventShouldBeConsumedByConsistentProjectionEvenIfBrokerIsDownAtFirst();
4749
List<EventEnvelope<Evt, Meta, Context>> readFromDataStore(EventStore<TxCtx, TestEvent, Tuple0, Tuple0> eventStore);
4850

4951
default void cleanup(
Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
package fr.maif.eventsourcing.datastore;
2+
3+
import akka.actor.ActorSystem;
4+
import fr.maif.projections.EventuallyConsistentProjection;
5+
import io.vavr.Tuple;
6+
import io.vavr.concurrent.Future;
7+
8+
import javax.sql.DataSource;
9+
import java.sql.PreparedStatement;
10+
import java.sql.SQLException;
11+
12+
public class TestConsistentProjection {
13+
14+
private int counter = 0;
15+
private final ActorSystem actorSystem;
16+
private final String bootstrapServer;
17+
private final TestEventFormat eventFormat;
18+
private final DataSource dataSource;
19+
20+
public TestConsistentProjection(
21+
ActorSystem actorSystem,
22+
String bootstrapServer,
23+
TestEventFormat eventFormat,
24+
DataSource dataSource) {
25+
this.actorSystem = actorSystem;
26+
this.eventFormat = eventFormat;
27+
this.dataSource = dataSource;
28+
this.bootstrapServer =bootstrapServer;
29+
}
30+
31+
32+
public void init(String topic) {
33+
this.counter = 0;
34+
EventuallyConsistentProjection.create(
35+
ActorSystem.create(),
36+
"TestConsistentProjection",
37+
EventuallyConsistentProjection.Config.create(topic, "TestConsistentProjection", bootstrapServer),
38+
eventFormat,
39+
envelope ->
40+
Future.of(() -> {
41+
if (envelope.event instanceof TestEvent.SimpleEvent){
42+
counter++;
43+
}
44+
return Tuple.empty();
45+
})
46+
47+
).start();
48+
}
49+
50+
public int getCount() {
51+
return counter;
52+
}
53+
}

thoth-tck/src/test/java/fr/maif/eventsourcing/datastore/InMemoryDataStoreTest.java

Lines changed: 16 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -4,15 +4,12 @@
44
import java.util.concurrent.ExecutionException;
55
import java.util.function.Function;
66

7-
import fr.maif.eventsourcing.Projection;
87
import org.mockito.Mockito;
98
import org.testng.annotations.BeforeMethod;
109

11-
import akka.actor.ActorSystem;
1210
import akka.stream.javadsl.Sink;
1311
import fr.maif.eventsourcing.EventEnvelope;
1412
import fr.maif.eventsourcing.EventProcessor;
15-
import fr.maif.eventsourcing.EventStore;
1613
import fr.maif.eventsourcing.TransactionManager;
1714
import fr.maif.eventsourcing.impl.InMemoryEventStore;
1815
import io.vavr.Tuple;
@@ -54,6 +51,12 @@ public Integer readProjection() {
5451
return null;
5552
}
5653

54+
@Override
55+
public Integer readConsistentProjection() {
56+
// Not implemented for in memory
57+
return null;
58+
}
59+
5760
@Override
5861
public void required_eventShouldBeConsumedByProjectionWhenEverythingIsAlright(){
5962
// Not implemented for in memory
@@ -66,17 +69,25 @@ public void required_eventShouldBeConsumedByProjectionEvenIfBrokerIsDownAtFirst(
6669
public void required_eventShouldNotBeConsumedByProjectionEvenIfDataBaseIsBroken(){
6770
// Not implemented for in memory
6871
}
72+
6973
@Override
7074
public void required_commandSubmissionShouldFailIfDatabaseIsNotAvailable() {
7175
// Not implemented for in memory
7276
}
73-
74-
7577
@Override
7678
public void required_eventShouldBePublishedEventIfBrokerIsDownAtFirst() {
7779
// Not implemented for in memory
7880
}
7981

82+
@Override
83+
public void required_eventShouldBeConsumedByConsistentProjectionWhenEverythingIsAlright() {
84+
// Not implemented for in memory
85+
}
86+
@Override
87+
public void required_eventShouldBeConsumedByConsistentProjectionEvenIfBrokerIsDownAtFirst() {
88+
// Not implemented for in memory
89+
}
90+
8091
@Override
8192
public void shutdownBroker() {
8293
throw new RuntimeException("Not implemented for in memory");

0 commit comments

Comments
 (0)