11import { readFile } from "node:fs/promises" ;
22import type http from "node:http" ;
33import * as path from "node:path" ;
4- import type { Pool , PoolClient } from "pg" ;
4+ import type { PoolClient } from "pg" ;
55import { configuration } from "../config" ;
66import { ActionsDbManager } from "../db" ;
77import { ActionWorkerPool } from "../threads/workerPool" ;
88import type { ActionDefinitionInsertedPayload , ActionResponse , ActionRunInsertedPayload } from "../type/types" ;
99import { extractSchemas } from "../utils/codeRunner" ;
10- import { createLogger , format , transports } from "winston" ;
1110import logger from "../utils/logger" ;
1211import { ActionRunCancellationRequestPayload } from "../type/types" ;
12+ import { ActionRunner } from "../type/actionRunner" ;
1313
1414let listenClient : PoolClient | undefined ;
1515
1616async function readFileFromStore ( fileName : string ) : Promise < string > {
1717 // read file from aerie file store and return [resolve] it as a string
1818 const fileStoreBasePath = configuration ( ) . ACTION_LOCAL_STORE ;
1919 const filePath = path . join ( fileStoreBasePath , fileName ) ;
20+
2021 logger . info ( `path is ${ filePath } ` ) ;
22+
2123 return await readFile ( filePath , "utf-8" ) ;
2224}
2325
@@ -42,6 +44,7 @@ async function refreshActionDefinitionSchema(payload: ActionDefinitionInsertedPa
4244 JSON . stringify ( schemas . settingDefinitions ) ,
4345 payload . action_definition_id ,
4446 ] ) ;
47+
4548 logger . info ( "Updated action_definition:" , res . rows [ 0 ] ) ;
4649 } catch ( error ) {
4750 logger . error ( "Error updating row:" , error ) ;
@@ -52,31 +55,36 @@ async function cancelAction(payload: ActionRunCancellationRequestPayload) {
5255 ActionWorkerPool . cancelTask ( payload . action_run_id ) ;
5356}
5457
55- async function runAction ( payload : ActionRunInsertedPayload ) {
56- const actionRunId = payload . action_run_id ;
57- const actionFilePath = payload . action_file_path ;
58- logger . info ( `action run ${ actionRunId } inserted (${ actionFilePath } )` ) ;
58+ export async function runAction ( payload : ActionRunInsertedPayload , actionSecrets ?: Record < string , any > ) : Promise < void > {
59+ const { action_file_path, action_run_id, parameters, settings, workspace_id } = payload ;
60+ const actionRunId = Number ( action_run_id ) ;
61+
62+ logger . info ( `action run ${ action_run_id } inserted (${ action_file_path } )` ) ;
63+
64+ let taskError ;
65+
5966 // event payload contains a file path for the action file which should be run
60- const actionJS = await readFileFromStore ( actionFilePath ) ;
67+ const actionJS = await readFileFromStore ( action_file_path ) ;
6168
6269 // NOTE: Authentication tokens are unavailable in PostgreSQL Listen/Notify
6370 // const authToken = req.header("authorization");
6471 // if (!authToken) console.warn("No valid `authorization` header in action-run request");
6572
66- const { parameters, settings } = payload ;
67- const workspaceId = payload . workspace_id ;
6873 const pool = ActionsDbManager . getDb ( ) ;
69- logger . info ( `Submitting task to worker pool for action run ${ actionRunId } ` ) ;
74+
75+ logger . info ( `Submitting task to worker pool for action run ${ action_run_id } ` ) ;
7076 const start = performance . now ( ) ;
71- let run , taskError ;
77+ let run ;
78+
7279 try {
7380 run = ( await ActionWorkerPool . submitTask ( {
74- actionJS : actionJS ,
75- action_run_id : actionRunId ,
81+ actionJS,
82+ action_run_id,
7683 message_port : null ,
77- parameters : parameters ,
78- settings : settings ,
79- workspaceId : workspaceId ,
84+ parameters,
85+ settings,
86+ workspaceId : workspace_id ,
87+ secrets : actionSecrets ,
8088 } ) ) satisfies ActionResponse ;
8189 } catch ( error : any ) {
8290 if ( error ?. name === "AbortError" ) {
@@ -92,7 +100,6 @@ async function runAction(payload: ActionRunInsertedPayload) {
92100 const status = taskError || run ?. errors ? "failed" : "success" ;
93101 logger . info ( `Finished run ${ actionRunId } in ${ duration / 1000 } s - ${ status } ` ) ;
94102 const errorValue = JSON . stringify ( taskError || run ?. errors || { } ) ;
95-
96103 const logStr = run ? run . console . join ( "\n" ) : "" ;
97104
98105 // update action_run row in DB with status/results/errors/logs
@@ -118,6 +125,7 @@ async function runAction(payload: ActionRunInsertedPayload) {
118125 payload . action_run_id ,
119126 ] ,
120127 ) ;
128+
121129 logger . info ( "Updated action_run:" , res . rows [ 0 ] ) ;
122130 } catch ( error ) {
123131 logger . error ( "Error updating row:" , error ) ;
@@ -141,25 +149,29 @@ export async function setupListeners() {
141149
142150 listenClient . on ( "notification" , async ( msg ) => {
143151 console . info ( `PG notify event: ${ JSON . stringify ( msg , null , 2 ) } ` ) ;
152+
144153 if ( ! msg . payload ) {
145154 console . warn ( `warning: PG event with no message or payload: ${ JSON . stringify ( msg , null , 2 ) } ` ) ;
146155 return ;
147156 }
157+
148158 const payload = JSON . parse ( msg . payload ) ;
149159
150160 if ( msg . channel === "action_definition_inserted" ) {
151161 await refreshActionDefinitionSchema ( payload ) ;
152162 } else if ( msg . channel === "action_run_inserted" ) {
153- await runAction ( payload ) ;
163+ await ActionRunner . addActionRun ( payload as ActionRunInsertedPayload ) ;
154164 } else if ( msg . channel === "action_run_cancel_requested" ) {
155165 await cancelAction ( payload ) ;
156166 }
157167 } ) ;
168+
158169 logger . info ( "Initialized PG event listeners" ) ;
159170}
160171
161- export function cleanup ( server : http . Server ) {
172+ export function cleanup ( server : http . Server ) : void {
162173 console . log ( "shutting down..." ) ;
174+
163175 if ( listenClient ) {
164176 listenClient . release ( ) ;
165177 }
0 commit comments