-
Notifications
You must be signed in to change notification settings - Fork 14
Expand file tree
/
Copy pathRestateHttpServer.java
More file actions
191 lines (167 loc) · 6.92 KB
/
RestateHttpServer.java
File metadata and controls
191 lines (167 loc) · 6.92 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
// Copyright (c) 2023 - Restate Software, Inc., Restate GmbH
//
// This file is part of the Restate Java SDK,
// which is released under the MIT license.
//
// You can find a copy of the license in file LICENSE in the root
// directory of this repository or package, or at
// https://github.com/restatedev/sdk-java/blob/main/LICENSE
package dev.restate.sdk.http.vertx;
import dev.restate.sdk.endpoint.Endpoint;
import io.vertx.core.Future;
import io.vertx.core.Vertx;
import io.vertx.core.http.Http2Settings;
import io.vertx.core.http.HttpServer;
import io.vertx.core.http.HttpServerOptions;
import java.util.Optional;
import java.util.concurrent.CompletionException;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
/**
 * Endpoint builder for a Restate HTTP Endpoint using Vert.x, to serve Restate services.
 *
 * <p>This endpoint supports the Restate HTTP/2 Streaming component Protocol.
 *
 * <p>Example usage:
 *
 * <pre>
 * public static void main(String[] args) {
 *   Endpoint endpoint = Endpoint.builder()
 *     .bind(new Counter())
 *     .build();
 *
 *   RestateHttpServer.listen(endpoint);
 * }
 * </pre>
 *
 * <p>The {@code listen(...)} methods create their own {@link Vertx} instance, block until the
 * server is bound, and close that instance again if the server fails to start. The {@code
 * fromEndpoint(...)}/{@code fromHandler(...)} methods only build the {@link HttpServer}; starting
 * it (and managing the {@link Vertx} lifecycle) is left to the caller.
 */
public class RestateHttpServer {

  private static final Logger LOG = LogManager.getLogger(RestateHttpServer.class);

  /**
   * Port used by the {@code listen} overloads that take no explicit port: the value of the {@code
   * PORT} environment variable when set, {@code 9080} otherwise. The variable is parsed with
   * {@link Integer#parseInt(String)} during class initialization, so a non-numeric value makes
   * class initialization fail.
   */
  private static final int DEFAULT_PORT =
      Optional.ofNullable(System.getenv("PORT")).map(Integer::parseInt).orElse(9080);

  /**
   * Options used when the caller does not provide any: default {@link HttpServerOptions} with the
   * HTTP/2 initial settings' max concurrent streams raised to {@link Integer#MAX_VALUE}.
   */
  private static final HttpServerOptions DEFAULT_OPTIONS =
      new HttpServerOptions()
          .setInitialSettings(new Http2Settings().setMaxConcurrentStreams(Integer.MAX_VALUE));

  /**
   * Start serving the provided {@code endpoint} on the port specified by the environment variable
   * {@code PORT}, or alternatively on the default {@code 9080} port.
   *
   * <p>NOTE: this method will block for opening the socket and reserving the port. If you need a
   * non-blocking variant, manually create the server with {@link #fromEndpoint(Endpoint)} and call
   * {@code listen} on it yourself.
   *
   * @return The listening port
   */
  public static int listen(Endpoint endpoint) {
    return listen(endpoint, DEFAULT_PORT);
  }

  /** Like {@link #listen(Endpoint)} */
  public static int listen(Endpoint.Builder endpointBuilder) {
    return listen(endpointBuilder.build());
  }

  /**
   * Start serving the provided {@code endpoint} on the specified port.
   *
   * <p>NOTE: this method will block for opening the socket and reserving the port. If you need a
   * non-blocking variant, manually create the server with {@link #fromEndpoint(Endpoint)} and call
   * {@code listen} on it yourself.
   *
   * @return The listening port
   */
  public static int listen(Endpoint endpoint, int port) {
    return listen(HttpEndpointRequestHandler.fromEndpoint(endpoint), port);
  }

  /** Like {@link #listen(Endpoint, int)} */
  public static int listen(Endpoint.Builder endpointBuilder, int port) {
    return listen(endpointBuilder.build(), port);
  }

  /** Like {@link #listen(Endpoint)}, with an already built request handler */
  public static int listen(HttpEndpointRequestHandler requestHandler) {
    return listen(requestHandler, DEFAULT_PORT);
  }

  /** Like {@link #listen(Endpoint, int)}, with an already built request handler */
  public static int listen(HttpEndpointRequestHandler requestHandler, int port) {
    // Keep hold of the Vertx instance we create, so it can be released if the start fails.
    Vertx vertx = Vertx.vertx();
    return handleStart(vertx, fromHandler(vertx, requestHandler, DEFAULT_OPTIONS).listen(port));
  }

  /**
   * Create a Vert.x {@link HttpServer} from the provided endpoint, using a newly created {@link
   * Vertx} instance and the default options.
   */
  public static HttpServer fromEndpoint(Endpoint endpoint) {
    return fromEndpoint(endpoint, DEFAULT_OPTIONS);
  }

  /** Like {@link #fromEndpoint(Endpoint)} */
  public static HttpServer fromEndpoint(Endpoint.Builder endpointBuilder) {
    return fromEndpoint(endpointBuilder.build());
  }

  /**
   * Create a Vert.x {@link HttpServer} from the provided endpoint, with the given {@link
   * HttpServerOptions}. A new {@link Vertx} instance is created for the server; this class keeps
   * no reference to it.
   */
  public static HttpServer fromEndpoint(Endpoint endpoint, HttpServerOptions options) {
    return fromEndpoint(Vertx.vertx(), endpoint, options);
  }

  /** Like {@link #fromEndpoint(Endpoint, HttpServerOptions)} */
  public static HttpServer fromEndpoint(
      Endpoint.Builder endpointBuilder, HttpServerOptions options) {
    return fromEndpoint(endpointBuilder.build(), options);
  }

  /**
   * Create a Vert.x {@link HttpServer} on the given {@link Vertx} instance from the provided
   * endpoint, using the default options.
   */
  public static HttpServer fromEndpoint(Vertx vertx, Endpoint endpoint) {
    return fromEndpoint(vertx, endpoint, DEFAULT_OPTIONS);
  }

  /** Like {@link #fromEndpoint(Vertx, Endpoint)} */
  public static HttpServer fromEndpoint(Vertx vertx, Endpoint.Builder endpointBuilder) {
    return fromEndpoint(vertx, endpointBuilder.build());
  }

  /**
   * Create a Vert.x {@link HttpServer} on the given {@link Vertx} instance from the provided
   * endpoint, with the given {@link HttpServerOptions}.
   */
  public static HttpServer fromEndpoint(Vertx vertx, Endpoint endpoint, HttpServerOptions options) {
    return fromHandler(vertx, HttpEndpointRequestHandler.fromEndpoint(endpoint), options);
  }

  /** Like {@link #fromEndpoint(Vertx, Endpoint, HttpServerOptions)} */
  public static HttpServer fromEndpoint(
      Vertx vertx, Endpoint.Builder endpointBuilder, HttpServerOptions options) {
    return fromEndpoint(vertx, endpointBuilder.build(), options);
  }

  /**
   * Create a Vert.x {@link HttpServer} from the provided {@link HttpEndpointRequestHandler}, using
   * a newly created {@link Vertx} instance and the default options.
   */
  public static HttpServer fromHandler(HttpEndpointRequestHandler handler) {
    return fromHandler(handler, DEFAULT_OPTIONS);
  }

  /**
   * Create a Vert.x {@link HttpServer} from the provided {@link HttpEndpointRequestHandler}, with
   * the given {@link HttpServerOptions}. A new {@link Vertx} instance is created for the server;
   * this class keeps no reference to it.
   */
  public static HttpServer fromHandler(
      HttpEndpointRequestHandler handler, HttpServerOptions options) {
    return fromHandler(Vertx.vertx(), handler, options);
  }

  /**
   * Create a Vert.x {@link HttpServer} on the given {@link Vertx} instance from the provided
   * {@link HttpEndpointRequestHandler}, using the default options.
   */
  public static HttpServer fromHandler(Vertx vertx, HttpEndpointRequestHandler handler) {
    return fromHandler(vertx, handler, DEFAULT_OPTIONS);
  }

  /**
   * Create a Vert.x {@link HttpServer} on the given {@link Vertx} instance from the provided
   * {@link HttpEndpointRequestHandler}, with the given {@link HttpServerOptions}. The returned
   * server has the handler installed but is not listening yet.
   */
  public static HttpServer fromHandler(
      Vertx vertx, HttpEndpointRequestHandler handler, HttpServerOptions options) {
    HttpServer server = vertx.createHttpServer(options);
    server.requestHandler(handler);
    return server;
  }

  /**
   * Block until the server start completes and return the bound port.
   *
   * <p>On failure the underlying cause is logged and rethrown as-is (checked or not), and {@code
   * ownedVertx} is closed so that a failed start does not leave the internally created instance
   * behind.
   *
   * @param ownedVertx the {@link Vertx} instance created by the calling {@code listen} method
   * @param fut the future returned by {@code HttpServer.listen}
   * @return the port the server is actually listening on
   */
  private static int handleStart(Vertx ownedVertx, Future<HttpServer> fut) {
    try {
      HttpServer server = fut.toCompletionStage().toCompletableFuture().join();
      int actualPort = server.actualPort();
      LOG.info("Restate HTTP Endpoint server started on port {}", actualPort);
      return actualPort;
    } catch (CompletionException e) {
      // A CompletionException is not guaranteed to carry a cause; never rethrow null.
      Throwable failure = e.getCause() != null ? e.getCause() : e;
      LOG.error("Restate HTTP Endpoint server start failed", failure);
      // Fire-and-forget: nobody else holds a reference to this instance, so release it here.
      // NOTE(review): Vert.x threads are presumably non-daemon and would otherwise keep the JVM
      // alive after the failure - confirm against the Vert.x version in use.
      ownedVertx.close();
      sneakyThrow(failure);
      // This is never reached
      return -1;
    }
  }

  /**
   * Rethrow {@code e} without wrapping it, even when it is a checked exception: the unchecked cast
   * to the inferred type parameter lets it bypass compile-time exception checking.
   */
  @SuppressWarnings("unchecked")
  private static <E extends Throwable> void sneakyThrow(Throwable e) throws E {
    throw (E) e;
  }
}