diff --git a/transact/src/main/java/dev/dbos/transact/admin/ListQueuedWorkflowsRequest.java b/transact/src/main/java/dev/dbos/transact/admin/ListQueuedWorkflowsRequest.java index 0bcc6188..07a6f3e2 100644 --- a/transact/src/main/java/dev/dbos/transact/admin/ListQueuedWorkflowsRequest.java +++ b/transact/src/main/java/dev/dbos/transact/admin/ListQueuedWorkflowsRequest.java @@ -49,7 +49,11 @@ public ListWorkflowsInput asInput() { parent_workflow_id, null, // wasForkedFrom null, // hasParent - null // attributes + null, // attributes + null, // completedAfter + null, // completedBefore + null, // dequeuedAfter + null // dequeuedBefore ); } } diff --git a/transact/src/main/java/dev/dbos/transact/admin/ListWorkflowsRequest.java b/transact/src/main/java/dev/dbos/transact/admin/ListWorkflowsRequest.java index 17c05bdb..8e06d7c3 100644 --- a/transact/src/main/java/dev/dbos/transact/admin/ListWorkflowsRequest.java +++ b/transact/src/main/java/dev/dbos/transact/admin/ListWorkflowsRequest.java @@ -16,6 +16,10 @@ public record ListWorkflowsRequest( @JsonDeserialize(using = StringOrListDeserializer.class) List authenticated_user, String start_time, String end_time, + String completed_after, + String completed_before, + String dequeued_after, + String dequeued_before, @JsonDeserialize(using = StringOrListDeserializer.class) List status, @JsonDeserialize(using = StringOrListDeserializer.class) List application_version, @JsonDeserialize(using = StringOrListDeserializer.class) List fork_from, @@ -53,7 +57,10 @@ public ListWorkflowsInput asInput() { parent_workflow_id, null, // wasForkedFrom null, // hasParent - null // attributes - ); + null, // attributes + completed_after != null ? Instant.parse(completed_after) : null, + completed_before != null ? Instant.parse(completed_before) : null, + dequeued_after != null ? Instant.parse(dequeued_after) : null, + dequeued_before != null ? Instant.parse(dequeued_before) : null); } } diff --git a/transact/src/main/java/dev/dbos/transact/conductor/Conductor.java b/transact/src/main/java/dev/dbos/transact/conductor/Conductor.java index 530ee0bd..17d70c6d 100644 --- a/transact/src/main/java/dev/dbos/transact/conductor/Conductor.java +++ b/transact/src/main/java/dev/dbos/transact/conductor/Conductor.java @@ -56,6 +56,7 @@ import tools.jackson.core.StreamReadConstraints; import tools.jackson.core.json.JsonFactory; import tools.jackson.core.type.TypeReference; +import tools.jackson.databind.JsonNode; import tools.jackson.databind.cfg.DateTimeFeature; import tools.jackson.databind.json.JsonMapper; @@ -237,12 +238,50 @@ public CompletableFuture onText(WebSocket ws, CharSequence data, boolean last logger.debug("Received {} chars from Conductor {}", messageSize, ws.getClass().getName()); + // Pre-parse to extract type and request_id so unknown types get a structured error response + // rather than a silent drop. + JsonNode envelope; + try { + envelope = mapper.readTree(messageText); + } catch (Exception e) { + logger.error("Conductor JSON Parsing error for {} char message", messageSize, e); + return null; + } + JsonNode typeNode = envelope.get("type"); + JsonNode requestIdNode = envelope.get("request_id"); + if (typeNode != null + && requestIdNode != null + && MessageType.fromValue(typeNode.stringValue()) == null) { + logger.warn("Conductor unknown message type {}", typeNode.stringValue()); + try { + writeFragmentedResponse( + ws, + new BaseResponse( + typeNode.stringValue(), requestIdNode.stringValue(), "Unknown message type"), + mapper); + } catch (Exception e) { + logger.error("Error writing unknown-type error response", e); + } + return null; + } + BaseMessage request; try (InputStream is = new ByteArrayInputStream(messageText.getBytes(StandardCharsets.UTF_8))) { request = mapper.readValue(is, BaseMessage.class); } catch (Exception e) { logger.error("Conductor JSON Parsing error for {} char message", messageSize, e); + if (typeNode != null && requestIdNode != null) { + try { + writeFragmentedResponse( + ws, + new BaseResponse( + typeNode.stringValue(), requestIdNode.stringValue(), e.getMessage()), + mapper); + } catch (Exception writeEx) { + logger.error("Error writing parse-error response", writeEx); + } + } return null; } @@ -707,70 +746,71 @@ void connectWebSocket() { CompletableFuture getResponseAsync(BaseMessage message, WebSocket ws) { logger.debug("getResponseAsync {}", message.type); - MessageType messageType = MessageType.fromValue(message.type); - if (messageType == null) { - logger.warn("Conductor unknown message type {}", message.type); - return CompletableFuture.completedFuture( - new BaseResponse(message.type, message.request_id, "Unknown message type")); - } - return switch (messageType) { - case ALERT -> handleAlert(this, message); - case BACKFILL_SCHEDULE -> handleBackfillSchedule(this, message); - case CANCEL -> handleCancel(this, message); - case DELETE -> handleDelete(this, message); - case EXECUTOR_INFO -> handleExecutorInfo(this, message); - case EXIST_PENDING_WORKFLOWS -> handleExistPendingWorkflows(this, message); - case EXPORT_WORKFLOW -> handleExportWorkflow(this, message, ws); - case FORK_FROM_FAILURE -> handleForkFromFailure(this, message); - case FORK_WORKFLOW -> handleFork(this, message); - case GET_METRICS -> handleGetMetrics(this, message); - case GET_QUEUE -> handleGetQueue(this, message); - case GET_SCHEDULE -> handleGetSchedule(this, message); - case GET_STEP_AGGREGATES -> handleGetStepAggregates(this, message); - case GET_WORKFLOW_AGGREGATES -> handleGetWorkflowAggregates(this, message); - case GET_WORKFLOW_EVENTS -> handleGetWorkflowEvents(this, message); - case GET_WORKFLOW_NOTIFICATIONS -> handleGetWorkflowNotifications(this, message); - case GET_WORKFLOW_STREAMS -> handleGetWorkflowStreams(this, message); - case GET_WORKFLOW -> handleGetWorkflow(this, message); - case IMPORT_WORKFLOW -> handleImportWorkflow(this, message); - case LIST_APPLICATION_VERSIONS -> handleListApplicationVersions(this, message); - case LIST_QUEUED_WORKFLOWS -> handleListQueuedWorkflows(this, message); - case LIST_QUEUES -> handleListQueues(this, message); - case LIST_SCHEDULES -> handleListSchedules(this, message); - case LIST_STEPS -> handleListSteps(this, message); - case LIST_WORKFLOWS -> handleListWorkflows(this, message); - case PAUSE_SCHEDULE -> handlePauseSchedule(this, message); - case RECOVERY -> handleRecovery(this, message); - case RESTART -> handleRestart(this, message); - case RESUME -> handleResume(this, message); - case RESUME_SCHEDULE -> handleResumeSchedule(this, message); - case RETENTION -> handleRetention(this, message); - case SET_LATEST_APPLICATION_VERSION -> handleSetLatestApplicationVersion(this, message); - case TRIGGER_SCHEDULE -> handleTriggerSchedule(this, message); + return switch (MessageType.fromValue(message.type)) { + case ALERT -> handleAlert(this, (AlertRequest) message); + case BACKFILL_SCHEDULE -> handleBackfillSchedule(this, (BackfillScheduleRequest) message); + case CANCEL -> handleCancel(this, (CancelRequest) message); + case DELETE -> handleDelete(this, (DeleteRequest) message); + case EXECUTOR_INFO -> handleExecutorInfo(this, (ExecutorInfoRequest) message); + case EXIST_PENDING_WORKFLOWS -> + handleExistPendingWorkflows(this, (ExistPendingWorkflowsRequest) message); + case EXPORT_WORKFLOW -> handleExportWorkflow(this, (ExportWorkflowRequest) message, ws); + case FORK_FROM_FAILURE -> handleForkFromFailure(this, (ForkFromFailureRequest) message); + case FORK_WORKFLOW -> handleFork(this, (ForkWorkflowRequest) message); + case GET_METRICS -> handleGetMetrics(this, (GetMetricsRequest) message); + case GET_QUEUE -> handleGetQueue(this, (GetQueueRequest) message); + case GET_SCHEDULE -> handleGetSchedule(this, (GetScheduleRequest) message); + case GET_STEP_AGGREGATES -> handleGetStepAggregates(this, (GetStepAggregatesRequest) message); + case GET_WORKFLOW_AGGREGATES -> + handleGetWorkflowAggregates(this, (GetWorkflowAggregatesRequest) message); + case GET_WORKFLOW_EVENTS -> handleGetWorkflowEvents(this, (GetWorkflowEventsRequest) message); + case GET_WORKFLOW_NOTIFICATIONS -> + handleGetWorkflowNotifications(this, (GetWorkflowNotificationsRequest) message); + case GET_WORKFLOW_STREAMS -> + handleGetWorkflowStreams(this, (GetWorkflowStreamsRequest) message); + case GET_WORKFLOW -> handleGetWorkflow(this, (GetWorkflowRequest) message); + case IMPORT_WORKFLOW -> handleImportWorkflow(this, (ImportWorkflowRequest) message); + case LIST_APPLICATION_VERSIONS -> + handleListApplicationVersions(this, (ListApplicationVersionsRequest) message); + case LIST_QUEUED_WORKFLOWS -> + handleListQueuedWorkflows(this, (ListQueuedWorkflowsRequest) message); + case LIST_QUEUES -> handleListQueues(this, (ListQueuesRequest) message); + case LIST_SCHEDULES -> handleListSchedules(this, (ListSchedulesRequest) message); + case LIST_STEPS -> handleListSteps(this, (ListStepsRequest) message); + case LIST_WORKFLOWS -> handleListWorkflows(this, (ListWorkflowsRequest) message); + case PAUSE_SCHEDULE -> handlePauseSchedule(this, (PauseScheduleRequest) message); + case RECOVERY -> handleRecovery(this, (RecoveryRequest) message); + case RESTART -> handleRestart(this, (RestartRequest) message); + case RESUME -> handleResume(this, (ResumeRequest) message); + case RESUME_SCHEDULE -> handleResumeSchedule(this, (ResumeScheduleRequest) message); + case RETENTION -> handleRetention(this, (RetentionRequest) message); + case SET_LATEST_APPLICATION_VERSION -> + handleSetLatestApplicationVersion(this, (SetLatestApplicationVersionRequest) message); + case TRIGGER_SCHEDULE -> handleTriggerSchedule(this, (TriggerScheduleRequest) message); }; } static CompletableFuture handleExecutorInfo( - Conductor conductor, BaseMessage message) { + Conductor conductor, ExecutorInfoRequest request) { return CompletableFuture.supplyAsync( () -> { try { return new ExecutorInfoResponse( - message, + request, conductor.dbosExecutor.executorId(), conductor.dbosExecutor.appVersion(), hostname, conductor.dbosExecutor.executorMetadata()); } catch (Exception e) { - return new ExecutorInfoResponse(message, e); + return new ExecutorInfoResponse(request, e); } }); } - static CompletableFuture handleRecovery(Conductor conductor, BaseMessage message) { + static CompletableFuture handleRecovery( + Conductor conductor, RecoveryRequest request) { return CompletableFuture.supplyAsync( () -> { - RecoveryRequest request = (RecoveryRequest) message; try { conductor.dbosExecutor.recoverPendingWorkflows(request.executor_ids); return new SuccessResponse(request, true); @@ -781,10 +821,9 @@ static CompletableFuture handleRecovery(Conductor conductor, BaseM }); } - static CompletableFuture handleCancel(Conductor conductor, BaseMessage message) { + static CompletableFuture handleCancel(Conductor conductor, CancelRequest request) { return CompletableFuture.supplyAsync( () -> { - CancelRequest request = (CancelRequest) message; List ids = (request.workflow_ids != null && !request.workflow_ids.isEmpty()) ? request.workflow_ids @@ -799,10 +838,9 @@ static CompletableFuture handleCancel(Conductor conductor, BaseMes }); } - static CompletableFuture handleDelete(Conductor conductor, BaseMessage message) { + static CompletableFuture handleDelete(Conductor conductor, DeleteRequest request) { return CompletableFuture.supplyAsync( () -> { - DeleteRequest request = (DeleteRequest) message; List ids = (request.workflow_ids != null && !request.workflow_ids.isEmpty()) ? request.workflow_ids @@ -817,10 +855,9 @@ static CompletableFuture handleDelete(Conductor conductor, BaseMes }); } - static CompletableFuture handleResume(Conductor conductor, BaseMessage message) { + static CompletableFuture handleResume(Conductor conductor, ResumeRequest request) { return CompletableFuture.supplyAsync( () -> { - ResumeRequest request = (ResumeRequest) message; List ids = (request.workflow_ids != null && !request.workflow_ids.isEmpty()) ? request.workflow_ids @@ -835,10 +872,10 @@ static CompletableFuture handleResume(Conductor conductor, BaseMes }); } - static CompletableFuture handleRestart(Conductor conductor, BaseMessage message) { + static CompletableFuture handleRestart( + Conductor conductor, RestartRequest request) { return CompletableFuture.supplyAsync( () -> { - RestartRequest request = (RestartRequest) message; try { ForkOptions options = new ForkOptions(); conductor.dbosExecutor.forkWorkflow(request.workflow_id, 0, options); @@ -851,10 +888,10 @@ static CompletableFuture handleRestart(Conductor conductor, BaseMe }); } - static CompletableFuture handleFork(Conductor conductor, BaseMessage message) { + static CompletableFuture handleFork( + Conductor conductor, ForkWorkflowRequest request) { return CompletableFuture.supplyAsync( () -> { - ForkWorkflowRequest request = (ForkWorkflowRequest) message; if (request.body.workflow_id == null || request.body.start_step == null) { return new ForkWorkflowResponse(request, null, "Invalid Fork Workflow Request"); } @@ -871,10 +908,9 @@ static CompletableFuture handleFork(Conductor conductor, BaseMessa } static CompletableFuture handleForkFromFailure( - Conductor conductor, BaseMessage message) { + Conductor conductor, ForkFromFailureRequest request) { return CompletableFuture.supplyAsync( () -> { - ForkFromFailureRequest request = (ForkFromFailureRequest) message; try { var options = request.toOptions(); var handles = @@ -891,10 +927,9 @@ static CompletableFuture handleForkFromFailure( } static CompletableFuture handleListWorkflows( - Conductor conductor, BaseMessage message) { + Conductor conductor, ListWorkflowsRequest request) { return CompletableFuture.supplyAsync( () -> { - ListWorkflowsRequest request = (ListWorkflowsRequest) message; try { ListWorkflowsInput input = request.asInput(); List statuses = conductor.dbosExecutor.listWorkflows(input); @@ -909,10 +944,9 @@ static CompletableFuture handleListWorkflows( } static CompletableFuture handleListQueuedWorkflows( - Conductor conductor, BaseMessage message) { + Conductor conductor, ListQueuedWorkflowsRequest request) { return CompletableFuture.supplyAsync( () -> { - ListQueuedWorkflowsRequest request = (ListQueuedWorkflowsRequest) message; try { ListWorkflowsInput input = request.asInput(); List statuses = conductor.dbosExecutor.listWorkflows(input); @@ -927,24 +961,23 @@ static CompletableFuture handleListQueuedWorkflows( } static CompletableFuture handleListApplicationVersions( - Conductor conductor, BaseMessage message) { + Conductor conductor, ListApplicationVersionsRequest request) { return CompletableFuture.supplyAsync( () -> { try { var output = conductor.systemDatabase.listApplicationVersions(); - return new ListApplicationVersionsResponse(message, output); + return new ListApplicationVersionsResponse(request, output); } catch (Exception e) { logger.error("Exception encountered when listing application versions", e); - return new ListApplicationVersionsResponse(message, e); + return new ListApplicationVersionsResponse(request, e); } }); } static CompletableFuture handleSetLatestApplicationVersion( - Conductor conductor, BaseMessage message) { + Conductor conductor, SetLatestApplicationVersionRequest request) { return CompletableFuture.supplyAsync( () -> { - SetLatestApplicationVersionRequest request = (SetLatestApplicationVersionRequest) message; try { conductor.dbosExecutor.setLatestApplicationVersion(request.version_name); return new SuccessResponse(request, true); @@ -958,10 +991,10 @@ static CompletableFuture handleSetLatestApplicationVersion( }); } - static CompletableFuture handleListSteps(Conductor conductor, BaseMessage message) { + static CompletableFuture handleListSteps( + Conductor conductor, ListStepsRequest request) { return CompletableFuture.supplyAsync( () -> { - ListStepsRequest request = (ListStepsRequest) message; try { List stepInfoList = conductor.dbosExecutor.listWorkflowSteps( @@ -977,10 +1010,9 @@ static CompletableFuture handleListSteps(Conductor conductor, Base } static CompletableFuture handleExistPendingWorkflows( - Conductor conductor, BaseMessage message) { + Conductor conductor, ExistPendingWorkflowsRequest request) { return CompletableFuture.supplyAsync( () -> { - ExistPendingWorkflowsRequest request = (ExistPendingWorkflowsRequest) message; try { var pending = conductor.systemDatabase.listWorkflows( @@ -997,10 +1029,9 @@ static CompletableFuture handleExistPendingWorkflows( } static CompletableFuture handleGetWorkflow( - Conductor conductor, BaseMessage message) { + Conductor conductor, GetWorkflowRequest request) { return CompletableFuture.supplyAsync( () -> { - GetWorkflowRequest request = (GetWorkflowRequest) message; try { var status = conductor.systemDatabase.listWorkflows(request.toInput()); WorkflowsOutput output = @@ -1013,11 +1044,10 @@ static CompletableFuture handleGetWorkflow( }); } - static CompletableFuture handleRetention(Conductor conductor, BaseMessage message) { + static CompletableFuture handleRetention( + Conductor conductor, RetentionRequest request) { return CompletableFuture.supplyAsync( () -> { - RetentionRequest request = (RetentionRequest) message; - try { var cutoff = request.body.gc_cutoff_epoch_ms == null @@ -1044,11 +1074,9 @@ static CompletableFuture handleRetention(Conductor conductor, Base } static CompletableFuture handleGetMetrics( - Conductor conductor, BaseMessage message) { + Conductor conductor, GetMetricsRequest request) { return CompletableFuture.supplyAsync( () -> { - GetMetricsRequest request = (GetMetricsRequest) message; - try { if (request.metric_class.equals("workflow_step_count")) { var metrics = @@ -1066,10 +1094,9 @@ static CompletableFuture handleGetMetrics( } static CompletableFuture handleGetWorkflowAggregates( - Conductor conductor, BaseMessage message) { + Conductor conductor, GetWorkflowAggregatesRequest request) { return CompletableFuture.supplyAsync( () -> { - GetWorkflowAggregatesRequest request = (GetWorkflowAggregatesRequest) message; try { var rows = conductor.systemDatabase.getWorkflowAggregates(request.toInput()); return new GetWorkflowAggregatesResponse(request, rows); @@ -1081,10 +1108,9 @@ static CompletableFuture handleGetWorkflowAggregates( } static CompletableFuture handleGetStepAggregates( - Conductor conductor, BaseMessage message) { + Conductor conductor, GetStepAggregatesRequest request) { return CompletableFuture.supplyAsync( () -> { - GetStepAggregatesRequest request = (GetStepAggregatesRequest) message; try { var rows = conductor.systemDatabase.getStepAggregates(request.toInput()); return new GetStepAggregatesResponse(request, rows); @@ -1096,10 +1122,9 @@ static CompletableFuture handleGetStepAggregates( } static CompletableFuture handleGetWorkflowEvents( - Conductor conductor, BaseMessage message) { + Conductor conductor, GetWorkflowEventsRequest request) { return CompletableFuture.supplyAsync( () -> { - GetWorkflowEventsRequest request = (GetWorkflowEventsRequest) message; try { var events = conductor.systemDatabase.getAllEvents(request.workflow_id); return new GetWorkflowEventsResponse(request, events); @@ -1114,10 +1139,9 @@ static CompletableFuture handleGetWorkflowEvents( } static CompletableFuture handleGetWorkflowNotifications( - Conductor conductor, BaseMessage message) { + Conductor conductor, GetWorkflowNotificationsRequest request) { return CompletableFuture.supplyAsync( () -> { - GetWorkflowNotificationsRequest request = (GetWorkflowNotificationsRequest) message; try { var notifications = conductor.systemDatabase.getAllNotifications(request.workflow_id); return new GetWorkflowNotificationsResponse(request, notifications); @@ -1132,10 +1156,9 @@ static CompletableFuture handleGetWorkflowNotifications( } static CompletableFuture handleGetWorkflowStreams( - Conductor conductor, BaseMessage message) { + Conductor conductor, GetWorkflowStreamsRequest request) { return CompletableFuture.supplyAsync( () -> { - GetWorkflowStreamsRequest request = (GetWorkflowStreamsRequest) message; try { var streams = conductor.systemDatabase.getAllStreamEntries(request.workflow_id); return new GetWorkflowStreamsResponse(request, streams); @@ -1216,10 +1239,9 @@ static void streamImportAsync(Conductor conductor, WebSocket ws, Reader pipeRead } static CompletableFuture handleImportWorkflow( - Conductor conductor, BaseMessage message) { + Conductor conductor, ImportWorkflowRequest request) { return CompletableFuture.supplyAsync( () -> { - ImportWorkflowRequest request = (ImportWorkflowRequest) message; long startTime = System.currentTimeMillis(); logger.info("Starting import workflow"); @@ -1242,10 +1264,9 @@ static CompletableFuture handleImportWorkflow( } static CompletableFuture handleExportWorkflow( - Conductor conductor, BaseMessage message, WebSocket ws) { + Conductor conductor, ExportWorkflowRequest request, WebSocket ws) { return CompletableFuture.supplyAsync( () -> { - ExportWorkflowRequest request = (ExportWorkflowRequest) message; long startTime = System.currentTimeMillis(); logger.info( "Starting export workflow: id={}, export_children={}", @@ -1262,7 +1283,7 @@ static CompletableFuture handleExportWorkflow( request.workflow_id, workflows.size()); - streamExportResponse(ws, message, workflows, conductor.mapper); + streamExportResponse(ws, request, workflows, conductor.mapper); long duration = System.currentTimeMillis() - startTime; logger.info( @@ -1375,10 +1396,9 @@ static String serializeExportedWorkflows(List workflows, JsonM return Base64.getEncoder().encodeToString(out.toByteArray()); } - static CompletableFuture handleAlert(Conductor conductor, BaseMessage message) { + static CompletableFuture handleAlert(Conductor conductor, AlertRequest request) { return CompletableFuture.supplyAsync( () -> { - AlertRequest request = (AlertRequest) message; try { conductor.dbosExecutor.fireAlertHandler( request.name, request.message, request.metadata); @@ -1390,10 +1410,9 @@ static CompletableFuture handleAlert(Conductor conductor, BaseMess } static CompletableFuture handleListSchedules( - Conductor conductor, BaseMessage message) { + Conductor conductor, ListSchedulesRequest request) { return CompletableFuture.supplyAsync( () -> { - ListSchedulesRequest request = (ListSchedulesRequest) message; try { List schedules = conductor.systemDatabase.listSchedules( @@ -1410,10 +1429,9 @@ static CompletableFuture handleListSchedules( } static CompletableFuture handleGetSchedule( - Conductor conductor, BaseMessage message) { + Conductor conductor, GetScheduleRequest request) { return CompletableFuture.supplyAsync( () -> { - GetScheduleRequest request = (GetScheduleRequest) message; try { var schedule = conductor.systemDatabase.getSchedule(request.schedule_name); if (schedule.isPresent()) { @@ -1431,24 +1449,24 @@ static CompletableFuture handleGetSchedule( } static CompletableFuture handleListQueues( - Conductor conductor, BaseMessage message) { + Conductor conductor, ListQueuesRequest request) { return CompletableFuture.supplyAsync( () -> { try { List queues = conductor.systemDatabase.listQueues(); List output = queues.stream().map(QueueOutput::from).toList(); - return new ListQueuesResponse(message, output); + return new ListQueuesResponse(request, output); } catch (Exception e) { logger.error("Exception encountered when listing queues", e); - return new ListQueuesResponse(message, e.getMessage()); + return new ListQueuesResponse(request, e.getMessage()); } }); } - static CompletableFuture handleGetQueue(Conductor conductor, BaseMessage message) { + static CompletableFuture handleGetQueue( + Conductor conductor, GetQueueRequest request) { return CompletableFuture.supplyAsync( () -> { - GetQueueRequest request = (GetQueueRequest) message; try { var queue = conductor.systemDatabase.findQueue(request.name); return new GetQueueResponse(request, queue.map(QueueOutput::from).orElse(null)); @@ -1460,10 +1478,9 @@ static CompletableFuture handleGetQueue(Conductor conductor, BaseM } static CompletableFuture handlePauseSchedule( - Conductor conductor, BaseMessage message) { + Conductor conductor, PauseScheduleRequest request) { return CompletableFuture.supplyAsync( () -> { - PauseScheduleRequest request = (PauseScheduleRequest) message; try { conductor.systemDatabase.pauseSchedule(request.schedule_name); return new SuccessResponse(request, true); @@ -1476,10 +1493,9 @@ static CompletableFuture handlePauseSchedule( } static CompletableFuture handleResumeSchedule( - Conductor conductor, BaseMessage message) { + Conductor conductor, ResumeScheduleRequest request) { return CompletableFuture.supplyAsync( () -> { - ResumeScheduleRequest request = (ResumeScheduleRequest) message; try { conductor.systemDatabase.resumeSchedule(request.schedule_name); return new SuccessResponse(request, true); @@ -1492,10 +1508,9 @@ static CompletableFuture handleResumeSchedule( } static CompletableFuture handleBackfillSchedule( - Conductor conductor, BaseMessage message) { + Conductor conductor, BackfillScheduleRequest request) { return CompletableFuture.supplyAsync( () -> { - BackfillScheduleRequest request = (BackfillScheduleRequest) message; try { var start = Instant.parse(request.start); var end = Instant.parse(request.end); @@ -1512,10 +1527,9 @@ static CompletableFuture handleBackfillSchedule( } static CompletableFuture handleTriggerSchedule( - Conductor conductor, BaseMessage message) { + Conductor conductor, TriggerScheduleRequest request) { return CompletableFuture.supplyAsync( () -> { - TriggerScheduleRequest request = (TriggerScheduleRequest) message; try { String workflowId = DBOSExecutor.triggerSchedule(request.schedule_name, conductor.systemDatabase, null); diff --git a/transact/src/main/java/dev/dbos/transact/conductor/protocol/GetWorkflowRequest.java b/transact/src/main/java/dev/dbos/transact/conductor/protocol/GetWorkflowRequest.java index 43e6f73a..dadfb0f3 100644 --- a/transact/src/main/java/dev/dbos/transact/conductor/protocol/GetWorkflowRequest.java +++ b/transact/src/main/java/dev/dbos/transact/conductor/protocol/GetWorkflowRequest.java @@ -41,7 +41,11 @@ public ListWorkflowsInput toInput() { null, // parentWorkflowId null, // wasForkedFrom null, // hasParent - null // attributes + null, // attributes + null, // completedAfter + null, // completedBefore + null, // dequeuedAfter + null // dequeuedBefore ); } } diff --git a/transact/src/main/java/dev/dbos/transact/conductor/protocol/ListQueuedWorkflowsRequest.java b/transact/src/main/java/dev/dbos/transact/conductor/protocol/ListQueuedWorkflowsRequest.java index 53301cdb..fb1f7960 100644 --- a/transact/src/main/java/dev/dbos/transact/conductor/protocol/ListQueuedWorkflowsRequest.java +++ b/transact/src/main/java/dev/dbos/transact/conductor/protocol/ListQueuedWorkflowsRequest.java @@ -87,6 +87,11 @@ public ListWorkflowsInput asInput() { body.parent_workflow_id, body.was_forked_from, body.has_parent, - body.attributes); + body.attributes, + null, // completedAfter + null, // completedBefore + null, // dequeuedAfter + null // dequeuedBefore + ); } } diff --git a/transact/src/main/java/dev/dbos/transact/conductor/protocol/ListWorkflowsRequest.java b/transact/src/main/java/dev/dbos/transact/conductor/protocol/ListWorkflowsRequest.java index c75ed9c2..1be88b95 100644 --- a/transact/src/main/java/dev/dbos/transact/conductor/protocol/ListWorkflowsRequest.java +++ b/transact/src/main/java/dev/dbos/transact/conductor/protocol/ListWorkflowsRequest.java @@ -26,6 +26,10 @@ public static class Body { public String start_time; public String end_time; + public String completed_after; + public String completed_before; + public String dequeued_after; + public String dequeued_before; @JsonDeserialize(using = StringOrListDeserializer.class) public List status; @@ -85,6 +89,10 @@ public ListWorkflowsInput asInput() { body.parent_workflow_id, body.was_forked_from, body.has_parent, - body.attributes); + body.attributes, + body.completed_after != null ? Instant.parse(body.completed_after) : null, + body.completed_before != null ? Instant.parse(body.completed_before) : null, + body.dequeued_after != null ? Instant.parse(body.dequeued_after) : null, + body.dequeued_before != null ? Instant.parse(body.dequeued_before) : null); } } diff --git a/transact/src/main/java/dev/dbos/transact/conductor/protocol/MessageType.java b/transact/src/main/java/dev/dbos/transact/conductor/protocol/MessageType.java index 6e3eeecc..5849a893 100644 --- a/transact/src/main/java/dev/dbos/transact/conductor/protocol/MessageType.java +++ b/transact/src/main/java/dev/dbos/transact/conductor/protocol/MessageType.java @@ -52,7 +52,7 @@ public static MessageType fromValue(String value) { return type; } } - throw new IllegalArgumentException("Unknown message type: " + value); + return null; } @Override diff --git a/transact/src/main/java/dev/dbos/transact/database/dao/WorkflowDAO.java b/transact/src/main/java/dev/dbos/transact/database/dao/WorkflowDAO.java index 33c6857c..874b181d 100644 --- a/transact/src/main/java/dev/dbos/transact/database/dao/WorkflowDAO.java +++ b/transact/src/main/java/dev/dbos/transact/database/dao/WorkflowDAO.java @@ -728,6 +728,22 @@ public static List listWorkflows(DbContext ctx, ListWorkflowsInp whereConditions.add("created_at <= ?"); parameters.add(input.endTime().toEpochMilli()); } + if (input.completedAfter() != null) { + whereConditions.add("completed_at >= ?"); + parameters.add(input.completedAfter().toEpochMilli()); + } + if (input.completedBefore() != null) { + whereConditions.add("completed_at <= ?"); + parameters.add(input.completedBefore().toEpochMilli()); + } + if (input.dequeuedAfter() != null) { + whereConditions.add("started_at_epoch_ms >= ?"); + parameters.add(input.dequeuedAfter().toEpochMilli()); + } + if (input.dequeuedBefore() != null) { + whereConditions.add("started_at_epoch_ms <= ?"); + parameters.add(input.dequeuedBefore().toEpochMilli()); + } if (input.status() != null && !input.status().isEmpty()) { whereConditions.add("status = ANY(?)"); parameters.add(input.status()); diff --git a/transact/src/main/java/dev/dbos/transact/workflow/ListWorkflowsInput.java b/transact/src/main/java/dev/dbos/transact/workflow/ListWorkflowsInput.java index e7c488d8..26efcea9 100644 --- a/transact/src/main/java/dev/dbos/transact/workflow/ListWorkflowsInput.java +++ b/transact/src/main/java/dev/dbos/transact/workflow/ListWorkflowsInput.java @@ -35,7 +35,11 @@ public record ListWorkflowsInput( List parentWorkflowId, Boolean wasForkedFrom, Boolean hasParent, - Map attributes) { + Map attributes, + Instant completedAfter, + Instant completedBefore, + Instant dequeuedAfter, + Instant dequeuedBefore) { // Validate the attributes filter here (every convenience constructor, withX copy, and the // conductor/admin asInput() paths route through this canonical constructor) so an invalid filter @@ -47,7 +51,7 @@ public record ListWorkflowsInput( public ListWorkflowsInput() { this( null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, - null, null, null, null, null, null, null, null); + null, null, null, null, null, null, null, null, null, null, null, null); } public ListWorkflowsInput(String workflowId) { @@ -78,6 +82,10 @@ public ListWorkflowsInput(List workflowIds) { null, null, null, + null, + null, + null, + null, null); } @@ -106,7 +114,11 @@ public ListWorkflowsInput withWorkflowIds(List workflowIds) { parentWorkflowId, wasForkedFrom, hasParent, - attributes); + attributes, + completedAfter, + completedBefore, + dequeuedAfter, + dequeuedBefore); } public ListWorkflowsInput withStatus(List status) { @@ -133,7 +145,11 @@ public ListWorkflowsInput withStatus(List status) { parentWorkflowId, wasForkedFrom, hasParent, - attributes); + attributes, + completedAfter, + completedBefore, + dequeuedAfter, + dequeuedBefore); } public ListWorkflowsInput withStartTime(Instant startTime) { @@ -160,7 +176,11 @@ public ListWorkflowsInput withStartTime(Instant startTime) { parentWorkflowId, wasForkedFrom, hasParent, - attributes); + attributes, + completedAfter, + completedBefore, + dequeuedAfter, + dequeuedBefore); } public ListWorkflowsInput withEndTime(Instant endTime) { @@ -187,7 +207,11 @@ public ListWorkflowsInput withEndTime(Instant endTime) { parentWorkflowId, wasForkedFrom, hasParent, - attributes); + attributes, + completedAfter, + completedBefore, + dequeuedAfter, + dequeuedBefore); } public ListWorkflowsInput withWorkflowName(List workflowName) { @@ -214,7 +238,11 @@ public ListWorkflowsInput withWorkflowName(List workflowName) { parentWorkflowId, wasForkedFrom, hasParent, - attributes); + attributes, + completedAfter, + completedBefore, + dequeuedAfter, + dequeuedBefore); } public ListWorkflowsInput withClassName(String className) { @@ -241,7 +269,11 @@ public ListWorkflowsInput withClassName(String className) { parentWorkflowId, wasForkedFrom, hasParent, - attributes); + attributes, + completedAfter, + completedBefore, + dequeuedAfter, + dequeuedBefore); } public ListWorkflowsInput withInstanceName(String instanceName) { @@ -268,7 +300,11 @@ public ListWorkflowsInput withInstanceName(String instanceName) { parentWorkflowId, wasForkedFrom, hasParent, - attributes); + attributes, + completedAfter, + completedBefore, + dequeuedAfter, + dequeuedBefore); } public ListWorkflowsInput withApplicationVersion(List applicationVersion) { @@ -295,7 +331,11 @@ public ListWorkflowsInput withApplicationVersion(List applicationVersion parentWorkflowId, wasForkedFrom, hasParent, - attributes); + attributes, + completedAfter, + completedBefore, + dequeuedAfter, + dequeuedBefore); } public ListWorkflowsInput withAuthenticatedUser(List authenticatedUser) { @@ -322,7 +362,11 @@ public ListWorkflowsInput withAuthenticatedUser(List authenticatedUser) parentWorkflowId, wasForkedFrom, hasParent, - attributes); + attributes, + completedAfter, + completedBefore, + dequeuedAfter, + dequeuedBefore); } public ListWorkflowsInput withLimit(Integer limit) { @@ -349,7 +393,11 @@ public ListWorkflowsInput withLimit(Integer limit) { parentWorkflowId, wasForkedFrom, hasParent, - attributes); + attributes, + completedAfter, + completedBefore, + dequeuedAfter, + dequeuedBefore); } public ListWorkflowsInput withOffset(Integer offset) { @@ -376,7 +424,11 @@ public ListWorkflowsInput withOffset(Integer offset) { parentWorkflowId, wasForkedFrom, hasParent, - attributes); + attributes, + completedAfter, + completedBefore, + dequeuedAfter, + dequeuedBefore); } public ListWorkflowsInput withSortDesc(Boolean sortDesc) { @@ -403,7 +455,11 @@ public ListWorkflowsInput withSortDesc(Boolean sortDesc) { parentWorkflowId, wasForkedFrom, hasParent, - attributes); + attributes, + completedAfter, + completedBefore, + dequeuedAfter, + dequeuedBefore); } public ListWorkflowsInput withWorkflowIdPrefix(List workflowIdPrefix) { @@ -430,7 +486,11 @@ public ListWorkflowsInput withWorkflowIdPrefix(List workflowIdPrefix) { parentWorkflowId, wasForkedFrom, hasParent, - attributes); + attributes, + completedAfter, + completedBefore, + dequeuedAfter, + dequeuedBefore); } public ListWorkflowsInput withLoadInput(Boolean loadInput) { @@ -457,7 +517,11 @@ public ListWorkflowsInput withLoadInput(Boolean loadInput) { parentWorkflowId, wasForkedFrom, hasParent, - attributes); + attributes, + completedAfter, + completedBefore, + dequeuedAfter, + dequeuedBefore); } public ListWorkflowsInput withLoadOutput(Boolean loadOutput) { @@ -484,7 +548,11 @@ public ListWorkflowsInput withLoadOutput(Boolean loadOutput) { parentWorkflowId, wasForkedFrom, hasParent, - attributes); + attributes, + completedAfter, + completedBefore, + dequeuedAfter, + dequeuedBefore); } public ListWorkflowsInput withQueueName(List queueName) { @@ -511,7 +579,11 @@ public ListWorkflowsInput withQueueName(List queueName) { parentWorkflowId, wasForkedFrom, hasParent, - attributes); + attributes, + completedAfter, + completedBefore, + dequeuedAfter, + dequeuedBefore); } public ListWorkflowsInput withQueuesOnly(Boolean queuesOnly) { @@ -538,7 +610,11 @@ public ListWorkflowsInput withQueuesOnly(Boolean queuesOnly) { parentWorkflowId, wasForkedFrom, hasParent, - attributes); + attributes, + completedAfter, + completedBefore, + dequeuedAfter, + dequeuedBefore); } public ListWorkflowsInput withExecutorIds(List executorIds) { @@ -565,7 +641,11 @@ public ListWorkflowsInput withExecutorIds(List executorIds) { parentWorkflowId, wasForkedFrom, hasParent, - attributes); + attributes, + completedAfter, + completedBefore, + dequeuedAfter, + dequeuedBefore); } public ListWorkflowsInput withForkedFrom(List forkedFrom) { @@ -592,7 +672,11 @@ public ListWorkflowsInput withForkedFrom(List forkedFrom) { parentWorkflowId, wasForkedFrom, hasParent, - attributes); + attributes, + completedAfter, + completedBefore, + dequeuedAfter, + dequeuedBefore); } public ListWorkflowsInput withParentWorkflowId(List parentWorkflowId) { @@ -619,7 +703,11 @@ public ListWorkflowsInput withParentWorkflowId(List parentWorkflowId) { parentWorkflowId, wasForkedFrom, hasParent, - attributes); + attributes, + completedAfter, + completedBefore, + dequeuedAfter, + dequeuedBefore); } public ListWorkflowsInput withWasForkedFrom(Boolean wasForkedFrom) { @@ -646,7 +734,11 @@ public ListWorkflowsInput withWasForkedFrom(Boolean wasForkedFrom) { parentWorkflowId, wasForkedFrom, hasParent, - attributes); + attributes, + completedAfter, + completedBefore, + dequeuedAfter, + dequeuedBefore); } public ListWorkflowsInput withHasParent(Boolean hasParent) { @@ -673,7 +765,11 @@ public ListWorkflowsInput withHasParent(Boolean hasParent) { parentWorkflowId, wasForkedFrom, hasParent, - attributes); + attributes, + completedAfter, + completedBefore, + dequeuedAfter, + dequeuedBefore); } public ListWorkflowsInput withAttributes(Map attributes) { @@ -700,7 +796,135 @@ public ListWorkflowsInput withAttributes(Map attributes) { parentWorkflowId, wasForkedFrom, hasParent, - attributes); + attributes, + completedAfter, + completedBefore, + dequeuedAfter, + dequeuedBefore); + } + + public ListWorkflowsInput withCompletedAfter(Instant completedAfter) { + return new ListWorkflowsInput( + workflowIds, + status, + startTime, + endTime, + workflowName, + className, + instanceName, + applicationVersion, + authenticatedUser, + limit, + offset, + sortDesc, + workflowIdPrefix, + loadInput, + loadOutput, + queueName, + queuesOnly, + executorIds, + forkedFrom, + parentWorkflowId, + wasForkedFrom, + hasParent, + attributes, + completedAfter, + completedBefore, + dequeuedAfter, + dequeuedBefore); + } + + public ListWorkflowsInput withCompletedBefore(Instant completedBefore) { + return new ListWorkflowsInput( + workflowIds, + status, + startTime, + endTime, + workflowName, + className, + instanceName, + applicationVersion, + authenticatedUser, + limit, + offset, + sortDesc, + workflowIdPrefix, + loadInput, + loadOutput, + queueName, + queuesOnly, + executorIds, + forkedFrom, + parentWorkflowId, + wasForkedFrom, + hasParent, + attributes, + completedAfter, + completedBefore, + dequeuedAfter, + dequeuedBefore); + } + + public ListWorkflowsInput withDequeuedAfter(Instant dequeuedAfter) { + return new ListWorkflowsInput( + workflowIds, + status, + startTime, + endTime, + workflowName, + className, + instanceName, + applicationVersion, + authenticatedUser, + limit, + offset, + sortDesc, + workflowIdPrefix, + loadInput, + loadOutput, + queueName, + queuesOnly, + executorIds, + forkedFrom, + parentWorkflowId, + wasForkedFrom, + hasParent, + attributes, + completedAfter, + completedBefore, + dequeuedAfter, + dequeuedBefore); + } + + public ListWorkflowsInput withDequeuedBefore(Instant dequeuedBefore) { + return new ListWorkflowsInput( + workflowIds, + status, + startTime, + endTime, + workflowName, + className, + instanceName, + applicationVersion, + authenticatedUser, + limit, + offset, + sortDesc, + workflowIdPrefix, + loadInput, + loadOutput, + queueName, + queuesOnly, + executorIds, + forkedFrom, + parentWorkflowId, + wasForkedFrom, + hasParent, + attributes, + completedAfter, + completedBefore, + dequeuedAfter, + dequeuedBefore); } // Single value overloads for list parameters diff --git a/transact/src/test/java/dev/dbos/transact/conductor/ConductorTest.java b/transact/src/test/java/dev/dbos/transact/conductor/ConductorTest.java index a99629fb..ee87e766 100644 --- a/transact/src/test/java/dev/dbos/transact/conductor/ConductorTest.java +++ b/transact/src/test/java/dev/dbos/transact/conductor/ConductorTest.java @@ -326,6 +326,17 @@ private void sendFragmented(String message, int chunkSize) { this.webSocket.sendFragmentedFrame(Opcode.TEXT, lastChunk, true); } + public void sendRawJson(String json) { + this.webSocket.send(json); + } + + public void sendRaw(String type, String requestId) throws Exception { + Map msg = new LinkedHashMap<>(); + msg.put("type", type); + msg.put("request_id", requestId); + this.webSocket.send(ConductorTest.mapper.writeValueAsString(msg)); + } + public void send(MessageType type, String requestId, Map fields, int chunkSize) throws Exception { logger.debug("sending {}", type.getValue()); @@ -527,6 +538,49 @@ public void canExecutorInfo() throws Exception { } } + @RetryingTest(3) + public void unknownMessageTypeReturnsErrorResponse() throws Exception { + MessageListener listener = new MessageListener(); + testServer.setListener(listener); + + try (Conductor conductor = builder.build()) { + conductor.start(); + assertTrue(listener.openLatch.await(5, TimeUnit.SECONDS), "open latch timed out"); + + listener.sendRaw("not_a_real_message_type", "req-unknown-42"); + assertTrue(listener.messageLatch.await(1, TimeUnit.SECONDS), "message latch timed out"); + + JsonNode response = mapper.readTree(listener.message); + assertEquals("not_a_real_message_type", response.get("type").stringValue()); + assertEquals("req-unknown-42", response.get("request_id").stringValue()); + assertNotNull(response.get("error_message")); + assertTrue(response.get("error_message").stringValue().toLowerCase().contains("unknown")); + } + } + + @RetryingTest(3) + public void malformedMessageBodyReturnsErrorResponse() throws Exception { + MessageListener listener = new MessageListener(); + testServer.setListener(listener); + + try (Conductor conductor = builder.build()) { + conductor.start(); + assertTrue(listener.openLatch.await(5, TimeUnit.SECONDS), "open latch timed out"); + + // "body" must be an object for list_workflows; passing a string causes deserialization + // failure + listener.sendRawJson( + "{\"type\":\"list_workflows\",\"request_id\":\"req-bad-body-99\",\"body\":\"not-an-object\"}"); + assertTrue(listener.messageLatch.await(1, TimeUnit.SECONDS), "message latch timed out"); + + JsonNode response = mapper.readTree(listener.message); + assertEquals("list_workflows", response.get("type").stringValue()); + assertEquals("req-bad-body-99", response.get("request_id").stringValue()); + assertNotNull(response.get("error_message")); + assertFalse(response.get("error_message").stringValue().isEmpty()); + } + } + @Test public void fallbackHostnameUsesEnvWhenSet() { assertEquals("env-host", Conductor.fallbackHostname("env-host")); diff --git a/transact/src/test/java/dev/dbos/transact/workflow/ListWorkflowsTest.java b/transact/src/test/java/dev/dbos/transact/workflow/ListWorkflowsTest.java index 2df67197..85bfd590 100644 --- a/transact/src/test/java/dev/dbos/transact/workflow/ListWorkflowsTest.java +++ b/transact/src/test/java/dev/dbos/transact/workflow/ListWorkflowsTest.java @@ -25,7 +25,7 @@ * ListWorkflowsInput} can be exercised deterministically and without the overhead of actually * running workflows. * - *

Test data (10 rows, created_at = baseTime + offset ms): + *

Test data (11 rows, created_at = baseTime + offset ms): * *

  *  UUID          | status    | name    | class  | config | queue | exec   | ver  | user   | parent     | forkedFrom | +ms
@@ -42,7 +42,14 @@
  *  wf-delayed-1  | DELAYED   | delayed | ClassD | instC  | q4    | exec-2 | v1.0 | user-b | -          | -          | +750
  * 
* - * Status totals: SUCCESS=5, ERROR=1, ENQUEUED=2, PENDING=1, CANCELLED=1, DELAYED=1 + *

Status totals: SUCCESS=5, ERROR=1, ENQUEUED=2, PENDING=1, CANCELLED=1, DELAYED=1 + * + *

Additional columns set via UPDATE after the main insert: + * + *

    + *
  • {@code completed_at}: wf-alpha-1=+1100, wf-alpha-2=+1200, wf-gamma-1=+1600 (others null) + *
  • {@code started_at_epoch_ms} (dequeued-at): wf-beta-1=+2100, wf-gamma-2=+2200 (others null) + *
*/ public class ListWorkflowsTest { @@ -261,6 +268,41 @@ private static void populateWorkflows(DataSource dataSource, long baseTime) thro ps2.setString(1, "wf-alpha-1"); ps2.executeUpdate(); } + + // Set completed_at for three completed workflows + try (var conn = dataSource.getConnection(); + var ps3 = + conn.prepareStatement( + "UPDATE \"dbos\".workflow_status SET completed_at = ? WHERE workflow_uuid = ?")) { + Object[][] completions = { + {baseTime + 1100, "wf-alpha-1"}, + {baseTime + 1200, "wf-alpha-2"}, + {baseTime + 1600, "wf-gamma-1"}, + }; + for (Object[] row : completions) { + ps3.setLong(1, (Long) row[0]); + ps3.setString(2, (String) row[1]); + ps3.addBatch(); + } + ps3.executeBatch(); + } + + // Set started_at_epoch_ms (dequeued-at) for two queued workflows + try (var conn = dataSource.getConnection(); + var ps4 = + conn.prepareStatement( + "UPDATE \"dbos\".workflow_status SET started_at_epoch_ms = ? WHERE workflow_uuid = ?")) { + Object[][] dequeued = { + {baseTime + 2100, "wf-beta-1"}, + {baseTime + 2200, "wf-gamma-2"}, + }; + for (Object[] row : dequeued) { + ps4.setLong(1, (Long) row[0]); + ps4.setString(2, (String) row[1]); + ps4.addBatch(); + } + ps4.executeBatch(); + } } // --------------------------------------------------------------------------- @@ -839,6 +881,76 @@ public void testMultiValueArrayFilters() throws Exception { assertEquals("wf-child-1", parentMulti.get(0).workflowId()); } + @Test + public void testFilterByCompletedAt() throws Exception { + // Only wf-alpha-1 (+1100), wf-alpha-2 (+1200), wf-gamma-1 (+1600) have completed_at set. + + // completedAfter(+1150): wf-alpha-2 and wf-gamma-1 = 2 + List after = + dbos.listWorkflows( + new ListWorkflowsInput().withCompletedAfter(Instant.ofEpochMilli(baseTime + 1150))); + assertEquals(2, after.size()); + after.forEach(wf -> assertTrue(wf.completedAt().toEpochMilli() >= baseTime + 1150)); + + // completedBefore(+1150): wf-alpha-1 = 1 + List before = + dbos.listWorkflows( + new ListWorkflowsInput().withCompletedBefore(Instant.ofEpochMilli(baseTime + 1150))); + assertEquals(1, before.size()); + assertEquals("wf-alpha-1", before.get(0).workflowId()); + + // completedAfter(+1050) + completedBefore(+1250): wf-alpha-1 and wf-alpha-2 = 2 + List window = + dbos.listWorkflows( + new ListWorkflowsInput() + .withCompletedAfter(Instant.ofEpochMilli(baseTime + 1050)) + .withCompletedBefore(Instant.ofEpochMilli(baseTime + 1250))); + assertEquals(2, window.size()); + var windowIds = window.stream().map(WorkflowStatus::workflowId).toList(); + assertTrue(windowIds.contains("wf-alpha-1")); + assertTrue(windowIds.contains("wf-alpha-2")); + + // completedAfter past all — null-completed_at rows excluded = 0 + List none = + dbos.listWorkflows( + new ListWorkflowsInput().withCompletedAfter(Instant.ofEpochMilli(baseTime + 2000))); + assertEquals(0, none.size()); + } + + @Test + public void testFilterByDequeuedAt() throws Exception { + // Only wf-beta-1 (+2100) and wf-gamma-2 (+2200) have started_at_epoch_ms set. + + // dequeuedAfter(+2050): both wf-beta-1 and wf-gamma-2 = 2 + List after = + dbos.listWorkflows( + new ListWorkflowsInput().withDequeuedAfter(Instant.ofEpochMilli(baseTime + 2050))); + assertEquals(2, after.size()); + after.forEach(wf -> assertTrue(wf.startedAt().toEpochMilli() >= baseTime + 2050)); + + // dequeuedBefore(+2150): only wf-beta-1 (+2100) = 1 + List before = + dbos.listWorkflows( + new ListWorkflowsInput().withDequeuedBefore(Instant.ofEpochMilli(baseTime + 2150))); + assertEquals(1, before.size()); + assertEquals("wf-beta-1", before.get(0).workflowId()); + + // dequeuedAfter(+2050) + dequeuedBefore(+2150): only wf-beta-1 = 1 + List window = + dbos.listWorkflows( + new ListWorkflowsInput() + .withDequeuedAfter(Instant.ofEpochMilli(baseTime + 2050)) + .withDequeuedBefore(Instant.ofEpochMilli(baseTime + 2150))); + assertEquals(1, window.size()); + assertEquals("wf-beta-1", window.get(0).workflowId()); + + // dequeuedAfter past all — null-started_at rows excluded = 0 + List none = + dbos.listWorkflows( + new ListWorkflowsInput().withDequeuedAfter(Instant.ofEpochMilli(baseTime + 3000))); + assertEquals(0, none.size()); + } + @Test public void testCombinedFilters() throws Exception {