Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
331ba50
initial
collincunn Apr 4, 2026
8a59523
remove scratch pad
collincunn Apr 4, 2026
31ab58f
fixes
collincunn Apr 4, 2026
20b9b64
linting
collincunn Apr 4, 2026
d23ba76
linting
collincunn Apr 4, 2026
bcc1b52
adding tui
collincunn Apr 4, 2026
f2f1460
pr comments
collincunn Apr 4, 2026
8a86592
pr comments
collincunn Apr 4, 2026
4a10f06
pr comments
collincunn Apr 4, 2026
f38223c
pr comments
collincunn Apr 4, 2026
50d7cb5
pr comments
collincunn Apr 5, 2026
6f8bbcd
made more configurable and way increased rate limiting
collincunn Apr 5, 2026
c687794
bug
collincunn Apr 5, 2026
0fe977c
testing
collincunn Apr 5, 2026
1d8de2a
bugs
collincunn Apr 5, 2026
6cedb9d
bugs
collincunn Apr 5, 2026
b2934af
Merge branch 'main' into collinc/bridge
collincunn Apr 5, 2026
4ff483d
renames
collincunn Apr 6, 2026
5d237f4
more cleanup
collincunn Apr 6, 2026
3f79132
ci fail
collincunn Apr 6, 2026
8c7f1f3
more testing
collincunn Apr 6, 2026
b9ceacd
[NA] [FE] Revert accidental local dev config changes
collincunn Apr 6, 2026
c6e0bb1
[NA] [BE] Remove scratch planning docs from tracked files
collincunn Apr 6, 2026
0b31672
added platform to checklist
collincunn Apr 6, 2026
9d8e6dc
[NA] [FE] Revert unrelated frontend changes from branch
collincunn Apr 6, 2026
cc67a16
bugs
collincunn Apr 6, 2026
c6c50bc
background bash
collincunn Apr 6, 2026
629516f
naked opik connect works now
collincunn Apr 6, 2026
8d7ad98
ci failure
collincunn Apr 6, 2026
6defadb
[NA] [FE] Reset frontend files to match main
collincunn Apr 6, 2026
70171a8
race condition fix
collincunn Apr 6, 2026
c6ca409
Fix StringRedisClient.getList call - remove extra codec argument
Lothiraldan Apr 7, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions apps/opik-backend/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -968,6 +968,33 @@ localRunner:
# Default: 1000
# Description: Maximum number of log entries a runner can send in a single batch
maxLogEntriesPerBatch: ${OPIK_RUNNER_MAX_LOG_ENTRIES_PER_BATCH:-1000}
# Default: 50
# Description: Maximum number of pending bridge commands allowed per runner
bridgeMaxPendingPerRunner: ${OPIK_RUNNER_BRIDGE_MAX_PENDING:-50}
# Default: 600
# Description: Maximum bridge commands per minute per runner
bridgeMaxCommandsPerMinute: ${OPIK_RUNNER_BRIDGE_MAX_COMMANDS_PER_MIN:-600}
# Default: 120
# Description: Maximum bridge write commands per minute per runner
bridgeMaxWriteCommandsPerMinute: ${OPIK_RUNNER_BRIDGE_MAX_WRITE_COMMANDS_PER_MIN:-120}
# Default: 30s
# Description: How long the bridge nextCommands long-poll blocks before returning empty
bridgePollTimeout: ${OPIK_RUNNER_BRIDGE_POLL_TIMEOUT:-30s}
Comment thread
collincunn marked this conversation as resolved.
# Default: 30s
# Description: Default timeout for bridge commands when not specified by caller
bridgeDefaultCommandTimeout: ${OPIK_RUNNER_BRIDGE_DEFAULT_CMD_TIMEOUT:-30s}
# Default: 120s
# Description: Maximum allowed timeout for bridge commands
bridgeMaxCommandTimeout: ${OPIK_RUNNER_BRIDGE_MAX_CMD_TIMEOUT:-120s}
# Default: 1h
# Description: How long completed bridge command metadata is retained in Redis
bridgeCompletedCommandTtl: ${OPIK_RUNNER_BRIDGE_COMPLETED_CMD_TTL:-1h}
# Default: 5s
# Description: Extra buffer added to async response timeout beyond bridge poll/command timeout
bridgeAsyncTimeoutBuffer: ${OPIK_RUNNER_BRIDGE_ASYNC_TIMEOUT_BUFFER:-5s}
# Default: 1048576 (1MB)
# Description: Maximum payload size in bytes for bridge command args and results
bridgeMaxPayloadBytes: ${OPIK_RUNNER_BRIDGE_MAX_PAYLOAD_BYTES:-1048576}

# Trace Thread configuration
traceThreadConfig:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,17 @@

import com.codahale.metrics.annotation.Timed;
import com.comet.opik.api.error.ErrorMessage;
import com.comet.opik.api.runner.BridgeCommand;
import com.comet.opik.api.runner.BridgeCommandBatchResponse;
import com.comet.opik.api.runner.BridgeCommandNextRequest;
import com.comet.opik.api.runner.BridgeCommandResultRequest;
import com.comet.opik.api.runner.BridgeCommandSubmitRequest;
import com.comet.opik.api.runner.BridgeCommandSubmitResponse;
import com.comet.opik.api.runner.CreateLocalRunnerJobRequest;
import com.comet.opik.api.runner.LocalRunner;
import com.comet.opik.api.runner.LocalRunnerConnectRequest;
import com.comet.opik.api.runner.LocalRunnerConnectResponse;
import com.comet.opik.api.runner.LocalRunnerHeartbeatRequest;
import com.comet.opik.api.runner.LocalRunnerHeartbeatResponse;
import com.comet.opik.api.runner.LocalRunnerJob;
import com.comet.opik.api.runner.LocalRunnerJobResultRequest;
Expand All @@ -17,6 +24,7 @@
import com.comet.opik.infrastructure.LocalRunnerConfig;
import com.comet.opik.infrastructure.auth.RequestContext;
import com.comet.opik.infrastructure.ratelimit.RateLimited;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.NullNode;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.headers.Header;
Expand All @@ -34,6 +42,7 @@
import jakarta.ws.rs.Consumes;
import jakarta.ws.rs.DefaultValue;
import jakarta.ws.rs.GET;
import jakarta.ws.rs.PATCH;
import jakarta.ws.rs.POST;
import jakarta.ws.rs.PUT;
import jakarta.ws.rs.Path;
Expand Down Expand Up @@ -155,18 +164,35 @@ public Response registerAgents(@PathParam("runnerId") UUID runnerId,
return Response.noContent().build();
}

@PATCH
@Path("/{runnerId}/checklist")
@RateLimited
@Operation(operationId = "patchChecklist", summary = "Patch runner checklist", description = "Partial update of the runner's checklist (deep merge)", responses = {
@ApiResponse(responseCode = "204", description = "No content"),
@ApiResponse(responseCode = "404", description = "Not found", content = @Content(schema = @Schema(implementation = ErrorMessage.class)))})
public Response patchChecklist(@PathParam("runnerId") UUID runnerId,
@RequestBody(content = @Content(schema = @Schema(implementation = Object.class))) @NotNull JsonNode updates) {
ensureEnabled();
String workspaceId = requestContext.get().getWorkspaceId();
String userName = requestContext.get().getUserName();
runnerService.patchChecklist(runnerId, workspaceId, userName, updates);
return Response.noContent().build();
}

@POST
@Path("/{runnerId}/heartbeats")
@RateLimited
@Operation(operationId = "heartbeat", summary = "Local runner heartbeat", description = "Refresh local runner heartbeat", responses = {
@ApiResponse(responseCode = "200", description = "Heartbeat response", content = @Content(schema = @Schema(implementation = LocalRunnerHeartbeatResponse.class))),
@ApiResponse(responseCode = "404", description = "Not found", content = @Content(schema = @Schema(implementation = ErrorMessage.class))),
@ApiResponse(responseCode = "410", description = "Gone", content = @Content(schema = @Schema(implementation = ErrorMessage.class)))})
public Response heartbeat(@PathParam("runnerId") UUID runnerId) {
public Response heartbeat(@PathParam("runnerId") UUID runnerId,
@RequestBody(content = @Content(schema = @Schema(implementation = LocalRunnerHeartbeatRequest.class))) LocalRunnerHeartbeatRequest body) {
ensureEnabled();
String workspaceId = requestContext.get().getWorkspaceId();
String userName = requestContext.get().getUserName();
LocalRunnerHeartbeatResponse response = runnerService.heartbeat(runnerId, workspaceId, userName);
List<String> capabilities = body != null ? body.capabilities() : null;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not needed here, you are still checking for null in the service layer.

LocalRunnerHeartbeatResponse response = runnerService.heartbeat(runnerId, workspaceId, userName, capabilities);
return Response.ok(response).build();
}

Expand Down Expand Up @@ -307,6 +333,135 @@ public Response cancelJob(@PathParam("jobId") UUID jobId) {
return Response.noContent().build();
}

@POST
@Path("/{runnerId}/bridge/commands")
@RateLimited
@Operation(operationId = "submitBridgeCommand", summary = "Submit bridge command", description = "Submit a bridge command for execution by the local daemon", responses = {
@ApiResponse(responseCode = "201", description = "Command submitted", headers = @Header(name = "Location", description = "URI of the command"), content = @Content(schema = @Schema(implementation = BridgeCommandSubmitResponse.class))),
@ApiResponse(responseCode = "404", description = "Runner not found or not connected", content = @Content(schema = @Schema(implementation = ErrorMessage.class))),
@ApiResponse(responseCode = "409", description = "Runner does not support bridge", content = @Content(schema = @Schema(implementation = ErrorMessage.class))),
@ApiResponse(responseCode = "429", description = "Too many requests", content = @Content(schema = @Schema(implementation = ErrorMessage.class)))})
public Response submitBridgeCommand(@PathParam("runnerId") UUID runnerId,
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To match existing codebase would be better to rename to createBridgeCommand

@RequestBody(content = @Content(schema = @Schema(implementation = BridgeCommandSubmitRequest.class))) @NotNull @Valid BridgeCommandSubmitRequest request,
@Context UriInfo uriInfo) {
ensureEnabled();
String workspaceId = requestContext.get().getWorkspaceId();
String userName = requestContext.get().getUserName();
UUID commandId = runnerService.submitBridgeCommand(runnerId, workspaceId, userName, request);
var uri = uriInfo.getBaseUriBuilder()
.path("v1/private/local-runners/{runnerId}/bridge/commands/{commandId}")
.build(runnerId, commandId);
return Response.created(uri)
.entity(BridgeCommandSubmitResponse.builder().commandId(commandId).build())
.build();
}

@POST
@Path("/{runnerId}/bridge/commands/next")
@Operation(operationId = "nextBridgeCommands", summary = "Poll next bridge commands", description = "Long-poll for pending bridge commands (batch)", responses = {
@ApiResponse(responseCode = "200", description = "Commands batch", content = @Content(schema = @Schema(implementation = BridgeCommandBatchResponse.class))),
@ApiResponse(responseCode = "404", description = "Not found", content = @Content(schema = @Schema(implementation = ErrorMessage.class)))})
public void nextBridgeCommands(@PathParam("runnerId") UUID runnerId,
@Valid BridgeCommandNextRequest request,
@Suspended AsyncResponse asyncResponse) {
ensureEnabled();
int maxCommands = request != null ? request.effectiveMaxCommands() : 10;
long pollTimeoutSeconds = runnerConfig.getBridgePollTimeout().toSeconds();
Comment thread
collincunn marked this conversation as resolved.
long bufferSeconds = runnerConfig.getBridgeAsyncTimeoutBuffer().toSeconds();
asyncResponse.setTimeout(pollTimeoutSeconds + bufferSeconds, TimeUnit.SECONDS);
asyncResponse.setTimeoutHandler(
ar -> ar.resume(Response.ok(BridgeCommandBatchResponse.builder()
.commands(List.of()).build()).build()));
String workspaceId = requestContext.get().getWorkspaceId();
String userName = requestContext.get().getUserName();
runnerService.nextBridgeCommands(runnerId, workspaceId, userName, maxCommands)
.map(batch -> Response.ok(batch).build())
.subscribe(
asyncResponse::resume,
error -> {
if (error instanceof WebApplicationException wae) {
asyncResponse.resume(wae);
} else {
log.error("Error polling bridge commands for runner='{}' workspace='{}'", runnerId,
workspaceId, error);
asyncResponse.resume(Response.serverError().build());
}
});
}

@POST
@Path("/{runnerId}/bridge/commands/{commandId}/results")
@Operation(operationId = "reportBridgeResult", summary = "Report bridge command result", description = "Report bridge command completion or failure", responses = {
@ApiResponse(responseCode = "200", description = "Result accepted"),
@ApiResponse(responseCode = "404", description = "Command not found", content = @Content(schema = @Schema(implementation = ErrorMessage.class))),
@ApiResponse(responseCode = "409", description = "Already completed", content = @Content(schema = @Schema(implementation = ErrorMessage.class)))})
public Response reportBridgeResult(@PathParam("runnerId") UUID runnerId,
@PathParam("commandId") UUID commandId,
@RequestBody(content = @Content(schema = @Schema(implementation = BridgeCommandResultRequest.class))) @NotNull @Valid BridgeCommandResultRequest request) {
ensureEnabled();
String workspaceId = requestContext.get().getWorkspaceId();
String userName = requestContext.get().getUserName();
runnerService.reportBridgeCommandResult(runnerId, workspaceId, userName, commandId, request);
return Response.ok().build();
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Change return code to 204, as you return an empty body always:
Response.noContent().build()

}

@GET
@Path("/{runnerId}/bridge/commands/{commandId}")
@Operation(operationId = "getBridgeCommand", summary = "Get bridge command", description = "Get bridge command status, optionally long-polling for completion", responses = {
@ApiResponse(responseCode = "200", description = "Command state", content = @Content(schema = @Schema(implementation = BridgeCommand.class))),
@ApiResponse(responseCode = "404", description = "Command not found", content = @Content(schema = @Schema(implementation = ErrorMessage.class)))})
public void getBridgeCommand(@PathParam("runnerId") UUID runnerId,
@PathParam("commandId") UUID commandId,
@QueryParam("wait") @DefaultValue("false") boolean wait,
@QueryParam("timeout") @DefaultValue("30") int timeout,
@Suspended AsyncResponse asyncResponse) {
ensureEnabled();
String workspaceId = requestContext.get().getWorkspaceId();
String userName = requestContext.get().getUserName();

if (!wait) {
BridgeCommand command = runnerService.getBridgeCommand(runnerId, workspaceId, userName, commandId);
asyncResponse.resume(Response.ok(command).build());
return;
}
Comment thread
collincunn marked this conversation as resolved.

int maxTimeout = (int) runnerConfig.getBridgeMaxCommandTimeout().toSeconds();
int clampedTimeout = Math.min(Math.max(timeout, 1), maxTimeout);
long bufferSeconds = runnerConfig.getBridgeAsyncTimeoutBuffer().toSeconds();
asyncResponse.setTimeout(clampedTimeout + bufferSeconds, TimeUnit.SECONDS);
asyncResponse.setTimeoutHandler(ar -> {
try {
BridgeCommand cmd = runnerService.getBridgeCommand(runnerId, workspaceId, userName, commandId);
ar.resume(Response.ok(cmd).build());
} catch (Exception e) {
ar.resume(e);
}
});
Comment on lines +432 to +439
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This timeout handler, endpoint will return a comment even if it's not done yet.


try {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No need in this try/catch, you already have exception handler in subscribe.

runnerService.awaitBridgeCommand(runnerId, workspaceId, userName, commandId, clampedTimeout)
.map(cmd -> Response.ok(cmd).build())
.subscribe(
asyncResponse::resume,
error -> {
if (error instanceof WebApplicationException wae) {
asyncResponse.resume(wae);
} else {
log.error("Error awaiting bridge command='{}' runner='{}' workspace='{}'",
commandId,
runnerId, workspaceId, error);
asyncResponse.resume(Response.serverError().build());
}
});
} catch (WebApplicationException wae) {
asyncResponse.resume(wae);
} catch (Exception e) {
log.error("Error setting up bridge command await='{}' runner='{}' workspace='{}'", commandId,
runnerId, workspaceId, e);
asyncResponse.resume(Response.serverError().build());
}
}

private void ensureEnabled() {
if (!runnerConfig.isEnabled()) {
throw new WebApplicationException(Response.Status.NOT_IMPLEMENTED);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package com.comet.opik.api.runner;

import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.PropertyNamingStrategies;
import com.fasterxml.jackson.databind.annotation.JsonNaming;
import lombok.Builder;

import java.time.Instant;
import java.util.UUID;

@Builder(toBuilder = true)
@JsonIgnoreProperties(ignoreUnknown = true)
@JsonNaming(PropertyNamingStrategies.SnakeCaseStrategy.class)
public record BridgeCommand(
Comment thread
collincunn marked this conversation as resolved.
UUID commandId,
UUID runnerId,
BridgeCommandType type,
BridgeCommandStatus status,
JsonNode args,
JsonNode result,
JsonNode error,
int timeoutSeconds,
Instant submittedAt,
Instant pickedUpAt,
Instant completedAt,
Long durationMs) {
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package com.comet.opik.api.runner;

import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.PropertyNamingStrategies;
import com.fasterxml.jackson.databind.annotation.JsonNaming;
import lombok.Builder;

import java.time.Instant;
import java.util.List;
import java.util.UUID;

@Builder(toBuilder = true)
@JsonIgnoreProperties(ignoreUnknown = true)
@JsonNaming(PropertyNamingStrategies.SnakeCaseStrategy.class)
public record BridgeCommandBatchResponse(
List<BridgeCommandItem> commands) {

@Builder(toBuilder = true)
@JsonIgnoreProperties(ignoreUnknown = true)
@JsonNaming(PropertyNamingStrategies.SnakeCaseStrategy.class)
public record BridgeCommandItem(
UUID commandId,
BridgeCommandType type,
JsonNode args,
int timeoutSeconds,
Instant submittedAt) {
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package com.comet.opik.api.runner;

import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.databind.PropertyNamingStrategies;
import com.fasterxml.jackson.databind.annotation.JsonNaming;
import lombok.Builder;

@Builder(toBuilder = true)
@JsonIgnoreProperties(ignoreUnknown = true)
@JsonNaming(PropertyNamingStrategies.SnakeCaseStrategy.class)
public record BridgeCommandNextRequest(
Integer maxCommands) {

private static final int DEFAULT_MAX_COMMANDS = 10;
private static final int MAX_MAX_COMMANDS = 20;

public int effectiveMaxCommands() {
if (maxCommands == null || maxCommands <= 0) {
return DEFAULT_MAX_COMMANDS;
}
return Math.min(maxCommands, MAX_MAX_COMMANDS);
}
Comment thread
collincunn marked this conversation as resolved.
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package com.comet.opik.api.runner;

import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.PropertyNamingStrategies;
import com.fasterxml.jackson.databind.annotation.JsonNaming;
import jakarta.validation.constraints.NotNull;
import lombok.Builder;

@Builder(toBuilder = true)
@JsonIgnoreProperties(ignoreUnknown = true)
@JsonNaming(PropertyNamingStrategies.SnakeCaseStrategy.class)
public record BridgeCommandResultRequest(
@NotNull BridgeCommandStatus status,
JsonNode result,
JsonNode error,
Comment on lines +15 to +16
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Probably we want to add here a validation that at least one of these is not null?

Long durationMs) {
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package com.comet.opik.api.runner;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonValue;
import lombok.Getter;
import lombok.RequiredArgsConstructor;

import java.util.Arrays;

@Getter
@RequiredArgsConstructor
public enum BridgeCommandStatus {

PENDING("pending"),
PICKED_UP("picked_up"),
COMPLETED("completed"),
FAILED("failed"),
TIMED_OUT("timed_out");

@JsonValue
private final String value;

@JsonCreator
public static BridgeCommandStatus fromValue(String value) {
return Arrays.stream(values())
.filter(status -> status.value.equals(value))
.findFirst()
.orElseThrow(() -> new IllegalArgumentException("Unknown BridgeCommandStatus: " + value));
}

public boolean isTerminal() {
return this == COMPLETED || this == FAILED || this == TIMED_OUT;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package com.comet.opik.api.runner;

import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.PropertyNamingStrategies;
import com.fasterxml.jackson.databind.annotation.JsonNaming;
import jakarta.validation.constraints.NotNull;
import lombok.Builder;

@Builder(toBuilder = true)
@JsonIgnoreProperties(ignoreUnknown = true)
@JsonNaming(PropertyNamingStrategies.SnakeCaseStrategy.class)
public record BridgeCommandSubmitRequest(
@NotNull BridgeCommandType type,
@NotNull JsonNode args,
Integer timeoutSeconds) {
}
Loading
Loading