-
Notifications
You must be signed in to change notification settings - Fork 22
Expand file tree
/
Copy pathCreateProjection.java
More file actions
73 lines (57 loc) · 2.98 KB
/
Copy pathCreateProjection.java
File metadata and controls
73 lines (57 loc) · 2.98 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
package io.kurrent.dbclient;
import io.kurrent.dbclient.proto.projections.Projectionmanagement;
import io.kurrent.dbclient.proto.projections.ProjectionsGrpc;
import java.util.concurrent.CompletableFuture;
/**
 * Command object that creates a continuous projection through the projections gRPC service and,
 * when the supplied options ask for it, follows up with an {@link UpdateProjection} call that
 * turns emitting on.
 */
class CreateProjection {
    private final GrpcClient client;
    private final String projectionName;
    private final String query;
    private final boolean trackEmittedStreams;
    private final boolean emitEnabled;
    private final int engineVersion;
    private final CreateProjectionOptions options;

    /**
     * Captures everything needed to issue the create request later in {@link #execute()}.
     *
     * @param client         gRPC client used to obtain a channel and its settings
     * @param projectionName name given to the continuous projection
     * @param query          projection query sent in the create request
     * @param options        create options; the emit, track-emitted-streams and engine-version
     *                       values are read from it here, and the object itself is passed on when
     *                       the stub is configured
     */
    public CreateProjection(final GrpcClient client, final String projectionName, final String query,
                            final CreateProjectionOptions options) {
        this.client = client;
        this.projectionName = projectionName;
        this.query = query;
        this.trackEmittedStreams = options.isTrackingEmittedStreams();
        this.emitEnabled = options.isEmitEnabled();
        this.engineVersion = options.getEngineVersion();
        this.options = options;
    }

    /**
     * Sends the create request and, if emit was requested, an update that enables it.
     *
     * <p>The returned future completes with the {@code CreateResp} of the create call. When emit
     * is enabled it completes only after the follow-up {@link UpdateProjection} has completed, and
     * a failure of that update fails the returned future.
     *
     * <p>If the engine version is 2 and emitted-stream tracking is requested, no request is sent
     * and the returned future is already failed with an {@link IllegalArgumentException}.
     *
     * @return a future for the create response (raw type kept for compatibility with callers)
     */
    @SuppressWarnings("unchecked")
    public CompletableFuture execute() {
        // Reject the unsupported combination up front, without opening a channel.
        if (engineVersion == 2 && trackEmittedStreams) {
            CompletableFuture<Projectionmanagement.CreateResp> rejected = new CompletableFuture<>();
            rejected.completeExceptionally(new IllegalArgumentException(
                    "trackEmittedStreams is not supported when engineVersion is 2 (V2)"));
            return rejected;
        }

        return this.client.run(channel -> {
            Projectionmanagement.CreateReq.Options.Continuous.Builder continuousBuilder =
                    Projectionmanagement.CreateReq.Options.Continuous.newBuilder()
                            .setName(projectionName)
                            .setTrackEmittedStreams(trackEmittedStreams);

            Projectionmanagement.CreateReq.Options.Builder optionsBuilder =
                    Projectionmanagement.CreateReq.Options.newBuilder()
                            .setQuery(query)
                            .setEngineVersion(engineVersion)
                            .setContinuous(continuousBuilder);

            Projectionmanagement.CreateReq request = Projectionmanagement.CreateReq.newBuilder()
                    .setOptions(optionsBuilder)
                    .build();

            // Named "stub" so it does not shadow the GrpcClient field "client".
            ProjectionsGrpc.ProjectionsStub stub = GrpcUtils.configureStub(
                    ProjectionsGrpc.newStub(channel), this.client.getSettings(), this.options);

            CompletableFuture<Projectionmanagement.CreateResp> result = new CompletableFuture<>();
            stub.create(request, GrpcUtils.convertSingleResponse(result));
            return result;
        // thenComposeAsync (not thenApplyAsync): the lambda returns a future, so it must be
        // flattened. Otherwise callers would receive a nested future, would not wait for the
        // update, and would never see an update failure.
        }).thenComposeAsync(result -> {
            if (emitEnabled) {
                UpdateProjectionOptions updateOptions = UpdateProjectionOptions.get().emitEnabled(true);
                UpdateProjection update = new UpdateProjection(client, projectionName, query, updateOptions);
                // Keep the create response as the final value once the update has finished.
                return update.execute().thenApply(x -> result);
            }
            return CompletableFuture.completedFuture(result);
        });
    }
}