|
7 | 7 | from abc import ABC, abstractmethod |
8 | 8 | from collections import UserList |
9 | 9 | from collections.abc import Callable, Coroutine |
10 | | -from enum import Enum |
11 | 10 | from typing import ( |
12 | 11 | Any, |
13 | 12 | Literal, |
@@ -222,45 +221,47 @@ def __repr__(self) -> str: |
222 | 221 | ) |
223 | 222 |
|
224 | 223 |
|
225 | | -class RecordsConcurrencyOperation(Enum): |
226 | | - WRITE = "write" |
227 | | - |
228 | | - |
229 | 224 | class RecordsGlobalConcurrencyConfig(ConcurrencyConfig): |
230 | 225 | """ |
231 | 226 | Global concurrency settings for the Records API. Named "global" to distinguish from |
232 | 227 | future per-endpoint rate limits that may be added later. |
233 | 228 |
|
234 | 229 | Args: |
235 | 230 | concurrency_settings (ConcurrencySettings): Reference to the parent settings object. |
| 231 | + read (int): Maximum concurrent read requests (list, retrieve, sync). |
236 | 232 | write (int): Maximum concurrent write requests (ingest, delete). |
237 | 233 | """ |
238 | 234 |
|
239 | 235 | def __init__( |
240 | 236 | self, |
241 | 237 | concurrency_settings: ConcurrencySettings, |
| 238 | + read: int, |
242 | 239 | write: int, |
243 | 240 | ) -> None: |
244 | | - super().__init__(concurrency_settings, "records", read=0, write=write, delete=0) |
| 241 | + super().__init__(concurrency_settings, "records", read=read, write=write, delete=0) |
245 | 242 |
|
246 | | - def _semaphore_factory(self, operation: RecordsConcurrencyOperation, project: str) -> asyncio.BoundedSemaphore: |
247 | | - key = (operation.value, project, asyncio.get_running_loop()) |
| 243 | + def _semaphore_factory( |
| 244 | + self, operation: Literal["read", "write", "delete"], project: str |
| 245 | + ) -> asyncio.BoundedSemaphore: |
| 246 | + key = (operation, project, asyncio.get_running_loop()) |
248 | 247 | if key in self._semaphore_cache: |
249 | 248 | return self._semaphore_cache[key] |
250 | 249 |
|
251 | 250 | from cognite.client import global_config |
252 | 251 |
|
253 | 252 | global_config.concurrency_settings._freeze() |
254 | 253 | match operation: |
255 | | - case RecordsConcurrencyOperation.WRITE: |
| 254 | + case "read": |
| 255 | + sem = asyncio.BoundedSemaphore(self._read) |
| 256 | + case "write" | "delete": |
256 | 257 | sem = asyncio.BoundedSemaphore(self._write) |
257 | 258 | case _: |
258 | 259 | assert_never(operation) |
259 | 260 | self._semaphore_cache[key] = sem |
260 | 261 | return sem |
261 | 262 |
|
262 | 263 | def __repr__(self) -> str: |
263 | | - return f"Concurrency[records](write={self._write})" |
| 264 | + return f"Concurrency[records](read={self._read}, write={self._write})" |
264 | 265 |
|
265 | 266 |
|
266 | 267 | class FileConcurrencyConfig(ConcurrencyConfig): |
@@ -425,7 +426,7 @@ def __init__(self) -> None: |
425 | 426 | write_schema=1, |
426 | 427 | ) |
427 | 428 | self._files = FileConcurrencyConfig(self, read=4, write=2, upload=5, download=5, delete=2, open_files=15) |
428 | | - self._records = RecordsGlobalConcurrencyConfig(self, write=20) |
| 429 | + self._records = RecordsGlobalConcurrencyConfig(self, read=4, write=20) |
429 | 430 |
|
430 | 431 | @functools.cached_property |
431 | 432 | def _all_concurrency_configs(self) -> list[ConcurrencyConfig]: |
|
0 commit comments