Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
157 changes: 154 additions & 3 deletions cognite/client/_api/data_modeling/records.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,20 @@

import asyncio
from collections.abc import Sequence
from typing import TYPE_CHECKING, ClassVar, Literal
from typing import TYPE_CHECKING, Any, ClassVar, Literal

from cognite.client._api_client import APIClient
from cognite.client.data_classes.data_modeling.records import RecordId, RecordIdSequence, RecordWrite
from cognite.client.data_classes.data_modeling.records import (
RecordId,
RecordIdSequence,
RecordSourceSelector,
RecordTargetUnit,
RecordTargetUnits,
RecordWrite,
SyncRecord,
SyncRecordList,
)
from cognite.client.data_classes.filters import Filter
from cognite.client.utils._concurrency import RecordsConcurrencyOperation
from cognite.client.utils._experimental import FeaturePreviewWarning
from cognite.client.utils._url import interpolate_and_url_encode
Expand All @@ -23,11 +33,12 @@ def __init__(self, config: ClientConfig, api_version: str | None, cognite_client
)

_OPERATION_TO_RATE_LIMIT: ClassVar[dict[str, RecordsConcurrencyOperation]] = {
"read": RecordsConcurrencyOperation.READ,
"write": RecordsConcurrencyOperation.WRITE,
"delete": RecordsConcurrencyOperation.WRITE,
}

def _get_semaphore(self, operation: Literal["write", "delete"]) -> asyncio.BoundedSemaphore:
def _get_semaphore(self, operation: Literal["read", "write", "delete"]) -> asyncio.BoundedSemaphore:
from cognite.client import global_config

return global_config.concurrency_settings.records._semaphore_factory(
Expand All @@ -39,6 +50,50 @@ def _records_url(self, stream_id: str, suffix: str = "") -> str:
# so it must not be percent-encoded.
return interpolate_and_url_encode("/streams/{}/records", stream_id) + suffix

@staticmethod
def _dump_target_units(target_units: RecordTargetUnits | Sequence[RecordTargetUnit]) -> dict[str, Any]:
if isinstance(target_units, RecordTargetUnits):
if (target_units.properties is None) == (target_units.unit_system_name is None):
raise ValueError("Provide exactly one of 'properties' or 'unit_system_name'.")
return target_units.dump()
return RecordTargetUnits(properties=target_units).dump()

async def _sync(
self,
stream_id: str,
*,
filter: Filter | None = None,
sources: Sequence[RecordSourceSelector] | None = None,
target_units: RecordTargetUnits | Sequence[RecordTargetUnit] | None = None,
limit: int = 10,
include_typing: bool = False,
initialize_cursor: str | None = None,
cursor: str | None = None,
) -> SyncRecordList:
other_params: dict[str, Any] = {}
if initialize_cursor is not None:
other_params["initializeCursor"] = initialize_cursor
if sources is not None:
other_params["sources"] = [source.dump() for source in sources]
if target_units is not None:
other_params["targetUnits"] = self._dump_target_units(target_units)
if include_typing:
other_params["includeTyping"] = True

return await self._list(
list_cls=SyncRecordList,
resource_cls=SyncRecord,
method="POST",
resource_path=self._records_url(stream_id),
url_path=self._records_url(stream_id, "/sync"),
limit=limit,
filter=filter.dump(camel_case_property=False) if isinstance(filter, Filter) else filter,
other_params=other_params,
initial_cursor=cursor,
settings_forcing_raw_response_loading=["records_sync_cursor"],
override_semaphore=self._get_semaphore("read"),
)

async def delete(
self,
items: RecordId | Sequence[RecordId],
Expand Down Expand Up @@ -183,3 +238,99 @@ async def upsert(
resource_path=self._records_url(stream_id, "/upsert"),
no_response=True,
)

async def sync(
self,
stream_id: str,
*,
initialize_cursor: str,
filter: Filter | None = None,
sources: Sequence[RecordSourceSelector] | None = None,
target_units: RecordTargetUnits | Sequence[RecordTargetUnit] | None = None,
limit: int = 10,
include_typing: bool = False,
) -> SyncRecordList:
"""`Sync records from a stream <https://api-docs.cognite.com/20230101/tag/Records/operation/syncRecords>`_.

Returns the first page of the change feed (new, updated and deleted records). Provide
``initialize_cursor`` to start from a relative time such as ``"7d-ago"``. Persist the returned
:attr:`SyncRecordList.cursor` and pass it to :meth:`sync_resume` on the next call to continue;
:attr:`SyncRecordList.has_next` indicates whether more changes are immediately available.

Args:
stream_id (str): External ID of the stream to sync.
initialize_cursor (str): Where to start, as a relative duration like ``"7d-ago"``.
filter (Filter | None): Filter expression (see :mod:`cognite.client.data_classes.filters`).
sources (Sequence[RecordSourceSelector] | None): Which container properties to return.
target_units (RecordTargetUnits | Sequence[RecordTargetUnit] | None): Properties to convert
to another unit.
limit (int): Maximum number of records to return in this page (1-1000). Defaults to 10.
include_typing (bool): If True, include property type information on the returned
list's ``typing`` attribute.

Returns:
SyncRecordList: One page of change records, with ``cursor`` and ``has_next`` set.

Examples:

Initialize a sync, process the page, then resume from the cursor later:

>>> from cognite.client import CogniteClient
>>> client = CogniteClient()
>>> page = client.data_modeling.records.sync(
... stream_id="my-stream", initialize_cursor="7d-ago"
... )
>>> for record in page:
... pass # process record; record.status is created/updated/deleted
>>> next_page = client.data_modeling.records.sync_resume(
... stream_id="my-stream", cursor=page.cursor
... )
"""
self._warning.warn()
return await self._sync(
stream_id=stream_id,
initialize_cursor=initialize_cursor,
limit=limit,
filter=filter,
sources=sources,
target_units=target_units,
include_typing=include_typing,
)

async def sync_resume(
self,
stream_id: str,
*,
cursor: str,
filter: Filter | None = None,
sources: Sequence[RecordSourceSelector] | None = None,
target_units: RecordTargetUnits | Sequence[RecordTargetUnit] | None = None,
limit: int = 10,
include_typing: bool = False,
) -> SyncRecordList:
"""Resume syncing records from a stream using a cursor from :meth:`sync` or :meth:`sync_resume`.

Args:
stream_id (str): External ID of the stream to sync.
cursor (str): Resume from a cursor returned by a previous sync call.
filter (Filter | None): Filter expression (see :mod:`cognite.client.data_classes.filters`).
sources (Sequence[RecordSourceSelector] | None): Which container properties to return.
target_units (RecordTargetUnits | Sequence[RecordTargetUnit] | None): Properties to convert
to another unit.
limit (int): Maximum number of records to return in this page (1-1000). Defaults to 10.
include_typing (bool): If True, include property type information on the returned
list's ``typing`` attribute.

Returns:
SyncRecordList: One page of change records, with ``cursor`` and ``has_next`` set.
"""
self._warning.warn()
return await self._sync(
stream_id=stream_id,
cursor=cursor,
limit=limit,
filter=filter,
sources=sources,
target_units=target_units,
include_typing=include_typing,
)
112 changes: 110 additions & 2 deletions cognite/client/_sync_api/data_modeling/records.py

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 10 additions & 0 deletions cognite/client/data_classes/data_modeling/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,8 +123,13 @@
RecordContainerId,
RecordId,
RecordSource,
RecordSourceSelector,
RecordTargetUnit,
RecordTargetUnits,
RecordWrite,
RecordWriteList,
SyncRecord,
SyncRecordList,
)
from cognite.client.data_classes.data_modeling.spaces import Space, SpaceApply, SpaceApplyList, SpaceList
from cognite.client.data_classes.data_modeling.streams import (
Expand Down Expand Up @@ -252,6 +257,9 @@
"RecordContainerId",
"RecordId",
"RecordSource",
"RecordSourceSelector",
"RecordTargetUnit",
"RecordTargetUnits",
"RecordWrite",
"RecordWriteList",
"RequiresConstraint",
Expand All @@ -278,6 +286,8 @@
"StreamTemplateWriteSettings",
"StreamWrite",
"SubscriptionContext",
"SyncRecord",
"SyncRecordList",
"Text",
"TimeSeriesReference",
"Timestamp",
Expand Down
Loading
Loading