Skip to content

Commit 51e28b5

Browse files
committed
update cherge_lock
1 parent 34e4617 commit 51e28b5

File tree

7 files changed

+111
-31
lines changed

7 files changed

+111
-31
lines changed

src/apify/_actor.py

Lines changed: 7 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -380,14 +380,6 @@ def event_manager(self) -> EventManager:
380380
def _charging_manager_implementation(self) -> ChargingManagerImplementation:
381381
return ChargingManagerImplementation(self.configuration, self.apify_client)
382382

383-
@cached_property
384-
def _charge_lock(self) -> asyncio.Lock:
385-
"""Lock to synchronize charge operations.
386-
387-
Prevents race conditions between Actor.charge and Actor.push_data calls.
388-
"""
389-
return asyncio.Lock()
390-
391383
@cached_property
392384
def _storage_client(self) -> SmartApifyStorageClient:
393385
"""Storage client used by the Actor.
@@ -642,16 +634,18 @@ async def push_data(self, data: dict | list[dict], charged_event_name: str | Non
642634
if charged_event_name and charged_event_name.startswith('apify-'):
643635
raise ValueError(f'Cannot charge for synthetic event "{charged_event_name}" manually')
644636

637+
charging_manager = self.get_charging_manager()
638+
645639
# Acquire the charge lock to prevent race conditions between concurrent
646640
# push_data calls. We need to hold the lock for the entire push_data + charge sequence.
647-
async with self._charge_lock:
641+
async with charging_manager.charge_lock:
648642
# No explicit charging requested; synthetic events are handled within dataset.push_data.
649643
if charged_event_name is None:
650644
dataset = await self.open_dataset()
651645
await dataset.push_data(data)
652646
return None
653647

654-
pushed_items_count = self.get_charging_manager().calculate_push_data_limit(
648+
pushed_items_count = self.get_charging_manager().compute_push_data_limit(
655649
items_count=len(data),
656650
event_name=charged_event_name,
657651
is_default_dataset=True,
@@ -730,8 +724,9 @@ async def charge(self, event_name: str, count: int = 1) -> ChargeResult:
730724
count: Number of events to charge for.
731725
"""
732726
# Acquire lock to prevent race conditions with concurrent charge/push_data calls.
733-
async with self._charge_lock:
734-
return await self.get_charging_manager().charge(event_name, count)
727+
charging_manager = self.get_charging_manager()
728+
async with charging_manager.charge_lock:
729+
return await charging_manager.charge(event_name, count)
735730

736731
@overload
737732
def on(

src/apify/_charging.py

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
from __future__ import annotations
22

3+
import asyncio
34
import math
45
from contextvars import ContextVar
56
from dataclasses import dataclass
@@ -52,6 +53,9 @@ class ChargingManager(Protocol):
5253
- Apify platform documentation: https://docs.apify.com/platform/actors/publishing/monetize
5354
"""
5455

56+
charge_lock: asyncio.Lock
57+
"""Lock to synchronize charge operations. Prevents race conditions between `charge` and `push_data` calls."""
58+
5559
async def charge(self, event_name: str, count: int = 1) -> ChargeResult:
5660
"""Charge for a specified number of events - sub-operations of the Actor.
5761
@@ -88,14 +92,14 @@ def get_charged_event_count(self, event_name: str) -> int:
8892
def get_max_total_charge_usd(self) -> Decimal:
8993
"""Get the configured maximum total charge for this Actor run."""
9094

91-
def calculate_push_data_limit(
95+
def compute_push_data_limit(
9296
self,
9397
items_count: int,
9498
event_name: str,
9599
*,
96100
is_default_dataset: bool,
97101
) -> int:
98-
"""Calculate how many items can be pushed and charged within the current budget.
102+
"""Compute how many items can be pushed and charged within the current budget.
99103
100104
Accounts for both the explicit event and the synthetic `DEFAULT_DATASET_ITEM_EVENT` event,
101105
so that the combined cost per item does not exceed the remaining budget.
@@ -166,6 +170,8 @@ def __init__(self, configuration: Configuration, client: ApifyClientAsync) -> No
166170
self._not_ppe_warning_printed = False
167171
self.active = False
168172

173+
self.charge_lock = asyncio.Lock()
174+
169175
async def __aenter__(self) -> None:
170176
"""Initialize the charging manager - this is called by the `Actor` class and shouldn't be invoked manually."""
171177
# Validate config
@@ -370,7 +376,7 @@ def get_max_total_charge_usd(self) -> Decimal:
370376
return self._max_total_charge_usd
371377

372378
@_ensure_context
373-
def calculate_push_data_limit(
379+
def compute_push_data_limit(
374380
self,
375381
items_count: int,
376382
event_name: str,

src/apify/storage_clients/_apify/_dataset_client.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -142,9 +142,9 @@ async def payloads_generator(items: list[Any]) -> AsyncIterator[str]:
142142
for index, item in enumerate(items):
143143
yield await self._check_and_serialize(item, index)
144144

145-
async with self._lock:
145+
async with self._lock, self._charge_lock():
146146
items = data if isinstance(data, list) else [data]
147-
limit = self._calculate_limit_for_push(len(items))
147+
limit = self._compute_limit_for_push(len(items))
148148
items = items[:limit]
149149

150150
async for chunk in self._chunk_by_size(payloads_generator(items)):

src/apify/storage_clients/_file_system/_dataset_client.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -48,12 +48,12 @@ async def open(
4848

4949
@override
5050
async def push_data(self, data: list[dict[str, Any]] | dict[str, Any]) -> None:
51-
async with self._lock:
51+
async with self._lock, self._charge_lock():
5252
items = data if isinstance(data, list) else [data]
53-
limit = self._calculate_limit_for_push(len(items))
53+
limit = self._compute_limit_for_push(len(items))
5454

5555
new_item_count = self._metadata.item_count
56-
for item in items:
56+
for item in items[:limit]:
5757
new_item_count += 1
5858
await self._push_item(item, new_item_count)
5959

src/apify/storage_clients/_ppe_dataset_mixin.py

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,21 @@
1+
from __future__ import annotations
2+
3+
from contextlib import asynccontextmanager
4+
from typing import TYPE_CHECKING
5+
16
from apify._charging import DEFAULT_DATASET_ITEM_EVENT, charging_manager_ctx
27

8+
if TYPE_CHECKING:
9+
from collections.abc import AsyncIterator
10+
311

412
class DatasetClientPpeMixin:
513
"""A mixin for dataset clients to add support for PPE pricing model and tracking synthetic events."""
614

715
def __init__(self) -> None:
816
self.is_default_dataset = False
917

10-
def _calculate_limit_for_push(self, items_count: int) -> int:
18+
def _compute_limit_for_push(self, items_count: int) -> int:
1119
if self.is_default_dataset and (charging_manager := charging_manager_ctx.get()):
1220
max_charged_count = charging_manager.calculate_max_event_charge_count_within_limit(
1321
event_name=DEFAULT_DATASET_ITEM_EVENT
@@ -21,3 +29,18 @@ async def _charge_for_items(self, count_items: int) -> None:
2129
event_name=DEFAULT_DATASET_ITEM_EVENT,
2230
count=count_items,
2331
)
32+
33+
@asynccontextmanager
34+
async def _charge_lock(self) -> AsyncIterator[None]:
35+
"""Context manager to acquire the charge lock if PPE charging manager is active."""
36+
charging_manager = charging_manager_ctx.get()
37+
if charging_manager:
38+
if charging_manager.charge_lock.locked():
39+
# If the charge lock is already locked, it means we're called from within Actor.push_data which
40+
# already holds the lock. asyncio.Lock is not reentrant, so re-acquiring would deadlock.
41+
yield
42+
else:
43+
async with charging_manager.charge_lock:
44+
yield
45+
else:
46+
yield

tests/unit/actor/test_actor_charge.py

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import asyncio
12
from collections.abc import AsyncGenerator
23
from contextlib import asynccontextmanager
34
from decimal import Decimal
@@ -186,6 +187,61 @@ async def test_push_data_charges_synthetic_event_for_default_dataset() -> None:
186187
assert setup.charging_mgr.get_charged_event_count('apify-default-dataset-item') == 3
187188

188189

190+
async def test_charge_lock_concurrent_actor_and_dataset_push() -> None:
191+
"""Test that charge_lock properly synchronizes concurrent Actor.push_data and dataset.push_data calls."""
192+
async with setup_mocked_charging(
193+
Configuration(max_total_charge_usd=Decimal('10.00'), test_pay_per_event=True)
194+
) as setup:
195+
setup.charging_mgr._pricing_info['event'] = PricingInfoItem(Decimal('0.10'), 'Event')
196+
setup.charging_mgr._pricing_info['apify-default-dataset-item'] = PricingInfoItem(
197+
Decimal('0.10'), 'Dataset item'
198+
)
199+
200+
dataset = await Actor.open_dataset()
201+
202+
# Run concurrent pushes - Actor.push_data and direct dataset.push_data
203+
await asyncio.gather(
204+
Actor.push_data([{'source': 'actor', 'id': i} for i in range(5)], 'event'),
205+
dataset.push_data([{'source': 'dataset', 'id': i} for i in range(5)]),
206+
)
207+
208+
# Verify all items were pushed
209+
items = await dataset.get_data()
210+
assert len(items.items) == 10
211+
212+
# Verify charging was tracked correctly:
213+
# - Actor.push_data charged 'event' (5) + 'apify-default-dataset-item' (5)
214+
# - dataset.push_data charged 'apify-default-dataset-item' (5)
215+
assert setup.charging_mgr.get_charged_event_count('event') == 5
216+
assert setup.charging_mgr.get_charged_event_count('apify-default-dataset-item') == 10
217+
218+
219+
async def test_charge_lock_concurrent_with_limited_budget() -> None:
220+
"""Test that charge_lock correctly limits items when concurrent pushes compete for limited budget."""
221+
async with setup_mocked_charging(
222+
Configuration(max_total_charge_usd=Decimal('0.50'), test_pay_per_event=True)
223+
) as setup:
224+
# Each default dataset item costs $0.10, so max 5 items total
225+
setup.charging_mgr._pricing_info['apify-default-dataset-item'] = PricingInfoItem(
226+
Decimal('0.10'), 'Dataset item'
227+
)
228+
229+
dataset = await Actor.open_dataset()
230+
231+
# Both try to push 5 items, but budget only allows 5 total
232+
await asyncio.gather(
233+
dataset.push_data([{'source': 'a', 'id': i} for i in range(5)]),
234+
dataset.push_data([{'source': 'b', 'id': i} for i in range(5)]),
235+
)
236+
237+
# Verify total items pushed does not exceed budget limit
238+
items = await dataset.get_data()
239+
assert len(items.items) == 5 # Budget allows max 5 items at $0.10 each
240+
241+
# Verify total charged events matches items pushed
242+
assert setup.charging_mgr.get_charged_event_count('apify-default-dataset-item') == 5
243+
244+
189245
async def test_charge_with_overdrawn_budget() -> None:
190246
configuration = Configuration(
191247
max_total_charge_usd=Decimal('0.00025'),

tests/unit/actor/test_charging_manager.py

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -247,16 +247,16 @@ async def test_get_max_total_charge_usd(mock_client: MagicMock) -> None:
247247
assert cm.get_max_total_charge_usd() == Decimal('42.50')
248248

249249

250-
async def test_calculate_push_data_limit_no_ppe(mock_client: MagicMock) -> None:
250+
async def test_compute_push_data_limit_no_ppe(mock_client: MagicMock) -> None:
251251
"""Returns items_count when no PPE pricing is configured (prices are zero)."""
252252
config = _make_config(actor_pricing_info=None, charged_event_counts={})
253253
cm = ChargingManagerImplementation(config, mock_client)
254254
async with cm:
255-
result = cm.calculate_push_data_limit(10, 'some-event', is_default_dataset=True)
255+
result = cm.compute_push_data_limit(10, 'some-event', is_default_dataset=True)
256256
assert result == 10
257257

258258

259-
async def test_calculate_push_data_limit_within_budget(mock_client: MagicMock) -> None:
259+
async def test_compute_push_data_limit_within_budget(mock_client: MagicMock) -> None:
260260
"""Returns full items_count when combined budget is sufficient for all items."""
261261
pricing_info = _make_ppe_pricing_info({'click': Decimal('0.01'), 'apify-default-dataset-item': Decimal('0.01')})
262262
config = _make_config(
@@ -268,11 +268,11 @@ async def test_calculate_push_data_limit_within_budget(mock_client: MagicMock) -
268268
cm = ChargingManagerImplementation(config, mock_client)
269269
async with cm:
270270
# combined price = 0.02/item, budget = 10.00, max = 500
271-
result = cm.calculate_push_data_limit(5, 'click', is_default_dataset=True)
271+
result = cm.compute_push_data_limit(5, 'click', is_default_dataset=True)
272272
assert result == 5
273273

274274

275-
async def test_calculate_push_data_limit_budget_exceeded(mock_client: MagicMock) -> None:
275+
async def test_compute_push_data_limit_budget_exceeded(mock_client: MagicMock) -> None:
276276
"""Returns capped count when combined price (explicit + synthetic) exceeds budget."""
277277
pricing_info = _make_ppe_pricing_info({'scrape': Decimal('1.00'), 'apify-default-dataset-item': Decimal('1.00')})
278278
config = _make_config(
@@ -284,11 +284,11 @@ async def test_calculate_push_data_limit_budget_exceeded(mock_client: MagicMock)
284284
cm = ChargingManagerImplementation(config, mock_client)
285285
async with cm:
286286
# combined price = 2.00/item, budget = 3.00, max = floor(3/2) = 1
287-
result = cm.calculate_push_data_limit(5, 'scrape', is_default_dataset=True)
287+
result = cm.compute_push_data_limit(5, 'scrape', is_default_dataset=True)
288288
assert result == 1
289289

290290

291-
async def test_calculate_push_data_limit_without_default_dataset(mock_client: MagicMock) -> None:
291+
async def test_compute_push_data_limit_without_default_dataset(mock_client: MagicMock) -> None:
292292
"""When not pushing to the default dataset, only explicit event price is considered."""
293293
pricing_info = _make_ppe_pricing_info({'scrape': Decimal('1.00'), 'apify-default-dataset-item': Decimal('1.00')})
294294
config = _make_config(
@@ -300,11 +300,11 @@ async def test_calculate_push_data_limit_without_default_dataset(mock_client: Ma
300300
cm = ChargingManagerImplementation(config, mock_client)
301301
async with cm:
302302
# explicit price only = 1.00/item, budget = 3.00, max = floor(3/1) = 3
303-
result = cm.calculate_push_data_limit(5, 'scrape', is_default_dataset=False)
303+
result = cm.compute_push_data_limit(5, 'scrape', is_default_dataset=False)
304304
assert result == 3
305305

306306

307-
async def test_calculate_push_data_limit_exhausted_budget(mock_client: MagicMock) -> None:
307+
async def test_compute_push_data_limit_exhausted_budget(mock_client: MagicMock) -> None:
308308
"""Returns 0 when the budget is fully exhausted before the push."""
309309
pricing_info = _make_ppe_pricing_info({'scrape': Decimal('1.00')})
310310
config = _make_config(
@@ -315,7 +315,7 @@ async def test_calculate_push_data_limit_exhausted_budget(mock_client: MagicMock
315315
)
316316
cm = ChargingManagerImplementation(config, mock_client)
317317
async with cm:
318-
result = cm.calculate_push_data_limit(5, 'scrape', is_default_dataset=False)
318+
result = cm.compute_push_data_limit(5, 'scrape', is_default_dataset=False)
319319
assert result == 0
320320

321321

0 commit comments

Comments
 (0)