Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 0 additions & 27 deletions apps/opik-backend/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -968,33 +968,6 @@ localRunner:
# Default: 1000
# Description: Maximum number of log entries a runner can send in a single batch
maxLogEntriesPerBatch: ${OPIK_RUNNER_MAX_LOG_ENTRIES_PER_BATCH:-1000}
# Default: 50
# Description: Maximum number of pending bridge commands allowed per runner
bridgeMaxPendingPerRunner: ${OPIK_RUNNER_BRIDGE_MAX_PENDING:-50}
# Default: 600
# Description: Maximum bridge commands per minute per runner
bridgeMaxCommandsPerMinute: ${OPIK_RUNNER_BRIDGE_MAX_COMMANDS_PER_MIN:-600}
# Default: 120
# Description: Maximum bridge write commands per minute per runner
bridgeMaxWriteCommandsPerMinute: ${OPIK_RUNNER_BRIDGE_MAX_WRITE_COMMANDS_PER_MIN:-120}
# Default: 30s
# Description: How long the bridge nextCommands long-poll blocks before returning empty
bridgePollTimeout: ${OPIK_RUNNER_BRIDGE_POLL_TIMEOUT:-30s}
# Default: 30s
# Description: Default timeout for bridge commands when not specified by caller
bridgeDefaultCommandTimeout: ${OPIK_RUNNER_BRIDGE_DEFAULT_CMD_TIMEOUT:-30s}
# Default: 120s
# Description: Maximum allowed timeout for bridge commands
bridgeMaxCommandTimeout: ${OPIK_RUNNER_BRIDGE_MAX_CMD_TIMEOUT:-120s}
# Default: 1h
# Description: How long completed bridge command metadata is retained in Redis
bridgeCompletedCommandTtl: ${OPIK_RUNNER_BRIDGE_COMPLETED_CMD_TTL:-1h}
# Default: 5s
# Description: Extra buffer added to async response timeout beyond bridge poll/command timeout
bridgeAsyncTimeoutBuffer: ${OPIK_RUNNER_BRIDGE_ASYNC_TIMEOUT_BUFFER:-5s}
# Default: 1048576 (1 MiB)
# Description: Maximum payload size in bytes for bridge command args and results
bridgeMaxPayloadBytes: ${OPIK_RUNNER_BRIDGE_MAX_PAYLOAD_BYTES:-1048576}

# Trace Thread configuration
traceThreadConfig:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,10 @@

import com.codahale.metrics.annotation.Timed;
import com.comet.opik.api.error.ErrorMessage;
import com.comet.opik.api.runner.BridgeCommand;
import com.comet.opik.api.runner.BridgeCommandBatchResponse;
import com.comet.opik.api.runner.BridgeCommandNextRequest;
import com.comet.opik.api.runner.BridgeCommandResultRequest;
import com.comet.opik.api.runner.BridgeCommandSubmitRequest;
import com.comet.opik.api.runner.BridgeCommandSubmitResponse;
import com.comet.opik.api.runner.CreateLocalRunnerJobRequest;
import com.comet.opik.api.runner.LocalRunner;
import com.comet.opik.api.runner.LocalRunnerConnectRequest;
import com.comet.opik.api.runner.LocalRunnerConnectResponse;
import com.comet.opik.api.runner.LocalRunnerHeartbeatRequest;
import com.comet.opik.api.runner.LocalRunnerHeartbeatResponse;
import com.comet.opik.api.runner.LocalRunnerJob;
import com.comet.opik.api.runner.LocalRunnerJobResultRequest;
Expand All @@ -24,7 +17,6 @@
import com.comet.opik.infrastructure.LocalRunnerConfig;
import com.comet.opik.infrastructure.auth.RequestContext;
import com.comet.opik.infrastructure.ratelimit.RateLimited;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.NullNode;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.headers.Header;
Expand All @@ -42,7 +34,6 @@
import jakarta.ws.rs.Consumes;
import jakarta.ws.rs.DefaultValue;
import jakarta.ws.rs.GET;
import jakarta.ws.rs.PATCH;
import jakarta.ws.rs.POST;
import jakarta.ws.rs.PUT;
import jakarta.ws.rs.Path;
Expand Down Expand Up @@ -164,35 +155,18 @@ public Response registerAgents(@PathParam("runnerId") UUID runnerId,
return Response.noContent().build();
}

/**
 * Handles {@code PATCH} requests on the {@code /{runnerId}/checklist} sub-path.
 *
 * <p>Verifies the runner feature is enabled, resolves the workspace id and user name from the
 * request context, and forwards the JSON body to {@code runnerService.patchChecklist}.
 *
 * @param runnerId id of the runner whose checklist is being patched
 * @param updates  non-null JSON body carrying the partial update
 * @return an empty {@code 204 No Content} response
 * @throws WebApplicationException raised by {@code ensureEnabled()} when the runner
 *                                 configuration is not enabled
 */
@PATCH
@Path("/{runnerId}/checklist")
@RateLimited
@Operation(
        operationId = "patchChecklist",
        summary = "Patch runner checklist",
        description = "Partial update of the runner's checklist (deep merge)",
        responses = {
                @ApiResponse(responseCode = "204", description = "No content"),
                @ApiResponse(responseCode = "404", description = "Not found", content = @Content(schema = @Schema(implementation = ErrorMessage.class)))})
public Response patchChecklist(
        @PathParam("runnerId") UUID runnerId,
        @RequestBody(content = @Content(schema = @Schema(implementation = Object.class))) @NotNull JsonNode updates) {
    ensureEnabled();

    final String workspace = requestContext.get().getWorkspaceId();
    final String caller = requestContext.get().getUserName();

    runnerService.patchChecklist(runnerId, workspace, caller, updates);

    return Response.noContent().build();
}

/**
 * Handles {@code POST} requests on the {@code /{runnerId}/heartbeats} sub-path.
 *
 * <p>Verifies the runner feature is enabled, resolves the workspace id and user name from the
 * request context, delegates to {@code runnerService.heartbeat}, and returns the resulting
 * {@link LocalRunnerHeartbeatResponse} with status 200.
 *
 * <p>NOTE(review): this span shows two method signatures and two {@code runnerService.heartbeat}
 * calls (one taking a request body / capabilities list, one without). Presumably these are the
 * removed and added lines of a diff rendered without markers; confirm which variant is current
 * before relying on either.
 */
@POST
@Path("/{runnerId}/heartbeats")
@RateLimited
@Operation(operationId = "heartbeat", summary = "Local runner heartbeat", description = "Refresh local runner heartbeat", responses = {
@ApiResponse(responseCode = "200", description = "Heartbeat response", content = @Content(schema = @Schema(implementation = LocalRunnerHeartbeatResponse.class))),
@ApiResponse(responseCode = "404", description = "Not found", content = @Content(schema = @Schema(implementation = ErrorMessage.class))),
@ApiResponse(responseCode = "410", description = "Gone", content = @Content(schema = @Schema(implementation = ErrorMessage.class)))})
public Response heartbeat(@PathParam("runnerId") UUID runnerId,
@RequestBody(content = @Content(schema = @Schema(implementation = LocalRunnerHeartbeatRequest.class))) LocalRunnerHeartbeatRequest body) {
public Response heartbeat(@PathParam("runnerId") UUID runnerId) {
ensureEnabled();
String workspaceId = requestContext.get().getWorkspaceId();
String userName = requestContext.get().getUserName();
// A missing request body is tolerated: capabilities is then passed as null.
List<String> capabilities = body != null ? body.capabilities() : null;
LocalRunnerHeartbeatResponse response = runnerService.heartbeat(runnerId, workspaceId, userName, capabilities);
LocalRunnerHeartbeatResponse response = runnerService.heartbeat(runnerId, workspaceId, userName);
return Response.ok(response).build();
}

Expand Down Expand Up @@ -333,135 +307,6 @@ public Response cancelJob(@PathParam("jobId") UUID jobId) {
return Response.noContent().build();
}

/**
 * Handles {@code POST} requests on the {@code /{runnerId}/bridge/commands} sub-path.
 *
 * <p>Verifies the runner feature is enabled, submits the command through
 * {@code runnerService.submitBridgeCommand}, and answers {@code 201 Created} with a
 * {@code Location} URI built from the request's base URI plus the runner and command ids. The
 * response entity carries the new command id.
 *
 * @param runnerId id of the runner the command is addressed to
 * @param request  non-null, validated submit request
 * @param uriInfo  request URI information used to build the {@code Location} header
 * @return a {@code 201 Created} response whose entity is a {@link BridgeCommandSubmitResponse}
 * @throws WebApplicationException raised by {@code ensureEnabled()} when the runner
 *                                 configuration is not enabled
 */
@POST
@Path("/{runnerId}/bridge/commands")
@RateLimited
@Operation(
        operationId = "submitBridgeCommand",
        summary = "Submit bridge command",
        description = "Submit a bridge command for execution by the local daemon",
        responses = {
                @ApiResponse(responseCode = "201", description = "Command submitted", headers = @Header(name = "Location", description = "URI of the command"), content = @Content(schema = @Schema(implementation = BridgeCommandSubmitResponse.class))),
                @ApiResponse(responseCode = "404", description = "Runner not found or not connected", content = @Content(schema = @Schema(implementation = ErrorMessage.class))),
                @ApiResponse(responseCode = "409", description = "Runner does not support bridge", content = @Content(schema = @Schema(implementation = ErrorMessage.class))),
                @ApiResponse(responseCode = "429", description = "Too many requests", content = @Content(schema = @Schema(implementation = ErrorMessage.class)))})
public Response submitBridgeCommand(
        @PathParam("runnerId") UUID runnerId,
        @RequestBody(content = @Content(schema = @Schema(implementation = BridgeCommandSubmitRequest.class))) @NotNull @Valid BridgeCommandSubmitRequest request,
        @Context UriInfo uriInfo) {
    ensureEnabled();

    final String workspace = requestContext.get().getWorkspaceId();
    final String caller = requestContext.get().getUserName();
    final UUID newCommandId = runnerService.submitBridgeCommand(runnerId, workspace, caller, request);

    // Location header: absolute URI of the freshly created command resource.
    var locationBuilder = uriInfo.getBaseUriBuilder()
            .path("v1/private/local-runners/{runnerId}/bridge/commands/{commandId}");
    var location = locationBuilder.build(runnerId, newCommandId);

    var created = Response.created(location);
    var payload = BridgeCommandSubmitResponse.builder()
            .commandId(newCommandId)
            .build();
    return created.entity(payload).build();
}

/**
 * Handles {@code POST} requests on the {@code /{runnerId}/bridge/commands/next} sub-path as an
 * asynchronous long-poll.
 *
 * <p>The async response is given a timeout equal to the configured bridge poll timeout plus the
 * configured async buffer (both in whole seconds). If that timeout fires, the response is resumed
 * with a 200 carrying an empty command list. Otherwise the result of
 * {@code runnerService.nextBridgeCommands} is sent as a 200, a {@link WebApplicationException}
 * from the service is passed through unchanged, and any other error is logged and answered with
 * a server error.
 *
 * @param runnerId      id of the polling runner
 * @param request       optional request body; when absent, a batch size of 10 is used
 * @param asyncResponse suspended response that is resumed with the outcome
 * @throws WebApplicationException raised by {@code ensureEnabled()} when the runner
 *                                 configuration is not enabled
 */
@POST
@Path("/{runnerId}/bridge/commands/next")
@Operation(
        operationId = "nextBridgeCommands",
        summary = "Poll next bridge commands",
        description = "Long-poll for pending bridge commands (batch)",
        responses = {
                @ApiResponse(responseCode = "200", description = "Commands batch", content = @Content(schema = @Schema(implementation = BridgeCommandBatchResponse.class))),
                @ApiResponse(responseCode = "404", description = "Not found", content = @Content(schema = @Schema(implementation = ErrorMessage.class)))})
public void nextBridgeCommands(
        @PathParam("runnerId") UUID runnerId,
        @Valid BridgeCommandNextRequest request,
        @Suspended AsyncResponse asyncResponse) {
    ensureEnabled();

    int batchSize = 10;
    if (request != null) {
        batchSize = request.effectiveMaxCommands();
    }
    final int maxCommands = batchSize;

    // Total wait = poll timeout + safety buffer, so the service normally answers first.
    final long asyncTimeoutSeconds = runnerConfig.getBridgePollTimeout().toSeconds()
            + runnerConfig.getBridgeAsyncTimeoutBuffer().toSeconds();
    asyncResponse.setTimeout(asyncTimeoutSeconds, TimeUnit.SECONDS);
    asyncResponse.setTimeoutHandler(timedOut -> {
        var emptyBatch = BridgeCommandBatchResponse.builder()
                .commands(List.of())
                .build();
        timedOut.resume(Response.ok(emptyBatch).build());
    });

    final String workspace = requestContext.get().getWorkspaceId();
    final String caller = requestContext.get().getUserName();

    var pendingBatch = runnerService.nextBridgeCommands(runnerId, workspace, caller, maxCommands)
            .map(batch -> Response.ok(batch).build());

    pendingBatch.subscribe(
            asyncResponse::resume,
            failure -> {
                if (!(failure instanceof WebApplicationException webError)) {
                    log.error("Error polling bridge commands for runner='{}' workspace='{}'", runnerId,
                            workspace, failure);
                    asyncResponse.resume(Response.serverError().build());
                    return;
                }
                asyncResponse.resume(webError);
            });
}

/**
 * Handles {@code POST} requests on the
 * {@code /{runnerId}/bridge/commands/{commandId}/results} sub-path.
 *
 * <p>Verifies the runner feature is enabled, resolves the workspace id and user name from the
 * request context, and hands the result to {@code runnerService.reportBridgeCommandResult}.
 *
 * @param runnerId  id of the runner reporting the result
 * @param commandId id of the command the result belongs to
 * @param request   non-null, validated result payload
 * @return an empty {@code 200 OK} response
 * @throws WebApplicationException raised by {@code ensureEnabled()} when the runner
 *                                 configuration is not enabled
 */
@POST
@Path("/{runnerId}/bridge/commands/{commandId}/results")
@Operation(
        operationId = "reportBridgeResult",
        summary = "Report bridge command result",
        description = "Report bridge command completion or failure",
        responses = {
                @ApiResponse(responseCode = "200", description = "Result accepted"),
                @ApiResponse(responseCode = "404", description = "Command not found", content = @Content(schema = @Schema(implementation = ErrorMessage.class))),
                @ApiResponse(responseCode = "409", description = "Already completed", content = @Content(schema = @Schema(implementation = ErrorMessage.class)))})
public Response reportBridgeResult(
        @PathParam("runnerId") UUID runnerId,
        @PathParam("commandId") UUID commandId,
        @RequestBody(content = @Content(schema = @Schema(implementation = BridgeCommandResultRequest.class))) @NotNull @Valid BridgeCommandResultRequest request) {
    ensureEnabled();

    final String workspace = requestContext.get().getWorkspaceId();
    final String caller = requestContext.get().getUserName();

    runnerService.reportBridgeCommandResult(runnerId, workspace, caller, commandId, request);

    return Response.ok().build();
}

/**
 * Handles {@code GET} requests on the {@code /{runnerId}/bridge/commands/{commandId}} sub-path.
 *
 * <p>With {@code wait=false} the current command state is fetched and returned immediately.
 * With {@code wait=true} the call long-polls: the requested timeout is raised to at least 1
 * second and capped at the configured maximum command timeout, and the async response times out
 * after that value plus the configured async buffer. On timeout the current command state is
 * fetched once more and returned (or the exception from that fetch is propagated). Otherwise the
 * result of {@code runnerService.awaitBridgeCommand} is sent as a 200, a
 * {@link WebApplicationException} is passed through unchanged, and any other error is logged and
 * answered with a server error.
 *
 * @param runnerId      id of the runner owning the command
 * @param commandId     id of the command to look up
 * @param wait          whether to long-poll for completion
 * @param timeout       requested wait in seconds; only used when {@code wait} is true
 * @param asyncResponse suspended response that is resumed with the outcome
 * @throws WebApplicationException raised by {@code ensureEnabled()} when the runner
 *                                 configuration is not enabled
 */
@GET
@Path("/{runnerId}/bridge/commands/{commandId}")
@Operation(
        operationId = "getBridgeCommand",
        summary = "Get bridge command",
        description = "Get bridge command status, optionally long-polling for completion",
        responses = {
                @ApiResponse(responseCode = "200", description = "Command state", content = @Content(schema = @Schema(implementation = BridgeCommand.class))),
                @ApiResponse(responseCode = "404", description = "Command not found", content = @Content(schema = @Schema(implementation = ErrorMessage.class)))})
public void getBridgeCommand(
        @PathParam("runnerId") UUID runnerId,
        @PathParam("commandId") UUID commandId,
        @QueryParam("wait") @DefaultValue("false") boolean wait,
        @QueryParam("timeout") @DefaultValue("30") int timeout,
        @Suspended AsyncResponse asyncResponse) {
    ensureEnabled();

    final String workspace = requestContext.get().getWorkspaceId();
    final String caller = requestContext.get().getUserName();

    // Fast path: no waiting requested, answer with the current state right away.
    if (!wait) {
        var snapshot = runnerService.getBridgeCommand(runnerId, workspace, caller, commandId);
        asyncResponse.resume(Response.ok(snapshot).build());
        return;
    }

    // Clamp the caller-supplied timeout into [1, configured maximum].
    final int upperBound = (int) runnerConfig.getBridgeMaxCommandTimeout().toSeconds();
    final int atLeastOne = Math.max(timeout, 1);
    final int waitSeconds = Math.min(atLeastOne, upperBound);

    final long graceSeconds = runnerConfig.getBridgeAsyncTimeoutBuffer().toSeconds();
    asyncResponse.setTimeout(waitSeconds + graceSeconds, TimeUnit.SECONDS);
    asyncResponse.setTimeoutHandler(timedOut -> {
        // On timeout, report whatever state the command is in now.
        try {
            timedOut.resume(
                    Response.ok(runnerService.getBridgeCommand(runnerId, workspace, caller, commandId))
                            .build());
        } catch (Exception lookupFailure) {
            timedOut.resume(lookupFailure);
        }
    });

    try {
        var awaited = runnerService.awaitBridgeCommand(runnerId, workspace, caller, commandId, waitSeconds)
                .map(cmd -> Response.ok(cmd).build());

        awaited.subscribe(
                asyncResponse::resume,
                failure -> {
                    if (!(failure instanceof WebApplicationException webError)) {
                        log.error("Error awaiting bridge command='{}' runner='{}' workspace='{}'",
                                commandId,
                                runnerId, workspace, failure);
                        asyncResponse.resume(Response.serverError().build());
                        return;
                    }
                    asyncResponse.resume(webError);
                });
    } catch (WebApplicationException webError) {
        asyncResponse.resume(webError);
    } catch (Exception setupFailure) {
        log.error("Error setting up bridge command await='{}' runner='{}' workspace='{}'", commandId,
                runnerId, workspace, setupFailure);
        asyncResponse.resume(Response.serverError().build());
    }
}

private void ensureEnabled() {
if (!runnerConfig.isEnabled()) {
throw new WebApplicationException(Response.Status.NOT_IMPLEMENTED);
Expand Down

This file was deleted.

This file was deleted.

This file was deleted.

This file was deleted.

This file was deleted.

This file was deleted.

Loading
Loading