Skip to content

Commit ad47b83

Browse files
committed
test projection
1 parent 1f9e4d5 commit ad47b83

7 files changed

Lines changed: 123 additions & 10 deletions

File tree

.gitignore

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,4 +3,5 @@
33
target
44
.bsp/
55
.DS_Store
6-
.bloop
6+
.bloop
7+
*.iml

thoth-core/src/main/java/fr/maif/eventsourcing/impl/InMemoryEventStore.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -118,9 +118,7 @@ public Source<EventEnvelope<E, Meta, Context>, NotUsed> loadEventsByQuery(Tuple0
118118
@Override
119119
public Source<EventEnvelope<E, Meta, Context>, NotUsed> loadEventsByQuery(Query query) {
120120
return Source.from(eventStore)
121-
.filter(e -> {
122-
return Option.of(query.entityId).map(id -> id.equals(e.entityId)).getOrElse(true);
123-
});
121+
.filter(e -> Option.of(query.entityId).map(id -> id.equals(e.entityId)).getOrElse(true));
124122
}
125123

126124
@Override

thoth-jooq/src/test/java/fr/maif/eventsourcing/impl/JooqKafkaTckImplementation.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@
1515
import java.util.concurrent.ExecutionException;
1616
import java.util.concurrent.Executors;
1717

18+
import fr.maif.eventsourcing.Projection;
19+
import fr.maif.eventsourcing.datastore.TestProjection;
1820
import org.apache.kafka.clients.consumer.ConsumerConfig;
1921
import org.apache.kafka.clients.consumer.ConsumerRecord;
2022
import org.apache.kafka.clients.consumer.ConsumerRecords;
@@ -58,6 +60,7 @@ public class JooqKafkaTckImplementation extends DataStoreVerification<Connection
5860
private TestEventFormat eventFormat;
5961
private PostgreSQLContainer postgres;
6062
private KafkaContainer kafka;
63+
private Projection testProjection;
6164

6265
private final String SCHEMA = "CREATE TABLE IF NOT EXISTS test_journal (\n" +
6366
" id UUID primary key,\n" +
@@ -90,6 +93,7 @@ public void tearDown() throws InterruptedException {
9093

9194
@BeforeClass(alwaysRun = true)
9295
public void initClass() {
96+
9397
this.tableNames = new TableNames("test_journal", "test_sequence_num");
9498
this.eventFormat = new TestEventFormat();
9599

@@ -102,6 +106,7 @@ public void initClass() {
102106

103107
@BeforeMethod(alwaysRun = true)
104108
public void init() throws SQLException {
109+
this.testProjection = new TestProjection();
105110
this.dataSource = new PGSimpleDataSource();
106111
dataSource.setUrl(postgres.getJdbcUrl());
107112
dataSource.setUser(postgres.getUsername());
@@ -128,7 +133,7 @@ public EventProcessor<String, TestState, TestCommand, TestEvent, Connection, Tup
128133
.withEventHandler(new TestEventHandler())
129134
.withDefaultAggregateStore()
130135
.withCommandHandler(new TestCommandHandler<>())
131-
.withNoProjections()
136+
.withProjections(this.testProjection)
132137
.build();
133138

134139

@@ -228,6 +233,11 @@ public List<EventEnvelope<TestEvent, Tuple0, Tuple0>> readPublishedEvents(String
228233
return envelopes;
229234
}
230235

236+
@Override
237+
public Integer readProjection() {
238+
return ((TestProjection)this.testProjection).getCount();
239+
}
240+
231241
private static Optional<Long> getEndOffsetIfNotReached(String topic, String kafkaServers, String groupId) {
232242
Properties properties = new Properties();
233243
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServers);

thoth-tck/src/main/java/fr/maif/eventsourcing/datastore/DataStoreVerification.java

Lines changed: 51 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
import io.vavr.control.Either;
1313
import io.vavr.control.Option;
1414

15+
import io.vavr.control.Try;
1516
import org.apache.kafka.clients.consumer.ConsumerConfig;
1617
import org.apache.kafka.clients.consumer.ConsumerRecord;
1718
import org.apache.kafka.clients.consumer.ConsumerRecords;
@@ -168,11 +169,7 @@ public void required_eventShouldBePublishedEventIfBrokerIsDownAtFirst() {
168169
submitValidCommand(eventProcessor, "1");
169170

170171
restartBroker();
171-
try {
172-
Thread.sleep(10000);
173-
} catch (InterruptedException e) {
174-
throw new RuntimeException(e);
175-
}
172+
sleep();
176173
List<EventEnvelope<TestEvent, Tuple0, Tuple0>> envelopes = deduplicateOnId(readPublishedEvents(kafkaBootstrapUrl(), topic));
177174

178175
cleanup(eventProcessor);
@@ -199,6 +196,55 @@ public void required_commandSubmissionShouldFailIfDatabaseIsNotAvailable() {
199196
}
200197
}
201198

199+
200+
@Override
201+
@Test
202+
public void required_eventShouldBeConsumedByProjectionWhenEverythingIsAlright(){
203+
String topic = randomKafkaTopic();
204+
EventProcessor<String, TestState, TestCommand, TestEvent, TxCtx, Tuple0, Tuple0, Tuple0> eventProcessor = eventProcessor(topic);
205+
submitValidCommand(eventProcessor, "1");
206+
sleep();
207+
208+
cleanup(eventProcessor);
209+
assertThat(readProjection()).isEqualTo(1);
210+
}
211+
@Override
212+
@Test
213+
public void required_eventShouldBeConsumedByProjectionEvenIfBrokerIsDownAtFirst(){
214+
String topic = randomKafkaTopic();
215+
EventProcessor<String, TestState, TestCommand, TestEvent, TxCtx, Tuple0, Tuple0, Tuple0> eventProcessor = eventProcessor(topic);
216+
shutdownBroker();
217+
submitValidCommand(eventProcessor, "1");
218+
sleep();
219+
restartBroker();
220+
sleep();
221+
cleanup(eventProcessor);
222+
assertThat(readProjection()).isEqualTo(1);
223+
}
224+
@Override
225+
@Test
226+
public void required_eventShouldNotBeConsumedByProjectionEvenIfDataBaseIsBroken(){
227+
String topic = randomKafkaTopic();
228+
EventProcessor<String, TestState, TestCommand, TestEvent, TxCtx, Tuple0, Tuple0, Tuple0> eventProcessor = eventProcessor(topic);
229+
shutdownDatabase();
230+
try {
231+
submitValidCommand(eventProcessor, "1");
232+
}catch (Throwable t){}
233+
sleep();
234+
cleanup(eventProcessor);
235+
assertThat(readProjection()).isEqualTo(0);
236+
restartDatabase();
237+
}
238+
239+
private void sleep() {
240+
try {
241+
Thread.sleep(10000);
242+
} catch (InterruptedException e) {
243+
throw new RuntimeException(e);
244+
}
245+
}
246+
247+
202248
@Override
203249
public Either<String, ProcessingSuccess<TestState, TestEvent, Tuple0, Tuple0, Tuple0>> submitValidCommand(
204250
EventProcessor<String, TestState, TestCommand, TestEvent, TxCtx, Tuple0, Tuple0, Tuple0> eventProcessor,

thoth-tck/src/main/java/fr/maif/eventsourcing/datastore/DataStoreVerificationRules.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ public interface DataStoreVerificationRules<Ste extends State, Evt extends Event
1919
Option<Ste> readState(EventProcessor<String, TestState, TestCommand, TestEvent, TxCtx, Tuple0, Tuple0, Tuple0> eventProcessor, String id);
2020
void submitDeleteCommand(EventProcessor<String, TestState, TestCommand, TestEvent, TxCtx, Tuple0, Tuple0, Tuple0> eventProcessor, String id);
2121
List<EventEnvelope<TestEvent, Tuple0, Tuple0>> readPublishedEvents(String kafkaBootstrapUrl, String topic);
22+
Integer readProjection();
2223
void shutdownBroker();
2324
void restartBroker();
2425
void shutdownDatabase();
@@ -38,6 +39,11 @@ public interface DataStoreVerificationRules<Ste extends State, Evt extends Event
3839
void required_eventShouldBePublishedEventIfBrokerIsDownAtFirst();
3940
void required_commandSubmissionShouldFailIfDatabaseIsNotAvailable();
4041

42+
void required_eventShouldBeConsumedByProjectionWhenEverythingIsAlright();
43+
void required_eventShouldBeConsumedByProjectionEvenIfBrokerIsDownAtFirst();
44+
void required_eventShouldNotBeConsumedByProjectionEvenIfDataBaseIsBroken();
45+
46+
4147
List<EventEnvelope<Evt, Meta, Context>> readFromDataStore(EventStore<TxCtx, TestEvent, Tuple0, Tuple0> eventStore);
4248

4349
default void cleanup(
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
package fr.maif.eventsourcing.datastore;
2+
3+
import fr.maif.eventsourcing.EventEnvelope;
4+
import fr.maif.eventsourcing.Projection;
5+
import io.vavr.Tuple;
6+
import io.vavr.Tuple0;
7+
import io.vavr.collection.List;
8+
import io.vavr.concurrent.Future;
9+
10+
import java.sql.Connection;
11+
12+
public class TestProjection implements Projection<Connection, TestEvent, Tuple0, Tuple0> {
13+
private int counter = 0;
14+
15+
@Override
16+
public Future<Tuple0> storeProjection(Connection connection, List<EventEnvelope<TestEvent, Tuple0, Tuple0>> envelopes) {
17+
return Future.of(() -> {
18+
envelopes.forEach(envelope -> {
19+
if (envelope.event instanceof TestEvent.SimpleEvent) {
20+
counter++;
21+
}
22+
});
23+
return Tuple.empty();
24+
});
25+
}
26+
27+
public int getCount() {
28+
return counter;
29+
}
30+
31+
32+
}

thoth-tck/src/test/java/fr/maif/eventsourcing/datastore/InMemoryDataStoreTest.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
import java.util.concurrent.ExecutionException;
55
import java.util.function.Function;
66

7+
import fr.maif.eventsourcing.Projection;
78
import org.mockito.Mockito;
89
import org.testng.annotations.BeforeMethod;
910

@@ -22,6 +23,7 @@ public class InMemoryDataStoreTest extends DataStoreVerification<Tuple0> {
2223
public InMemoryEventStore<TestEvent, Tuple0, Tuple0> eventStore;
2324
public EventProcessor<String, TestState, TestCommand, TestEvent, Tuple0, Tuple0, Tuple0, Tuple0> eventProcessor;
2425

26+
2527
@BeforeMethod(alwaysRun = true)
2628
public void init() {
2729
this.eventStore = Mockito.spy(InMemoryEventStore.create(actorSystem));
@@ -46,6 +48,24 @@ public List<EventEnvelope<TestEvent, Tuple0, Tuple0>> readPublishedEvents(String
4648
}
4749
}
4850

51+
@Override
52+
public Integer readProjection() {
53+
// Not implemented for in memory
54+
return null;
55+
}
56+
57+
@Override
58+
public void required_eventShouldBeConsumedByProjectionWhenEverythingIsAlright(){
59+
// Not implemented for in memory
60+
}
61+
@Override
62+
public void required_eventShouldBeConsumedByProjectionEvenIfBrokerIsDownAtFirst(){
63+
// Not implemented for in memory
64+
}
65+
@Override
66+
public void required_eventShouldNotBeConsumedByProjectionEvenIfDataBaseIsBroken(){
67+
// Not implemented for in memory
68+
}
4969
@Override
5070
public void required_commandSubmissionShouldFailIfDatabaseIsNotAvailable() {
5171
// Not implemented for in memory

0 commit comments

Comments
 (0)