-
Notifications
You must be signed in to change notification settings - Fork 71
Expand file tree
/
Copy pathjob_functions.sql
More file actions
158 lines (139 loc) · 6.76 KB
/
job_functions.sql
File metadata and controls
158 lines (139 loc) · 6.76 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
-- add_task() will add a task to the same chain as the task with `parent_id`
CREATE OR REPLACE FUNCTION timetable.add_task(
IN kind timetable.command_kind,
IN command TEXT,
IN parent_id BIGINT,
IN order_delta DOUBLE PRECISION DEFAULT 10
) RETURNS BIGINT AS $$
INSERT INTO timetable.task (chain_id, task_order, kind, command)
SELECT chain_id, task_order + $4, $1, $2 FROM timetable.task WHERE task_id = $3
RETURNING task_id
$$ LANGUAGE SQL;
COMMENT ON FUNCTION timetable.add_task IS 'Add a task to the same chain as the task with parent_id';
-- add_job() will add one-task chain to the system
CREATE OR REPLACE FUNCTION timetable.add_job(
job_name TEXT,
job_schedule timetable.cron,
job_command TEXT,
job_parameters JSONB DEFAULT NULL,
job_kind timetable.command_kind DEFAULT 'SQL'::timetable.command_kind,
job_client_name TEXT DEFAULT NULL,
job_max_instances INTEGER DEFAULT NULL,
job_live BOOLEAN DEFAULT TRUE,
job_self_destruct BOOLEAN DEFAULT FALSE,
job_ignore_errors BOOLEAN DEFAULT TRUE,
job_exclusive BOOLEAN DEFAULT FALSE,
job_on_error TEXT DEFAULT NULL
) RETURNS BIGINT AS $$
WITH
cte_chain (v_chain_id) AS (
INSERT INTO timetable.chain (chain_name, run_at, max_instances, live, self_destruct, client_name, exclusive_execution, on_error)
VALUES (job_name, job_schedule,job_max_instances, job_live, job_self_destruct, job_client_name, job_exclusive, job_on_error)
RETURNING chain_id
),
cte_task(v_task_id) AS (
INSERT INTO timetable.task (chain_id, task_order, kind, command, ignore_error, autonomous)
SELECT v_chain_id, 10, job_kind, job_command, job_ignore_errors, TRUE
FROM cte_chain
RETURNING task_id
),
cte_param AS (
INSERT INTO timetable.parameter (task_id, order_id, value)
SELECT v_task_id, 1, job_parameters FROM cte_task, cte_chain
)
SELECT v_chain_id FROM cte_chain
$$ LANGUAGE SQL;
COMMENT ON FUNCTION timetable.add_job IS 'Add one-task chain (aka job) to the system';
-- notify_chain_start() will send notification to the worker to start the chain
CREATE OR REPLACE FUNCTION timetable.notify_chain_start(
chain_id BIGINT,
worker_name TEXT,
start_delay INTERVAL DEFAULT NULL
) RETURNS void AS $$
SELECT pg_notify(
worker_name,
format('{"ConfigID": %s, "Command": "START", "Ts": %s, "Delay": %s}',
chain_id,
EXTRACT(epoch FROM clock_timestamp())::bigint,
COALESCE(EXTRACT(epoch FROM start_delay)::bigint, 0)
)
)
$$ LANGUAGE SQL;
COMMENT ON FUNCTION timetable.notify_chain_start IS 'Send notification to the worker to start the chain';
-- notify_chain_stop() will send notification to the worker to stop the chain
CREATE OR REPLACE FUNCTION timetable.notify_chain_stop(
chain_id BIGINT,
worker_name TEXT
) RETURNS void AS $$
SELECT pg_notify(
worker_name,
format('{"ConfigID": %s, "Command": "STOP", "Ts": %s}',
chain_id,
EXTRACT(epoch FROM clock_timestamp())::bigint)
)
$$ LANGUAGE SQL;
COMMENT ON FUNCTION timetable.notify_chain_stop IS 'Send notification to the worker to stop the chain';
-- move_task_up() will switch the order of the task execution with a previous task within the chain
CREATE OR REPLACE FUNCTION timetable.move_task_up(IN task_id BIGINT) RETURNS boolean AS $$
WITH current_task (ct_chain_id, ct_id, ct_order) AS (
SELECT chain_id, task_id, task_order FROM timetable.task WHERE task_id = $1
),
tasks(t_id, t_new_order) AS (
SELECT task_id, COALESCE(LAG(task_order) OVER w, LEAD(task_order) OVER w)
FROM timetable.task t, current_task ct
WHERE chain_id = ct_chain_id AND (task_order < ct_order OR task_id = ct_id)
WINDOW w AS (PARTITION BY chain_id ORDER BY ABS(task_order - ct_order))
LIMIT 2
),
upd AS (
UPDATE timetable.task t SET task_order = t_new_order
FROM tasks WHERE tasks.t_id = t.task_id AND tasks.t_new_order IS NOT NULL
RETURNING true
)
SELECT COUNT(*) > 0 FROM upd
$$ LANGUAGE SQL;
COMMENT ON FUNCTION timetable.move_task_up IS 'Switch the order of the task execution with a previous task within the chain';
-- move_task_down() will switch the order of the task execution with a following task within the chain
CREATE OR REPLACE FUNCTION timetable.move_task_down(IN task_id BIGINT) RETURNS boolean AS $$
WITH current_task (ct_chain_id, ct_id, ct_order) AS (
SELECT chain_id, task_id, task_order FROM timetable.task WHERE task_id = $1
),
tasks(t_id, t_new_order) AS (
SELECT task_id, COALESCE(LAG(task_order) OVER w, LEAD(task_order) OVER w)
FROM timetable.task t, current_task ct
WHERE chain_id = ct_chain_id AND (task_order > ct_order OR task_id = ct_id)
WINDOW w AS (PARTITION BY chain_id ORDER BY ABS(task_order - ct_order))
LIMIT 2
),
upd AS (
UPDATE timetable.task t SET task_order = t_new_order
FROM tasks WHERE tasks.t_id = t.task_id AND tasks.t_new_order IS NOT NULL
RETURNING true
)
SELECT COUNT(*) > 0 FROM upd
$$ LANGUAGE SQL;
COMMENT ON FUNCTION timetable.move_task_down IS 'Switch the order of the task execution with a following task within the chain';
-- delete_task() will delete the task from a chain
CREATE OR REPLACE FUNCTION timetable.delete_task(IN task_id BIGINT) RETURNS boolean AS $$
WITH del_task AS (DELETE FROM timetable.task WHERE task_id = $1 RETURNING task_id)
SELECT EXISTS(SELECT 1 FROM del_task)
$$ LANGUAGE SQL;
COMMENT ON FUNCTION timetable.delete_task IS 'Delete the task from a chain';
-- delete_job() will delete the chain and its tasks from the system
CREATE OR REPLACE FUNCTION timetable.delete_job(IN job_name TEXT) RETURNS boolean AS $$
WITH del_chain AS (DELETE FROM timetable.chain WHERE chain.chain_name = $1 RETURNING chain_id)
SELECT EXISTS(SELECT 1 FROM del_chain)
$$ LANGUAGE SQL;
COMMENT ON FUNCTION timetable.delete_job IS 'Delete the chain and its tasks from the system';
-- pause_job() will pause the chain (set live = false)
CREATE OR REPLACE FUNCTION timetable.pause_job(IN job_name TEXT) RETURNS boolean AS $$
WITH upd_chain AS (UPDATE timetable.chain SET live = false WHERE chain.chain_name = $1 RETURNING chain_id)
SELECT EXISTS(SELECT 1 FROM upd_chain)
$$ LANGUAGE SQL;
COMMENT ON FUNCTION timetable.pause_job IS 'Pause the chain (set live = false)';
-- resume_job() will resume the chain (set live = true)
CREATE OR REPLACE FUNCTION timetable.resume_job(IN job_name TEXT) RETURNS boolean AS $$
WITH upd_chain AS (UPDATE timetable.chain SET live = true WHERE chain.chain_name = $1 RETURNING chain_id)
SELECT EXISTS(SELECT 1 FROM upd_chain)
$$ LANGUAGE SQL;
COMMENT ON FUNCTION timetable.resume_job IS 'Resume the chain (set live = true)';