Skip to content

Commit f6c6ed9

Browse files
committed
tck async jooq
1 parent c65f1a1 commit f6c6ed9

13 files changed

Lines changed: 502 additions & 78 deletions

File tree

build.sbt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,7 @@ lazy val `thoth-kafka-goodies` = project
9292
)
9393

9494
lazy val `thoth-jooq-async` = project
95-
.dependsOn(`thoth-core`)
95+
.dependsOn(`thoth-core`, `thoth-tck`)
9696
.settings(
9797
sonatypeRepository := "https://s01.oss.sonatype.org/service/local",
9898
sonatypeCredentialHost := "s01.oss.sonatype.org",

thoth-jooq-async/build.sbt

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,16 @@ libraryDependencies ++= Seq(
1717
"org.junit.platform" % "junit-platform-commons" % "1.4.2" % Test,
1818
"org.junit.jupiter" % "junit-jupiter-engine" % "5.4.2" % Test,
1919
"org.junit.vintage" % "junit-vintage-engine" % "5.4.2" % Test,
20-
"net.aichler" % "jupiter-interface" % "0.9.1" % Test
20+
"net.aichler" % "jupiter-interface" % "0.9.1" % Test,
21+
"org.mockito" % "mockito-core" % "2.22.0" % Test,
22+
"org.testng" % "testng" % "6.3" % Test,
23+
"org.testcontainers" % "postgresql" % "1.15.0" % Test,
24+
"org.testcontainers" % "kafka" % "1.15.0" % Test
2125
)
2226

27+
28+
testNGSuites := Seq(((resourceDirectory in Test).value / "testng.xml").absolutePath)
29+
2330
javacOptions in Compile ++= Seq(
2431
"-source",
2532
"8",
Lines changed: 347 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,347 @@
1+
package fr.maif.eventsourcing;
2+
3+
import akka.actor.ActorSystem;
4+
import akka.kafka.ProducerSettings;
5+
import com.fasterxml.jackson.databind.ObjectMapper;
6+
import com.fasterxml.jackson.databind.node.ObjectNode;
7+
import fr.maif.eventsourcing.datastore.DataStoreVerification;
8+
import fr.maif.eventsourcing.datastore.TestCommand;
9+
import fr.maif.eventsourcing.datastore.TestCommandHandler;
10+
import fr.maif.eventsourcing.datastore.TestConsistentProjection;
11+
import fr.maif.eventsourcing.datastore.TestEvent;
12+
import fr.maif.eventsourcing.datastore.TestEventFormat;
13+
import fr.maif.eventsourcing.datastore.TestEventHandler;
14+
import fr.maif.eventsourcing.datastore.TestInstransactionProjection;
15+
import fr.maif.eventsourcing.datastore.TestInstransactionProjectionAsync;
16+
import fr.maif.eventsourcing.datastore.TestState;
17+
import fr.maif.eventsourcing.format.JacksonEventFormat;
18+
import fr.maif.eventsourcing.format.JacksonSimpleFormat;
19+
import fr.maif.jooq.PgAsyncPool;
20+
import fr.maif.jooq.reactive.ReactivePgAsyncPool;
21+
import fr.maif.json.EventEnvelopeJson;
22+
import fr.maif.kafka.JsonSerializer;
23+
import fr.maif.kafka.KafkaSettings;
24+
import io.vavr.Tuple0;
25+
import io.vertx.core.Vertx;
26+
import io.vertx.pgclient.PgConnectOptions;
27+
import io.vertx.pgclient.PgPool;
28+
import io.vertx.sqlclient.PoolOptions;
29+
import org.apache.kafka.clients.consumer.ConsumerConfig;
30+
import org.apache.kafka.clients.consumer.ConsumerRecord;
31+
import org.apache.kafka.clients.consumer.ConsumerRecords;
32+
import org.apache.kafka.clients.consumer.KafkaConsumer;
33+
import org.apache.kafka.common.PartitionInfo;
34+
import org.apache.kafka.common.TopicPartition;
35+
import org.apache.kafka.common.serialization.StringDeserializer;
36+
import org.jooq.SQLDialect;
37+
import org.jooq.impl.DefaultConfiguration;
38+
import org.postgresql.ds.PGSimpleDataSource;
39+
import org.testcontainers.containers.GenericContainer;
40+
import org.testcontainers.containers.KafkaContainer;
41+
import org.testcontainers.containers.PostgreSQLContainer;
42+
import org.testcontainers.utility.DockerImageName;
43+
import org.testng.annotations.AfterClass;
44+
import org.testng.annotations.BeforeClass;
45+
import org.testng.annotations.BeforeMethod;
46+
47+
import java.io.IOException;
48+
import java.io.UncheckedIOException;
49+
import java.sql.Connection;
50+
import java.sql.ResultSet;
51+
import java.sql.SQLException;
52+
import java.time.Duration;
53+
import java.util.ArrayList;
54+
import java.util.Collections;
55+
import java.util.List;
56+
import java.util.Optional;
57+
import java.util.Properties;
58+
import java.util.UUID;
59+
import java.util.concurrent.CompletableFuture;
60+
import java.util.concurrent.ExecutionException;
61+
62+
public class JooqAsyncKafkaTCKImplementation extends DataStoreVerification<Connection> {
63+
64+
65+
public static final String DEFAULT_POSTGRE_TAG = "9.6.12";
66+
private static final DockerImageName DEFAULT_KAFKA_IMAGE_NAME = DockerImageName.parse("confluentinc/cp-kafka");
67+
private static final String DEFAULT_KAFKA_TAG = "5.4.3";
68+
private static final DockerImageName DEFAULT_POSTGRE_IMAGE_NAME = DockerImageName.parse("postgres");
69+
private final String SCHEMA = "CREATE TABLE IF NOT EXISTS test_journal (\n" +
70+
" id UUID primary key,\n" +
71+
" entity_id varchar(100) not null,\n" +
72+
" sequence_num bigint not null,\n" +
73+
" event_type varchar(100) not null,\n" +
74+
" version int not null,\n" +
75+
" transaction_id varchar(100) not null,\n" +
76+
" event jsonb not null,\n" +
77+
" metadata jsonb,\n" +
78+
" context jsonb,\n" +
79+
" total_message_in_transaction int default 1,\n" +
80+
" num_message_in_transaction int default 1,\n" +
81+
" emission_date timestamp not null default now(),\n" +
82+
" user_id varchar(100),\n" +
83+
" system_id varchar(100),\n" +
84+
" published boolean default false,\n" +
85+
" UNIQUE (entity_id, sequence_num)\n" +
86+
" );\n" +
87+
"CREATE TABLE IF NOT EXISTS test_projection (\n" +
88+
" counter int not null\n" +
89+
" );\n" +
90+
" CREATE SEQUENCE if not exists test_sequence_num;";
91+
private final String INIT_TABLE_QUERY = "TRUNCATE TABLE test_journal;\n" +
92+
" TRUNCATE TABLE test_projection;\n" +
93+
" INSERT INTO test_projection VALUES(0);\n" ;
94+
95+
private PGSimpleDataSource dataSource;
96+
private TableNames tableNames;
97+
private TestEventFormat eventFormat;
98+
private PostgreSQLContainer postgres;
99+
private KafkaContainer kafka;
100+
private Projection testProjection;
101+
private PgAsyncPool pgAsyncPool;
102+
private Vertx vertx;
103+
private PgPool pgPool;
104+
105+
private static Optional<Long> getEndOffsetIfNotReached(String topic, String kafkaServers, String groupId) {
106+
Properties properties = new Properties();
107+
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServers);
108+
properties.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
109+
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
110+
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
111+
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
112+
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
113+
KafkaConsumer<?, ?> consumer = new KafkaConsumer<>(properties);
114+
PartitionInfo partitionInfo = consumer.partitionsFor("foo").get(0);
115+
TopicPartition topicPartition = new TopicPartition(topic, partitionInfo.partition());
116+
consumer.assign(Collections.singletonList(topicPartition));
117+
118+
long position = consumer.position(topicPartition);
119+
consumer.seekToEnd(Collections.singletonList(topicPartition));
120+
final long endOffset = consumer.position(topicPartition);
121+
122+
Optional<Long> result = Optional.empty();
123+
if (endOffset > 0 && endOffset > position) {
124+
result = Optional.of(consumer.position(topicPartition) - 1);
125+
}
126+
127+
consumer.close();
128+
return result;
129+
}
130+
131+
@AfterClass(alwaysRun = true)
132+
public void tearDown() throws InterruptedException {
133+
Thread.sleep(10000);
134+
postgres.stop();
135+
kafka.stop();
136+
this.vertx.close();
137+
this.pgPool.close();
138+
}
139+
140+
@BeforeClass(alwaysRun = true)
141+
public void initClass() {
142+
143+
this.tableNames = new TableNames("test_journal", "test_sequence_num");
144+
this.eventFormat = new TestEventFormat();
145+
146+
postgres = new PostgreSQLContainer(DEFAULT_POSTGRE_IMAGE_NAME.withTag(DEFAULT_POSTGRE_TAG));
147+
postgres.start();
148+
149+
kafka = new KafkaContainer(DEFAULT_KAFKA_IMAGE_NAME.withTag(DEFAULT_KAFKA_TAG));
150+
kafka.start();
151+
consistentProjection = new TestConsistentProjection(actorSystem, kafka.getBootstrapServers(), eventFormat, dataSource);
152+
this.pgAsyncPool = pgAsyncPool(postgres);
153+
}
154+
155+
@BeforeMethod(alwaysRun = true)
156+
public void init() throws SQLException {
157+
this.testProjection = new TestInstransactionProjectionAsync();
158+
this.dataSource = new PGSimpleDataSource();
159+
dataSource.setUrl(postgres.getJdbcUrl());
160+
dataSource.setUser(postgres.getUsername());
161+
dataSource.setPassword(postgres.getPassword());
162+
// Override default setting, which wait indefinitely if database is down
163+
dataSource.setLoginTimeout(5);
164+
165+
dataSource.getConnection().prepareStatement(SCHEMA).execute();
166+
dataSource.getConnection().prepareStatement(INIT_TABLE_QUERY).execute();
167+
168+
169+
170+
171+
}
172+
173+
private PgAsyncPool pgAsyncPool(PostgreSQLContainer server) {
174+
this.vertx = Vertx.vertx();
175+
DefaultConfiguration jooqConfig = new DefaultConfiguration();
176+
jooqConfig.setSQLDialect(SQLDialect.POSTGRES);
177+
178+
final PgConnectOptions options = new PgConnectOptions()
179+
.setPort(server.getMappedPort(PostgreSQLContainer.POSTGRESQL_PORT))
180+
.setHost(server.getContainerIpAddress())
181+
.setDatabase(server.getDatabaseName())
182+
.setUser(server.getUsername())
183+
.setPassword(server.getPassword());
184+
PoolOptions poolOptions = new PoolOptions().setMaxSize(50);
185+
this.pgPool = PgPool.pool(vertx, options, poolOptions);
186+
187+
return new ReactivePgAsyncPool(pgPool, jooqConfig);
188+
}
189+
190+
@Override
191+
public EventProcessor<String, TestState, TestCommand, TestEvent, Connection, Tuple0, Tuple0, Tuple0> eventProcessor(String topic) {
192+
193+
194+
return ReactivePostgresKafkaEventProcessor
195+
.withSystem(ActorSystem.create())
196+
.withPgAsyncPool(this.pgAsyncPool)
197+
.withTables(tableNames)
198+
.withTransactionManager()
199+
.withEventFormater(eventFormat)
200+
.withNoMetaFormater()
201+
.withNoContextFormater()
202+
.withKafkaSettings(topic, producerSettings(settings(), new TestEventFormat()))
203+
.withEventHandler(new TestEventHandler())
204+
.withDefaultAggregateStore()
205+
.withCommandHandler(new TestCommandHandler<>())
206+
.withProjections(this.testProjection)
207+
.build();
208+
}
209+
210+
@Override
211+
public String kafkaBootstrapUrl() {
212+
return kafka.getBootstrapServers();
213+
}
214+
215+
@Override
216+
public void shutdownBroker() {
217+
pauseContainer(kafka);
218+
}
219+
220+
@Override
221+
public void restartBroker() {
222+
unPauseContainer(kafka);
223+
}
224+
225+
@Override
226+
public void shutdownDatabase() {
227+
pauseContainer(postgres);
228+
}
229+
230+
@Override
231+
public void restartDatabase() {
232+
unPauseContainer(postgres);
233+
}
234+
235+
private void pauseContainer(GenericContainer container) {
236+
container.getDockerClient().pauseContainerCmd(container.getContainerId()).exec();
237+
}
238+
239+
private void unPauseContainer(GenericContainer container) {
240+
container.getDockerClient().unpauseContainerCmd(container.getContainerId()).exec();
241+
}
242+
243+
private KafkaSettings settings() {
244+
return KafkaSettings.newBuilder(kafka.getBootstrapServers()).build();
245+
}
246+
247+
private ProducerSettings<String, EventEnvelope<TestEvent, Tuple0, Tuple0>> producerSettings(
248+
KafkaSettings kafkaSettings,
249+
JacksonEventFormat<String, TestEvent> eventFormat) {
250+
return kafkaSettings.producerSettings(actorSystem, JsonSerializer.of(
251+
eventFormat,
252+
JacksonSimpleFormat.empty(),
253+
JacksonSimpleFormat.empty()
254+
)
255+
);
256+
}
257+
258+
@Override
259+
public List<EventEnvelope<TestEvent, Tuple0, Tuple0>> readPublishedEvents(String kafkaBootstrapUrl, String topic) {
260+
try {
261+
Thread.sleep(1000);
262+
} catch (InterruptedException e) {
263+
throw new RuntimeException(e);
264+
}
265+
String groupId = "reader-" + UUID.randomUUID();
266+
Optional<Long> maybeLastOffset = getEndOffsetIfNotReached(topic, kafkaBootstrapUrl, groupId);
267+
if (!maybeLastOffset.isPresent()) {
268+
return Collections.emptyList();
269+
}
270+
long lastOffset = maybeLastOffset.get();
271+
272+
Properties props = new Properties();
273+
props.put("bootstrap.servers", kafkaBootstrapUrl);
274+
props.put("group.id", groupId);
275+
props.put("key.deserializer",
276+
"org.apache.kafka.common.serialization.StringDeserializer");
277+
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
278+
props.put("value.deserializer",
279+
"org.apache.kafka.common.serialization.StringDeserializer");
280+
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
281+
282+
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
283+
284+
consumer.subscribe(Collections.singletonList(topic));
285+
286+
boolean running = true;
287+
List<EventEnvelope<TestEvent, Tuple0, Tuple0>> envelopes = new ArrayList<>();
288+
while (running) {
289+
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
290+
for (ConsumerRecord<String, String> record : records) {
291+
final long offset = record.offset();
292+
if (offset >= lastOffset) {
293+
running = false;
294+
}
295+
envelopes.add(parsEnvelope(record.value()));
296+
}
297+
consumer.commitSync();
298+
}
299+
consumer.close();
300+
return envelopes;
301+
}
302+
303+
@Override
304+
public Integer readProjection() {
305+
try(final ResultSet resultSet = this.dataSource.getConnection()
306+
.prepareStatement("SELECT counter::numeric FROM test_projection LIMIT 1").executeQuery()) {
307+
resultSet.next();
308+
return resultSet.getInt(1);
309+
} catch (SQLException throwables) {
310+
throwables.printStackTrace();
311+
}
312+
return null;
313+
}
314+
315+
@Override
316+
public Integer readConsistentProjection() {
317+
return consistentProjection.getCount();
318+
}
319+
320+
public EventEnvelope<TestEvent, Tuple0, Tuple0> parsEnvelope(String value) {
321+
try {
322+
ObjectMapper mapper = new ObjectMapper();
323+
ObjectNode node = (ObjectNode) mapper.readTree(value);
324+
CompletableFuture<EventEnvelope<TestEvent, Tuple0, Tuple0>> future = new CompletableFuture<>();
325+
EventEnvelopeJson.deserialize(
326+
node,
327+
eventFormat,
328+
JacksonSimpleFormat.empty(),
329+
JacksonSimpleFormat.empty(),
330+
(event, err) -> {
331+
future.completeExceptionally(new RuntimeException(err.toString()));
332+
},
333+
future::complete
334+
);
335+
return future.get();
336+
} catch (IOException e) {
337+
throw new UncheckedIOException(e);
338+
} catch (InterruptedException e) {
339+
throw new RuntimeException(e);
340+
} catch (ExecutionException e) {
341+
throw new RuntimeException(e);
342+
}
343+
}
344+
}
345+
346+
347+

0 commit comments

Comments
 (0)