Skip to content

Commit d703065

Browse files
committed
Add async_tasks update script
1 parent 4255588 commit d703065

File tree

2 files changed

+112
-1
lines changed

2 files changed

+112
-1
lines changed

sql/lantern.sql

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -485,7 +485,7 @@ BEGIN
485485
RETURN;
486486
END IF;
487487

488-
-- Asynchronous task scheduling
488+
-- Asynchronous task scheduling BEGIN
489489
CREATE TABLE lantern.tasks (
490490
jobid bigserial primary key,
491491
query text not null,
@@ -592,3 +592,5 @@ $async_tasks_related$ LANGUAGE plpgsql;
592592

593593
SELECT _lantern_internal.maybe_setup_lantern_tasks();
594594
DROP FUNCTION _lantern_internal.maybe_setup_lantern_tasks();
595+
596+
-- Asynchronous task scheduling BEGIN

sql/updates/0.2.2--0.2.3.sql

Lines changed: 109 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,109 @@
1+
-- Asynchronous task scheduling BEGIN
2+
CREATE TABLE lantern.tasks (
3+
jobid bigserial primary key,
4+
query text not null,
5+
pg_cron_job_name text default null, -- initially null, because it will be ready after job insertion
6+
job_name text default null,
7+
username text not null default current_user,
8+
started_at timestamp with time zone not null default now(),
9+
duration interval,
10+
status text,
11+
error_message text
12+
);
13+
14+
GRANT SELECT ON lantern.tasks TO public;
15+
ALTER TABLE lantern.tasks ENABLE ROW LEVEL SECURITY;
16+
CREATE POLICY lantern_tasks_policy ON lantern.tasks USING (username OPERATOR(pg_catalog.=) current_user);
17+
18+
-- create a trigger and added to cron.job_run_details
19+
CREATE OR REPLACE FUNCTION _lantern_internal.async_task_finalizer_trigger() RETURNS TRIGGER AS $$
20+
DECLARE
21+
res RECORD;
22+
BEGIN
23+
-- if NEW.status is one of "starting", "running", "sending, "connecting", return
24+
IF NEW.status IN ('starting', 'running', 'sending', 'connecting') THEN
25+
RETURN NEW;
26+
END IF;
27+
28+
IF NEW.status NOT IN ('succeeded', 'failed') THEN
29+
RAISE WARNING 'Lantern Async tasks: Unexpected status %', NEW.status;
30+
END IF;
31+
32+
-- Get the job name from the jobid
33+
-- Call the job finalizer if corresponding job exists BOTH in lantern async tasks AND
34+
-- active cron jobs
35+
UPDATE lantern.tasks t SET
36+
(duration, status, error_message, pg_cron_job_name) = (run.end_time - t.started_at, NEW.status,
37+
CASE WHEN NEW.status = 'failed' THEN return_message ELSE NULL END,
38+
c.jobname )
39+
FROM cron.job c
40+
LEFT JOIN cron.job_run_details run
41+
ON c.jobid = run.jobid
42+
WHERE
43+
t.pg_cron_job_name = c.jobname AND
44+
c.jobid = NEW.jobid
45+
-- using returning as a trick to run the unschedule function as a side effect
46+
RETURNING cron.unschedule(t.pg_cron_job_name) INTO res;
47+
48+
RETURN NEW;
49+
END
50+
$$ LANGUAGE plpgsql;
51+
52+
CREATE TRIGGER status_change_trigger
53+
AFTER UPDATE OF status
54+
ON cron.job_run_details
55+
FOR EACH ROW
56+
WHEN (OLD.status IS DISTINCT FROM NEW.status)
57+
EXECUTE FUNCTION _lantern_internal.async_task_finalizer_trigger();
58+
59+
60+
CREATE OR REPLACE FUNCTION lantern.async_task(query text, job_name text) RETURNS INTEGER AS $$
61+
DECLARE
62+
_job_id integer;
63+
_pg_cron_job_name text;
64+
start_time timestamptz;
65+
BEGIN
66+
start_time := clock_timestamp();
67+
job_name := COALESCE(job_name, '');
68+
69+
INSERT INTO lantern.tasks (query, job_name, started_at)
70+
VALUES (query, job_name, start_time) RETURNING jobid INTO _job_id;
71+
72+
_pg_cron_job_name := 'async_task_' || _job_id;
73+
74+
UPDATE lantern.tasks t SET
75+
pg_cron_job_name = _pg_cron_job_name
76+
WHERE jobid = _job_id;
77+
78+
-- Schedule the job. Note: The original query execution is moved to the finalizer.
79+
PERFORM cron.schedule(_pg_cron_job_name, '1 seconds', query);
80+
RAISE NOTICE 'Job scheduled with pg_cron name: %', quote_literal(_pg_cron_job_name);
81+
RETURN _job_id;
82+
END
83+
$$ LANGUAGE plpgsql;
84+
85+
CREATE OR REPLACE FUNCTION lantern.async_task(query text) RETURNS INTEGER AS $$
86+
BEGIN
87+
RETURN lantern.async_task(query, NULL);
88+
END
89+
$$ LANGUAGE plpgsql;
90+
91+
CREATE OR REPLACE FUNCTION lantern.cancel_all_async_tasks() RETURNS void AS $$
92+
BEGIN
93+
PERFORM cron.unschedule(pg_cron_job_name) FROM lantern.tasks
94+
WHERE duration IS NULL;
95+
96+
UPDATE lantern.tasks t SET
97+
duration = clock_timestamp() - t.started_at,
98+
status = 'canceled',
99+
error_message = COALESCE(error_message, '') || 'Canceled by user'
100+
WHERE duration is NULL;
101+
END
102+
$$ LANGUAGE plpgsql;
103+
END
104+
$async_tasks_related$ LANGUAGE plpgsql;
105+
106+
SELECT _lantern_internal.maybe_setup_lantern_tasks();
107+
DROP FUNCTION _lantern_internal.maybe_setup_lantern_tasks();
108+
109+
-- Asynchronous task scheduling BEGIN

0 commit comments

Comments
 (0)