|
4 | 4 | import dataclasses |
5 | 5 | import json |
6 | 6 | import logging |
| 7 | +import time |
7 | 8 | from asyncio import CancelledError |
8 | 9 | from collections.abc import AsyncGenerator, AsyncIterable |
9 | 10 |
|
|
22 | 23 |
|
23 | 24 | # Also: how do we manage multiple SSE servers / uvicorn workers / load-balancing? |
24 | 25 |
|
| 26 | +SSE_EVENTS_COUNTER_LOG_INTERVAL_SECONDS = 300 |
| 27 | + |
25 | 28 |
|
26 | 29 | def create_hub_cache_endpoint(hub_cache_watcher: HubCacheWatcher) -> Endpoint: |
27 | 30 | async def hub_cache_endpoint(request: Request) -> Response: |
28 | | - logging.info("/hub-cache") |
29 | | - |
30 | 31 | all = get_request_parameter(request, "all", default="false").lower() == "true" |
31 | 32 | # ^ the values that trigger the initialization are "true", "True" and any other case-insensitive variant |
32 | 33 |
|
33 | 34 | uuid, event = hub_cache_watcher.subscribe() |
| 35 | + logging.info(f"/hub-cache {all=} {uuid=}") |
34 | 36 | if all: |
35 | 37 | init_task = hub_cache_watcher.run_initialization(uuid) |
36 | 38 |
|
37 | 39 | async def event_generator() -> AsyncGenerator[ServerSentEvent, None]: |
| 40 | + _last_time = time.time() |
| 41 | + _count_since_last_time = 0 |
38 | 42 | try: |
39 | 43 | while True: |
| 44 | + if time.time() - _last_time > SSE_EVENTS_COUNTER_LOG_INTERVAL_SECONDS: |
| 45 | + logging.info( |
| 46 | + f"SSE events sent in the last {SSE_EVENTS_COUNTER_LOG_INTERVAL_SECONDS}s: {_count_since_last_time} ({uuid=})" |
| 47 | + ) |
| 48 | + _last_time = time.time() |
| 49 | + _count_since_last_time = 0 |
40 | 50 | new_value = await event.wait_value() |
41 | 51 | event.clear() |
42 | 52 | if new_value is not None: |
43 | 53 | logging.debug(f"Sending new value: {new_value}") |
| 54 | + _count_since_last_time += 1 |
44 | 55 | yield ServerSentEvent(data=json.dumps(dataclasses.asdict(new_value)), event="message") |
45 | 56 | finally: |
| 57 | + logging.info( |
| 58 | + f"SSE events sent in the last {int(time.time() - _last_time)}s: {_count_since_last_time} ({uuid=})" |
| 59 | + ) |
46 | 60 | hub_cache_watcher.unsubscribe(uuid) |
47 | 61 | if all: |
48 | 62 | await init_task |
|
0 commit comments