Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
136 changes: 130 additions & 6 deletions client-kotlin/src/main/kotlin/dev/restate/client/kotlin/ingress.kt
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import dev.restate.client.ResponseHead
import dev.restate.client.SendResponse
import dev.restate.common.Output
import dev.restate.common.Request
import dev.restate.common.Target
import dev.restate.common.WorkflowRequest
import dev.restate.serde.TypeTag
import dev.restate.serde.kotlinx.typeTag
Expand All @@ -24,54 +25,82 @@ import kotlinx.coroutines.future.await

// Extension methods for the Client

/** Request options builder function */
fun requestOptions(init: RequestOptions.Builder.() -> Unit): RequestOptions {
val builder = RequestOptions.builder()
builder.init()
return builder.build()
}

/** Shorthand for [callSuspend] */
/**
* Shorthand for [callSuspend]
*
* @param client the client to use for the call
* @return the response
*/
suspend fun <Req, Res> Request<Req, Res>.call(client: Client): Response<Res> {
return client.callSuspend(this)
}

/** Suspend version of [Client.callAsync] */
/** Call a service and wait for the response. */
suspend fun <Req, Res> Client.callSuspend(request: Request<Req, Res>): Response<Res> {
return this.callAsync(request).await()
}

/** Shorthand for [sendSuspend] */
/**
* Shorthand for [sendSuspend]
*
* @param client the client to use for sending
* @param delay optional execution delay
* @return the send response
*/
suspend fun <Req, Res> Request<Req, Res>.send(
client: Client,
delay: Duration? = null
): SendResponse<Res> {
return client.sendSuspend(this, delay)
}

/** Suspend version of [Client.sendAsync] */
/**
* Send a request to a service without waiting for the response, optionally providing an execution
* delay to wait for.
*/
suspend fun <Req, Res> Client.sendSuspend(
request: Request<Req, Res>,
delay: Duration? = null
): SendResponse<Res> {
return this.sendAsync(request, delay?.toJavaDuration()).await()
}

/** Shorthand for [submitSuspend] */
/**
* Shorthand for [submitSuspend]
*
* @param client the client to use for submission
* @param delay optional execution delay
* @return the send response
*/
suspend fun <Req, Res> WorkflowRequest<Req, Res>.submit(
client: Client,
delay: Duration? = null
): SendResponse<Res> {
return client.submitSuspend(this, delay)
}

/** Suspend version of [Client.submitAsync] */
/** Submit a workflow, optionally providing an execution delay to wait for. */
suspend fun <Req, Res> Client.submitSuspend(
request: WorkflowRequest<Req, Res>,
delay: Duration? = null
): SendResponse<Res> {
return this.submitAsync(request, delay?.toJavaDuration()).await()
}

/**
* Complete with success the Awakeable.
*
* @param typeTag the type tag for serialization
* @param payload the payload
* @param options request options
*/
suspend fun <T : Any> Client.AwakeableHandle.resolveSuspend(
typeTag: TypeTag<T>,
payload: T,
Expand All @@ -80,61 +109,156 @@ suspend fun <T : Any> Client.AwakeableHandle.resolveSuspend(
return this.resolveAsync(typeTag, payload, options).await()
}

/**
* Complete with success the Awakeable.
*
* @param payload the payload
* @param options request options
*/
suspend inline fun <reified T : Any> Client.AwakeableHandle.resolveSuspend(
payload: T,
options: RequestOptions = RequestOptions.DEFAULT
): Response<Void> {
return this.resolveSuspend(typeTag<T>(), payload, options)
}

/**
* Complete with failure the Awakeable.
*
* @param reason the rejection reason
* @param options request options
*/
suspend fun Client.AwakeableHandle.rejectSuspend(
reason: String,
options: RequestOptions = RequestOptions.DEFAULT
): Response<Void> {
return this.rejectAsync(reason, options).await()
}

/**
* Create a new [Client.InvocationHandle] for the provided invocation identifier.
*
* @param invocationId the invocation identifier
* @return the invocation handle
*/
inline fun <reified Res> Client.invocationHandle(
invocationId: String
): Client.InvocationHandle<Res> {
return this.invocationHandle(invocationId, typeTag<Res>())
}

/**
* Suspend version of [Client.InvocationHandle.attach].
*
* @param options request options
* @return the response
*/
suspend fun <T> Client.InvocationHandle<T>.attachSuspend(
options: RequestOptions = RequestOptions.DEFAULT
): Response<T> {
return this.attachAsync(options).await()
}

/**
* Suspend version of [Client.InvocationHandle.getOutput].
*
* @param options request options
* @return the output response
*/
suspend fun <T : Any?> Client.InvocationHandle<T>.getOutputSuspend(
options: RequestOptions = RequestOptions.DEFAULT
): Response<Output<T>> {
return this.getOutputAsync(options).await()
}

/**
* Create a new [Client.IdempotentInvocationHandle] for the provided target and idempotency key.
*
* @param target the target service/method
* @param idempotencyKey the idempotency key
* @return the idempotent invocation handle
*/
inline fun <reified Res> Client.idempotentInvocationHandle(
target: Target,
idempotencyKey: String
): Client.IdempotentInvocationHandle<Res> {
return this.idempotentInvocationHandle(target, idempotencyKey, typeTag<Res>())
}

/**
* Suspend version of [Client.IdempotentInvocationHandle.attach].
*
* @param options request options
* @return the response
*/
suspend fun <T> Client.IdempotentInvocationHandle<T>.attachSuspend(
options: RequestOptions = RequestOptions.DEFAULT
): Response<T> {
return this.attachAsync(options).await()
}

/**
* Suspend version of [Client.IdempotentInvocationHandle.getOutput].
*
* @param options request options
* @return the output response
*/
suspend fun <T> Client.IdempotentInvocationHandle<T>.getOutputSuspend(
options: RequestOptions = RequestOptions.DEFAULT
): Response<Output<T>> {
return this.getOutputAsync(options).await()
}

/**
* Create a new [Client.WorkflowHandle] for the provided workflow name and identifier.
*
* @param workflowName the workflow name
* @param workflowId the workflow identifier
* @return the workflow handle
*/
inline fun <reified Res> Client.workflowHandle(
workflowName: String,
workflowId: String
): Client.WorkflowHandle<Res> {
return this.workflowHandle(workflowName, workflowId, typeTag<Res>())
}

/**
* Suspend version of [Client.WorkflowHandle.attach].
*
* @param options request options
* @return the response
*/
suspend fun <T> Client.WorkflowHandle<T>.attachSuspend(
options: RequestOptions = RequestOptions.DEFAULT
): Response<T> {
return this.attachAsync(options).await()
}

/**
* Suspend version of [Client.WorkflowHandle.getOutput].
*
* @param options request options
* @return the output response
*/
suspend fun <T> Client.WorkflowHandle<T>.getOutputSuspend(
options: RequestOptions = RequestOptions.DEFAULT
): Response<Output<T>> {
return this.getOutputAsync(options).await()
}

/** @see ResponseHead.statusCode */
val ResponseHead.status: Int
get() = this.statusCode()

/** @see ResponseHead.headers */
val ResponseHead.headers: ResponseHead.Headers
get() = this.headers()

/** @see Response.response */
val <Res> Response<Res>.response: Res
get() = this.response()

/** @see SendResponse.sendStatus */
val <Res> SendResponse<Res>.sendStatus: SendResponse.SendStatus
get() = this.sendStatus()
Loading