Skip to content

Commit 8b17711

Browse files
committed
works but need to run the test only on CI version
1 parent 70af183 commit 8b17711

8 files changed

Lines changed: 84 additions & 14 deletions

File tree

.github/workflows/tests.yml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ jobs:
1414
strategy:
1515
fail-fast: false
1616
matrix:
17-
test: [Streams, PersistentSubscriptions, Telemetry]
17+
test: [Streams, PersistentSubscriptions, Telemetry, MultiStreamAppend]
1818

1919
runs-on: ubuntu-latest
2020
steps:
@@ -101,7 +101,7 @@ jobs:
101101
strategy:
102102
fail-fast: false
103103
matrix:
104-
test: [Streams, PersistentSubscriptions]
104+
test: [Streams, PersistentSubscriptions, MultiStreamAppendTests]
105105

106106
runs-on: ubuntu-latest
107107
steps:

src/main/java/io/kurrent/dbclient/FeatureFlags.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,5 +7,6 @@ class FeatureFlags {
77
public final static int PERSISTENT_SUBSCRIPTION_RESTART_SUBSYSTEM = 8;
88
public final static int PERSISTENT_SUBSCRIPTION_GET_INFO = 16;
99
public final static int PERSISTENT_SUBSCRIPTION_TO_ALL = 32;
10+
public final static int MULTI_STREAM_APPEND = 64;
1011
public final static int PERSISTENT_SUBSCRIPTION_MANAGEMENT = PERSISTENT_SUBSCRIPTION_LIST | PERSISTENT_SUBSCRIPTION_REPLAY | PERSISTENT_SUBSCRIPTION_GET_INFO | PERSISTENT_SUBSCRIPTION_RESTART_SUBSYSTEM;
1112
}

src/main/java/io/kurrent/dbclient/GossipClient.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ class GossipClient {
2121

2222
public GossipClient(KurrentDBClientSettings settings, ManagedChannel channel) {
2323
_channel = channel;
24-
_stub = GrpcUtils.configureStub(GossipGrpc.newStub(_channel), settings, new GossipOption(), (long)settings.getGossipTimeout());
24+
_stub = GrpcUtils.configureStub(GossipGrpc.newStub(_channel), settings, new GossipOption(), settings.getGossipTimeout());
2525
}
2626

2727
public void shutdown() {

src/main/java/io/kurrent/dbclient/GrpcUtils.java

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -113,10 +113,14 @@ static public StreamsOuterClass.ReadReq.Options.StreamOptions toStreamOptions(St
113113
}
114114

115115
static public <S extends AbstractAsyncStub<S>, O> S configureStub(S stub, KurrentDBClientSettings settings, OptionsBase<O> options) {
116-
return configureStub(stub, settings, options, null);
116+
return configureStub(stub, settings, options, null, true);
117117
}
118118

119-
static public <S extends AbstractAsyncStub<S>, O> S configureStub(S stub, KurrentDBClientSettings settings, OptionsBase<O> options, Long forceDeadlineInMs) {
119+
static public <S extends AbstractAsyncStub<S>, O> S configureStub(S stub, KurrentDBClientSettings settings, OptionsBase<O> options, long forceDeadlineInMs) {
120+
return configureStub(stub, settings, options, forceDeadlineInMs, true);
121+
}
122+
123+
static public <S extends AbstractAsyncStub<S>, O> S configureStub(S stub, KurrentDBClientSettings settings, OptionsBase<O> options, Long forceDeadlineInMs, boolean forwardRequiresLeader) {
120124
S finalStub = stub;
121125
ConnectionMetadata metadata = new ConnectionMetadata();
122126

@@ -146,7 +150,7 @@ static public <S extends AbstractAsyncStub<S>, O> S configureStub(S stub, Kurren
146150
metadata.authenticated(credentials);
147151
}
148152

149-
if (options.isLeaderRequired() || settings.getNodePreference() == NodePreference.LEADER) {
153+
if (forwardRequiresLeader && (options.isLeaderRequired() || settings.getNodePreference() == NodePreference.LEADER)) {
150154
metadata.requiresLeader();
151155
}
152156

src/main/java/io/kurrent/dbclient/MultiStreamAppend.java

Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
package io.kurrent.dbclient;
22

33
import com.google.protobuf.ByteString;
4-
import io.grpc.ManagedChannel;
54
import io.grpc.Metadata;
65
import io.grpc.StatusRuntimeException;
76
import io.grpc.stub.StreamObserver;
@@ -25,12 +24,18 @@ public MultiStreamAppend(GrpcClient client, Iterator<AppendStreamRequest> reques
2524
}
2625

2726
public CompletableFuture<MultiAppendWriteResult> execute() {
28-
return this.client.run(this::append);
27+
return this.client.runWithArgs(this::append);
2928
}
3029

31-
private CompletableFuture<MultiAppendWriteResult> append(ManagedChannel channel) {
30+
private CompletableFuture<MultiAppendWriteResult> append(WorkItemArgs args) {
3231
CompletableFuture<MultiAppendWriteResult> result = new CompletableFuture<>();
33-
StreamsServiceGrpc.StreamsServiceStub client = GrpcUtils.configureStub(StreamsServiceGrpc.newStub(channel), this.client.getSettings(), new OptionsBase<>());
32+
33+
if (!args.supportFeature(FeatureFlags.MULTI_STREAM_APPEND)) {
34+
result.completeExceptionally(new UnsupportedOperationException("Multi-stream append is not supported by the server"));
35+
return result;
36+
}
37+
38+
StreamsServiceGrpc.StreamsServiceStub client = GrpcUtils.configureStub(StreamsServiceGrpc.newStub(args.getChannel()), this.client.getSettings(), new OptionsBase<>(), null, false);
3439
StreamObserver<io.kurrentdb.v2.AppendStreamRequest> requestStream = client.multiStreamAppendSession(GrpcUtils.convertSingleResponse(result, this::onResponse));
3540

3641
try {
@@ -44,15 +49,15 @@ private CompletableFuture<MultiAppendWriteResult> append(ManagedChannel channel)
4449
builder.addRecords(AppendRecord.newBuilder()
4550
.setData(ByteString.copyFrom(event.getEventData()))
4651
.setRecordId(event.getEventId().toString())
47-
.putProperties(SystemMetadataKeys.CONTENT_TYPE, DynamicValueOuterClass
52+
.putProperties(SystemMetadataKeys.DATA_FORMAT, DynamicValueOuterClass
4853
.DynamicValue
4954
.newBuilder()
50-
.setStringValue(event.getContentType())
55+
.setBytesValue(ByteString.copyFromUtf8(event.getContentType()))
5156
.build())
52-
.putProperties(SystemMetadataKeys.TYPE, DynamicValueOuterClass
57+
.putProperties(SystemMetadataKeys.SCHEMA_NAME, DynamicValueOuterClass
5358
.DynamicValue
5459
.newBuilder()
55-
.setStringValue(event.getEventType())
60+
.setBytesValue(ByteString.copyFromUtf8(event.getEventType()))
5661
.build())
5762
.build());
5863
}

src/main/java/io/kurrent/dbclient/ServerFeatures.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,8 @@ private static CompletableFuture<ServerInfo> getSupportedFeaturesInternal(Server
9595
default:
9696
break;
9797
}
98+
} else if (method.getMethodName().equals("multistreamappendsession")) {
99+
features |= FeatureFlags.MULTI_STREAM_APPEND;
98100
}
99101
}
100102

src/main/java/io/kurrent/dbclient/SystemMetadataKeys.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,4 +5,6 @@ class SystemMetadataKeys {
55
static final String CREATED = "created";
66
static final String IS_JSON = "is-json";
77
static final String TYPE = "type";
8+
static final String SCHEMA_NAME = "$schema.name";
9+
static final String DATA_FORMAT = "$schema.data-format";
810
}
Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
package io.kurrent.dbclient;
2+
3+
import org.junit.jupiter.api.AfterAll;
4+
import org.junit.jupiter.api.Assertions;
5+
import org.junit.jupiter.api.BeforeAll;
6+
import org.junit.jupiter.api.Test;
7+
import org.slf4j.Logger;
8+
import org.slf4j.LoggerFactory;
9+
10+
import java.util.ArrayList;
11+
import java.util.List;
12+
import java.util.concurrent.ExecutionException;
13+
14+
public class MultiStreamAppendTests implements ConnectionAware {
15+
static private Database database;
16+
static private Logger logger;
17+
18+
@BeforeAll
19+
public static void setup() {
20+
database = DatabaseFactory.spawn();
21+
logger = LoggerFactory.getLogger(MultiStreamAppendTests.class);
22+
}
23+
24+
@Override
25+
public Database getDatabase() {
26+
return database;
27+
}
28+
29+
@Override
30+
public Logger getLogger() {
31+
return logger;
32+
}
33+
34+
@AfterAll
35+
public static void cleanup() {
36+
database.dispose();
37+
}
38+
39+
@Test
40+
public void testMultiStreamAppend() throws ExecutionException, InterruptedException {
41+
KurrentDBClient client = getDefaultClient();
42+
43+
List<AppendStreamRequest> requests = new ArrayList<>();
44+
45+
List<EventData> events = new ArrayList<>();
46+
for (int i = 0; i < 10; i++)
47+
events.add(EventData.builderAsBinary("created", new byte[0]).build());
48+
49+
requests.add(new AppendStreamRequest("foobar", events.iterator(), StreamState.any()));
50+
requests.add(new AppendStreamRequest("baz", events.iterator(), StreamState.any()));
51+
52+
MultiAppendWriteResult result = client.multiAppend(AppendToStreamOptions.get(), requests.iterator()).get();
53+
54+
Assertions.assertTrue(result.getSuccesses().isPresent());
55+
}
56+
}

0 commit comments

Comments
 (0)