22
33import asyncio
44from collections .abc import Sequence
5- from typing import TYPE_CHECKING , ClassVar , Literal
5+ from typing import TYPE_CHECKING , Any , ClassVar , Literal
66
77from cognite .client ._api_client import APIClient
8- from cognite .client .data_classes .data_modeling .records import RecordId , RecordIdSequence , RecordWrite
8+ from cognite .client .data_classes .data_modeling .instances import InstanceSort
9+ from cognite .client .data_classes .data_modeling .records import (
10+ RecordId ,
11+ RecordIdSequence ,
12+ RecordList ,
13+ RecordSourceSelector ,
14+ RecordWrite ,
15+ TimeRange ,
16+ )
17+ from cognite .client .data_classes .filters import Filter
918from cognite .client .utils ._concurrency import RecordsConcurrencyOperation
1019from cognite .client .utils ._experimental import FeaturePreviewWarning
1120from cognite .client .utils ._url import interpolate_and_url_encode
@@ -23,19 +32,22 @@ def __init__(self, config: ClientConfig, api_version: str | None, cognite_client
2332 )
2433
2534 _OPERATION_TO_RATE_LIMIT : ClassVar [dict [str , RecordsConcurrencyOperation ]] = {
35+ "read" : RecordsConcurrencyOperation .RETRIEVE ,
2636 "write" : RecordsConcurrencyOperation .WRITE ,
2737 "delete" : RecordsConcurrencyOperation .WRITE ,
2838 }
2939
30- def _get_semaphore (self , operation : Literal ["write" , "delete" ]) -> asyncio .BoundedSemaphore :
40+ def _get_semaphore (self , operation : Literal ["read" , " write" , "delete" ]) -> asyncio .BoundedSemaphore :
3141 from cognite .client import global_config
3242
3343 return global_config .concurrency_settings .records ._semaphore_factory (
3444 self ._OPERATION_TO_RATE_LIMIT [operation ], project = self ._cognite_client .config .project
3545 )
3646
3747 def _records_url (self , stream_id : str , suffix : str = "" ) -> str :
38- return interpolate_and_url_encode ("/streams/{}/records{}" , stream_id , suffix )
48+ # Only stream_id is URL-encoded; suffix is a literal path segment (e.g. "/filter")
49+ # and must not be percent-encoded.
50+ return interpolate_and_url_encode ("/streams/{}/records" , stream_id ) + suffix
3951
4052 async def delete (
4153 self ,
@@ -127,3 +139,69 @@ async def ingest(
127139 resource_path = self ._records_url (stream_id ),
128140 no_response = True ,
129141 )
142+
143+ async def list (
144+ self ,
145+ stream_id : str ,
146+ * ,
147+ last_updated_time : TimeRange | None = None ,
148+ filter : Filter | None = None ,
149+ sources : Sequence [RecordSourceSelector ] | None = None ,
150+ sort : Sequence [InstanceSort ] | InstanceSort | None = None ,
151+ limit : int = 10 ,
152+ include_typing : bool = False ,
153+ ) -> RecordList :
154+ """`Filter records in a stream <https://api-docs.cognite.com/20230101/tag/Records/operation/filterRecords>`_.
155+
156+ Returns records matching the given filters, sorted by ``lastUpdatedTime`` unless a custom
157+ ``sort`` is given. This endpoint is not cursor-paged: it returns at most ``limit`` records
158+ (max 1000). To page over a large time window, issue multiple calls with partitioned
159+ ``last_updated_time`` ranges.
160+
161+ Args:
162+ stream_id (str): External ID of the stream to query.
163+ last_updated_time (TimeRange | None): Filter by last-updated time. **Required for
164+ immutable streams** (must include a lower bound).
165+ filter (Filter | None): Filter expression (see :mod:`cognite.client.data_classes.filters`).
166+ sources (Sequence[RecordSourceSelector] | None): Which container properties to return.
167+ sort (Sequence[InstanceSort] | InstanceSort | None): Sort specification(s); up to 5.
168+ limit (int): Maximum number of records to return (1-1000). Defaults to 10.
169+ include_typing (bool): If True, include property type information on the returned
170+ list's ``typing`` attribute.
171+
172+ Returns:
173+ RecordList: The matching records.
174+
175+ Examples:
176+
177+ List records updated since a given timestamp:
178+
179+ >>> from cognite.client import CogniteClient
180+ >>> from cognite.client.data_classes.data_modeling.records import TimeRange
181+ >>> client = CogniteClient()
182+ >>> res = client.data_modeling.records.list(
183+ ... stream_id="my-stream",
184+ ... last_updated_time=TimeRange(gt=1705341600000),
185+ ... limit=100,
186+ ... )
187+ """
188+ self ._warning .warn ()
189+ body : dict [str , Any ] = {"limit" : limit }
190+ if last_updated_time is not None :
191+ body ["lastUpdatedTime" ] = last_updated_time .dump ()
192+ if filter is not None :
193+ body ["filter" ] = filter .dump ()
194+ if sources is not None :
195+ body ["sources" ] = [source .dump () for source in sources ]
196+ if sort is not None :
197+ sort_list = [sort ] if isinstance (sort , InstanceSort ) else list (sort )
198+ body ["sort" ] = [spec .dump () for spec in sort_list ]
199+ if include_typing :
200+ body ["includeTyping" ] = True
201+
202+ response = await self ._post (
203+ url_path = self ._records_url (stream_id , "/filter" ),
204+ json = body ,
205+ semaphore = self ._get_semaphore ("read" ),
206+ )
207+ return RecordList ._load_raw_api_response ([response .json ()])
0 commit comments