Skip to content

Commit 6498b2e

Browse files
fix: clean up lint errors and typos in observability code
- Fix typos in observer docstring and metric descriptions - Add missing docstrings to observer protocol and implementation methods - Remove unused Gauge import from PrometheusMiddleware.__init__ - Remove unused ReceiverObserver import from run.py - Fix import ordering (ruff I001) - Add noqa for expected complexity in runner method - Run black formatting
1 parent 2c48668 commit 6498b2e

File tree

4 files changed

+44
-25
lines changed

4 files changed

+44
-25
lines changed

taskiq/cli/worker/run.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
from taskiq.cli.utils import import_object, import_tasks
1414
from taskiq.cli.worker.args import WorkerArgs
1515
from taskiq.cli.worker.process_manager import ProcessManager
16-
from taskiq.receiver import Receiver, ReceiverObserver
16+
from taskiq.receiver import Receiver
1717

1818
try:
1919
import uvloop

taskiq/middlewares/prometheus_middleware.py

Lines changed: 17 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,11 @@
44
from pathlib import Path
55
from tempfile import gettempdir
66
from typing import Any
7+
78
from taskiq.abc.middleware import TaskiqMiddleware
89
from taskiq.message import TaskiqMessage
9-
from taskiq.result import TaskiqResult
1010
from taskiq.receiver.observer import ReceiverObserver
11+
from taskiq.result import TaskiqResult
1112

1213
logger = getLogger("taskiq.prometheus")
1314

@@ -21,7 +22,7 @@ class PrometheusMiddleware(TaskiqMiddleware):
2122
2223
:param server_port: The port to listen on.
2324
:param server_addr: The address to listen on.
24-
:paam metrics_path: The path to store metrics for multiproc env.
25+
:param metrics_path: The path to store metrics for multiproc env.
2526
"""
2627

2728
def __init__(
@@ -44,7 +45,7 @@ def __init__(
4445
logger.debug("Initializing metrics")
4546

4647
try:
47-
from prometheus_client import Counter, Histogram, Gauge # noqa: PLC0415
48+
from prometheus_client import Counter, Histogram # noqa: PLC0415
4849
except ImportError as exc:
4950
raise ImportError(
5051
"Cannot initialize metrics. Please install 'taskiq[metrics]'.",
@@ -199,9 +200,14 @@ def post_execute(
199200
self.success_tasks.labels(message.task_name).inc()
200201
self.execution_time.labels(message.task_name).observe(result.execution_time)
201202

202-
def set_broker(self, broker: "AsyncBroker") -> None: # noqa: F821 pyright: ignore[reportUnknownVariableType]
203+
def set_broker(self, broker: "AsyncBroker") -> None: # noqa: F821
204+
"""
205+
Set broker and attach receiver observer.
206+
207+
:param broker: broker to set.
208+
"""
203209
super().set_broker(broker)
204-
broker._receiver_observer = PrometheusReceiverObserver()
210+
broker._receiver_observer = PrometheusReceiverObserver() # noqa: SLF001
205211

206212
def post_save(
207213
self,
@@ -250,20 +256,25 @@ def __init__(self) -> None:
250256
)
251257
self.deserialize_error = Counter(
252258
"deserialize_error_count",
253-
"Number of times broker faced a desrialization error",
259+
"Number of times broker faced a deserialization error",
254260
)
255261

256262
def on_prefetch_queue_size(self, size: int) -> None:
263+
"""Record current prefetch queue depth."""
257264
self.prefetch_queue_size.set(size)
258265

259266
def on_semaphore_status(self, available: int) -> None:
267+
"""Record available semaphore slots."""
260268
self.semaphore_available.set(available)
261269

262270
def on_active_tasks_count(self, count: int) -> None:
271+
"""Record number of currently executing tasks."""
263272
self.active_tasks_count.set(count)
264273

265274
def on_task_not_found(self, task_name: str) -> None:
275+
"""Increment counter for unregistered task lookups."""
266276
self.task_not_found_total.labels(task_name).inc()
267277

268278
def on_deserialize_error(self, raw: bytes, error: Exception) -> None:
279+
"""Increment counter for message deserialization failures."""
269280
self.deserialize_error.inc()

taskiq/receiver/observer.py

Lines changed: 22 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -4,18 +4,32 @@
44
@runtime_checkable
55
class ReceiverObserver(Protocol):
66
"""
7-
Observer for reciever stats.
7+
Observer for receiver stats.
88
9-
This classs is used to observe/collect metrics for the receiver.
9+
This class is used to observe/collect metrics for the receiver.
1010
This includes semaphore usage, tasks in queue, etc.
1111
1212
metrics tracked:
1313
- Number of tasks in queue
14-
- Number of taks in execution (Semaphore uusage)
14+
- Number of tasks in execution (semaphore usage)
1515
"""
1616

17-
def on_prefetch_queue_size(self, size: int) -> None: ...
18-
def on_semaphore_status(self, available: int) -> None: ...
19-
def on_active_tasks_count(self, count: int) -> None: ...
20-
def on_task_not_found(self, task_name: str) -> None: ...
21-
def on_deserialize_error(self, raw: bytes, error: Exception) -> None: ...
17+
def on_prefetch_queue_size(self, size: int) -> None:
18+
"""Called when the prefetch queue size changes."""
19+
...
20+
21+
def on_semaphore_status(self, available: int) -> None:
22+
"""Called when semaphore availability changes."""
23+
...
24+
25+
def on_active_tasks_count(self, count: int) -> None:
26+
"""Called when the number of active tasks changes."""
27+
...
28+
29+
def on_task_not_found(self, task_name: str) -> None:
30+
"""Called when a received task is not registered."""
31+
...
32+
33+
def on_deserialize_error(self, raw: bytes, error: Exception) -> None:
34+
"""Called when a message fails to deserialize."""
35+
...

taskiq/receiver/receiver.py

Lines changed: 4 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -412,7 +412,7 @@ async def prefetcher(
412412
await queue.put(QUEUE_DONE)
413413
self.sem_prefetch.release()
414414

415-
async def runner(
415+
async def runner( # noqa: C901
416416
self,
417417
queue: "asyncio.Queue[bytes | AckableMessage]",
418418
) -> None:
@@ -443,26 +443,20 @@ def task_cb(task: "asyncio.Task[Any]") -> None:
443443
self.sem.release()
444444

445445
if self.observer is not None:
446-
self.observer.on_semaphore_status(
447-
self.sem._value # noqa
448-
)
446+
self.observer.on_semaphore_status(self.sem._value) # noqa
449447

450448
while True:
451449
try:
452450
# Waits for semaphore to be released.
453451
if self.sem is not None:
454452
await self.sem.acquire()
455453
if self.observer is not None:
456-
self.observer.on_semaphore_status(
457-
self.sem._value # noqa
458-
)
454+
self.observer.on_semaphore_status(self.sem._value) # noqa
459455

460456
self.sem_prefetch.release()
461457
message = await queue.get()
462458
if self.observer is not None:
463-
self.observer.on_prefetch_queue_size(
464-
queue.qsize() # noqa
465-
)
459+
self.observer.on_prefetch_queue_size(queue.qsize())
466460
if message is QUEUE_DONE:
467461
# asyncio.wait will throw an error if there is nothing to wait for
468462
if tasks:

0 commit comments

Comments
 (0)