-
Notifications
You must be signed in to change notification settings - Fork 25
Expand file tree
/
Copy pathconftest.py
More file actions
70 lines (49 loc) · 2.53 KB
/
Copy pathconftest.py
File metadata and controls
70 lines (49 loc) · 2.53 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
from __future__ import annotations
import os
from typing import TYPE_CHECKING
import pytest
from apify_shared.consts import ApifyEnvVars
from crawlee import service_locator
import apify._actor
from apify.storage_clients._apify._alias_resolving import AliasResolver
if TYPE_CHECKING:
from collections.abc import Callable
from pathlib import Path
@pytest.fixture
def prepare_test_env(monkeypatch: pytest.MonkeyPatch, tmp_path: Path) -> Callable[[], None]:
    """Provide a function that restores the package's global state to a known baseline.

    The fixture does not reset anything by itself; it hands back a callable so that the caller decides when the
    reset happens. Invoking the callable clears the Actor class flags, removes the `__wrapped__` attribute from
    the `Actor` object, points the local storage directory at the per-test temporary path, and clears the
    service locator and alias resolver state.

    Args:
        monkeypatch: Pytest helper used to set the local storage directory environment variable.
        tmp_path: Per-test temporary directory supplied by pytest, used as the local storage directory.

    Returns:
        A zero-argument callable that performs the reset.
    """

    def _prepare_test_env() -> None:
        storage_dir = str(tmp_path)

        # Actor state: clear the class-level flags on the type behind `Actor`, then drop `__wrapped__`.
        actor_type = apify._actor.Actor.__wrapped__.__class__  # ty: ignore[unresolved-attribute]
        actor_type._is_any_instance_initialized = False
        actor_type._is_rebooting = False
        delattr(apify._actor.Actor, '__wrapped__')

        # Local storage: redirect it to the temporary directory for the duration of the test.
        monkeypatch.setenv(ApifyEnvVars.LOCAL_STORAGE_DIR, storage_dir)

        # Service locator: forget every registered service and empty the storage instance cache.
        for private_attr in ('_configuration', '_event_manager', '_storage_client'):
            setattr(service_locator, private_attr, None)
        service_locator.storage_instance_manager.clear_cache()

        # Alias resolver: start again with an empty alias map and no init lock.
        AliasResolver._alias_map = {}
        AliasResolver._alias_init_lock = None

        # Sanity check: the environment must now point at the temporary storage directory.
        configured_dir = os.environ.get(ApifyEnvVars.LOCAL_STORAGE_DIR)
        assert configured_dir == storage_dir

    return _prepare_test_env
@pytest.fixture(autouse=True)
def _isolate_test_environment(prepare_test_env: Callable[[], None]) -> None:
    """Reset the global state before every test so that each one starts from a clean baseline.

    As an autouse fixture it applies to every test in scope without being requested explicitly. The reset runs
    once, during fixture setup; this fixture has no teardown step of its own.

    Args:
        prepare_test_env: Callable provided by the `prepare_test_env` fixture that performs the reset.
    """
    prepare_test_env()