Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 33 additions & 9 deletions action-server/src/listeners/dbListeners.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,10 @@ import type { PoolClient } from "pg";
import { configuration } from "../config";
import { ActionsDbManager } from "../db";
import { ActionWorkerPool } from "../threads/workerPool";
import type { ActionDefinitionInsertedPayload, ActionResponse, ActionRunInsertedPayload } from "../type/types";
import { ActionRunner } from "../type/actionRunner";
import type { ActionDefinitionInsertedPayload, ActionResponse, ActionRunCancellationRequestPayload, ActionRunInsertedPayload } from "../type/types";
import { extractSchemas } from "../utils/codeRunner";
import logger from "../utils/logger";
import { ActionRunCancellationRequestPayload } from "../type/types";
import { ActionRunner } from "../type/actionRunner";

let listenClient: PoolClient | undefined;

Expand All @@ -30,11 +29,11 @@ async function refreshActionDefinitionSchema(payload: ActionDefinitionInsertedPa

const pool = ActionsDbManager.getDb();
const query = `
UPDATE actions.action_definition
UPDATE actions.action_definition_version
SET
parameter_schema = $1::jsonb,
settings_schema = $2::jsonb
WHERE id = $3
WHERE action_definition_id = $3 AND revision = $4
RETURNING *;
`;

Expand All @@ -43,9 +42,34 @@ async function refreshActionDefinitionSchema(payload: ActionDefinitionInsertedPa
JSON.stringify(schemas.parameterDefinitions),
JSON.stringify(schemas.settingDefinitions),
payload.action_definition_id,
payload.revision,
]);

logger.info("Updated action_definition:", res.rows[0]);
logger.info("Updated action_definition_version:", res.rows[0]);

// Strip stale settings from parent action_definition.
// Settings keys no longer in the new version's schema are removed.
const newSettingsKeys = Object.keys(schemas.settingDefinitions || {});
const stripStaleSettingsQuery = `
UPDATE actions.action_definition
SET settings = (
SELECT COALESCE(jsonb_object_agg(key, value), '{}'::jsonb)
FROM jsonb_each(settings)
WHERE key = ANY($1::text[])
)
WHERE id = $2
AND settings IS NOT NULL
AND settings != '{}'::jsonb
AND EXISTS (
SELECT 1 FROM jsonb_object_keys(settings) k
WHERE k != ALL($1::text[])
);
`;
const stripRes = await pool.query(stripStaleSettingsQuery, [newSettingsKeys, payload.action_definition_id]);

if (stripRes.rowCount && stripRes.rowCount > 0) {
logger.info(`Stripped stale settings for action_definition ${payload.action_definition_id}`);
}
} catch (error) {
logger.error("Error updating row:", error);
}
Expand Down Expand Up @@ -140,8 +164,8 @@ export async function setupListeners() {

// save listenClient as a global so we can close it on cleanup if necessary
listenClient = await pool.connect();
// these occur when user inserts row in `action_definition`, need to pre-process to extract the schemas
listenClient.query("LISTEN action_definition_inserted");
// these occur when user inserts row in `action_definition_version`, need to pre-process to extract the schemas
listenClient.query("LISTEN action_definition_version_inserted");
// these occur when a user inserts a row in the `action_run` table, signifying a run request
listenClient.query("LISTEN action_run_inserted");
// these occur when a user sets the `canceled` of an `action_run` to true, signifying a cancellation request
Expand All @@ -157,7 +181,7 @@ export async function setupListeners() {

const payload = JSON.parse(msg.payload);

if (msg.channel === "action_definition_inserted") {
if (msg.channel === "action_definition_version_inserted") {
await refreshActionDefinitionSchema(payload);
} else if (msg.channel === "action_run_inserted") {
await ActionRunner.addActionRun(payload as ActionRunInsertedPayload);
Expand Down
2 changes: 2 additions & 0 deletions action-server/src/type/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -45,13 +45,15 @@ export type ActionTask = {
export type ActionDefinitionInsertedPayload = {
action_definition_id: number;
action_file_path: string;
revision: number;
};

export type ActionRunInsertedPayload = {
action_run_id: string;
settings: Record<string, any>;
parameters: Record<string, any>;
action_definition_id: number;
action_definition_revision: number;
workspace_id: number;
action_file_path: string;
has_secrets: boolean;
Expand Down
Loading
Loading