-
Notifications
You must be signed in to change notification settings - Fork 1.5k
[NA] [BE] [SDK] Opik Connect #6074
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
331ba50
8a59523
31ab58f
20b9b64
d23ba76
bcc1b52
f2f1460
8a86592
4a10f06
f38223c
50d7cb5
6f8bbcd
c687794
0fe977c
1d8de2a
6cedb9d
b2934af
4ff483d
5d237f4
3f79132
8c7f1f3
b9ceacd
c6e0bb1
0b31672
9d8e6dc
cc67a16
c6c50bc
629516f
8d7ad98
6defadb
70171a8
c6ca409
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -2,10 +2,17 @@ | |
|
|
||
| import com.codahale.metrics.annotation.Timed; | ||
| import com.comet.opik.api.error.ErrorMessage; | ||
| import com.comet.opik.api.runner.BridgeCommand; | ||
| import com.comet.opik.api.runner.BridgeCommandBatchResponse; | ||
| import com.comet.opik.api.runner.BridgeCommandNextRequest; | ||
| import com.comet.opik.api.runner.BridgeCommandResultRequest; | ||
| import com.comet.opik.api.runner.BridgeCommandSubmitRequest; | ||
| import com.comet.opik.api.runner.BridgeCommandSubmitResponse; | ||
| import com.comet.opik.api.runner.CreateLocalRunnerJobRequest; | ||
| import com.comet.opik.api.runner.LocalRunner; | ||
| import com.comet.opik.api.runner.LocalRunnerConnectRequest; | ||
| import com.comet.opik.api.runner.LocalRunnerConnectResponse; | ||
| import com.comet.opik.api.runner.LocalRunnerHeartbeatRequest; | ||
| import com.comet.opik.api.runner.LocalRunnerHeartbeatResponse; | ||
| import com.comet.opik.api.runner.LocalRunnerJob; | ||
| import com.comet.opik.api.runner.LocalRunnerJobResultRequest; | ||
|
|
@@ -17,6 +24,7 @@ | |
| import com.comet.opik.infrastructure.LocalRunnerConfig; | ||
| import com.comet.opik.infrastructure.auth.RequestContext; | ||
| import com.comet.opik.infrastructure.ratelimit.RateLimited; | ||
| import com.fasterxml.jackson.databind.JsonNode; | ||
| import com.fasterxml.jackson.databind.node.NullNode; | ||
| import io.swagger.v3.oas.annotations.Operation; | ||
| import io.swagger.v3.oas.annotations.headers.Header; | ||
|
|
@@ -34,6 +42,7 @@ | |
| import jakarta.ws.rs.Consumes; | ||
| import jakarta.ws.rs.DefaultValue; | ||
| import jakarta.ws.rs.GET; | ||
| import jakarta.ws.rs.PATCH; | ||
| import jakarta.ws.rs.POST; | ||
| import jakarta.ws.rs.PUT; | ||
| import jakarta.ws.rs.Path; | ||
|
|
@@ -155,18 +164,35 @@ public Response registerAgents(@PathParam("runnerId") UUID runnerId, | |
| return Response.noContent().build(); | ||
| } | ||
|
|
||
| @PATCH | ||
| @Path("/{runnerId}/checklist") | ||
| @RateLimited | ||
| @Operation(operationId = "patchChecklist", summary = "Patch runner checklist", description = "Partial update of the runner's checklist (deep merge)", responses = { | ||
| @ApiResponse(responseCode = "204", description = "No content"), | ||
| @ApiResponse(responseCode = "404", description = "Not found", content = @Content(schema = @Schema(implementation = ErrorMessage.class)))}) | ||
| public Response patchChecklist(@PathParam("runnerId") UUID runnerId, | ||
| @RequestBody(content = @Content(schema = @Schema(implementation = Object.class))) @NotNull JsonNode updates) { | ||
| ensureEnabled(); | ||
| String workspaceId = requestContext.get().getWorkspaceId(); | ||
| String userName = requestContext.get().getUserName(); | ||
| runnerService.patchChecklist(runnerId, workspaceId, userName, updates); | ||
| return Response.noContent().build(); | ||
| } | ||
|
|
||
| @POST | ||
| @Path("/{runnerId}/heartbeats") | ||
| @RateLimited | ||
| @Operation(operationId = "heartbeat", summary = "Local runner heartbeat", description = "Refresh local runner heartbeat", responses = { | ||
| @ApiResponse(responseCode = "200", description = "Heartbeat response", content = @Content(schema = @Schema(implementation = LocalRunnerHeartbeatResponse.class))), | ||
| @ApiResponse(responseCode = "404", description = "Not found", content = @Content(schema = @Schema(implementation = ErrorMessage.class))), | ||
| @ApiResponse(responseCode = "410", description = "Gone", content = @Content(schema = @Schema(implementation = ErrorMessage.class)))}) | ||
| public Response heartbeat(@PathParam("runnerId") UUID runnerId) { | ||
| public Response heartbeat(@PathParam("runnerId") UUID runnerId, | ||
| @RequestBody(content = @Content(schema = @Schema(implementation = LocalRunnerHeartbeatRequest.class))) LocalRunnerHeartbeatRequest body) { | ||
| ensureEnabled(); | ||
| String workspaceId = requestContext.get().getWorkspaceId(); | ||
| String userName = requestContext.get().getUserName(); | ||
| LocalRunnerHeartbeatResponse response = runnerService.heartbeat(runnerId, workspaceId, userName); | ||
| List<String> capabilities = body != null ? body.capabilities() : null; | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Not needed here, you are still checking for null in the service layer. |
||
| LocalRunnerHeartbeatResponse response = runnerService.heartbeat(runnerId, workspaceId, userName, capabilities); | ||
| return Response.ok(response).build(); | ||
| } | ||
|
|
||
|
|
@@ -307,6 +333,135 @@ public Response cancelJob(@PathParam("jobId") UUID jobId) { | |
| return Response.noContent().build(); | ||
| } | ||
|
|
||
| @POST | ||
| @Path("/{runnerId}/bridge/commands") | ||
| @RateLimited | ||
| @Operation(operationId = "submitBridgeCommand", summary = "Submit bridge command", description = "Submit a bridge command for execution by the local daemon", responses = { | ||
| @ApiResponse(responseCode = "201", description = "Command submitted", headers = @Header(name = "Location", description = "URI of the command"), content = @Content(schema = @Schema(implementation = BridgeCommandSubmitResponse.class))), | ||
| @ApiResponse(responseCode = "404", description = "Runner not found or not connected", content = @Content(schema = @Schema(implementation = ErrorMessage.class))), | ||
| @ApiResponse(responseCode = "409", description = "Runner does not support bridge", content = @Content(schema = @Schema(implementation = ErrorMessage.class))), | ||
| @ApiResponse(responseCode = "429", description = "Too many requests", content = @Content(schema = @Schema(implementation = ErrorMessage.class)))}) | ||
| public Response submitBridgeCommand(@PathParam("runnerId") UUID runnerId, | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. To match existing codebase would be better to rename to |
||
| @RequestBody(content = @Content(schema = @Schema(implementation = BridgeCommandSubmitRequest.class))) @NotNull @Valid BridgeCommandSubmitRequest request, | ||
| @Context UriInfo uriInfo) { | ||
| ensureEnabled(); | ||
| String workspaceId = requestContext.get().getWorkspaceId(); | ||
| String userName = requestContext.get().getUserName(); | ||
| UUID commandId = runnerService.submitBridgeCommand(runnerId, workspaceId, userName, request); | ||
| var uri = uriInfo.getBaseUriBuilder() | ||
| .path("v1/private/local-runners/{runnerId}/bridge/commands/{commandId}") | ||
| .build(runnerId, commandId); | ||
| return Response.created(uri) | ||
| .entity(BridgeCommandSubmitResponse.builder().commandId(commandId).build()) | ||
| .build(); | ||
| } | ||
|
|
||
| @POST | ||
| @Path("/{runnerId}/bridge/commands/next") | ||
| @Operation(operationId = "nextBridgeCommands", summary = "Poll next bridge commands", description = "Long-poll for pending bridge commands (batch)", responses = { | ||
| @ApiResponse(responseCode = "200", description = "Commands batch", content = @Content(schema = @Schema(implementation = BridgeCommandBatchResponse.class))), | ||
| @ApiResponse(responseCode = "404", description = "Not found", content = @Content(schema = @Schema(implementation = ErrorMessage.class)))}) | ||
| public void nextBridgeCommands(@PathParam("runnerId") UUID runnerId, | ||
| @Valid BridgeCommandNextRequest request, | ||
| @Suspended AsyncResponse asyncResponse) { | ||
| ensureEnabled(); | ||
| int maxCommands = request != null ? request.effectiveMaxCommands() : 10; | ||
| long pollTimeoutSeconds = runnerConfig.getBridgePollTimeout().toSeconds(); | ||
|
collincunn marked this conversation as resolved.
|
||
| long bufferSeconds = runnerConfig.getBridgeAsyncTimeoutBuffer().toSeconds(); | ||
| asyncResponse.setTimeout(pollTimeoutSeconds + bufferSeconds, TimeUnit.SECONDS); | ||
| asyncResponse.setTimeoutHandler( | ||
| ar -> ar.resume(Response.ok(BridgeCommandBatchResponse.builder() | ||
| .commands(List.of()).build()).build())); | ||
| String workspaceId = requestContext.get().getWorkspaceId(); | ||
| String userName = requestContext.get().getUserName(); | ||
| runnerService.nextBridgeCommands(runnerId, workspaceId, userName, maxCommands) | ||
| .map(batch -> Response.ok(batch).build()) | ||
| .subscribe( | ||
| asyncResponse::resume, | ||
| error -> { | ||
| if (error instanceof WebApplicationException wae) { | ||
| asyncResponse.resume(wae); | ||
| } else { | ||
| log.error("Error polling bridge commands for runner='{}' workspace='{}'", runnerId, | ||
| workspaceId, error); | ||
| asyncResponse.resume(Response.serverError().build()); | ||
| } | ||
| }); | ||
| } | ||
|
|
||
| @POST | ||
| @Path("/{runnerId}/bridge/commands/{commandId}/results") | ||
| @Operation(operationId = "reportBridgeResult", summary = "Report bridge command result", description = "Report bridge command completion or failure", responses = { | ||
| @ApiResponse(responseCode = "200", description = "Result accepted"), | ||
| @ApiResponse(responseCode = "404", description = "Command not found", content = @Content(schema = @Schema(implementation = ErrorMessage.class))), | ||
| @ApiResponse(responseCode = "409", description = "Already completed", content = @Content(schema = @Schema(implementation = ErrorMessage.class)))}) | ||
| public Response reportBridgeResult(@PathParam("runnerId") UUID runnerId, | ||
| @PathParam("commandId") UUID commandId, | ||
| @RequestBody(content = @Content(schema = @Schema(implementation = BridgeCommandResultRequest.class))) @NotNull @Valid BridgeCommandResultRequest request) { | ||
| ensureEnabled(); | ||
| String workspaceId = requestContext.get().getWorkspaceId(); | ||
| String userName = requestContext.get().getUserName(); | ||
| runnerService.reportBridgeCommandResult(runnerId, workspaceId, userName, commandId, request); | ||
| return Response.ok().build(); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Change return code to 204, as you return an empty body always: |
||
| } | ||
|
|
||
| @GET | ||
| @Path("/{runnerId}/bridge/commands/{commandId}") | ||
| @Operation(operationId = "getBridgeCommand", summary = "Get bridge command", description = "Get bridge command status, optionally long-polling for completion", responses = { | ||
| @ApiResponse(responseCode = "200", description = "Command state", content = @Content(schema = @Schema(implementation = BridgeCommand.class))), | ||
| @ApiResponse(responseCode = "404", description = "Command not found", content = @Content(schema = @Schema(implementation = ErrorMessage.class)))}) | ||
| public void getBridgeCommand(@PathParam("runnerId") UUID runnerId, | ||
| @PathParam("commandId") UUID commandId, | ||
| @QueryParam("wait") @DefaultValue("false") boolean wait, | ||
| @QueryParam("timeout") @DefaultValue("30") int timeout, | ||
| @Suspended AsyncResponse asyncResponse) { | ||
| ensureEnabled(); | ||
| String workspaceId = requestContext.get().getWorkspaceId(); | ||
| String userName = requestContext.get().getUserName(); | ||
|
|
||
| if (!wait) { | ||
| BridgeCommand command = runnerService.getBridgeCommand(runnerId, workspaceId, userName, commandId); | ||
| asyncResponse.resume(Response.ok(command).build()); | ||
| return; | ||
| } | ||
|
collincunn marked this conversation as resolved.
|
||
|
|
||
| int maxTimeout = (int) runnerConfig.getBridgeMaxCommandTimeout().toSeconds(); | ||
| int clampedTimeout = Math.min(Math.max(timeout, 1), maxTimeout); | ||
| long bufferSeconds = runnerConfig.getBridgeAsyncTimeoutBuffer().toSeconds(); | ||
| asyncResponse.setTimeout(clampedTimeout + bufferSeconds, TimeUnit.SECONDS); | ||
| asyncResponse.setTimeoutHandler(ar -> { | ||
| try { | ||
| BridgeCommand cmd = runnerService.getBridgeCommand(runnerId, workspaceId, userName, commandId); | ||
| ar.resume(Response.ok(cmd).build()); | ||
| } catch (Exception e) { | ||
| ar.resume(e); | ||
| } | ||
| }); | ||
|
Comment on lines
+432
to
+439
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This timeout handler, endpoint will return a comment even if it's not done yet. |
||
|
|
||
| try { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. No need in this try/catch, you already have exception handler in subscribe. |
||
| runnerService.awaitBridgeCommand(runnerId, workspaceId, userName, commandId, clampedTimeout) | ||
| .map(cmd -> Response.ok(cmd).build()) | ||
| .subscribe( | ||
| asyncResponse::resume, | ||
| error -> { | ||
| if (error instanceof WebApplicationException wae) { | ||
| asyncResponse.resume(wae); | ||
| } else { | ||
| log.error("Error awaiting bridge command='{}' runner='{}' workspace='{}'", | ||
| commandId, | ||
| runnerId, workspaceId, error); | ||
| asyncResponse.resume(Response.serverError().build()); | ||
| } | ||
| }); | ||
| } catch (WebApplicationException wae) { | ||
| asyncResponse.resume(wae); | ||
| } catch (Exception e) { | ||
| log.error("Error setting up bridge command await='{}' runner='{}' workspace='{}'", commandId, | ||
| runnerId, workspaceId, e); | ||
| asyncResponse.resume(Response.serverError().build()); | ||
| } | ||
| } | ||
|
|
||
| private void ensureEnabled() { | ||
| if (!runnerConfig.isEnabled()) { | ||
| throw new WebApplicationException(Response.Status.NOT_IMPLEMENTED); | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,28 @@ | ||
| package com.comet.opik.api.runner; | ||
|
|
||
| import com.fasterxml.jackson.annotation.JsonIgnoreProperties; | ||
| import com.fasterxml.jackson.databind.JsonNode; | ||
| import com.fasterxml.jackson.databind.PropertyNamingStrategies; | ||
| import com.fasterxml.jackson.databind.annotation.JsonNaming; | ||
| import lombok.Builder; | ||
|
|
||
| import java.time.Instant; | ||
| import java.util.UUID; | ||
|
|
||
| @Builder(toBuilder = true) | ||
| @JsonIgnoreProperties(ignoreUnknown = true) | ||
| @JsonNaming(PropertyNamingStrategies.SnakeCaseStrategy.class) | ||
| public record BridgeCommand( | ||
|
collincunn marked this conversation as resolved.
|
||
| UUID commandId, | ||
| UUID runnerId, | ||
| BridgeCommandType type, | ||
| BridgeCommandStatus status, | ||
| JsonNode args, | ||
| JsonNode result, | ||
| JsonNode error, | ||
| int timeoutSeconds, | ||
| Instant submittedAt, | ||
| Instant pickedUpAt, | ||
| Instant completedAt, | ||
| Long durationMs) { | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,29 @@ | ||
| package com.comet.opik.api.runner; | ||
|
|
||
| import com.fasterxml.jackson.annotation.JsonIgnoreProperties; | ||
| import com.fasterxml.jackson.databind.JsonNode; | ||
| import com.fasterxml.jackson.databind.PropertyNamingStrategies; | ||
| import com.fasterxml.jackson.databind.annotation.JsonNaming; | ||
| import lombok.Builder; | ||
|
|
||
| import java.time.Instant; | ||
| import java.util.List; | ||
| import java.util.UUID; | ||
|
|
||
| @Builder(toBuilder = true) | ||
| @JsonIgnoreProperties(ignoreUnknown = true) | ||
| @JsonNaming(PropertyNamingStrategies.SnakeCaseStrategy.class) | ||
| public record BridgeCommandBatchResponse( | ||
| List<BridgeCommandItem> commands) { | ||
|
|
||
| @Builder(toBuilder = true) | ||
| @JsonIgnoreProperties(ignoreUnknown = true) | ||
| @JsonNaming(PropertyNamingStrategies.SnakeCaseStrategy.class) | ||
| public record BridgeCommandItem( | ||
| UUID commandId, | ||
| BridgeCommandType type, | ||
| JsonNode args, | ||
| int timeoutSeconds, | ||
| Instant submittedAt) { | ||
| } | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,23 @@ | ||
| package com.comet.opik.api.runner; | ||
|
|
||
| import com.fasterxml.jackson.annotation.JsonIgnoreProperties; | ||
| import com.fasterxml.jackson.databind.PropertyNamingStrategies; | ||
| import com.fasterxml.jackson.databind.annotation.JsonNaming; | ||
| import lombok.Builder; | ||
|
|
||
| @Builder(toBuilder = true) | ||
| @JsonIgnoreProperties(ignoreUnknown = true) | ||
| @JsonNaming(PropertyNamingStrategies.SnakeCaseStrategy.class) | ||
| public record BridgeCommandNextRequest( | ||
| Integer maxCommands) { | ||
|
|
||
| private static final int DEFAULT_MAX_COMMANDS = 10; | ||
| private static final int MAX_MAX_COMMANDS = 20; | ||
|
|
||
| public int effectiveMaxCommands() { | ||
| if (maxCommands == null || maxCommands <= 0) { | ||
| return DEFAULT_MAX_COMMANDS; | ||
| } | ||
| return Math.min(maxCommands, MAX_MAX_COMMANDS); | ||
| } | ||
|
collincunn marked this conversation as resolved.
|
||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,18 @@ | ||
| package com.comet.opik.api.runner; | ||
|
|
||
| import com.fasterxml.jackson.annotation.JsonIgnoreProperties; | ||
| import com.fasterxml.jackson.databind.JsonNode; | ||
| import com.fasterxml.jackson.databind.PropertyNamingStrategies; | ||
| import com.fasterxml.jackson.databind.annotation.JsonNaming; | ||
| import jakarta.validation.constraints.NotNull; | ||
| import lombok.Builder; | ||
|
|
||
| @Builder(toBuilder = true) | ||
| @JsonIgnoreProperties(ignoreUnknown = true) | ||
| @JsonNaming(PropertyNamingStrategies.SnakeCaseStrategy.class) | ||
| public record BridgeCommandResultRequest( | ||
| @NotNull BridgeCommandStatus status, | ||
| JsonNode result, | ||
| JsonNode error, | ||
|
Comment on lines
+15
to
+16
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Probably we want to add here a validation that at least one of these is not null? |
||
| Long durationMs) { | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,34 @@ | ||
| package com.comet.opik.api.runner; | ||
|
|
||
| import com.fasterxml.jackson.annotation.JsonCreator; | ||
| import com.fasterxml.jackson.annotation.JsonValue; | ||
| import lombok.Getter; | ||
| import lombok.RequiredArgsConstructor; | ||
|
|
||
| import java.util.Arrays; | ||
|
|
||
| @Getter | ||
| @RequiredArgsConstructor | ||
| public enum BridgeCommandStatus { | ||
|
|
||
| PENDING("pending"), | ||
| PICKED_UP("picked_up"), | ||
| COMPLETED("completed"), | ||
| FAILED("failed"), | ||
| TIMED_OUT("timed_out"); | ||
|
|
||
| @JsonValue | ||
| private final String value; | ||
|
|
||
| @JsonCreator | ||
| public static BridgeCommandStatus fromValue(String value) { | ||
| return Arrays.stream(values()) | ||
| .filter(status -> status.value.equals(value)) | ||
| .findFirst() | ||
| .orElseThrow(() -> new IllegalArgumentException("Unknown BridgeCommandStatus: " + value)); | ||
| } | ||
|
|
||
| public boolean isTerminal() { | ||
| return this == COMPLETED || this == FAILED || this == TIMED_OUT; | ||
| } | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,17 @@ | ||
| package com.comet.opik.api.runner; | ||
|
|
||
| import com.fasterxml.jackson.annotation.JsonIgnoreProperties; | ||
| import com.fasterxml.jackson.databind.JsonNode; | ||
| import com.fasterxml.jackson.databind.PropertyNamingStrategies; | ||
| import com.fasterxml.jackson.databind.annotation.JsonNaming; | ||
| import jakarta.validation.constraints.NotNull; | ||
| import lombok.Builder; | ||
|
|
||
| @Builder(toBuilder = true) | ||
| @JsonIgnoreProperties(ignoreUnknown = true) | ||
| @JsonNaming(PropertyNamingStrategies.SnakeCaseStrategy.class) | ||
| public record BridgeCommandSubmitRequest( | ||
| @NotNull BridgeCommandType type, | ||
| @NotNull JsonNode args, | ||
| Integer timeoutSeconds) { | ||
| } |
Uh oh!
There was an error while loading. Please reload this page.