Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
package dev.restate.sdk.http.vertx;

import static io.netty.handler.codec.http.HttpHeaderNames.CONTENT_TYPE;
import static io.netty.handler.codec.http.HttpResponseStatus.*;

import dev.restate.sdk.core.EndpointRequestHandler;
import dev.restate.sdk.core.ProtocolException;
Expand Down Expand Up @@ -40,9 +39,11 @@ public class HttpEndpointRequestHandler implements Handler<HttpServerRequest> {
AsciiString.cached(Version.X_RESTATE_SERVER);

private final EndpointRequestHandler endpoint;
private final boolean enableBidirectionalStreaming;

private HttpEndpointRequestHandler(Endpoint endpoint) {
private HttpEndpointRequestHandler(Endpoint endpoint, boolean enableBidirectionalStreaming) {
this.endpoint = EndpointRequestHandler.create(endpoint);
this.enableBidirectionalStreaming = enableBidirectionalStreaming;
}

@Override
Expand All @@ -68,7 +69,7 @@ public Iterable<String> keys() {
},
ContextualData::put,
currentContextExecutor(vertxCurrentContext),
request.version() == HttpVersion.HTTP_2);
enableBidirectionalStreaming && request.version() == HttpVersion.HTTP_2);
} catch (ProtocolException e) {
LOG.warn("Error when handling the request", e);
request
Expand Down Expand Up @@ -101,7 +102,29 @@ private Executor currentContextExecutor(Context currentContext) {
return runnable -> currentContext.runOnContext(v -> runnable.run());
}

/**
* Create a {@link HttpEndpointRequestHandler}
*
* @param endpoint the endpoint to wrap
* @return the built handler
*/
public static HttpEndpointRequestHandler fromEndpoint(Endpoint endpoint) {
return new HttpEndpointRequestHandler(endpoint);
return new HttpEndpointRequestHandler(endpoint, true);
}

/**
* Create a {@link HttpEndpointRequestHandler}
*
* @param endpoint the endpoint to wrap
* @param disableBidirectionalStreaming if true, disable bidirectional streaming with HTTP/2
* requests. Restate initiates for each invocation a bidirectional streaming using HTTP/2
* between restate-server and the SDK. In some network setups, for example when using a load
* balancers that buffer request/response, bidirectional streaming will not work correctly.
* Only in these scenarios, we suggest disabling bidirectional streaming.
* @return the built handler
*/
public static HttpEndpointRequestHandler fromEndpoint(
Endpoint endpoint, boolean disableBidirectionalStreaming) {
return new HttpEndpointRequestHandler(endpoint, !disableBidirectionalStreaming);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,18 @@ public static int listen(Endpoint.Builder endpointBuilder, int port) {
return listen(endpointBuilder.build(), port);
}

/** Like {@link #listen(Endpoint)}, with an already built request handler */
public static int listen(HttpEndpointRequestHandler requestHandler) {
return listen(requestHandler, DEFAULT_PORT);
}

/** Like {@link #listen(Endpoint, int)}, with an already built request handler */
public static int listen(HttpEndpointRequestHandler requestHandler, int port) {
HttpServer server = Vertx.vertx().createHttpServer(DEFAULT_OPTIONS);
server.requestHandler(requestHandler);
return handleStart(server.listen(port));
}

/** Create a Vert.x {@link HttpServer} from the provided endpoint. */
public static HttpServer fromEndpoint(Endpoint endpoint) {
return fromEndpoint(endpoint, DEFAULT_OPTIONS);
Expand Down