Skip to content

Commit 5e3c8d3

Browse files
feat(records): add sync endpoint
Co-authored-by: Cursor <cursoragent@cursor.com>
1 parent 9ecf95c commit 5e3c8d3

6 files changed

Lines changed: 635 additions & 11 deletions

File tree

cognite/client/_api/data_modeling/records.py

Lines changed: 100 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,20 @@
22

33
import asyncio
44
from collections.abc import Sequence
5-
from typing import TYPE_CHECKING, ClassVar, Literal
5+
from typing import TYPE_CHECKING, Any, ClassVar, Literal
66

77
from cognite.client._api_client import APIClient
8-
from cognite.client.data_classes.data_modeling.records import RecordId, RecordIdSequence, RecordWrite
8+
from cognite.client.data_classes.data_modeling.records import (
9+
RecordId,
10+
RecordIdSequence,
11+
RecordSourceSelector,
12+
RecordTargetUnit,
13+
RecordTargetUnits,
14+
RecordWrite,
15+
SyncRecord,
16+
SyncRecordList,
17+
)
18+
from cognite.client.data_classes.filters import Filter
919
from cognite.client.utils._concurrency import RecordsConcurrencyOperation
1020
from cognite.client.utils._experimental import FeaturePreviewWarning
1121
from cognite.client.utils._url import interpolate_and_url_encode
@@ -23,11 +33,12 @@ def __init__(self, config: ClientConfig, api_version: str | None, cognite_client
2333
)
2434

2535
_OPERATION_TO_RATE_LIMIT: ClassVar[dict[str, RecordsConcurrencyOperation]] = {
36+
"read": RecordsConcurrencyOperation.READ,
2637
"write": RecordsConcurrencyOperation.WRITE,
2738
"delete": RecordsConcurrencyOperation.WRITE,
2839
}
2940

30-
def _get_semaphore(self, operation: Literal["write", "delete"]) -> asyncio.BoundedSemaphore:
41+
def _get_semaphore(self, operation: Literal["read", "write", "delete"]) -> asyncio.BoundedSemaphore:
3142
from cognite.client import global_config
3243

3344
return global_config.concurrency_settings.records._semaphore_factory(
@@ -39,6 +50,14 @@ def _records_url(self, stream_id: str, suffix: str = "") -> str:
3950
# so it must not be percent-encoded.
4051
return interpolate_and_url_encode("/streams/{}/records", stream_id) + suffix
4152

53+
@staticmethod
54+
def _dump_target_units(target_units: RecordTargetUnits | Sequence[RecordTargetUnit]) -> dict[str, Any]:
55+
if isinstance(target_units, RecordTargetUnits):
56+
if (target_units.properties is None) == (target_units.unit_system_name is None):
57+
raise ValueError("Provide exactly one of 'properties' or 'unit_system_name'.")
58+
return target_units.dump()
59+
return RecordTargetUnits(properties=target_units).dump()
60+
4261
async def delete(
4362
self,
4463
items: RecordId | Sequence[RecordId],
@@ -183,3 +202,81 @@ async def upsert(
183202
resource_path=self._records_url(stream_id, "/upsert"),
184203
no_response=True,
185204
)
205+
206+
async def sync(
207+
self,
208+
stream_id: str,
209+
*,
210+
cursor: str | None = None,
211+
initialize_cursor: str | None = None,
212+
filter: Filter | None = None,
213+
sources: Sequence[RecordSourceSelector] | None = None,
214+
target_units: RecordTargetUnits | Sequence[RecordTargetUnit] | None = None,
215+
limit: int = 10,
216+
include_typing: bool = False,
217+
) -> SyncRecordList:
218+
"""`Sync records from a stream <https://api-docs.cognite.com/20230101/tag/Records/operation/syncRecords>`_.
219+
220+
Returns the next page of the change feed (new, updated and deleted records). Provide exactly
221+
one of ``cursor`` (to resume a previous position) or ``initialize_cursor`` (to start from a
222+
relative time such as ``"7d-ago"``). Persist the returned :attr:`SyncRecordList.cursor` and
223+
pass it as ``cursor`` on the next call to continue; :attr:`SyncRecordList.has_next` indicates
224+
whether more changes are immediately available.
225+
226+
Args:
227+
stream_id (str): External ID of the stream to sync.
228+
cursor (str | None): Resume from a cursor returned by a previous sync call.
229+
initialize_cursor (str | None): Where to start when no ``cursor`` is given, as a
230+
relative duration like ``"7d-ago"``. Ignored when ``cursor`` is set.
231+
filter (Filter | None): Filter expression (see :mod:`cognite.client.data_classes.filters`).
232+
sources (Sequence[RecordSourceSelector] | None): Which container properties to return.
233+
target_units (RecordTargetUnits | Sequence[RecordTargetUnit] | None): Properties to convert
234+
to another unit.
235+
limit (int): Maximum number of records to return in this page (1-1000). Defaults to 10.
236+
include_typing (bool): If True, include property type information on the returned
237+
list's ``typing`` attribute.
238+
239+
Returns:
240+
SyncRecordList: One page of change records, with ``cursor`` and ``has_next`` set.
241+
242+
Examples:
243+
244+
Initialize a sync, process the page, then resume from the cursor later:
245+
246+
>>> from cognite.client import CogniteClient
247+
>>> client = CogniteClient()
248+
>>> page = client.data_modeling.records.sync(
249+
... stream_id="my-stream", initialize_cursor="7d-ago"
250+
... )
251+
>>> for record in page:
252+
... pass # process record; record.status is created/updated/deleted
253+
>>> next_page = client.data_modeling.records.sync(
254+
... stream_id="my-stream", cursor=page.cursor
255+
... )
256+
"""
257+
self._warning.warn()
258+
if cursor is not None and initialize_cursor is not None:
259+
raise ValueError("Provide either 'cursor' or 'initialize_cursor', not both.")
260+
other_params: dict[str, Any] = {}
261+
if initialize_cursor is not None:
262+
other_params["initializeCursor"] = initialize_cursor
263+
if sources is not None:
264+
other_params["sources"] = [source.dump() for source in sources]
265+
if target_units is not None:
266+
other_params["targetUnits"] = self._dump_target_units(target_units)
267+
if include_typing:
268+
other_params["includeTyping"] = True
269+
270+
return await self._list(
271+
list_cls=SyncRecordList,
272+
resource_cls=SyncRecord,
273+
method="POST",
274+
resource_path=self._records_url(stream_id),
275+
url_path=self._records_url(stream_id, "/sync"),
276+
limit=limit,
277+
filter=filter.dump(camel_case_property=False) if isinstance(filter, Filter) else filter,
278+
other_params=other_params,
279+
initial_cursor=cursor,
280+
settings_forcing_raw_response_loading=["records_sync_cursor"],
281+
override_semaphore=self._get_semaphore("read"),
282+
)

cognite/client/_sync_api/data_modeling/records.py

Lines changed: 75 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

cognite/client/data_classes/data_modeling/__init__.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -123,8 +123,13 @@
123123
RecordContainerId,
124124
RecordId,
125125
RecordSource,
126+
RecordSourceSelector,
127+
RecordTargetUnit,
128+
RecordTargetUnits,
126129
RecordWrite,
127130
RecordWriteList,
131+
SyncRecord,
132+
SyncRecordList,
128133
)
129134
from cognite.client.data_classes.data_modeling.spaces import Space, SpaceApply, SpaceApplyList, SpaceList
130135
from cognite.client.data_classes.data_modeling.streams import (
@@ -252,6 +257,9 @@
252257
"RecordContainerId",
253258
"RecordId",
254259
"RecordSource",
260+
"RecordSourceSelector",
261+
"RecordTargetUnit",
262+
"RecordTargetUnits",
255263
"RecordWrite",
256264
"RecordWriteList",
257265
"RequiresConstraint",
@@ -278,6 +286,8 @@
278286
"StreamTemplateWriteSettings",
279287
"StreamWrite",
280288
"SubscriptionContext",
289+
"SyncRecord",
290+
"SyncRecordList",
281291
"Text",
282292
"TimeSeriesReference",
283293
"Timestamp",

0 commit comments

Comments
 (0)