Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 7 additions & 4 deletions src/crawlee/_autoscaling/_types.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,7 @@
from typing import TYPE_CHECKING

if TYPE_CHECKING:
from crawlee._utils.byte_size import ByteSize

from crawlee._utils.byte_size import ByteSize, Ratio

SYSTEM_WIDE_MEMORY_OVERLOAD_THRESHOLD = 0.97

Expand Down Expand Up @@ -97,8 +96,12 @@ class MemorySnapshot:
system_wide_used_size: ByteSize | None
"""Memory usage of all processes, system-wide."""

max_memory_size: ByteSize
"""The maximum memory that can be used by `AutoscaledPool`."""
max_memory_size: ByteSize | Ratio
Comment thread
vdusek marked this conversation as resolved.
Outdated
"""The maximum memory that can be used by `AutoscaledPool`.

When of type `ByteSize` then it is used as fixed memory size. When of type `Ratio` then it allows for dynamic memory
scaling based on the available system memory.
Comment thread
Pijukatel marked this conversation as resolved.
Outdated
"""

system_wide_memory_size: ByteSize | None
"""Total memory available in the whole system."""
Expand Down
69 changes: 51 additions & 18 deletions src/crawlee/_autoscaling/snapshotter.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,11 @@

from crawlee import service_locator
from crawlee._autoscaling._types import ClientSnapshot, CpuSnapshot, EventLoopSnapshot, MemorySnapshot, Snapshot
from crawlee._utils.byte_size import ByteSize
from crawlee._utils.byte_size import ByteSize, Ratio
from crawlee._utils.context import ensure_context
from crawlee._utils.docs import docs_group
from crawlee._utils.recurring_task import RecurringTask
from crawlee._utils.system import MemoryInfo, get_memory_info
from crawlee._utils.system import MemoryInfo, MemoryUsageInfo, get_memory_info
from crawlee.events._types import Event, EventSystemInfoData

if TYPE_CHECKING:
Expand Down Expand Up @@ -69,7 +69,7 @@ def __init__(
max_used_memory_ratio: float,
max_event_loop_delay: timedelta,
max_client_errors: int,
max_memory_size: ByteSize,
max_memory_size: ByteSize | Ratio,
) -> None:
"""Initialize a new instance.

Expand All @@ -85,7 +85,9 @@ def __init__(
value, the event loop is considered overloaded.
max_client_errors: Sets the maximum number of client errors (HTTP 429). When the number of client errors
is higher than the provided number, the client is considered overloaded.
max_memory_size: Sets the maximum amount of system memory to be used by the `AutoscaledPool`.
max_memory_size: Sets the maximum amount of system memory to be used by the `AutoscaledPool`. When of type
`ByteSize` then it is used as fixed memory size. When of type `Ratio` then it allows for dynamic memory
scaling based on the available system memory.
"""
self._max_used_cpu_ratio = max_used_cpu_ratio
self._max_used_memory_ratio = max_used_memory_ratio
Expand Down Expand Up @@ -121,7 +123,7 @@ def from_config(cls, config: Configuration | None = None) -> Snapshotter:
max_memory_size = (
ByteSize.from_mb(config.memory_mbytes)
if config.memory_mbytes
else ByteSize(int(get_memory_info().total_size.bytes * config.available_memory_ratio))
else Ratio(value=config.available_memory_ratio)
)

return cls(
Expand Down Expand Up @@ -280,23 +282,51 @@ def _snapshot_memory(self, event_data: EventSystemInfoData) -> None:
Args:
event_data: System info data from which memory usage is read.
"""
match event_data.memory_info, self._max_memory_size:
case MemoryInfo() as memory_info, Ratio() as ratio:
max_memory_size = memory_info.total_size * ratio.value
system_wide_used_size = memory_info.system_wide_used_size
system_wide_memory_size = memory_info.total_size

case MemoryUsageInfo(), Ratio() as ratio:
# This is just hypothetical case, that should not happen in practice.
# `LocalEvenManager` should always provide `MemoryInfo` in the event data.
# When running on Apify, `self._max_memory_size` is always `ByteSize`, not `Ratio`.
Comment thread
vdusek marked this conversation as resolved.
Outdated
max_memory_size = get_memory_info().total_size * ratio.value
system_wide_used_size = None
system_wide_memory_size = None

case MemoryInfo() as memory_info, ByteSize() as byte_size:
max_memory_size = byte_size
system_wide_used_size = memory_info.system_wide_used_size
system_wide_memory_size = memory_info.total_size

case MemoryUsageInfo(), ByteSize() as byte_size:
max_memory_size = byte_size
system_wide_used_size = None
system_wide_memory_size = None

case _, _:
raise NotImplementedError('Unsupported combination of memory info and max memory size types.')

snapshot = MemorySnapshot(
current_size=event_data.memory_info.current_size,
max_memory_size=self._max_memory_size,
max_memory_size=max_memory_size,
max_used_memory_ratio=self._max_used_memory_ratio,
created_at=event_data.memory_info.created_at,
system_wide_used_size=None,
system_wide_memory_size=None,
system_wide_used_size=system_wide_used_size,
system_wide_memory_size=system_wide_memory_size,
)

if isinstance(memory_info := event_data.memory_info, MemoryInfo):
snapshot.system_wide_used_size = memory_info.system_wide_used_size
snapshot.system_wide_memory_size = memory_info.total_size

snapshots = cast('list[Snapshot]', self._memory_snapshots)
self._prune_snapshots(snapshots, snapshot.created_at)
self._memory_snapshots.add(snapshot)
self._evaluate_memory_load(event_data.memory_info.current_size, event_data.memory_info.created_at)

self._evaluate_memory_load(
event_data.memory_info.current_size,
event_data.memory_info.created_at,
max_memory_size=max_memory_size,
)

def _snapshot_event_loop(self) -> None:
"""Capture a snapshot of the current event loop usage.
Expand Down Expand Up @@ -364,27 +394,30 @@ def _prune_snapshots(self, snapshots: list[Snapshot], now: datetime) -> None:
else:
snapshots.clear()

def _evaluate_memory_load(self, current_memory_usage_size: ByteSize, snapshot_timestamp: datetime) -> None:
def _evaluate_memory_load(
self, current_memory_usage_size: ByteSize, snapshot_timestamp: datetime, max_memory_size: ByteSize
) -> None:
"""Evaluate and logs critical memory load conditions based on the system information.

Args:
current_memory_usage_size: The current memory usage.
snapshot_timestamp: The time at which the memory snapshot was taken.
max_memory_size: The maximum memory size to be used for evaluation.
"""
# Check if the warning has been logged recently to avoid spamming
if snapshot_timestamp < self._timestamp_of_last_memory_warning + self._MEMORY_WARNING_COOLDOWN_PERIOD:
return

threshold_memory_size = self._max_used_memory_ratio * self._max_memory_size
buffer_memory_size = self._max_memory_size * (1 - self._max_used_memory_ratio) * self._RESERVE_MEMORY_RATIO
threshold_memory_size = self._max_used_memory_ratio * max_memory_size
buffer_memory_size = max_memory_size * (1 - self._max_used_memory_ratio) * self._RESERVE_MEMORY_RATIO
overload_memory_threshold_size = threshold_memory_size + buffer_memory_size

# Log a warning if current memory usage exceeds the critical overload threshold
if current_memory_usage_size > overload_memory_threshold_size:
memory_usage_percentage = round((current_memory_usage_size.bytes / self._max_memory_size.bytes) * 100)
memory_usage_percentage = round((current_memory_usage_size.bytes / max_memory_size.bytes) * 100)
logger.warning(
f'Memory is critically overloaded. Using {current_memory_usage_size} of '
f'{self._max_memory_size} ({memory_usage_percentage}%). '
f'{max_memory_size} ({memory_usage_percentage}%). '
'Consider increasing available memory.'
)
self._timestamp_of_last_memory_warning = snapshot_timestamp
10 changes: 9 additions & 1 deletion src/crawlee/_utils/byte_size.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,22 @@
from __future__ import annotations
Comment thread
vdusek marked this conversation as resolved.

from dataclasses import dataclass
from typing import Any
from typing import Annotated, Any

from pydantic import BaseModel, Field

_BYTES_PER_KB = 1024
_BYTES_PER_MB = _BYTES_PER_KB**2
_BYTES_PER_GB = _BYTES_PER_KB**3
_BYTES_PER_TB = _BYTES_PER_KB**4


class Ratio(BaseModel):
Comment thread
vdusek marked this conversation as resolved.
Outdated
"""Represents ratio of memory."""

value: Annotated[float, Field(gt=0.0, le=1.0)]
Comment thread
Pijukatel marked this conversation as resolved.
Outdated


@dataclass(frozen=True)
class ByteSize:
"""Represents a byte size."""
Expand Down
7 changes: 5 additions & 2 deletions src/crawlee/configuration.py
Original file line number Diff line number Diff line change
Expand Up @@ -177,11 +177,14 @@ class Configuration(BaseSettings):
validation_alias=AliasChoices(
'apify_available_memory_ratio',
'crawlee_available_memory_ratio',
)
),
gt=0.0,
le=1.0,
),
] = 0.25
"""The maximum proportion of system memory to use. If `memory_mbytes` is not provided, this ratio is used to
calculate the maximum memory. This option is utilized by the `Snapshotter`."""
calculate the maximum memory. This option is utilized by the `Snapshotter` and supports the dynamic system memory
scaling."""

storage_dir: Annotated[
str,
Expand Down
85 changes: 83 additions & 2 deletions tests/unit/_autoscaling/test_snapshotter.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,23 @@
import asyncio
from datetime import datetime, timedelta, timezone
from logging import getLogger
from math import floor
from typing import TYPE_CHECKING, cast
from unittest.mock import MagicMock

import pytest

from crawlee import service_locator
from crawlee._autoscaling import Snapshotter
from crawlee._autoscaling._types import ClientSnapshot, CpuSnapshot, MemorySnapshot
from crawlee._autoscaling._types import (
SYSTEM_WIDE_MEMORY_OVERLOAD_THRESHOLD,
ClientSnapshot,
CpuSnapshot,
MemorySnapshot,
Comment thread
Pijukatel marked this conversation as resolved.
)
from crawlee._autoscaling.snapshotter import SortedSnapshotList
from crawlee._utils.byte_size import ByteSize
from crawlee._utils.system import CpuInfo, MemoryInfo
from crawlee._utils.system import CpuInfo, MemoryInfo, get_memory_info
from crawlee.configuration import Configuration
from crawlee.events import LocalEventManager
from crawlee.events._types import Event, EventSystemInfoData
Expand All @@ -22,6 +28,9 @@
from collections.abc import AsyncGenerator


MEMORY_LOAD_JUST_BELOW_SYSTEM_WIDE_OVERLOAD = 0.99 * SYSTEM_WIDE_MEMORY_OVERLOAD_THRESHOLD


@pytest.fixture
async def event_manager() -> AsyncGenerator[LocalEventManager, None]:
# Use a long interval to avoid interference from periodic system info events during tests and ensure the first
Expand Down Expand Up @@ -369,3 +378,75 @@ def test_sorted_snapshot_list_add_maintains_order() -> None:
prev_time = sorted_list[i - 1].created_at
curr_time = snapshot.created_at
assert prev_time <= curr_time, f'Items at indices {i - 1} and {i} are not in chronological order'


_initial_memory_info = get_memory_info()
Comment thread
vdusek marked this conversation as resolved.
Outdated


@pytest.mark.parametrize(
('available_memory_ratio', 'memory_mbytes', 'overloaded_after_scale_up'),
[
pytest.param(MEMORY_LOAD_JUST_BELOW_SYSTEM_WIDE_OVERLOAD, None, False, id='Ratio-based memory limit'),
pytest.param(
MEMORY_LOAD_JUST_BELOW_SYSTEM_WIDE_OVERLOAD,
floor(_initial_memory_info.total_size.to_mb()),
True,
id='Fixed memory limit',
),
],
)
async def test_dynamic_memory(
*,
default_cpu_info: CpuInfo,
event_manager: LocalEventManager,
available_memory_ratio: float,
memory_mbytes: int | None,
overloaded_after_scale_up: bool,
) -> None:
"""Test dynamic memory scaling scenario where the system-wide memory can change.

Create two memory snapshots. They have same memory usage, but different available memory.
First snapshot is created with insufficient memory, so it is overloaded.
Second snapshot is created with sufficient memory.

Based on the Snapshotter configuration, it will either take into account the increased available memory or not.
"""

service_locator.set_event_manager(event_manager)

async with Snapshotter.from_config(
Configuration(memory_mbytes=memory_mbytes, available_memory_ratio=available_memory_ratio)
) as snapshotter:
# Default state, memory usage exactly at the overload threshold -> overloaded, but not system-wide overloaded
memory_infos = [
# Overloaded sample
MemoryInfo(
total_size=_initial_memory_info.total_size,
current_size=_initial_memory_info.total_size * MEMORY_LOAD_JUST_BELOW_SYSTEM_WIDE_OVERLOAD,
system_wide_used_size=_initial_memory_info.total_size * MEMORY_LOAD_JUST_BELOW_SYSTEM_WIDE_OVERLOAD,
),
# Same as first sample, with twice as memory available in the system
MemoryInfo(
total_size=_initial_memory_info.total_size * 2, # Simulate increased total memory
current_size=_initial_memory_info.total_size * MEMORY_LOAD_JUST_BELOW_SYSTEM_WIDE_OVERLOAD,
system_wide_used_size=_initial_memory_info.total_size * MEMORY_LOAD_JUST_BELOW_SYSTEM_WIDE_OVERLOAD,
),
]

for memory_info in memory_infos:
event_manager.emit(
event=Event.SYSTEM_INFO,
event_data=EventSystemInfoData(
cpu_info=default_cpu_info,
memory_info=memory_info,
),
)

await event_manager.wait_for_all_listeners_to_complete()

memory_samples = snapshotter.get_memory_sample()
assert len(memory_samples) == 2
# First sample will be overloaded.
assert memory_samples[0].is_overloaded
# Second sample can reflect the increased available memory based on the configuration used to create Snapshotter
assert memory_samples[1].is_overloaded == overloaded_after_scale_up