Skip to content

Commit 2e6a8cb

Browse files
committed
refactor: implement asyncio lock for order store operations to enhance concurrency
1 parent e0b2cf5 commit 2e6a8cb

1 file changed

Lines changed: 50 additions & 33 deletions

File tree

  • custom_components/rohlikcz

custom_components/rohlikcz/hub.py

Lines changed: 50 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
from __future__ import annotations
2+
import asyncio
23
import json
34
import logging
45
import os
@@ -337,6 +338,7 @@ def __init__(self, hass: HomeAssistant, username: str, password: str, analytics:
337338
self.data: dict = {}
338339
self._callbacks: set[Callable[[], None]] = set()
339340
self._order_store: OrderStore | None = None
341+
self._store_lock = asyncio.Lock()
340342
self._analytics: list[str] = analytics or []
341343
self._top_n: int = top_n
342344
self._hide_discontinued: bool = hide_discontinued
@@ -404,31 +406,37 @@ async def async_update(self) -> None:
404406

405407
# Process delivered orders into persistent store and auto-enrich new ones
406408
if self._analytics and self._order_store and self.data.get("delivered_orders"):
407-
new = self._order_store.process_orders(self.data["delivered_orders"])
409+
async with self._store_lock:
410+
new = self._order_store.process_orders(self.data["delivered_orders"])
411+
if new > 0:
412+
await self._order_store.async_save()
408413
if new > 0:
409-
await self._order_store.async_save()
410414
# Schedule enrichment in background (don't block update cycle / setup)
411415
self._hass.async_create_task(self._auto_enrich_new_orders(new))
412416

413417
await self.publish_updates()
414418

415419
async def _auto_enrich_new_orders(self, new_count: int) -> None:
416420
"""Auto-enrich recently added orders in the background."""
421+
enriched = False
417422
try:
418-
unenriched = self._order_store.unenriched_order_ids
419-
recent_unenriched = unenriched[-new_count:] if len(unenriched) >= new_count else unenriched
420-
if recent_unenriched:
421-
items_map = await self._rohlik_api.enrich_orders_with_items(recent_unenriched)
422-
for order_id, items in items_map.items():
423-
self._order_store.add_items_to_order(order_id, items)
424-
uncategorized = self._order_store.uncategorized_product_ids()
425-
if uncategorized:
426-
cat_map = await self._rohlik_api.fetch_product_categories_batch(uncategorized)
427-
self._order_store.update_product_categories(cat_map)
428-
if items_map:
429-
await self._order_store.async_save()
430-
_LOGGER.info(f"Auto-enriched {len(items_map)} new orders")
431-
await self.publish_updates()
423+
async with self._store_lock:
424+
unenriched = self._order_store.unenriched_order_ids
425+
recent_unenriched = unenriched[-new_count:] if len(unenriched) >= new_count else unenriched
426+
if recent_unenriched:
427+
items_map = await self._rohlik_api.enrich_orders_with_items(recent_unenriched)
428+
for order_id, items in items_map.items():
429+
self._order_store.add_items_to_order(order_id, items)
430+
uncategorized = self._order_store.uncategorized_product_ids()
431+
if uncategorized:
432+
cat_map = await self._rohlik_api.fetch_product_categories_batch(uncategorized)
433+
self._order_store.update_product_categories(cat_map)
434+
if items_map:
435+
await self._order_store.async_save()
436+
_LOGGER.info(f"Auto-enriched {len(items_map)} new orders")
437+
enriched = True
438+
if enriched:
439+
await self.publish_updates()
432440
except Exception as err:
433441
_LOGGER.warning(f"Auto-enrichment of new orders failed: {err}")
434442

@@ -437,25 +445,26 @@ async def fetch_full_order_history(self, hass=None) -> dict:
437445
438446
Returns dict with stats.
439447
"""
440-
# Step 1: Fetch order list (existing behavior)
441-
all_orders = await self._rohlik_api.fetch_all_delivered_orders()
442-
new_orders = 0
443-
if self._order_store and all_orders:
444-
new_orders = self._order_store.process_orders(all_orders)
445-
if new_orders > 0:
446-
await self._order_store.async_save()
448+
async with self._store_lock:
449+
# Step 1: Fetch order list (existing behavior)
450+
all_orders = await self._rohlik_api.fetch_all_delivered_orders()
451+
new_orders = 0
452+
if self._order_store and all_orders:
453+
new_orders = self._order_store.process_orders(all_orders)
454+
if new_orders > 0:
455+
await self._order_store.async_save()
447456

448-
# Step 2: Enrich with items and categories
449-
if self._order_store:
450-
enrich_stats = await self.enrich_order_details(hass=hass)
451-
enrich_stats["new_orders"] = new_orders
452-
# Mark backfill as done so we don't re-download on next restart
453-
if not self._order_store.backfill_complete:
454-
self._order_store.mark_backfill_complete()
455-
await self._order_store.async_save()
456-
return enrich_stats
457+
# Step 2: Enrich with items and categories
458+
if self._order_store:
459+
enrich_stats = await self._enrich_order_details(hass=hass)
460+
enrich_stats["new_orders"] = new_orders
461+
# Mark backfill as done so we don't re-download on next restart
462+
if not self._order_store.backfill_complete:
463+
self._order_store.mark_backfill_complete()
464+
await self._order_store.async_save()
465+
return enrich_stats
457466

458-
return {"total_orders": 0, "new_orders": 0}
467+
return {"total_orders": 0, "new_orders": 0}
459468

460469
def _t(self, key: str) -> str:
461470
"""Get localized notification string based on HA language."""
@@ -474,6 +483,14 @@ async def _notify(self, hass, message: str, title: str, notification_id: str = "
474483
async def enrich_order_details(self, hass=None) -> dict:
475484
"""Fetch item details and categories for all unenriched orders.
476485
486+
Public entry point — acquires the store lock.
487+
"""
488+
async with self._store_lock:
489+
return await self._enrich_order_details(hass=hass)
490+
491+
async def _enrich_order_details(self, hass=None) -> dict:
492+
"""Internal enrichment logic — caller must hold _store_lock.
493+
477494
Two-phase enrichment:
478495
1. Fetch items for orders missing them
479496
2. Fetch categories for products not yet in cache

0 commit comments

Comments
 (0)