Skip to content

Commit 209ee79

Browse files
committed
Keep track of synthetic apify-default-dataset-item events
1 parent 9e0aa56 commit 209ee79

File tree

9 files changed

+409
-27
lines changed

9 files changed

+409
-27
lines changed

src/apify/_actor.py

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -628,6 +628,9 @@ async def push_data(self, data: dict | list[dict], charged_event_name: str | Non
628628

629629
data = data if isinstance(data, list) else [data]
630630

631+
if charged_event_name and charged_event_name.startswith('apify-'):
632+
raise ValueError(f'Cannot charge for synthetic event "{charged_event_name}" manually')
633+
631634
# No charging, just push the data without locking.
632635
if charged_event_name is None:
633636
dataset = await self.open_dataset()
@@ -637,20 +640,20 @@ async def push_data(self, data: dict | list[dict], charged_event_name: str | Non
637640
# If charging is requested, acquire the charge lock to prevent race conditions between concurrent
638641
# push_data calls. We need to hold the lock for the entire push_data + charge sequence.
639642
async with self._charge_lock:
640-
max_charged_count = self.get_charging_manager().calculate_max_event_charge_count_within_limit(
641-
charged_event_name
643+
pushed_items_count = self.get_charging_manager().calculate_push_data_limit(
644+
items_count=len(data),
645+
event_name=charged_event_name,
646+
is_default_dataset=True,
642647
)
643648

644-
# Push as many items as we can charge for.
645-
pushed_items_count = min(max_charged_count, len(data)) if max_charged_count is not None else len(data)
646-
647649
dataset = await self.open_dataset()
648650

649651
if pushed_items_count < len(data):
650652
await dataset.push_data(data[:pushed_items_count])
651653
elif pushed_items_count > 0:
652654
await dataset.push_data(data)
653655

656+
# Only charge explicit events; synthetic events will be processed within the client.
654657
return await self.get_charging_manager().charge(
655658
event_name=charged_event_name,
656659
count=pushed_items_count,

src/apify/_charging.py

Lines changed: 68 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
from __future__ import annotations
22

33
import math
4+
from contextvars import ContextVar
45
from dataclasses import dataclass
56
from datetime import datetime, timezone
67
from decimal import Decimal
@@ -31,6 +32,14 @@
3132

3233
run_validator = TypeAdapter[ActorRun | None](ActorRun | None)
3334

35+
DEFAULT_DATASET_ITEM_EVENT = 'apify-default-dataset-item'
36+
37+
# Context variable to hold the current `ChargingManager` instance, if any. This allows PPE-aware dataset clients to
38+
# access the charging manager without needing to pass it explicitly.
39+
charging_manager_ctx: ContextVar[ChargingManagerImplementation | None] = ContextVar(
40+
'charging_manager_ctx', default=None
41+
)
42+
3443

3544
@docs_group('Charging')
3645
class ChargingManager(Protocol):
@@ -81,6 +90,28 @@ def get_charged_event_count(self, event_name: str) -> int:
8190
def get_max_total_charge_usd(self) -> Decimal:
8291
"""Get the configured maximum total charge for this Actor run."""
8392

93+
def calculate_push_data_limit(
94+
self,
95+
items_count: int,
96+
event_name: str,
97+
*,
98+
is_default_dataset: bool,
99+
) -> int:
100+
"""Calculate how many items can be pushed and charged within the current budget.
101+
102+
Accounts for both the explicit event and the synthetic `DEFAULT_DATASET_ITEM_EVENT` event,
103+
so that the combined cost per item does not exceed the remaining budget.
104+
105+
Args:
106+
items_count: The number of items to be pushed.
107+
event_name: The explicit event name to charge for each item.
108+
is_default_dataset: Whether the data is pushed to the default dataset.
109+
If True, the synthetic event cost is included in the combined price.
110+
111+
Returns:
112+
Max number of items that can be pushed within the budget.
113+
"""
114+
84115

85116
@docs_group('Charging')
86117
@dataclass(frozen=True)
@@ -190,6 +221,11 @@ async def __aenter__(self) -> None:
190221

191222
self._charging_log_dataset = await Dataset.open(name=self.LOCAL_CHARGING_LOG_DATASET_NAME)
192223

224+
# if the Actor runs with the pay-per-event pricing model, set the context variable so that PPE-aware dataset
225+
# clients can access the charging manager and charge for synthetic events.
226+
if self._pricing_model == 'PAY_PER_EVENT':
227+
charging_manager_ctx.set(self)
228+
193229
async def __aexit__(
194230
self,
195231
exc_type: type[BaseException] | None,
@@ -199,6 +235,7 @@ async def __aexit__(
199235
if not self.active:
200236
raise RuntimeError('Exiting an uninitialized ChargingManager')
201237

238+
charging_manager_ctx.set(None)
202239
self.active = False
203240

204241
@ensure_context
@@ -258,7 +295,11 @@ def calculate_chargeable() -> dict[str, int | None]:
258295
if self._actor_run_id is None:
259296
raise RuntimeError('Actor run ID not configured')
260297

261-
if event_name in self._pricing_info:
298+
if event_name.startswith('apify-'):
299+
# Synthetic events (e.g. apify-default-dataset-item) are tracked internally only,
300+
# the platform handles them automatically based on dataset writes.
301+
pass
302+
elif event_name in self._pricing_info:
262303
await self._client.run(self._actor_run_id).charge(event_name, charged_count)
263304
else:
264305
logger.warning(f"Attempting to charge for an unknown event '{event_name}'")
@@ -300,14 +341,7 @@ def calculate_total_charged_amount(self) -> Decimal:
300341

301342
@ensure_context
302343
def calculate_max_event_charge_count_within_limit(self, event_name: str) -> int | None:
303-
pricing_info = self._pricing_info.get(event_name)
304-
305-
if pricing_info is not None:
306-
price = pricing_info.price
307-
elif not self._is_at_home:
308-
price = Decimal(1) # Use a nonzero price for local development so that the maximum budget can be reached
309-
else:
310-
price = Decimal()
344+
price = self._get_event_price(event_name)
311345

312346
if not price:
313347
return None
@@ -337,6 +371,25 @@ def get_charged_event_count(self, event_name: str) -> int:
337371
def get_max_total_charge_usd(self) -> Decimal:
338372
return self._max_total_charge_usd
339373

374+
@ensure_context
375+
def calculate_push_data_limit(
376+
self,
377+
items_count: int,
378+
event_name: str,
379+
*,
380+
is_default_dataset: bool,
381+
) -> int:
382+
explicit_price = self._get_event_price(event_name)
383+
synthetic_price = self._get_event_price(DEFAULT_DATASET_ITEM_EVENT) if is_default_dataset else Decimal(0)
384+
combined_price = explicit_price + synthetic_price
385+
386+
if not combined_price:
387+
return items_count
388+
389+
result = (self._max_total_charge_usd - self.calculate_total_charged_amount()) / combined_price
390+
max_count = max(0, math.floor(result)) if result.is_finite() else items_count
391+
return min(items_count, max_count)
392+
340393
async def _fetch_pricing_info(self) -> _FetchedPricingInfoDict:
341394
"""Fetch pricing information from environment variables or API."""
342395
# Check if pricing info is available via environment variables
@@ -370,6 +423,12 @@ async def _fetch_pricing_info(self) -> _FetchedPricingInfoDict:
370423
max_total_charge_usd=self._configuration.max_total_charge_usd or Decimal('inf'),
371424
)
372425

426+
def _get_event_price(self, event_name: str) -> Decimal:
427+
pricing_info = self._pricing_info.get(event_name)
428+
if pricing_info is not None:
429+
return pricing_info.price
430+
return Decimal(0) if self._is_at_home else Decimal(1)
431+
373432

374433
@dataclass
375434
class ChargingStateItem:

src/apify/storage_clients/_apify/_dataset_client.py

Lines changed: 18 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
from crawlee.storage_clients.models import DatasetItemsListPage, DatasetMetadata
1414

1515
from ._api_client_creation import create_storage_api_client
16+
from apify.storage_clients._ppe_dataset_mixin import _DatasetClientPPEMixin
1617

1718
if TYPE_CHECKING:
1819
from collections.abc import AsyncIterator
@@ -25,7 +26,7 @@
2526
logger = getLogger(__name__)
2627

2728

28-
class ApifyDatasetClient(DatasetClient):
29+
class ApifyDatasetClient(DatasetClient, _DatasetClientPPEMixin):
2930
"""An Apify platform implementation of the dataset client."""
3031

3132
_MAX_PAYLOAD_SIZE = ByteSize.from_mb(9)
@@ -48,6 +49,8 @@ def __init__(
4849
4950
Preferably use the `ApifyDatasetClient.open` class method to create a new instance.
5051
"""
52+
super().__init__()
53+
5154
self._api_client = api_client
5255
"""The Apify dataset client for API operations."""
5356

@@ -108,12 +111,16 @@ async def open(
108111
id=id,
109112
)
110113

111-
return cls(
114+
dataset_client = cls(
112115
api_client=api_client,
113116
api_public_base_url='', # Remove in version 4.0, https://github.com/apify/apify-sdk-python/issues/635
114117
lock=asyncio.Lock(),
115118
)
116119

120+
dataset_client.is_default_dataset = (await dataset_client.get_metadata()).id == configuration.default_dataset_id
121+
122+
return dataset_client
123+
117124
@override
118125
async def purge(self) -> None:
119126
raise NotImplementedError(
@@ -128,21 +135,19 @@ async def drop(self) -> None:
128135

129136
@override
130137
async def push_data(self, data: list[Any] | dict[str, Any]) -> None:
131-
async def payloads_generator() -> AsyncIterator[str]:
132-
for index, item in enumerate(data):
138+
async def payloads_generator(items: list[Any]) -> AsyncIterator[str]:
139+
for index, item in enumerate(items):
133140
yield await self._check_and_serialize(item, index)
134141

135142
async with self._lock:
136-
# Handle lists
137-
if isinstance(data, list):
138-
# Invoke client in series to preserve the order of data
139-
async for items in self._chunk_by_size(payloads_generator()):
140-
await self._api_client.push_items(items=items)
143+
items = data if isinstance(data, list) else [data]
144+
limit = await self._calculate_limit_for_push(len(items))
145+
items = items[:limit]
141146

142-
# Handle singular items
143-
else:
144-
items = await self._check_and_serialize(data)
145-
await self._api_client.push_items(items=items)
147+
async for chunk in self._chunk_by_size(payloads_generator(items)):
148+
await self._api_client.push_items(items=chunk)
149+
150+
await self._charge_for_items(count_items=limit)
146151

147152
@override
148153
async def get_data(
Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
from __future__ import annotations
2+
3+
from typing import TYPE_CHECKING, Any
4+
5+
from typing_extensions import Self, override
6+
7+
from crawlee.storage_clients._file_system import FileSystemDatasetClient
8+
9+
from apify._configuration import Configuration as ApifyConfiguration
10+
from apify.storage_clients._ppe_dataset_mixin import _DatasetClientPPEMixin
11+
12+
if TYPE_CHECKING:
13+
from crawlee.configuration import Configuration
14+
15+
16+
class ApifyFileSystemDatasetClient(FileSystemDatasetClient, _DatasetClientPPEMixin):
17+
def __init__(self, *args: Any, **kwargs: Any) -> None:
18+
FileSystemDatasetClient.__init__(self, *args, **kwargs)
19+
_DatasetClientPPEMixin.__init__(self)
20+
21+
@override
22+
@classmethod
23+
async def open(
24+
cls,
25+
*,
26+
id: str | None,
27+
name: str | None,
28+
alias: str | None,
29+
configuration: Configuration | ApifyConfiguration,
30+
) -> Self:
31+
32+
dataset_client = await super().open(
33+
id=id,
34+
name=name,
35+
alias=alias,
36+
configuration=configuration,
37+
)
38+
39+
if isinstance(configuration, ApifyConfiguration) and all(v is None for v in (id, name, alias)):
40+
dataset_client.is_default_dataset = True
41+
42+
return dataset_client
43+
44+
@override
45+
async def push_data(self, data: list[dict[str, Any]] | dict[str, Any]) -> None:
46+
items = data if isinstance(data, list) else [data]
47+
limit = await self._calculate_limit_for_push(len(items))
48+
await super().push_data(items[:limit])
49+
await self._charge_for_items(limit)

src/apify/storage_clients/_file_system/_storage_client.py

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
from crawlee.configuration import Configuration
88
from crawlee.storage_clients import FileSystemStorageClient
99

10+
from ._dataset_client import ApifyFileSystemDatasetClient
1011
from ._key_value_store_client import ApifyFileSystemKeyValueStoreClient
1112

1213
if TYPE_CHECKING:
@@ -48,3 +49,20 @@ async def create_kvs_client(
4849
)
4950
await self._purge_if_needed(client, configuration)
5051
return client
52+
53+
@override
54+
async def create_dataset_client(
55+
self,
56+
*,
57+
id: str | None = None,
58+
name: str | None = None,
59+
alias: str | None = None,
60+
configuration: Configuration | None = None,
61+
) -> ApifyFileSystemDatasetClient:
62+
configuration = configuration or Configuration.get_global_configuration()
63+
return await ApifyFileSystemDatasetClient.open(
64+
id=id,
65+
name=name,
66+
alias=alias,
67+
configuration=configuration,
68+
)
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
from apify._charging import DEFAULT_DATASET_ITEM_EVENT, charging_manager_ctx
2+
3+
4+
class _DatasetClientPPEMixin:
5+
"""A mixin for dataset clients to add support for PPE pricing model and tracking synthetic events."""
6+
7+
def __init__(self) -> None:
8+
self._is_default_dataset: bool = False
9+
10+
@property
11+
def is_default_dataset(self) -> bool:
12+
return self._is_default_dataset
13+
14+
@is_default_dataset.setter
15+
def is_default_dataset(self, value: bool) -> None:
16+
self._is_default_dataset = value
17+
18+
async def _calculate_limit_for_push(self, items_count: int) -> int:
19+
if self.is_default_dataset and (charging_manager := charging_manager_ctx.get()):
20+
max_charged_count = charging_manager.calculate_max_event_charge_count_within_limit(
21+
event_name=DEFAULT_DATASET_ITEM_EVENT
22+
)
23+
return min(max_charged_count, items_count) if max_charged_count is not None else items_count
24+
return items_count
25+
26+
async def _charge_for_items(self, count_items: int) -> None:
27+
if self.is_default_dataset and (charging_manager := charging_manager_ctx.get()):
28+
await charging_manager.charge(
29+
event_name=DEFAULT_DATASET_ITEM_EVENT,
30+
count=count_items,
31+
)

0 commit comments

Comments
 (0)