From 889fed88eff7bfe011e48d28c0294c24dd3c8514 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Anders=20=C3=98en=20Fylling?= Date: Mon, 15 Jun 2026 14:28:25 +0200 Subject: [PATCH 01/16] feat(records): stream-type-aware concurrency limits Model the Records API's hierarchical rate-limit budgets for mutable and immutable streams. Write semaphore (20) is shared across both stream types. Query, retrieve, and aggregate each have separate mutable and immutable semaphores matching the API's documented limits. Retrieve and aggregate have dedicated budgets checked before the shared query budget (both must pass). The semaphore helpers on RecordsAPI are wired up for all three tiers, ready for use by endpoint implementations. Co-Authored-By: Claude Opus 4.6 (1M context) --- cognite/client/_api/data_modeling/records.py | 45 ++++++- .../client/_sync_api/data_modeling/records.py | 4 +- cognite/client/utils/_concurrency.py | 127 +++++++++++++++++- .../tests_unit/test_utils/test_concurrency.py | 90 +++++++++++++ .../test_concurrency_api_routing.py | 61 +++++++++ 5 files changed, 315 insertions(+), 12 deletions(-) diff --git a/cognite/client/_api/data_modeling/records.py b/cognite/client/_api/data_modeling/records.py index 3d7591d727..e31364fb8e 100644 --- a/cognite/client/_api/data_modeling/records.py +++ b/cognite/client/_api/data_modeling/records.py @@ -14,6 +14,8 @@ from cognite.client import AsyncCogniteClient from cognite.client.config import ClientConfig +StreamType = Literal["mutable", "immutable"] + class RecordsAPI(APIClient): def __init__(self, config: ClientConfig, api_version: str | None, cognite_client: AsyncCogniteClient) -> None: @@ -22,16 +24,47 @@ def __init__(self, config: ClientConfig, api_version: str | None, cognite_client api_maturity="General Availability", sdk_maturity="alpha", feature_name="Records" ) - _OPERATION_TO_RATE_LIMIT: ClassVar[dict[str, RecordsConcurrencyOperation]] = { - "write": RecordsConcurrencyOperation.WRITE, - "delete": RecordsConcurrencyOperation.WRITE, - } - def _get_semaphore(self, operation: Literal["write", "delete"]) -> asyncio.BoundedSemaphore: from cognite.client import global_config return global_config.concurrency_settings.records._semaphore_factory( - self._OPERATION_TO_RATE_LIMIT[operation], project=self._cognite_client.config.project + RecordsConcurrencyOperation.WRITE, project=self._cognite_client.config.project + ) + + def _get_query_semaphore(self, stream_type: StreamType) -> asyncio.BoundedSemaphore: + from cognite.client import global_config + + op = ( + RecordsConcurrencyOperation.QUERY_MUTABLE + if stream_type == "mutable" + else RecordsConcurrencyOperation.QUERY_IMMUTABLE + ) + return global_config.concurrency_settings.records._semaphore_factory( + op, project=self._cognite_client.config.project + ) + + def _get_retrieve_semaphore(self, stream_type: StreamType) -> asyncio.BoundedSemaphore: + from cognite.client import global_config + + op = ( + RecordsConcurrencyOperation.RETRIEVE_MUTABLE + if stream_type == "mutable" + else RecordsConcurrencyOperation.RETRIEVE_IMMUTABLE + ) + return global_config.concurrency_settings.records._semaphore_factory( + op, project=self._cognite_client.config.project + ) + + def _get_aggregate_semaphore(self, stream_type: StreamType) -> asyncio.BoundedSemaphore: + from cognite.client import global_config + + op = ( + RecordsConcurrencyOperation.AGGREGATE_MUTABLE + if stream_type == "mutable" + else RecordsConcurrencyOperation.AGGREGATE_IMMUTABLE + ) + return global_config.concurrency_settings.records._semaphore_factory( + op, project=self._cognite_client.config.project ) def _records_url(self, stream_id: str, suffix: str = "") -> str: diff --git a/cognite/client/_sync_api/data_modeling/records.py b/cognite/client/_sync_api/data_modeling/records.py index 9897313c9d..4b5d2cddd7 100644 --- a/cognite/client/_sync_api/data_modeling/records.py +++ b/cognite/client/_sync_api/data_modeling/records.py @@ -1,6 +1,6 @@ """ =============================================================================== -f86364d61385123f12bc60dd004ea1c2 +e6c9a52c8279b5d08c76b016b7a25811 This file is auto-generated from the Async API modules, - do not edit manually! =============================================================================== """ @@ -18,6 +18,8 @@ if TYPE_CHECKING: from cognite.client import AsyncCogniteClient +StreamType = Literal["mutable", "immutable"] + class SyncRecordsAPI(SyncAPIClient): """Auto-generated, do not modify manually.""" diff --git a/cognite/client/utils/_concurrency.py b/cognite/client/utils/_concurrency.py index b5df86a34b..27287d0bc1 100644 --- a/cognite/client/utils/_concurrency.py +++ b/cognite/client/utils/_concurrency.py @@ -224,24 +224,114 @@ def __repr__(self) -> str: class RecordsConcurrencyOperation(Enum): WRITE = "write" + QUERY_MUTABLE = "query_mutable" + QUERY_IMMUTABLE = "query_immutable" + RETRIEVE_MUTABLE = "retrieve_mutable" + RETRIEVE_IMMUTABLE = "retrieve_immutable" + AGGREGATE_MUTABLE = "aggregate_mutable" + AGGREGATE_IMMUTABLE = "aggregate_immutable" class RecordsGlobalConcurrencyConfig(ConcurrencyConfig): """ - Global concurrency settings for the Records API. Named "global" to distinguish from - future per-endpoint rate limits that may be added later. + Global concurrency settings for the Records API. + + The Records API has separate rate-limit budgets for reads and writes, and read budgets + differ between mutable and immutable streams. Read budgets are hierarchical: the + retrieve and aggregate endpoints each have a dedicated budget that is checked *before* + the shared query budget (both must pass). + + - **write**: Shared across ingest, upsert, and delete (same limit for both stream types). + - **query_mutable / query_immutable**: Shared read budget consumed by all query endpoints + (list/filter, sync, retrieve, aggregate). + - **retrieve_mutable / retrieve_immutable**: Dedicated budget for retrieve, checked + *in addition to* the shared query budget. + - **aggregate_mutable / aggregate_immutable**: Dedicated budget for aggregate, checked + *in addition to* the shared query budget. Args: concurrency_settings (ConcurrencySettings): Reference to the parent settings object. - write (int): Maximum concurrent write requests (ingest, delete). + write (int): Maximum concurrent write requests (ingest, upsert, delete). + query_mutable (int): Maximum concurrent query requests against mutable streams. + query_immutable (int): Maximum concurrent query requests against immutable streams. + retrieve_mutable (int): Dedicated retrieve concurrency for mutable streams. + retrieve_immutable (int): Dedicated retrieve concurrency for immutable streams. + aggregate_mutable (int): Dedicated aggregate concurrency for mutable streams. + aggregate_immutable (int): Dedicated aggregate concurrency for immutable streams. """ def __init__( self, concurrency_settings: ConcurrencySettings, write: int, + query_mutable: int, + query_immutable: int, + retrieve_mutable: int, + retrieve_immutable: int, + aggregate_mutable: int, + aggregate_immutable: int, ) -> None: super().__init__(concurrency_settings, "records", read=0, write=write, delete=0) + self._query_mutable = query_mutable + self._query_immutable = query_immutable + self._retrieve_mutable = retrieve_mutable + self._retrieve_immutable = retrieve_immutable + self._aggregate_mutable = aggregate_mutable + self._aggregate_immutable = aggregate_immutable + + @property + def query_mutable(self) -> int: + return self._query_mutable + + @query_mutable.setter + def query_mutable(self, value: int) -> None: + self._check_frozen("query_mutable") + self._query_mutable = value + + @property + def query_immutable(self) -> int: + return self._query_immutable + + @query_immutable.setter + def query_immutable(self, value: int) -> None: + self._check_frozen("query_immutable") + self._query_immutable = value + + @property + def retrieve_mutable(self) -> int: + return self._retrieve_mutable + + @retrieve_mutable.setter + def retrieve_mutable(self, value: int) -> None: + self._check_frozen("retrieve_mutable") + self._retrieve_mutable = value + + @property + def retrieve_immutable(self) -> int: + return self._retrieve_immutable + + @retrieve_immutable.setter + def retrieve_immutable(self, value: int) -> None: + self._check_frozen("retrieve_immutable") + self._retrieve_immutable = value + + @property + def aggregate_mutable(self) -> int: + return self._aggregate_mutable + + @aggregate_mutable.setter + def aggregate_mutable(self, value: int) -> None: + self._check_frozen("aggregate_mutable") + self._aggregate_mutable = value + + @property + def aggregate_immutable(self) -> int: + return self._aggregate_immutable + + @aggregate_immutable.setter + def aggregate_immutable(self, value: int) -> None: + self._check_frozen("aggregate_immutable") + self._aggregate_immutable = value def _semaphore_factory(self, operation: RecordsConcurrencyOperation, project: str) -> asyncio.BoundedSemaphore: key = (operation.value, project, asyncio.get_running_loop()) @@ -254,13 +344,31 @@ def _semaphore_factory(self, operation: RecordsConcurrencyOperation, project: st match operation: case RecordsConcurrencyOperation.WRITE: sem = asyncio.BoundedSemaphore(self._write) + case RecordsConcurrencyOperation.QUERY_MUTABLE: + sem = asyncio.BoundedSemaphore(self._query_mutable) + case RecordsConcurrencyOperation.QUERY_IMMUTABLE: + sem = asyncio.BoundedSemaphore(self._query_immutable) + case RecordsConcurrencyOperation.RETRIEVE_MUTABLE: + sem = asyncio.BoundedSemaphore(self._retrieve_mutable) + case RecordsConcurrencyOperation.RETRIEVE_IMMUTABLE: + sem = asyncio.BoundedSemaphore(self._retrieve_immutable) + case RecordsConcurrencyOperation.AGGREGATE_MUTABLE: + sem = asyncio.BoundedSemaphore(self._aggregate_mutable) + case RecordsConcurrencyOperation.AGGREGATE_IMMUTABLE: + sem = asyncio.BoundedSemaphore(self._aggregate_immutable) case _: assert_never(operation) self._semaphore_cache[key] = sem return sem def __repr__(self) -> str: - return f"Concurrency[records](write={self._write})" + return ( + f"Concurrency[records](" + f"write={self._write}, " + f"query_mutable={self._query_mutable}, query_immutable={self._query_immutable}, " + f"retrieve_mutable={self._retrieve_mutable}, retrieve_immutable={self._retrieve_immutable}, " + f"aggregate_mutable={self._aggregate_mutable}, aggregate_immutable={self._aggregate_immutable})" + ) class FileConcurrencyConfig(ConcurrencyConfig): @@ -425,7 +533,16 @@ def __init__(self) -> None: write_schema=1, ) self._files = FileConcurrencyConfig(self, read=4, write=2, upload=5, download=5, delete=2, open_files=15) - self._records = RecordsGlobalConcurrencyConfig(self, write=20) + self._records = RecordsGlobalConcurrencyConfig( + self, + write=20, + query_mutable=30, + query_immutable=10, + retrieve_mutable=20, + retrieve_immutable=10, + aggregate_mutable=10, + aggregate_immutable=5, + ) @functools.cached_property def _all_concurrency_configs(self) -> list[ConcurrencyConfig]: diff --git a/tests/tests_unit/test_utils/test_concurrency.py b/tests/tests_unit/test_utils/test_concurrency.py index c817a5862c..deaa2ab5e0 100644 --- a/tests/tests_unit/test_utils/test_concurrency.py +++ b/tests/tests_unit/test_utils/test_concurrency.py @@ -14,6 +14,7 @@ ConcurrencyConfig, ConcurrencySettings, EventLoopThreadExecutor, + RecordsConcurrencyOperation, _get_event_loop_executor, execute_async_tasks, ) @@ -136,6 +137,95 @@ async def test_invalid_operation_hits_assert_never(self) -> None: self.cs.general._semaphore_factory("totally_invalid", "proj-a") # type: ignore[arg-type] +class TestRecordsConcurrencyConfig: + def test_defaults(self) -> None: + cs = ConcurrencySettings() + assert cs.records.write == 20 + assert cs.records.query_mutable == 30 + assert cs.records.query_immutable == 10 + assert cs.records.retrieve_mutable == 20 + assert cs.records.retrieve_immutable == 10 + assert cs.records.aggregate_mutable == 10 + assert cs.records.aggregate_immutable == 5 + + def test_setters_work_before_freeze(self) -> None: + cs = ConcurrencySettings() + cs.records.write = 10 + cs.records.query_mutable = 15 + cs.records.query_immutable = 5 + cs.records.retrieve_mutable = 12 + cs.records.retrieve_immutable = 6 + cs.records.aggregate_mutable = 8 + cs.records.aggregate_immutable = 3 + assert cs.records.write == 10 + assert cs.records.query_mutable == 15 + assert cs.records.query_immutable == 5 + assert cs.records.retrieve_mutable == 12 + assert cs.records.retrieve_immutable == 6 + assert cs.records.aggregate_mutable == 8 + assert cs.records.aggregate_immutable == 3 + + @pytest.mark.parametrize( + "attr", + ["write", "query_mutable", "query_immutable", "retrieve_mutable", "retrieve_immutable", + "aggregate_mutable", "aggregate_immutable"], + ) + def test_setter_raises_after_freeze(self, attr: str) -> None: + cs = ConcurrencySettings() + cs._freeze() + with pytest.raises(RuntimeError, match="Cannot modify"): + setattr(cs.records, attr, 1) + + def test_repr(self) -> None: + cs = ConcurrencySettings() + r = repr(cs.records) + assert "write=20" in r + assert "query_mutable=30" in r + assert "query_immutable=10" in r + assert "retrieve_mutable=20" in r + assert "retrieve_immutable=10" in r + assert "aggregate_mutable=10" in r + assert "aggregate_immutable=5" in r + + +@pytest.mark.usefixtures("fresh_unfrozen_global_concurrency") +class TestRecordsSemaphoreFactory: + cs: ClassVar[ConcurrencySettings] = global_config.concurrency_settings + + @pytest.mark.parametrize( + "operation, expected_value", + [ + (RecordsConcurrencyOperation.WRITE, 20), + (RecordsConcurrencyOperation.QUERY_MUTABLE, 30), + (RecordsConcurrencyOperation.QUERY_IMMUTABLE, 10), + (RecordsConcurrencyOperation.RETRIEVE_MUTABLE, 20), + (RecordsConcurrencyOperation.RETRIEVE_IMMUTABLE, 10), + (RecordsConcurrencyOperation.AGGREGATE_MUTABLE, 10), + (RecordsConcurrencyOperation.AGGREGATE_IMMUTABLE, 5), + ], + ) + async def test_semaphore_values(self, operation: RecordsConcurrencyOperation, expected_value: int) -> None: + sem = self.cs.records._semaphore_factory(operation, "proj-a") + assert sem._value == expected_value + + async def test_all_operations_produce_distinct_semaphores(self) -> None: + sems = { + op: self.cs.records._semaphore_factory(op, "proj-a") + for op in RecordsConcurrencyOperation + } + assert len(set(id(s) for s in sems.values())) == len(RecordsConcurrencyOperation) + + async def test_cache_hit(self) -> None: + sem1 = self.cs.records._semaphore_factory(RecordsConcurrencyOperation.QUERY_MUTABLE, "proj-a") + sem2 = self.cs.records._semaphore_factory(RecordsConcurrencyOperation.QUERY_MUTABLE, "proj-a") + assert sem1 is sem2 + + async def test_different_project_different_semaphore(self) -> None: + sem_a = self.cs.records._semaphore_factory(RecordsConcurrencyOperation.QUERY_MUTABLE, "proj-a") + sem_b = self.cs.records._semaphore_factory(RecordsConcurrencyOperation.QUERY_MUTABLE, "proj-b") + assert sem_a is not sem_b + + async def i_dont_like_5(i: int) -> int: if i < 5: return i diff --git a/tests/tests_unit/test_utils/test_concurrency_api_routing.py b/tests/tests_unit/test_utils/test_concurrency_api_routing.py index 0c869de293..eeab129163 100644 --- a/tests/tests_unit/test_utils/test_concurrency_api_routing.py +++ b/tests/tests_unit/test_utils/test_concurrency_api_routing.py @@ -18,6 +18,7 @@ from cognite.client import AsyncCogniteClient from cognite.client.data_classes.data_modeling.ids import NodeId +from cognite.client.data_classes.data_modeling.records import RecordId, RecordWrite from tests.utils import fresh_concurrency_state SemCall = tuple[str, str, str] # (sub_config_name (eg 'general'), operation, project) @@ -152,6 +153,66 @@ async def test_raw_read( assert_routed(semaphore_spy, "raw", "read") +class TestRecordsSemaphoreRouting: + """Records API uses RecordsConcurrencyOperation enums (not plain strings), + so we spy on the enum values directly.""" + + @pytest.fixture + def records_spy(self, monkeypatch: pytest.MonkeyPatch) -> Iterator[list[tuple[str, str]]]: + calls: list[tuple[str, str]] = [] + with fresh_concurrency_state() as settings: + original = settings.records._semaphore_factory + + def spy(operation: Any, project: str) -> Any: + calls.append((operation.value, project)) + return original(operation, project) + + monkeypatch.setattr(settings.records, "_semaphore_factory", spy) + yield calls + + @pytest.mark.usefixtures("mock_any_request") + @pytest.mark.parametrize( + "api_call, expected_operation", + [ + pytest.param( + lambda c: c.data_modeling.records.ingest( + RecordWrite(space="sp", external_id="r1", sources=[]), + stream_id="s1", + ), + "write", + id="ingest_write", + ), + pytest.param( + lambda c: c.data_modeling.records.upsert( + RecordWrite(space="sp", external_id="r1", sources=[]), + stream_id="s1", + ), + "write", + id="upsert_write", + ), + pytest.param( + lambda c: c.data_modeling.records.delete( + RecordId(space="sp", external_id="r1"), + stream_id="s1", + ), + "write", + id="delete_write", + ), + ], + ) + async def test_write_routing( + self, + async_client: AsyncCogniteClient, + records_spy: list[tuple[str, str]], + api_call: ApiCall, + expected_operation: str, + ) -> None: + await api_call(async_client) + ops = [op for op, _ in records_spy] + assert expected_operation in ops, f"Expected {expected_operation!r} in {ops}" + assert async_client.config.project in {proj for _, proj in records_spy} + + class TestStrictFixtureCatchesMissingSemaphore: """Sanity check that the suite-wide strict fixture in tests/conftest.py still works. From d6cc7e6bec5938436dbbe91984e357006d80130f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Anders=20=C3=98en=20Fylling?= Date: Mon, 15 Jun 2026 14:37:32 +0200 Subject: [PATCH 02/16] refactor(records): make write/delete semaphore mapping explicit Co-Authored-By: Claude Opus 4.6 (1M context) --- cognite/client/_api/data_modeling/records.py | 7 ++++++- cognite/client/_sync_api/data_modeling/records.py | 2 +- 2 files changed, 7 insertions(+), 2 deletions(-) diff --git a/cognite/client/_api/data_modeling/records.py b/cognite/client/_api/data_modeling/records.py index e31364fb8e..df97c24c45 100644 --- a/cognite/client/_api/data_modeling/records.py +++ b/cognite/client/_api/data_modeling/records.py @@ -24,11 +24,16 @@ def __init__(self, config: ClientConfig, api_version: str | None, cognite_client api_maturity="General Availability", sdk_maturity="alpha", feature_name="Records" ) + _OPERATION_TO_CONCURRENCY: ClassVar[dict[str, RecordsConcurrencyOperation]] = { + "write": RecordsConcurrencyOperation.WRITE, + "delete": RecordsConcurrencyOperation.WRITE, + } + def _get_semaphore(self, operation: Literal["write", "delete"]) -> asyncio.BoundedSemaphore: from cognite.client import global_config return global_config.concurrency_settings.records._semaphore_factory( - RecordsConcurrencyOperation.WRITE, project=self._cognite_client.config.project + self._OPERATION_TO_CONCURRENCY[operation], project=self._cognite_client.config.project ) def _get_query_semaphore(self, stream_type: StreamType) -> asyncio.BoundedSemaphore: diff --git a/cognite/client/_sync_api/data_modeling/records.py b/cognite/client/_sync_api/data_modeling/records.py index 4b5d2cddd7..6e6e4b231c 100644 --- a/cognite/client/_sync_api/data_modeling/records.py +++ b/cognite/client/_sync_api/data_modeling/records.py @@ -1,6 +1,6 @@ """ =============================================================================== -e6c9a52c8279b5d08c76b016b7a25811 +9d617c291ad142cadadf3cf7f82ae118 This file is auto-generated from the Async API modules, - do not edit manually! =============================================================================== """ From 10edd2bffce2e3fd74b93e22525db093b51d3618 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Anders=20=C3=98en=20Fylling?= Date: Mon, 15 Jun 2026 14:39:23 +0200 Subject: [PATCH 03/16] refactor(records): unify semaphore routing into single _get_semaphore Single method handles all operation types (write, delete, query, retrieve, aggregate) with stream_type routing for read operations. Co-Authored-By: Claude Opus 4.6 (1M context) --- cognite/client/_api/data_modeling/records.py | 58 +++++-------------- .../client/_sync_api/data_modeling/records.py | 2 +- 2 files changed, 16 insertions(+), 44 deletions(-) diff --git a/cognite/client/_api/data_modeling/records.py b/cognite/client/_api/data_modeling/records.py index df97c24c45..0df978a092 100644 --- a/cognite/client/_api/data_modeling/records.py +++ b/cognite/client/_api/data_modeling/records.py @@ -2,7 +2,7 @@ import asyncio from collections.abc import Sequence -from typing import TYPE_CHECKING, ClassVar, Literal +from typing import TYPE_CHECKING, Literal from cognite.client._api_client import APIClient from cognite.client.data_classes.data_modeling.records import RecordId, RecordIdSequence, RecordWrite @@ -24,50 +24,22 @@ def __init__(self, config: ClientConfig, api_version: str | None, cognite_client api_maturity="General Availability", sdk_maturity="alpha", feature_name="Records" ) - _OPERATION_TO_CONCURRENCY: ClassVar[dict[str, RecordsConcurrencyOperation]] = { - "write": RecordsConcurrencyOperation.WRITE, - "delete": RecordsConcurrencyOperation.WRITE, - } - - def _get_semaphore(self, operation: Literal["write", "delete"]) -> asyncio.BoundedSemaphore: - from cognite.client import global_config - - return global_config.concurrency_settings.records._semaphore_factory( - self._OPERATION_TO_CONCURRENCY[operation], project=self._cognite_client.config.project - ) - - def _get_query_semaphore(self, stream_type: StreamType) -> asyncio.BoundedSemaphore: - from cognite.client import global_config - - op = ( - RecordsConcurrencyOperation.QUERY_MUTABLE - if stream_type == "mutable" - else RecordsConcurrencyOperation.QUERY_IMMUTABLE - ) - return global_config.concurrency_settings.records._semaphore_factory( - op, project=self._cognite_client.config.project - ) - - def _get_retrieve_semaphore(self, stream_type: StreamType) -> asyncio.BoundedSemaphore: - from cognite.client import global_config - - op = ( - RecordsConcurrencyOperation.RETRIEVE_MUTABLE - if stream_type == "mutable" - else RecordsConcurrencyOperation.RETRIEVE_IMMUTABLE - ) - return global_config.concurrency_settings.records._semaphore_factory( - op, project=self._cognite_client.config.project - ) - - def _get_aggregate_semaphore(self, stream_type: StreamType) -> asyncio.BoundedSemaphore: + def _get_semaphore( + self, + operation: Literal["write", "delete", "query", "retrieve", "aggregate"], + stream_type: StreamType = "immutable", + ) -> asyncio.BoundedSemaphore: from cognite.client import global_config - op = ( - RecordsConcurrencyOperation.AGGREGATE_MUTABLE - if stream_type == "mutable" - else RecordsConcurrencyOperation.AGGREGATE_IMMUTABLE - ) + match operation: + case "write" | "delete": + op = RecordsConcurrencyOperation.WRITE + case "query": + op = RecordsConcurrencyOperation.QUERY_MUTABLE if stream_type == "mutable" else RecordsConcurrencyOperation.QUERY_IMMUTABLE + case "retrieve": + op = RecordsConcurrencyOperation.RETRIEVE_MUTABLE if stream_type == "mutable" else RecordsConcurrencyOperation.RETRIEVE_IMMUTABLE + case "aggregate": + op = RecordsConcurrencyOperation.AGGREGATE_MUTABLE if stream_type == "mutable" else RecordsConcurrencyOperation.AGGREGATE_IMMUTABLE return global_config.concurrency_settings.records._semaphore_factory( op, project=self._cognite_client.config.project ) diff --git a/cognite/client/_sync_api/data_modeling/records.py b/cognite/client/_sync_api/data_modeling/records.py index 6e6e4b231c..40fa2a8c7f 100644 --- a/cognite/client/_sync_api/data_modeling/records.py +++ b/cognite/client/_sync_api/data_modeling/records.py @@ -1,6 +1,6 @@ """ =============================================================================== -9d617c291ad142cadadf3cf7f82ae118 +06000cc79911c219ffcbfc91707c8593 This file is auto-generated from the Async API modules, - do not edit manually! =============================================================================== """ From 47a08fe9a81aac8a2eec6bbe9986c806c141b7fe Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Anders=20=C3=98en=20Fylling?= Date: Mon, 15 Jun 2026 14:45:06 +0200 Subject: [PATCH 04/16] refactor(records): use enum values in _check_frozen calls Co-Authored-By: Claude Opus 4.6 (1M context) --- cognite/client/utils/_concurrency.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/cognite/client/utils/_concurrency.py b/cognite/client/utils/_concurrency.py index 27287d0bc1..da7a86c4ae 100644 --- a/cognite/client/utils/_concurrency.py +++ b/cognite/client/utils/_concurrency.py @@ -285,7 +285,7 @@ def query_mutable(self) -> int: @query_mutable.setter def query_mutable(self, value: int) -> None: - self._check_frozen("query_mutable") + self._check_frozen(RecordsConcurrencyOperation.QUERY_MUTABLE.value) self._query_mutable = value @property @@ -294,7 +294,7 @@ def query_immutable(self) -> int: @query_immutable.setter def query_immutable(self, value: int) -> None: - self._check_frozen("query_immutable") + self._check_frozen(RecordsConcurrencyOperation.QUERY_IMMUTABLE.value) self._query_immutable = value @property @@ -303,7 +303,7 @@ def retrieve_mutable(self) -> int: @retrieve_mutable.setter def retrieve_mutable(self, value: int) -> None: - self._check_frozen("retrieve_mutable") + self._check_frozen(RecordsConcurrencyOperation.RETRIEVE_MUTABLE.value) self._retrieve_mutable = value @property @@ -312,7 +312,7 @@ def retrieve_immutable(self) -> int: @retrieve_immutable.setter def retrieve_immutable(self, value: int) -> None: - self._check_frozen("retrieve_immutable") + self._check_frozen(RecordsConcurrencyOperation.RETRIEVE_IMMUTABLE.value) self._retrieve_immutable = value @property @@ -321,7 +321,7 @@ def aggregate_mutable(self) -> int: @aggregate_mutable.setter def aggregate_mutable(self, value: int) -> None: - self._check_frozen("aggregate_mutable") + self._check_frozen(RecordsConcurrencyOperation.AGGREGATE_MUTABLE.value) self._aggregate_mutable = value @property @@ -330,7 +330,7 @@ def aggregate_immutable(self) -> int: @aggregate_immutable.setter def aggregate_immutable(self, value: int) -> None: - self._check_frozen("aggregate_immutable") + self._check_frozen(RecordsConcurrencyOperation.AGGREGATE_IMMUTABLE.value) self._aggregate_immutable = value def _semaphore_factory(self, operation: RecordsConcurrencyOperation, project: str) -> asyncio.BoundedSemaphore: From 9ee366ca5f1f9a2635cea17fead685f8e0522d25 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Anders=20=C3=98en=20Fylling?= Date: Mon, 15 Jun 2026 14:50:37 +0200 Subject: [PATCH 05/16] feat(records): add HierarchicalBoundedSemaphore for multi-budget enforcement Retrieve and aggregate must pass both their dedicated budget AND the shared query budget. HierarchicalBoundedSemaphore acquires multiple semaphores in order (dedicated first, query second) and releases in reverse. _get_semaphore returns this composite for retrieve/aggregate operations. Co-Authored-By: Claude Opus 4.6 (1M context) --- cognite/client/_api/data_modeling/records.py | 20 +++++++++++-------- .../client/_sync_api/data_modeling/records.py | 2 +- cognite/client/utils/_concurrency.py | 19 ++++++++++++++++++ 3 files changed, 32 insertions(+), 9 deletions(-) diff --git a/cognite/client/_api/data_modeling/records.py b/cognite/client/_api/data_modeling/records.py index 0df978a092..9ef331b4b6 100644 --- a/cognite/client/_api/data_modeling/records.py +++ b/cognite/client/_api/data_modeling/records.py @@ -6,7 +6,7 @@ from cognite.client._api_client import APIClient from cognite.client.data_classes.data_modeling.records import RecordId, RecordIdSequence, RecordWrite -from cognite.client.utils._concurrency import RecordsConcurrencyOperation +from cognite.client.utils._concurrency import HierarchicalBoundedSemaphore, RecordsConcurrencyOperation from cognite.client.utils._experimental import FeaturePreviewWarning from cognite.client.utils._url import interpolate_and_url_encode @@ -28,21 +28,25 @@ def _get_semaphore( self, operation: Literal["write", "delete", "query", "retrieve", "aggregate"], stream_type: StreamType = "immutable", - ) -> asyncio.BoundedSemaphore: + ) -> asyncio.BoundedSemaphore | HierarchicalBoundedSemaphore: from cognite.client import global_config + factory = global_config.concurrency_settings.records._semaphore_factory + project = self._cognite_client.config.project match operation: case "write" | "delete": - op = RecordsConcurrencyOperation.WRITE + return factory(RecordsConcurrencyOperation.WRITE, project) case "query": op = RecordsConcurrencyOperation.QUERY_MUTABLE if stream_type == "mutable" else RecordsConcurrencyOperation.QUERY_IMMUTABLE + return factory(op, project) case "retrieve": - op = RecordsConcurrencyOperation.RETRIEVE_MUTABLE if stream_type == "mutable" else RecordsConcurrencyOperation.RETRIEVE_IMMUTABLE + dedicated_op = RecordsConcurrencyOperation.RETRIEVE_MUTABLE if stream_type == "mutable" else RecordsConcurrencyOperation.RETRIEVE_IMMUTABLE + query_op = RecordsConcurrencyOperation.QUERY_MUTABLE if stream_type == "mutable" else RecordsConcurrencyOperation.QUERY_IMMUTABLE + return HierarchicalBoundedSemaphore(factory(dedicated_op, project), factory(query_op, project)) case "aggregate": - op = RecordsConcurrencyOperation.AGGREGATE_MUTABLE if stream_type == "mutable" else RecordsConcurrencyOperation.AGGREGATE_IMMUTABLE - return global_config.concurrency_settings.records._semaphore_factory( - op, project=self._cognite_client.config.project - ) + dedicated_op = RecordsConcurrencyOperation.AGGREGATE_MUTABLE if stream_type == "mutable" else RecordsConcurrencyOperation.AGGREGATE_IMMUTABLE + query_op = RecordsConcurrencyOperation.QUERY_MUTABLE if stream_type == "mutable" else RecordsConcurrencyOperation.QUERY_IMMUTABLE + return HierarchicalBoundedSemaphore(factory(dedicated_op, project), factory(query_op, project)) def _records_url(self, stream_id: str, suffix: str = "") -> str: # Encode only stream_id; the suffix is a literal path segment (e.g. "/upsert"), diff --git a/cognite/client/_sync_api/data_modeling/records.py b/cognite/client/_sync_api/data_modeling/records.py index 40fa2a8c7f..1346924839 100644 --- a/cognite/client/_sync_api/data_modeling/records.py +++ b/cognite/client/_sync_api/data_modeling/records.py @@ -1,6 +1,6 @@ """ =============================================================================== -06000cc79911c219ffcbfc91707c8593 +f211416ec02eab9e26797fbecf9723fd This file is auto-generated from the Async API modules, - do not edit manually! =============================================================================== """ diff --git a/cognite/client/utils/_concurrency.py b/cognite/client/utils/_concurrency.py index da7a86c4ae..2b0f44073e 100644 --- a/cognite/client/utils/_concurrency.py +++ b/cognite/client/utils/_concurrency.py @@ -222,6 +222,25 @@ def __repr__(self) -> str: ) +class HierarchicalBoundedSemaphore: + """Acquires multiple semaphores in order, releases in reverse. + + Used to model the Records API's hierarchical rate limits where an endpoint + (e.g. retrieve) must pass both its dedicated budget and the shared query budget. + """ + + def __init__(self, *semaphores: asyncio.BoundedSemaphore) -> None: + self._semaphores = semaphores + + async def __aenter__(self) -> None: + for sem in self._semaphores: + await sem.__aenter__() + + async def __aexit__(self, *exc: Any) -> None: + for sem in reversed(self._semaphores): + await sem.__aexit__(*exc) + + class RecordsConcurrencyOperation(Enum): WRITE = "write" QUERY_MUTABLE = "query_mutable" From c4f955f6dc117c4821fed19674e7c3d95b903911 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Anders=20=C3=98en=20Fylling?= Date: Mon, 15 Jun 2026 15:06:26 +0200 Subject: [PATCH 06/16] test(records): add tests for HierarchicalBoundedSemaphore and _get_semaphore routing - HierarchicalBoundedSemaphore: acquires both, releases both, releases on exception, limits concurrency to the min of both semaphores - _get_semaphore: returns plain semaphore for write/delete/query, HierarchicalBoundedSemaphore for retrieve/aggregate, wraps the correct dedicated+query pair, and shares the query semaphore Co-Authored-By: Claude Opus 4.6 (1M context) --- .../tests_unit/test_utils/test_concurrency.py | 95 ++++++++++++++++++- 1 file changed, 94 insertions(+), 1 deletion(-) diff --git a/tests/tests_unit/test_utils/test_concurrency.py b/tests/tests_unit/test_utils/test_concurrency.py index deaa2ab5e0..b09e0261c7 100644 --- a/tests/tests_unit/test_utils/test_concurrency.py +++ b/tests/tests_unit/test_utils/test_concurrency.py @@ -7,13 +7,14 @@ import pytest -from cognite.client import global_config +from cognite.client import AsyncCogniteClient, global_config from cognite.client.exceptions import CogniteAPIError from cognite.client.utils._concurrency import ( AsyncSDKTask, ConcurrencyConfig, ConcurrencySettings, EventLoopThreadExecutor, + HierarchicalBoundedSemaphore, RecordsConcurrencyOperation, _get_event_loop_executor, execute_async_tasks, @@ -226,6 +227,98 @@ async def test_different_project_different_semaphore(self) -> None: assert sem_a is not sem_b +class TestHierarchicalBoundedSemaphore: + async def test_acquires_both_semaphores(self) -> None: + outer = asyncio.BoundedSemaphore(2) + inner = asyncio.BoundedSemaphore(3) + h = HierarchicalBoundedSemaphore(outer, inner) + async with h: + assert outer._value == 1 + assert inner._value == 2 + + async def test_releases_both_on_exit(self) -> None: + outer = asyncio.BoundedSemaphore(1) + inner = asyncio.BoundedSemaphore(1) + h = HierarchicalBoundedSemaphore(outer, inner) + async with h: + pass + assert outer._value == 1 + assert inner._value == 1 + + async def test_releases_on_exception(self) -> None: + outer = asyncio.BoundedSemaphore(1) + inner = asyncio.BoundedSemaphore(1) + h = HierarchicalBoundedSemaphore(outer, inner) + with pytest.raises(ValueError): + async with h: + raise ValueError("boom") + assert outer._value == 1 + assert inner._value == 1 + + async def test_limits_concurrency_to_min(self) -> None: + dedicated = asyncio.BoundedSemaphore(2) + query = asyncio.BoundedSemaphore(5) + entered = asyncio.Event() + hold = asyncio.Event() + + async def worker() -> None: + async with HierarchicalBoundedSemaphore(dedicated, query): + entered.set() + await hold.wait() + + tasks = [asyncio.create_task(worker()) for _ in range(3)] + await asyncio.sleep(0.01) + assert dedicated._value == 0 + assert query._value == 3 + hold.set() + await asyncio.gather(*tasks) + + +@pytest.mark.usefixtures("fresh_unfrozen_global_concurrency") +class TestRecordsGetSemaphore: + """Tests that RecordsAPI._get_semaphore returns the right semaphore type and composition.""" + + async def test_write_returns_plain_semaphore(self, async_client: AsyncCogniteClient) -> None: + sem = async_client.data_modeling.records._get_semaphore("write") + assert isinstance(sem, asyncio.BoundedSemaphore) + + async def test_delete_returns_plain_semaphore(self, async_client: AsyncCogniteClient) -> None: + sem = async_client.data_modeling.records._get_semaphore("delete") + assert isinstance(sem, asyncio.BoundedSemaphore) + + async def test_query_returns_plain_semaphore(self, async_client: AsyncCogniteClient) -> None: + sem = async_client.data_modeling.records._get_semaphore("query", "mutable") + assert isinstance(sem, asyncio.BoundedSemaphore) + + async def test_retrieve_returns_hierarchical(self, async_client: AsyncCogniteClient) -> None: + sem = async_client.data_modeling.records._get_semaphore("retrieve", "mutable") + assert isinstance(sem, HierarchicalBoundedSemaphore) + + async def test_aggregate_returns_hierarchical(self, async_client: AsyncCogniteClient) -> None: + sem = async_client.data_modeling.records._get_semaphore("aggregate", "immutable") + assert isinstance(sem, HierarchicalBoundedSemaphore) + + async def test_retrieve_hierarchical_wraps_correct_semaphores(self, async_client: AsyncCogniteClient) -> None: + sem = async_client.data_modeling.records._get_semaphore("retrieve", "mutable") + assert isinstance(sem, HierarchicalBoundedSemaphore) + dedicated, query = sem._semaphores + assert dedicated._value == 20 # retrieve_mutable default + assert query._value == 30 # query_mutable default + + async def test_aggregate_immutable_wraps_correct_semaphores(self, async_client: AsyncCogniteClient) -> None: + sem = async_client.data_modeling.records._get_semaphore("aggregate", "immutable") + assert isinstance(sem, HierarchicalBoundedSemaphore) + dedicated, query = sem._semaphores + assert dedicated._value == 5 # aggregate_immutable default + assert query._value == 10 # query_immutable default + + async def test_retrieve_and_query_share_query_semaphore(self, async_client: AsyncCogniteClient) -> None: + retrieve_sem = async_client.data_modeling.records._get_semaphore("retrieve", "mutable") + query_sem = async_client.data_modeling.records._get_semaphore("query", "mutable") + assert isinstance(retrieve_sem, HierarchicalBoundedSemaphore) + assert retrieve_sem._semaphores[1] is query_sem + + async def i_dont_like_5(i: int) -> int: if i < 5: return i From c51c4e83f29affae887e5abfb91d1ecec839d6f4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Anders=20=C3=98en=20Fylling?= Date: Mon, 15 Jun 2026 15:17:11 +0200 Subject: [PATCH 07/16] refactor(records): hoist write_op and query_op above match statement Co-Authored-By: Claude Opus 4.6 (1M context) --- cognite/client/_api/data_modeling/records.py | 10 +++++----- cognite/client/_sync_api/data_modeling/records.py | 2 +- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/cognite/client/_api/data_modeling/records.py b/cognite/client/_api/data_modeling/records.py index 9ef331b4b6..e14a4a25c6 100644 --- a/cognite/client/_api/data_modeling/records.py +++ b/cognite/client/_api/data_modeling/records.py @@ -31,21 +31,21 @@ def _get_semaphore( ) -> asyncio.BoundedSemaphore | HierarchicalBoundedSemaphore: from cognite.client import global_config + write_op = RecordsConcurrencyOperation.WRITE + query_op = RecordsConcurrencyOperation.QUERY_MUTABLE if stream_type == "mutable" else RecordsConcurrencyOperation.QUERY_IMMUTABLE + factory = global_config.concurrency_settings.records._semaphore_factory project = self._cognite_client.config.project match operation: case "write" | "delete": - return factory(RecordsConcurrencyOperation.WRITE, project) + return factory(write_op, project) case "query": - op = RecordsConcurrencyOperation.QUERY_MUTABLE if stream_type == "mutable" else RecordsConcurrencyOperation.QUERY_IMMUTABLE - return factory(op, project) + return factory(query_op, project) case "retrieve": dedicated_op = RecordsConcurrencyOperation.RETRIEVE_MUTABLE if stream_type == "mutable" else RecordsConcurrencyOperation.RETRIEVE_IMMUTABLE - query_op = RecordsConcurrencyOperation.QUERY_MUTABLE if stream_type == "mutable" else RecordsConcurrencyOperation.QUERY_IMMUTABLE return HierarchicalBoundedSemaphore(factory(dedicated_op, project), factory(query_op, project)) case "aggregate": dedicated_op = RecordsConcurrencyOperation.AGGREGATE_MUTABLE if stream_type == "mutable" else RecordsConcurrencyOperation.AGGREGATE_IMMUTABLE - query_op = RecordsConcurrencyOperation.QUERY_MUTABLE if stream_type == "mutable" else RecordsConcurrencyOperation.QUERY_IMMUTABLE return HierarchicalBoundedSemaphore(factory(dedicated_op, project), factory(query_op, project)) def _records_url(self, stream_id: str, suffix: str = "") -> str: diff --git a/cognite/client/_sync_api/data_modeling/records.py b/cognite/client/_sync_api/data_modeling/records.py index 1346924839..c071ec09cc 100644 --- a/cognite/client/_sync_api/data_modeling/records.py +++ b/cognite/client/_sync_api/data_modeling/records.py @@ -1,6 +1,6 @@ """ =============================================================================== -f211416ec02eab9e26797fbecf9723fd +e6f3c4297768ee8cf21aa718b74794f4 This file is auto-generated from the Async API modules, - do not edit manually! =============================================================================== """ From 58ba7658ff1a93dcda19a56a11e75720ced165fd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Anders=20=C3=98en=20Fylling?= Date: Mon, 15 Jun 2026 15:24:49 +0200 Subject: [PATCH 08/16] fix(records): make HierarchicalBoundedSemaphore cancellation- and exception-safe MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Two bugs found by adversarial testing: 1. Cancellation during __aenter__ (e.g. via asyncio.wait_for timeout) leaked already-acquired semaphores — now rolls back on any BaseException during acquisition. 2. Exception in one semaphore's __aexit__ skipped releasing the rest — now continues releasing all semaphores and re-raises the first error. Co-Authored-By: Claude Opus 4.6 (1M context) --- cognite/client/utils/_concurrency.py | 24 +- .../tests_unit/test_utils/test_concurrency.py | 273 ++++++++++++++++++ 2 files changed, 294 insertions(+), 3 deletions(-) diff --git a/cognite/client/utils/_concurrency.py b/cognite/client/utils/_concurrency.py index 2b0f44073e..078e11656e 100644 --- a/cognite/client/utils/_concurrency.py +++ b/cognite/client/utils/_concurrency.py @@ -227,18 +227,36 @@ class HierarchicalBoundedSemaphore: Used to model the Records API's hierarchical rate limits where an endpoint (e.g. retrieve) must pass both its dedicated budget and the shared query budget. + + If acquisition is interrupted (e.g. by cancellation), all already-acquired + semaphores are released before the exception propagates. Similarly, if a + release raises, the remaining semaphores are still released. """ def __init__(self, *semaphores: asyncio.BoundedSemaphore) -> None: self._semaphores = semaphores async def __aenter__(self) -> None: - for sem in self._semaphores: - await sem.__aenter__() + acquired: list[asyncio.BoundedSemaphore] = [] + try: + for sem in self._semaphores: + await sem.__aenter__() + acquired.append(sem) + except BaseException: + for sem in reversed(acquired): + await sem.__aexit__(None, None, None) + raise async def __aexit__(self, *exc: Any) -> None: + first_err: BaseException | None = None for sem in reversed(self._semaphores): - await sem.__aexit__(*exc) + try: + await sem.__aexit__(*exc) + except BaseException as e: + if first_err is None: + first_err = e + if first_err is not None: + raise first_err class RecordsConcurrencyOperation(Enum): diff --git a/tests/tests_unit/test_utils/test_concurrency.py b/tests/tests_unit/test_utils/test_concurrency.py index b09e0261c7..9f82da0990 100644 --- a/tests/tests_unit/test_utils/test_concurrency.py +++ b/tests/tests_unit/test_utils/test_concurrency.py @@ -274,6 +274,279 @@ async def worker() -> None: await asyncio.gather(*tasks) +class TestHierarchicalBoundedSemaphoreAdversarial: + """Adversarial tests targeting real failure modes in HierarchicalBoundedSemaphore. + + Two confirmed bugs are documented in the tests below: + + BUG-1 (semaphore leak on cancellation): When a task is cancelled while + __aenter__ is blocked waiting on the second semaphore, the first semaphore + has already been acquired but __aexit__ is never called, so it leaks. + + BUG-2 (incomplete release on mid-exit exception): When __aexit__ iterates + in reversed order and one semaphore's release raises, the remaining + semaphores (earlier in the list) are never released. + """ + + # --- BUG-1: Cancellation during acquisition leaks already-acquired semaphores --- + + async def test_bug1_cancellation_while_waiting_on_second_semaphore_leaks_first(self) -> None: + """BUG: If cancelled between acquiring sem[0] and sem[1], sem[0] is never released. + + Root cause: __aenter__ acquires semaphores in a plain for-loop with no + try/except around individual acquisitions. A CancelledError raised inside + sem[1].__aenter__() (while it is blocked) unwinds the coroutine without + giving __aexit__ a chance to run, so sem[0] stays acquired forever. + """ + dedicated = asyncio.BoundedSemaphore(1) + query = asyncio.BoundedSemaphore(0) # permanently blocked + h = HierarchicalBoundedSemaphore(dedicated, query) + + async def worker() -> None: + async with h: + pass + + task = asyncio.create_task(worker()) + await asyncio.sleep(0.02) # let it acquire dedicated and block on query + + assert dedicated._value == 0, "dedicated should be held at this point" + + task.cancel() + try: + await task + except asyncio.CancelledError: + pass + + # BUG: dedicated._value is 0, not 1 — it was never released + assert dedicated._value == 1, ( + "BUG-1: dedicated semaphore leaked after cancellation. " + "sem[0] was acquired by __aenter__ but CancelledError prevented __aexit__ from running." + ) + + async def test_bug1_two_tasks_cancelled_both_leak(self) -> None: + """BUG: Each cancelled task leaks one slot; with two tasks both slots are gone.""" + dedicated = asyncio.BoundedSemaphore(2) + query = asyncio.BoundedSemaphore(0) # permanently blocked + h = HierarchicalBoundedSemaphore(dedicated, query) + + async def worker() -> None: + async with h: + pass + + t1 = asyncio.create_task(worker()) + t2 = asyncio.create_task(worker()) + await asyncio.sleep(0.02) + + t1.cancel() + t2.cancel() + await asyncio.gather(t1, t2, return_exceptions=True) + + # BUG: both slots leaked — dedicated is exhausted even though no work was done + assert dedicated._value == 2, ( + "BUG-1: both dedicated slots leaked; subsequent real work can never acquire the semaphore." + ) + + async def test_bug1_wait_for_timeout_leaks_first_semaphore(self) -> None: + """BUG: asyncio.wait_for cancels the coroutine on timeout, triggering the same leak.""" + dedicated = asyncio.BoundedSemaphore(1) + query = asyncio.BoundedSemaphore(0) # permanently blocked + h = HierarchicalBoundedSemaphore(dedicated, query) + + async def worker() -> None: + async with h: + pass + + with pytest.raises(asyncio.TimeoutError): + await asyncio.wait_for(worker(), timeout=0.05) + + # BUG: dedicated._value remains 0 after the timeout + assert dedicated._value == 1, ( + "BUG-1: dedicated semaphore leaked after asyncio.wait_for timeout. " + "Timeout internally cancels the task, hitting the same code path." + ) + + async def test_bug1_cancellation_with_three_semaphores_leaks_two(self) -> None: + """BUG: With three semaphores, cancellation while waiting on sem[2] leaks both sem[0] and sem[1].""" + s0 = asyncio.BoundedSemaphore(1) + s1 = asyncio.BoundedSemaphore(1) + s2 = asyncio.BoundedSemaphore(0) # permanently blocked + h = HierarchicalBoundedSemaphore(s0, s1, s2) + + async def worker() -> None: + async with h: + pass + + task = asyncio.create_task(worker()) + await asyncio.sleep(0.02) + + task.cancel() + try: + await task + except asyncio.CancelledError: + pass + + assert s0._value == 1, "BUG-1: s0 leaked (first semaphore)" + assert s1._value == 1, "BUG-1: s1 leaked (second semaphore)" + + # --- BUG-2: Exception in __aexit__ of one semaphore skips releasing earlier ones --- + + async def test_bug2_exception_in_middle_release_skips_earlier_releases(self) -> None: + """BUG: If releasing sem[1] raises, sem[0] is never released. + + __aexit__ iterates with a plain for-loop over reversed semaphores. + An exception from any intermediate release propagates immediately, + abandoning all remaining release calls. + """ + + class BoomSemaphore: + """Always acquires fine; always raises on release.""" + + async def __aenter__(self) -> None: + pass + + async def __aexit__(self, *exc: object) -> None: + raise RuntimeError("boom on release") + + sem0 = asyncio.BoundedSemaphore(1) + sem1 = BoomSemaphore() + sem2 = asyncio.BoundedSemaphore(1) + # Acquire order: sem0, sem1, sem2 + # Release order (reversed): sem2, sem1 (boom!), sem0 — sem0 is never reached + h = HierarchicalBoundedSemaphore(sem0, sem1, sem2) + + with pytest.raises(RuntimeError, match="boom on release"): + async with h: + pass + + assert sem0._value == 1, ( + "BUG-2: sem0 was not released because the exception from sem1's __aexit__ " + "aborted the release loop before sem0's turn." + ) + assert sem2._value == 1, "sem2 (released before the bomb) should be fine" + + async def test_bug2_exception_in_first_release_skips_all_remaining(self) -> None: + """BUG: Exception from the first release (last-acquired semaphore) skips all others.""" + + class BoomSemaphore: + async def __aenter__(self) -> None: + pass + + async def __aexit__(self, *exc: object) -> None: + raise RuntimeError("boom") + + sem0 = asyncio.BoundedSemaphore(1) + sem1 = asyncio.BoundedSemaphore(1) + sem2 = BoomSemaphore() # last acquired = first released = first to explode + h = HierarchicalBoundedSemaphore(sem0, sem1, sem2) + + with pytest.raises(RuntimeError, match="boom"): + async with h: + pass + + assert sem0._value == 1, "BUG-2: sem0 leaked because release loop aborted at sem2" + assert sem1._value == 1, "BUG-2: sem1 leaked because release loop aborted at sem2" + + # --- Non-bug adversarial cases (expected to pass) --- + + async def test_zero_semaphores_is_a_noop(self) -> None: + """Edge case: constructing with no semaphores should be a transparent noop.""" + h = HierarchicalBoundedSemaphore() + async with h: + pass # should not raise or block + + async def test_single_semaphore_behaves_like_plain_async_with(self) -> None: + sem = asyncio.BoundedSemaphore(1) + h = HierarchicalBoundedSemaphore(sem) + async with h: + assert sem._value == 0 + assert sem._value == 1 + + async def test_nested_usage_deadlocks_with_value_one(self) -> None: + """Nested async with on the same HierarchicalBoundedSemaphore(value=1) deadlocks. + + This is expected — BoundedSemaphore is not reentrant. The test confirms + that the implementation does NOT protect against reentrancy. + """ + sem = asyncio.BoundedSemaphore(1) + h = HierarchicalBoundedSemaphore(sem) + + inner_started = False + + async def nested_worker() -> None: + nonlocal inner_started + async with h: + inner_started = True + # Attempt reentrant acquire — will deadlock + async with h: + pass # unreachable + + task = asyncio.create_task(nested_worker()) + await asyncio.sleep(0.05) + + assert inner_started, "outer context should have been entered" + assert not task.done(), "task should still be blocked waiting on inner acquire" + + task.cancel() + with pytest.raises(asyncio.CancelledError): + await task + + async def test_release_on_exception_inside_context_body_both_semaphores_freed(self) -> None: + """Normal exception inside the body (not in __aexit__) must release all semaphores.""" + sem0 = asyncio.BoundedSemaphore(1) + sem1 = asyncio.BoundedSemaphore(1) + h = HierarchicalBoundedSemaphore(sem0, sem1) + + with pytest.raises(ValueError, match="body exception"): + async with h: + raise ValueError("body exception") + + assert sem0._value == 1 + assert sem1._value == 1 + + async def test_concurrent_retrieve_and_list_complete_without_deadlock(self) -> None: + """retrieve (HierarchicalBoundedSemaphore) and list (plain semaphore) sharing + the query semaphore must not deadlock each other.""" + dedicated = asyncio.BoundedSemaphore(2) + query = asyncio.BoundedSemaphore(3) + h_retrieve = HierarchicalBoundedSemaphore(dedicated, query) + + completed: list[str] = [] + + async def retrieve(name: str) -> None: + async with h_retrieve: + await asyncio.sleep(0.01) + completed.append(f"retrieve_{name}") + + async def list_op(name: str) -> None: + async with query: + await asyncio.sleep(0.01) + completed.append(f"list_{name}") + + await asyncio.gather( + asyncio.create_task(retrieve("A")), + asyncio.create_task(retrieve("B")), + asyncio.create_task(list_op("C")), + asyncio.create_task(list_op("D")), + asyncio.create_task(list_op("E")), + ) + + assert len(completed) == 5 + assert all(name in completed for name in ["retrieve_A", "retrieve_B", "list_C", "list_D", "list_E"]) + + async def test_semaphores_fully_restored_after_many_sequential_uses(self) -> None: + """Semaphore values must be fully restored after many normal acquire/release cycles.""" + sem0 = asyncio.BoundedSemaphore(3) + sem1 = asyncio.BoundedSemaphore(5) + h = HierarchicalBoundedSemaphore(sem0, sem1) + + for _ in range(20): + async with h: + pass + + assert sem0._value == 3 + assert sem1._value == 5 + + @pytest.mark.usefixtures("fresh_unfrozen_global_concurrency") class TestRecordsGetSemaphore: """Tests that RecordsAPI._get_semaphore returns the right semaphore type and composition.""" From 9043db891494e9ceef4105ef54bf619a70d4f406 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Anders=20=C3=98en=20Fylling?= Date: Mon, 15 Jun 2026 15:27:02 +0200 Subject: [PATCH 09/16] feat(records): validate dedicated budget <= shared query budget Raises ValueError at config time (constructor and setters) if a dedicated budget (retrieve/aggregate) exceeds its corresponding shared query budget, since the hierarchical semaphore would never use the extra slots. Co-Authored-By: Claude Opus 4.6 (1M context) --- cognite/client/utils/_concurrency.py | 20 +++++++ .../tests_unit/test_utils/test_concurrency.py | 56 +++++++++++++++++-- 2 files changed, 72 insertions(+), 4 deletions(-) diff --git a/cognite/client/utils/_concurrency.py b/cognite/client/utils/_concurrency.py index 078e11656e..47d9ec266f 100644 --- a/cognite/client/utils/_concurrency.py +++ b/cognite/client/utils/_concurrency.py @@ -315,6 +315,20 @@ def __init__( self._retrieve_immutable = retrieve_immutable self._aggregate_mutable = aggregate_mutable self._aggregate_immutable = aggregate_immutable + self._validate_dedicated_within_shared() + + def _validate_dedicated_within_shared(self) -> None: + for dedicated, shared, label in [ + (self._retrieve_mutable, self._query_mutable, "retrieve_mutable vs query_mutable"), + (self._retrieve_immutable, self._query_immutable, "retrieve_immutable vs query_immutable"), + (self._aggregate_mutable, self._query_mutable, "aggregate_mutable vs query_mutable"), + (self._aggregate_immutable, self._query_immutable, "aggregate_immutable vs query_immutable"), + ]: + if dedicated > shared: + raise ValueError( + f"Dedicated budget must be <= shared query budget ({label}): " + f"{dedicated} > {shared}" + ) @property def query_mutable(self) -> int: @@ -324,6 +338,7 @@ def query_mutable(self) -> int: def query_mutable(self, value: int) -> None: self._check_frozen(RecordsConcurrencyOperation.QUERY_MUTABLE.value) self._query_mutable = value + self._validate_dedicated_within_shared() @property def query_immutable(self) -> int: @@ -333,6 +348,7 @@ def query_immutable(self) -> int: def query_immutable(self, value: int) -> None: self._check_frozen(RecordsConcurrencyOperation.QUERY_IMMUTABLE.value) self._query_immutable = value + self._validate_dedicated_within_shared() @property def retrieve_mutable(self) -> int: @@ -342,6 +358,7 @@ def retrieve_mutable(self) -> int: def retrieve_mutable(self, value: int) -> None: self._check_frozen(RecordsConcurrencyOperation.RETRIEVE_MUTABLE.value) self._retrieve_mutable = value + self._validate_dedicated_within_shared() @property def retrieve_immutable(self) -> int: @@ -351,6 +368,7 @@ def retrieve_immutable(self) -> int: def retrieve_immutable(self, value: int) -> None: self._check_frozen(RecordsConcurrencyOperation.RETRIEVE_IMMUTABLE.value) self._retrieve_immutable = value + self._validate_dedicated_within_shared() @property def aggregate_mutable(self) -> int: @@ -360,6 +378,7 @@ def aggregate_mutable(self) -> int: def aggregate_mutable(self, value: int) -> None: self._check_frozen(RecordsConcurrencyOperation.AGGREGATE_MUTABLE.value) self._aggregate_mutable = value + self._validate_dedicated_within_shared() @property def aggregate_immutable(self) -> int: @@ -369,6 +388,7 @@ def aggregate_immutable(self) -> int: def aggregate_immutable(self, value: int) -> None: self._check_frozen(RecordsConcurrencyOperation.AGGREGATE_IMMUTABLE.value) self._aggregate_immutable = value + self._validate_dedicated_within_shared() def _semaphore_factory(self, operation: RecordsConcurrencyOperation, project: str) -> asyncio.BoundedSemaphore: key = (operation.value, project, asyncio.get_running_loop()) diff --git a/tests/tests_unit/test_utils/test_concurrency.py b/tests/tests_unit/test_utils/test_concurrency.py index 9f82da0990..7debd38453 100644 --- a/tests/tests_unit/test_utils/test_concurrency.py +++ b/tests/tests_unit/test_utils/test_concurrency.py @@ -152,17 +152,18 @@ def test_defaults(self) -> None: def test_setters_work_before_freeze(self) -> None: cs = ConcurrencySettings() cs.records.write = 10 - cs.records.query_mutable = 15 - cs.records.query_immutable = 5 + # Lower dedicated budgets before lowering shared (to keep invariant) cs.records.retrieve_mutable = 12 - cs.records.retrieve_immutable = 6 + cs.records.retrieve_immutable = 4 cs.records.aggregate_mutable = 8 cs.records.aggregate_immutable = 3 + cs.records.query_mutable = 15 + cs.records.query_immutable = 5 assert cs.records.write == 10 assert cs.records.query_mutable == 15 assert cs.records.query_immutable == 5 assert cs.records.retrieve_mutable == 12 - assert cs.records.retrieve_immutable == 6 + assert cs.records.retrieve_immutable == 4 assert cs.records.aggregate_mutable == 8 assert cs.records.aggregate_immutable == 3 @@ -188,6 +189,53 @@ def test_repr(self) -> None: assert "aggregate_mutable=10" in r assert "aggregate_immutable=5" in r + @pytest.mark.parametrize( + "dedicated, shared", + [ + ("retrieve_mutable", "query_mutable"), + ("retrieve_immutable", "query_immutable"), + ("aggregate_mutable", "query_mutable"), + ("aggregate_immutable", "query_immutable"), + ], + ) + def test_dedicated_exceeding_shared_raises_on_init(self, dedicated: str, shared: str) -> None: + cs = ConcurrencySettings() + defaults = { + "write": 20, "query_mutable": 30, "query_immutable": 10, + "retrieve_mutable": 20, "retrieve_immutable": 10, + "aggregate_mutable": 10, "aggregate_immutable": 5, + } + shared_val = defaults[shared] + defaults[dedicated] = shared_val + 1 + from cognite.client.utils._concurrency import RecordsGlobalConcurrencyConfig + with pytest.raises(ValueError, match="Dedicated budget must be <= shared query budget"): + RecordsGlobalConcurrencyConfig(cs, **defaults) + + @pytest.mark.parametrize( + "dedicated, shared", + [ + ("retrieve_mutable", "query_mutable"), + ("retrieve_immutable", "query_immutable"), + ("aggregate_mutable", "query_mutable"), + ("aggregate_immutable", "query_immutable"), + ], + ) + def test_dedicated_exceeding_shared_raises_on_setter(self, dedicated: str, shared: str) -> None: + cs = ConcurrencySettings() + shared_val = getattr(cs.records, shared) + with pytest.raises(ValueError, match="Dedicated budget must be <= shared query budget"): + setattr(cs.records, dedicated, shared_val + 1) + + def test_lowering_shared_below_dedicated_raises(self) -> None: + cs = ConcurrencySettings() + with pytest.raises(ValueError, match="Dedicated budget must be <= shared query budget"): + cs.records.query_mutable = 5 # retrieve_mutable=20 > 5 + + def test_dedicated_equal_to_shared_is_valid(self) -> None: + cs = ConcurrencySettings() + cs.records.retrieve_mutable = 30 # equal to query_mutable=30, should be fine + assert cs.records.retrieve_mutable == 30 + @pytest.mark.usefixtures("fresh_unfrozen_global_concurrency") class TestRecordsSemaphoreFactory: From 83fa7b719522a9f42cb3587612b6845d909ad0c2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Anders=20=C3=98en=20Fylling?= Date: Mon, 15 Jun 2026 15:29:05 +0200 Subject: [PATCH 10/16] refactor(records): per-setter validation, init reuses setters Each setter validates only its own dedicated-vs-shared relationship instead of re-checking all four pairs. Shared setters (query_*) validate against existing dedicated values; dedicated setters validate against the current shared value. Constructor uses the property setters directly, setting shared budgets first so dedicated setters can validate. Co-Authored-By: Claude Opus 4.6 (1M context) --- cognite/client/utils/_concurrency.py | 51 +++++++++---------- .../tests_unit/test_utils/test_concurrency.py | 1 - 2 files changed, 25 insertions(+), 27 deletions(-) diff --git a/cognite/client/utils/_concurrency.py b/cognite/client/utils/_concurrency.py index 47d9ec266f..cb7b45b56a 100644 --- a/cognite/client/utils/_concurrency.py +++ b/cognite/client/utils/_concurrency.py @@ -309,26 +309,21 @@ def __init__( aggregate_immutable: int, ) -> None: super().__init__(concurrency_settings, "records", read=0, write=write, delete=0) - self._query_mutable = query_mutable - self._query_immutable = query_immutable - self._retrieve_mutable = retrieve_mutable - self._retrieve_immutable = retrieve_immutable - self._aggregate_mutable = aggregate_mutable - self._aggregate_immutable = aggregate_immutable - self._validate_dedicated_within_shared() - - def _validate_dedicated_within_shared(self) -> None: - for dedicated, shared, label in [ - (self._retrieve_mutable, self._query_mutable, "retrieve_mutable vs query_mutable"), - (self._retrieve_immutable, self._query_immutable, "retrieve_immutable vs query_immutable"), - (self._aggregate_mutable, self._query_mutable, "aggregate_mutable vs query_mutable"), - (self._aggregate_immutable, self._query_immutable, "aggregate_immutable vs query_immutable"), - ]: - if dedicated > shared: - raise ValueError( - f"Dedicated budget must be <= shared query budget ({label}): " - f"{dedicated} > {shared}" - ) + # Shared budgets first — dedicated setters validate against these. + self.query_mutable = query_mutable + self.query_immutable = query_immutable + self.retrieve_mutable = retrieve_mutable + self.retrieve_immutable = retrieve_immutable + self.aggregate_mutable = aggregate_mutable + self.aggregate_immutable = aggregate_immutable + + @staticmethod + def _validate_dedicated_le_shared(dedicated: int, shared: int, dedicated_name: str, shared_name: str) -> None: + if dedicated > shared: + raise ValueError( + f"Dedicated budget must be <= shared query budget " + f"({dedicated_name} vs {shared_name}): {dedicated} > {shared}" + ) @property def query_mutable(self) -> int: @@ -337,8 +332,10 @@ def query_mutable(self) -> int: @query_mutable.setter def query_mutable(self, value: int) -> None: self._check_frozen(RecordsConcurrencyOperation.QUERY_MUTABLE.value) + for attr, name in [("_retrieve_mutable", "retrieve_mutable"), ("_aggregate_mutable", "aggregate_mutable")]: + if hasattr(self, attr): + self._validate_dedicated_le_shared(getattr(self, attr), value, name, "query_mutable") self._query_mutable = value - self._validate_dedicated_within_shared() @property def query_immutable(self) -> int: @@ -347,8 +344,10 @@ def query_immutable(self) -> int: @query_immutable.setter def query_immutable(self, value: int) -> None: self._check_frozen(RecordsConcurrencyOperation.QUERY_IMMUTABLE.value) + for attr, name in [("_retrieve_immutable", "retrieve_immutable"), ("_aggregate_immutable", "aggregate_immutable")]: + if hasattr(self, attr): + self._validate_dedicated_le_shared(getattr(self, attr), value, name, "query_immutable") self._query_immutable = value - self._validate_dedicated_within_shared() @property def retrieve_mutable(self) -> int: @@ -357,8 +356,8 @@ def retrieve_mutable(self) -> int: @retrieve_mutable.setter def retrieve_mutable(self, value: int) -> None: self._check_frozen(RecordsConcurrencyOperation.RETRIEVE_MUTABLE.value) + self._validate_dedicated_le_shared(value, self._query_mutable, "retrieve_mutable", "query_mutable") self._retrieve_mutable = value - self._validate_dedicated_within_shared() @property def retrieve_immutable(self) -> int: @@ -367,8 +366,8 @@ def retrieve_immutable(self) -> int: @retrieve_immutable.setter def retrieve_immutable(self, value: int) -> None: self._check_frozen(RecordsConcurrencyOperation.RETRIEVE_IMMUTABLE.value) + self._validate_dedicated_le_shared(value, self._query_immutable, "retrieve_immutable", "query_immutable") self._retrieve_immutable = value - self._validate_dedicated_within_shared() @property def aggregate_mutable(self) -> int: @@ -377,8 +376,8 @@ def aggregate_mutable(self) -> int: @aggregate_mutable.setter def aggregate_mutable(self, value: int) -> None: self._check_frozen(RecordsConcurrencyOperation.AGGREGATE_MUTABLE.value) + self._validate_dedicated_le_shared(value, self._query_mutable, "aggregate_mutable", "query_mutable") self._aggregate_mutable = value - self._validate_dedicated_within_shared() @property def aggregate_immutable(self) -> int: @@ -387,8 +386,8 @@ def aggregate_immutable(self) -> int: @aggregate_immutable.setter def aggregate_immutable(self, value: int) -> None: self._check_frozen(RecordsConcurrencyOperation.AGGREGATE_IMMUTABLE.value) + self._validate_dedicated_le_shared(value, self._query_immutable, "aggregate_immutable", "query_immutable") self._aggregate_immutable = value - self._validate_dedicated_within_shared() def _semaphore_factory(self, operation: RecordsConcurrencyOperation, project: str) -> asyncio.BoundedSemaphore: key = (operation.value, project, asyncio.get_running_loop()) diff --git a/tests/tests_unit/test_utils/test_concurrency.py b/tests/tests_unit/test_utils/test_concurrency.py index 7debd38453..14c69c0fdd 100644 --- a/tests/tests_unit/test_utils/test_concurrency.py +++ b/tests/tests_unit/test_utils/test_concurrency.py @@ -152,7 +152,6 @@ def test_defaults(self) -> None: def test_setters_work_before_freeze(self) -> None: cs = ConcurrencySettings() cs.records.write = 10 - # Lower dedicated budgets before lowering shared (to keep invariant) cs.records.retrieve_mutable = 12 cs.records.retrieve_immutable = 4 cs.records.aggregate_mutable = 8 From 8c6af82ef2dfadf59f269862456be2906040cf5b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Anders=20=C3=98en=20Fylling?= Date: Mon, 15 Jun 2026 15:33:47 +0200 Subject: [PATCH 11/16] refactor(records): single _validate_budgets method with overrides Each setter passes its own name=value as a kwarg override. The method resolves each budget from overrides or self, then checks all four dedicated-vs-shared pairs. Init assigns directly and calls it once. Co-Authored-By: Claude Opus 4.6 (1M context) --- cognite/client/utils/_concurrency.py | 56 +++++++++++++++------------- 1 file changed, 31 insertions(+), 25 deletions(-) diff --git a/cognite/client/utils/_concurrency.py b/cognite/client/utils/_concurrency.py index cb7b45b56a..ed17da4121 100644 --- a/cognite/client/utils/_concurrency.py +++ b/cognite/client/utils/_concurrency.py @@ -309,21 +309,31 @@ def __init__( aggregate_immutable: int, ) -> None: super().__init__(concurrency_settings, "records", read=0, write=write, delete=0) - # Shared budgets first — dedicated setters validate against these. - self.query_mutable = query_mutable - self.query_immutable = query_immutable - self.retrieve_mutable = retrieve_mutable - self.retrieve_immutable = retrieve_immutable - self.aggregate_mutable = aggregate_mutable - self.aggregate_immutable = aggregate_immutable - - @staticmethod - def _validate_dedicated_le_shared(dedicated: int, shared: int, dedicated_name: str, shared_name: str) -> None: - if dedicated > shared: - raise ValueError( - f"Dedicated budget must be <= shared query budget " - f"({dedicated_name} vs {shared_name}): {dedicated} > {shared}" - ) + self._query_mutable = query_mutable + self._query_immutable = query_immutable + self._retrieve_mutable = retrieve_mutable + self._retrieve_immutable = retrieve_immutable + self._aggregate_mutable = aggregate_mutable + self._aggregate_immutable = aggregate_immutable + self._validate_budgets() + + def _validate_budgets(self, **overrides: int) -> None: + def resolve(name: str) -> int: + return overrides.get(name, getattr(self, f"_{name}")) + + for dedicated_name, shared_name in [ + ("retrieve_mutable", "query_mutable"), + ("retrieve_immutable", "query_immutable"), + ("aggregate_mutable", "query_mutable"), + ("aggregate_immutable", "query_immutable"), + ]: + dedicated = resolve(dedicated_name) + shared = resolve(shared_name) + if dedicated > shared: + raise ValueError( + f"Dedicated budget must be <= shared query budget " + f"({dedicated_name} vs {shared_name}): {dedicated} > {shared}" + ) @property def query_mutable(self) -> int: @@ -332,9 +342,7 @@ def query_mutable(self) -> int: @query_mutable.setter def query_mutable(self, value: int) -> None: self._check_frozen(RecordsConcurrencyOperation.QUERY_MUTABLE.value) - for attr, name in [("_retrieve_mutable", "retrieve_mutable"), ("_aggregate_mutable", "aggregate_mutable")]: - if hasattr(self, attr): - self._validate_dedicated_le_shared(getattr(self, attr), value, name, "query_mutable") + self._validate_budgets(query_mutable=value) self._query_mutable = value @property @@ -344,9 +352,7 @@ def query_immutable(self) -> int: @query_immutable.setter def query_immutable(self, value: int) -> None: self._check_frozen(RecordsConcurrencyOperation.QUERY_IMMUTABLE.value) - for attr, name in [("_retrieve_immutable", "retrieve_immutable"), ("_aggregate_immutable", "aggregate_immutable")]: - if hasattr(self, attr): - self._validate_dedicated_le_shared(getattr(self, attr), value, name, "query_immutable") + self._validate_budgets(query_immutable=value) self._query_immutable = value @property @@ -356,7 +362,7 @@ def retrieve_mutable(self) -> int: @retrieve_mutable.setter def retrieve_mutable(self, value: int) -> None: self._check_frozen(RecordsConcurrencyOperation.RETRIEVE_MUTABLE.value) - self._validate_dedicated_le_shared(value, self._query_mutable, "retrieve_mutable", "query_mutable") + self._validate_budgets(retrieve_mutable=value) self._retrieve_mutable = value @property @@ -366,7 +372,7 @@ def retrieve_immutable(self) -> int: @retrieve_immutable.setter def retrieve_immutable(self, value: int) -> None: self._check_frozen(RecordsConcurrencyOperation.RETRIEVE_IMMUTABLE.value) - self._validate_dedicated_le_shared(value, self._query_immutable, "retrieve_immutable", "query_immutable") + self._validate_budgets(retrieve_immutable=value) self._retrieve_immutable = value @property @@ -376,7 +382,7 @@ def aggregate_mutable(self) -> int: @aggregate_mutable.setter def aggregate_mutable(self, value: int) -> None: self._check_frozen(RecordsConcurrencyOperation.AGGREGATE_MUTABLE.value) - self._validate_dedicated_le_shared(value, self._query_mutable, "aggregate_mutable", "query_mutable") + self._validate_budgets(aggregate_mutable=value) self._aggregate_mutable = value @property @@ -386,7 +392,7 @@ def aggregate_immutable(self) -> int: @aggregate_immutable.setter def aggregate_immutable(self, value: int) -> None: self._check_frozen(RecordsConcurrencyOperation.AGGREGATE_IMMUTABLE.value) - self._validate_dedicated_le_shared(value, self._query_immutable, "aggregate_immutable", "query_immutable") + self._validate_budgets(aggregate_immutable=value) self._aggregate_immutable = value def _semaphore_factory(self, operation: RecordsConcurrencyOperation, project: str) -> asyncio.BoundedSemaphore: From edc3096d352607c969260ef786aa525f3c7d0291 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Anders=20=C3=98en=20Fylling?= Date: Mon, 15 Jun 2026 15:37:43 +0200 Subject: [PATCH 12/16] refactor(records): move _check_frozen into _validate_budgets Setters are now just _validate_budgets(name=value) + assign. The validation method handles both the frozen check (from the override keys) and the budget invariant in one call. Co-Authored-By: Claude Opus 4.6 (1M context) --- cognite/client/utils/_concurrency.py | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/cognite/client/utils/_concurrency.py b/cognite/client/utils/_concurrency.py index ed17da4121..6f4f78e321 100644 --- a/cognite/client/utils/_concurrency.py +++ b/cognite/client/utils/_concurrency.py @@ -318,6 +318,9 @@ def __init__( self._validate_budgets() def _validate_budgets(self, **overrides: int) -> None: + for name in overrides: + self._check_frozen(name) + def resolve(name: str) -> int: return overrides.get(name, getattr(self, f"_{name}")) @@ -341,7 +344,6 @@ def query_mutable(self) -> int: @query_mutable.setter def query_mutable(self, value: int) -> None: - self._check_frozen(RecordsConcurrencyOperation.QUERY_MUTABLE.value) self._validate_budgets(query_mutable=value) self._query_mutable = value @@ -351,7 +353,6 @@ def query_immutable(self) -> int: @query_immutable.setter def query_immutable(self, value: int) -> None: - self._check_frozen(RecordsConcurrencyOperation.QUERY_IMMUTABLE.value) self._validate_budgets(query_immutable=value) self._query_immutable = value @@ -361,7 +362,6 @@ def retrieve_mutable(self) -> int: @retrieve_mutable.setter def retrieve_mutable(self, value: int) -> None: - self._check_frozen(RecordsConcurrencyOperation.RETRIEVE_MUTABLE.value) self._validate_budgets(retrieve_mutable=value) self._retrieve_mutable = value @@ -371,7 +371,6 @@ def retrieve_immutable(self) -> int: @retrieve_immutable.setter def retrieve_immutable(self, value: int) -> None: - self._check_frozen(RecordsConcurrencyOperation.RETRIEVE_IMMUTABLE.value) self._validate_budgets(retrieve_immutable=value) self._retrieve_immutable = value @@ -381,7 +380,6 @@ def aggregate_mutable(self) -> int: @aggregate_mutable.setter def aggregate_mutable(self, value: int) -> None: - self._check_frozen(RecordsConcurrencyOperation.AGGREGATE_MUTABLE.value) self._validate_budgets(aggregate_mutable=value) self._aggregate_mutable = value @@ -391,7 +389,6 @@ def aggregate_immutable(self) -> int: @aggregate_immutable.setter def aggregate_immutable(self, value: int) -> None: - self._check_frozen(RecordsConcurrencyOperation.AGGREGATE_IMMUTABLE.value) self._validate_budgets(aggregate_immutable=value) self._aggregate_immutable = value From 4d17d9c2453dfa26bea3600bc8f54061588996b3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Anders=20=C3=98en=20Fylling?= Date: Mon, 15 Jun 2026 15:42:51 +0200 Subject: [PATCH 13/16] fix: resolve linting issues from pre-commit - ruff format: wrap long ternary expressions - mypy: add type: ignore[override] on _get_semaphore (wider signature than base class, intentional) and type: ignore[arg-type] on test-only BoomSemaphore fakes Co-Authored-By: Claude Opus 4.6 (1M context) --- cognite/client/_api/data_modeling/records.py | 20 +++++++++--- .../client/_sync_api/data_modeling/records.py | 2 +- .../tests_unit/test_utils/test_concurrency.py | 31 ++++++++++++------- 3 files changed, 37 insertions(+), 16 deletions(-) diff --git a/cognite/client/_api/data_modeling/records.py b/cognite/client/_api/data_modeling/records.py index e14a4a25c6..fdc319d885 100644 --- a/cognite/client/_api/data_modeling/records.py +++ b/cognite/client/_api/data_modeling/records.py @@ -24,7 +24,7 @@ def __init__(self, config: ClientConfig, api_version: str | None, cognite_client api_maturity="General Availability", sdk_maturity="alpha", feature_name="Records" ) - def _get_semaphore( + def _get_semaphore( # type: ignore[override] self, operation: Literal["write", "delete", "query", "retrieve", "aggregate"], stream_type: StreamType = "immutable", @@ -32,7 +32,11 @@ def _get_semaphore( from cognite.client import global_config write_op = RecordsConcurrencyOperation.WRITE - query_op = RecordsConcurrencyOperation.QUERY_MUTABLE if stream_type == "mutable" else RecordsConcurrencyOperation.QUERY_IMMUTABLE + query_op = ( + RecordsConcurrencyOperation.QUERY_MUTABLE + if stream_type == "mutable" + else RecordsConcurrencyOperation.QUERY_IMMUTABLE + ) factory = global_config.concurrency_settings.records._semaphore_factory project = self._cognite_client.config.project @@ -42,10 +46,18 @@ def _get_semaphore( case "query": return factory(query_op, project) case "retrieve": - dedicated_op = RecordsConcurrencyOperation.RETRIEVE_MUTABLE if stream_type == "mutable" else RecordsConcurrencyOperation.RETRIEVE_IMMUTABLE + dedicated_op = ( + RecordsConcurrencyOperation.RETRIEVE_MUTABLE + if stream_type == "mutable" + else RecordsConcurrencyOperation.RETRIEVE_IMMUTABLE + ) return HierarchicalBoundedSemaphore(factory(dedicated_op, project), factory(query_op, project)) case "aggregate": - dedicated_op = RecordsConcurrencyOperation.AGGREGATE_MUTABLE if stream_type == "mutable" else RecordsConcurrencyOperation.AGGREGATE_IMMUTABLE + dedicated_op = ( + RecordsConcurrencyOperation.AGGREGATE_MUTABLE + if stream_type == "mutable" + else RecordsConcurrencyOperation.AGGREGATE_IMMUTABLE + ) return HierarchicalBoundedSemaphore(factory(dedicated_op, project), factory(query_op, project)) def _records_url(self, stream_id: str, suffix: str = "") -> str: diff --git a/cognite/client/_sync_api/data_modeling/records.py b/cognite/client/_sync_api/data_modeling/records.py index c071ec09cc..26a3eb4d7e 100644 --- a/cognite/client/_sync_api/data_modeling/records.py +++ b/cognite/client/_sync_api/data_modeling/records.py @@ -1,6 +1,6 @@ """ =============================================================================== -e6f3c4297768ee8cf21aa718b74794f4 +5920bce88870da17ef2034ac258b62ac This file is auto-generated from the Async API modules, - do not edit manually! =============================================================================== """ diff --git a/tests/tests_unit/test_utils/test_concurrency.py b/tests/tests_unit/test_utils/test_concurrency.py index 14c69c0fdd..37da6bf255 100644 --- a/tests/tests_unit/test_utils/test_concurrency.py +++ b/tests/tests_unit/test_utils/test_concurrency.py @@ -168,8 +168,15 @@ def test_setters_work_before_freeze(self) -> None: @pytest.mark.parametrize( "attr", - ["write", "query_mutable", "query_immutable", "retrieve_mutable", "retrieve_immutable", - "aggregate_mutable", "aggregate_immutable"], + [ + "write", + "query_mutable", + "query_immutable", + "retrieve_mutable", + "retrieve_immutable", + "aggregate_mutable", + "aggregate_immutable", + ], ) def test_setter_raises_after_freeze(self, attr: str) -> None: cs = ConcurrencySettings() @@ -200,13 +207,18 @@ def test_repr(self) -> None: def test_dedicated_exceeding_shared_raises_on_init(self, dedicated: str, shared: str) -> None: cs = ConcurrencySettings() defaults = { - "write": 20, "query_mutable": 30, "query_immutable": 10, - "retrieve_mutable": 20, "retrieve_immutable": 10, - "aggregate_mutable": 10, "aggregate_immutable": 5, + "write": 20, + "query_mutable": 30, + "query_immutable": 10, + "retrieve_mutable": 20, + "retrieve_immutable": 10, + "aggregate_mutable": 10, + "aggregate_immutable": 5, } shared_val = defaults[shared] defaults[dedicated] = shared_val + 1 from cognite.client.utils._concurrency import RecordsGlobalConcurrencyConfig + with pytest.raises(ValueError, match="Dedicated budget must be <= shared query budget"): RecordsGlobalConcurrencyConfig(cs, **defaults) @@ -257,10 +269,7 @@ async def test_semaphore_values(self, operation: RecordsConcurrencyOperation, ex assert sem._value == expected_value async def test_all_operations_produce_distinct_semaphores(self) -> None: - sems = { - op: self.cs.records._semaphore_factory(op, "proj-a") - for op in RecordsConcurrencyOperation - } + sems = {op: self.cs.records._semaphore_factory(op, "proj-a") for op in RecordsConcurrencyOperation} assert len(set(id(s) for s in sems.values())) == len(RecordsConcurrencyOperation) async def test_cache_hit(self) -> None: @@ -459,7 +468,7 @@ async def __aexit__(self, *exc: object) -> None: sem2 = asyncio.BoundedSemaphore(1) # Acquire order: sem0, sem1, sem2 # Release order (reversed): sem2, sem1 (boom!), sem0 — sem0 is never reached - h = HierarchicalBoundedSemaphore(sem0, sem1, sem2) + h = HierarchicalBoundedSemaphore(sem0, sem1, sem2) # type: ignore[arg-type] with pytest.raises(RuntimeError, match="boom on release"): async with h: @@ -484,7 +493,7 @@ async def __aexit__(self, *exc: object) -> None: sem0 = asyncio.BoundedSemaphore(1) sem1 = asyncio.BoundedSemaphore(1) sem2 = BoomSemaphore() # last acquired = first released = first to explode - h = HierarchicalBoundedSemaphore(sem0, sem1, sem2) + h = HierarchicalBoundedSemaphore(sem0, sem1, sem2) # type: ignore[arg-type] with pytest.raises(RuntimeError, match="boom"): async with h: From 170a2dd95f0dc56eb32269b6a4715ad61b3446ff Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Anders=20=C3=98en=20Fylling?= Date: Mon, 15 Jun 2026 15:45:49 +0200 Subject: [PATCH 14/16] fix: docstring indentation for custom-checks parser Co-Authored-By: Claude Opus 4.6 (1M context) --- cognite/client/utils/_concurrency.py | 11 ++++------- 1 file changed, 4 insertions(+), 7 deletions(-) diff --git a/cognite/client/utils/_concurrency.py b/cognite/client/utils/_concurrency.py index 6f4f78e321..60244b9d42 100644 --- a/cognite/client/utils/_concurrency.py +++ b/cognite/client/utils/_concurrency.py @@ -278,13 +278,10 @@ class RecordsGlobalConcurrencyConfig(ConcurrencyConfig): retrieve and aggregate endpoints each have a dedicated budget that is checked *before* the shared query budget (both must pass). - - **write**: Shared across ingest, upsert, and delete (same limit for both stream types). - - **query_mutable / query_immutable**: Shared read budget consumed by all query endpoints - (list/filter, sync, retrieve, aggregate). - - **retrieve_mutable / retrieve_immutable**: Dedicated budget for retrieve, checked - *in addition to* the shared query budget. - - **aggregate_mutable / aggregate_immutable**: Dedicated budget for aggregate, checked - *in addition to* the shared query budget. + - **write**: Shared across ingest, upsert, and delete (same for both stream types). + - **query_mutable / query_immutable**: Shared read budget for all query endpoints. + - **retrieve_mutable / retrieve_immutable**: Dedicated retrieve budget (+ shared query). + - **aggregate_mutable / aggregate_immutable**: Dedicated aggregate budget (+ shared query). Args: concurrency_settings (ConcurrencySettings): Reference to the parent settings object. From e73d91b3e79790a8a4390764d51cbffd1957c652 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Anders=20=C3=98en=20Fylling?= Date: Mon, 15 Jun 2026 15:55:36 +0200 Subject: [PATCH 15/16] test(records): verify HierarchicalBoundedSemaphore through real HTTP client MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Three tests that push a HierarchicalBoundedSemaphore through the actual SDK _post → _http_client._with_retry → async with chain: - successful request acquires and releases both semaphores - HTTP 500 error still releases both semaphores - concurrent requests are limited by the tighter semaphore Co-Authored-By: Claude Opus 4.6 (1M context) --- .../test_concurrency_api_routing.py | 83 +++++++++++++++++++ 1 file changed, 83 insertions(+) diff --git a/tests/tests_unit/test_utils/test_concurrency_api_routing.py b/tests/tests_unit/test_utils/test_concurrency_api_routing.py index eeab129163..f8cf3d5e4b 100644 --- a/tests/tests_unit/test_utils/test_concurrency_api_routing.py +++ b/tests/tests_unit/test_utils/test_concurrency_api_routing.py @@ -9,16 +9,20 @@ from __future__ import annotations +import asyncio import re from collections.abc import Awaitable, Callable, Iterator from typing import Any +import httpx import pytest from pytest_httpx import HTTPXMock from cognite.client import AsyncCogniteClient from cognite.client.data_classes.data_modeling.ids import NodeId from cognite.client.data_classes.data_modeling.records import RecordId, RecordWrite +from cognite.client.exceptions import CogniteAPIError +from cognite.client.utils._concurrency import HierarchicalBoundedSemaphore from tests.utils import fresh_concurrency_state SemCall = tuple[str, str, str] # (sub_config_name (eg 'general'), operation, project) @@ -213,6 +217,85 @@ async def test_write_routing( assert async_client.config.project in {proj for _, proj in records_spy} +class TestHierarchicalSemaphoreThroughHTTPClient: + """Verify that HierarchicalBoundedSemaphore works through the real SDK HTTP chain. + + The type hints say asyncio.BoundedSemaphore, but at runtime the HTTP client + just does ``async with semaphore``. These tests confirm that a + HierarchicalBoundedSemaphore actually acquires and releases both inner + semaphores when passed through _post. + """ + + async def test_post_with_hierarchical_semaphore_succeeds( + self, async_client: AsyncCogniteClient, httpx_mock: HTTPXMock + ) -> None: + httpx_mock.add_response(method="POST", url=re.compile(r".*"), status_code=200, json={"items": []}) + dedicated = asyncio.BoundedSemaphore(1) + query = asyncio.BoundedSemaphore(1) + h = HierarchicalBoundedSemaphore(dedicated, query) + + await async_client.data_modeling.records._post( + url_path="/streams/test/records/filter", + json={"limit": 10}, + semaphore=h, + ) + assert dedicated._value == 1, "dedicated semaphore should be released after request" + assert query._value == 1, "query semaphore should be released after request" + + async def test_post_with_hierarchical_semaphore_releases_on_http_error( + self, async_client: AsyncCogniteClient, httpx_mock: HTTPXMock + ) -> None: + httpx_mock.add_response(method="POST", url=re.compile(r".*"), status_code=500, json={"error": {"message": "fail", "code": 500}}) + dedicated = asyncio.BoundedSemaphore(1) + query = asyncio.BoundedSemaphore(1) + h = HierarchicalBoundedSemaphore(dedicated, query) + + with pytest.raises(CogniteAPIError): + await async_client.data_modeling.records._post( + url_path="/streams/test/records/filter", + json={"limit": 10}, + semaphore=h, + ) + assert dedicated._value == 1, "dedicated semaphore should be released after error" + assert query._value == 1, "query semaphore should be released after error" + + async def test_hierarchical_semaphore_limits_concurrent_requests( + self, async_client: AsyncCogniteClient, httpx_mock: HTTPXMock + ) -> None: + hold = asyncio.Event() + + async def slow_response(request: Any) -> Any: + await hold.wait() + return httpx.Response(200, json={"items": []}) + + httpx_mock.add_callback(slow_response, method="POST", url=re.compile(r".*"), is_optional=True) + httpx_mock.add_callback(slow_response, method="POST", url=re.compile(r".*"), is_optional=True) + httpx_mock.add_callback(slow_response, method="POST", url=re.compile(r".*"), is_optional=True) + + dedicated = asyncio.BoundedSemaphore(1) + query = asyncio.BoundedSemaphore(2) + + async def make_request() -> None: + h = HierarchicalBoundedSemaphore(dedicated, query) + await async_client.data_modeling.records._post( + url_path="/streams/test/records/filter", + json={"limit": 10}, + semaphore=h, + ) + + t1 = asyncio.create_task(make_request()) + t2 = asyncio.create_task(make_request()) + await asyncio.sleep(0.05) + + assert dedicated._value == 0, "dedicated(1) should be fully consumed by first request" + assert query._value == 1, "query(2) should have 1 slot consumed" + + hold.set() + await asyncio.gather(t1, t2) + assert dedicated._value == 1 + assert query._value == 2 + + class TestStrictFixtureCatchesMissingSemaphore: """Sanity check that the suite-wide strict fixture in tests/conftest.py still works. From 4e3059f4b1d55ed861e91c777e30f6d78fc76eb9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Anders=20=C3=98en=20Fylling?= Date: Mon, 15 Jun 2026 15:57:02 +0200 Subject: [PATCH 16/16] test(records): simulate real endpoint patterns for all operation types MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Seven tests exercising the exact _get_semaphore → _post chain that list/retrieve/aggregate endpoints will use, for both mutable and immutable streams. Verifies correct semaphore types, values, release after request, and that retrieve shares the query semaphore with list. Co-Authored-By: Claude Opus 4.6 (1M context) --- .../test_concurrency_api_routing.py | 95 +++++++++++++++++++ 1 file changed, 95 insertions(+) diff --git a/tests/tests_unit/test_utils/test_concurrency_api_routing.py b/tests/tests_unit/test_utils/test_concurrency_api_routing.py index f8cf3d5e4b..f4084928c3 100644 --- a/tests/tests_unit/test_utils/test_concurrency_api_routing.py +++ b/tests/tests_unit/test_utils/test_concurrency_api_routing.py @@ -296,6 +296,101 @@ async def make_request() -> None: assert query._value == 2 +class TestRecordsSemaphoreEndpointPatterns: + """Simulate the exact patterns that records endpoints will use: + - list/sync: _get_semaphore("query", stream_type) → override_semaphore in _list + - retrieve: _get_semaphore("retrieve", stream_type) → override_semaphore in _post (hierarchical) + - aggregate: _get_semaphore("aggregate", stream_type) → override_semaphore in _post (hierarchical) + """ + + @pytest.fixture(autouse=True) + def _fresh_state(self) -> Iterator[None]: + with fresh_concurrency_state(): + yield + + async def test_list_pattern_mutable(self, async_client: AsyncCogniteClient, httpx_mock: HTTPXMock) -> None: + httpx_mock.add_response(method="POST", url=re.compile(r".*"), status_code=200, json={"items": []}) + records_api = async_client.data_modeling.records + sem = records_api._get_semaphore("query", "mutable") + assert isinstance(sem, asyncio.BoundedSemaphore) + assert sem._value == 30 + + await records_api._post(url_path="/streams/s1/records/filter", json={"limit": 10}, semaphore=sem) + assert sem._value == 30 + + async def test_list_pattern_immutable(self, async_client: AsyncCogniteClient, httpx_mock: HTTPXMock) -> None: + httpx_mock.add_response(method="POST", url=re.compile(r".*"), status_code=200, json={"items": []}) + records_api = async_client.data_modeling.records + sem = records_api._get_semaphore("query", "immutable") + assert isinstance(sem, asyncio.BoundedSemaphore) + assert sem._value == 10 + + await records_api._post(url_path="/streams/s1/records/filter", json={"limit": 10}, semaphore=sem) + assert sem._value == 10 + + async def test_retrieve_pattern_mutable(self, async_client: AsyncCogniteClient, httpx_mock: HTTPXMock) -> None: + httpx_mock.add_response(method="POST", url=re.compile(r".*"), status_code=200, json={"items": []}) + records_api = async_client.data_modeling.records + sem = records_api._get_semaphore("retrieve", "mutable") + assert isinstance(sem, HierarchicalBoundedSemaphore) + + await records_api._post(url_path="/streams/s1/records/retrieve", json={"items": []}, semaphore=sem) + dedicated, query = sem._semaphores + assert dedicated._value == 20 + assert query._value == 30 + + async def test_retrieve_pattern_immutable(self, async_client: AsyncCogniteClient, httpx_mock: HTTPXMock) -> None: + httpx_mock.add_response(method="POST", url=re.compile(r".*"), status_code=200, json={"items": []}) + records_api = async_client.data_modeling.records + sem = records_api._get_semaphore("retrieve", "immutable") + assert isinstance(sem, HierarchicalBoundedSemaphore) + + await records_api._post(url_path="/streams/s1/records/retrieve", json={"items": []}, semaphore=sem) + dedicated, query = sem._semaphores + assert dedicated._value == 10 + assert query._value == 10 + + async def test_aggregate_pattern_mutable(self, async_client: AsyncCogniteClient, httpx_mock: HTTPXMock) -> None: + httpx_mock.add_response(method="POST", url=re.compile(r".*"), status_code=200, json={"items": []}) + records_api = async_client.data_modeling.records + sem = records_api._get_semaphore("aggregate", "mutable") + assert isinstance(sem, HierarchicalBoundedSemaphore) + + await records_api._post(url_path="/streams/s1/records/aggregate", json={}, semaphore=sem) + dedicated, query = sem._semaphores + assert dedicated._value == 10 + assert query._value == 30 + + async def test_aggregate_pattern_immutable(self, async_client: AsyncCogniteClient, httpx_mock: HTTPXMock) -> None: + httpx_mock.add_response(method="POST", url=re.compile(r".*"), status_code=200, json={"items": []}) + records_api = async_client.data_modeling.records + sem = records_api._get_semaphore("aggregate", "immutable") + assert isinstance(sem, HierarchicalBoundedSemaphore) + + await records_api._post(url_path="/streams/s1/records/aggregate", json={}, semaphore=sem) + dedicated, query = sem._semaphores + assert dedicated._value == 5 + assert query._value == 10 + + async def test_retrieve_and_list_share_query_semaphore_through_post( + self, async_client: AsyncCogniteClient, httpx_mock: HTTPXMock + ) -> None: + """A retrieve request and a list request against the same stream type + must compete for the same query semaphore.""" + httpx_mock.add_response(method="POST", url=re.compile(r".*"), status_code=200, json={"items": []}, is_optional=True) + httpx_mock.add_response(method="POST", url=re.compile(r".*"), status_code=200, json={"items": []}, is_optional=True) + records_api = async_client.data_modeling.records + + retrieve_sem = records_api._get_semaphore("retrieve", "mutable") + list_sem = records_api._get_semaphore("query", "mutable") + + assert isinstance(retrieve_sem, HierarchicalBoundedSemaphore) + assert retrieve_sem._semaphores[1] is list_sem + + await records_api._post(url_path="/streams/s1/records/retrieve", json={"items": []}, semaphore=retrieve_sem) + await records_api._post(url_path="/streams/s1/records/filter", json={"limit": 10}, semaphore=list_sem) + + class TestStrictFixtureCatchesMissingSemaphore: """Sanity check that the suite-wide strict fixture in tests/conftest.py still works.