Skip to content

Commit 123501a

Browse files
Support abort for deferred tasks (#307)
* Support abort for deferred tasks * fix imports * fix mypy * fixes related to PR 307 * lint fixes * lint fixes * reversed changes * fix linter * fix avoiding race condition * fix test * fix lint * fix lint
1 parent 3d50f61 commit 123501a

2 files changed

Lines changed: 37 additions & 0 deletions

File tree

arq/jobs.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -156,7 +156,15 @@ async def abort(self, *, timeout: Optional[float] = None, poll_delay: float = 0.
156156
:param poll_delay: how often to poll redis for the job result
157157
:return: True if the job aborted properly, False otherwise
158158
"""
159+
job_info = await self.info()
160+
if job_info and job_info.score and job_info.score > timestamp_ms():
161+
async with self._redis.pipeline(transaction=True) as tr:
162+
tr.zrem(self._queue_name, self.job_id) # type: ignore[unused-coroutine]
163+
tr.zadd(self._queue_name, {self.job_id: 1}) # type: ignore[unused-coroutine]
164+
await tr.execute()
165+
159166
await self._redis.zadd(abort_jobs_ss, {self.job_id: timestamp_ms()})
167+
160168
try:
161169
await self.result(timeout=timeout, poll_delay=poll_delay)
162170
except asyncio.CancelledError:

tests/test_worker.py

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
import re
55
import signal
66
import sys
7+
from datetime import datetime, timedelta
78
from unittest.mock import MagicMock
89

910
import msgpack
@@ -824,6 +825,34 @@ async def longfunc(ctx):
824825
assert worker.tasks == {}
825826

826827

828+
async def test_abort_deferred_job_before(arq_redis: ArqRedis, worker, caplog, loop):
829+
async def longfunc(ctx):
830+
await asyncio.sleep(3600)
831+
832+
caplog.set_level(logging.INFO)
833+
834+
job = await arq_redis.enqueue_job('longfunc', _job_id='testing', _defer_until=datetime.utcnow() + timedelta(days=1))
835+
836+
worker: Worker = worker(functions=[func(longfunc, name='longfunc')], allow_abort_jobs=True, poll_delay=0.1)
837+
assert worker.jobs_complete == 0
838+
assert worker.jobs_failed == 0
839+
assert worker.jobs_retried == 0
840+
841+
with pytest.raises(asyncio.TimeoutError):
842+
await job.abort(timeout=0)
843+
await worker.main()
844+
845+
assert worker.jobs_complete == 0
846+
assert worker.jobs_failed == 1
847+
assert worker.jobs_retried == 0
848+
log = re.sub(r'\d+.\d\ds', 'X.XXs', '\n'.join(r.message for r in caplog.records))
849+
assert 'X.XXs ⊘ testing:longfunc aborted before start' in log
850+
await worker.main()
851+
assert worker.aborting_tasks == set()
852+
assert worker.job_tasks == {}
853+
assert worker.tasks == {}
854+
855+
827856
async def test_not_abort_job(arq_redis: ArqRedis, worker, caplog, loop):
828857
async def shortfunc(ctx):
829858
await asyncio.sleep(0.2)

0 commit comments

Comments
 (0)