22
33import asyncio
44from collections .abc import Sequence
5- from typing import TYPE_CHECKING , ClassVar , Literal
5+ from typing import TYPE_CHECKING , Any , ClassVar , Literal
66
77from cognite .client ._api_client import APIClient
8- from cognite .client .data_classes .data_modeling .records import RecordId , RecordIdSequence , RecordWrite
8+ from cognite .client .data_classes .data_modeling .instances import InstanceSort
9+ from cognite .client .data_classes .data_modeling .records import (
10+ Record ,
11+ RecordId ,
12+ RecordIdSequence ,
13+ RecordList ,
14+ RecordSourceSelector ,
15+ RecordWrite ,
16+ TimeRange ,
17+ )
18+ from cognite .client .data_classes .filters import Filter
919from cognite .client .utils ._concurrency import RecordsConcurrencyOperation
1020from cognite .client .utils ._experimental import FeaturePreviewWarning
1121from cognite .client .utils ._url import interpolate_and_url_encode
1424 from cognite .client import AsyncCogniteClient
1525 from cognite .client .config import ClientConfig
1626
17-
1827class RecordsAPI (APIClient ):
1928 def __init__ (self , config : ClientConfig , api_version : str | None , cognite_client : AsyncCogniteClient ) -> None :
2029 super ().__init__ (config , api_version , cognite_client )
@@ -23,11 +32,14 @@ def __init__(self, config: ClientConfig, api_version: str | None, cognite_client
2332 )
2433
2534 _OPERATION_TO_RATE_LIMIT : ClassVar [dict [str , RecordsConcurrencyOperation ]] = {
35+ "read" : RecordsConcurrencyOperation .READ ,
2636 "write" : RecordsConcurrencyOperation .WRITE ,
2737 "delete" : RecordsConcurrencyOperation .WRITE ,
2838 }
2939
30- def _get_semaphore (self , operation : Literal ["write" , "delete" ]) -> asyncio .BoundedSemaphore :
40+ def _get_semaphore ( # type: ignore[override]
41+ self , operation : Literal ["read" , "write" , "delete" ]
42+ ) -> asyncio .BoundedSemaphore :
3143 from cognite .client import global_config
3244
3345 return global_config .concurrency_settings .records ._semaphore_factory (
@@ -76,6 +88,7 @@ async def delete(
7688 identifiers = RecordIdSequence .load (items ),
7789 wrap_ids = True ,
7890 resource_path = self ._records_url (stream_id ),
91+ override_semaphore = self ._get_semaphore ("delete" ),
7992 )
8093
8194 async def ingest (
@@ -128,6 +141,7 @@ async def ingest(
128141 items = item_list ,
129142 resource_path = self ._records_url (stream_id ),
130143 no_response = True ,
144+ override_semaphore = self ._get_semaphore ("write" ),
131145 )
132146
133147 async def upsert (
@@ -182,4 +196,73 @@ async def upsert(
182196 items = item_list ,
183197 resource_path = self ._records_url (stream_id , "/upsert" ),
184198 no_response = True ,
199+ override_semaphore = self ._get_semaphore ("write" ),
200+ )
201+
202+ async def list (
203+ self ,
204+ stream_id : str ,
205+ * ,
206+ last_updated_time : TimeRange | None = None ,
207+ filter : Filter | None = None ,
208+ sources : Sequence [RecordSourceSelector ] | None = None ,
209+ sort : Sequence [InstanceSort ] | InstanceSort | None = None ,
210+ limit : int = 10 ,
211+ include_typing : bool = False ,
212+ ) -> RecordList :
213+ """`Filter records in a stream <https://api-docs.cognite.com/20230101/tag/Records/operation/filterRecords>`_.
214+
215+ Returns records matching the given filters, sorted by ``lastUpdatedTime`` unless a custom
216+ ``sort`` is given.
217+
218+ Args:
219+ stream_id (str): External ID of the stream to query.
220+ last_updated_time (TimeRange | None): Filter by last-updated time. **Required for
221+ immutable streams** (must include a lower bound).
222+ filter (Filter | None): Filter expression (see :mod:`cognite.client.data_classes.filters`).
223+ sources (Sequence[RecordSourceSelector] | None): Which container properties to return.
224+ sort (Sequence[InstanceSort] | InstanceSort | None): Sort specification(s); up to 5.
225+ limit (int): Maximum number of records to return (1-1000). Defaults to 10.
226+ include_typing (bool): If True, include property type information on the returned
227+ list's ``typing`` attribute.
228+
229+ Returns:
230+ RecordList: The matching records.
231+
232+ Examples:
233+
234+ List records updated since a given timestamp:
235+
236+ >>> from cognite.client import CogniteClient
237+ >>> from cognite.client.data_classes.data_modeling.records import TimeRange
238+ >>> client = CogniteClient()
239+ >>> res = client.data_modeling.records.list(
240+ ... stream_id="my-stream",
241+ ... last_updated_time=TimeRange(gt=1705341600000),
242+ ... limit=100,
243+ ... )
244+ """
245+ self ._warning .warn ()
246+ other_params : dict [str , Any ] = {}
247+ if last_updated_time is not None :
248+ other_params ["lastUpdatedTime" ] = last_updated_time .dump ()
249+ if sources is not None :
250+ other_params ["sources" ] = [source .dump () for source in sources ]
251+ if sort is not None :
252+ sort_list = [sort ] if isinstance (sort , InstanceSort ) else list (sort )
253+ other_params ["sort" ] = [spec .dump () for spec in sort_list ]
254+ if include_typing :
255+ other_params ["includeTyping" ] = True
256+
257+ return await self ._list (
258+ list_cls = RecordList ,
259+ resource_cls = Record ,
260+ method = "POST" ,
261+ resource_path = self ._records_url (stream_id ),
262+ url_path = self ._records_url (stream_id , "/filter" ),
263+ limit = limit ,
264+ filter = filter .dump (camel_case_property = False ) if isinstance (filter , Filter ) else filter ,
265+ other_params = other_params ,
266+ settings_forcing_raw_response_loading = [f"{ include_typing = } " ] if include_typing else None ,
267+ override_semaphore = self ._get_semaphore ("read" ),
185268 )
0 commit comments