Skip to content

Commit 02db7ca

Browse files
authored
chore(tests): add test suite for semaphore usage (e.g. multithreaded env) (#2596)
1 parent 642b9bb commit 02db7ca

17 files changed

Lines changed: 650 additions & 85 deletions

cognite/client/utils/_concurrency.py

Lines changed: 24 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -262,6 +262,13 @@ def __init__(self) -> None:
262262
write_schema=1,
263263
)
264264

265+
@functools.cached_property
def _all_concurrency_configs(self) -> list[ConcurrencyConfig]:
    """Return the value of every property declared directly on this class, except ``is_frozen``.

    Primarily a testing helper for dealing with the 'annoying' state held by the
    concurrency settings. The result is computed on first access and then cached
    on the instance.

    Raises:
        ValueError: If the class does not declare an ``is_frozen`` property.
    """
    property_names = []
    # Only attributes defined on the concrete class itself are inspected (no inherited ones).
    for attr_name, attr in vars(type(self)).items():
        if isinstance(attr, property):
            property_names.append(attr_name)

    # NOTE(review): presumably ``is_frozen`` is a plain flag rather than a ConcurrencyConfig - confirm.
    property_names.remove("is_frozen")
    return [getattr(self, attr_name) for attr_name in property_names]
271+
265272
def _check_frozen(self, name: str, api_name: str) -> None:
266273
if self.__frozen:
267274
raise RuntimeError(
@@ -556,22 +563,28 @@ def start(self) -> None:
556563

557564

558565
# We need this in order to support a synchronous Cognite client.
559-
_INTERNAL_EVENT_LOOP_THREAD_EXECUTOR_SINGLETON: EventLoopThreadExecutor
566+
_INTERNAL_EVENT_LOOP_THREAD_EXECUTOR_SINGLETON: EventLoopThreadExecutor | None = None
567+
_EXECUTOR_INIT_LOCK = threading.Lock()
560568

561569

562570
def _get_event_loop_executor() -> EventLoopThreadExecutor:
    """Return the shared event loop executor, creating and starting it on first use.

    The executor is needed in order to support a synchronous Cognite client. It is
    stored in the module-level ``_INTERNAL_EVENT_LOOP_THREAD_EXECUTOR_SINGLETON`` and
    initialization is serialized with ``_EXECUTOR_INIT_LOCK``.

    Returns:
        The singleton executor. It is only ever published to the module global after
        ``start()`` has returned, so callers never observe an unstarted instance.
    """
    global _INTERNAL_EVENT_LOOP_THREAD_EXECUTOR_SINGLETON

    # Fast path: singleton already initialized — no lock needed.
    executor = _INTERNAL_EVENT_LOOP_THREAD_EXECUTOR_SINGLETON
    if executor is not None:
        return executor

    # Slow path: serialize initialization. Without this, multiple threads racing on the first
    # call (e.g. a ThreadPoolExecutor of sync clients) can each construct their own executor
    # and end up using different background loops, breaking the per-loop semaphore cache key.
    with _EXECUTOR_INIT_LOCK:
        # Re-check under the lock: another thread may have finished initializing while we waited.
        executor = _INTERNAL_EVENT_LOOP_THREAD_EXECUTOR_SINGLETON
        if executor is None:
            ex_cls = EventLoopThreadExecutor
            if _RUNNING_IN_BROWSER:
                ex_cls = cast(type[EventLoopThreadExecutor], _PyodideEventLoopExecutor)
            executor = ex_cls()
            executor.start()
            # Publish only after start() has returned. The fast path above reads the global
            # without holding the lock, so assigning before start() would let another thread
            # pick up an executor that is not running yet. It would also leave a dead
            # instance cached forever if start() raised.
            _INTERNAL_EVENT_LOOP_THREAD_EXECUTOR_SINGLETON = executor
        return executor
575588

576589

577590
async def execute_async_tasks_with_fail_fast(tasks: list[AsyncSDKTask]) -> TasksSummary:

pytest.ini

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
11
[pytest]
22
markers =
3-
dsl
4-
coredeps
3+
dsl: requires optional data-science deps (e.g. pandas/numpy) - skipped when only core deps are installed
4+
coredeps: runs only with --test-deps-only-core (verifies that optional deps are NOT installed in the core-only env)
5+
allow_no_semaphore: opts a test out of the strict semaphore check (see tests/conftest.py)
56

67
anyio_mode = auto
78

tests/conftest.py

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,18 +2,56 @@
22

33
import platform
44
from collections.abc import Callable, Iterator
5+
from typing import Any
56

67
import dotenv
78
import pytest
89
from _pytest.monkeypatch import MonkeyPatch
910

1011
from cognite.client import global_config
1112
from cognite.client._api_client import APIClient
13+
from cognite.client._http_client import AsyncHTTPClientWithRetry
1214

1315
dotenv.load_dotenv()
1416

1517
global_config.disable_pypi_version_check = True
1618

19+
ALLOW_NO_SEMAPHORE_MARKER = "allow_no_semaphore"
20+
21+
22+
@pytest.fixture(autouse=True)
def require_semaphore_on_every_request(request: pytest.FixtureRequest, monkeypatch: pytest.MonkeyPatch) -> None:
    """Fail any test in which an internal API call reaches the HTTP layer without a semaphore.

    Production code substitutes ``nullcontext()`` when ``semaphore is None``. That fallback is
    meant for users calling the top-level ``client.post(...)``/``client.get(...)`` with raw URLs;
    no internal API method should ever end up there. This fixture wraps
    ``AsyncHTTPClientWithRetry._with_retry`` so that a missing semaphore (e.g. a forgotten
    ``semaphore=...`` argument) becomes a hard test failure instead of going unnoticed.

    Tests that legitimately exercise the None path opt out with
    ``@pytest.mark.allow_no_semaphore("<reason>")``. That covers the top-level methods mentioned
    above, plus any method that enters the semaphore at a higher level than the individual
    HTTP request (e.g. datapoints.insert).
    """
    opt_out_marker = request.node.get_closest_marker(ALLOW_NO_SEMAPHORE_MARKER)
    if opt_out_marker:
        return

    unpatched_with_retry = AsyncHTTPClientWithRetry._with_retry

    async def with_retry_requiring_semaphore(
        self: AsyncHTTPClientWithRetry, coro_factory: Any, *, url: str, headers: Any, semaphore: Any
    ) -> Any:
        # Normal case: a semaphore was supplied, so delegate to the real implementation untouched.
        if semaphore is not None:
            return await unpatched_with_retry(self, coro_factory, url=url, headers=headers, semaphore=semaphore)

        failure_message = (
            f"Internal API call to {url!r} did not pass a semaphore. "
            "All endpoints behind client.<api>.<method> must route through a semaphore — "
            "the nullcontext fallback is reserved for top-level client.post/get calls only. "
            "If this call legitimately holds the semaphore at a higher level, mark the test "
            f"with @pytest.mark.{ALLOW_NO_SEMAPHORE_MARKER}('<reason>')."
        )
        # pytest.fail raises, so nothing after this line runs.
        pytest.fail(failure_message)

    # monkeypatch restores the original method automatically when the test finishes.
    monkeypatch.setattr(AsyncHTTPClientWithRetry, "_with_retry", with_retry_requiring_semaphore)
54+
1755

1856
@pytest.fixture(scope="session")
1957
def anyio_backend() -> str:

tests/tests_integration/test_api/test_datapoint_subscriptions.py

Lines changed: 3 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -2,18 +2,16 @@
22

33
import math
44
import random
5-
import time
65
import unittest
76
from collections.abc import Iterator
87
from contextlib import contextmanager
9-
from datetime import datetime
108

119
import numpy as np
1210
import pandas as pd
1311
import pytest
1412

1513
from cognite.client import CogniteClient
16-
from cognite.client.data_classes import TimeSeries, TimeSeriesWrite, filters
14+
from cognite.client.data_classes import TimeSeriesWrite, filters
1715
from cognite.client.data_classes.data_modeling import NodeId, SpaceApply
1816
from cognite.client.data_classes.data_modeling.cdm.v1 import CogniteTimeSeriesApply
1917
from cognite.client.data_classes.datapoints_subscriptions import (
@@ -302,6 +300,7 @@ def test_iterate_data_subscription_changed_time_series(
302300
assert {a.external_id for a in batch.subscription_changes.removed} == {first_ts}
303301
assert len(batch.updates) == 0
304302

303+
@pytest.mark.allow_no_semaphore("Part of the tests hits DatapointsAPI._insert_datapoints")
305304
def test_iterate_data_subscription_datapoints_added(
306305
self, cognite_client: CogniteClient, time_series_external_ids: list[str]
307306
) -> None:
@@ -384,6 +383,7 @@ def test_iterate_data_subscription_start_1m_ago(
384383
"you run these tests, please wait 1 minute and try again.",
385384
)
386385

386+
@pytest.mark.allow_no_semaphore("Part of the tests hits DatapointsAPI._insert_datapoints")
387387
def test_iterate_data__using_status_codes(
388388
self, cognite_client: CogniteClient, sub_for_status_codes: DatapointSubscription
389389
) -> None:
@@ -429,63 +429,3 @@ def test_iterate_data__using_status_codes(
429429
assert bad_upsert_value[0] is None
430430
assert all(isinstance(v, float) for v in bad_upsert_value[1:])
431431
assert no_bad[0].upserts.status_symbol == ["Uncertain", "Good", "Good", "Good"]
432-
433-
@pytest.mark.skip(reason="Using a filter (as opposed to specific identifiers) is eventually consistent")
434-
def test_update_filter_subscription_added_times_series(
435-
self, cognite_client: CogniteClient, time_series_external_ids: list[str]
436-
) -> None:
437-
f = filters
438-
p = DatapointSubscriptionProperty
439-
numerical_timeseries = f.And(
440-
f.Equals(p.is_string, False), f.Prefix(p.external_id, "PYSDK DataPoint Subscription Test")
441-
)
442-
new_subscription = DataPointSubscriptionWrite(
443-
external_id=f"PYSDKDataPointSubscriptionUpdateFilterTest-{random_string(10)}",
444-
name="PYSDKDataPointSubscriptionUpdateFilterTest",
445-
filter=numerical_timeseries,
446-
partition_count=1,
447-
)
448-
created_timeseries: TimeSeries | None = None
449-
with create_subscription_with_cleanup(cognite_client, new_subscription) as created:
450-
assert created.created_time
451-
452-
initial_added_count = 0
453-
for batch in cognite_client.time_series.subscriptions.iterate_data(
454-
new_subscription.external_id,
455-
poll_timeout=0,
456-
):
457-
initial_added_count += len(batch.subscription_changes.added)
458-
if not batch.has_next:
459-
break
460-
461-
assert initial_added_count > 0, "There should be at least one numerical timeseries added"
462-
463-
new_numerical_timeseries = TimeSeriesWrite(
464-
external_id=f"PYSDK DataPoint Subscription Test 42 ({random_string(10)})",
465-
name="PYSDK DataPoint Subscription Test 42",
466-
is_string=False,
467-
)
468-
try:
469-
created_timeseries = cognite_client.time_series.create(new_numerical_timeseries)
470-
cognite_client.time_series.data.insert(
471-
[(datetime.now(), 42)], external_id=new_numerical_timeseries.external_id
472-
)
473-
# Ensure that the subscription has been updated
474-
time.sleep(10)
475-
476-
updated_added_count = 0
477-
for batch in cognite_client.time_series.subscriptions.iterate_data(
478-
new_subscription.external_id,
479-
poll_timeout=0,
480-
):
481-
updated_added_count += len(batch.subscription_changes.added)
482-
if not batch.has_next:
483-
break
484-
485-
assert initial_added_count + 1 == updated_added_count, (
486-
"The new timeseries should be added. This is most likely because using a filter with "
487-
"datapoint subscriptions is eventually consistent."
488-
)
489-
finally:
490-
if created_timeseries:
491-
cognite_client.time_series.delete(created_timeseries.id)

tests/tests_integration/test_api/test_datapoints.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -501,6 +501,10 @@ def ts_create_in_dms(
501501

502502

503503
class TestTimeSeriesCreatedInDMS:
504+
@pytest.mark.allow_no_semaphore(
505+
"Test inserts datapoints; DatapointsAPI._insert_datapoints holds the semaphore via outer "
506+
"'async with' and passes None to _post to avoid double-acquiring."
507+
)
504508
def test_insert_read_delete_dps(self, cognite_client: CogniteClient, ts_create_in_dms: NodeApplyResult) -> None:
505509
# Ensure the DMS time series is retrievable from normal TS API:
506510
inst_id = ts_create_in_dms.as_id()
@@ -2855,6 +2859,11 @@ def test_instance_id_and_missing(self, cognite_client: CogniteClient, instance_t
28552859
cognite_client.time_series.data.retrieve_latest(instance_id=missing, ignore_unknown_ids=False)
28562860

28572861

2862+
@pytest.mark.allow_no_semaphore(
2863+
"Insert paths go through DatapointsAPI._insert_datapoints, which holds the semaphore via "
2864+
"outer 'async with' for memory backpressure and passes None to _post. Delete paths in this "
2865+
"class use the regular semaphore route and are still checked against the broader suite."
2866+
)
28582867
class TestInsertDatapointsAPI:
28592868
@pytest.mark.usefixtures("post_spy")
28602869
def test_insert(

tests/tests_integration/test_api/test_event_loop_interop.py

Lines changed: 6 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -6,26 +6,23 @@
66
from __future__ import annotations
77

88
import random
9+
from collections.abc import Iterator
910
from concurrent.futures import ThreadPoolExecutor
1011
from typing import Any
1112

1213
import pytest
1314

14-
from cognite.client import AsyncCogniteClient, CogniteClient, global_config
15+
from cognite.client import AsyncCogniteClient, CogniteClient
1516
from cognite.client._http_client import _global_async_httpx_clients
17+
from tests.utils import fresh_concurrency_state
1618

1719

1820
@pytest.fixture(autouse=True)
def _clear_loop_caches() -> Iterator[None]:
    """Give each test a clean slate: empty the global httpx client cache and run inside fresh concurrency state."""
    _global_async_httpx_clients.clear()
    # The test body executes while this context manager is active; it is exited on teardown.
    clean_concurrency_state = fresh_concurrency_state()
    with clean_concurrency_state:
        yield None
2926

3027

3128
def make_sync_api_call(client: CogniteClient) -> Any:

tests/tests_integration/test_api/test_simulators/test_models.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,11 @@
3131
"seed_resource_names",
3232
"seed_simulator_model_revisions",
3333
)
34+
@pytest.mark.allow_no_semaphore(
35+
"Simulator tests use direct _post(..., semaphore=None) calls in fixtures/seed helpers to "
36+
"exercise endpoints (logs/update, callback, integrations/update) that aren't exposed via "
37+
"public SDK methods — they're worker-facing endpoints used here for test setup only."
38+
)
3439
class TestSimulatorModels:
3540
def test_list_models(self, cognite_client: CogniteClient, seed_resource_names: ResourceNames) -> None:
3641
models = cognite_client.simulators.models.list(

tests/tests_integration/test_api/test_simulators/test_runs.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,10 @@
2020
from tests.tests_integration.test_api.test_simulators.seed.data import RESOURCES, ResourceNames
2121

2222

23+
@pytest.mark.allow_no_semaphore(
24+
"Simulator tests use direct _post(..., semaphore=None) calls (e.g. /simulators/run/callback) "
25+
"to exercise worker-facing endpoints not exposed via public SDK methods."
26+
)
2327
@pytest.mark.usefixtures("seed_resource_names", "seed_simulator_routine_revisions")
2428
class TestSimulatorRuns:
2529
def test_list_filtering(

tests/tests_integration/test_api/test_workflows.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -463,6 +463,10 @@ def test_list_workflows(self, cognite_client: CogniteClient, persisted_workflow_
463463

464464

465465
class TestWorkflowVersions:
466+
@pytest.mark.allow_no_semaphore(
467+
"Test setup uses simulator fixtures that exercise worker-facing endpoints "
468+
"(/simulators/integrations/update) via direct _post(..., semaphore=None)."
469+
)
466470
def test_upsert_run_delete_with_simulation_task(
467471
self,
468472
cognite_client: CogniteClient,

tests/tests_integration/test_cognite_client.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,10 @@ def cognite_client_with_wrong_base_url(
1919
return cognite_client
2020

2121

22+
@pytest.mark.allow_no_semaphore(
23+
"Tests exercise top-level client.post/get/put/token-inspect on the (Async)CogniteClient — "
24+
"the documented nullcontext path. Concurrency limiting is unrelated."
25+
)
2226
class TestCogniteClient:
2327
def test_wrong_project(
2428
self, monkeypatch: MonkeyPatch, async_client: AsyncCogniteClient, cognite_client: CogniteClient

0 commit comments

Comments
 (0)