Skip to content

Commit 90dc2f8

Browse files
authored
PL/pgSQL enqueue & send (#295)
~~also moves migratons to resource files ~~ Also simplifies migration and removes migration error ignoring that was incorrectly ported from TS
1 parent 5b91385 commit 90dc2f8

6 files changed

Lines changed: 622 additions & 138 deletions

File tree

transact/src/main/java/dev/dbos/transact/migrations/MigrationManager.java

Lines changed: 150 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -18,16 +18,6 @@
1818
public class MigrationManager {
1919

2020
private static final Logger logger = LoggerFactory.getLogger(MigrationManager.class);
21-
private static final List<String> IGNORABLE_SQL_STATES =
22-
List.of(
23-
// Relation / object already exists
24-
"42P07", // duplicate_table
25-
"42710", // duplicate_object (e.g., index)
26-
"42701", // duplicate_column
27-
"42P06", // duplicate_schema
28-
// Uniqueness (e.g., insert seed rows twice)
29-
"23505" // unique_violation
30-
);
3121

3222
public static void runMigrations(DBOSConfig config) {
3323
Objects.requireNonNull(config, "DBOS Config must not be null");
@@ -197,28 +187,19 @@ static void runDbosMigrations(Connection conn, String schema, List<String> migra
197187
try (var stmt = conn.createStatement()) {
198188
stmt.execute(migrations.get(i));
199189
} catch (SQLException e) {
200-
if (IGNORABLE_SQL_STATES.contains(e.getSQLState())) {
201-
logger.warn(
202-
"Ignoring migration {} error; Migration was likely already applied. Occurred while executing {}",
203-
migrationIndex,
204-
migrations.get(i));
205-
} else {
206-
throw new RuntimeException("Failed to run migration %d".formatted(migrationIndex), e);
207-
}
190+
throw new RuntimeException("Failed to run migration %d".formatted(migrationIndex), e);
208191
}
209192

210193
try {
211-
int rowCount = 0;
212-
var updateSQL = "UPDATE \"%s\".dbos_migrations SET version = ?".formatted(schema);
213-
try (var stmt = conn.prepareStatement(updateSQL)) {
214-
stmt.setLong(1, migrationIndex);
215-
rowCount = stmt.executeUpdate();
216-
}
217-
218-
if (rowCount == 0) {
219-
var insertSql =
220-
"INSERT INTO \"%s\".dbos_migrations (version) VALUES (?)".formatted(schema);
221-
try (var stmt = conn.prepareStatement(insertSql)) {
194+
if (lastApplied == 0) {
195+
var sql = "INSERT INTO \"%s\".dbos_migrations (version) VALUES (?)".formatted(schema);
196+
try (var stmt = conn.prepareStatement(sql)) {
197+
stmt.setLong(1, migrationIndex);
198+
stmt.executeUpdate();
199+
}
200+
} else {
201+
var sql = "UPDATE \"%s\".dbos_migrations SET version = ?".formatted(schema);
202+
try (var stmt = conn.prepareStatement(sql)) {
222203
stmt.setLong(1, migrationIndex);
223204
stmt.executeUpdate();
224205
}
@@ -246,7 +227,9 @@ public static List<String> getMigrations(String schema) {
246227
migration9,
247228
migration10,
248229
migration11,
249-
migration12);
230+
migration12,
231+
migration13,
232+
migration14);
250233
return migrations.stream().map(m -> m.formatted(schema)).toList();
251234
}
252235

@@ -461,4 +444,141 @@ IF NOT EXISTS (
461444
ALTER TABLE "%1$s"."notifications" ADD COLUMN "consumed" BOOLEAN NOT NULL DEFAULT FALSE;
462445
CREATE INDEX "idx_notifications_unconsumed" ON "%1$s"."notifications" ("destination_uuid", "topic") WHERE consumed = FALSE;
463446
""";
447+
448+
static final String migration13 =
449+
"""
450+
CREATE TABLE "%1$s".application_versions (
451+
version_id TEXT NOT NULL PRIMARY KEY,
452+
version_name TEXT NOT NULL UNIQUE,
453+
version_timestamp BIGINT NOT NULL DEFAULT (EXTRACT(epoch FROM now()) * 1000.0)::bigint,
454+
created_at BIGINT NOT NULL DEFAULT (EXTRACT(epoch FROM now()) * 1000.0)::bigint
455+
);
456+
""";
457+
458+
static final String migration14 =
459+
"""
460+
CREATE FUNCTION "%1$s".enqueue_workflow(
461+
workflow_name TEXT,
462+
queue_name TEXT,
463+
positional_args JSON[] DEFAULT ARRAY[]::JSON[],
464+
named_args JSON DEFAULT '{}'::JSON,
465+
class_name TEXT DEFAULT NULL,
466+
config_name TEXT DEFAULT NULL,
467+
workflow_id TEXT DEFAULT NULL,
468+
app_version TEXT DEFAULT NULL,
469+
timeout_ms BIGINT DEFAULT NULL,
470+
deadline_epoch_ms BIGINT DEFAULT NULL,
471+
deduplication_id TEXT DEFAULT NULL,
472+
priority INTEGER DEFAULT NULL,
473+
queue_partition_key TEXT DEFAULT NULL
474+
) RETURNS TEXT AS $$
475+
DECLARE
476+
v_workflow_id TEXT;
477+
v_serialized_inputs TEXT;
478+
v_owner_xid TEXT;
479+
v_now BIGINT;
480+
v_recovery_attempts INTEGER := 0;
481+
v_priority INTEGER;
482+
v_existing_name TEXT;
483+
v_existing_class_name TEXT;
484+
v_existing_config_name TEXT;
485+
BEGIN
486+
487+
-- Validate required parameters
488+
IF workflow_name IS NULL OR workflow_name = '' THEN
489+
RAISE EXCEPTION 'Workflow name cannot be null or empty';
490+
END IF;
491+
IF queue_name IS NULL OR queue_name = '' THEN
492+
RAISE EXCEPTION 'Queue name cannot be null or empty';
493+
END IF;
494+
IF named_args IS NOT NULL AND jsonb_typeof(named_args::jsonb) != 'object' THEN
495+
RAISE EXCEPTION 'Named args must be a JSON object';
496+
END IF;
497+
IF workflow_id IS NOT NULL AND workflow_id = '' THEN
498+
RAISE EXCEPTION 'Workflow ID cannot be an empty string if provided.';
499+
END IF;
500+
501+
v_workflow_id := COALESCE(workflow_id, gen_random_uuid()::TEXT);
502+
v_owner_xid := gen_random_uuid()::TEXT;
503+
v_priority := COALESCE(priority, 0);
504+
v_serialized_inputs := json_build_object(
505+
'positionalArgs', positional_args,
506+
'namedArgs', named_args
507+
)::TEXT;
508+
v_now := EXTRACT(epoch FROM now()) * 1000;
509+
510+
INSERT INTO "%1$s".workflow_status (
511+
workflow_uuid, status, inputs,
512+
name, class_name, config_name,
513+
queue_name, deduplication_id, priority, queue_partition_key,
514+
application_version,
515+
created_at, updated_at, recovery_attempts,
516+
workflow_timeout_ms, workflow_deadline_epoch_ms,
517+
parent_workflow_id, owner_xid, serialization
518+
) VALUES (
519+
v_workflow_id, 'ENQUEUED', v_serialized_inputs,
520+
workflow_name, class_name, config_name,
521+
queue_name, deduplication_id, v_priority, queue_partition_key,
522+
app_version,
523+
v_now, v_now, v_recovery_attempts,
524+
timeout_ms, deadline_epoch_ms,
525+
NULL, v_owner_xid, 'portable_json'
526+
)
527+
ON CONFLICT (workflow_uuid)
528+
DO UPDATE SET
529+
updated_at = EXCLUDED.updated_at
530+
RETURNING workflow_status.name, workflow_status.class_name, workflow_status.config_name
531+
INTO v_existing_name, v_existing_class_name, v_existing_config_name;
532+
533+
-- Validate workflow metadata matches
534+
IF v_existing_name IS DISTINCT FROM workflow_name THEN
535+
RAISE EXCEPTION 'Conflicting DBOS workflow name'
536+
USING DETAIL = format('Workflow %%s exists with name %%s, but the provided workflow name is: %%s', v_workflow_id, v_existing_name, workflow_name),
537+
ERRCODE = 'invalid_parameter_value';
538+
END IF;
539+
IF v_existing_class_name IS DISTINCT FROM class_name THEN
540+
RAISE EXCEPTION 'Conflicting DBOS workflow class_name'
541+
USING DETAIL = format('Workflow %%s exists with class_name %%s, but the provided class_name is: %%s', v_workflow_id, v_existing_class_name, class_name),
542+
ERRCODE = 'invalid_parameter_value';
543+
END IF;
544+
IF v_existing_config_name IS DISTINCT FROM config_name THEN
545+
RAISE EXCEPTION 'Conflicting DBOS workflow config_name'
546+
USING DETAIL = format('Workflow %%s exists with config_name %%s, but the provided config_name is: %%s', v_workflow_id, v_existing_config_name, config_name),
547+
ERRCODE = 'invalid_parameter_value';
548+
END IF;
549+
550+
RETURN v_workflow_id;
551+
552+
EXCEPTION
553+
WHEN unique_violation THEN
554+
RAISE EXCEPTION 'DBOS queue duplicated'
555+
USING DETAIL = format('Workflow %%s with queue %%s and deduplication ID %%s already exists', v_workflow_id, queue_name, deduplication_id),
556+
ERRCODE = 'unique_violation';
557+
END;
558+
$$ LANGUAGE plpgsql;
559+
560+
CREATE FUNCTION "%1$s".send_message(
561+
destination_id TEXT,
562+
message JSON,
563+
topic TEXT DEFAULT NULL,
564+
idempotency_key TEXT DEFAULT NULL
565+
) RETURNS VOID AS $$
566+
DECLARE
567+
v_topic TEXT := COALESCE(topic, '__null__topic__');
568+
v_message_id TEXT := COALESCE(idempotency_key, gen_random_uuid()::TEXT);
569+
BEGIN
570+
INSERT INTO "%1$s".notifications (
571+
destination_uuid, topic, message, message_uuid, serialization
572+
) VALUES (
573+
destination_id, v_topic, message, v_message_id, 'portable_json'
574+
)
575+
ON CONFLICT (message_uuid) DO NOTHING;
576+
EXCEPTION
577+
WHEN foreign_key_violation THEN
578+
RAISE EXCEPTION 'DBOS non-existent workflow'
579+
USING DETAIL = format('Destination workflow %%s does not exist', destination_id),
580+
ERRCODE = 'foreign_key_violation';
581+
END;
582+
$$ LANGUAGE plpgsql;
583+
""";
464584
}

transact/src/test/java/dev/dbos/transact/client/ClientTest.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
import dev.dbos.transact.config.DBOSConfig;
1212
import dev.dbos.transact.database.SystemDatabase;
1313
import dev.dbos.transact.exceptions.DBOSAwaitedWorkflowCancelledException;
14+
import dev.dbos.transact.exceptions.DBOSNonExistentWorkflowException;
1415
import dev.dbos.transact.utils.DBUtils;
1516
import dev.dbos.transact.workflow.Queue;
1617

@@ -192,4 +193,17 @@ public void clientEnqueueTimeouts() throws Exception {
192193
stat2.orElseThrow(() -> new AssertionError("Workflow status not found")).status());
193194
}
194195
}
196+
197+
@Test
198+
public void invalidSend() throws Exception {
199+
var invalidWorkflowId = UUID.randomUUID().toString();
200+
201+
try (var client = new DBOSClient(dbUrl, dbUser, dbPassword)) {
202+
var ex =
203+
assertThrows(
204+
DBOSNonExistentWorkflowException.class,
205+
() -> client.send(invalidWorkflowId, "test.message", null, null));
206+
assertTrue(ex.getMessage().contains(invalidWorkflowId));
207+
}
208+
}
195209
}

0 commit comments

Comments
 (0)