22
33import asyncio
44from collections .abc import Sequence
5- from typing import TYPE_CHECKING , ClassVar , Literal
5+ from typing import TYPE_CHECKING , Any , ClassVar , Literal
66
77from cognite .client ._api_client import APIClient
8- from cognite .client .data_classes .data_modeling .records import RecordId , RecordIdSequence , RecordWrite
8+ from cognite .client .data_classes .data_modeling .instances import InstanceSort
9+ from cognite .client .data_classes .data_modeling .records import (
10+ Record ,
11+ RecordId ,
12+ RecordIdSequence ,
13+ RecordList ,
14+ RecordSourceSelector ,
15+ RecordWrite ,
16+ TimeRange ,
17+ )
18+ from cognite .client .data_classes .filters import Filter
919from cognite .client .utils ._concurrency import RecordsConcurrencyOperation
1020from cognite .client .utils ._experimental import FeaturePreviewWarning
1121from cognite .client .utils ._url import interpolate_and_url_encode
@@ -23,11 +33,12 @@ def __init__(self, config: ClientConfig, api_version: str | None, cognite_client
2333 )
2434
2535 _OPERATION_TO_RATE_LIMIT : ClassVar [dict [str , RecordsConcurrencyOperation ]] = {
36+ "read" : RecordsConcurrencyOperation .READ ,
2637 "write" : RecordsConcurrencyOperation .WRITE ,
2738 "delete" : RecordsConcurrencyOperation .WRITE ,
2839 }
2940
30- def _get_semaphore (self , operation : Literal ["write" , "delete" ]) -> asyncio .BoundedSemaphore :
41+ def _get_semaphore (self , operation : Literal ["read" , " write" , "delete" ]) -> asyncio .BoundedSemaphore :
3142 from cognite .client import global_config
3243
3344 return global_config .concurrency_settings .records ._semaphore_factory (
@@ -76,6 +87,7 @@ async def delete(
7687 identifiers = RecordIdSequence .load (items ),
7788 wrap_ids = True ,
7889 resource_path = self ._records_url (stream_id ),
90+ override_semaphore = self ._get_semaphore ("delete" ),
7991 )
8092
8193 async def ingest (
@@ -128,6 +140,7 @@ async def ingest(
128140 items = item_list ,
129141 resource_path = self ._records_url (stream_id ),
130142 no_response = True ,
143+ override_semaphore = self ._get_semaphore ("write" ),
131144 )
132145
133146 async def upsert (
@@ -182,4 +195,73 @@ async def upsert(
182195 items = item_list ,
183196 resource_path = self ._records_url (stream_id , "/upsert" ),
184197 no_response = True ,
198+ override_semaphore = self ._get_semaphore ("write" ),
199+ )
200+
201+ async def list (
202+ self ,
203+ stream_id : str ,
204+ * ,
205+ last_updated_time : TimeRange | None = None ,
206+ filter : Filter | None = None ,
207+ sources : Sequence [RecordSourceSelector ] | None = None ,
208+ sort : Sequence [InstanceSort ] | InstanceSort | None = None ,
209+ limit : int = 10 ,
210+ include_typing : bool = False ,
211+ ) -> RecordList :
212+ """`Filter records in a stream <https://api-docs.cognite.com/20230101/tag/Records/operation/filterRecords>`_.
213+
214+ Returns records matching the given filters, sorted by ``lastUpdatedTime`` unless a custom
215+ ``sort`` is given.
216+
217+ Args:
218+ stream_id (str): External ID of the stream to query.
219+ last_updated_time (TimeRange | None): Filter by last-updated time. **Required for
220+ immutable streams** (must include a lower bound).
221+ filter (Filter | None): Filter expression (see :mod:`cognite.client.data_classes.filters`).
222+ sources (Sequence[RecordSourceSelector] | None): Which container properties to return.
223+ sort (Sequence[InstanceSort] | InstanceSort | None): Sort specification(s); up to 5.
224+ limit (int): Maximum number of records to return (1-1000). Defaults to 10.
225+ include_typing (bool): If True, include property type information on the returned
226+ list's ``typing`` attribute.
227+
228+ Returns:
229+ RecordList: The matching records.
230+
231+ Examples:
232+
233+ List records updated since a given timestamp:
234+
235+ >>> from cognite.client import CogniteClient
236+ >>> from cognite.client.data_classes.data_modeling.records import TimeRange
237+ >>> client = CogniteClient()
238+ >>> res = client.data_modeling.records.list(
239+ ... stream_id="my-stream",
240+ ... last_updated_time=TimeRange(gt=1705341600000),
241+ ... limit=100,
242+ ... )
243+ """
244+ self ._warning .warn ()
245+ other_params : dict [str , Any ] = {}
246+ if last_updated_time is not None :
247+ other_params ["lastUpdatedTime" ] = last_updated_time .dump ()
248+ if sources is not None :
249+ other_params ["sources" ] = [source .dump () for source in sources ]
250+ if sort is not None :
251+ sort_list = [sort ] if isinstance (sort , InstanceSort ) else list (sort )
252+ other_params ["sort" ] = [spec .dump () for spec in sort_list ]
253+ if include_typing :
254+ other_params ["includeTyping" ] = True
255+
256+ return await self ._list (
257+ list_cls = RecordList ,
258+ resource_cls = Record ,
259+ method = "POST" ,
260+ resource_path = self ._records_url (stream_id ),
261+ url_path = self ._records_url (stream_id , "/filter" ),
262+ limit = limit ,
263+ filter = filter .dump (camel_case_property = False ) if isinstance (filter , Filter ) else filter ,
264+ other_params = other_params ,
265+ settings_forcing_raw_response_loading = [f"{ include_typing = } " ] if include_typing else None ,
266+ override_semaphore = self ._get_semaphore ("read" ),
185267 )
0 commit comments