diff --git a/client-kotlin/src/main/kotlin/dev/restate/client/kotlin/ingress.kt b/client-kotlin/src/main/kotlin/dev/restate/client/kotlin/ingress.kt index 4250913d..b74bd299 100644 --- a/client-kotlin/src/main/kotlin/dev/restate/client/kotlin/ingress.kt +++ b/client-kotlin/src/main/kotlin/dev/restate/client/kotlin/ingress.kt @@ -15,6 +15,7 @@ import dev.restate.client.ResponseHead import dev.restate.client.SendResponse import dev.restate.common.Output import dev.restate.common.Request +import dev.restate.common.Target import dev.restate.common.WorkflowRequest import dev.restate.serde.TypeTag import dev.restate.serde.kotlinx.typeTag @@ -24,23 +25,35 @@ import kotlinx.coroutines.future.await // Extension methods for the Client +/** Request options builder function */ fun requestOptions(init: RequestOptions.Builder.() -> Unit): RequestOptions { val builder = RequestOptions.builder() builder.init() return builder.build() } -/** Shorthand for [callSuspend] */ +/** + * Shorthand for [callSuspend] + * + * @param client the client to use for the call + * @return the response + */ suspend fun Request.call(client: Client): Response { return client.callSuspend(this) } -/** Suspend version of [Client.callAsync] */ +/** Call a service and wait for the response. */ suspend fun Client.callSuspend(request: Request): Response { return this.callAsync(request).await() } -/** Shorthand for [sendSuspend] */ +/** + * Shorthand for [sendSuspend] + * + * @param client the client to use for sending + * @param delay optional execution delay + * @return the send response + */ suspend fun Request.send( client: Client, delay: Duration? = null @@ -48,7 +61,10 @@ suspend fun Request.send( return client.sendSuspend(this, delay) } -/** Suspend version of [Client.sendAsync] */ +/** + * Send a request to a service without waiting for the response, optionally providing an execution + * delay to wait for. + */ suspend fun Client.sendSuspend( request: Request, delay: Duration? = null @@ -56,7 +72,13 @@ suspend fun Client.sendSuspend( return this.sendAsync(request, delay?.toJavaDuration()).await() } -/** Shorthand for [submitSuspend] */ +/** + * Shorthand for [submitSuspend] + * + * @param client the client to use for submission + * @param delay optional execution delay + * @return the send response + */ suspend fun WorkflowRequest.submit( client: Client, delay: Duration? = null @@ -64,7 +86,7 @@ suspend fun WorkflowRequest.submit( return client.submitSuspend(this, delay) } -/** Suspend version of [Client.submitAsync] */ +/** Submit a workflow, optionally providing an execution delay to wait for. */ suspend fun Client.submitSuspend( request: WorkflowRequest, delay: Duration? = null @@ -72,6 +94,13 @@ suspend fun Client.submitSuspend( return this.submitAsync(request, delay?.toJavaDuration()).await() } +/** + * Complete with success the Awakeable. + * + * @param typeTag the type tag for serialization + * @param payload the payload + * @param options request options + */ suspend fun Client.AwakeableHandle.resolveSuspend( typeTag: TypeTag, payload: T, @@ -80,6 +109,12 @@ suspend fun Client.AwakeableHandle.resolveSuspend( return this.resolveAsync(typeTag, payload, options).await() } +/** + * Complete with success the Awakeable. + * + * @param payload the payload + * @param options request options + */ suspend inline fun Client.AwakeableHandle.resolveSuspend( payload: T, options: RequestOptions = RequestOptions.DEFAULT @@ -87,6 +122,12 @@ suspend inline fun Client.AwakeableHandle.resolveSuspend( return this.resolveSuspend(typeTag(), payload, options) } +/** + * Complete with failure the Awakeable. + * + * @param reason the rejection reason + * @param options request options + */ suspend fun Client.AwakeableHandle.rejectSuspend( reason: String, options: RequestOptions = RequestOptions.DEFAULT @@ -94,47 +135,130 @@ suspend fun Client.AwakeableHandle.rejectSuspend( return this.rejectAsync(reason, options).await() } +/** + * Create a new [Client.InvocationHandle] for the provided invocation identifier. + * + * @param invocationId the invocation identifier + * @return the invocation handle + */ +inline fun Client.invocationHandle( + invocationId: String +): Client.InvocationHandle { + return this.invocationHandle(invocationId, typeTag()) +} + +/** + * Suspend version of [Client.InvocationHandle.attach]. + * + * @param options request options + * @return the response + */ suspend fun Client.InvocationHandle.attachSuspend( options: RequestOptions = RequestOptions.DEFAULT ): Response { return this.attachAsync(options).await() } +/** + * Suspend version of [Client.InvocationHandle.getOutput]. + * + * @param options request options + * @return the output response + */ suspend fun Client.InvocationHandle.getOutputSuspend( options: RequestOptions = RequestOptions.DEFAULT ): Response> { return this.getOutputAsync(options).await() } +/** + * Create a new [Client.IdempotentInvocationHandle] for the provided target and idempotency key. + * + * @param target the target service/method + * @param idempotencyKey the idempotency key + * @return the idempotent invocation handle + */ +inline fun Client.idempotentInvocationHandle( + target: Target, + idempotencyKey: String +): Client.IdempotentInvocationHandle { + return this.idempotentInvocationHandle(target, idempotencyKey, typeTag()) +} + +/** + * Suspend version of [Client.IdempotentInvocationHandle.attach]. + * + * @param options request options + * @return the response + */ suspend fun Client.IdempotentInvocationHandle.attachSuspend( options: RequestOptions = RequestOptions.DEFAULT ): Response { return this.attachAsync(options).await() } +/** + * Suspend version of [Client.IdempotentInvocationHandle.getOutput]. + * + * @param options request options + * @return the output response + */ suspend fun Client.IdempotentInvocationHandle.getOutputSuspend( options: RequestOptions = RequestOptions.DEFAULT ): Response> { return this.getOutputAsync(options).await() } +/** + * Create a new [Client.WorkflowHandle] for the provided workflow name and identifier. + * + * @param workflowName the workflow name + * @param workflowId the workflow identifier + * @return the workflow handle + */ +inline fun Client.workflowHandle( + workflowName: String, + workflowId: String +): Client.WorkflowHandle { + return this.workflowHandle(workflowName, workflowId, typeTag()) +} + +/** + * Suspend version of [Client.WorkflowHandle.attach]. + * + * @param options request options + * @return the response + */ suspend fun Client.WorkflowHandle.attachSuspend( options: RequestOptions = RequestOptions.DEFAULT ): Response { return this.attachAsync(options).await() } +/** + * Suspend version of [Client.WorkflowHandle.getOutput]. + * + * @param options request options + * @return the output response + */ suspend fun Client.WorkflowHandle.getOutputSuspend( options: RequestOptions = RequestOptions.DEFAULT ): Response> { return this.getOutputAsync(options).await() } +/** @see ResponseHead.statusCode */ val ResponseHead.status: Int get() = this.statusCode() + +/** @see ResponseHead.headers */ val ResponseHead.headers: ResponseHead.Headers get() = this.headers() + +/** @see Response.response */ val Response.response: Res get() = this.response() + +/** @see SendResponse.sendStatus */ val SendResponse.sendStatus: SendResponse.SendStatus get() = this.sendStatus() diff --git a/client/src/main/java/dev/restate/client/Client.java b/client/src/main/java/dev/restate/client/Client.java index e93759df..16543ab4 100644 --- a/client/src/main/java/dev/restate/client/Client.java +++ b/client/src/main/java/dev/restate/client/Client.java @@ -22,8 +22,14 @@ public interface Client { + /** + * Future version of {@link #call(Request)} + * + * @see #call(Request) + */ CompletableFuture> callAsync(Request request); + /** Call a service and wait for the response. */ default Response call(Request request) throws IngressException { try { return callAsync(request).join(); @@ -35,17 +41,32 @@ default Response call(Request request) throws IngressE } } + /** + * Future version of {@link #send(Request)} + * + * @see #send(Request) + */ default CompletableFuture> sendAsync(Request request) { return sendAsync(request, null); } + /** Send a request to a service without waiting for the response. */ default SendResponse send(Request request) throws IngressException { return send(request, null); } + /** + * Future version of {@link #send(Request, Duration)} + * + * @see #send(Request, Duration) + */ CompletableFuture> sendAsync( Request request, @Nullable Duration delay); + /** + * Send a request to a service without waiting for the response, optionally providing an execution + * delay to wait for. + */ default SendResponse send(Request request, @Nullable Duration delay) throws IngressException { try { @@ -58,21 +79,33 @@ default SendResponse send(Request request, @Nullable D } } + /** + * Future version of {@link #submit(WorkflowRequest)} + * + * @see #submit(WorkflowRequest) + */ default CompletableFuture> submitAsync( WorkflowRequest request) { return submitAsync(request, null); } + /** Submit a workflow. */ default SendResponse submit(WorkflowRequest request) throws IngressException { return submit(request, null); } + /** + * Future version of {@link #submit(WorkflowRequest, Duration)} + * + * @see #submit(WorkflowRequest, Duration) + */ default CompletableFuture> submitAsync( WorkflowRequest request, @Nullable Duration delay) { return sendAsync(request, delay); } + /** Submit a workflow, optionally providing an execution delay to wait for. */ default SendResponse submit( WorkflowRequest request, @Nullable Duration delay) throws IngressException { try { @@ -186,22 +219,54 @@ default Response reject(String reason) { } } - InvocationHandle invocationHandle(String invocationId, TypeTag resSerde); + /** + * Create a new {@link InvocationHandle} for the provided invocation identifier. + * + * @param invocationId the invocation identifier + * @param resTypeTag type tag used to deserialize the invocation result + * @return the invocation handle + */ + InvocationHandle invocationHandle(String invocationId, TypeTag resTypeTag); + /** + * Create a new {@link InvocationHandle} for the provided invocation identifier. + * + * @param invocationId the invocation identifier + * @param clazz used to deserialize the invocation result + * @return the invocation handle + */ default InvocationHandle invocationHandle(String invocationId, Class clazz) { return invocationHandle(invocationId, TypeTag.of(clazz)); } interface InvocationHandle { + /** + * @return the invocation identifier + */ String invocationId(); + /** + * Future version of {@link #attach()}, with options. + * + * @see #attach() + */ CompletableFuture> attachAsync(RequestOptions options); + /** + * Future version of {@link #attach()} + * + * @see #attach() + */ default CompletableFuture> attachAsync() { return attachAsync(RequestOptions.DEFAULT); } + /** + * Like {@link #attach()}, with request options. + * + * @see #attach() + */ default Response attach(RequestOptions options) throws IngressException { try { return attachAsync(options).join(); @@ -213,16 +278,32 @@ default Response attach(RequestOptions options) throws IngressException { } } + /** Attach to a running invocation, waiting for its output. */ default Response attach() throws IngressException { return attach(RequestOptions.DEFAULT); } + /** + * Future version of {@link #getOutput()}, with options. + * + * @see #getOutput() + */ CompletableFuture>> getOutputAsync(RequestOptions options); + /** + * Future version of {@link #getOutput()} + * + * @see #getOutput() + */ default CompletableFuture>> getOutputAsync() { return getOutputAsync(RequestOptions.DEFAULT); } + /** + * Like {@link #getOutput()}, with request options. + * + * @see #getOutput() + */ default Response> getOutput(RequestOptions options) throws IngressException { try { return getOutputAsync(options).join(); @@ -234,14 +315,31 @@ default Response> getOutput(RequestOptions options) throws IngressEx } } + /** Get the output of an invocation. If running, {@link Output#isReady()} will be false. */ default Response> getOutput() throws IngressException { return getOutput(RequestOptions.DEFAULT); } } + /** + * Create a new {@link IdempotentInvocationHandle} for the provided target and idempotency key. + * + * @param target the target service/method + * @param idempotencyKey the idempotency key + * @param resTypeTag type tag used to deserialize the invocation result + * @return the idempotent invocation handle + */ IdempotentInvocationHandle idempotentInvocationHandle( - Target target, String idempotencyKey, TypeTag resSerde); + Target target, String idempotencyKey, TypeTag resTypeTag); + /** + * Create a new {@link IdempotentInvocationHandle} for the provided target and idempotency key. + * + * @param target the target service/method + * @param idempotencyKey the idempotency key + * @param clazz used to deserialize the invocation result + * @return the idempotent invocation handle + */ default IdempotentInvocationHandle idempotentInvocationHandle( Target target, String idempotencyKey, Class clazz) { return idempotentInvocationHandle(target, idempotencyKey, TypeTag.of(clazz)); @@ -249,12 +347,27 @@ default IdempotentInvocationHandle idempotentInvocationHandle( interface IdempotentInvocationHandle { + /** + * Future version of {@link #attach()}, with options. + * + * @see #attach() + */ CompletableFuture> attachAsync(RequestOptions options); + /** + * Future version of {@link #attach()} + * + * @see #attach() + */ default CompletableFuture> attachAsync() { return attachAsync(RequestOptions.DEFAULT); } + /** + * Like {@link #attach()}, with request options. + * + * @see #attach() + */ default Response attach(RequestOptions options) throws IngressException { try { return attachAsync(options).join(); @@ -266,16 +379,32 @@ default Response attach(RequestOptions options) throws IngressException { } } + /** Attach to a running idempotent invocation, waiting for its output. */ default Response attach() throws IngressException { return attach(RequestOptions.DEFAULT); } + /** + * Future version of {@link #getOutput()}, with options. + * + * @see #getOutput() + */ CompletableFuture>> getOutputAsync(RequestOptions options); + /** + * Future version of {@link #getOutput()} + * + * @see #getOutput() + */ default CompletableFuture>> getOutputAsync() { return getOutputAsync(RequestOptions.DEFAULT); } + /** + * Like {@link #getOutput()}, with request options. + * + * @see #getOutput() + */ default Response> getOutput(RequestOptions options) throws IngressException { try { return getOutputAsync(options).join(); @@ -287,26 +416,61 @@ default Response> getOutput(RequestOptions options) throws IngressEx } } + /** + * Get the output of an idempotent invocation. If running, {@link Output#isReady()} will be + * false. + */ default Response> getOutput() throws IngressException { return getOutput(RequestOptions.DEFAULT); } } + /** + * Create a new {@link WorkflowHandle} for the provided workflow name and identifier. + * + * @param workflowName the workflow name + * @param workflowId the workflow identifier + * @param resTypeTag type tag used to deserialize the invocation result + * @return the workflow handle + */ WorkflowHandle workflowHandle( - String workflowName, String workflowId, TypeTag resSerde); + String workflowName, String workflowId, TypeTag resTypeTag); + /** + * Create a new {@link WorkflowHandle} for the provided workflow name and identifier. + * + * @param workflowName the workflow name + * @param workflowId the workflow identifier + * @param clazz used to deserialize the workflow result + * @return the workflow handle + */ default WorkflowHandle workflowHandle( String workflowName, String workflowId, Class clazz) { return workflowHandle(workflowName, workflowId, TypeTag.of(clazz)); } interface WorkflowHandle { + /** + * Future version of {@link #attach()}, with options. + * + * @see #attach() + */ CompletableFuture> attachAsync(RequestOptions options); + /** + * Future version of {@link #attach()} + * + * @see #attach() + */ default CompletableFuture> attachAsync() { return attachAsync(RequestOptions.DEFAULT); } + /** + * Like {@link #attach()}, with request options. + * + * @see #attach() + */ default Response attach(RequestOptions options) throws IngressException { try { return attachAsync(options).join(); @@ -318,16 +482,32 @@ default Response attach(RequestOptions options) throws IngressException { } } + /** Attach to a running workflow, waiting for its output. */ default Response attach() throws IngressException { return attach(RequestOptions.DEFAULT); } + /** + * Future version of {@link #getOutput()}, with options. + * + * @see #getOutput() + */ CompletableFuture>> getOutputAsync(RequestOptions options); + /** + * Future version of {@link #getOutput()} + * + * @see #getOutput() + */ default CompletableFuture>> getOutputAsync() { return getOutputAsync(RequestOptions.DEFAULT); } + /** + * Like {@link #getOutput()}, with request options. + * + * @see #getOutput() + */ default Response> getOutput(RequestOptions options) throws IngressException { try { return getOutputAsync(options).join(); @@ -339,6 +519,7 @@ default Response> getOutput(RequestOptions options) throws IngressEx } } + /** Get the output of a workflow. If running, {@link Output#isReady()} will be false. */ default Response> getOutput() throws IngressException { return getOutput(RequestOptions.DEFAULT); } diff --git a/client/src/main/java/dev/restate/client/ResponseHead.java b/client/src/main/java/dev/restate/client/ResponseHead.java index bcabe8bd..7807c887 100644 --- a/client/src/main/java/dev/restate/client/ResponseHead.java +++ b/client/src/main/java/dev/restate/client/ResponseHead.java @@ -13,8 +13,14 @@ import org.jspecify.annotations.Nullable; public interface ResponseHead { + /** + * @return the response status code returned from the HTTP client + */ int statusCode(); + /** + * @return the response headers returned from the HTTP client + */ Headers headers(); interface Headers {