Skip to content

Commit 18d2e2f

Browse files
committed
Add job start and end handlers to worker functions, integrating structlog for enhanced logging context. Update worker settings to include new handlers.
1 parent b346a81 commit 18d2e2f

File tree

2 files changed

+39
-3
lines changed

2 files changed

+39
-3
lines changed

src/app/core/worker/functions.py

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,12 @@
11
import asyncio
22
import logging
33

4+
import structlog
45
import uvloop
56
from arq.worker import Worker
67

78
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
89

9-
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s")
10-
1110

1211
# -------- background tasks --------
1312
async def sample_background_task(ctx: Worker, name: str) -> str:
@@ -22,3 +21,13 @@ async def startup(ctx: Worker) -> None:
2221

2322
async def shutdown(ctx: Worker) -> None:
2423
logging.info("Worker end")
24+
25+
26+
async def on_job_start(ctx: Worker) -> None:
27+
structlog.contextvars.bind_contextvars(job_id=ctx["job_id"])
28+
logging.info("Job Started")
29+
30+
31+
async def on_job_end(ctx: Worker) -> None:
32+
logging.info("Job Competed")
33+
structlog.contextvars.clear_contextvars()

src/app/core/worker/settings.py

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,14 @@
1+
import asyncio
2+
from typing import cast
3+
4+
from arq.cli import watch_reload
15
from arq.connections import RedisSettings
6+
from arq.typing import WorkerSettingsType
7+
from arq.worker import check_health, run_worker
28

39
from ...core.config import settings
4-
from .functions import sample_background_task, shutdown, startup
10+
from ...core.logger import logging # noqa: F401
11+
from .functions import on_job_end, on_job_start, sample_background_task, shutdown, startup
512

613
REDIS_QUEUE_HOST = settings.REDIS_QUEUE_HOST
714
REDIS_QUEUE_PORT = settings.REDIS_QUEUE_PORT
@@ -12,4 +19,24 @@ class WorkerSettings:
1219
redis_settings = RedisSettings(host=REDIS_QUEUE_HOST, port=REDIS_QUEUE_PORT)
1320
on_startup = startup
1421
on_shutdown = shutdown
22+
on_job_start = on_job_start
23+
on_job_end = on_job_end
1524
handle_signals = False
25+
26+
27+
def start_arq_service(check: bool = False, burst: int | None = None, watch: str | None = None):
28+
worker_settings_ = cast("WorkerSettingsType", WorkerSettings)
29+
30+
if check:
31+
exit(check_health(worker_settings_))
32+
else:
33+
kwargs = {} if burst is None else {"burst": burst}
34+
if watch:
35+
asyncio.run(watch_reload(watch, worker_settings_))
36+
else:
37+
run_worker(worker_settings_, **kwargs)
38+
39+
40+
if __name__ == "__main__":
41+
start_arq_service()
42+
# python -m src.app.core.worker.settings

0 commit comments

Comments
 (0)