Skip to content

Commit f0fa8ae

Browse files
committed
feat: add multi stream append support
1 parent 575cf96 commit f0fa8ae

6 files changed

Lines changed: 334 additions & 0 deletions

File tree

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
package io.kurrent.dbclient;
2+
3+
import java.util.Iterator;
4+
5+
public class AppendStreamRequest {
6+
private final String streamName;
7+
private final Iterator<EventData> events;
8+
private final StreamState expectedState;
9+
10+
public AppendStreamRequest(String streamName, Iterator<EventData> events, StreamState expectedState) {
11+
this.streamName = streamName;
12+
this.events = events;
13+
this.expectedState = expectedState;
14+
}
15+
16+
public String getStreamName() {
17+
return streamName;
18+
}
19+
20+
public Iterator<EventData> getEvents() {
21+
return events;
22+
}
23+
24+
public StreamState getExpectedState() {
25+
return expectedState;
26+
}
27+
}

src/main/java/io/kurrent/dbclient/KurrentDBClient.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,10 @@ public CompletableFuture<WriteResult> appendToStream(String streamName, AppendTo
7575
return new AppendToStream(this.getGrpcClient(), streamName, events, options).execute();
7676
}
7777

78+
public CompletableFuture<MultiAppendWriteResult> multiAppend(AppendToStreamOptions options, Iterator<AppendStreamRequest> requests) {
79+
return new MultiStreamAppend(this.getGrpcClient(), requests).execute();
80+
}
81+
7882
/**
7983
* Sets a stream's metadata.
8084
* @param streamName stream's name.
Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
package io.kurrent.dbclient;
2+
3+
public class MultiAppendWriteResult {
4+
}
Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
package io.kurrent.dbclient;
2+
3+
import com.google.protobuf.ByteString;
4+
import io.grpc.ManagedChannel;
5+
import io.grpc.Metadata;
6+
import io.grpc.StatusRuntimeException;
7+
import io.grpc.stub.StreamObserver;
8+
import io.kurrentdb.v2.AppendRecord;
9+
import io.kurrentdb.v2.MultiStreamAppendResponse;
10+
import io.kurrentdb.v2.StreamsServiceGrpc;
11+
import kurrentdb.protobuf.DynamicValueOuterClass;
12+
13+
import java.util.Iterator;
14+
import java.util.concurrent.CompletableFuture;
15+
16+
class MultiStreamAppend {
17+
private final GrpcClient client;
18+
private final Iterator<AppendStreamRequest> requests;
19+
20+
public MultiStreamAppend(GrpcClient client, Iterator<AppendStreamRequest> requests) {
21+
this.client = client;
22+
this.requests = requests;
23+
}
24+
25+
public CompletableFuture<MultiAppendWriteResult> execute() {
26+
return this.client.run(this::append);
27+
}
28+
29+
private CompletableFuture<MultiAppendWriteResult> append(ManagedChannel channel) {
30+
CompletableFuture<MultiAppendWriteResult> result = new CompletableFuture<>();
31+
StreamsServiceGrpc.StreamsServiceStub client = GrpcUtils.configureStub(StreamsServiceGrpc.newStub(channel), this.client.getSettings(), new OptionsBase<>());
32+
StreamObserver<io.kurrentdb.v2.AppendStreamRequest> requestStream = client.multiStreamAppendSession(GrpcUtils.convertSingleResponse(result, this::onResponse));
33+
34+
try {
35+
while (this.requests.hasNext()) {
36+
AppendStreamRequest request = this.requests.next();
37+
io.kurrentdb.v2.AppendStreamRequest.Builder builder = io.kurrentdb.v2.AppendStreamRequest.newBuilder()
38+
.setStream(request.getStreamName());
39+
40+
while (request.getEvents().hasNext()) {
41+
EventData event = request.getEvents().next();
42+
builder.addRecords(AppendRecord.newBuilder()
43+
.setData(ByteString.copyFrom(event.getEventData()))
44+
.setRecordId(event.getEventId().toString())
45+
.putProperties(SystemMetadataKeys.CONTENT_TYPE, DynamicValueOuterClass
46+
.DynamicValue
47+
.newBuilder()
48+
.setStringValue(event.getContentType())
49+
.build())
50+
.putProperties(SystemMetadataKeys.TYPE, DynamicValueOuterClass
51+
.DynamicValue
52+
.newBuilder()
53+
.setStringValue(event.getEventType())
54+
.build())
55+
.build());
56+
}
57+
58+
requestStream.onNext(builder.build());
59+
}
60+
61+
requestStream.onCompleted();
62+
} catch (StatusRuntimeException e) {
63+
String leaderHost = e.getTrailers().get(Metadata.Key.of("leader-endpoint-host", Metadata.ASCII_STRING_MARSHALLER));
64+
String leaderPort = e.getTrailers().get(Metadata.Key.of("leader-endpoint-port", Metadata.ASCII_STRING_MARSHALLER));
65+
66+
if (leaderHost != null && leaderPort != null) {
67+
NotLeaderException reason = new NotLeaderException(leaderHost, Integer.valueOf(leaderPort));
68+
requestStream.onError(reason);
69+
result.completeExceptionally(reason);
70+
} else {
71+
requestStream.onError(e);
72+
result.completeExceptionally(e);
73+
}
74+
} catch (RuntimeException e) {
75+
requestStream.onError(e);
76+
result.completeExceptionally(e);
77+
}
78+
79+
return result;
80+
}
81+
82+
public MultiAppendWriteResult onResponse(MultiStreamAppendResponse response) {
83+
// Handle the response from the multi-stream append operation
84+
// This could involve processing the results for each stream in the response
85+
throw new RuntimeException("Not implemented");
86+
}
87+
}

src/main/proto/dynamic-value.proto

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
syntax = "proto3";
2+
3+
import "google/protobuf/duration.proto";
4+
import "google/protobuf/timestamp.proto";
5+
import "google/protobuf/struct.proto";
6+
7+
package kurrentdb.protobuf;
8+
option csharp_namespace = "KurrentDB.Protobuf";
9+
10+
message DynamicValue {
11+
oneof kind {
12+
// Represents a null value.
13+
google.protobuf.NullValue null_value = 1;
14+
15+
// Represents a 32-bit signed integer value.
16+
sint32 int32_value = 2;
17+
18+
// Represents a 64-bit signed integer value.
19+
sint64 int64_value = 3;
20+
21+
// Represents a byte array value.
22+
bytes bytes_value = 4;
23+
24+
// Represents a 64-bit double-precision floating-point value.
25+
double double_value = 5;
26+
27+
// Represents a 32-bit single-precision floating-point value
28+
float float_value = 6;
29+
30+
// Represents a string value.
31+
string string_value = 7;
32+
33+
// Represents a boolean value.
34+
bool boolean_value = 8;
35+
36+
// Represents a timestamp value.
37+
google.protobuf.Timestamp timestamp_value = 9;
38+
39+
// Represents a duration value.
40+
google.protobuf.Duration duration_value = 10;
41+
}
42+
}

src/main/proto/streams.v2.proto

Lines changed: 170 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,170 @@
1+
syntax = "proto3";
2+
3+
//
4+
// This protocol is UNSTABLE in the sense of being subject to change.
5+
//
6+
7+
package kurrentdb.protocol.v2;
8+
9+
option csharp_namespace = "KurrentDB.Protocol.V2";
10+
option java_package = "io.kurrentdb.v2";
11+
option java_multiple_files = true;
12+
13+
import "dynamic-value.proto";
14+
15+
service StreamsService {
16+
// Executes an atomic operation to append records to multiple streams.
17+
// This transactional method ensures that all appends either succeed
18+
// completely, or are entirely rolled back, thereby maintaining strict data
19+
// consistency across all involved streams.
20+
rpc MultiStreamAppend(MultiStreamAppendRequest) returns (MultiStreamAppendResponse);
21+
22+
// Streaming version of MultiStreamAppend that allows clients to send multiple
23+
// append requests over a single connection. When the stream completes, all
24+
// records are appended transactionally (all succeed or fail together).
25+
// Provides improved efficiency for high-throughput scenarios while
26+
// maintaining the same transactional guarantees.
27+
rpc MultiStreamAppendSession(stream AppendStreamRequest) returns (MultiStreamAppendResponse);
28+
}
29+
30+
// Record to be appended to a stream.
31+
message AppendRecord {
32+
// Universally Unique identifier for the record. Must be a guid.
33+
// If not provided, the server will generate a new one.
34+
optional string record_id = 1;
35+
36+
// A collection of properties providing additional system information about the
37+
// record.
38+
map<string, kurrentdb.protobuf.DynamicValue> properties = 2;
39+
40+
// The actual data payload of the record, stored as bytes.
41+
bytes data = 3;
42+
}
43+
44+
// Constants that match the expected state of a stream during an
45+
// append operation. It can be used to specify whether the stream should exist,
46+
// not exist, or can be in any state.
47+
enum ExpectedRevisionConstants {
48+
// The stream should exist and have a single event.
49+
EXPECTED_REVISION_CONSTANTS_SINGLE_EVENT = 0;
50+
51+
// It is not important whether the stream exists or not.
52+
EXPECTED_REVISION_CONSTANTS_ANY = -2;
53+
54+
// The stream should not exist. If it does, the append will fail.
55+
EXPECTED_REVISION_CONSTANTS_NO_STREAM = -1;
56+
57+
// The stream should exist
58+
EXPECTED_REVISION_CONSTANTS_EXISTS = -4;
59+
}
60+
61+
// Represents the input for appending records to a specific stream.
62+
message AppendStreamRequest {
63+
// The name of the stream to append records to.
64+
string stream = 1;
65+
66+
// The records to append to the stream.
67+
repeated AppendRecord records = 2;
68+
69+
// The expected revision of the stream. If the stream's current revision does
70+
// not match, the append will fail.
71+
// The expected revision can also be one of the special values
72+
// from ExpectedRevisionConstants.
73+
// missing value means no expectation: same as EXPECTED_REVISION_CONSTANTS_ANY
74+
optional sint64 expected_revision = 3;
75+
}
76+
77+
// Success represents the successful outcome of an append operation.
78+
message AppendStreamSuccess {
79+
// The name of the stream to which records were appended.
80+
string stream = 1;
81+
82+
// The position of the last appended record in the transaction.
83+
int64 position = 2;
84+
85+
// The revision of the stream after the append operation.
86+
int64 stream_revision = 3;
87+
}
88+
89+
// Failure represents the detailed error information when an append operation fails.
90+
message AppendStreamFailure {
91+
// The name of the stream to which records failed to append.
92+
string stream = 1;
93+
94+
// The error details
95+
oneof error {
96+
// Failed because the actual stream revision didn't match the expected revision.
97+
ErrorDetails.WrongExpectedRevision wrong_expected_revision = 2;
98+
99+
// Failed because the client lacks sufficient permissions.
100+
ErrorDetails.AccessDenied access_denied = 3;
101+
102+
// Failed because the target stream has been deleted.
103+
ErrorDetails.StreamDeleted stream_deleted = 4;
104+
105+
ErrorDetails.TransactionMaxSizeExceeded transaction_max_size_exceeded = 5;
106+
}
107+
}
108+
109+
// Represents the output of appending records to a specific stream.
110+
message AppendStreamResponse {
111+
// The result of the append operation.
112+
oneof result {
113+
// Success represents the successful outcome of an append operation.
114+
AppendStreamSuccess success = 1;
115+
116+
// Failure represents the details of a failed append operation.
117+
AppendStreamFailure failure = 2;
118+
}
119+
}
120+
121+
// MultiStreamAppendRequest represents a request to append records to multiple streams.
122+
message MultiStreamAppendRequest {
123+
// A list of AppendStreamInput messages, each representing a stream to which records should be appended.
124+
repeated AppendStreamRequest input = 1;
125+
}
126+
127+
// Response from the MultiStreamAppend operation.
128+
message MultiStreamAppendResponse {
129+
oneof result {
130+
// Success represents the successful outcome of a multi-stream append operation.
131+
Success success = 1;
132+
133+
// Failure represents the details of a failed multi-stream append operation.
134+
Failure failure = 2;
135+
}
136+
137+
message Success {
138+
repeated AppendStreamSuccess output = 1;
139+
}
140+
141+
message Failure {
142+
repeated AppendStreamFailure output = 1;
143+
}
144+
}
145+
146+
// ErrorDetails provides detailed information about specific error conditions.
147+
message ErrorDetails {
148+
// When the user does not have sufficient permissions to perform the operation.
149+
message AccessDenied {
150+
// The reason for access denial.
151+
string reason = 1;
152+
}
153+
154+
// When the stream has been deleted.
155+
message StreamDeleted {
156+
}
157+
158+
// When the expected revision of the stream does not match the actual revision.
159+
message WrongExpectedRevision {
160+
// The actual revision of the stream.
161+
int64 stream_revision = 1;
162+
}
163+
164+
// When the transaction exceeds the maximum size allowed
165+
// (it's bigger than the configured chunk size).
166+
message TransactionMaxSizeExceeded {
167+
// The maximum allowed size of the transaction.
168+
int32 max_size = 1;
169+
}
170+
}

0 commit comments

Comments
 (0)