Skip to content

Commit c2345e4

Browse files
andersfylling and claude committed
feat(records): add sync endpoint
Add RecordsAPI.sync, a cursor-based POST to /streams/{streamId}/records/sync returning one page as a SyncRecordList that carries the resumable `cursor`, `has_next`, and optional `typing`. Mirrors how instances.sync surfaces a resumable cursor (the caller persists `.cursor` and passes it back) rather than draining and discarding it.

- SyncRecord extends Record with a `status` (created/updated/deleted); deleted tombstones load with properties=None.
- Provide exactly one of `cursor` or `initialize_cursor` ("<duration>-ago"); passing both raises ValueError.
- Reuses the records read model + RETRIEVE read semaphore from the list PR.

Stacked on feat/records-list. target_units deferred to a follow-up.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
1 parent ccfe0ed commit c2345e4

5 files changed

Lines changed: 359 additions & 2 deletions

File tree

cognite/client/_api/data_modeling/records.py

Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
RecordList,
1313
RecordSourceSelector,
1414
RecordWrite,
15+
SyncRecordList,
1516
TimeRange,
1617
)
1718
from cognite.client.data_classes.filters import Filter
@@ -205,3 +206,75 @@ async def list(
205206
semaphore=self._get_semaphore("read"),
206207
)
207208
return RecordList._load_raw_api_response([response.json()])
209+
210+
async def sync(
    self,
    stream_id: str,
    *,
    cursor: str | None = None,
    initialize_cursor: str | None = None,
    filter: Filter | None = None,
    sources: Sequence[RecordSourceSelector] | None = None,
    limit: int | None = None,
    include_typing: bool = False,
) -> SyncRecordList:
    """`Sync records from a stream <https://api-docs.cognite.com/20230101/tag/Records/operation/syncRecords>`_.

    Returns the next page of the change feed (new, updated and deleted records). Start a feed
    with ``initialize_cursor`` (a relative time such as ``"7d-ago"``) and continue it with
    ``cursor``; the two are mutually exclusive. Persist the returned
    :attr:`SyncRecordList.cursor` and pass it as ``cursor`` on the next call to continue;
    :attr:`SyncRecordList.has_next` indicates whether more changes are immediately available.

    If neither ``cursor`` nor ``initialize_cursor`` is given, no starting position is sent and
    it is left to the API to accept or reject the request.

    Args:
        stream_id (str): External ID of the stream to sync.
        cursor (str | None): Resume from a cursor returned by a previous sync call. Cannot be
            combined with ``initialize_cursor``.
        initialize_cursor (str | None): Where to start when no ``cursor`` is given, as a
            relative duration like ``"7d-ago"``. Cannot be combined with ``cursor``.
        filter (Filter | None): Filter expression (see :mod:`cognite.client.data_classes.filters`).
        sources (Sequence[RecordSourceSelector] | None): Which container properties to return.
        limit (int | None): Maximum number of records to return in this page (1-1000).
        include_typing (bool): If True, include property type information on the returned
            list's ``typing`` attribute.

    Returns:
        SyncRecordList: One page of change records, with ``cursor`` and ``has_next`` set.

    Raises:
        ValueError: If both ``cursor`` and ``initialize_cursor`` are provided.

    Examples:

        Initialize a sync, process the page, then resume from the cursor later:

            >>> from cognite.client import CogniteClient
            >>> client = CogniteClient()
            >>> page = client.data_modeling.records.sync(
            ...     stream_id="my-stream", initialize_cursor="7d-ago"
            ... )
            >>> for record in page:
            ...     pass  # process record; record.status is created/updated/deleted
            >>> next_page = client.data_modeling.records.sync(
            ...     stream_id="my-stream", cursor=page.cursor
            ... )
    """
    self._warning.warn()
    # Reject the ambiguous combination up front instead of silently preferring one of them.
    if cursor is not None and initialize_cursor is not None:
        raise ValueError("Provide either 'cursor' or 'initialize_cursor', not both.")

    # Only explicitly provided options are sent; everything else is left out of the payload.
    body: dict[str, Any] = {}
    if cursor is not None:
        body["cursor"] = cursor
    elif initialize_cursor is not None:
        body["initializeCursor"] = initialize_cursor
    if filter is not None:
        body["filter"] = filter.dump()
    if sources is not None:
        body["sources"] = [source.dump() for source in sources]
    if limit is not None:
        body["limit"] = limit
    if include_typing:
        body["includeTyping"] = True

    # Sync is a POST, but it only reads data, so it shares the "read" semaphore.
    response = await self._post(
        url_path=self._records_url(stream_id, "/sync"),
        json=body,
        semaphore=self._get_semaphore("read"),
    )
    return SyncRecordList._load_response(response.json())

cognite/client/_sync_api/data_modeling/records.py

Lines changed: 63 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

cognite/client/data_classes/data_modeling/__init__.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -128,6 +128,8 @@
128128
RecordSourceSelector,
129129
RecordWrite,
130130
RecordWriteList,
131+
SyncRecord,
132+
SyncRecordList,
131133
TimeRange,
132134
)
133135
from cognite.client.data_classes.data_modeling.spaces import Space, SpaceApply, SpaceApplyList, SpaceList
@@ -285,6 +287,8 @@
285287
"StreamTemplateWriteSettings",
286288
"StreamWrite",
287289
"SubscriptionContext",
290+
"SyncRecord",
291+
"SyncRecordList",
288292
"Text",
289293
"TimeRange",
290294
"TimeSeriesReference",

cognite/client/data_classes/data_modeling/records.py

Lines changed: 90 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22

33
from collections.abc import Sequence
44
from dataclasses import dataclass
5-
from typing import Any
5+
from typing import Any, Literal
66

77
from typing_extensions import Self
88

@@ -26,6 +26,8 @@
2626
"RecordSourceSelector",
2727
"RecordWrite",
2828
"RecordWriteList",
29+
"SyncRecord",
30+
"SyncRecordList",
2931
"TimeRange",
3032
]
3133

@@ -270,3 +272,90 @@ def _load(cls, resource: dict[str, Any]) -> Self:
270272

271273
def dump(self, camel_case: bool = True) -> dict[str, Any]:
    """Serialize this object to a plain dict.

    Args:
        camel_case (bool): Forwarded to the ``dump`` call on ``source``.

    Returns:
        dict[str, Any]: A dict with the dumped ``source`` and the ``properties`` value as stored.
    """
    dumped_source = self.source.dump(camel_case=camel_case)
    # ``properties`` is passed through as-is (not copied or re-keyed).
    return {"source": dumped_source, "properties": self.properties}
275+
276+
277+
class SyncRecord(Record):
    """A :class:`Record` from the sync endpoint, tagged with its change status.

    Tombstones (``status="deleted"``, mutable streams) carry no property values, so
    :attr:`properties` is ``None`` for them.

    Args:
        space (str): Space the record belongs to.
        external_id (str): External ID of the record.
        created_time (int): Creation time in milliseconds since epoch.
        last_updated_time (int): Last updated time in milliseconds since epoch.
        status (Literal['created', 'updated', 'deleted']): The record's change status.
        properties (dict[str, dict[str, dict[str, Any]]] | None): Property values (absent for
            deleted tombstones).
    """

    def __init__(
        self,
        space: str,
        external_id: str,
        created_time: int,
        last_updated_time: int,
        status: Literal["created", "updated", "deleted"],
        properties: dict[str, dict[str, dict[str, Any]]] | None = None,
    ) -> None:
        # Everything except ``status`` is handled by the base record.
        super().__init__(
            space=space,
            external_id=external_id,
            created_time=created_time,
            last_updated_time=last_updated_time,
            properties=properties,
        )
        self.status = status

    @classmethod
    def _load(cls, resource: dict[str, Any]) -> Self:
        # Mandatory keys are indexed directly so a malformed payload raises KeyError;
        # only ``properties`` may be missing.
        required = {
            "space": resource["space"],
            "external_id": resource["externalId"],
            "created_time": resource["createdTime"],
            "last_updated_time": resource["lastUpdatedTime"],
            "status": resource["status"],
        }
        return cls(**required, properties=resource.get("properties"))

    def dump(self, camel_case: bool = True) -> dict[str, Any]:
        # Extend the base record's dump with the change status.
        dumped = super().dump(camel_case=camel_case)
        dumped.update(status=self.status)
        return dumped
325+
326+
327+
class SyncRecordList(CogniteResourceList[SyncRecord]):
    """One page of :class:`SyncRecord` objects returned by the sync endpoint.

    Besides the records, the page carries the paging state needed to continue the feed.

    Args:
        resources (Sequence[SyncRecord]): The records in this page.
        cursor (str | None): Cursor to pass as ``cursor`` to the next ``sync`` call to resume
            from this position.
        has_next (bool): Whether more changes are available beyond this page.
        typing (TypeInformation | None): Property type information, present when the request was
            made with ``include_typing=True``.
    """

    _RESOURCE = SyncRecord

    def __init__(
        self,
        resources: Sequence[SyncRecord],
        cursor: str | None = None,
        has_next: bool = False,
        typing: TypeInformation | None = None,
    ) -> None:
        super().__init__(resources)
        # Paging state and optional type information live on the list, not on the records.
        self.cursor = cursor
        self.has_next = has_next
        self.typing = typing

    @classmethod
    def _load_response(cls, response: dict[str, Any]) -> Self:
        """Build a page from the sync response body.

        ``items``, ``nextCursor`` and ``hasNext`` are indexed directly, so a body missing any
        of them raises ``KeyError``; ``typing`` is only loaded when the key is present.
        """
        type_info = None
        if "typing" in response:
            type_info = TypeInformation._load(response["typing"])
        records = [SyncRecord._load(item) for item in response["items"]]
        return cls(
            records,
            cursor=response["nextCursor"],
            has_next=response["hasNext"],
            typing=type_info,
        )

0 commit comments

Comments
 (0)