Skip to content

Commit df05981

Browse files
Merge branch 'master' into CDF-28136-fix-workflow-integration-test-teardown
2 parents cce80dc + a2722cd commit df05981

3 files changed

Lines changed: 16 additions & 21 deletions

File tree

cognite/client/_api/data_modeling/records.py

Lines changed: 3 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,10 @@
22

33
import asyncio
44
from collections.abc import Sequence
5-
from typing import TYPE_CHECKING, ClassVar, Literal
5+
from typing import TYPE_CHECKING, Any, Literal
66

77
from cognite.client._api_client import APIClient
88
from cognite.client.data_classes.data_modeling.records import RecordId, RecordIdSequence, RecordWrite
9-
from cognite.client.utils._concurrency import RecordsConcurrencyOperation
109
from cognite.client.utils._experimental import FeaturePreviewWarning
1110
from cognite.client.utils._url import interpolate_and_url_encode
1211

@@ -22,16 +21,11 @@ def __init__(self, config: ClientConfig, api_version: str | None, cognite_client
2221
api_maturity="General Availability", sdk_maturity="alpha", feature_name="Records"
2322
)
2423

25-
_OPERATION_TO_RATE_LIMIT: ClassVar[dict[str, RecordsConcurrencyOperation]] = {
26-
"write": RecordsConcurrencyOperation.WRITE,
27-
"delete": RecordsConcurrencyOperation.WRITE,
28-
}
29-
30-
def _get_semaphore(self, operation: Literal["write", "delete"]) -> asyncio.BoundedSemaphore:
24+
def _get_semaphore(self, operation: Any) -> asyncio.BoundedSemaphore:
3125
from cognite.client import global_config
3226

3327
return global_config.concurrency_settings.records._semaphore_factory(
34-
self._OPERATION_TO_RATE_LIMIT[operation], project=self._cognite_client.config.project
28+
operation, project=self._cognite_client.config.project
3529
)
3630

3731
def _records_url(self, stream_id: str, suffix: str = "") -> str:

cognite/client/_sync_api/data_modeling/records.py

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

cognite/client/utils/_concurrency.py

Lines changed: 12 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@
77
from abc import ABC, abstractmethod
88
from collections import UserList
99
from collections.abc import Callable, Coroutine
10-
from enum import Enum
1110
from typing import (
1211
Any,
1312
Literal,
@@ -222,45 +221,47 @@ def __repr__(self) -> str:
222221
)
223222

224223

225-
class RecordsConcurrencyOperation(Enum):
226-
WRITE = "write"
227-
228-
229224
class RecordsGlobalConcurrencyConfig(ConcurrencyConfig):
230225
"""
231226
Global concurrency settings for the Records API. Named "global" to distinguish from
232227
future per-endpoint rate limits that may be added later.
233228
234229
Args:
235230
concurrency_settings (ConcurrencySettings): Reference to the parent settings object.
231+
read (int): Maximum concurrent read requests (list, retrieve, sync).
236232
write (int): Maximum concurrent write requests (ingest, delete).
237233
"""
238234

239235
def __init__(
240236
self,
241237
concurrency_settings: ConcurrencySettings,
238+
read: int,
242239
write: int,
243240
) -> None:
244-
super().__init__(concurrency_settings, "records", read=0, write=write, delete=0)
241+
super().__init__(concurrency_settings, "records", read=read, write=write, delete=0)
245242

246-
def _semaphore_factory(self, operation: RecordsConcurrencyOperation, project: str) -> asyncio.BoundedSemaphore:
247-
key = (operation.value, project, asyncio.get_running_loop())
243+
def _semaphore_factory(
244+
self, operation: Literal["read", "write", "delete"], project: str
245+
) -> asyncio.BoundedSemaphore:
246+
key = (operation, project, asyncio.get_running_loop())
248247
if key in self._semaphore_cache:
249248
return self._semaphore_cache[key]
250249

251250
from cognite.client import global_config
252251

253252
global_config.concurrency_settings._freeze()
254253
match operation:
255-
case RecordsConcurrencyOperation.WRITE:
254+
case "read":
255+
sem = asyncio.BoundedSemaphore(self._read)
256+
case "write" | "delete":
256257
sem = asyncio.BoundedSemaphore(self._write)
257258
case _:
258259
assert_never(operation)
259260
self._semaphore_cache[key] = sem
260261
return sem
261262

262263
def __repr__(self) -> str:
263-
return f"Concurrency[records](write={self._write})"
264+
return f"Concurrency[records](read={self._read}, write={self._write})"
264265

265266

266267
class FileConcurrencyConfig(ConcurrencyConfig):
@@ -425,7 +426,7 @@ def __init__(self) -> None:
425426
write_schema=1,
426427
)
427428
self._files = FileConcurrencyConfig(self, read=4, write=2, upload=5, download=5, delete=2, open_files=15)
428-
self._records = RecordsGlobalConcurrencyConfig(self, write=20)
429+
self._records = RecordsGlobalConcurrencyConfig(self, read=4, write=20)
429430

430431
@functools.cached_property
431432
def _all_concurrency_configs(self) -> list[ConcurrencyConfig]:

0 commit comments

Comments
 (0)