Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
16 commits
Select commit Hold shift + click to select a range
889fed8
feat(records): stream-type-aware concurrency limits
andersfylling Jun 15, 2026
d6cc7e6
refactor(records): make write/delete semaphore mapping explicit
andersfylling Jun 15, 2026
10edd2b
refactor(records): unify semaphore routing into single _get_semaphore
andersfylling Jun 15, 2026
47a08fe
refactor(records): use enum values in _check_frozen calls
andersfylling Jun 15, 2026
9ee366c
feat(records): add HierarchicalBoundedSemaphore for multi-budget enfo…
andersfylling Jun 15, 2026
c4f955f
test(records): add tests for HierarchicalBoundedSemaphore and _get_se…
andersfylling Jun 15, 2026
c51c4e8
refactor(records): hoist write_op and query_op above match statement
andersfylling Jun 15, 2026
58ba765
fix(records): make HierarchicalBoundedSemaphore cancellation- and exc…
andersfylling Jun 15, 2026
9043db8
feat(records): validate dedicated budget <= shared query budget
andersfylling Jun 15, 2026
83fa7b7
refactor(records): per-setter validation, init reuses setters
andersfylling Jun 15, 2026
8c6af82
refactor(records): single _validate_budgets method with overrides
andersfylling Jun 15, 2026
edc3096
refactor(records): move _check_frozen into _validate_budgets
andersfylling Jun 15, 2026
4d17d9c
fix: resolve linting issues from pre-commit
andersfylling Jun 15, 2026
170a2dd
fix: docstring indentation for custom-checks parser
andersfylling Jun 15, 2026
e73d91b
test(records): verify HierarchicalBoundedSemaphore through real HTTP …
andersfylling Jun 15, 2026
4e3059f
test(records): simulate real endpoint patterns for all operation types
andersfylling Jun 15, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 36 additions & 10 deletions cognite/client/_api/data_modeling/records.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,20 @@

import asyncio
from collections.abc import Sequence
from typing import TYPE_CHECKING, ClassVar, Literal
from typing import TYPE_CHECKING, Literal

from cognite.client._api_client import APIClient
from cognite.client.data_classes.data_modeling.records import RecordId, RecordIdSequence, RecordWrite
from cognite.client.utils._concurrency import RecordsConcurrencyOperation
from cognite.client.utils._concurrency import HierarchicalBoundedSemaphore, RecordsConcurrencyOperation
from cognite.client.utils._experimental import FeaturePreviewWarning
from cognite.client.utils._url import interpolate_and_url_encode

if TYPE_CHECKING:
from cognite.client import AsyncCogniteClient
from cognite.client.config import ClientConfig

StreamType = Literal["mutable", "immutable"]


class RecordsAPI(APIClient):
def __init__(self, config: ClientConfig, api_version: str | None, cognite_client: AsyncCogniteClient) -> None:
Expand All @@ -22,18 +24,42 @@ def __init__(self, config: ClientConfig, api_version: str | None, cognite_client
api_maturity="General Availability", sdk_maturity="alpha", feature_name="Records"
)

_OPERATION_TO_RATE_LIMIT: ClassVar[dict[str, RecordsConcurrencyOperation]] = {
"write": RecordsConcurrencyOperation.WRITE,
"delete": RecordsConcurrencyOperation.WRITE,
}

def _get_semaphore(self, operation: Literal["write", "delete"]) -> asyncio.BoundedSemaphore:
def _get_semaphore( # type: ignore[override]
self,
operation: Literal["write", "delete", "query", "retrieve", "aggregate"],
stream_type: StreamType = "immutable",
) -> asyncio.BoundedSemaphore | HierarchicalBoundedSemaphore:
from cognite.client import global_config

return global_config.concurrency_settings.records._semaphore_factory(
self._OPERATION_TO_RATE_LIMIT[operation], project=self._cognite_client.config.project
write_op = RecordsConcurrencyOperation.WRITE
query_op = (
RecordsConcurrencyOperation.QUERY_MUTABLE
if stream_type == "mutable"
else RecordsConcurrencyOperation.QUERY_IMMUTABLE
)

factory = global_config.concurrency_settings.records._semaphore_factory
project = self._cognite_client.config.project
match operation:
case "write" | "delete":
return factory(write_op, project)
case "query":
return factory(query_op, project)
case "retrieve":
dedicated_op = (
RecordsConcurrencyOperation.RETRIEVE_MUTABLE
if stream_type == "mutable"
else RecordsConcurrencyOperation.RETRIEVE_IMMUTABLE
)
return HierarchicalBoundedSemaphore(factory(dedicated_op, project), factory(query_op, project))
case "aggregate":
dedicated_op = (
RecordsConcurrencyOperation.AGGREGATE_MUTABLE
if stream_type == "mutable"
else RecordsConcurrencyOperation.AGGREGATE_IMMUTABLE
)
return HierarchicalBoundedSemaphore(factory(dedicated_op, project), factory(query_op, project))

def _records_url(self, stream_id: str, suffix: str = "") -> str:
# Encode only stream_id; the suffix is a literal path segment (e.g. "/upsert"),
# so it must not be percent-encoded.
Expand Down
4 changes: 3 additions & 1 deletion cognite/client/_sync_api/data_modeling/records.py

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

183 changes: 178 additions & 5 deletions cognite/client/utils/_concurrency.py
Original file line number Diff line number Diff line change
Expand Up @@ -222,26 +222,172 @@ def __repr__(self) -> str:
)


class HierarchicalBoundedSemaphore:
"""Acquires multiple semaphores in order, releases in reverse.

Used to model the Records API's hierarchical rate limits where an endpoint
(e.g. retrieve) must pass both its dedicated budget and the shared query budget.

If acquisition is interrupted (e.g. by cancellation), all already-acquired
semaphores are released before the exception propagates. Similarly, if a
release raises, the remaining semaphores are still released.
"""

def __init__(self, *semaphores: asyncio.BoundedSemaphore) -> None:
self._semaphores = semaphores

async def __aenter__(self) -> None:
acquired: list[asyncio.BoundedSemaphore] = []
try:
for sem in self._semaphores:
await sem.__aenter__()
acquired.append(sem)
except BaseException:
for sem in reversed(acquired):
await sem.__aexit__(None, None, None)
raise

async def __aexit__(self, *exc: Any) -> None:
first_err: BaseException | None = None
for sem in reversed(self._semaphores):
try:
await sem.__aexit__(*exc)
except BaseException as e:
if first_err is None:
first_err = e
if first_err is not None:
raise first_err


class RecordsConcurrencyOperation(Enum):
    """Identifies one concurrency budget of the Records API.

    Each member's value is the string used as part of the semaphore cache key.
    """

    # Single write budget, independent of stream type.
    WRITE = "write"

    # Shared read budgets, one per stream type.
    QUERY_MUTABLE = "query_mutable"
    QUERY_IMMUTABLE = "query_immutable"

    # Dedicated retrieve budgets, one per stream type.
    RETRIEVE_MUTABLE = "retrieve_mutable"
    RETRIEVE_IMMUTABLE = "retrieve_immutable"

    # Dedicated aggregate budgets, one per stream type.
    AGGREGATE_MUTABLE = "aggregate_mutable"
    AGGREGATE_IMMUTABLE = "aggregate_immutable"


class RecordsGlobalConcurrencyConfig(ConcurrencyConfig):
"""
Global concurrency settings for the Records API. Named "global" to distinguish from
future per-endpoint rate limits that may be added later.
Global concurrency settings for the Records API.

The Records API has separate rate-limit budgets for reads and writes, and read budgets
differ between mutable and immutable streams. Read budgets are hierarchical: the
retrieve and aggregate endpoints each have a dedicated budget that is checked *before*
the shared query budget (both must pass).

- **write**: Shared across ingest, upsert, and delete (same for both stream types).
- **query_mutable / query_immutable**: Shared read budget for all query endpoints.
- **retrieve_mutable / retrieve_immutable**: Dedicated retrieve budget (+ shared query).
- **aggregate_mutable / aggregate_immutable**: Dedicated aggregate budget (+ shared query).

Args:
concurrency_settings (ConcurrencySettings): Reference to the parent settings object.
write (int): Maximum concurrent write requests (ingest, delete).
write (int): Maximum concurrent write requests (ingest, upsert, delete).
query_mutable (int): Maximum concurrent query requests against mutable streams.
query_immutable (int): Maximum concurrent query requests against immutable streams.
retrieve_mutable (int): Dedicated retrieve concurrency for mutable streams.
retrieve_immutable (int): Dedicated retrieve concurrency for immutable streams.
aggregate_mutable (int): Dedicated aggregate concurrency for mutable streams.
aggregate_immutable (int): Dedicated aggregate concurrency for immutable streams.
"""

def __init__(
    self,
    concurrency_settings: ConcurrencySettings,
    write: int,
    query_mutable: int,
    query_immutable: int,
    retrieve_mutable: int,
    retrieve_immutable: int,
    aggregate_mutable: int,
    aggregate_immutable: int,
) -> None:
    """Store all Records budgets and verify the dedicated/shared hierarchy.

    Raises:
        ValueError: If a dedicated retrieve/aggregate budget exceeds the shared query budget of the same stream type.
    """
    # The base config only carries the write budget here; read and delete are passed as 0.
    # NOTE(review): presumably they are unused for records - confirm against ConcurrencyConfig.
    super().__init__(concurrency_settings, "records", read=0, write=write, delete=0)

    # Shared query budgets.
    self._query_mutable, self._query_immutable = query_mutable, query_immutable
    # Dedicated retrieve budgets.
    self._retrieve_mutable, self._retrieve_immutable = retrieve_mutable, retrieve_immutable
    # Dedicated aggregate budgets.
    self._aggregate_mutable, self._aggregate_immutable = aggregate_mutable, aggregate_immutable

    # Called without overrides: checks the values just stored.
    self._validate_budgets()

def _validate_budgets(self, **overrides: int) -> None:
for name in overrides:
self._check_frozen(name)

def resolve(name: str) -> int:
return overrides.get(name, getattr(self, f"_{name}"))

for dedicated_name, shared_name in [
("retrieve_mutable", "query_mutable"),
("retrieve_immutable", "query_immutable"),
("aggregate_mutable", "query_mutable"),
("aggregate_immutable", "query_immutable"),
]:
dedicated = resolve(dedicated_name)
shared = resolve(shared_name)
if dedicated > shared:
raise ValueError(
f"Dedicated budget must be <= shared query budget "
f"({dedicated_name} vs {shared_name}): {dedicated} > {shared}"
)

@property
def query_mutable(self) -> int:
    """Maximum concurrent query requests against mutable streams (shared read budget)."""
    return self._query_mutable

@query_mutable.setter
def query_mutable(self, value: int) -> None:
    """Validate, then store, a new shared query budget for mutable streams.

    Validation goes through ``_validate_budgets``, which raises ValueError if
    ``retrieve_mutable`` or ``aggregate_mutable`` would exceed the new value.
    """
    self._validate_budgets(query_mutable=value)
    self._query_mutable = value

@property
def query_immutable(self) -> int:
    """Maximum concurrent query requests against immutable streams (shared read budget)."""
    return self._query_immutable

@query_immutable.setter
def query_immutable(self, value: int) -> None:
    """Validate, then store, a new shared query budget for immutable streams.

    Validation goes through ``_validate_budgets``, which raises ValueError if
    ``retrieve_immutable`` or ``aggregate_immutable`` would exceed the new value.
    """
    self._validate_budgets(query_immutable=value)
    self._query_immutable = value

@property
def retrieve_mutable(self) -> int:
    """Dedicated retrieve concurrency for mutable streams."""
    return self._retrieve_mutable

@retrieve_mutable.setter
def retrieve_mutable(self, value: int) -> None:
    """Validate, then store, a new dedicated retrieve budget for mutable streams.

    Validation goes through ``_validate_budgets``, which raises ValueError if
    the new value exceeds ``query_mutable``.
    """
    self._validate_budgets(retrieve_mutable=value)
    self._retrieve_mutable = value

@property
def retrieve_immutable(self) -> int:
    """Dedicated retrieve concurrency for immutable streams."""
    return self._retrieve_immutable

@retrieve_immutable.setter
def retrieve_immutable(self, value: int) -> None:
    """Validate, then store, a new dedicated retrieve budget for immutable streams.

    Validation goes through ``_validate_budgets``, which raises ValueError if
    the new value exceeds ``query_immutable``.
    """
    self._validate_budgets(retrieve_immutable=value)
    self._retrieve_immutable = value

@property
def aggregate_mutable(self) -> int:
    """Dedicated aggregate concurrency for mutable streams."""
    return self._aggregate_mutable

@aggregate_mutable.setter
def aggregate_mutable(self, value: int) -> None:
    """Validate, then store, a new dedicated aggregate budget for mutable streams.

    Validation goes through ``_validate_budgets``, which raises ValueError if
    the new value exceeds ``query_mutable``.
    """
    self._validate_budgets(aggregate_mutable=value)
    self._aggregate_mutable = value

@property
def aggregate_immutable(self) -> int:
    """Dedicated aggregate concurrency for immutable streams."""
    return self._aggregate_immutable

@aggregate_immutable.setter
def aggregate_immutable(self, value: int) -> None:
    """Validate, then store, a new dedicated aggregate budget for immutable streams.

    Validation goes through ``_validate_budgets``, which raises ValueError if
    the new value exceeds ``query_immutable``.
    """
    self._validate_budgets(aggregate_immutable=value)
    self._aggregate_immutable = value

def _semaphore_factory(self, operation: RecordsConcurrencyOperation, project: str) -> asyncio.BoundedSemaphore:
key = (operation.value, project, asyncio.get_running_loop())
Expand All @@ -254,13 +400,31 @@ def _semaphore_factory(self, operation: RecordsConcurrencyOperation, project: st
match operation:
case RecordsConcurrencyOperation.WRITE:
sem = asyncio.BoundedSemaphore(self._write)
case RecordsConcurrencyOperation.QUERY_MUTABLE:
sem = asyncio.BoundedSemaphore(self._query_mutable)
case RecordsConcurrencyOperation.QUERY_IMMUTABLE:
sem = asyncio.BoundedSemaphore(self._query_immutable)
case RecordsConcurrencyOperation.RETRIEVE_MUTABLE:
sem = asyncio.BoundedSemaphore(self._retrieve_mutable)
case RecordsConcurrencyOperation.RETRIEVE_IMMUTABLE:
sem = asyncio.BoundedSemaphore(self._retrieve_immutable)
case RecordsConcurrencyOperation.AGGREGATE_MUTABLE:
sem = asyncio.BoundedSemaphore(self._aggregate_mutable)
case RecordsConcurrencyOperation.AGGREGATE_IMMUTABLE:
sem = asyncio.BoundedSemaphore(self._aggregate_immutable)
case _:
assert_never(operation)
self._semaphore_cache[key] = sem
return sem

def __repr__(self) -> str:
return f"Concurrency[records](write={self._write})"
return (
f"Concurrency[records]("
f"write={self._write}, "
f"query_mutable={self._query_mutable}, query_immutable={self._query_immutable}, "
f"retrieve_mutable={self._retrieve_mutable}, retrieve_immutable={self._retrieve_immutable}, "
f"aggregate_mutable={self._aggregate_mutable}, aggregate_immutable={self._aggregate_immutable})"
)


class FileConcurrencyConfig(ConcurrencyConfig):
Expand Down Expand Up @@ -425,7 +589,16 @@ def __init__(self) -> None:
write_schema=1,
)
self._files = FileConcurrencyConfig(self, read=4, write=2, upload=5, download=5, delete=2, open_files=15)
self._records = RecordsGlobalConcurrencyConfig(self, write=20)
self._records = RecordsGlobalConcurrencyConfig(
self,
write=20,
query_mutable=30,
query_immutable=10,
retrieve_mutable=20,
retrieve_immutable=10,
aggregate_mutable=10,
aggregate_immutable=5,
)

@functools.cached_property
def _all_concurrency_configs(self) -> list[ConcurrencyConfig]:
Expand Down
Loading
Loading