Skip to content

Commit cc796b0

Browse files
feat(records): sync resume method
Separate the initial sync from cursor-based continuation, while sharing request assembly and pagination through the internal `_sync` helper.

Co-authored-by: Cursor <cursoragent@cursor.com>
1 parent 35d8d15 commit cc796b0

4 files changed

Lines changed: 165 additions & 76 deletions

File tree

cognite/client/_api/data_modeling/records.py

Lines changed: 87 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,42 @@ def _dump_target_units(target_units: RecordTargetUnits | Sequence[RecordTargetUn
5858
return target_units.dump()
5959
return RecordTargetUnits(properties=target_units).dump()
6060

61+
async def _sync(
62+
self,
63+
stream_id: str,
64+
*,
65+
filter: Filter | None = None,
66+
sources: Sequence[RecordSourceSelector] | None = None,
67+
target_units: RecordTargetUnits | Sequence[RecordTargetUnit] | None = None,
68+
limit: int = 10,
69+
include_typing: bool = False,
70+
initialize_cursor: str | None = None,
71+
cursor: str | None = None,
72+
) -> SyncRecordList:
73+
other_params: dict[str, Any] = {}
74+
if initialize_cursor is not None:
75+
other_params["initializeCursor"] = initialize_cursor
76+
if sources is not None:
77+
other_params["sources"] = [source.dump() for source in sources]
78+
if target_units is not None:
79+
other_params["targetUnits"] = self._dump_target_units(target_units)
80+
if include_typing:
81+
other_params["includeTyping"] = True
82+
83+
return await self._list(
84+
list_cls=SyncRecordList,
85+
resource_cls=SyncRecord,
86+
method="POST",
87+
resource_path=self._records_url(stream_id),
88+
url_path=self._records_url(stream_id, "/sync"),
89+
limit=limit,
90+
filter=filter.dump(camel_case_property=False) if isinstance(filter, Filter) else filter,
91+
other_params=other_params,
92+
initial_cursor=cursor,
93+
settings_forcing_raw_response_loading=["records_sync_cursor"],
94+
override_semaphore=self._get_semaphore("read"),
95+
)
96+
6197
async def delete(
6298
self,
6399
items: RecordId | Sequence[RecordId],
@@ -207,8 +243,7 @@ async def sync(
207243
self,
208244
stream_id: str,
209245
*,
210-
cursor: str | None = None,
211-
initialize_cursor: str | None = None,
246+
initialize_cursor: str,
212247
filter: Filter | None = None,
213248
sources: Sequence[RecordSourceSelector] | None = None,
214249
target_units: RecordTargetUnits | Sequence[RecordTargetUnit] | None = None,
@@ -217,17 +252,14 @@ async def sync(
217252
) -> SyncRecordList:
218253
"""`Sync records from a stream <https://api-docs.cognite.com/20230101/tag/Records/operation/syncRecords>`_.
219254
220-
Returns the next page of the change feed (new, updated and deleted records). Provide exactly
221-
one of ``cursor`` (to resume a previous position) or ``initialize_cursor`` (to start from a
222-
relative time such as ``"7d-ago"``). Persist the returned :attr:`SyncRecordList.cursor` and
223-
pass it as ``cursor`` on the next call to continue; :attr:`SyncRecordList.has_next` indicates
224-
whether more changes are immediately available.
255+
Returns the first page of the change feed (new, updated and deleted records). Provide
256+
``initialize_cursor`` to start from a relative time such as ``"7d-ago"``. Persist the returned
257+
:attr:`SyncRecordList.cursor` and pass it to :meth:`sync_resume` on the next call to continue;
258+
:attr:`SyncRecordList.has_next` indicates whether more changes are immediately available.
225259
226260
Args:
227261
stream_id (str): External ID of the stream to sync.
228-
cursor (str | None): Resume from a cursor returned by a previous sync call.
229-
initialize_cursor (str | None): Where to start when no ``cursor`` is given, as a
230-
relative duration like ``"7d-ago"``. Ignored when ``cursor`` is set.
262+
initialize_cursor (str): Where to start, as a relative duration like ``"7d-ago"``.
231263
filter (Filter | None): Filter expression (see :mod:`cognite.client.data_classes.filters`).
232264
sources (Sequence[RecordSourceSelector] | None): Which container properties to return.
233265
target_units (RecordTargetUnits | Sequence[RecordTargetUnit] | None): Properties to convert
@@ -250,33 +282,55 @@ async def sync(
250282
... )
251283
>>> for record in page:
252284
... pass # process record; record.status is created/updated/deleted
253-
>>> next_page = client.data_modeling.records.sync(
285+
>>> next_page = client.data_modeling.records.sync_resume(
254286
... stream_id="my-stream", cursor=page.cursor
255287
... )
256288
"""
257289
self._warning.warn()
258-
if cursor is not None and initialize_cursor is not None:
259-
raise ValueError("Provide either 'cursor' or 'initialize_cursor', not both.")
260-
other_params: dict[str, Any] = {}
261-
if initialize_cursor is not None:
262-
other_params["initializeCursor"] = initialize_cursor
263-
if sources is not None:
264-
other_params["sources"] = [source.dump() for source in sources]
265-
if target_units is not None:
266-
other_params["targetUnits"] = self._dump_target_units(target_units)
267-
if include_typing:
268-
other_params["includeTyping"] = True
290+
return await self._sync(
291+
stream_id=stream_id,
292+
initialize_cursor=initialize_cursor,
293+
limit=limit,
294+
filter=filter,
295+
sources=sources,
296+
target_units=target_units,
297+
include_typing=include_typing,
298+
)
269299

270-
return await self._list(
271-
list_cls=SyncRecordList,
272-
resource_cls=SyncRecord,
273-
method="POST",
274-
resource_path=self._records_url(stream_id),
275-
url_path=self._records_url(stream_id, "/sync"),
300+
async def sync_resume(
    self,
    stream_id: str,
    *,
    cursor: str,
    filter: Filter | None = None,
    sources: Sequence[RecordSourceSelector] | None = None,
    target_units: RecordTargetUnits | Sequence[RecordTargetUnit] | None = None,
    limit: int = 10,
    include_typing: bool = False,
) -> SyncRecordList:
    """Continue a records sync from a cursor obtained from :meth:`sync` or :meth:`sync_resume`.

    Emits the API warning and then delegates the request to the internal ``_sync`` helper.

    Args:
        stream_id (str): External ID of the stream to sync.
        cursor (str): Cursor returned by a previous sync call, marking where to resume.
        filter (Filter | None): Filter expression (see :mod:`cognite.client.data_classes.filters`).
        sources (Sequence[RecordSourceSelector] | None): Which container properties to return.
        target_units (RecordTargetUnits | Sequence[RecordTargetUnit] | None): Properties to convert
            to another unit.
        limit (int): Maximum number of records to return in this page (1-1000). Defaults to 10.
        include_typing (bool): If True, include property type information on the returned
            list's ``typing`` attribute.

    Returns:
        SyncRecordList: One page of change records, with ``cursor`` and ``has_next`` set.
    """
    self._warning.warn()
    # Everything except the stream identifier is forwarded as-is to the shared helper.
    request_options = {
        "cursor": cursor,
        "filter": filter,
        "sources": sources,
        "target_units": target_units,
        "limit": limit,
        "include_typing": include_typing,
    }
    page = await self._sync(stream_id=stream_id, **request_options)
    return page

cognite/client/_sync_api/data_modeling/records.py

Lines changed: 48 additions & 13 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

cognite/client/data_classes/data_modeling/records.py

Lines changed: 21 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
from __future__ import annotations
22

3-
from collections.abc import Sequence
3+
from collections.abc import Mapping, Sequence
44
from dataclasses import dataclass
55
from typing import Any, Literal
66

@@ -54,12 +54,12 @@ class RecordSource(CogniteResource):
5454
5555
Args:
5656
source (RecordContainerId): Reference to the container.
57-
properties (dict[str, Any]): The data to write to the source container.
57+
properties (Mapping[str, Any]): The data to write to the source container.
5858
"""
5959

60-
def __init__(self, source: RecordContainerId, properties: dict[str, Any]) -> None:
60+
def __init__(self, source: RecordContainerId, properties: Mapping[str, Any]) -> None:
6161
self.source = source
62-
self.properties = properties
62+
self.properties = dict(properties)
6363

6464
@classmethod
6565
def _load(cls, resource: dict[str, Any]) -> Self:
@@ -83,13 +83,13 @@ class RecordWrite(WriteableCogniteResource["RecordWrite"]):
8383
Args:
8484
space (str): Space the record belongs to.
8585
external_id (str): External ID of the record (1-256 chars, no null bytes).
86-
sources (list[RecordSource]): Container property values to write (1-100 sources).
86+
sources (Sequence[RecordSource]): Container property values to write (1-100 sources).
8787
"""
8888

89-
def __init__(self, space: str, external_id: str, sources: list[RecordSource]) -> None:
89+
def __init__(self, space: str, external_id: str, sources: Sequence[RecordSource]) -> None:
9090
self.space = space
9191
self.external_id = external_id
92-
self.sources = sources
92+
self.sources = list(sources)
9393

9494
def as_id(self) -> RecordId:
9595
return RecordId(space=self.space, external_id=self.external_id)
@@ -127,12 +127,12 @@ class RecordSourceSelector(CogniteResource):
127127
128128
Args:
129129
source (RecordContainerId): The container to select properties from.
130-
properties (list[str]): Property identifiers to return; use ``["*"]`` to return all.
130+
properties (Sequence[str]): Property identifiers to return; use ``["*"]`` to return all.
131131
"""
132132

133-
def __init__(self, source: RecordContainerId, properties: list[str]) -> None:
133+
def __init__(self, source: RecordContainerId, properties: Sequence[str]) -> None:
134134
self.source = source
135-
self.properties = properties
135+
self.properties = list(properties)
136136

137137
@classmethod
138138
def _load(cls, resource: dict[str, Any]) -> Self:
@@ -211,7 +211,7 @@ class SyncRecord(WriteableCogniteResource["RecordWrite"]):
211211
created_time (int): Creation time in milliseconds since epoch.
212212
last_updated_time (int): Last updated time in milliseconds since epoch.
213213
status (Literal['created', 'updated', 'deleted']): The record's change status.
214-
properties (dict[str, dict[str, dict[str, Any]]] | None): Property values (absent for
214+
properties (Mapping[str, Mapping[str, Mapping[str, Any]]] | None): Property values (absent for
215215
deleted tombstones).
216216
"""
217217

@@ -222,14 +222,21 @@ def __init__(
222222
created_time: int,
223223
last_updated_time: int,
224224
status: Literal["created", "updated", "deleted"],
225-
properties: dict[str, dict[str, dict[str, Any]]] | None = None,
225+
properties: Mapping[str, Mapping[str, Mapping[str, Any]]] | None = None,
226226
) -> None:
227227
self.space = space
228228
self.external_id = external_id
229229
self.created_time = created_time
230230
self.last_updated_time = last_updated_time
231231
self.status = status
232-
self.properties = properties
232+
self.properties = (
233+
{
234+
space: {container: dict(values) for container, values in containers.items()}
235+
for space, containers in properties.items()
236+
}
237+
if properties is not None
238+
else None
239+
)
233240

234241
@classmethod
235242
def _load(cls, resource: dict[str, Any]) -> Self:
@@ -275,7 +282,7 @@ class SyncRecordList(CogniteResourceList[SyncRecord]):
275282
276283
Args:
277284
resources (Sequence[SyncRecord]): The records in this page.
278-
cursor (str | None): Cursor to pass as ``cursor`` to the next ``sync`` call to resume
285+
cursor (str | None): Cursor to pass as ``cursor`` to the next ``sync_resume`` call to resume
279286
from this position.
280287
has_next (bool): Whether more changes are available beyond this page.
281288
typing (TypeInformation | None): Property type information, present when the request was

0 commit comments

Comments
 (0)