From 35d8d1577abdf387157865ccc8ecbd1e7b932c12 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Anders=20=C3=98en=20Fylling?= Date: Sun, 21 Jun 2026 12:18:19 +0200 Subject: [PATCH 1/4] feat(records): sync endpoint Co-authored-by: Cursor --- cognite/client/_api/data_modeling/records.py | 103 +++++++- .../client/_sync_api/data_modeling/records.py | 77 +++++- .../data_classes/data_modeling/__init__.py | 10 + .../data_classes/data_modeling/records.py | 210 +++++++++++++++- cognite/client/utils/_concurrency.py | 13 +- .../test_data_modeling/test_records.py | 233 ++++++++++++++++++ 6 files changed, 635 insertions(+), 11 deletions(-) diff --git a/cognite/client/_api/data_modeling/records.py b/cognite/client/_api/data_modeling/records.py index 3d7591d727..9d093eaa5d 100644 --- a/cognite/client/_api/data_modeling/records.py +++ b/cognite/client/_api/data_modeling/records.py @@ -2,10 +2,20 @@ import asyncio from collections.abc import Sequence -from typing import TYPE_CHECKING, ClassVar, Literal +from typing import TYPE_CHECKING, Any, ClassVar, Literal from cognite.client._api_client import APIClient -from cognite.client.data_classes.data_modeling.records import RecordId, RecordIdSequence, RecordWrite +from cognite.client.data_classes.data_modeling.records import ( + RecordId, + RecordIdSequence, + RecordSourceSelector, + RecordTargetUnit, + RecordTargetUnits, + RecordWrite, + SyncRecord, + SyncRecordList, +) +from cognite.client.data_classes.filters import Filter from cognite.client.utils._concurrency import RecordsConcurrencyOperation from cognite.client.utils._experimental import FeaturePreviewWarning from cognite.client.utils._url import interpolate_and_url_encode @@ -23,11 +33,12 @@ def __init__(self, config: ClientConfig, api_version: str | None, cognite_client ) _OPERATION_TO_RATE_LIMIT: ClassVar[dict[str, RecordsConcurrencyOperation]] = { + "read": RecordsConcurrencyOperation.READ, "write": RecordsConcurrencyOperation.WRITE, "delete": RecordsConcurrencyOperation.WRITE, } - def _get_semaphore(self, operation: Literal["write", "delete"]) -> asyncio.BoundedSemaphore: + def _get_semaphore(self, operation: Literal["read", "write", "delete"]) -> asyncio.BoundedSemaphore: from cognite.client import global_config return global_config.concurrency_settings.records._semaphore_factory( @@ -39,6 +50,14 @@ def _records_url(self, stream_id: str, suffix: str = "") -> str: # so it must not be percent-encoded. return interpolate_and_url_encode("/streams/{}/records", stream_id) + suffix + @staticmethod + def _dump_target_units(target_units: RecordTargetUnits | Sequence[RecordTargetUnit]) -> dict[str, Any]: + if isinstance(target_units, RecordTargetUnits): + if (target_units.properties is None) == (target_units.unit_system_name is None): + raise ValueError("Provide exactly one of 'properties' or 'unit_system_name'.") + return target_units.dump() + return RecordTargetUnits(properties=target_units).dump() + async def delete( self, items: RecordId | Sequence[RecordId], @@ -183,3 +202,81 @@ async def upsert( resource_path=self._records_url(stream_id, "/upsert"), no_response=True, ) + + async def sync( + self, + stream_id: str, + *, + cursor: str | None = None, + initialize_cursor: str | None = None, + filter: Filter | None = None, + sources: Sequence[RecordSourceSelector] | None = None, + target_units: RecordTargetUnits | Sequence[RecordTargetUnit] | None = None, + limit: int = 10, + include_typing: bool = False, + ) -> SyncRecordList: + """`Sync records from a stream `_. + + Returns the next page of the change feed (new, updated and deleted records). Provide exactly + one of ``cursor`` (to resume a previous position) or ``initialize_cursor`` (to start from a + relative time such as ``"7d-ago"``). Persist the returned :attr:`SyncRecordList.cursor` and + pass it as ``cursor`` on the next call to continue; :attr:`SyncRecordList.has_next` indicates + whether more changes are immediately available. + + Args: + stream_id (str): External ID of the stream to sync. + cursor (str | None): Resume from a cursor returned by a previous sync call. + initialize_cursor (str | None): Where to start when no ``cursor`` is given, as a + relative duration like ``"7d-ago"``. Ignored when ``cursor`` is set. + filter (Filter | None): Filter expression (see :mod:`cognite.client.data_classes.filters`). + sources (Sequence[RecordSourceSelector] | None): Which container properties to return. + target_units (RecordTargetUnits | Sequence[RecordTargetUnit] | None): Properties to convert + to another unit. + limit (int): Maximum number of records to return in this page (1-1000). Defaults to 10. + include_typing (bool): If True, include property type information on the returned + list's ``typing`` attribute. + + Returns: + SyncRecordList: One page of change records, with ``cursor`` and ``has_next`` set. + + Examples: + + Initialize a sync, process the page, then resume from the cursor later: + + >>> from cognite.client import CogniteClient + >>> client = CogniteClient() + >>> page = client.data_modeling.records.sync( + ... stream_id="my-stream", initialize_cursor="7d-ago" + ... ) + >>> for record in page: + ... pass # process record; record.status is created/updated/deleted + >>> next_page = client.data_modeling.records.sync( + ... stream_id="my-stream", cursor=page.cursor + ... ) + """ + self._warning.warn() + if cursor is not None and initialize_cursor is not None: + raise ValueError("Provide either 'cursor' or 'initialize_cursor', not both.") + other_params: dict[str, Any] = {} + if initialize_cursor is not None: + other_params["initializeCursor"] = initialize_cursor + if sources is not None: + other_params["sources"] = [source.dump() for source in sources] + if target_units is not None: + other_params["targetUnits"] = self._dump_target_units(target_units) + if include_typing: + other_params["includeTyping"] = True + + return await self._list( + list_cls=SyncRecordList, + resource_cls=SyncRecord, + method="POST", + resource_path=self._records_url(stream_id), + url_path=self._records_url(stream_id, "/sync"), + limit=limit, + filter=filter.dump(camel_case_property=False) if isinstance(filter, Filter) else filter, + other_params=other_params, + initial_cursor=cursor, + settings_forcing_raw_response_loading=["records_sync_cursor"], + override_semaphore=self._get_semaphore("read"), + ) diff --git a/cognite/client/_sync_api/data_modeling/records.py b/cognite/client/_sync_api/data_modeling/records.py index 9897313c9d..4a2534f447 100644 --- a/cognite/client/_sync_api/data_modeling/records.py +++ b/cognite/client/_sync_api/data_modeling/records.py @@ -1,6 +1,6 @@ """ =============================================================================== -f86364d61385123f12bc60dd004ea1c2 +1b13cd8cb9a6d5758aee61658b3c8230 This file is auto-generated from the Async API modules, - do not edit manually! =============================================================================== """ @@ -12,7 +12,15 @@ from cognite.client import AsyncCogniteClient from cognite.client._sync_api_client import SyncAPIClient -from cognite.client.data_classes.data_modeling.records import RecordId, RecordWrite +from cognite.client.data_classes.data_modeling.records import ( + RecordId, + RecordSourceSelector, + RecordTargetUnit, + RecordTargetUnits, + RecordWrite, + SyncRecordList, +) +from cognite.client.data_classes.filters import Filter from cognite.client.utils._async_helpers import run_sync if TYPE_CHECKING: @@ -148,3 +156,68 @@ def upsert( return run_sync( self.__async_client.data_modeling.records.upsert(items=items, stream_id=stream_id, upsert_mode=upsert_mode) ) + + def sync( + self, + stream_id: str, + *, + cursor: str | None = None, + initialize_cursor: str | None = None, + filter: Filter | None = None, + sources: Sequence[RecordSourceSelector] | None = None, + target_units: RecordTargetUnits | Sequence[RecordTargetUnit] | None = None, + limit: int = 10, + include_typing: bool = False, + ) -> SyncRecordList: + """ + `Sync records from a stream `_. + + Returns the next page of the change feed (new, updated and deleted records). Provide exactly + one of ``cursor`` (to resume a previous position) or ``initialize_cursor`` (to start from a + relative time such as ``"7d-ago"``). Persist the returned :attr:`SyncRecordList.cursor` and + pass it as ``cursor`` on the next call to continue; :attr:`SyncRecordList.has_next` indicates + whether more changes are immediately available. + + Args: + stream_id (str): External ID of the stream to sync. + cursor (str | None): Resume from a cursor returned by a previous sync call. + initialize_cursor (str | None): Where to start when no ``cursor`` is given, as a + relative duration like ``"7d-ago"``. Ignored when ``cursor`` is set. + filter (Filter | None): Filter expression (see :mod:`cognite.client.data_classes.filters`). + sources (Sequence[RecordSourceSelector] | None): Which container properties to return. + target_units (RecordTargetUnits | Sequence[RecordTargetUnit] | None): Properties to convert + to another unit. + limit (int): Maximum number of records to return in this page (1-1000). Defaults to 10. + include_typing (bool): If True, include property type information on the returned + list's ``typing`` attribute. + + Returns: + SyncRecordList: One page of change records, with ``cursor`` and ``has_next`` set. + + Examples: + + Initialize a sync, process the page, then resume from the cursor later: + + >>> from cognite.client import CogniteClient + >>> client = CogniteClient() + >>> page = client.data_modeling.records.sync( + ... stream_id="my-stream", initialize_cursor="7d-ago" + ... ) + >>> for record in page: + ... pass # process record; record.status is created/updated/deleted + >>> next_page = client.data_modeling.records.sync( + ... stream_id="my-stream", cursor=page.cursor + ... ) + """ + return run_sync( + self.__async_client.data_modeling.records.sync( + stream_id=stream_id, + cursor=cursor, + initialize_cursor=initialize_cursor, + filter=filter, + sources=sources, + target_units=target_units, + limit=limit, + include_typing=include_typing, + ) + ) diff --git a/cognite/client/data_classes/data_modeling/__init__.py b/cognite/client/data_classes/data_modeling/__init__.py index 20c4831bf7..282ab25b6f 100644 --- a/cognite/client/data_classes/data_modeling/__init__.py +++ b/cognite/client/data_classes/data_modeling/__init__.py @@ -123,8 +123,13 @@ RecordContainerId, RecordId, RecordSource, + RecordSourceSelector, + RecordTargetUnit, + RecordTargetUnits, RecordWrite, RecordWriteList, + SyncRecord, + SyncRecordList, ) from cognite.client.data_classes.data_modeling.spaces import Space, SpaceApply, SpaceApplyList, SpaceList from cognite.client.data_classes.data_modeling.streams import ( @@ -252,6 +257,9 @@ "RecordContainerId", "RecordId", "RecordSource", + "RecordSourceSelector", + "RecordTargetUnit", + "RecordTargetUnits", "RecordWrite", "RecordWriteList", "RequiresConstraint", @@ -278,6 +286,8 @@ "StreamTemplateWriteSettings", "StreamWrite", "SubscriptionContext", + "SyncRecord", + "SyncRecordList", "Text", "TimeSeriesReference", "Timestamp", diff --git a/cognite/client/data_classes/data_modeling/records.py b/cognite/client/data_classes/data_modeling/records.py index deb7967b0f..88ecfe87dc 100644 --- a/cognite/client/data_classes/data_modeling/records.py +++ b/cognite/client/data_classes/data_modeling/records.py @@ -2,7 +2,7 @@ from collections.abc import Sequence from dataclasses import dataclass -from typing import Any +from typing import Any, Literal from typing_extensions import Self @@ -11,10 +11,24 @@ CogniteResourceList, WriteableCogniteResource, ) +from cognite.client.data_classes.data_modeling.data_types import UnitReference, UnitSystemReference from cognite.client.data_classes.data_modeling.ids import ContainerId +from cognite.client.data_classes.data_modeling.instances import TypeInformation from cognite.client.utils._identifier import IdentifierSequenceCore, RecordId -__all__ = ["RecordContainerId", "RecordId", "RecordIdSequence", "RecordSource", "RecordWrite", "RecordWriteList"] +__all__ = [ + "RecordContainerId", + "RecordId", + "RecordIdSequence", + "RecordSource", + "RecordSourceSelector", + "RecordTargetUnit", + "RecordTargetUnits", + "RecordWrite", + "RecordWriteList", + "SyncRecord", + "SyncRecordList", +] class RecordIdSequence(IdentifierSequenceCore[RecordId]): @@ -106,3 +120,195 @@ class RecordWriteList(CogniteResourceList[RecordWrite]): def as_ids(self) -> list[RecordId]: return [v.as_id() for v in self] + + +class RecordSourceSelector(CogniteResource): + """Selects which container properties to return for a record. + + Args: + source (RecordContainerId): The container to select properties from. + properties (list[str]): Property identifiers to return; use ``["*"]`` to return all. + """ + + def __init__(self, source: RecordContainerId, properties: list[str]) -> None: + self.source = source + self.properties = properties + + @classmethod + def _load(cls, resource: dict[str, Any]) -> Self: + return cls(source=RecordContainerId.load(resource["source"]), properties=resource["properties"]) + + def dump(self, camel_case: bool = True) -> dict[str, Any]: + return {"source": self.source.dump(camel_case=camel_case), "properties": self.properties} + + +class RecordTargetUnit(CogniteResource): + """A target unit conversion for one Records container property. + + Args: + property (Sequence[str]): Fully qualified container property path: + ``[space, container_external_id, property_id]``. + unit (UnitReference | UnitSystemReference): Target unit or target unit system. + """ + + def __init__(self, property: Sequence[str], unit: UnitReference | UnitSystemReference) -> None: + self.property = list(property) + self.unit = unit + + @classmethod + def _load(cls, resource: dict[str, Any]) -> Self: + return cls( + property=resource["property"], + unit=UnitReference.load(resource["unit"]) + if "externalId" in resource["unit"] + else UnitSystemReference.load(resource["unit"]), + ) + + def dump(self, camel_case: bool = True) -> dict[str, Any]: + return {"property": self.property, "unit": self.unit.dump(camel_case=camel_case)} + + +class RecordTargetUnits(CogniteResource): + """Target unit conversions for a Records filter, sync, or aggregate request. + + Args: + properties (Sequence[RecordTargetUnit] | None): Property-specific target unit conversions. + unit_system_name (str | None): Convert all convertible properties to a target unit system. + """ + + def __init__( + self, properties: Sequence[RecordTargetUnit] | None = None, unit_system_name: str | None = None + ) -> None: + self.properties = list(properties) if properties is not None else None + self.unit_system_name = unit_system_name + + @classmethod + def _load(cls, resource: dict[str, Any]) -> Self: + if "properties" in resource: + return cls(properties=[RecordTargetUnit._load(item) for item in resource["properties"]]) + if "unitSystemName" in resource: + return cls(unit_system_name=resource["unitSystemName"]) + return cls() + + def dump(self, camel_case: bool = True) -> dict[str, Any]: + if self.unit_system_name is not None: + return {"unitSystemName" if camel_case else "unit_system_name": self.unit_system_name} + if self.properties is not None: + return { + "properties": [target_unit.dump(camel_case=camel_case) for target_unit in self.properties], + } + return {} + + +class SyncRecord(WriteableCogniteResource["RecordWrite"]): + """A record returned by the sync endpoint, annotated with a change status. + + For ``status="deleted"`` tombstones (mutable streams), :attr:`properties` is ``None``. + + Args: + space (str): Space the record belongs to. + external_id (str): External ID of the record. + created_time (int): Creation time in milliseconds since epoch. + last_updated_time (int): Last updated time in milliseconds since epoch. + status (Literal['created', 'updated', 'deleted']): The record's change status. + properties (dict[str, dict[str, dict[str, Any]]] | None): Property values (absent for + deleted tombstones). + """ + + def __init__( + self, + space: str, + external_id: str, + created_time: int, + last_updated_time: int, + status: Literal["created", "updated", "deleted"], + properties: dict[str, dict[str, dict[str, Any]]] | None = None, + ) -> None: + self.space = space + self.external_id = external_id + self.created_time = created_time + self.last_updated_time = last_updated_time + self.status = status + self.properties = properties + + @classmethod + def _load(cls, resource: dict[str, Any]) -> Self: + return cls( + space=resource["space"], + external_id=resource["externalId"], + created_time=resource["createdTime"], + last_updated_time=resource["lastUpdatedTime"], + status=resource["status"], + properties=resource.get("properties"), + ) + + def dump(self, camel_case: bool = True) -> dict[str, Any]: + output: dict[str, Any] = { + "space": self.space, + "externalId" if camel_case else "external_id": self.external_id, + "createdTime" if camel_case else "created_time": self.created_time, + "lastUpdatedTime" if camel_case else "last_updated_time": self.last_updated_time, + "status": self.status, + } + if self.properties is not None: + output["properties"] = self.properties + return output + + def as_id(self) -> RecordId: + return RecordId(space=self.space, external_id=self.external_id) + + def as_write(self) -> RecordWrite: + """Reconstruct the :class:`RecordWrite` by grouping read properties back into sources.""" + sources = [ + RecordSource( + source=RecordContainerId(space=space, external_id=container), + properties=dict(props), + ) + for space, containers in (self.properties or {}).items() + for container, props in containers.items() + ] + return RecordWrite(space=self.space, external_id=self.external_id, sources=sources) + + +class SyncRecordList(CogniteResourceList[SyncRecord]): + """A page of :class:`SyncRecord` objects from the sync endpoint. + + Args: + resources (Sequence[SyncRecord]): The records in this page. + cursor (str | None): Cursor to pass as ``cursor`` to the next ``sync`` call to resume + from this position. + has_next (bool): Whether more changes are available beyond this page. + typing (TypeInformation | None): Property type information, present when the request was + made with ``include_typing=True``. + """ + + _RESOURCE = SyncRecord + + def __init__( + self, + resources: Sequence[SyncRecord], + cursor: str | None = None, + has_next: bool = False, + typing: TypeInformation | None = None, + ) -> None: + super().__init__(resources) + self.cursor = cursor + self.has_next = has_next + self.typing = typing + + @classmethod + def _load_response(cls, response: dict[str, Any]) -> Self: + return cls._load_raw_api_response([response]) + + @classmethod + def _load_raw_api_response(cls, responses: list[dict[str, Any]]) -> Self: + last_response = responses[-1] + typing = next( + (TypeInformation._load(response["typing"]) for response in responses if "typing" in response), None + ) + return cls( + [SyncRecord._load(item) for response in responses for item in response["items"]], + cursor=last_response["nextCursor"], + has_next=last_response["hasNext"], + typing=typing, + ) diff --git a/cognite/client/utils/_concurrency.py b/cognite/client/utils/_concurrency.py index b5df86a34b..72cf2c7e5a 100644 --- a/cognite/client/utils/_concurrency.py +++ b/cognite/client/utils/_concurrency.py @@ -223,6 +223,7 @@ def __repr__(self) -> str: class RecordsConcurrencyOperation(Enum): + READ = "read" WRITE = "write" @@ -233,15 +234,17 @@ class RecordsGlobalConcurrencyConfig(ConcurrencyConfig): Args: concurrency_settings (ConcurrencySettings): Reference to the parent settings object. - write (int): Maximum concurrent write requests (ingest, delete). + read (int): Maximum concurrent read requests (sync). + write (int): Maximum concurrent write requests (ingest, upsert, delete). """ def __init__( self, concurrency_settings: ConcurrencySettings, + read: int, write: int, ) -> None: - super().__init__(concurrency_settings, "records", read=0, write=write, delete=0) + super().__init__(concurrency_settings, "records", read=read, write=write, delete=0) def _semaphore_factory(self, operation: RecordsConcurrencyOperation, project: str) -> asyncio.BoundedSemaphore: key = (operation.value, project, asyncio.get_running_loop()) @@ -252,6 +255,8 @@ def _semaphore_factory(self, operation: RecordsConcurrencyOperation, project: st global_config.concurrency_settings._freeze() match operation: + case RecordsConcurrencyOperation.READ: + sem = asyncio.BoundedSemaphore(self._read) case RecordsConcurrencyOperation.WRITE: sem = asyncio.BoundedSemaphore(self._write) case _: @@ -260,7 +265,7 @@ def _semaphore_factory(self, operation: RecordsConcurrencyOperation, project: st return sem def __repr__(self) -> str: - return f"Concurrency[records](write={self._write})" + return f"Concurrency[records](read={self._read}, write={self._write})" class FileConcurrencyConfig(ConcurrencyConfig): @@ -425,7 +430,7 @@ def __init__(self) -> None: write_schema=1, ) self._files = FileConcurrencyConfig(self, read=4, write=2, upload=5, download=5, delete=2, open_files=15) - self._records = RecordsGlobalConcurrencyConfig(self, write=20) + self._records = RecordsGlobalConcurrencyConfig(self, read=20, write=20) @functools.cached_property def _all_concurrency_configs(self) -> list[ConcurrencyConfig]: diff --git a/tests/tests_unit/test_api/test_data_modeling/test_records.py b/tests/tests_unit/test_api/test_data_modeling/test_records.py index 27ed46d90c..cdb0a380c2 100644 --- a/tests/tests_unit/test_api/test_data_modeling/test_records.py +++ b/tests/tests_unit/test_api/test_data_modeling/test_records.py @@ -6,11 +6,18 @@ from pytest_httpx import HTTPXMock from cognite.client import AsyncCogniteClient, CogniteClient +from cognite.client.data_classes.data_modeling.data_types import UnitReference +from cognite.client.data_classes.data_modeling.instances import TypeInformation from cognite.client.data_classes.data_modeling.records import ( RecordContainerId, RecordId, RecordSource, + RecordSourceSelector, + RecordTargetUnit, + RecordTargetUnits, RecordWrite, + SyncRecord, + SyncRecordList, ) from tests.utils import jsgz_load @@ -55,6 +62,22 @@ def mock_upsert(httpx_mock: HTTPXMock, upsert_url_pattern: re.Pattern) -> None: httpx_mock.add_response(method="POST", url=upsert_url_pattern, status_code=202) +@pytest.fixture +def record_response() -> dict: + return { + "space": "sp", + "externalId": "rec-1", + "createdTime": 100, + "lastUpdatedTime": 200, + "properties": {"sp": {"container-x": {"temp": 22.5}}}, + } + + +@pytest.fixture +def sync_url_pattern(records_base_url: str) -> re.Pattern: + return re.compile(re.escape(records_base_url) + r"/sync$") + + @pytest.fixture def write_item() -> RecordWrite: return RecordWrite( @@ -226,6 +249,142 @@ def test_upsert_chunks( assert len(jsgz_load(requests[1].content)["items"]) == 1 +class TestRecordsAPISync: + def test_sync_returns_page_with_cursor( + self, + cognite_client: CogniteClient, + httpx_mock: HTTPXMock, + sync_url_pattern: re.Pattern, + record_response: dict, + stream_id: str, + ) -> None: + items = [{**record_response, "externalId": f"rec-{i}", "status": "created"} for i in range(10)] + httpx_mock.add_response( + method="POST", + url=sync_url_pattern, + status_code=200, + json={"items": items, "nextCursor": "abc", "hasNext": False}, + ) + page = cognite_client.data_modeling.records.sync(stream_id=stream_id, initialize_cursor="7d-ago") + assert isinstance(page, SyncRecordList) + assert page.cursor == "abc" + assert page.has_next is False + assert page[0].status == "created" + request = httpx_mock.get_requests()[0] + assert request.url.path.endswith(f"/streams/{stream_id}/records/sync") + assert jsgz_load(request.content) == {"initializeCursor": "7d-ago", "limit": 10} + + def test_sync_resume_sends_cursor( + self, + cognite_client: CogniteClient, + httpx_mock: HTTPXMock, + sync_url_pattern: re.Pattern, + stream_id: str, + ) -> None: + httpx_mock.add_response( + method="POST", + url=sync_url_pattern, + status_code=200, + json={ + "items": [ + {"space": "sp", "externalId": "rec-1", "createdTime": 1, "lastUpdatedTime": 2, "status": "created"} + ], + "nextCursor": "p2", + "hasNext": True, + }, + ) + httpx_mock.add_response( + method="POST", + url=sync_url_pattern, + status_code=200, + json={ + "items": [ + {"space": "sp", "externalId": "rec-2", "createdTime": 3, "lastUpdatedTime": 4, "status": "updated"} + ], + "nextCursor": "p3", + "hasNext": False, + }, + ) + first = cognite_client.data_modeling.records.sync(stream_id=stream_id, initialize_cursor="2d-ago", limit=1) + assert first.has_next is True + second = cognite_client.data_modeling.records.sync(stream_id=stream_id, cursor=first.cursor, limit=1) + assert second.cursor == "p3" + body2 = jsgz_load(httpx_mock.get_requests()[1].content) + assert body2 == {"cursor": "p2", "limit": 1} + + def test_sync_rejects_cursor_and_initialize_cursor( + self, + cognite_client: CogniteClient, + httpx_mock: HTTPXMock, + stream_id: str, + ) -> None: + with pytest.raises(ValueError, match="not both"): + cognite_client.data_modeling.records.sync(stream_id=stream_id, cursor="c", initialize_cursor="2d-ago") + assert httpx_mock.get_requests() == [] + + def test_sync_deleted_tombstone_has_no_properties( + self, + cognite_client: CogniteClient, + httpx_mock: HTTPXMock, + sync_url_pattern: re.Pattern, + stream_id: str, + ) -> None: + item = {"space": "sp", "externalId": "rec-1", "createdTime": 1, "lastUpdatedTime": 2, "status": "deleted"} + httpx_mock.add_response( + method="POST", + url=sync_url_pattern, + status_code=200, + json={"items": [item], "nextCursor": "z", "hasNext": False}, + ) + page = cognite_client.data_modeling.records.sync(stream_id=stream_id, cursor="c", limit=1) + assert page[0].status == "deleted" + assert page[0].properties is None + + def test_sync_include_typing( + self, + cognite_client: CogniteClient, + httpx_mock: HTTPXMock, + sync_url_pattern: re.Pattern, + record_response: dict, + stream_id: str, + ) -> None: + item = {**record_response, "status": "updated"} + typing = {"sp": {"container-x": {"temp": {"type": {"type": "float64", "list": False}, "nullable": True}}}} + httpx_mock.add_response( + method="POST", + url=sync_url_pattern, + status_code=200, + json={"items": [item], "nextCursor": "z", "hasNext": False, "typing": typing}, + ) + page = cognite_client.data_modeling.records.sync(stream_id=stream_id, cursor="c", include_typing=True, limit=1) + assert jsgz_load(httpx_mock.get_requests()[0].content)["includeTyping"] is True + assert isinstance(page.typing, TypeInformation) + + def test_sync_target_units_body_shape( + self, + cognite_client: CogniteClient, + httpx_mock: HTTPXMock, + sync_url_pattern: re.Pattern, + record_response: dict, + stream_id: str, + ) -> None: + item = {**record_response, "status": "updated"} + httpx_mock.add_response( + method="POST", + url=sync_url_pattern, + status_code=200, + json={"items": [item], "nextCursor": "z", "hasNext": False}, + ) + cognite_client.data_modeling.records.sync( + stream_id=stream_id, + cursor="c", + target_units=RecordTargetUnits(unit_system_name="Imperial"), + limit=1, + ) + body = jsgz_load(httpx_mock.get_requests()[0].content) + assert body["targetUnits"] == {"unitSystemName": "Imperial"} + + class TestRecordDTOs: def test_record_write_as_id(self, write_item: RecordWrite) -> None: rid = write_item.as_id() @@ -256,3 +415,77 @@ def test_record_source_dump(self) -> None: d = src.dump() assert d["source"]["type"] == "container" assert d["properties"] == {"x": 1} + + def test_record_source_selector_dump(self) -> None: + selector = RecordSourceSelector(RecordContainerId(space="sp", external_id="c"), ["temp", "pressure"]) + assert selector.dump() == { + "source": {"type": "container", "space": "sp", "externalId": "c"}, + "properties": ["temp", "pressure"], + } + + def test_sync_record_as_write_reconstructs_sources(self) -> None: + record = SyncRecord( + space="sp", + external_id="rec-1", + created_time=1, + last_updated_time=2, + status="updated", + properties={"sp": {"c": {"temp": 22.5}}}, + ) + write = record.as_write() + assert isinstance(write, RecordWrite) + assert write.dump()["sources"] == [ + {"source": {"type": "container", "space": "sp", "externalId": "c"}, "properties": {"temp": 22.5}} + ] + + def test_record_target_units_dump(self) -> None: + target_units = RecordTargetUnits( + properties=[RecordTargetUnit(["sp", "c", "pressure"], UnitReference("pressure:pa"))] + ) + assert target_units.dump() == { + "properties": [{"property": ["sp", "c", "pressure"], "unit": {"externalId": "pressure:pa"}}] + } + + def test_record_target_units_rejects_empty_request_mode( + self, cognite_client: CogniteClient, stream_id: str + ) -> None: + with pytest.raises(ValueError, match="exactly one"): + cognite_client.data_modeling.records.sync( + stream_id=stream_id, + cursor="c", + target_units=RecordTargetUnits(), + limit=1, + ) + + def test_record_target_units_rejects_multiple_request_modes( + self, cognite_client: CogniteClient, stream_id: str + ) -> None: + with pytest.raises(ValueError, match="exactly one"): + cognite_client.data_modeling.records.sync( + stream_id=stream_id, + cursor="c", + target_units=RecordTargetUnits(properties=[], unit_system_name="Imperial"), + limit=1, + ) + + def test_sync_record_load_dump_round_trip(self) -> None: + payload = { + "space": "sp", + "externalId": "rec-1", + "createdTime": 100, + "lastUpdatedTime": 200, + "status": "updated", + "properties": {"sp": {"c": {"temp": 22.5}}}, + } + record = SyncRecord._load(payload) + assert isinstance(record, SyncRecord) + assert record.status == "updated" + assert record.dump() == payload + + def test_sync_record_deleted_tombstone(self) -> None: + record = SyncRecord._load( + {"space": "sp", "externalId": "rec-1", "createdTime": 1, "lastUpdatedTime": 2, "status": "deleted"} + ) + assert record.status == "deleted" + assert record.properties is None + assert "properties" not in record.dump() From cc796b0652150aed07408cfe779c5e65bca00584 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Anders=20=C3=98en=20Fylling?= Date: Sun, 21 Jun 2026 12:18:31 +0200 Subject: [PATCH 2/4] feat(records): sync resume method Separate initial sync from cursor-based continuation while sharing the request assembly and pagination through the internal _sync helper. Co-authored-by: Cursor --- cognite/client/_api/data_modeling/records.py | 120 +++++++++++++----- .../client/_sync_api/data_modeling/records.py | 61 +++++++-- .../data_classes/data_modeling/records.py | 35 +++-- .../test_data_modeling/test_records.py | 25 ++-- 4 files changed, 165 insertions(+), 76 deletions(-) diff --git a/cognite/client/_api/data_modeling/records.py b/cognite/client/_api/data_modeling/records.py index 9d093eaa5d..25af161d26 100644 --- a/cognite/client/_api/data_modeling/records.py +++ b/cognite/client/_api/data_modeling/records.py @@ -58,6 +58,42 @@ def _dump_target_units(target_units: RecordTargetUnits | Sequence[RecordTargetUn return target_units.dump() return RecordTargetUnits(properties=target_units).dump() + async def _sync( + self, + stream_id: str, + *, + filter: Filter | None = None, + sources: Sequence[RecordSourceSelector] | None = None, + target_units: RecordTargetUnits | Sequence[RecordTargetUnit] | None = None, + limit: int = 10, + include_typing: bool = False, + initialize_cursor: str | None = None, + cursor: str | None = None, + ) -> SyncRecordList: + other_params: dict[str, Any] = {} + if initialize_cursor is not None: + other_params["initializeCursor"] = initialize_cursor + if sources is not None: + other_params["sources"] = [source.dump() for source in sources] + if target_units is not None: + other_params["targetUnits"] = self._dump_target_units(target_units) + if include_typing: + other_params["includeTyping"] = True + + return await self._list( + list_cls=SyncRecordList, + resource_cls=SyncRecord, + method="POST", + resource_path=self._records_url(stream_id), + url_path=self._records_url(stream_id, "/sync"), + limit=limit, + filter=filter.dump(camel_case_property=False) if isinstance(filter, Filter) else filter, + other_params=other_params, + initial_cursor=cursor, + settings_forcing_raw_response_loading=["records_sync_cursor"], + override_semaphore=self._get_semaphore("read"), + ) + async def delete( self, items: RecordId | Sequence[RecordId], @@ -207,8 +243,7 @@ async def sync( self, stream_id: str, *, - cursor: str | None = None, - initialize_cursor: str | None = None, + initialize_cursor: str, filter: Filter | None = None, sources: Sequence[RecordSourceSelector] | None = None, target_units: RecordTargetUnits | Sequence[RecordTargetUnit] | None = None, @@ -217,17 +252,14 @@ async def sync( ) -> SyncRecordList: """`Sync records from a stream `_. - Returns the next page of the change feed (new, updated and deleted records). Provide exactly - one of ``cursor`` (to resume a previous position) or ``initialize_cursor`` (to start from a - relative time such as ``"7d-ago"``). Persist the returned :attr:`SyncRecordList.cursor` and - pass it as ``cursor`` on the next call to continue; :attr:`SyncRecordList.has_next` indicates - whether more changes are immediately available. + Returns the first page of the change feed (new, updated and deleted records). Provide + ``initialize_cursor`` to start from a relative time such as ``"7d-ago"``. Persist the returned + :attr:`SyncRecordList.cursor` and pass it to :meth:`sync_resume` on the next call to continue; + :attr:`SyncRecordList.has_next` indicates whether more changes are immediately available. Args: stream_id (str): External ID of the stream to sync. - cursor (str | None): Resume from a cursor returned by a previous sync call. - initialize_cursor (str | None): Where to start when no ``cursor`` is given, as a - relative duration like ``"7d-ago"``. Ignored when ``cursor`` is set. + initialize_cursor (str): Where to start, as a relative duration like ``"7d-ago"``. filter (Filter | None): Filter expression (see :mod:`cognite.client.data_classes.filters`). sources (Sequence[RecordSourceSelector] | None): Which container properties to return. target_units (RecordTargetUnits | Sequence[RecordTargetUnit] | None): Properties to convert @@ -250,33 +282,55 @@ async def sync( ... ) >>> for record in page: ... pass # process record; record.status is created/updated/deleted - >>> next_page = client.data_modeling.records.sync( + >>> next_page = client.data_modeling.records.sync_resume( ... stream_id="my-stream", cursor=page.cursor ... ) """ self._warning.warn() - if cursor is not None and initialize_cursor is not None: - raise ValueError("Provide either 'cursor' or 'initialize_cursor', not both.") - other_params: dict[str, Any] = {} - if initialize_cursor is not None: - other_params["initializeCursor"] = initialize_cursor - if sources is not None: - other_params["sources"] = [source.dump() for source in sources] - if target_units is not None: - other_params["targetUnits"] = self._dump_target_units(target_units) - if include_typing: - other_params["includeTyping"] = True + return await self._sync( + stream_id=stream_id, + initialize_cursor=initialize_cursor, + limit=limit, + filter=filter, + sources=sources, + target_units=target_units, + include_typing=include_typing, + ) - return await self._list( - list_cls=SyncRecordList, - resource_cls=SyncRecord, - method="POST", - resource_path=self._records_url(stream_id), - url_path=self._records_url(stream_id, "/sync"), + async def sync_resume( + self, + stream_id: str, + *, + cursor: str, + filter: Filter | None = None, + sources: Sequence[RecordSourceSelector] | None = None, + target_units: RecordTargetUnits | Sequence[RecordTargetUnit] | None = None, + limit: int = 10, + include_typing: bool = False, + ) -> SyncRecordList: + """Resume syncing records from a stream using a cursor from :meth:`sync` or :meth:`sync_resume`. + + Args: + stream_id (str): External ID of the stream to sync. + cursor (str): Resume from a cursor returned by a previous sync call. + filter (Filter | None): Filter expression (see :mod:`cognite.client.data_classes.filters`). + sources (Sequence[RecordSourceSelector] | None): Which container properties to return. + target_units (RecordTargetUnits | Sequence[RecordTargetUnit] | None): Properties to convert + to another unit. + limit (int): Maximum number of records to return in this page (1-1000). Defaults to 10. + include_typing (bool): If True, include property type information on the returned + list's ``typing`` attribute. + + Returns: + SyncRecordList: One page of change records, with ``cursor`` and ``has_next`` set. + """ + self._warning.warn() + return await self._sync( + stream_id=stream_id, + cursor=cursor, limit=limit, - filter=filter.dump(camel_case_property=False) if isinstance(filter, Filter) else filter, - other_params=other_params, - initial_cursor=cursor, - settings_forcing_raw_response_loading=["records_sync_cursor"], - override_semaphore=self._get_semaphore("read"), + filter=filter, + sources=sources, + target_units=target_units, + include_typing=include_typing, ) diff --git a/cognite/client/_sync_api/data_modeling/records.py b/cognite/client/_sync_api/data_modeling/records.py index 4a2534f447..b524560c47 100644 --- a/cognite/client/_sync_api/data_modeling/records.py +++ b/cognite/client/_sync_api/data_modeling/records.py @@ -1,6 +1,6 @@ """ =============================================================================== -1b13cd8cb9a6d5758aee61658b3c8230 +1026c1cbc96879e5164690932bcbb9b9 This file is auto-generated from the Async API modules, - do not edit manually! =============================================================================== """ @@ -161,8 +161,7 @@ def sync( self, stream_id: str, *, - cursor: str | None = None, - initialize_cursor: str | None = None, + initialize_cursor: str, filter: Filter | None = None, sources: Sequence[RecordSourceSelector] | None = None, target_units: RecordTargetUnits | Sequence[RecordTargetUnit] | None = None, @@ -172,17 +171,14 @@ def sync( """ `Sync records from a stream `_. - Returns the next page of the change feed (new, updated and deleted records). Provide exactly - one of ``cursor`` (to resume a previous position) or ``initialize_cursor`` (to start from a - relative time such as ``"7d-ago"``). Persist the returned :attr:`SyncRecordList.cursor` and - pass it as ``cursor`` on the next call to continue; :attr:`SyncRecordList.has_next` indicates - whether more changes are immediately available. + Returns the first page of the change feed (new, updated and deleted records). Provide + ``initialize_cursor`` to start from a relative time such as ``"7d-ago"``. Persist the returned + :attr:`SyncRecordList.cursor` and pass it to :meth:`sync_resume` on the next call to continue; + :attr:`SyncRecordList.has_next` indicates whether more changes are immediately available. Args: stream_id (str): External ID of the stream to sync. - cursor (str | None): Resume from a cursor returned by a previous sync call. - initialize_cursor (str | None): Where to start when no ``cursor`` is given, as a - relative duration like ``"7d-ago"``. Ignored when ``cursor`` is set. + initialize_cursor (str): Where to start, as a relative duration like ``"7d-ago"``. filter (Filter | None): Filter expression (see :mod:`cognite.client.data_classes.filters`). sources (Sequence[RecordSourceSelector] | None): Which container properties to return. target_units (RecordTargetUnits | Sequence[RecordTargetUnit] | None): Properties to convert @@ -205,14 +201,13 @@ def sync( ... ) >>> for record in page: ... pass # process record; record.status is created/updated/deleted - >>> next_page = client.data_modeling.records.sync( + >>> next_page = client.data_modeling.records.sync_resume( ... stream_id="my-stream", cursor=page.cursor ... ) """ return run_sync( self.__async_client.data_modeling.records.sync( stream_id=stream_id, - cursor=cursor, initialize_cursor=initialize_cursor, filter=filter, sources=sources, @@ -221,3 +216,43 @@ def sync( include_typing=include_typing, ) ) + + def sync_resume( + self, + stream_id: str, + *, + cursor: str, + filter: Filter | None = None, + sources: Sequence[RecordSourceSelector] | None = None, + target_units: RecordTargetUnits | Sequence[RecordTargetUnit] | None = None, + limit: int = 10, + include_typing: bool = False, + ) -> SyncRecordList: + """ + Resume syncing records from a stream using a cursor from :meth:`sync` or :meth:`sync_resume`. + + Args: + stream_id (str): External ID of the stream to sync. + cursor (str): Resume from a cursor returned by a previous sync call. + filter (Filter | None): Filter expression (see :mod:`cognite.client.data_classes.filters`). + sources (Sequence[RecordSourceSelector] | None): Which container properties to return. + target_units (RecordTargetUnits | Sequence[RecordTargetUnit] | None): Properties to convert + to another unit. + limit (int): Maximum number of records to return in this page (1-1000). Defaults to 10. + include_typing (bool): If True, include property type information on the returned + list's ``typing`` attribute. + + Returns: + SyncRecordList: One page of change records, with ``cursor`` and ``has_next`` set. + """ + return run_sync( + self.__async_client.data_modeling.records.sync_resume( + stream_id=stream_id, + cursor=cursor, + filter=filter, + sources=sources, + target_units=target_units, + limit=limit, + include_typing=include_typing, + ) + ) diff --git a/cognite/client/data_classes/data_modeling/records.py b/cognite/client/data_classes/data_modeling/records.py index 88ecfe87dc..6e22a4ffa1 100644 --- a/cognite/client/data_classes/data_modeling/records.py +++ b/cognite/client/data_classes/data_modeling/records.py @@ -1,6 +1,6 @@ from __future__ import annotations -from collections.abc import Sequence +from collections.abc import Mapping, Sequence from dataclasses import dataclass from typing import Any, Literal @@ -54,12 +54,12 @@ class RecordSource(CogniteResource): Args: source (RecordContainerId): Reference to the container. - properties (dict[str, Any]): The data to write to the source container. + properties (Mapping[str, Any]): The data to write to the source container. """ - def __init__(self, source: RecordContainerId, properties: dict[str, Any]) -> None: + def __init__(self, source: RecordContainerId, properties: Mapping[str, Any]) -> None: self.source = source - self.properties = properties + self.properties = dict(properties) @classmethod def _load(cls, resource: dict[str, Any]) -> Self: @@ -83,13 +83,13 @@ class RecordWrite(WriteableCogniteResource["RecordWrite"]): Args: space (str): Space the record belongs to. external_id (str): External ID of the record (1-256 chars, no null bytes). - sources (list[RecordSource]): Container property values to write (1-100 sources). + sources (Sequence[RecordSource]): Container property values to write (1-100 sources). """ - def __init__(self, space: str, external_id: str, sources: list[RecordSource]) -> None: + def __init__(self, space: str, external_id: str, sources: Sequence[RecordSource]) -> None: self.space = space self.external_id = external_id - self.sources = sources + self.sources = list(sources) def as_id(self) -> RecordId: return RecordId(space=self.space, external_id=self.external_id) @@ -127,12 +127,12 @@ class RecordSourceSelector(CogniteResource): Args: source (RecordContainerId): The container to select properties from. - properties (list[str]): Property identifiers to return; use ``["*"]`` to return all. + properties (Sequence[str]): Property identifiers to return; use ``["*"]`` to return all. """ - def __init__(self, source: RecordContainerId, properties: list[str]) -> None: + def __init__(self, source: RecordContainerId, properties: Sequence[str]) -> None: self.source = source - self.properties = properties + self.properties = list(properties) @classmethod def _load(cls, resource: dict[str, Any]) -> Self: @@ -211,7 +211,7 @@ class SyncRecord(WriteableCogniteResource["RecordWrite"]): created_time (int): Creation time in milliseconds since epoch. last_updated_time (int): Last updated time in milliseconds since epoch. status (Literal['created', 'updated', 'deleted']): The record's change status. - properties (dict[str, dict[str, dict[str, Any]]] | None): Property values (absent for + properties (Mapping[str, Mapping[str, Mapping[str, Any]]] | None): Property values (absent for deleted tombstones). """ @@ -222,14 +222,21 @@ def __init__( created_time: int, last_updated_time: int, status: Literal["created", "updated", "deleted"], - properties: dict[str, dict[str, dict[str, Any]]] | None = None, + properties: Mapping[str, Mapping[str, Mapping[str, Any]]] | None = None, ) -> None: self.space = space self.external_id = external_id self.created_time = created_time self.last_updated_time = last_updated_time self.status = status - self.properties = properties + self.properties = ( + { + space: {container: dict(values) for container, values in containers.items()} + for space, containers in properties.items() + } + if properties is not None + else None + ) @classmethod def _load(cls, resource: dict[str, Any]) -> Self: @@ -275,7 +282,7 @@ class SyncRecordList(CogniteResourceList[SyncRecord]): Args: resources (Sequence[SyncRecord]): The records in this page. - cursor (str | None): Cursor to pass as ``cursor`` to the next ``sync`` call to resume + cursor (str | None): Cursor to pass as ``cursor`` to the next ``sync_resume`` call to resume from this position. has_next (bool): Whether more changes are available beyond this page. typing (TypeInformation | None): Property type information, present when the request was diff --git a/tests/tests_unit/test_api/test_data_modeling/test_records.py b/tests/tests_unit/test_api/test_data_modeling/test_records.py index cdb0a380c2..45119fd53e 100644 --- a/tests/tests_unit/test_api/test_data_modeling/test_records.py +++ b/tests/tests_unit/test_api/test_data_modeling/test_records.py @@ -307,21 +307,12 @@ def test_sync_resume_sends_cursor( ) first = cognite_client.data_modeling.records.sync(stream_id=stream_id, initialize_cursor="2d-ago", limit=1) assert first.has_next is True - second = cognite_client.data_modeling.records.sync(stream_id=stream_id, cursor=first.cursor, limit=1) + assert first.cursor is not None + second = cognite_client.data_modeling.records.sync_resume(stream_id=stream_id, cursor=first.cursor, limit=1) assert second.cursor == "p3" body2 = jsgz_load(httpx_mock.get_requests()[1].content) assert body2 == {"cursor": "p2", "limit": 1} - def test_sync_rejects_cursor_and_initialize_cursor( - self, - cognite_client: CogniteClient, - httpx_mock: HTTPXMock, - stream_id: str, - ) -> None: - with pytest.raises(ValueError, match="not both"): - cognite_client.data_modeling.records.sync(stream_id=stream_id, cursor="c", initialize_cursor="2d-ago") - assert httpx_mock.get_requests() == [] - def test_sync_deleted_tombstone_has_no_properties( self, cognite_client: CogniteClient, @@ -336,7 +327,7 @@ def test_sync_deleted_tombstone_has_no_properties( status_code=200, json={"items": [item], "nextCursor": "z", "hasNext": False}, ) - page = cognite_client.data_modeling.records.sync(stream_id=stream_id, cursor="c", limit=1) + page = cognite_client.data_modeling.records.sync(stream_id=stream_id, initialize_cursor="c", limit=1) assert page[0].status == "deleted" assert page[0].properties is None @@ -356,7 +347,9 @@ def test_sync_include_typing( status_code=200, json={"items": [item], "nextCursor": "z", "hasNext": False, "typing": typing}, ) - page = cognite_client.data_modeling.records.sync(stream_id=stream_id, cursor="c", include_typing=True, limit=1) + page = cognite_client.data_modeling.records.sync( + stream_id=stream_id, initialize_cursor="c", include_typing=True, limit=1 + ) assert jsgz_load(httpx_mock.get_requests()[0].content)["includeTyping"] is True assert isinstance(page.typing, TypeInformation) @@ -377,7 +370,7 @@ def test_sync_target_units_body_shape( ) cognite_client.data_modeling.records.sync( stream_id=stream_id, - cursor="c", + initialize_cursor="c", target_units=RecordTargetUnits(unit_system_name="Imperial"), limit=1, ) @@ -452,7 +445,7 @@ def test_record_target_units_rejects_empty_request_mode( with pytest.raises(ValueError, match="exactly one"): cognite_client.data_modeling.records.sync( stream_id=stream_id, - cursor="c", + initialize_cursor="c", target_units=RecordTargetUnits(), limit=1, ) @@ -463,7 +456,7 @@ def test_record_target_units_rejects_multiple_request_modes( with pytest.raises(ValueError, match="exactly one"): cognite_client.data_modeling.records.sync( stream_id=stream_id, - cursor="c", + initialize_cursor="c", target_units=RecordTargetUnits(properties=[], unit_system_name="Imperial"), limit=1, ) From bd029258f2aecfea042d26c8bde6ce28d1387d7f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Anders=20=C3=98en=20Fylling?= Date: Sun, 21 Jun 2026 12:18:42 +0200 Subject: [PATCH 3/4] fix(records): avoid dump aliasing for property payloads Deep-copy record property payloads when dumping so load/dump round-trip tests and callers cannot mutate the source instance through the returned dictionary. Co-authored-by: Cursor --- cognite/client/data_classes/data_modeling/records.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/cognite/client/data_classes/data_modeling/records.py b/cognite/client/data_classes/data_modeling/records.py index 6e22a4ffa1..c64a749613 100644 --- a/cognite/client/data_classes/data_modeling/records.py +++ b/cognite/client/data_classes/data_modeling/records.py @@ -1,6 +1,7 @@ from __future__ import annotations from collections.abc import Mapping, Sequence +from copy import deepcopy from dataclasses import dataclass from typing import Any, Literal @@ -71,7 +72,7 @@ def _load(cls, resource: dict[str, Any]) -> Self: def dump(self, camel_case: bool = True) -> dict[str, Any]: return { "source": self.source.dump(camel_case=camel_case), - "properties": self.properties, + "properties": deepcopy(self.properties), } @@ -258,7 +259,7 @@ def dump(self, camel_case: bool = True) -> dict[str, Any]: "status": self.status, } if self.properties is not None: - output["properties"] = self.properties + output["properties"] = deepcopy(self.properties) return output def as_id(self) -> RecordId: From ad587bc289deb62e10c91ca57d43954c9a284b90 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Anders=20=C3=98en=20Fylling?= Date: Sun, 21 Jun 2026 12:19:04 +0200 Subject: [PATCH 4/4] chore(records): set read concurrency to 4 Match the lowest read limit documented for Records by keeping the SDK's concurrent read requests capped at 4. Co-authored-by: Cursor --- cognite/client/utils/_concurrency.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cognite/client/utils/_concurrency.py b/cognite/client/utils/_concurrency.py index 72cf2c7e5a..4c38dcb1e3 100644 --- a/cognite/client/utils/_concurrency.py +++ b/cognite/client/utils/_concurrency.py @@ -430,7 +430,7 @@ def __init__(self) -> None: write_schema=1, ) self._files = FileConcurrencyConfig(self, read=4, write=2, upload=5, download=5, delete=2, open_files=15) - self._records = RecordsGlobalConcurrencyConfig(self, read=20, write=20) + self._records = RecordsGlobalConcurrencyConfig(self, read=4, write=20) @functools.cached_property def _all_concurrency_configs(self) -> list[ConcurrencyConfig]: