Skip to content

Commit eeb284f

Browse files
andersfylling and claude committed
feat(records): add list (filter) endpoint with read data classes
Add RecordsAPI.list, a cursorless POST to /streams/{streamId}/records/filter returning a RecordList (max 1000 records).

Introduces the records read model:
- Record / RecordList read data classes (clean CogniteResource, no RecordId multiple-inheritance; RecordList carries optional `typing`).
- TimeRange (gte/gt/lte/lt) for last_updated_time and RecordSourceSelector for source/property selection; reuse the data-modeling Filter DSL, InstanceSort, and TypeInformation.
- Add a READ op to RecordsConcurrencyOperation + a read semaphore to the records concurrency config (reads no longer borrow the write semaphore).
- Make _records_url append the path suffix literally (encode only stream_id) so "/filter" isn't percent-encoded.

Also register the records API module in the docstring-example doctest runner. target_units is intentionally deferred to a follow-up.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
1 parent f862306 commit eeb284f

7 files changed

Lines changed: 500 additions & 11 deletions

File tree

cognite/client/_api/data_modeling/records.py

Lines changed: 80 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,19 @@
22

33
import asyncio
44
from collections.abc import Sequence
5-
from typing import TYPE_CHECKING, ClassVar, Literal
5+
from typing import TYPE_CHECKING, Any, ClassVar, Literal
66

77
from cognite.client._api_client import APIClient
8-
from cognite.client.data_classes.data_modeling.records import RecordId, RecordIdSequence, RecordWrite
8+
from cognite.client.data_classes.data_modeling.instances import InstanceSort
9+
from cognite.client.data_classes.data_modeling.records import (
10+
RecordId,
11+
RecordIdSequence,
12+
RecordList,
13+
RecordSourceSelector,
14+
RecordWrite,
15+
TimeRange,
16+
)
17+
from cognite.client.data_classes.filters import Filter
918
from cognite.client.utils._concurrency import RecordsConcurrencyOperation
1019
from cognite.client.utils._experimental import FeaturePreviewWarning
1120
from cognite.client.utils._url import interpolate_and_url_encode
@@ -23,19 +32,20 @@ def __init__(self, config: ClientConfig, api_version: str | None, cognite_client
2332
)
2433

2534
# Lookup table from the operation name accepted by `_get_semaphore` to the
# records concurrency operation used to select a semaphore.
_OPERATION_TO_RATE_LIMIT: ClassVar[dict[str, RecordsConcurrencyOperation]] = {
    # Reads have their own concurrency operation.
    "read": RecordsConcurrencyOperation.READ,
    # Writes and deletes both map to the WRITE operation.
    "write": RecordsConcurrencyOperation.WRITE,
    "delete": RecordsConcurrencyOperation.WRITE,
}
2939

30-
def _get_semaphore(self, operation: Literal["write", "delete"]) -> asyncio.BoundedSemaphore:
40+
def _get_semaphore(self, operation: Literal["read", "write", "delete"]) -> asyncio.BoundedSemaphore:
    """Look up the semaphore that throttles the given kind of records request.

    Args:
        operation (Literal["read", "write", "delete"]): Operation name; it is
            translated to a ``RecordsConcurrencyOperation`` through
            ``_OPERATION_TO_RATE_LIMIT``.

    Returns:
        asyncio.BoundedSemaphore: Semaphore produced by the records concurrency
            settings' factory for this client's project.
    """
    # Imported here rather than at module level, as in the original code.
    from cognite.client import global_config

    semaphore_factory = global_config.concurrency_settings.records._semaphore_factory
    rate_limit_operation = self._OPERATION_TO_RATE_LIMIT[operation]
    return semaphore_factory(rate_limit_operation, project=self._cognite_client.config.project)
3646

3747
def _records_url(self, stream_id: str, suffix: str = "") -> str:
    """Build the records resource path for a stream.

    Args:
        stream_id (str): Stream identifier; interpolated into the path and URL-encoded.
        suffix (str): Optional literal path tail appended after ``/records``.

    Returns:
        str: ``/streams/<encoded stream_id>/records`` followed by ``suffix``.
    """
    # Only stream_id goes through URL-encoding. The suffix is a literal path
    # segment (e.g. "/upsert", "/filter") and is appended verbatim so that it
    # is not percent-encoded.
    encoded_base = interpolate_and_url_encode("/streams/{}/records", stream_id)
    return encoded_base + suffix
4151

@@ -183,3 +193,69 @@ async def upsert(
183193
resource_path=self._records_url(stream_id, "/upsert"),
184194
no_response=True,
185195
)
196+
197+
async def list(
    self,
    stream_id: str,
    *,
    last_updated_time: TimeRange | None = None,
    filter: Filter | None = None,
    sources: Sequence[RecordSourceSelector] | None = None,
    sort: Sequence[InstanceSort] | InstanceSort | None = None,
    limit: int = 10,
    include_typing: bool = False,
) -> RecordList:
    """`Filter records in a stream <https://api-docs.cognite.com/20230101/tag/Records/operation/filterRecords>`_.

    Sends a single request to the stream's ``/filter`` endpoint and returns the matching
    records. Results are sorted by ``lastUpdatedTime`` unless ``sort`` is supplied. The
    endpoint is not cursor-paged, so at most ``limit`` records (max 1000) come back; to walk
    a large time window, make several calls with partitioned ``last_updated_time`` ranges.

    Args:
        stream_id (str): External ID of the stream to query.
        last_updated_time (TimeRange | None): Restrict by last-updated time. **Required for
            immutable streams** (must include a lower bound).
        filter (Filter | None): Filter expression (see :mod:`cognite.client.data_classes.filters`).
        sources (Sequence[RecordSourceSelector] | None): Which container properties to return.
        sort (Sequence[InstanceSort] | InstanceSort | None): One sort specification or a
            sequence of them; up to 5.
        limit (int): Maximum number of records to return (1-1000). Defaults to 10.
        include_typing (bool): If True, include property type information on the returned
            list's ``typing`` attribute.

    Returns:
        RecordList: The matching records.

    Examples:

        List records updated since a given timestamp:

            >>> from cognite.client import CogniteClient
            >>> from cognite.client.data_classes.data_modeling.records import TimeRange
            >>> client = CogniteClient()
            >>> res = client.data_modeling.records.list(
            ...     stream_id="my-stream",
            ...     last_updated_time=TimeRange(gt=1705341600000),
            ...     limit=100,
            ... )
    """
    self._warning.warn()

    # Optional keys are added only when the caller supplied them; "limit" is always sent.
    payload: dict[str, Any] = {"limit": limit}
    if last_updated_time is not None:
        payload["lastUpdatedTime"] = last_updated_time.dump()
    if filter is not None:
        payload["filter"] = filter.dump()
    if sources is not None:
        payload["sources"] = [selector.dump() for selector in sources]
    if sort is not None:
        # A lone InstanceSort is treated as a one-element sequence.
        sort_specs = (sort,) if isinstance(sort, InstanceSort) else sort
        payload["sort"] = [sort_spec.dump() for sort_spec in sort_specs]
    if include_typing:
        payload["includeTyping"] = True

    filter_url = self._records_url(stream_id, "/filter")
    # Uses the semaphore registered for the "read" operation.
    read_semaphore = self._get_semaphore("read")
    response = await self._post(url_path=filter_url, json=payload, semaphore=read_semaphore)
    return RecordList._load_raw_api_response([response.json()])

cognite/client/_sync_api/data_modeling/records.py

Lines changed: 68 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

cognite/client/data_classes/data_modeling/__init__.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -120,11 +120,15 @@
120120
UnionAll,
121121
)
122122
from cognite.client.data_classes.data_modeling.records import (
123+
Record,
123124
RecordContainerId,
124125
RecordId,
126+
RecordList,
125127
RecordSource,
128+
RecordSourceSelector,
126129
RecordWrite,
127130
RecordWriteList,
131+
TimeRange,
128132
)
129133
from cognite.client.data_classes.data_modeling.spaces import Space, SpaceApply, SpaceApplyList, SpaceList
130134
from cognite.client.data_classes.data_modeling.streams import (
@@ -249,9 +253,12 @@
249253
"Query",
250254
"QueryResult",
251255
"QuerySync",
256+
"Record",
252257
"RecordContainerId",
253258
"RecordId",
259+
"RecordList",
254260
"RecordSource",
261+
"RecordSourceSelector",
255262
"RecordWrite",
256263
"RecordWriteList",
257264
"RequiresConstraint",
@@ -279,6 +286,7 @@
279286
"StreamWrite",
280287
"SubscriptionContext",
281288
"Text",
289+
"TimeRange",
282290
"TimeSeriesReference",
283291
"Timestamp",
284292
"TranslatedQuery",

0 commit comments

Comments
 (0)