Skip to content

Commit 889fed8

Browse files
andersfyllingclaude
andcommitted
feat(records): stream-type-aware concurrency limits
Model the Records API's hierarchical rate-limit budgets for mutable and immutable streams. Write semaphore (20) is shared across both stream types. Query, retrieve, and aggregate each have separate mutable and immutable semaphores matching the API's documented limits. Retrieve and aggregate have dedicated budgets checked before the shared query budget (both must pass). The semaphore helpers on RecordsAPI are wired up for all three tiers, ready for use by endpoint implementations. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent 7934665 commit 889fed8

5 files changed

Lines changed: 315 additions & 12 deletions

File tree

cognite/client/_api/data_modeling/records.py

Lines changed: 39 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@
1414
from cognite.client import AsyncCogniteClient
1515
from cognite.client.config import ClientConfig
1616

17+
StreamType = Literal["mutable", "immutable"]
18+
1719

1820
class RecordsAPI(APIClient):
1921
def __init__(self, config: ClientConfig, api_version: str | None, cognite_client: AsyncCogniteClient) -> None:
@@ -22,16 +24,47 @@ def __init__(self, config: ClientConfig, api_version: str | None, cognite_client
2224
api_maturity="General Availability", sdk_maturity="alpha", feature_name="Records"
2325
)
2426

25-
_OPERATION_TO_RATE_LIMIT: ClassVar[dict[str, RecordsConcurrencyOperation]] = {
26-
"write": RecordsConcurrencyOperation.WRITE,
27-
"delete": RecordsConcurrencyOperation.WRITE,
28-
}
29-
3027
def _get_semaphore(self, operation: Literal["write", "delete"]) -> asyncio.BoundedSemaphore:
3128
from cognite.client import global_config
3229

3330
return global_config.concurrency_settings.records._semaphore_factory(
34-
self._OPERATION_TO_RATE_LIMIT[operation], project=self._cognite_client.config.project
31+
RecordsConcurrencyOperation.WRITE, project=self._cognite_client.config.project
32+
)
33+
34+
def _get_query_semaphore(self, stream_type: StreamType) -> asyncio.BoundedSemaphore:
35+
from cognite.client import global_config
36+
37+
op = (
38+
RecordsConcurrencyOperation.QUERY_MUTABLE
39+
if stream_type == "mutable"
40+
else RecordsConcurrencyOperation.QUERY_IMMUTABLE
41+
)
42+
return global_config.concurrency_settings.records._semaphore_factory(
43+
op, project=self._cognite_client.config.project
44+
)
45+
46+
def _get_retrieve_semaphore(self, stream_type: StreamType) -> asyncio.BoundedSemaphore:
47+
from cognite.client import global_config
48+
49+
op = (
50+
RecordsConcurrencyOperation.RETRIEVE_MUTABLE
51+
if stream_type == "mutable"
52+
else RecordsConcurrencyOperation.RETRIEVE_IMMUTABLE
53+
)
54+
return global_config.concurrency_settings.records._semaphore_factory(
55+
op, project=self._cognite_client.config.project
56+
)
57+
58+
def _get_aggregate_semaphore(self, stream_type: StreamType) -> asyncio.BoundedSemaphore:
59+
from cognite.client import global_config
60+
61+
op = (
62+
RecordsConcurrencyOperation.AGGREGATE_MUTABLE
63+
if stream_type == "mutable"
64+
else RecordsConcurrencyOperation.AGGREGATE_IMMUTABLE
65+
)
66+
return global_config.concurrency_settings.records._semaphore_factory(
67+
op, project=self._cognite_client.config.project
3568
)
3669

3770
def _records_url(self, stream_id: str, suffix: str = "") -> str:

cognite/client/_sync_api/data_modeling/records.py

Lines changed: 3 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

cognite/client/utils/_concurrency.py

Lines changed: 122 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -224,24 +224,114 @@ def __repr__(self) -> str:
224224

225225
class RecordsConcurrencyOperation(Enum):
226226
WRITE = "write"
227+
QUERY_MUTABLE = "query_mutable"
228+
QUERY_IMMUTABLE = "query_immutable"
229+
RETRIEVE_MUTABLE = "retrieve_mutable"
230+
RETRIEVE_IMMUTABLE = "retrieve_immutable"
231+
AGGREGATE_MUTABLE = "aggregate_mutable"
232+
AGGREGATE_IMMUTABLE = "aggregate_immutable"
227233

228234

229235
class RecordsGlobalConcurrencyConfig(ConcurrencyConfig):
230236
"""
231-
Global concurrency settings for the Records API. Named "global" to distinguish from
232-
future per-endpoint rate limits that may be added later.
237+
Global concurrency settings for the Records API.
238+
239+
The Records API has separate rate-limit budgets for reads and writes, and read budgets
240+
differ between mutable and immutable streams. Read budgets are hierarchical: the
241+
retrieve and aggregate endpoints each have a dedicated budget that is checked *before*
242+
the shared query budget (both must pass).
243+
244+
- **write**: Shared across ingest, upsert, and delete (same limit for both stream types).
245+
- **query_mutable / query_immutable**: Shared read budget consumed by all query endpoints
246+
(list/filter, sync, retrieve, aggregate).
247+
- **retrieve_mutable / retrieve_immutable**: Dedicated budget for retrieve, checked
248+
*in addition to* the shared query budget.
249+
- **aggregate_mutable / aggregate_immutable**: Dedicated budget for aggregate, checked
250+
*in addition to* the shared query budget.
233251
234252
Args:
235253
concurrency_settings (ConcurrencySettings): Reference to the parent settings object.
236-
write (int): Maximum concurrent write requests (ingest, delete).
254+
write (int): Maximum concurrent write requests (ingest, upsert, delete).
255+
query_mutable (int): Maximum concurrent query requests against mutable streams.
256+
query_immutable (int): Maximum concurrent query requests against immutable streams.
257+
retrieve_mutable (int): Dedicated retrieve concurrency for mutable streams.
258+
retrieve_immutable (int): Dedicated retrieve concurrency for immutable streams.
259+
aggregate_mutable (int): Dedicated aggregate concurrency for mutable streams.
260+
aggregate_immutable (int): Dedicated aggregate concurrency for immutable streams.
237261
"""
238262

239263
def __init__(
240264
self,
241265
concurrency_settings: ConcurrencySettings,
242266
write: int,
267+
query_mutable: int,
268+
query_immutable: int,
269+
retrieve_mutable: int,
270+
retrieve_immutable: int,
271+
aggregate_mutable: int,
272+
aggregate_immutable: int,
243273
) -> None:
244274
super().__init__(concurrency_settings, "records", read=0, write=write, delete=0)
275+
self._query_mutable = query_mutable
276+
self._query_immutable = query_immutable
277+
self._retrieve_mutable = retrieve_mutable
278+
self._retrieve_immutable = retrieve_immutable
279+
self._aggregate_mutable = aggregate_mutable
280+
self._aggregate_immutable = aggregate_immutable
281+
282+
@property
283+
def query_mutable(self) -> int:
284+
return self._query_mutable
285+
286+
@query_mutable.setter
287+
def query_mutable(self, value: int) -> None:
288+
self._check_frozen("query_mutable")
289+
self._query_mutable = value
290+
291+
@property
292+
def query_immutable(self) -> int:
293+
return self._query_immutable
294+
295+
@query_immutable.setter
296+
def query_immutable(self, value: int) -> None:
297+
self._check_frozen("query_immutable")
298+
self._query_immutable = value
299+
300+
@property
301+
def retrieve_mutable(self) -> int:
302+
return self._retrieve_mutable
303+
304+
@retrieve_mutable.setter
305+
def retrieve_mutable(self, value: int) -> None:
306+
self._check_frozen("retrieve_mutable")
307+
self._retrieve_mutable = value
308+
309+
@property
310+
def retrieve_immutable(self) -> int:
311+
return self._retrieve_immutable
312+
313+
@retrieve_immutable.setter
314+
def retrieve_immutable(self, value: int) -> None:
315+
self._check_frozen("retrieve_immutable")
316+
self._retrieve_immutable = value
317+
318+
@property
319+
def aggregate_mutable(self) -> int:
320+
return self._aggregate_mutable
321+
322+
@aggregate_mutable.setter
323+
def aggregate_mutable(self, value: int) -> None:
324+
self._check_frozen("aggregate_mutable")
325+
self._aggregate_mutable = value
326+
327+
@property
328+
def aggregate_immutable(self) -> int:
329+
return self._aggregate_immutable
330+
331+
@aggregate_immutable.setter
332+
def aggregate_immutable(self, value: int) -> None:
333+
self._check_frozen("aggregate_immutable")
334+
self._aggregate_immutable = value
245335

246336
def _semaphore_factory(self, operation: RecordsConcurrencyOperation, project: str) -> asyncio.BoundedSemaphore:
247337
key = (operation.value, project, asyncio.get_running_loop())
@@ -254,13 +344,31 @@ def _semaphore_factory(self, operation: RecordsConcurrencyOperation, project: st
254344
match operation:
255345
case RecordsConcurrencyOperation.WRITE:
256346
sem = asyncio.BoundedSemaphore(self._write)
347+
case RecordsConcurrencyOperation.QUERY_MUTABLE:
348+
sem = asyncio.BoundedSemaphore(self._query_mutable)
349+
case RecordsConcurrencyOperation.QUERY_IMMUTABLE:
350+
sem = asyncio.BoundedSemaphore(self._query_immutable)
351+
case RecordsConcurrencyOperation.RETRIEVE_MUTABLE:
352+
sem = asyncio.BoundedSemaphore(self._retrieve_mutable)
353+
case RecordsConcurrencyOperation.RETRIEVE_IMMUTABLE:
354+
sem = asyncio.BoundedSemaphore(self._retrieve_immutable)
355+
case RecordsConcurrencyOperation.AGGREGATE_MUTABLE:
356+
sem = asyncio.BoundedSemaphore(self._aggregate_mutable)
357+
case RecordsConcurrencyOperation.AGGREGATE_IMMUTABLE:
358+
sem = asyncio.BoundedSemaphore(self._aggregate_immutable)
257359
case _:
258360
assert_never(operation)
259361
self._semaphore_cache[key] = sem
260362
return sem
261363

262364
def __repr__(self) -> str:
263-
return f"Concurrency[records](write={self._write})"
365+
return (
366+
f"Concurrency[records]("
367+
f"write={self._write}, "
368+
f"query_mutable={self._query_mutable}, query_immutable={self._query_immutable}, "
369+
f"retrieve_mutable={self._retrieve_mutable}, retrieve_immutable={self._retrieve_immutable}, "
370+
f"aggregate_mutable={self._aggregate_mutable}, aggregate_immutable={self._aggregate_immutable})"
371+
)
264372

265373

266374
class FileConcurrencyConfig(ConcurrencyConfig):
@@ -425,7 +533,16 @@ def __init__(self) -> None:
425533
write_schema=1,
426534
)
427535
self._files = FileConcurrencyConfig(self, read=4, write=2, upload=5, download=5, delete=2, open_files=15)
428-
self._records = RecordsGlobalConcurrencyConfig(self, write=20)
536+
self._records = RecordsGlobalConcurrencyConfig(
537+
self,
538+
write=20,
539+
query_mutable=30,
540+
query_immutable=10,
541+
retrieve_mutable=20,
542+
retrieve_immutable=10,
543+
aggregate_mutable=10,
544+
aggregate_immutable=5,
545+
)
429546

430547
@functools.cached_property
431548
def _all_concurrency_configs(self) -> list[ConcurrencyConfig]:

tests/tests_unit/test_utils/test_concurrency.py

Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
ConcurrencyConfig,
1515
ConcurrencySettings,
1616
EventLoopThreadExecutor,
17+
RecordsConcurrencyOperation,
1718
_get_event_loop_executor,
1819
execute_async_tasks,
1920
)
@@ -136,6 +137,95 @@ async def test_invalid_operation_hits_assert_never(self) -> None:
136137
self.cs.general._semaphore_factory("totally_invalid", "proj-a") # type: ignore[arg-type]
137138

138139

140+
class TestRecordsConcurrencyConfig:
141+
def test_defaults(self) -> None:
142+
cs = ConcurrencySettings()
143+
assert cs.records.write == 20
144+
assert cs.records.query_mutable == 30
145+
assert cs.records.query_immutable == 10
146+
assert cs.records.retrieve_mutable == 20
147+
assert cs.records.retrieve_immutable == 10
148+
assert cs.records.aggregate_mutable == 10
149+
assert cs.records.aggregate_immutable == 5
150+
151+
def test_setters_work_before_freeze(self) -> None:
152+
cs = ConcurrencySettings()
153+
cs.records.write = 10
154+
cs.records.query_mutable = 15
155+
cs.records.query_immutable = 5
156+
cs.records.retrieve_mutable = 12
157+
cs.records.retrieve_immutable = 6
158+
cs.records.aggregate_mutable = 8
159+
cs.records.aggregate_immutable = 3
160+
assert cs.records.write == 10
161+
assert cs.records.query_mutable == 15
162+
assert cs.records.query_immutable == 5
163+
assert cs.records.retrieve_mutable == 12
164+
assert cs.records.retrieve_immutable == 6
165+
assert cs.records.aggregate_mutable == 8
166+
assert cs.records.aggregate_immutable == 3
167+
168+
@pytest.mark.parametrize(
169+
"attr",
170+
["write", "query_mutable", "query_immutable", "retrieve_mutable", "retrieve_immutable",
171+
"aggregate_mutable", "aggregate_immutable"],
172+
)
173+
def test_setter_raises_after_freeze(self, attr: str) -> None:
174+
cs = ConcurrencySettings()
175+
cs._freeze()
176+
with pytest.raises(RuntimeError, match="Cannot modify"):
177+
setattr(cs.records, attr, 1)
178+
179+
def test_repr(self) -> None:
180+
cs = ConcurrencySettings()
181+
r = repr(cs.records)
182+
assert "write=20" in r
183+
assert "query_mutable=30" in r
184+
assert "query_immutable=10" in r
185+
assert "retrieve_mutable=20" in r
186+
assert "retrieve_immutable=10" in r
187+
assert "aggregate_mutable=10" in r
188+
assert "aggregate_immutable=5" in r
189+
190+
191+
@pytest.mark.usefixtures("fresh_unfrozen_global_concurrency")
192+
class TestRecordsSemaphoreFactory:
193+
cs: ClassVar[ConcurrencySettings] = global_config.concurrency_settings
194+
195+
@pytest.mark.parametrize(
196+
"operation, expected_value",
197+
[
198+
(RecordsConcurrencyOperation.WRITE, 20),
199+
(RecordsConcurrencyOperation.QUERY_MUTABLE, 30),
200+
(RecordsConcurrencyOperation.QUERY_IMMUTABLE, 10),
201+
(RecordsConcurrencyOperation.RETRIEVE_MUTABLE, 20),
202+
(RecordsConcurrencyOperation.RETRIEVE_IMMUTABLE, 10),
203+
(RecordsConcurrencyOperation.AGGREGATE_MUTABLE, 10),
204+
(RecordsConcurrencyOperation.AGGREGATE_IMMUTABLE, 5),
205+
],
206+
)
207+
async def test_semaphore_values(self, operation: RecordsConcurrencyOperation, expected_value: int) -> None:
208+
sem = self.cs.records._semaphore_factory(operation, "proj-a")
209+
assert sem._value == expected_value
210+
211+
async def test_all_operations_produce_distinct_semaphores(self) -> None:
212+
sems = {
213+
op: self.cs.records._semaphore_factory(op, "proj-a")
214+
for op in RecordsConcurrencyOperation
215+
}
216+
assert len(set(id(s) for s in sems.values())) == len(RecordsConcurrencyOperation)
217+
218+
async def test_cache_hit(self) -> None:
219+
sem1 = self.cs.records._semaphore_factory(RecordsConcurrencyOperation.QUERY_MUTABLE, "proj-a")
220+
sem2 = self.cs.records._semaphore_factory(RecordsConcurrencyOperation.QUERY_MUTABLE, "proj-a")
221+
assert sem1 is sem2
222+
223+
async def test_different_project_different_semaphore(self) -> None:
224+
sem_a = self.cs.records._semaphore_factory(RecordsConcurrencyOperation.QUERY_MUTABLE, "proj-a")
225+
sem_b = self.cs.records._semaphore_factory(RecordsConcurrencyOperation.QUERY_MUTABLE, "proj-b")
226+
assert sem_a is not sem_b
227+
228+
139229
async def i_dont_like_5(i: int) -> int:
140230
if i < 5:
141231
return i

0 commit comments

Comments
 (0)