diff --git a/cognite/client/_api/data_modeling/records.py b/cognite/client/_api/data_modeling/records.py index 3d7591d727..25af161d26 100644 --- a/cognite/client/_api/data_modeling/records.py +++ b/cognite/client/_api/data_modeling/records.py @@ -2,10 +2,20 @@ import asyncio from collections.abc import Sequence -from typing import TYPE_CHECKING, ClassVar, Literal +from typing import TYPE_CHECKING, Any, ClassVar, Literal from cognite.client._api_client import APIClient -from cognite.client.data_classes.data_modeling.records import RecordId, RecordIdSequence, RecordWrite +from cognite.client.data_classes.data_modeling.records import ( + RecordId, + RecordIdSequence, + RecordSourceSelector, + RecordTargetUnit, + RecordTargetUnits, + RecordWrite, + SyncRecord, + SyncRecordList, +) +from cognite.client.data_classes.filters import Filter from cognite.client.utils._concurrency import RecordsConcurrencyOperation from cognite.client.utils._experimental import FeaturePreviewWarning from cognite.client.utils._url import interpolate_and_url_encode @@ -23,11 +33,12 @@ def __init__(self, config: ClientConfig, api_version: str | None, cognite_client ) _OPERATION_TO_RATE_LIMIT: ClassVar[dict[str, RecordsConcurrencyOperation]] = { + "read": RecordsConcurrencyOperation.READ, "write": RecordsConcurrencyOperation.WRITE, "delete": RecordsConcurrencyOperation.WRITE, } - def _get_semaphore(self, operation: Literal["write", "delete"]) -> asyncio.BoundedSemaphore: + def _get_semaphore(self, operation: Literal["read", "write", "delete"]) -> asyncio.BoundedSemaphore: from cognite.client import global_config return global_config.concurrency_settings.records._semaphore_factory( @@ -39,6 +50,50 @@ def _records_url(self, stream_id: str, suffix: str = "") -> str: # so it must not be percent-encoded. 
@staticmethod
def _dump_target_units(target_units: RecordTargetUnits | Sequence[RecordTargetUnit]) -> dict[str, Any]:
    """Serialize ``target_units`` into the ``targetUnits`` request payload.

    Args:
        target_units (RecordTargetUnits | Sequence[RecordTargetUnit]): Either a ready-made
            ``RecordTargetUnits`` object, or a plain sequence of per-property conversions which
            is wrapped as its ``properties``.

    Returns:
        dict[str, Any]: The dumped target units.

    Raises:
        ValueError: If a ``RecordTargetUnits`` object has both, or neither, of ``properties`` and
            ``unit_system_name`` set.
    """
    if not isinstance(target_units, RecordTargetUnits):
        return RecordTargetUnits(properties=target_units).dump()

    has_properties = target_units.properties is not None
    has_unit_system = target_units.unit_system_name is not None
    # Equal flags mean "both given" or "none given"; either way the request mode is ambiguous.
    if has_properties == has_unit_system:
        raise ValueError("Provide exactly one of 'properties' or 'unit_system_name'.")
    return target_units.dump()


async def _sync(
    self,
    stream_id: str,
    *,
    filter: Filter | None = None,
    sources: Sequence[RecordSourceSelector] | None = None,
    target_units: RecordTargetUnits | Sequence[RecordTargetUnit] | None = None,
    limit: int = 10,
    include_typing: bool = False,
    initialize_cursor: str | None = None,
    cursor: str | None = None,
) -> SyncRecordList:
    """Shared request path behind :meth:`sync` and :meth:`sync_resume`.

    Builds the optional body fields and POSTs to the stream's ``/sync`` URL through ``self._list``,
    using the "read" semaphore.

    Args:
        stream_id (str): External ID of the stream to sync.
        filter (Filter | None): Filter expression; dumped with ``camel_case_property=False``.
        sources (Sequence[RecordSourceSelector] | None): Which container properties to return.
        target_units (RecordTargetUnits | Sequence[RecordTargetUnit] | None): Properties to convert
            to another unit.
        limit (int): Forwarded to ``self._list`` as ``limit``.
        include_typing (bool): If True, ``includeTyping`` is added to the request body.
        initialize_cursor (str | None): Sent as ``initializeCursor`` when given.
        cursor (str | None): Forwarded to ``self._list`` as ``initial_cursor``.

    Returns:
        SyncRecordList: The loaded sync result.
    """
    # Only fields the caller actually set end up in the body; insertion order is kept stable.
    extra_body: dict[str, Any] = {}
    if initialize_cursor is not None:
        extra_body["initializeCursor"] = initialize_cursor
    if sources is not None:
        extra_body["sources"] = [selector.dump() for selector in sources]
    if target_units is not None:
        extra_body["targetUnits"] = self._dump_target_units(target_units)
    if include_typing:
        extra_body["includeTyping"] = True

    records_path = self._records_url(stream_id)
    sync_path = self._records_url(stream_id, "/sync")
    if isinstance(filter, Filter):
        request_filter: Any = filter.dump(camel_case_property=False)
    else:
        # Anything that is not a Filter (normally None) is forwarded untouched.
        request_filter = filter

    return await self._list(
        list_cls=SyncRecordList,
        resource_cls=SyncRecord,
        method="POST",
        resource_path=records_path,
        url_path=sync_path,
        limit=limit,
        filter=request_filter,
        other_params=extra_body,
        initial_cursor=cursor,
        # NOTE(review): presumably makes _list hand the raw responses to the list class so that
        # nextCursor/hasNext survive loading - confirm against APIClient._list.
        settings_forcing_raw_response_loading=["records_sync_cursor"],
        override_semaphore=self._get_semaphore("read"),
    )


async def sync(
    self,
    stream_id: str,
    *,
    initialize_cursor: str,
    filter: Filter | None = None,
    sources: Sequence[RecordSourceSelector] | None = None,
    target_units: RecordTargetUnits | Sequence[RecordTargetUnit] | None = None,
    limit: int = 10,
    include_typing: bool = False,
) -> SyncRecordList:
    """Sync records from a stream.

    Starts a new sync and returns the first page of the change feed (new, updated and deleted
    records). ``initialize_cursor`` sets the starting point as a relative time such as ``"7d-ago"``.
    Store the returned :attr:`SyncRecordList.cursor` and hand it to :meth:`sync_resume` to continue
    later; :attr:`SyncRecordList.has_next` tells whether more changes are immediately available.

    Args:
        stream_id (str): External ID of the stream to sync.
        initialize_cursor (str): Where to start, as a relative duration like ``"7d-ago"``.
        filter (Filter | None): Filter expression (see :mod:`cognite.client.data_classes.filters`).
        sources (Sequence[RecordSourceSelector] | None): Which container properties to return.
        target_units (RecordTargetUnits | Sequence[RecordTargetUnit] | None): Properties to convert
            to another unit.
        limit (int): Maximum number of records to return in this page (1-1000). Defaults to 10.
        include_typing (bool): If True, include property type information on the returned
            list's ``typing`` attribute.

    Returns:
        SyncRecordList: One page of change records, with ``cursor`` and ``has_next`` set.

    Examples:

        Initialize a sync, process the page, then resume from the cursor later:

            >>> from cognite.client import CogniteClient
            >>> client = CogniteClient()
            >>> page = client.data_modeling.records.sync(
            ...     stream_id="my-stream", initialize_cursor="7d-ago"
            ... )
            >>> for record in page:
            ...     pass  # process record; record.status is created/updated/deleted
            >>> next_page = client.data_modeling.records.sync_resume(
            ...     stream_id="my-stream", cursor=page.cursor
            ... )
    """
    self._warning.warn()
    first_page = await self._sync(
        stream_id=stream_id,
        initialize_cursor=initialize_cursor,
        filter=filter,
        sources=sources,
        target_units=target_units,
        limit=limit,
        include_typing=include_typing,
    )
    return first_page


async def sync_resume(
    self,
    stream_id: str,
    *,
    cursor: str,
    filter: Filter | None = None,
    sources: Sequence[RecordSourceSelector] | None = None,
    target_units: RecordTargetUnits | Sequence[RecordTargetUnit] | None = None,
    limit: int = 10,
    include_typing: bool = False,
) -> SyncRecordList:
    """Continue a sync from a cursor handed out by :meth:`sync` or :meth:`sync_resume`.

    Args:
        stream_id (str): External ID of the stream to sync.
        cursor (str): Resume from a cursor returned by a previous sync call.
        filter (Filter | None): Filter expression (see :mod:`cognite.client.data_classes.filters`).
        sources (Sequence[RecordSourceSelector] | None): Which container properties to return.
        target_units (RecordTargetUnits | Sequence[RecordTargetUnit] | None): Properties to convert
            to another unit.
        limit (int): Maximum number of records to return in this page (1-1000). Defaults to 10.
        include_typing (bool): If True, include property type information on the returned
            list's ``typing`` attribute.

    Returns:
        SyncRecordList: One page of change records, with ``cursor`` and ``has_next`` set.
    """
    self._warning.warn()
    next_page = await self._sync(
        stream_id=stream_id,
        cursor=cursor,
        filter=filter,
        sources=sources,
        target_units=target_units,
        limit=limit,
        include_typing=include_typing,
    )
    return next_page
Provide + ``initialize_cursor`` to start from a relative time such as ``"7d-ago"``. Persist the returned + :attr:`SyncRecordList.cursor` and pass it to :meth:`sync_resume` on the next call to continue; + :attr:`SyncRecordList.has_next` indicates whether more changes are immediately available. + + Args: + stream_id (str): External ID of the stream to sync. + initialize_cursor (str): Where to start, as a relative duration like ``"7d-ago"``. + filter (Filter | None): Filter expression (see :mod:`cognite.client.data_classes.filters`). + sources (Sequence[RecordSourceSelector] | None): Which container properties to return. + target_units (RecordTargetUnits | Sequence[RecordTargetUnit] | None): Properties to convert + to another unit. + limit (int): Maximum number of records to return in this page (1-1000). Defaults to 10. + include_typing (bool): If True, include property type information on the returned + list's ``typing`` attribute. + + Returns: + SyncRecordList: One page of change records, with ``cursor`` and ``has_next`` set. + + Examples: + + Initialize a sync, process the page, then resume from the cursor later: + + >>> from cognite.client import CogniteClient + >>> client = CogniteClient() + >>> page = client.data_modeling.records.sync( + ... stream_id="my-stream", initialize_cursor="7d-ago" + ... ) + >>> for record in page: + ... pass # process record; record.status is created/updated/deleted + >>> next_page = client.data_modeling.records.sync_resume( + ... stream_id="my-stream", cursor=page.cursor + ... 
) + """ + return run_sync( + self.__async_client.data_modeling.records.sync( + stream_id=stream_id, + initialize_cursor=initialize_cursor, + filter=filter, + sources=sources, + target_units=target_units, + limit=limit, + include_typing=include_typing, + ) + ) + + def sync_resume( + self, + stream_id: str, + *, + cursor: str, + filter: Filter | None = None, + sources: Sequence[RecordSourceSelector] | None = None, + target_units: RecordTargetUnits | Sequence[RecordTargetUnit] | None = None, + limit: int = 10, + include_typing: bool = False, + ) -> SyncRecordList: + """ + Resume syncing records from a stream using a cursor from :meth:`sync` or :meth:`sync_resume`. + + Args: + stream_id (str): External ID of the stream to sync. + cursor (str): Resume from a cursor returned by a previous sync call. + filter (Filter | None): Filter expression (see :mod:`cognite.client.data_classes.filters`). + sources (Sequence[RecordSourceSelector] | None): Which container properties to return. + target_units (RecordTargetUnits | Sequence[RecordTargetUnit] | None): Properties to convert + to another unit. + limit (int): Maximum number of records to return in this page (1-1000). Defaults to 10. + include_typing (bool): If True, include property type information on the returned + list's ``typing`` attribute. + + Returns: + SyncRecordList: One page of change records, with ``cursor`` and ``has_next`` set. 
+ """ + return run_sync( + self.__async_client.data_modeling.records.sync_resume( + stream_id=stream_id, + cursor=cursor, + filter=filter, + sources=sources, + target_units=target_units, + limit=limit, + include_typing=include_typing, + ) + ) diff --git a/cognite/client/data_classes/data_modeling/__init__.py b/cognite/client/data_classes/data_modeling/__init__.py index 20c4831bf7..282ab25b6f 100644 --- a/cognite/client/data_classes/data_modeling/__init__.py +++ b/cognite/client/data_classes/data_modeling/__init__.py @@ -123,8 +123,13 @@ RecordContainerId, RecordId, RecordSource, + RecordSourceSelector, + RecordTargetUnit, + RecordTargetUnits, RecordWrite, RecordWriteList, + SyncRecord, + SyncRecordList, ) from cognite.client.data_classes.data_modeling.spaces import Space, SpaceApply, SpaceApplyList, SpaceList from cognite.client.data_classes.data_modeling.streams import ( @@ -252,6 +257,9 @@ "RecordContainerId", "RecordId", "RecordSource", + "RecordSourceSelector", + "RecordTargetUnit", + "RecordTargetUnits", "RecordWrite", "RecordWriteList", "RequiresConstraint", @@ -278,6 +286,8 @@ "StreamTemplateWriteSettings", "StreamWrite", "SubscriptionContext", + "SyncRecord", + "SyncRecordList", "Text", "TimeSeriesReference", "Timestamp", diff --git a/cognite/client/data_classes/data_modeling/records.py b/cognite/client/data_classes/data_modeling/records.py index deb7967b0f..c64a749613 100644 --- a/cognite/client/data_classes/data_modeling/records.py +++ b/cognite/client/data_classes/data_modeling/records.py @@ -1,8 +1,9 @@ from __future__ import annotations -from collections.abc import Sequence +from collections.abc import Mapping, Sequence +from copy import deepcopy from dataclasses import dataclass -from typing import Any +from typing import Any, Literal from typing_extensions import Self @@ -11,10 +12,24 @@ CogniteResourceList, WriteableCogniteResource, ) +from cognite.client.data_classes.data_modeling.data_types import UnitReference, UnitSystemReference from 
class RecordSourceSelector(CogniteResource):
    """Selects which container properties to return for a record.

    Args:
        source (RecordContainerId): The container to select properties from.
        properties (Sequence[str]): Property identifiers to return; use ``["*"]`` to return all.
            A bare string is treated as one property identifier.
    """

    def __init__(self, source: RecordContainerId, properties: Sequence[str]) -> None:
        self.source = source
        # A str is itself a Sequence[str]: list("temp") would silently yield ["t", "e", "m", "p"].
        self.properties = [properties] if isinstance(properties, str) else list(properties)

    @classmethod
    def _load(cls, resource: dict[str, Any]) -> Self:
        return cls(source=RecordContainerId.load(resource["source"]), properties=resource["properties"])

    def dump(self, camel_case: bool = True) -> dict[str, Any]:
        """Dump the selector to a dict.

        Args:
            camel_case (bool): Forwarded to the dump of ``source``.

        Returns:
            dict[str, Any]: The ``source`` reference and a copy of the property identifiers.
        """
        # Hand out a copy so that editing the dumped payload cannot change this object.
        return {"source": self.source.dump(camel_case=camel_case), "properties": list(self.properties)}


class RecordTargetUnit(CogniteResource):
    """A target unit conversion for one Records container property.

    Args:
        property (Sequence[str]): Fully qualified container property path:
            ``[space, container_external_id, property_id]``.
        unit (UnitReference | UnitSystemReference): Target unit or target unit system.
    """

    def __init__(self, property: Sequence[str], unit: UnitReference | UnitSystemReference) -> None:
        self.property = list(property)
        self.unit = unit

    @classmethod
    def _load(cls, resource: dict[str, Any]) -> Self:
        raw_unit = resource["unit"]
        # A unit payload carrying "externalId" is loaded as a single unit, anything else as a unit system.
        unit_cls = UnitReference if "externalId" in raw_unit else UnitSystemReference
        return cls(property=resource["property"], unit=unit_cls.load(raw_unit))

    def dump(self, camel_case: bool = True) -> dict[str, Any]:
        """Dump the conversion to a dict.

        Args:
            camel_case (bool): Forwarded to the dump of ``unit``.

        Returns:
            dict[str, Any]: A copy of the property path and the dumped unit.
        """
        # Copy the path so callers cannot mutate this object through the dumped payload.
        return {"property": list(self.property), "unit": self.unit.dump(camel_case=camel_case)}


class RecordTargetUnits(CogniteResource):
    """Target unit conversions for a Records filter, sync, or aggregate request.

    This class does not itself check that exactly one of the two arguments is given.

    Args:
        properties (Sequence[RecordTargetUnit] | None): Property-specific target unit conversions.
        unit_system_name (str | None): Convert all convertible properties to a target unit system.
    """

    def __init__(
        self, properties: Sequence[RecordTargetUnit] | None = None, unit_system_name: str | None = None
    ) -> None:
        self.properties = list(properties) if properties is not None else None
        self.unit_system_name = unit_system_name

    @classmethod
    def _load(cls, resource: dict[str, Any]) -> Self:
        # "properties" takes precedence over "unitSystemName" when a payload carries both.
        if "properties" in resource:
            return cls(properties=[RecordTargetUnit._load(item) for item in resource["properties"]])
        if "unitSystemName" in resource:
            return cls(unit_system_name=resource["unitSystemName"])
        return cls()

    def dump(self, camel_case: bool = True) -> dict[str, Any]:
        """Dump the target units to a dict.

        Args:
            camel_case (bool): Use camelCase keys if True, snake_case otherwise.

        Returns:
            dict[str, Any]: Only the unit system name if one is set, otherwise the dumped
                ``properties`` if set, otherwise an empty dict.
        """
        if self.unit_system_name is not None:
            return {"unitSystemName" if camel_case else "unit_system_name": self.unit_system_name}
        if self.properties is not None:
            return {
                "properties": [target_unit.dump(camel_case=camel_case) for target_unit in self.properties],
            }
        return {}


class SyncRecord(WriteableCogniteResource["RecordWrite"]):
    """A record returned by the sync endpoint, annotated with a change status.

    For ``status="deleted"`` tombstones (mutable streams), :attr:`properties` is ``None``.

    Args:
        space (str): Space the record belongs to.
        external_id (str): External ID of the record.
        created_time (int): Creation time in milliseconds since epoch.
        last_updated_time (int): Last updated time in milliseconds since epoch.
        status (Literal['created', 'updated', 'deleted']): The record's change status.
        properties (Mapping[str, Mapping[str, Mapping[str, Any]]] | None): Property values (absent for
            deleted tombstones).
    """

    def __init__(
        self,
        space: str,
        external_id: str,
        created_time: int,
        last_updated_time: int,
        status: Literal["created", "updated", "deleted"],
        properties: Mapping[str, Mapping[str, Mapping[str, Any]]] | None = None,
    ) -> None:
        self.space = space
        self.external_id = external_id
        self.created_time = created_time
        self.last_updated_time = last_updated_time
        self.status = status
        # Copy the space -> container -> values nesting into plain dicts; the loop variable is
        # deliberately not called "space" so it cannot be confused with the parameter above.
        self.properties = (
            None
            if properties is None
            else {
                source_space: {container: dict(values) for container, values in containers.items()}
                for source_space, containers in properties.items()
            }
        )

    @classmethod
    def _load(cls, resource: dict[str, Any]) -> Self:
        return cls(
            space=resource["space"],
            external_id=resource["externalId"],
            created_time=resource["createdTime"],
            last_updated_time=resource["lastUpdatedTime"],
            status=resource["status"],
            properties=resource.get("properties"),
        )

    def dump(self, camel_case: bool = True) -> dict[str, Any]:
        """Dump the record to a dict.

        Args:
            camel_case (bool): Use camelCase keys if True, snake_case otherwise.

        Returns:
            dict[str, Any]: The record fields; ``properties`` is left out when it is ``None``.
        """
        output: dict[str, Any] = {
            "space": self.space,
            "externalId" if camel_case else "external_id": self.external_id,
            "createdTime" if camel_case else "created_time": self.created_time,
            "lastUpdatedTime" if camel_case else "last_updated_time": self.last_updated_time,
            "status": self.status,
        }
        if self.properties is not None:
            output["properties"] = deepcopy(self.properties)
        return output

    def as_id(self) -> RecordId:
        return RecordId(space=self.space, external_id=self.external_id)

    def as_write(self) -> RecordWrite:
        """Reconstruct the :class:`RecordWrite` by grouping read properties back into sources.

        Returns:
            RecordWrite: One source per (space, container) pair; no sources when ``properties`` is ``None``.
        """
        sources = [
            RecordSource(
                source=RecordContainerId(space=source_space, external_id=container),
                properties=dict(values),
            )
            for source_space, containers in (self.properties or {}).items()
            for container, values in containers.items()
        ]
        return RecordWrite(space=self.space, external_id=self.external_id, sources=sources)


class SyncRecordList(CogniteResourceList[SyncRecord]):
    """A page of :class:`SyncRecord` objects from the sync endpoint.

    Args:
        resources (Sequence[SyncRecord]): The records in this page.
        cursor (str | None): Cursor to pass as ``cursor`` to the next ``sync_resume`` call to resume
            from this position.
        has_next (bool): Whether more changes are available beyond this page.
        typing (TypeInformation | None): Property type information, present when the request was
            made with ``include_typing=True``.
    """

    _RESOURCE = SyncRecord

    def __init__(
        self,
        resources: Sequence[SyncRecord],
        cursor: str | None = None,
        has_next: bool = False,
        typing: TypeInformation | None = None,
    ) -> None:
        super().__init__(resources)
        self.cursor = cursor
        self.has_next = has_next
        self.typing = typing

    @classmethod
    def _load_response(cls, response: dict[str, Any]) -> Self:
        return cls._load_raw_api_response([response])

    @classmethod
    def _load_raw_api_response(cls, responses: list[dict[str, Any]]) -> Self:
        # Without any response there is no cursor to report; return an empty list
        # instead of failing with an IndexError on responses[-1].
        if not responses:
            return cls([])

        # Items from all responses are concatenated; cursor state comes from the last one.
        final_response = responses[-1]
        typing = next(
            (TypeInformation._load(response["typing"]) for response in responses if "typing" in response), None
        )
        return cls(
            [SyncRecord._load(item) for response in responses for item in response["items"]],
            cursor=final_response["nextCursor"],
            has_next=final_response["hasNext"],
            typing=typing,
        )
a/cognite/client/utils/_concurrency.py +++ b/cognite/client/utils/_concurrency.py @@ -223,6 +223,7 @@ def __repr__(self) -> str: class RecordsConcurrencyOperation(Enum): + READ = "read" WRITE = "write" @@ -233,15 +234,17 @@ class RecordsGlobalConcurrencyConfig(ConcurrencyConfig): Args: concurrency_settings (ConcurrencySettings): Reference to the parent settings object. - write (int): Maximum concurrent write requests (ingest, delete). + read (int): Maximum concurrent read requests (sync). + write (int): Maximum concurrent write requests (ingest, upsert, delete). """ def __init__( self, concurrency_settings: ConcurrencySettings, + read: int, write: int, ) -> None: - super().__init__(concurrency_settings, "records", read=0, write=write, delete=0) + super().__init__(concurrency_settings, "records", read=read, write=write, delete=0) def _semaphore_factory(self, operation: RecordsConcurrencyOperation, project: str) -> asyncio.BoundedSemaphore: key = (operation.value, project, asyncio.get_running_loop()) @@ -252,6 +255,8 @@ def _semaphore_factory(self, operation: RecordsConcurrencyOperation, project: st global_config.concurrency_settings._freeze() match operation: + case RecordsConcurrencyOperation.READ: + sem = asyncio.BoundedSemaphore(self._read) case RecordsConcurrencyOperation.WRITE: sem = asyncio.BoundedSemaphore(self._write) case _: @@ -260,7 +265,7 @@ def _semaphore_factory(self, operation: RecordsConcurrencyOperation, project: st return sem def __repr__(self) -> str: - return f"Concurrency[records](write={self._write})" + return f"Concurrency[records](read={self._read}, write={self._write})" class FileConcurrencyConfig(ConcurrencyConfig): @@ -425,7 +430,7 @@ def __init__(self) -> None: write_schema=1, ) self._files = FileConcurrencyConfig(self, read=4, write=2, upload=5, download=5, delete=2, open_files=15) - self._records = RecordsGlobalConcurrencyConfig(self, write=20) + self._records = RecordsGlobalConcurrencyConfig(self, read=4, write=20) 
@pytest.fixture
def record_response() -> dict:
    """Wire-format record without a ``status`` key; each test adds the status it needs."""
    properties = {"sp": {"container-x": {"temp": 22.5}}}
    return dict(space="sp", externalId="rec-1", createdTime=100, lastUpdatedTime=200, properties=properties)


@pytest.fixture
def sync_url_pattern(records_base_url: str) -> re.Pattern:
    """Regex matching URLs that consist of the records base URL followed by ``/sync``."""
    return re.compile(f"{re.escape(records_base_url)}/sync$")


class TestRecordsAPISync:
    """HTTP-level tests for ``records.sync`` and ``records.sync_resume`` against a mocked endpoint."""

    @staticmethod
    def _mock_page(
        httpx_mock: HTTPXMock,
        url: re.Pattern,
        items: list,
        next_cursor: str,
        has_next: bool,
        **extra: object,
    ) -> None:
        """Queue one 200 response for the sync endpoint with the given page content."""
        httpx_mock.add_response(
            method="POST",
            url=url,
            status_code=200,
            json={"items": items, "nextCursor": next_cursor, "hasNext": has_next, **extra},
        )

    def test_sync_returns_page_with_cursor(
        self,
        cognite_client: CogniteClient,
        httpx_mock: HTTPXMock,
        sync_url_pattern: re.Pattern,
        record_response: dict,
        stream_id: str,
    ) -> None:
        created = [{**record_response, "externalId": f"rec-{n}", "status": "created"} for n in range(10)]
        self._mock_page(httpx_mock, sync_url_pattern, created, "abc", False)

        page = cognite_client.data_modeling.records.sync(stream_id=stream_id, initialize_cursor="7d-ago")

        assert isinstance(page, SyncRecordList)
        assert page.cursor == "abc"
        assert page.has_next is False
        assert page[0].status == "created"
        sent = httpx_mock.get_requests()[0]
        assert sent.url.path.endswith(f"/streams/{stream_id}/records/sync")
        # The default limit is sent along with the initial cursor and nothing else.
        assert jsgz_load(sent.content) == {"initializeCursor": "7d-ago", "limit": 10}

    def test_sync_resume_sends_cursor(
        self,
        cognite_client: CogniteClient,
        httpx_mock: HTTPXMock,
        sync_url_pattern: re.Pattern,
        stream_id: str,
    ) -> None:
        first_item = {"space": "sp", "externalId": "rec-1", "createdTime": 1, "lastUpdatedTime": 2, "status": "created"}
        second_item = {
            "space": "sp",
            "externalId": "rec-2",
            "createdTime": 3,
            "lastUpdatedTime": 4,
            "status": "updated",
        }
        self._mock_page(httpx_mock, sync_url_pattern, [first_item], "p2", True)
        self._mock_page(httpx_mock, sync_url_pattern, [second_item], "p3", False)
        records_api = cognite_client.data_modeling.records

        first = records_api.sync(stream_id=stream_id, initialize_cursor="2d-ago", limit=1)
        assert first.has_next is True
        assert first.cursor is not None

        second = records_api.sync_resume(stream_id=stream_id, cursor=first.cursor, limit=1)
        assert second.cursor == "p3"
        # The second request must carry the cursor from page one and no initializeCursor.
        resume_body = jsgz_load(httpx_mock.get_requests()[1].content)
        assert resume_body == {"cursor": "p2", "limit": 1}

    def test_sync_deleted_tombstone_has_no_properties(
        self,
        cognite_client: CogniteClient,
        httpx_mock: HTTPXMock,
        sync_url_pattern: re.Pattern,
        stream_id: str,
    ) -> None:
        tombstone = {"space": "sp", "externalId": "rec-1", "createdTime": 1, "lastUpdatedTime": 2, "status": "deleted"}
        self._mock_page(httpx_mock, sync_url_pattern, [tombstone], "z", False)

        page = cognite_client.data_modeling.records.sync(stream_id=stream_id, initialize_cursor="c", limit=1)

        loaded = page[0]
        assert loaded.status == "deleted"
        assert loaded.properties is None

    def test_sync_include_typing(
        self,
        cognite_client: CogniteClient,
        httpx_mock: HTTPXMock,
        sync_url_pattern: re.Pattern,
        record_response: dict,
        stream_id: str,
    ) -> None:
        updated = {**record_response, "status": "updated"}
        temp_type = {"type": {"type": "float64", "list": False}, "nullable": True}
        typing = {"sp": {"container-x": {"temp": temp_type}}}
        self._mock_page(httpx_mock, sync_url_pattern, [updated], "z", False, typing=typing)

        page = cognite_client.data_modeling.records.sync(
            stream_id=stream_id, initialize_cursor="c", include_typing=True, limit=1
        )

        request_body = jsgz_load(httpx_mock.get_requests()[0].content)
        assert request_body["includeTyping"] is True
        assert isinstance(page.typing, TypeInformation)

    def test_sync_target_units_body_shape(
        self,
        cognite_client: CogniteClient,
        httpx_mock: HTTPXMock,
        sync_url_pattern: re.Pattern,
        record_response: dict,
        stream_id: str,
    ) -> None:
        updated = {**record_response, "status": "updated"}
        self._mock_page(httpx_mock, sync_url_pattern, [updated], "z", False)
        imperial = RecordTargetUnits(unit_system_name="Imperial")

        cognite_client.data_modeling.records.sync(
            stream_id=stream_id,
            initialize_cursor="c",
            target_units=imperial,
            limit=1,
        )

        request_body = jsgz_load(httpx_mock.get_requests()[0].content)
        assert request_body["targetUnits"] == {"unitSystemName": "Imperial"}
def test_record_source_selector_dump(self) -> None:
    """A selector dumps to its container reference plus the listed property names."""
    container = RecordContainerId(space="sp", external_id="c")
    selector = RecordSourceSelector(container, ["temp", "pressure"])
    expected = {
        "source": {"type": "container", "space": "sp", "externalId": "c"},
        "properties": ["temp", "pressure"],
    }
    assert selector.dump() == expected

def test_sync_record_as_write_reconstructs_sources(self) -> None:
    """``as_write`` turns the nested properties back into one source per container."""
    record = SyncRecord(
        space="sp",
        external_id="rec-1",
        created_time=1,
        last_updated_time=2,
        status="updated",
        properties={"sp": {"c": {"temp": 22.5}}},
    )
    write = record.as_write()
    assert isinstance(write, RecordWrite)
    expected_source = {
        "source": {"type": "container", "space": "sp", "externalId": "c"},
        "properties": {"temp": 22.5},
    }
    assert write.dump()["sources"] == [expected_source]

def test_record_target_units_dump(self) -> None:
    """Property-level target units dump to a ``properties`` list of path/unit pairs."""
    pressure_in_pa = RecordTargetUnit(["sp", "c", "pressure"], UnitReference("pressure:pa"))
    target_units = RecordTargetUnits(properties=[pressure_in_pa])
    assert target_units.dump() == {
        "properties": [{"property": ["sp", "c", "pressure"], "unit": {"externalId": "pressure:pa"}}]
    }

def test_record_target_units_rejects_empty_request_mode(
    self, cognite_client: CogniteClient, stream_id: str
) -> None:
    """Target units with neither mode set are rejected by ``sync``."""
    no_mode = RecordTargetUnits()
    with pytest.raises(ValueError, match="exactly one"):
        cognite_client.data_modeling.records.sync(
            stream_id=stream_id,
            initialize_cursor="c",
            target_units=no_mode,
            limit=1,
        )

def test_record_target_units_rejects_multiple_request_modes(
    self, cognite_client: CogniteClient, stream_id: str
) -> None:
    """Target units with both modes set are rejected by ``sync``."""
    both_modes = RecordTargetUnits(properties=[], unit_system_name="Imperial")
    with pytest.raises(ValueError, match="exactly one"):
        cognite_client.data_modeling.records.sync(
            stream_id=stream_id,
            initialize_cursor="c",
            target_units=both_modes,
            limit=1,
        )

def test_sync_record_load_dump_round_trip(self) -> None:
    """Loading a full payload and dumping it again gives back the same dict."""
    payload = {
        "space": "sp",
        "externalId": "rec-1",
        "createdTime": 100,
        "lastUpdatedTime": 200,
        "status": "updated",
        "properties": {"sp": {"c": {"temp": 22.5}}},
    }
    record = SyncRecord._load(payload)
    assert isinstance(record, SyncRecord)
    assert record.status == "updated"
    assert record.dump() == payload

def test_sync_record_deleted_tombstone(self) -> None:
    """A payload without ``properties`` loads as ``None`` and dumps without that key."""
    tombstone = {"space": "sp", "externalId": "rec-1", "createdTime": 1, "lastUpdatedTime": 2, "status": "deleted"}
    record = SyncRecord._load(tombstone)
    assert record.status == "deleted"
    assert record.properties is None
    assert "properties" not in record.dump()