Skip to content

Commit 50c4b6c

Browse files
authored
fix: Keep track of synthetic apify-default-dataset-item events (#814)
### Description - Add `_DatasetClientPPEMixin` to dataset storage clients to automatically charge for the synthetic `apify-default-dataset-item` event on every push to the default dataset. - Update `Actor.push_data()` to account for the combined explicit + synthetic event price when enforcing budget limits. ### Issues - Closes: #760 - Closes: #705 ### Testing - Add new unit and E2E test.
1 parent 1b5ea57 commit 50c4b6c

File tree

11 files changed

+705
-138
lines changed

11 files changed

+705
-138
lines changed

src/apify/_actor.py

Lines changed: 40 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@
2626
EventSystemInfoData,
2727
)
2828

29-
from apify._charging import ChargeResult, ChargingManager, ChargingManagerImplementation
29+
from apify._charging import DEFAULT_DATASET_ITEM_EVENT, ChargeResult, ChargingManager, ChargingManagerImplementation
3030
from apify._configuration import Configuration
3131
from apify._consts import EVENT_LISTENERS_TIMEOUT
3232
from apify._crypto import decrypt_input_secrets, load_private_key
@@ -380,14 +380,6 @@ def event_manager(self) -> EventManager:
380380
def _charging_manager_implementation(self) -> ChargingManagerImplementation:
381381
return ChargingManagerImplementation(self.configuration, self.apify_client)
382382

383-
@cached_property
384-
def _charge_lock(self) -> asyncio.Lock:
385-
"""Lock to synchronize charge operations.
386-
387-
Prevents race conditions between Actor.charge and Actor.push_data calls.
388-
"""
389-
return asyncio.Lock()
390-
391383
@cached_property
392384
def _storage_client(self) -> SmartApifyStorageClient:
393385
"""Storage client used by the Actor.
@@ -621,48 +613,63 @@ async def open_request_queue(
621613
storage_client=self._storage_client.get_suitable_storage_client(force_cloud=force_cloud),
622614
)
623615

624-
@overload
625-
async def push_data(self, data: dict | list[dict]) -> None: ...
626-
@overload
627-
async def push_data(self, data: dict | list[dict], charged_event_name: str) -> ChargeResult: ...
628616
@_ensure_context
629-
async def push_data(self, data: dict | list[dict], charged_event_name: str | None = None) -> ChargeResult | None:
617+
async def push_data(self, data: dict | list[dict], charged_event_name: str | None = None) -> ChargeResult:
630618
"""Store an object or a list of objects to the default dataset of the current Actor run.
631619
632620
Args:
633621
data: The data to push to the default dataset.
634622
charged_event_name: If provided and if the Actor uses the pay-per-event pricing model,
635623
the method will attempt to charge for the event for each pushed item.
636624
"""
625+
if charged_event_name and charged_event_name.startswith('apify-'):
626+
raise ValueError(f'Cannot charge for synthetic event "{charged_event_name}" manually')
627+
628+
charging_manager = self.get_charging_manager()
629+
637630
if not data:
638-
return None
631+
charged_event_name = charged_event_name or DEFAULT_DATASET_ITEM_EVENT
632+
charge_limit_reached = charging_manager.is_event_charge_limit_reached(charged_event_name)
633+
634+
return ChargeResult(
635+
event_charge_limit_reached=charge_limit_reached,
636+
charged_count=0,
637+
chargeable_within_limit=charging_manager.compute_chargeable(),
638+
)
639639

640640
data = data if isinstance(data, list) else [data]
641641

642-
# No charging, just push the data without locking.
643-
if charged_event_name is None:
644-
dataset = await self.open_dataset()
645-
await dataset.push_data(data)
646-
return None
642+
dataset = await self.open_dataset()
647643

648-
# If charging is requested, acquire the charge lock to prevent race conditions between concurrent
644+
# Acquire the charge lock to prevent race conditions between concurrent
649645
# push_data calls. We need to hold the lock for the entire push_data + charge sequence.
650-
async with self._charge_lock:
651-
max_charged_count = self.get_charging_manager().calculate_max_event_charge_count_within_limit(
652-
charged_event_name
653-
)
654-
655-
# Push as many items as we can charge for.
656-
pushed_items_count = min(max_charged_count, len(data)) if max_charged_count is not None else len(data)
646+
async with charging_manager.charge_lock():
647+
# Synthetic events are handled within dataset.push_data, only get data for `ChargeResult`.
648+
if charged_event_name is None:
649+
before = charging_manager.get_charged_event_count(DEFAULT_DATASET_ITEM_EVENT)
650+
await dataset.push_data(data)
651+
after = charging_manager.get_charged_event_count(DEFAULT_DATASET_ITEM_EVENT)
652+
return ChargeResult(
653+
event_charge_limit_reached=charging_manager.is_event_charge_limit_reached(
654+
DEFAULT_DATASET_ITEM_EVENT
655+
),
656+
charged_count=after - before,
657+
chargeable_within_limit=charging_manager.compute_chargeable(),
658+
)
657659

658-
dataset = await self.open_dataset()
660+
pushed_items_count = charging_manager.compute_push_data_limit(
661+
items_count=len(data),
662+
event_name=charged_event_name,
663+
is_default_dataset=True,
664+
)
659665

660666
if pushed_items_count < len(data):
661667
await dataset.push_data(data[:pushed_items_count])
662668
elif pushed_items_count > 0:
663669
await dataset.push_data(data)
664670

665-
return await self.get_charging_manager().charge(
671+
# Only charge explicit events; synthetic events will be processed within the client.
672+
return await charging_manager.charge(
666673
event_name=charged_event_name,
667674
count=pushed_items_count,
668675
)
@@ -726,9 +733,9 @@ async def charge(self, event_name: str, count: int = 1) -> ChargeResult:
726733
event_name: Name of the event to be charged for.
727734
count: Number of events to charge for.
728735
"""
729-
# Acquire lock to prevent race conditions with concurrent charge/push_data calls.
730-
async with self._charge_lock:
731-
return await self.get_charging_manager().charge(event_name, count)
736+
# charging_manager.charge() acquires charge_lock internally.
737+
charging_manager = self.get_charging_manager()
738+
return await charging_manager.charge(event_name, count)
732739

733740
@overload
734741
def on(

0 commit comments

Comments
 (0)