forked from apify/apify-sdk-python
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy path_charging.py
More file actions
464 lines (364 loc) · 17.8 KB
/
_charging.py
File metadata and controls
464 lines (364 loc) · 17.8 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
from __future__ import annotations
import asyncio
import math
from contextvars import ContextVar
from dataclasses import dataclass
from datetime import datetime, timezone
from decimal import Decimal
from typing import TYPE_CHECKING, Protocol, TypedDict
from pydantic import TypeAdapter
from apify._models import (
ActorRun,
FlatPricePerMonthActorPricingInfo,
FreeActorPricingInfo,
PayPerEventActorPricingInfo,
PricePerDatasetItemActorPricingInfo,
PricingModel,
)
from apify._utils import docs_group, ensure_context
from apify.log import logger
from apify.storages import Dataset
if TYPE_CHECKING:
from types import TracebackType
from apify_client import ApifyClientAsync
from apify._configuration import Configuration
# Validator applied to the raw Actor run payload fetched through the API client. `None` is an accepted value so that
# the caller can detect a missing run and report it itself.
run_validator = TypeAdapter[ActorRun | None](ActorRun | None)
# Name of the synthetic event that is priced in (on top of the explicit event) when items are pushed to the default
# dataset.
DEFAULT_DATASET_ITEM_EVENT = 'apify-default-dataset-item'
# Context variable to hold the current `ChargingManager` instance, if any. This allows PPE-aware dataset clients to
# access the charging manager without needing to pass it explicitly.
charging_manager_ctx: ContextVar[ChargingManager | None] = ContextVar('charging_manager_ctx', default=None)
# Method decorator bound to the `active` attribute of the decorated object.
# NOTE(review): presumably it rejects calls made while `active` is falsy - confirm against `apify._utils.ensure_context`.
_ensure_context = ensure_context('active')
@docs_group('Charging')
class ChargingManager(Protocol):
    """Interface for working with the pay-per-event pricing model from within an Actor.

    Through a `ChargingManager`, an Actor can charge its users for individual events, inspect how much
    has been charged so far, and find out how much more can be charged before the configured budget is
    exhausted.

    ### References

    - Apify platform documentation: https://docs.apify.com/platform/actors/publishing/monetize
    """

    charge_lock: asyncio.Lock
    """Lock that serializes charge operations so that `charge` and `push_data` calls do not race each other."""

    async def charge(self, event_name: str, count: int = 1) -> ChargeResult:
        """Charge for one or more occurrences of an event (a sub-operation of the Actor).

        Charging only has an effect under the pay-per-event pricing model.

        Args:
            event_name: Name of the event that should be charged.
            count: How many occurrences of the event to charge.
        """

    def calculate_total_charged_amount(self) -> Decimal:
        """Return the sum of money charged for pay-per-event events up to this point."""

    def calculate_max_event_charge_count_within_limit(self, event_name: str) -> int | None:
        """Return how many more occurrences of an event can be charged before the configured limit is hit.

        Args:
            event_name: Name of the event to inspect.
        """

    def get_pricing_info(self) -> ActorPricingInfo:
        """Return details about the pricing that is effective for the current Actor run.

        Useful, for example, when the code has to support several pricing models during a transition period.
        """

    def get_charged_event_count(self, event_name: str) -> int:
        """Return how many events of the given name have been charged so far.

        Args:
            event_name: Name of the event to inspect.
        """

    def get_max_total_charge_usd(self) -> Decimal:
        """Return the maximum total charge configured for this Actor run."""

    def compute_push_data_limit(
        self,
        items_count: int,
        event_name: str,
        *,
        is_default_dataset: bool,
    ) -> int:
        """Return how many of the given items can be pushed and charged without exceeding the budget.

        Both the explicit event and the synthetic `DEFAULT_DATASET_ITEM_EVENT` event are considered, so the
        combined per-item cost stays within the remaining budget.

        Args:
            items_count: Number of items that the caller wants to push.
            event_name: Name of the explicit event charged for every item.
            is_default_dataset: Whether the items go to the default dataset. When True, the price of the
                synthetic event is added to the per-item price.

        Returns:
            The highest number of items that fits into the budget.
        """
@docs_group('Charging')
@dataclass(frozen=True)
class ChargeResult:
    """Outcome of a single `ChargingManager.charge` call."""

    event_charge_limit_reached: bool
    """True when the limit prevents charging any further events of this type."""

    charged_count: int
    """Number of events that were actually charged - this can be less than what was requested."""

    chargeable_within_limit: dict[str, int | None]
    """For every known event type, the number of events that can still be charged within the limit."""
@docs_group('Charging')
@dataclass
class ActorPricingInfo:
    """Value returned by `ChargingManager.get_pricing_info`."""

    pricing_model: PricingModel | None
    """Pricing model that is in effect right now."""

    max_total_charge_usd: Decimal
    """Configured cap on the total charged amount - charging beyond it will not earn you any more money."""

    is_pay_per_event: bool
    """Convenience flag - true when the Actor runs under the pay-per-event pricing model."""

    per_event_prices: dict[str, Decimal]
    """Mapping of every known event type to its price."""
class ChargingManagerImplementation(ChargingManager):
    """Implementation of the `ChargingManager` Protocol - this is only meant to be instantiated internally.

    The instance has to be entered as an async context manager before any of the charging methods is used;
    entering loads the pricing information and the counts of events charged so far.
    """

    LOCAL_CHARGING_LOG_DATASET_NAME = 'charging-log'

    def __init__(self, configuration: Configuration, client: ApifyClientAsync) -> None:
        """Store the configuration and API client; no I/O happens until the manager is entered.

        Args:
            configuration: Configuration of the current Actor run.
            client: API client used to fetch run details and to report charged events.
        """
        self._max_total_charge_usd = configuration.max_total_charge_usd or Decimal('inf')
        self._configuration = configuration
        self._is_at_home = configuration.is_at_home
        self._actor_run_id = configuration.actor_run_id
        self._purge_charging_log_dataset = configuration.purge_on_start
        self._pricing_model: PricingModel | None = None
        self._client = client
        self._charging_log_dataset: Dataset | None = None

        # Per-event bookkeeping: what has been charged so far and what each known event costs.
        self._charging_state: dict[str, ChargingStateItem] = {}
        self._pricing_info: dict[str, PricingInfoItem] = {}

        self._not_ppe_warning_printed = False
        self.active = False

        self.charge_lock = asyncio.Lock()

    async def __aenter__(self) -> None:
        """Initialize the charging manager - this is called by the `Actor` class and shouldn't be invoked manually.

        Raises:
            ValueError: If pay-per-event testing is requested while running on the Apify platform.
            RuntimeError: If the Actor run cannot be identified or fetched while running on the platform.
        """
        # Validate config
        if self._configuration.test_pay_per_event and self._is_at_home:
            raise ValueError(
                'Using the ACTOR_TEST_PAY_PER_EVENT environment variable is only supported '
                'in a local development environment'
            )

        # Retrieve pricing information from env vars or API
        pricing_data = await self._fetch_pricing_info()
        pricing_info = pricing_data['pricing_info']
        charged_event_counts = pricing_data['charged_event_counts']
        max_total_charge_usd = pricing_data['max_total_charge_usd']

        # Set pricing model (the attribute is already declared with its type in `__init__`)
        if self._configuration.test_pay_per_event:
            self._pricing_model = 'PAY_PER_EVENT'
        else:
            self._pricing_model = pricing_info.pricing_model if pricing_info else None

        # Load per-event pricing information
        if pricing_info is not None and isinstance(pricing_info, PayPerEventActorPricingInfo):
            actor_charge_events = pricing_info.pricing_per_event.actor_charge_events or {}

            for event_name, event_pricing in actor_charge_events.items():
                self._pricing_info[event_name] = PricingInfoItem(
                    price=Decimal(str(event_pricing.event_price_usd)),
                    title=event_pricing.event_title,
                )

            # NOTE(review): nesting of this assignment inside the pay-per-event branch was reconstructed from
            # a flattened copy of the file - confirm against the repository.
            self._max_total_charge_usd = max_total_charge_usd

        # Load charged event counts
        for event_name, count in charged_event_counts.items():
            price = self._pricing_info.get(event_name, PricingInfoItem(Decimal(), title='')).price
            self._charging_state[event_name] = ChargingStateItem(
                charge_count=count,
                total_charged_amount=count * price,
            )

        # Set up charging log dataset for local development
        if not self._is_at_home and self._pricing_model == 'PAY_PER_EVENT':
            # We are not running on the Apify platform, but PPE is enabled for testing - open a dataset that
            # will contain a log of all charge calls for debugging purposes.
            if self._purge_charging_log_dataset:
                dataset = await Dataset.open(name=self.LOCAL_CHARGING_LOG_DATASET_NAME)
                await dataset.drop()

            self._charging_log_dataset = await Dataset.open(name=self.LOCAL_CHARGING_LOG_DATASET_NAME)

        # Mark the manager as active only once every fallible step above has succeeded, so that a failed
        # initialization does not leave behind a manager that claims to be usable.
        self.active = True

        # if the Actor runs with the pay-per-event pricing model, set the context variable so that PPE-aware dataset
        # clients can access the charging manager and charge for synthetic events.
        if self._pricing_model == 'PAY_PER_EVENT':
            charging_manager_ctx.set(self)

    async def __aexit__(
        self,
        exc_type: type[BaseException] | None,
        exc_value: BaseException | None,
        exc_traceback: TracebackType | None,
    ) -> None:
        """Deactivate the manager and clear the context variable.

        Raises:
            RuntimeError: If the manager is not active.
        """
        if not self.active:
            raise RuntimeError('Exiting an uninitialized ChargingManager')

        charging_manager_ctx.set(None)
        self.active = False

    @_ensure_context
    async def charge(self, event_name: str, count: int = 1) -> ChargeResult:
        """Charge for `count` occurrences of `event_name` without exceeding the configured budget.

        Args:
            event_name: Name of the event to be charged for.
            count: Number of events to charge for. A value of zero or less charges nothing.

        Returns:
            The number of events actually charged together with the remaining chargeable counts.

        Raises:
            RuntimeError: If running on the platform without a configured Actor run ID.
        """

        def calculate_chargeable() -> dict[str, int | None]:
            """Calculate the maximum number of events of each type that can be charged within the current budget."""
            return {name: self.calculate_max_event_charge_count_within_limit(name) for name in self._pricing_info}

        # For runs that do not use the pay-per-event pricing model, just print a warning and return
        if self._pricing_model != 'PAY_PER_EVENT':
            if not self._not_ppe_warning_printed:
                logger.warning(
                    'Ignored attempt to charge for an event - the Actor does not use the pay-per-event pricing'
                )
                self._not_ppe_warning_printed = True

            return ChargeResult(
                event_charge_limit_reached=False,
                charged_count=0,
                chargeable_within_limit=calculate_chargeable(),
            )

        # START OF CRITICAL SECTION - no awaits here

        # Determine the maximum amount of events that can be charged within the budget
        max_chargeable = self.calculate_max_event_charge_count_within_limit(event_name)
        charged_count = min(count, max_chargeable if max_chargeable is not None else count)

        # Nothing to charge - either the budget is exhausted or the caller asked for a non-positive count.
        # A non-positive count must never reach the bookkeeping below (it would lower the charged totals),
        # and the limit is reported as reached only when the budget really leaves no room.
        if charged_count <= 0:
            return ChargeResult(
                event_charge_limit_reached=max_chargeable is not None and max_chargeable <= 0,
                charged_count=0,
                chargeable_within_limit=calculate_chargeable(),
            )

        pricing_info = self._pricing_info.get(event_name)
        if pricing_info is None:
            pricing_info = PricingInfoItem(
                # Unknown events get a nonzero price in local development so that the maximum budget
                # can be reached (see `_get_event_price`).
                price=self._get_event_price(event_name),
                title=f"Unknown event '{event_name}'",
            )

        # Update the charging state
        state = self._charging_state.setdefault(event_name, ChargingStateItem(0, Decimal()))
        state.charge_count += charged_count
        state.total_charged_amount += charged_count * pricing_info.price

        # END OF CRITICAL SECTION

        # If running on the platform, call the charge endpoint
        if self._is_at_home:
            if self._actor_run_id is None:
                raise RuntimeError('Actor run ID not configured')

            if event_name.startswith('apify-'):
                # Synthetic events (e.g. apify-default-dataset-item) are tracked internally only,
                # the platform handles them automatically based on dataset writes.
                pass
            elif event_name in self._pricing_info:
                await self._client.run(self._actor_run_id).charge(event_name, charged_count)
            else:
                logger.warning(f"Attempting to charge for an unknown event '{event_name}'")

        # Log the charged operation (if enabled)
        if self._charging_log_dataset:
            await self._charging_log_dataset.push_data(
                {
                    'event_name': event_name,
                    'event_title': pricing_info.title,
                    'event_price_usd': round(pricing_info.price, 3),
                    'charged_count': charged_count,
                    'timestamp': datetime.now(timezone.utc).isoformat(),
                }
            )

        # If it is not possible to charge the full amount, log that fact
        if charged_count < count:
            subject = 'instance' if count == 1 else 'instances'
            logger.info(
                f"Charging {count} {subject} of '{event_name}' event would exceed max_total_charge_usd "
                f'- only {charged_count} events were charged'
            )

        max_charge_count = self.calculate_max_event_charge_count_within_limit(event_name)

        return ChargeResult(
            event_charge_limit_reached=max_charge_count is not None and max_charge_count <= 0,
            charged_count=charged_count,
            chargeable_within_limit=calculate_chargeable(),
        )

    @_ensure_context
    def calculate_total_charged_amount(self) -> Decimal:
        """Sum the charged amounts tracked for all events."""
        return sum(
            (item.total_charged_amount for item in self._charging_state.values()),
            start=Decimal(),
        )

    @_ensure_context
    def calculate_max_event_charge_count_within_limit(self, event_name: str) -> int | None:
        """Return how many more `event_name` events fit into the remaining budget.

        Returns:
            A non-negative count, or `None` when the event is free or the budget is unlimited.
        """
        price = self._get_event_price(event_name)

        # A zero price means that the event can never exhaust the budget.
        if not price:
            return None

        result = (self._max_total_charge_usd - self.calculate_total_charged_amount()) / price
        # An infinite budget yields a non-finite quotient, which is reported as "no limit".
        return max(0, math.floor(result)) if result.is_finite() else None

    @_ensure_context
    def get_pricing_info(self) -> ActorPricingInfo:
        """Build a snapshot of the effective pricing model, budget and per-event prices."""
        return ActorPricingInfo(
            pricing_model=self._pricing_model,
            is_pay_per_event=self._pricing_model == 'PAY_PER_EVENT',
            # Always a `Decimal` - every assignment in this class falls back to `Decimal('inf')`.
            max_total_charge_usd=self._max_total_charge_usd,
            per_event_prices={
                event_name: pricing_info.price for event_name, pricing_info in self._pricing_info.items()
            },
        )

    @_ensure_context
    def get_charged_event_count(self, event_name: str) -> int:
        """Return the number of charged `event_name` events, or 0 when none were charged."""
        item = self._charging_state.get(event_name)
        return item.charge_count if item is not None else 0

    @_ensure_context
    def get_max_total_charge_usd(self) -> Decimal:
        """Return the budget that is in effect for this run."""
        return self._max_total_charge_usd

    @_ensure_context
    def compute_push_data_limit(
        self,
        items_count: int,
        event_name: str,
        *,
        is_default_dataset: bool,
    ) -> int:
        """Return how many of `items_count` items can be pushed and charged within the remaining budget.

        Args:
            items_count: The number of items to be pushed.
            event_name: The explicit event name to charge for each item.
            is_default_dataset: When True, the price of `DEFAULT_DATASET_ITEM_EVENT` is added to the
                per-item price.

        Returns:
            A value between 0 and `items_count`.
        """
        explicit_price = self._get_event_price(event_name)
        synthetic_price = self._get_event_price(DEFAULT_DATASET_ITEM_EVENT) if is_default_dataset else Decimal(0)
        combined_price = explicit_price + synthetic_price

        # Free items are never limited by the budget.
        if not combined_price:
            return items_count

        result = (self._max_total_charge_usd - self.calculate_total_charged_amount()) / combined_price
        max_count = max(0, math.floor(result)) if result.is_finite() else items_count
        return min(items_count, max_count)

    async def _fetch_pricing_info(self) -> _FetchedPricingInfoDict:
        """Fetch pricing information from environment variables or API.

        Raises:
            RuntimeError: If running on the platform and the run ID is missing or the run cannot be found.
        """
        # Check if pricing info is available via environment variables
        if self._configuration.actor_pricing_info is not None and self._configuration.charged_event_counts is not None:
            return _FetchedPricingInfoDict(
                pricing_info=self._configuration.actor_pricing_info,
                charged_event_counts=self._configuration.charged_event_counts,
                max_total_charge_usd=self._configuration.max_total_charge_usd or Decimal('inf'),
            )

        # Fall back to API call
        if self._is_at_home:
            if self._actor_run_id is None:
                raise RuntimeError('Actor run ID not found even though the Actor is running on Apify')

            run = run_validator.validate_python(await self._client.run(self._actor_run_id).get())

            if run is None:
                raise RuntimeError('Actor run not found')

            max_charge = run.options.max_total_charge_usd
            return _FetchedPricingInfoDict(
                pricing_info=run.pricing_info,
                charged_event_counts=run.charged_event_counts or {},
                max_total_charge_usd=Decimal(str(max_charge)) if max_charge is not None else Decimal('inf'),
            )

        # Local development without environment variables
        return _FetchedPricingInfoDict(
            pricing_info=None,
            charged_event_counts={},
            max_total_charge_usd=self._configuration.max_total_charge_usd or Decimal('inf'),
        )

    def _get_event_price(self, event_name: str) -> Decimal:
        """Return the price of an event.

        Unknown events cost 0 on the platform and 1 in local development, so that the budget can be
        reached while testing locally.
        """
        pricing_info = self._pricing_info.get(event_name)
        if pricing_info is not None:
            return pricing_info.price
        return Decimal(0) if self._is_at_home else Decimal(1)
@dataclass
class ChargingStateItem:
    """Running totals kept for a single event name."""

    # Number of events of this name charged so far.
    charge_count: int
    # Money charged so far for events of this name.
    total_charged_amount: Decimal
@dataclass
class PricingInfoItem:
    """Price and display title of a single chargeable event."""

    # Price charged for one occurrence of the event.
    price: Decimal
    # Title of the event.
    title: str
class _FetchedPricingInfoDict(TypedDict):
    """Shape of the data returned by `ChargingManagerImplementation._fetch_pricing_info`."""

    # Pricing info of the run, or `None` when it is not available.
    pricing_info: (
        FreeActorPricingInfo
        | FlatPricePerMonthActorPricingInfo
        | PricePerDatasetItemActorPricingInfo
        | PayPerEventActorPricingInfo
        | None
    )
    # Number of already charged events, keyed by event name.
    charged_event_counts: dict[str, int]
    # Budget for the run; `Decimal('inf')` stands for "no limit".
    max_total_charge_usd: Decimal