Skip to content

Commit ad9c3bc

Browse files
authored
[+] add params column to timetable.execution_log table, closes #739 (#740)
Move task logging inside the actual functions. Now every set of parameters produce a separate entry in timetable.execution_log table. Simplify the return of task functions with just error.
1 parent e741988 commit ad9c3bc

File tree

16 files changed

+111
-110
lines changed

16 files changed

+111
-110
lines changed

internal/pgengine/access.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ func (pge *PgEngine) IsAlive() bool {
2828
}
2929

3030
// LogTaskExecution will log current chain element execution status including retcode
31-
func (pge *PgEngine) LogTaskExecution(ctx context.Context, task *ChainTask, retCode int, output string) {
31+
func (pge *PgEngine) LogTaskExecution(ctx context.Context, task *ChainTask, retCode int, output string, params string) {
3232
switch pge.Logging.LogDBLevel {
3333
case "none":
3434
return
@@ -38,12 +38,12 @@ func (pge *PgEngine) LogTaskExecution(ctx context.Context, task *ChainTask, retC
3838
}
3939
}
4040
_, err := pge.ConfigDb.Exec(ctx, `INSERT INTO timetable.execution_log (
41-
chain_id, task_id, command, kind, last_run, finished, returncode, pid, output, client_name, txid, ignore_error)
42-
VALUES ($1, $2, $3, $4, clock_timestamp() - $5 :: interval, clock_timestamp(), $6, $7, NULLIF($8, ''), $9, $10, $11)`,
41+
chain_id, task_id, command, kind, last_run, finished, returncode, pid, output, client_name, txid, ignore_error, params)
42+
VALUES ($1, $2, $3, $4, clock_timestamp() - $5 :: interval, clock_timestamp(), $6, $7, NULLIF($8, ''), $9, $10, $11, $12)`,
4343
task.ChainID, task.TaskID, task.Command, task.Kind,
4444
fmt.Sprintf("%f seconds", float64(task.Duration)/1000000),
4545
retCode, pge.Getsid(), strings.TrimSpace(output), pge.ClientName, task.Vxid,
46-
task.IgnoreError)
46+
task.IgnoreError, params)
4747
if err != nil {
4848
pge.l.WithError(err).Error("Failed to log chain element execution status")
4949
}

internal/pgengine/access_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -107,9 +107,9 @@ func TestLogChainElementExecution(t *testing.T) {
107107
t.Run("Check LogChainElementExecution if sql fails", func(*testing.T) {
108108
mockPool.ExpectExec("INSERT INTO .*execution_log").WithArgs(
109109
pgxmock.AnyArg(), pgxmock.AnyArg(), pgxmock.AnyArg(), pgxmock.AnyArg(), pgxmock.AnyArg(), pgxmock.AnyArg(),
110-
pgxmock.AnyArg(), pgxmock.AnyArg(), pgxmock.AnyArg(), pgxmock.AnyArg(), pgxmock.AnyArg()).
110+
pgxmock.AnyArg(), pgxmock.AnyArg(), pgxmock.AnyArg(), pgxmock.AnyArg(), pgxmock.AnyArg(), pgxmock.AnyArg()).
111111
WillReturnError(errors.New("Failed to log chain element execution status"))
112-
pge.LogTaskExecution(context.Background(), &pgengine.ChainTask{}, 0, "STATUS")
112+
pge.LogTaskExecution(context.Background(), &pgengine.ChainTask{}, 0, "STATUS", "")
113113
})
114114

115115
assert.NoError(t, mockPool.ExpectationsWereMet(), "there were unfulfilled expectations")

internal/pgengine/migration.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -147,6 +147,12 @@ var Migrations func() migrator.Option = func() migrator.Option {
147147
return ExecuteMigrationScript(ctx, tx, "00721.sql")
148148
},
149149
},
150+
&migrator.Migration{
151+
Name: "00733 Add params column to timetable.execution_log table",
152+
Func: func(ctx context.Context, tx pgx.Tx) error {
153+
return ExecuteMigrationScript(ctx, tx, "00733.sql")
154+
},
155+
},
150156
// adding new migration here, update "timetable"."migration" in "sql/init.sql"
151157
// and "dbapi" variable in main.go!
152158

internal/pgengine/pgengine_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -135,7 +135,7 @@ func TestSchedulerFunctions(t *testing.T) {
135135
assert.NoError(t, err, "Should start transaction")
136136
assert.Greater(t, txid, int64(0), "Should return transaction id")
137137
f := func(sql string, params []string) error {
138-
_, err := pge.ExecuteSQLCommand(ctx, tx, sql, params)
138+
err := pge.ExecuteSQLCommand(ctx, tx, &pgengine.ChainTask{Command: sql}, params)
139139
return err
140140
}
141141
assert.Error(t, f("", nil), "Should error for empty script")

internal/pgengine/sql/ddl.sql

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -118,7 +118,8 @@ CREATE TABLE timetable.execution_log (
118118
kind timetable.command_kind,
119119
command TEXT,
120120
output TEXT,
121-
client_name TEXT NOT NULL
121+
client_name TEXT NOT NULL,
122+
params TEXT
122123
);
123124

124125
COMMENT ON TABLE timetable.execution_log IS

internal/pgengine/sql/init.sql

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,4 +27,5 @@ VALUES
2727
(11, '00573 Add ability to start a chain with delay'),
2828
(12, '00575 Add on_error handling'),
2929
(13, '00629 Add ignore_error column to timetable.execution_log'),
30-
(14, '00721 Add more job control functions');
30+
(14, '00721 Add more job control functions'),
31+
(15, '00733 Add params column to timetable.execution_log table');
Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
-- Add params column to execution_log table to store parameter values used during task execution
2+
ALTER TABLE timetable.execution_log ADD COLUMN params TEXT;
3+
4+
COMMENT ON COLUMN timetable.execution_log.params IS 'Array of parameter values used during task execution';

internal/pgengine/transaction.go

Lines changed: 25 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@ import (
1010

1111
"github.com/cybertec-postgresql/pg_timetable/internal/log"
1212
pgx "github.com/jackc/pgx/v5"
13-
"github.com/jackc/pgx/v5/pgconn"
1413
)
1514

1615
// StartTransaction returns transaction object, virtual transaction id and error
@@ -56,7 +55,7 @@ func (pge *PgEngine) MustRollbackToSavepoint(ctx context.Context, tx pgx.Tx, tas
5655
}
5756

5857
// ExecuteSQLTask executes SQL task
59-
func (pge *PgEngine) ExecuteSQLTask(ctx context.Context, tx pgx.Tx, task *ChainTask, paramValues []string) (out string, err error) {
58+
func (pge *PgEngine) ExecuteSQLTask(ctx context.Context, tx pgx.Tx, task *ChainTask, paramValues []string) (err error) {
6059
switch {
6160
case task.IsRemote():
6261
return pge.ExecRemoteSQLTask(ctx, task, paramValues)
@@ -68,15 +67,15 @@ func (pge *PgEngine) ExecuteSQLTask(ctx context.Context, tx pgx.Tx, task *ChainT
6867
}
6968

7069
// ExecLocalSQLTask executes local task in the chain transaction
71-
func (pge *PgEngine) ExecLocalSQLTask(ctx context.Context, tx pgx.Tx, task *ChainTask, paramValues []string) (out string, err error) {
70+
func (pge *PgEngine) ExecLocalSQLTask(ctx context.Context, tx pgx.Tx, task *ChainTask, paramValues []string) (err error) {
7271
if err := pge.SetRole(ctx, tx, task.RunAs); err != nil {
73-
return "", err
72+
return err
7473
}
7574
if task.IgnoreError {
7675
pge.MustSavepoint(ctx, tx, task.TaskID)
7776
}
7877
pge.SetCurrentTaskContext(ctx, tx, task.ChainID, task.TaskID)
79-
out, err = pge.ExecuteSQLCommand(ctx, tx, task.Command, paramValues)
78+
err = pge.ExecuteSQLCommand(ctx, tx, task, paramValues)
8079
if err != nil && task.IgnoreError {
8180
pge.MustRollbackToSavepoint(ctx, tx, task.TaskID)
8281
}
@@ -87,55 +86,57 @@ func (pge *PgEngine) ExecLocalSQLTask(ctx context.Context, tx pgx.Tx, task *Chai
8786
}
8887

8988
// ExecStandaloneTask executes task against the provided connection interface, it can be remote connection or acquired connection from the pool
90-
func (pge *PgEngine) ExecStandaloneTask(ctx context.Context, connf func() (PgxConnIface, error), task *ChainTask, paramValues []string) (string, error) {
89+
func (pge *PgEngine) ExecStandaloneTask(ctx context.Context, connf func() (PgxConnIface, error), task *ChainTask, paramValues []string) error {
9190
conn, err := connf()
9291
if err != nil {
93-
return "", err
92+
return err
9493
}
9594
defer pge.FinalizeDBConnection(ctx, conn)
9695
if err := pge.SetRole(ctx, conn, task.RunAs); err != nil {
97-
return "", err
96+
return err
9897
}
9998
pge.SetCurrentTaskContext(ctx, conn, task.ChainID, task.TaskID)
100-
return pge.ExecuteSQLCommand(ctx, conn, task.Command, paramValues)
99+
return pge.ExecuteSQLCommand(ctx, conn, task, paramValues)
101100
}
102101

103102
// ExecRemoteSQLTask executes task against remote connection
104-
func (pge *PgEngine) ExecRemoteSQLTask(ctx context.Context, task *ChainTask, paramValues []string) (string, error) {
103+
func (pge *PgEngine) ExecRemoteSQLTask(ctx context.Context, task *ChainTask, paramValues []string) error {
105104
log.GetLogger(ctx).Info("Switching to remote task mode")
106105
return pge.ExecStandaloneTask(ctx,
107106
func() (PgxConnIface, error) { return pge.GetRemoteDBConnection(ctx, task.ConnectString) },
108107
task, paramValues)
109108
}
110109

111110
// ExecAutonomousSQLTask executes autonomous task in an acquired connection from pool
112-
func (pge *PgEngine) ExecAutonomousSQLTask(ctx context.Context, task *ChainTask, paramValues []string) (string, error) {
111+
func (pge *PgEngine) ExecAutonomousSQLTask(ctx context.Context, task *ChainTask, paramValues []string) error {
113112
log.GetLogger(ctx).Info("Switching to autonomous task mode")
114113
return pge.ExecStandaloneTask(ctx,
115114
func() (PgxConnIface, error) { return pge.GetLocalDBConnection(ctx) },
116115
task, paramValues)
117116
}
118117

119118
// ExecuteSQLCommand executes chain command with parameters inside transaction
120-
func (pge *PgEngine) ExecuteSQLCommand(ctx context.Context, executor executor, command string, paramValues []string) (out string, err error) {
121-
var ct pgconn.CommandTag
119+
func (pge *PgEngine) ExecuteSQLCommand(ctx context.Context, executor executor, task *ChainTask, paramValues []string) (err error) {
122120
var params []any
123-
if strings.TrimSpace(command) == "" {
124-
return "", errors.New("SQL command cannot be empty")
121+
var errCodes = map[bool]int{false: 0, true: -1}
122+
if strings.TrimSpace(task.Command) == "" {
123+
return errors.New("SQL command cannot be empty")
125124
}
126125
if len(paramValues) == 0 { //mimic empty param
127-
ct, err = executor.Exec(ctx, command)
128-
out = ct.String()
129-
return
126+
ct, e := executor.Exec(ctx, task.Command)
127+
pge.LogTaskExecution(context.Background(), task, errCodes[err != nil], ct.String(), "")
128+
return e
130129
}
131130
for _, val := range paramValues {
132-
if val > "" {
133-
if err = json.Unmarshal([]byte(val), &params); err != nil {
134-
return
135-
}
136-
ct, err = executor.Exec(ctx, command, params...)
137-
out = out + ct.String() + "\n"
131+
if val == "" {
132+
continue
133+
}
134+
if err = json.Unmarshal([]byte(val), &params); err != nil {
135+
return
138136
}
137+
ct, e := executor.Exec(ctx, task.Command, params...)
138+
err = errors.Join(err, e)
139+
pge.LogTaskExecution(context.Background(), task, errCodes[e != nil], ct.String(), val)
139140
}
140141
return
141142
}

internal/pgengine/transaction_test.go

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -88,21 +88,21 @@ func TestExecuteSQLTask(t *testing.T) {
8888
pge := pgengine.NewDB(mockPool, "pgengine_unit_test")
8989

9090
t.Run("Check autonomous SQL task", func(t *testing.T) {
91-
_, err := pge.ExecuteSQLTask(ctx, nil, &pgengine.ChainTask{Autonomous: true}, []string{})
91+
err := pge.ExecuteSQLTask(ctx, nil, &pgengine.ChainTask{Autonomous: true}, []string{})
9292
assert.ErrorContains(t, err, "pgpool.Acquire() method is not implemented")
9393
})
9494

9595
t.Run("Check remote SQL task", func(t *testing.T) {
9696
task := pgengine.ChainTask{ConnectString: "foo"}
97-
_, err := pge.ExecuteSQLTask(ctx, nil, &task, []string{})
97+
err := pge.ExecuteSQLTask(ctx, nil, &task, []string{})
9898
assert.ErrorContains(t, err, "cannot parse")
9999
})
100100

101101
t.Run("Check local SQL task", func(t *testing.T) {
102102
mockPool.ExpectBegin()
103103
tx, err := mockPool.Begin(ctx)
104104
assert.NoError(t, err)
105-
_, err = pge.ExecuteSQLTask(ctx, tx, &pgengine.ChainTask{IgnoreError: true}, []string{})
105+
err = pge.ExecuteSQLTask(ctx, tx, &pgengine.ChainTask{IgnoreError: true}, []string{})
106106
assert.ErrorContains(t, err, "SQL command cannot be empty")
107107
})
108108
}
@@ -125,11 +125,11 @@ func TestExecLocalSQLTask(t *testing.T) {
125125
Command: "FOO",
126126
RunAs: "Bob",
127127
}
128-
_, err := pge.ExecLocalSQLTask(ctx, mockPool, &task, []string{})
128+
err := pge.ExecLocalSQLTask(ctx, mockPool, &task, []string{})
129129
assert.Error(t, err)
130130

131131
mockPool.ExpectExec("SET ROLE").WillReturnError(errors.New("unknown role Bob"))
132-
_, err = pge.ExecLocalSQLTask(ctx, mockPool, &task, []string{})
132+
err = pge.ExecLocalSQLTask(ctx, mockPool, &task, []string{})
133133
assert.ErrorContains(t, err, "unknown role Bob")
134134
assert.NoError(t, mockPool.ExpectationsWereMet())
135135
}
@@ -152,16 +152,16 @@ func TestExecStandaloneTask(t *testing.T) {
152152
}
153153
cf := func() (pgengine.PgxConnIface, error) { return mockPool.AsConn(), nil }
154154

155-
_, err := pge.ExecStandaloneTask(ctx, cf, &task, []string{})
155+
err := pge.ExecStandaloneTask(ctx, cf, &task, []string{})
156156
assert.Error(t, err)
157157

158158
mockPool.ExpectExec("SET ROLE").WillReturnError(errors.New("unknown role Bob"))
159159
mockPool.ExpectClose()
160-
_, err = pge.ExecStandaloneTask(ctx, cf, &task, []string{})
160+
err = pge.ExecStandaloneTask(ctx, cf, &task, []string{})
161161
assert.ErrorContains(t, err, "unknown role Bob")
162162

163163
cf = func() (pgengine.PgxConnIface, error) { return nil, errors.New("no connection") }
164-
_, err = pge.ExecStandaloneTask(ctx, cf, &task, []string{})
164+
err = pge.ExecStandaloneTask(ctx, cf, &task, []string{})
165165
assert.ErrorContains(t, err, "no connection")
166166

167167
assert.NoError(t, mockPool.ExpectationsWereMet())
@@ -181,19 +181,19 @@ func TestExecuteSQLCommand(t *testing.T) {
181181

182182
pge := pgengine.NewDB(mockPool, "pgengine_unit_test")
183183

184-
_, err := pge.ExecuteSQLCommand(ctx, mockPool, "", []string{})
184+
err := pge.ExecuteSQLCommand(ctx, mockPool, &pgengine.ChainTask{}, []string{})
185185
assert.Error(t, err)
186186

187187
mockPool.ExpectExec("correct json").WillReturnResult(pgxmock.NewResult("EXECUTE", 0))
188-
_, err = pge.ExecuteSQLCommand(ctx, mockPool, "correct json", []string{})
188+
err = pge.ExecuteSQLCommand(ctx, mockPool, &pgengine.ChainTask{Command: "correct json"}, []string{})
189189
assert.NoError(t, err)
190190

191191
mockPool.ExpectExec("correct json").WithArgs("John", 30.0, nil).WillReturnResult(pgxmock.NewResult("EXECUTE", 0))
192-
_, err = pge.ExecuteSQLCommand(ctx, mockPool, "correct json", []string{`["John", 30, null]`})
192+
err = pge.ExecuteSQLCommand(ctx, mockPool, &pgengine.ChainTask{Command: "correct json"}, []string{`["John", 30, null]`})
193193
assert.NoError(t, err)
194194

195195
mockPool.ExpectExec("incorrect json").WillReturnError(json.Unmarshal([]byte("foo"), &struct{}{}))
196-
_, err = pge.ExecuteSQLCommand(ctx, mockPool, "incorrect json", []string{"foo"})
196+
err = pge.ExecuteSQLCommand(ctx, mockPool, &pgengine.ChainTask{Command: "incorrect json"}, []string{"foo"})
197197
assert.Error(t, err)
198198
}
199199

internal/scheduler/chain.go

Lines changed: 15 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,8 @@ package scheduler
33
import (
44
"cmp"
55
"context"
6+
"errors"
67
"fmt"
7-
"strings"
88
"time"
99

1010
"github.com/cybertec-postgresql/pg_timetable/internal/log"
@@ -222,11 +222,16 @@ func (sch *Scheduler) executeChain(ctx context.Context, chain Chain) {
222222
l := chainL.WithField("task", task.TaskID)
223223
l.Info("Starting task")
224224
taskCtx := log.WithLogger(chainCtx, l)
225-
retCode := sch.executeTask(taskCtx, tx, &task)
225+
err = sch.executeTask(taskCtx, tx, &task)
226+
if err != nil {
227+
l.WithError(err).Error("Task execution failed")
228+
} else {
229+
l.Info("Task executed successfully")
230+
}
226231

227232
// we use background context here because current one (chainCtx) might be cancelled
228233
bctx = log.WithLogger(ctx, l)
229-
if retCode != 0 {
234+
if err != nil {
230235
if !task.IgnoreError {
231236
chainL.Error("Chain failed")
232237
sch.pgengine.RemoveChainRunStatus(bctx, chain.ChainID)
@@ -247,20 +252,18 @@ func (sch *Scheduler) executeChain(ctx context.Context, chain Chain) {
247252
}
248253

249254
/* execute a task */
250-
func (sch *Scheduler) executeTask(ctx context.Context, tx pgx.Tx, task *pgengine.ChainTask) int {
255+
func (sch *Scheduler) executeTask(ctx context.Context, tx pgx.Tx, task *pgengine.ChainTask) error {
251256
var (
252257
paramValues []string
253258
err error
254-
out string
255-
retCode int
256259
cancel context.CancelFunc
257260
)
258261

259262
l := log.GetLogger(ctx)
260263
err = sch.pgengine.GetChainParamValues(ctx, &paramValues, task)
261264
if err != nil {
262265
l.WithError(err).Error("cannot fetch parameters values for chain: ", err)
263-
return -1
266+
return err
264267
}
265268

266269
ctx, cancel = getTimeoutContext(ctx, sch.Config().Resource.TaskTimeout, task.Timeout)
@@ -271,27 +274,16 @@ func (sch *Scheduler) executeTask(ctx context.Context, tx pgx.Tx, task *pgengine
271274
task.StartedAt = time.Now()
272275
switch task.Kind {
273276
case "SQL":
274-
out, err = sch.pgengine.ExecuteSQLTask(ctx, tx, task, paramValues)
277+
err = sch.pgengine.ExecuteSQLTask(ctx, tx, task, paramValues)
275278
case "PROGRAM":
276279
if sch.pgengine.NoProgramTasks {
277280
l.Info("Program task execution skipped")
278-
return -2
281+
return errors.New("program tasks execution is disabled")
279282
}
280-
retCode, out, err = sch.ExecuteProgramCommand(ctx, task.Command, paramValues)
283+
err = sch.ExecuteProgramCommand(ctx, task, paramValues)
281284
case "BUILTIN":
282-
out, err = sch.executeBuiltinTask(ctx, task.Command, paramValues)
285+
err = sch.executeBuiltinTask(ctx, task, paramValues)
283286
}
284287
task.Duration = time.Since(task.StartedAt).Microseconds()
285-
286-
if err != nil {
287-
if retCode == 0 {
288-
retCode = -1
289-
}
290-
out = strings.Join([]string{out, err.Error()}, "\n")
291-
l.WithError(err).Error("Task execution failed")
292-
} else {
293-
l.Info("Task executed successfully")
294-
}
295-
sch.pgengine.LogTaskExecution(context.Background(), task, retCode, out)
296-
return retCode
288+
return err
297289
}

0 commit comments

Comments
 (0)