55from typing import TYPE_CHECKING , Any , Literal
66
77from cognite .client ._api_client import APIClient
8- from cognite .client .data_classes .data_modeling .records import RecordId , RecordIdSequence , RecordWrite
8+ from cognite .client .data_classes .data_modeling .records import (
9+ RecordId ,
10+ RecordIdSequence ,
11+ RecordSourceSelector ,
12+ RecordTargetUnit ,
13+ RecordTargetUnits ,
14+ RecordWrite ,
15+ SyncRecord ,
16+ SyncRecordList ,
17+ )
18+ from cognite .client .data_classes .filters import Filter
919from cognite .client .utils ._experimental import FeaturePreviewWarning
1020from cognite .client .utils ._url import interpolate_and_url_encode
1121
@@ -21,7 +31,7 @@ def __init__(self, config: ClientConfig, api_version: str | None, cognite_client
2131 api_maturity = "General Availability" , sdk_maturity = "alpha" , feature_name = "Records"
2232 )
2333
24- def _get_semaphore (self , operation : Any ) -> asyncio .BoundedSemaphore :
34+ def _get_semaphore (self , operation : Literal [ "read" , "write" , "delete" ] ) -> asyncio .BoundedSemaphore :
2535 from cognite .client import global_config
2636
2737 return global_config .concurrency_settings .records ._semaphore_factory (
@@ -33,6 +43,14 @@ def _records_url(self, stream_id: str, suffix: str = "") -> str:
3343 # so it must not be percent-encoded.
3444 return interpolate_and_url_encode ("/streams/{}/records" , stream_id ) + suffix
3545
46+ @staticmethod
47+ def _dump_target_units (target_units : RecordTargetUnits | Sequence [RecordTargetUnit ]) -> dict [str , Any ]:
48+ if isinstance (target_units , RecordTargetUnits ):
49+ if (target_units .properties is None ) == (target_units .unit_system_name is None ):
50+ raise ValueError ("Provide exactly one of 'properties' or 'unit_system_name'." )
51+ return target_units .dump ()
52+ return RecordTargetUnits (properties = target_units ).dump ()
53+
3654 async def delete (
3755 self ,
3856 items : RecordId | Sequence [RecordId ],
@@ -177,3 +195,81 @@ async def upsert(
177195 resource_path = self ._records_url (stream_id , "/upsert" ),
178196 no_response = True ,
179197 )
198+
199+ async def sync (
200+ self ,
201+ stream_id : str ,
202+ * ,
203+ cursor : str | None = None ,
204+ initialize_cursor : str | None = None ,
205+ filter : Filter | None = None ,
206+ sources : Sequence [RecordSourceSelector ] | None = None ,
207+ target_units : RecordTargetUnits | Sequence [RecordTargetUnit ] | None = None ,
208+ limit : int = 10 ,
209+ include_typing : bool = False ,
210+ ) -> SyncRecordList :
211+ """`Sync records from a stream <https://api-docs.cognite.com/20230101/tag/Records/operation/syncRecords>`_.
212+
213+ Returns the next page of the change feed (new, updated and deleted records). Provide exactly
214+ one of ``cursor`` (to resume a previous position) or ``initialize_cursor`` (to start from a
215+ relative time such as ``"7d-ago"``). Persist the returned :attr:`SyncRecordList.cursor` and
216+ pass it as ``cursor`` on the next call to continue; :attr:`SyncRecordList.has_next` indicates
217+ whether more changes are immediately available.
218+
219+ Args:
220+ stream_id (str): External ID of the stream to sync.
221+ cursor (str | None): Resume from a cursor returned by a previous sync call.
222+ initialize_cursor (str | None): Where to start when no ``cursor`` is given, as a
223+ relative duration like ``"7d-ago"``. Ignored when ``cursor`` is set.
224+ filter (Filter | None): Filter expression (see :mod:`cognite.client.data_classes.filters`).
225+ sources (Sequence[RecordSourceSelector] | None): Which container properties to return.
226+ target_units (RecordTargetUnits | Sequence[RecordTargetUnit] | None): Properties to convert
227+ to another unit.
228+ limit (int): Maximum number of records to return in this page (1-1000). Defaults to 10.
229+ include_typing (bool): If True, include property type information on the returned
230+ list's ``typing`` attribute.
231+
232+ Returns:
233+ SyncRecordList: One page of change records, with ``cursor`` and ``has_next`` set.
234+
235+ Examples:
236+
237+ Initialize a sync, process the page, then resume from the cursor later:
238+
239+ >>> from cognite.client import CogniteClient
240+ >>> client = CogniteClient()
241+ >>> page = client.data_modeling.records.sync(
242+ ... stream_id="my-stream", initialize_cursor="7d-ago"
243+ ... )
244+ >>> for record in page:
245+ ... pass # process record; record.status is created/updated/deleted
246+ >>> next_page = client.data_modeling.records.sync(
247+ ... stream_id="my-stream", cursor=page.cursor
248+ ... )
249+ """
250+ self ._warning .warn ()
251+ if cursor is not None and initialize_cursor is not None :
252+ raise ValueError ("Provide either 'cursor' or 'initialize_cursor', not both." )
253+ other_params : dict [str , Any ] = {}
254+ if initialize_cursor is not None :
255+ other_params ["initializeCursor" ] = initialize_cursor
256+ if sources is not None :
257+ other_params ["sources" ] = [source .dump () for source in sources ]
258+ if target_units is not None :
259+ other_params ["targetUnits" ] = self ._dump_target_units (target_units )
260+ if include_typing :
261+ other_params ["includeTyping" ] = True
262+
263+ return await self ._list (
264+ list_cls = SyncRecordList ,
265+ resource_cls = SyncRecord ,
266+ method = "POST" ,
267+ resource_path = self ._records_url (stream_id ),
268+ url_path = self ._records_url (stream_id , "/sync" ),
269+ limit = limit ,
270+ filter = filter .dump (camel_case_property = False ) if isinstance (filter , Filter ) else filter ,
271+ other_params = other_params ,
272+ initial_cursor = cursor ,
273+ settings_forcing_raw_response_loading = ["records_sync_cursor" ],
274+ override_semaphore = self ._get_semaphore ("read" ),
275+ )
0 commit comments