Skip to content

Commit 52e42a3

Browse files
committed
Read all connection test
1 parent 5516bcf commit 52e42a3

2 files changed

Lines changed: 108 additions & 3 deletions

File tree

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
package io.kurrent.dbclient;
2+
3+
import org.reactivestreams.Subscriber;
4+
import org.reactivestreams.Publisher;
5+
import org.reactivestreams.Subscription;
6+
7+
import java.util.concurrent.CountDownLatch;
8+
import java.util.concurrent.ExecutionException;
9+
10+
public class Main {
11+
public static void main(String[] args) throws InterruptedException, ExecutionException {
12+
System.setProperty("org.slf4j.simpleLogger.log.io.kurrent.dbclient", "trace");
13+
System.setProperty("org.slf4j.simpleLogger.log.io.netty", "trace");
14+
System.setProperty("org.slf4j.simpleLogger.defaultLogLevel", "info");
15+
System.setProperty("org.slf4j.simpleLogger.showDateTime", "true");
16+
System.setProperty("org.slf4j.simpleLogger.dateTimeFormat", "HH:mm:ss.SSS");
17+
18+
KurrentDBClientSettings settings = KurrentDBConnectionString.parseOrThrow("kurrentdb://localhost:2113?tls=false");
19+
KurrentDBClient client = KurrentDBClient.create(settings);
20+
21+
ReadAllOptions options = ReadAllOptions.get()
22+
.forwards()
23+
.fromStart();
24+
25+
Publisher<ReadMessage> publisher = client.readAllReactive(options);
26+
27+
final CountDownLatch latch = new CountDownLatch(1);
28+
publisher.subscribe(new Subscriber<ReadMessage>() {
29+
@Override
30+
public void onSubscribe(Subscription subscription) {
31+
}
32+
33+
@Override
34+
public void onNext(ReadMessage readMessage) {
35+
RecordedEvent event = readMessage.getEvent().getOriginalEvent();
36+
37+
if (!event.getEventType().startsWith("$")) {
38+
System.out.println("Event: " + event.getEventType());
39+
}
40+
}
41+
42+
@Override
43+
public void onError(Throwable throwable) {
44+
System.out.println("Error type: " + throwable.getClass().getSimpleName());
45+
System.out.println("Error message: " + throwable.getMessage());
46+
latch.countDown();
47+
}
48+
49+
@Override
50+
public void onComplete() {
51+
latch.countDown();
52+
}
53+
});
54+
55+
latch.await();
56+
}
57+
}

src/test/java/io/kurrent/dbclient/connection/ConnectionShutdownTests.java

Lines changed: 51 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,55 @@
11
package io.kurrent.dbclient.connection;
22

3+
import io.grpc.Status;
4+
import io.grpc.StatusRuntimeException;
35
import io.kurrent.dbclient.*;
4-
import org.junit.jupiter.api.Assertions;
5-
import org.junit.jupiter.api.Test;
6-
import org.junit.jupiter.api.Timeout;
6+
import io.kurrent.dbclient.databases.DockerContainerDatabase;
7+
import org.junit.jupiter.api.*;
8+
import org.slf4j.Logger;
9+
import org.slf4j.LoggerFactory;
710

811
import java.util.concurrent.CountDownLatch;
12+
import java.util.concurrent.ExecutionException;
913
import java.util.concurrent.TimeUnit;
1014
import java.util.concurrent.atomic.AtomicBoolean;
1115
import java.util.concurrent.atomic.AtomicInteger;
1216
import java.util.concurrent.atomic.AtomicReference;
1317

1418
public class ConnectionShutdownTests {
19+
static private DockerContainerDatabase database;
20+
static private Logger logger;
21+
22+
@BeforeEach
23+
public void setup() {
24+
database = (DockerContainerDatabase) DatabaseFactory.spawn();
25+
logger = LoggerFactory.getLogger(PersistentSubscriptionsTests.class);
26+
}
27+
28+
@AfterEach
29+
public void cleanup() {
30+
unpauseDatabase();
31+
database.dispose();
32+
}
33+
34+
@Test
35+
@Timeout(value = 30, unit = TimeUnit.SECONDS)
36+
public void testCallTerminationWhenServerUnreachable() throws Throwable {
37+
KurrentDBClient client = database.defaultClient();
38+
39+
ReadResult initialResult = client.readAll(ReadAllOptions.get()).get(5, TimeUnit.SECONDS);
40+
41+
Assertions.assertFalse(initialResult.getEvents().isEmpty());
42+
43+
pauseDatabase();
44+
45+
ExecutionException e = Assertions.assertThrows(ExecutionException.class, () ->
46+
client.readAll().get(30, TimeUnit.SECONDS)
47+
);
48+
49+
StatusRuntimeException status = (StatusRuntimeException) e.getCause();
50+
Assertions.assertEquals(Status.Code.UNAVAILABLE, status.getStatus().getCode());
51+
}
52+
1553
@Test
1654
@Timeout(value = 1, unit = TimeUnit.MINUTES)
1755
public void testDatabaseCleanupWithActiveSubscription() throws Throwable {
@@ -59,4 +97,14 @@ public void onCancelled(Subscription subscription, Throwable throwable) {
5997
Throwable ex = reconnectError.get();
6098
Assertions.assertInstanceOf(ConnectionShutdownException.class, ex.getCause());
6199
}
100+
101+
static void pauseDatabase() {
102+
logger.debug("Pausing database container: {}", database.getContainerId());
103+
database.getDockerClient().pauseContainerCmd(database.getContainerId()).exec();
104+
}
105+
106+
static void unpauseDatabase() {
107+
logger.debug("Unpausing database container: {}", database.getContainerId());
108+
database.getDockerClient().unpauseContainerCmd(database.getContainerId()).exec();
109+
}
62110
}

0 commit comments

Comments
 (0)