Skip to content

Commit 8ae111f

Browse files
feat(records): sync resume method
Separate initial sync from cursor-based continuation while sharing the request assembly and pagination through the internal _sync helper.

Co-authored-by: Cursor <cursoragent@cursor.com>
1 parent 832068a commit 8ae111f

4 files changed

Lines changed: 165 additions & 76 deletions

File tree

cognite/client/_api/data_modeling/records.py

Lines changed: 87 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,42 @@ def _dump_target_units(target_units: RecordTargetUnits | Sequence[RecordTargetUn
5151
return target_units.dump()
5252
return RecordTargetUnits(properties=target_units).dump()
5353

54+
async def _sync(
55+
self,
56+
stream_id: str,
57+
*,
58+
filter: Filter | None = None,
59+
sources: Sequence[RecordSourceSelector] | None = None,
60+
target_units: RecordTargetUnits | Sequence[RecordTargetUnit] | None = None,
61+
limit: int = 10,
62+
include_typing: bool = False,
63+
initialize_cursor: str | None = None,
64+
cursor: str | None = None,
65+
) -> SyncRecordList:
66+
other_params: dict[str, Any] = {}
67+
if initialize_cursor is not None:
68+
other_params["initializeCursor"] = initialize_cursor
69+
if sources is not None:
70+
other_params["sources"] = [source.dump() for source in sources]
71+
if target_units is not None:
72+
other_params["targetUnits"] = self._dump_target_units(target_units)
73+
if include_typing:
74+
other_params["includeTyping"] = True
75+
76+
return await self._list(
77+
list_cls=SyncRecordList,
78+
resource_cls=SyncRecord,
79+
method="POST",
80+
resource_path=self._records_url(stream_id),
81+
url_path=self._records_url(stream_id, "/sync"),
82+
limit=limit,
83+
filter=filter.dump(camel_case_property=False) if isinstance(filter, Filter) else filter,
84+
other_params=other_params,
85+
initial_cursor=cursor,
86+
settings_forcing_raw_response_loading=["records_sync_cursor"],
87+
override_semaphore=self._get_semaphore("read"),
88+
)
89+
5490
async def delete(
5591
self,
5692
items: RecordId | Sequence[RecordId],
@@ -200,8 +236,7 @@ async def sync(
200236
self,
201237
stream_id: str,
202238
*,
203-
cursor: str | None = None,
204-
initialize_cursor: str | None = None,
239+
initialize_cursor: str,
205240
filter: Filter | None = None,
206241
sources: Sequence[RecordSourceSelector] | None = None,
207242
target_units: RecordTargetUnits | Sequence[RecordTargetUnit] | None = None,
@@ -210,17 +245,14 @@ async def sync(
210245
) -> SyncRecordList:
211246
"""`Sync records from a stream <https://api-docs.cognite.com/20230101/tag/Records/operation/syncRecords>`_.
212247
213-
Returns the next page of the change feed (new, updated and deleted records). Provide exactly
214-
one of ``cursor`` (to resume a previous position) or ``initialize_cursor`` (to start from a
215-
relative time such as ``"7d-ago"``). Persist the returned :attr:`SyncRecordList.cursor` and
216-
pass it as ``cursor`` on the next call to continue; :attr:`SyncRecordList.has_next` indicates
217-
whether more changes are immediately available.
248+
Returns the first page of the change feed (new, updated and deleted records). Provide
249+
``initialize_cursor`` to start from a relative time such as ``"7d-ago"``. Persist the returned
250+
:attr:`SyncRecordList.cursor` and pass it to :meth:`sync_resume` on the next call to continue;
251+
:attr:`SyncRecordList.has_next` indicates whether more changes are immediately available.
218252
219253
Args:
220254
stream_id (str): External ID of the stream to sync.
221-
cursor (str | None): Resume from a cursor returned by a previous sync call.
222-
initialize_cursor (str | None): Where to start when no ``cursor`` is given, as a
223-
relative duration like ``"7d-ago"``. Ignored when ``cursor`` is set.
255+
initialize_cursor (str): Where to start, as a relative duration like ``"7d-ago"``.
224256
filter (Filter | None): Filter expression (see :mod:`cognite.client.data_classes.filters`).
225257
sources (Sequence[RecordSourceSelector] | None): Which container properties to return.
226258
target_units (RecordTargetUnits | Sequence[RecordTargetUnit] | None): Properties to convert
@@ -243,33 +275,55 @@ async def sync(
243275
... )
244276
>>> for record in page:
245277
... pass # process record; record.status is created/updated/deleted
246-
>>> next_page = client.data_modeling.records.sync(
278+
>>> next_page = client.data_modeling.records.sync_resume(
247279
... stream_id="my-stream", cursor=page.cursor
248280
... )
249281
"""
250282
self._warning.warn()
251-
if cursor is not None and initialize_cursor is not None:
252-
raise ValueError("Provide either 'cursor' or 'initialize_cursor', not both.")
253-
other_params: dict[str, Any] = {}
254-
if initialize_cursor is not None:
255-
other_params["initializeCursor"] = initialize_cursor
256-
if sources is not None:
257-
other_params["sources"] = [source.dump() for source in sources]
258-
if target_units is not None:
259-
other_params["targetUnits"] = self._dump_target_units(target_units)
260-
if include_typing:
261-
other_params["includeTyping"] = True
283+
return await self._sync(
284+
stream_id=stream_id,
285+
initialize_cursor=initialize_cursor,
286+
limit=limit,
287+
filter=filter,
288+
sources=sources,
289+
target_units=target_units,
290+
include_typing=include_typing,
291+
)
262292

263-
return await self._list(
264-
list_cls=SyncRecordList,
265-
resource_cls=SyncRecord,
266-
method="POST",
267-
resource_path=self._records_url(stream_id),
268-
url_path=self._records_url(stream_id, "/sync"),
293+
async def sync_resume(
294+
self,
295+
stream_id: str,
296+
*,
297+
cursor: str,
298+
filter: Filter | None = None,
299+
sources: Sequence[RecordSourceSelector] | None = None,
300+
target_units: RecordTargetUnits | Sequence[RecordTargetUnit] | None = None,
301+
limit: int = 10,
302+
include_typing: bool = False,
303+
) -> SyncRecordList:
304+
"""Resume syncing records from a stream using a cursor from :meth:`sync` or :meth:`sync_resume`.
305+
306+
Args:
307+
stream_id (str): External ID of the stream to sync.
308+
cursor (str): Resume from a cursor returned by a previous sync call.
309+
filter (Filter | None): Filter expression (see :mod:`cognite.client.data_classes.filters`).
310+
sources (Sequence[RecordSourceSelector] | None): Which container properties to return.
311+
target_units (RecordTargetUnits | Sequence[RecordTargetUnit] | None): Properties to convert
312+
to another unit.
313+
limit (int): Maximum number of records to return in this page (1-1000). Defaults to 10.
314+
include_typing (bool): If True, include property type information on the returned
315+
list's ``typing`` attribute.
316+
317+
Returns:
318+
SyncRecordList: One page of change records, with ``cursor`` and ``has_next`` set.
319+
"""
320+
self._warning.warn()
321+
return await self._sync(
322+
stream_id=stream_id,
323+
cursor=cursor,
269324
limit=limit,
270-
filter=filter.dump(camel_case_property=False) if isinstance(filter, Filter) else filter,
271-
other_params=other_params,
272-
initial_cursor=cursor,
273-
settings_forcing_raw_response_loading=["records_sync_cursor"],
274-
override_semaphore=self._get_semaphore("read"),
325+
filter=filter,
326+
sources=sources,
327+
target_units=target_units,
328+
include_typing=include_typing,
275329
)

cognite/client/_sync_api/data_modeling/records.py

Lines changed: 48 additions & 13 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

cognite/client/data_classes/data_modeling/records.py

Lines changed: 21 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
from __future__ import annotations
22

3-
from collections.abc import Sequence
3+
from collections.abc import Mapping, Sequence
44
from dataclasses import dataclass
55
from typing import Any, Literal
66

@@ -54,12 +54,12 @@ class RecordSource(CogniteResource):
5454
5555
Args:
5656
source (RecordContainerId): Reference to the container.
57-
properties (dict[str, Any]): The data to write to the source container.
57+
properties (Mapping[str, Any]): The data to write to the source container.
5858
"""
5959

60-
def __init__(self, source: RecordContainerId, properties: dict[str, Any]) -> None:
60+
def __init__(self, source: RecordContainerId, properties: Mapping[str, Any]) -> None:
6161
self.source = source
62-
self.properties = properties
62+
self.properties = dict(properties)
6363

6464
@classmethod
6565
def _load(cls, resource: dict[str, Any]) -> Self:
@@ -83,13 +83,13 @@ class RecordWrite(WriteableCogniteResource["RecordWrite"]):
8383
Args:
8484
space (str): Space the record belongs to.
8585
external_id (str): External ID of the record (1-256 chars, no null bytes).
86-
sources (list[RecordSource]): Container property values to write (1-100 sources).
86+
sources (Sequence[RecordSource]): Container property values to write (1-100 sources).
8787
"""
8888

89-
def __init__(self, space: str, external_id: str, sources: list[RecordSource]) -> None:
89+
def __init__(self, space: str, external_id: str, sources: Sequence[RecordSource]) -> None:
9090
self.space = space
9191
self.external_id = external_id
92-
self.sources = sources
92+
self.sources = list(sources)
9393

9494
def as_id(self) -> RecordId:
9595
return RecordId(space=self.space, external_id=self.external_id)
@@ -127,12 +127,12 @@ class RecordSourceSelector(CogniteResource):
127127
128128
Args:
129129
source (RecordContainerId): The container to select properties from.
130-
properties (list[str]): Property identifiers to return; use ``["*"]`` to return all.
130+
properties (Sequence[str]): Property identifiers to return; use ``["*"]`` to return all.
131131
"""
132132

133-
def __init__(self, source: RecordContainerId, properties: list[str]) -> None:
133+
def __init__(self, source: RecordContainerId, properties: Sequence[str]) -> None:
134134
self.source = source
135-
self.properties = properties
135+
self.properties = list(properties)
136136

137137
@classmethod
138138
def _load(cls, resource: dict[str, Any]) -> Self:
@@ -211,7 +211,7 @@ class SyncRecord(WriteableCogniteResource["RecordWrite"]):
211211
created_time (int): Creation time in milliseconds since epoch.
212212
last_updated_time (int): Last updated time in milliseconds since epoch.
213213
status (Literal['created', 'updated', 'deleted']): The record's change status.
214-
properties (dict[str, dict[str, dict[str, Any]]] | None): Property values (absent for
214+
properties (Mapping[str, Mapping[str, Mapping[str, Any]]] | None): Property values (absent for
215215
deleted tombstones).
216216
"""
217217

@@ -222,14 +222,21 @@ def __init__(
222222
created_time: int,
223223
last_updated_time: int,
224224
status: Literal["created", "updated", "deleted"],
225-
properties: dict[str, dict[str, dict[str, Any]]] | None = None,
225+
properties: Mapping[str, Mapping[str, Mapping[str, Any]]] | None = None,
226226
) -> None:
227227
self.space = space
228228
self.external_id = external_id
229229
self.created_time = created_time
230230
self.last_updated_time = last_updated_time
231231
self.status = status
232-
self.properties = properties
232+
self.properties = (
233+
{
234+
space_key: {container: dict(values) for container, values in containers.items()}
235+
for space_key, containers in properties.items()
236+
}
237+
if properties is not None
238+
else None
239+
)
233240

234241
@classmethod
235242
def _load(cls, resource: dict[str, Any]) -> Self:
@@ -275,7 +282,7 @@ class SyncRecordList(CogniteResourceList[SyncRecord]):
275282
276283
Args:
277284
resources (Sequence[SyncRecord]): The records in this page.
278-
cursor (str | None): Cursor to pass as ``cursor`` to the next ``sync`` call to resume
285+
cursor (str | None): Cursor to pass as ``cursor`` to the next ``sync_resume`` call to resume
279286
from this position.
280287
has_next (bool): Whether more changes are available beyond this page.
281288
typing (TypeInformation | None): Property type information, present when the request was

0 commit comments

Comments
 (0)