Skip to content

Commit d6ea422

Browse files
authored
Allow user to configure a default job expiry-extra length (#303)
* Allow user to configure a default job expiry length

  Previously, `expires_extra_ms` in `arq/connections.py` set a default expiry of
  1 day from when a job was expected to start to when it would expire, if no
  expiry was set in `enqueue_job()`. This commit keeps that as the default, but
  allows both `ArqRedis` and `Worker` to have a larger default expiry
  configured, as `self.expires_extra_ms`. `expires_extra_ms` is also moved to
  `arq/constants.py` along with other shared constants.

* Add test for time-to-live of job keys based on `expires_extra_ms`
1 parent 76fd0a3 commit d6ea422

4 files changed

Lines changed: 38 additions & 7 deletions

File tree

arq/connections.py

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212
from redis.asyncio.sentinel import Sentinel
1313
from redis.exceptions import RedisError, WatchError
1414

15-
from .constants import default_queue_name, job_key_prefix, result_key_prefix
15+
from .constants import default_queue_name, expires_extra_ms, job_key_prefix, result_key_prefix
1616
from .jobs import Deserializer, Job, JobDef, JobResult, Serializer, deserialize_job, serialize_job
1717
from .utils import timestamp_ms, to_ms, to_unix_ms
1818

@@ -71,10 +71,6 @@ def __repr__(self) -> str:
7171
return 'RedisSettings({})'.format(', '.join(f'{k}={v!r}' for k, v in self.__dict__.items()))
7272

7373

74-
# extra time after the job is expected to start when the job key should expire, 1 day in ms
75-
expires_extra_ms = 86_400_000
76-
77-
7874
if TYPE_CHECKING:
7975
BaseRedis = Redis[bytes]
8076
else:
@@ -89,6 +85,8 @@ class ArqRedis(BaseRedis):
8985
:param job_serializer: a function that serializes Python objects to bytes, defaults to pickle.dumps
9086
:param job_deserializer: a function that deserializes bytes into Python objects, defaults to pickle.loads
9187
:param default_queue_name: the default queue name to use, defaults to ``arq.queue``.
88+
:param expires_extra_ms: the default length of time from when a job is expected to start
89+
after which the job expires, defaults to 1 day in ms.
9290
:param kwargs: keyword arguments directly passed to ``redis.asyncio.Redis``.
9391
"""
9492

@@ -98,13 +96,15 @@ def __init__(
9896
job_serializer: Optional[Serializer] = None,
9997
job_deserializer: Optional[Deserializer] = None,
10098
default_queue_name: str = default_queue_name,
99+
expires_extra_ms: int = expires_extra_ms,
101100
**kwargs: Any,
102101
) -> None:
103102
self.job_serializer = job_serializer
104103
self.job_deserializer = job_deserializer
105104
self.default_queue_name = default_queue_name
106105
if pool_or_conn:
107106
kwargs['connection_pool'] = pool_or_conn
107+
self.expires_extra_ms = expires_extra_ms
108108
super().__init__(**kwargs)
109109

110110
async def enqueue_job(
@@ -157,7 +157,7 @@ async def enqueue_job(
157157
else:
158158
score = enqueue_time_ms
159159

160-
expires_ms = expires_ms or score - enqueue_time_ms + expires_extra_ms
160+
expires_ms = expires_ms or score - enqueue_time_ms + self.expires_extra_ms
161161

162162
job = serialize_job(function, args, kwargs, _job_try, enqueue_time_ms, serializer=self.job_serializer)
163163
pipe.multi()
@@ -210,6 +210,7 @@ async def create_pool(
210210
job_serializer: Optional[Serializer] = None,
211211
job_deserializer: Optional[Deserializer] = None,
212212
default_queue_name: str = default_queue_name,
213+
expires_extra_ms: int = expires_extra_ms,
213214
) -> ArqRedis:
214215
"""
215216
Create a new redis pool, retrying up to ``conn_retries`` times if the connection fails.
@@ -257,6 +258,7 @@ def pool_factory(*args: Any, **kwargs: Any) -> ArqRedis:
257258
pool.job_serializer = job_serializer
258259
pool.job_deserializer = job_deserializer
259260
pool.default_queue_name = default_queue_name
261+
pool.expires_extra_ms = expires_extra_ms
260262
await pool.ping()
261263

262264
except (ConnectionError, OSError, RedisError, asyncio.TimeoutError) as e:

arq/constants.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,3 +13,6 @@
1313

1414
# used by `ms_to_datetime` to get the timezone
1515
timezone_env_vars = 'ARQ_TIMEZONE', 'arq_timezone', 'TIMEZONE', 'timezone'
16+
17+
# extra time after the job is expected to start when the job key should expire, 1 day in ms
18+
expires_extra_ms = 86_400_000

arq/worker.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
abort_job_max_age,
2020
abort_jobs_ss,
2121
default_queue_name,
22+
expires_extra_ms,
2223
health_check_key_suffix,
2324
in_progress_key_prefix,
2425
job_key_prefix,
@@ -167,6 +168,8 @@ class Worker:
167168
:param max_burst_jobs: the maximum number of jobs to process in burst mode (disabled with negative values)
168169
:param job_serializer: a function that serializes Python objects to bytes, defaults to pickle.dumps
169170
:param job_deserializer: a function that deserializes bytes into Python objects, defaults to pickle.loads
171+
:param expires_extra_ms: the default length of time from when a job is expected to start
172+
after which the job expires, defaults to 1 day in ms.
170173
"""
171174

172175
def __init__(
@@ -198,6 +201,7 @@ def __init__(
198201
max_burst_jobs: int = -1,
199202
job_serializer: Optional[Serializer] = None,
200203
job_deserializer: Optional[Deserializer] = None,
204+
expires_extra_ms: int = expires_extra_ms,
201205
):
202206
self.functions: Dict[str, Union[Function, CronJob]] = {f.name: f for f in map(func, functions)}
203207
if queue_name is None:
@@ -261,6 +265,7 @@ def __init__(
261265
self.max_burst_jobs = max_burst_jobs
262266
self.job_serializer = job_serializer
263267
self.job_deserializer = job_deserializer
268+
self.expires_extra_ms = expires_extra_ms
264269

265270
def run(self) -> None:
266271
"""
@@ -311,6 +316,7 @@ async def main(self) -> None:
311316
job_deserializer=self.job_deserializer,
312317
job_serializer=self.job_serializer,
313318
default_queue_name=self.queue_name,
319+
expires_extra_ms=self.expires_extra_ms,
314320
)
315321

316322
logger.info('Starting worker for %d functions: %s', len(self.functions), ', '.join(self.functions))

tests/test_worker.py

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010
import pytest
1111

1212
from arq.connections import ArqRedis, RedisSettings
13-
from arq.constants import abort_jobs_ss, default_queue_name, health_check_key_suffix, job_key_prefix
13+
from arq.constants import abort_jobs_ss, default_queue_name, expires_extra_ms, health_check_key_suffix, job_key_prefix
1414
from arq.jobs import Job, JobStatus
1515
from arq.worker import (
1616
FailedJobs,
@@ -308,6 +308,26 @@ async def test_job_expired_run_check(arq_redis: ArqRedis, worker, caplog):
308308
assert exc_info.value.job_results[0].result == JobExecutionFailed('job expired')
309309

310310

311+
@pytest.mark.parametrize(
312+
'extra_job_expiry,wait_time',
313+
[
314+
(None, expires_extra_ms),
315+
(1_000_000, 1_000_000),
316+
(10_000_000, 10_000_000),
317+
(999_999_999, 999_999_999),
318+
],
319+
)
320+
async def test_default_job_expiry(arq_redis: ArqRedis, worker, caplog, extra_job_expiry, wait_time):
321+
"""Test that jobs have a default expiry time based on the expires_extra_ms property in
322+
ArqRedis."""
323+
caplog.set_level(logging.INFO)
324+
if extra_job_expiry is not None:
325+
arq_redis.expires_extra_ms = extra_job_expiry
326+
await arq_redis.enqueue_job('foobar', _job_id='testing')
327+
time_to_live_ms = await arq_redis.pttl(job_key_prefix + 'testing')
328+
assert time_to_live_ms == pytest.approx(wait_time)
329+
330+
311331
async def test_job_old(arq_redis: ArqRedis, worker, caplog):
312332
caplog.set_level(logging.INFO)
313333
await arq_redis.enqueue_job('foobar', _job_id='testing', _defer_by=-2)

0 commit comments

Comments (0)