Skip to content

Commit d64f15e

Browse files
committed
Fix a deprecation in the RabbitMQ backend, and install aio-pika automatically in temp venvs that use it
1 parent 7a28fd9 commit d64f15e

4 files changed

Lines changed: 36 additions & 26 deletions

File tree

offwork/__main__.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,8 @@ async def _run_in_tmp_venv(args: argparse.Namespace) -> None:
5454
extras: list[str] = []
5555
if args.backend and args.backend.startswith(("redis://", "rediss://")):
5656
extras.append("redis")
57+
if args.backend and args.backend.startswith(("amqp://", "amqps://")):
58+
extras.append("rabbitmq")
5759

5860
async with temp_venv(install_offwork=True, extras=extras) as venv:
5961
cmd = _build_worker_cmd(str(venv.python), args)
@@ -315,6 +317,8 @@ def _collect_extras(args: argparse.Namespace, script: Path) -> list[str]:
315317
backend = os.environ.get("OFFWORK_BACKEND", "")
316318
if backend.startswith(("redis://", "rediss://")) and "redis" not in extras:
317319
extras.append("redis")
320+
if backend.startswith(("amqp://", "amqps://")) and "rabbitmq" not in extras:
321+
extras.append("rabbitmq")
318322
for extra in _detect_offwork_extras(str(script)):
319323
if extra not in extras:
320324
extras.append(extra)

offwork/worker/backends/http.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,8 @@ class HttpBackend(Backend):
2222
The base URL can point either at the broker root or at the service root;
2323
when no path is provided, ``/api/v1/broker`` is assumed.
2424
25-
Authentication is intentionally simple for the proof-of-concept:
26-
include ``?api_key=...`` in the URL and the backend will move it into the
25+
Authentication is currently supported via an API key, which can be provided by
26+
including ``?api_key=...`` in the URL and the backend will move it into the
2727
``X-Offwork-API-Key`` request header.
2828
"""
2929

offwork/worker/backends/rabbitmq.py

Lines changed: 29 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -67,13 +67,22 @@ def __init__(
6767
*,
6868
task_queue: str | None = None,
6969
result_ttl: int | None = None,
70+
queue_namespace: str | None = None,
7071
) -> None:
7172
self._url = url
7273
self._task_queue_name = task_queue or self.TASK_QUEUE
7374
self._result_ttl = result_ttl or self.DEFAULT_RESULT_TTL
7475
self._connection: Any = None
7576
self._channel: Any = None
7677
self._lock = asyncio.Lock()
78+
ns = f"{queue_namespace}." if queue_namespace else ""
79+
self._result_prefix = f"{ns}{self.RESULT_PREFIX}"
80+
self._heartbeat_prefix = f"{ns}{self.HEARTBEAT_PREFIX}"
81+
self._cancel_prefix = f"{ns}{self.CANCEL_PREFIX}"
82+
self._progress_prefix = f"{ns}{self.PROGRESS_PREFIX}"
83+
self._schedule_prefix = f"{ns}{self.SCHEDULE_PREFIX}"
84+
self._throttle_prefix = f"{ns}{self.THROTTLE_PREFIX}"
85+
self._notify_exchange = f"{ns}{self.NOTIFY_EXCHANGE}"
7786

7887
# -- connection management --------------------------------------------------
7988

@@ -146,7 +155,7 @@ async def _declare_queue_robust(
146155
redeclares it with the new arguments.
147156
"""
148157
try:
149-
return await channel.declare_queue(name, arguments=arguments)
158+
return await channel.declare_queue(name, durable=True, arguments=arguments)
150159
except Exception:
151160
# Channel is closed by RabbitMQ after PRECONDITION_FAILED.
152161
# Reopen a fresh channel, purge the stale queue, and retry.
@@ -157,7 +166,7 @@ async def _declare_queue_robust(
157166
except Exception:
158167
self._channel = None
159168
channel = await self._ensure_channel()
160-
return await channel.declare_queue(name, arguments=arguments)
169+
return await channel.declare_queue(name, durable=True, arguments=arguments)
161170

162171
async def _kv_put(
163172
self, prefix: str, task_id: str, value: str, ttl_s: int,
@@ -229,8 +238,8 @@ async def listen(self) -> AsyncIterator[str]:
229238
async def send_result(self, task_id: str, result_json: str) -> None:
230239
async with self._lock:
231240
channel = await self._ensure_channel()
232-
name = f"{self.RESULT_PREFIX}{task_id}"
233-
await channel.declare_queue(name, arguments=self._result_args())
241+
name = f"{self._result_prefix}{task_id}"
242+
await channel.declare_queue(name, durable=True, arguments=self._result_args())
234243
await channel.default_exchange.publish(
235244
aio_pika.Message(result_json.encode()),
236245
routing_key=name,
@@ -239,9 +248,9 @@ async def send_result(self, task_id: str, result_json: str) -> None:
239248
async def get_result(self, task_id: str, timeout: float | None = None) -> str:
240249
channel = await self._new_channel()
241250
try:
242-
name = f"{self.RESULT_PREFIX}{task_id}"
251+
name = f"{self._result_prefix}{task_id}"
243252
queue = await channel.declare_queue(
244-
name, arguments=self._result_args(),
253+
name, durable=True, arguments=self._result_args(),
245254
)
246255
future: asyncio.Future[str] = asyncio.get_running_loop().create_future()
247256

@@ -270,9 +279,9 @@ async def _on_message(msg: Any) -> None:
270279
async def try_get_result(self, task_id: str) -> str | None:
271280
async with self._lock:
272281
channel = await self._ensure_channel()
273-
name = f"{self.RESULT_PREFIX}{task_id}"
282+
name = f"{self._result_prefix}{task_id}"
274283
queue = await channel.declare_queue(
275-
name, arguments=self._result_args(),
284+
name, durable=True, arguments=self._result_args(),
276285
)
277286
msg = await queue.get(fail=False)
278287
if msg is None:
@@ -285,55 +294,51 @@ async def try_get_result(self, task_id: str) -> str | None:
285294

286295
async def send_heartbeat(self, task_id: str) -> None:
287296
await self._kv_put(
288-
self.HEARTBEAT_PREFIX, task_id,
297+
self._heartbeat_prefix, task_id,
289298
str(time.time()), self.HEARTBEAT_TTL,
290299
)
291300

292301
async def get_heartbeat(self, task_id: str) -> float | None:
293-
# Consume (ack) the heartbeat rather than peeking. This avoids a
294-
# RabbitMQ race where nack+requeue bypasses x-max-length=1 and causes
295-
# the stale heartbeat to be returned on every subsequent poll, making
296-
# stall detection fire spuriously.
297302
raw = await self._kv_get(
298-
self.HEARTBEAT_PREFIX, task_id, self.HEARTBEAT_TTL, peek=False,
303+
self._heartbeat_prefix, task_id, self.HEARTBEAT_TTL, peek=True,
299304
)
300305
return float(raw) if raw is not None else None
301306

302307
# -- Cancellation ----------------------------------------------------------
303308

304309
async def cancel_task(self, task_id: str) -> None:
305310
await self._kv_put(
306-
self.CANCEL_PREFIX, task_id, "1", self.CANCEL_TTL,
311+
self._cancel_prefix, task_id, "1", self.CANCEL_TTL,
307312
)
308313

309314
async def is_cancelled(self, task_id: str) -> bool:
310315
raw = await self._kv_get(
311-
self.CANCEL_PREFIX, task_id, self.CANCEL_TTL, peek=True,
316+
self._cancel_prefix, task_id, self.CANCEL_TTL, peek=True,
312317
)
313318
return raw is not None
314319

315320
# -- Progress --------------------------------------------------------------
316321

317322
async def send_progress(self, task_id: str, progress_json: str) -> None:
318323
await self._kv_put(
319-
self.PROGRESS_PREFIX, task_id, progress_json, self.PROGRESS_TTL,
324+
self._progress_prefix, task_id, progress_json, self.PROGRESS_TTL,
320325
)
321326

322327
async def get_progress(self, task_id: str) -> str | None:
323328
return await self._kv_get(
324-
self.PROGRESS_PREFIX, task_id, self.PROGRESS_TTL, peek=True,
329+
self._progress_prefix, task_id, self.PROGRESS_TTL, peek=True,
325330
)
326331

327332
# -- Schedule cancellation -------------------------------------------------
328333

329334
async def cancel_schedule(self, schedule_id: str) -> None:
330335
await self._kv_put(
331-
self.SCHEDULE_PREFIX, schedule_id, "1", self.SCHEDULE_TTL,
336+
self._schedule_prefix, schedule_id, "1", self.SCHEDULE_TTL,
332337
)
333338

334339
async def is_schedule_cancelled(self, schedule_id: str) -> bool:
335340
raw = await self._kv_get(
336-
self.SCHEDULE_PREFIX, schedule_id, self.SCHEDULE_TTL, peek=True,
341+
self._schedule_prefix, schedule_id, self.SCHEDULE_TTL, peek=True,
337342
)
338343
return raw is not None
339344

@@ -352,7 +357,7 @@ async def check_throttle(self, function_name: str) -> bool:
352357
# message body. Function names can contain characters rejected by
353358
# the AMQP queue-name grammar (e.g. ``<locals>``), so we hash them.
354359
raw = await self._kv_get(
355-
self.THROTTLE_PREFIX, self._safe_suffix(function_name),
360+
self._throttle_prefix, self._safe_suffix(function_name),
356361
self.THROTTLE_QUEUE_TTL, peek=True,
357362
)
358363
if raw is None:
@@ -364,7 +369,7 @@ async def record_throttle(
364369
) -> None:
365370
expiry = time.time() + throttle_seconds
366371
await self._kv_put(
367-
self.THROTTLE_PREFIX, self._safe_suffix(function_name),
372+
self._throttle_prefix, self._safe_suffix(function_name),
368373
str(expiry), self.THROTTLE_QUEUE_TTL,
369374
)
370375

@@ -374,7 +379,7 @@ async def notify_result(self, task_id: str) -> None:
374379
async with self._lock:
375380
channel = await self._ensure_channel()
376381
exchange = await channel.declare_exchange(
377-
self.NOTIFY_EXCHANGE, aio_pika.ExchangeType.FANOUT,
382+
self._notify_exchange, aio_pika.ExchangeType.FANOUT,
378383
)
379384
await exchange.publish(
380385
aio_pika.Message(task_id.encode()),
@@ -385,7 +390,7 @@ async def subscribe_results(self) -> AsyncIterator[str]:
385390
channel = await self._new_channel()
386391
try:
387392
exchange = await channel.declare_exchange(
388-
self.NOTIFY_EXCHANGE, aio_pika.ExchangeType.FANOUT,
393+
self._notify_exchange, aio_pika.ExchangeType.FANOUT,
389394
)
390395
queue = await channel.declare_queue(exclusive=True)
391396
await queue.bind(exchange)

tests/test_rabbitmq_backend.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@ async def backend() -> AsyncIterator["RabbitMQBackend"]:
5656
b = RabbitMQBackend(
5757
RABBITMQ_URL,
5858
task_queue=f"offwork.test.tasks.{suffix}",
59+
queue_namespace=f"test.{suffix}",
5960
)
6061
yield b
6162
await b.close()

0 commit comments

Comments
 (0)