diff --git a/cognite/client/_api/data_modeling/records.py b/cognite/client/_api/data_modeling/records.py index 3d7591d727..f01659ecb2 100644 --- a/cognite/client/_api/data_modeling/records.py +++ b/cognite/client/_api/data_modeling/records.py @@ -2,10 +2,20 @@ import asyncio from collections.abc import Sequence -from typing import TYPE_CHECKING, ClassVar, Literal +from typing import TYPE_CHECKING, Any, ClassVar, Literal from cognite.client._api_client import APIClient -from cognite.client.data_classes.data_modeling.records import RecordId, RecordIdSequence, RecordWrite +from cognite.client.data_classes.data_modeling.instances import InstanceSort +from cognite.client.data_classes.data_modeling.records import ( + Record, + RecordId, + RecordIdSequence, + RecordList, + RecordSourceSelector, + RecordWrite, + TimeRange, +) +from cognite.client.data_classes.filters import Filter from cognite.client.utils._concurrency import RecordsConcurrencyOperation from cognite.client.utils._experimental import FeaturePreviewWarning from cognite.client.utils._url import interpolate_and_url_encode @@ -14,7 +24,6 @@ from cognite.client import AsyncCogniteClient from cognite.client.config import ClientConfig - class RecordsAPI(APIClient): def __init__(self, config: ClientConfig, api_version: str | None, cognite_client: AsyncCogniteClient) -> None: super().__init__(config, api_version, cognite_client) @@ -23,11 +32,14 @@ def __init__(self, config: ClientConfig, api_version: str | None, cognite_client ) _OPERATION_TO_RATE_LIMIT: ClassVar[dict[str, RecordsConcurrencyOperation]] = { + "read": RecordsConcurrencyOperation.READ, "write": RecordsConcurrencyOperation.WRITE, "delete": RecordsConcurrencyOperation.WRITE, } - def _get_semaphore(self, operation: Literal["write", "delete"]) -> asyncio.BoundedSemaphore: + def _get_semaphore( # type: ignore[override] + self, operation: Literal["read", "write", "delete"], stream_type: Literal["immutable", "mutable"] + ) -> asyncio.BoundedSemaphore: from cognite.client import global_config return global_config.concurrency_settings.records._semaphore_factory( @@ -44,6 +56,7 @@ async def delete( items: RecordId | Sequence[RecordId], *, stream_id: str, + stream_type: Literal["immutable", "mutable"] = "immutable", ignore_unknown_ids: Literal[True] = True, ) -> None: """`Delete records from a stream `_. @@ -54,6 +67,7 @@ async def delete( Args: items (RecordId | Sequence[RecordId]): Records to delete. stream_id (str): External ID of the stream to delete from. + stream_type (Literal["immutable", "mutable"]): Type of the stream. Defaults to "immutable". ignore_unknown_ids (Literal[True]): Currently only True is supported Examples: @@ -76,6 +90,7 @@ async def delete( identifiers=RecordIdSequence.load(items), wrap_ids=True, resource_path=self._records_url(stream_id), + override_semaphore=self._get_semaphore("delete", stream_type), ) async def ingest( @@ -83,6 +98,7 @@ async def ingest( items: RecordWrite | Sequence[RecordWrite], *, stream_id: str, + stream_type: Literal["immutable", "mutable"] = "immutable", ) -> None: """`Ingest records into a stream `_. @@ -94,6 +110,7 @@ async def ingest( Args: items (RecordWrite | Sequence[RecordWrite]): One or more records to ingest. stream_id (str): External ID of the stream to ingest into. + stream_type (Literal["immutable", "mutable"]): Type of the stream. Defaults to "immutable". Examples: @@ -128,6 +145,7 @@ async def ingest( items=item_list, resource_path=self._records_url(stream_id), no_response=True, + override_semaphore=self._get_semaphore("write", stream_type), ) async def upsert( @@ -135,6 +153,7 @@ async def upsert( items: RecordWrite | Sequence[RecordWrite], *, stream_id: str, + stream_type: Literal["immutable", "mutable"] = "immutable", upsert_mode: Literal["replace"] = "replace", ) -> None: """`Upsert records into a stream `_. @@ -147,6 +166,7 @@ async def upsert( Args: items (RecordWrite | Sequence[RecordWrite]): One or more records to upsert. stream_id (str): External ID of the stream to upsert into. + stream_type (Literal["immutable", "mutable"]): Type of the stream. Defaults to "immutable". upsert_mode (Literal['replace']): How existing records are updated. Currently only ``"replace"`` is supported, which fully replaces the existing record. Defaults to ``"replace"``. Examples: @@ -182,4 +202,75 @@ async def upsert( items=item_list, resource_path=self._records_url(stream_id, "/upsert"), no_response=True, + override_semaphore=self._get_semaphore("write", stream_type), + ) + + async def list( + self, + stream_id: str, + *, + stream_type: Literal["immutable", "mutable"] = "immutable", + last_updated_time: TimeRange | None = None, + filter: Filter | None = None, + sources: Sequence[RecordSourceSelector] | None = None, + sort: Sequence[InstanceSort] | InstanceSort | None = None, + limit: int = 10, + include_typing: bool = False, + ) -> RecordList: + """`Filter records in a stream `_. + + Returns records matching the given filters, sorted by ``lastUpdatedTime`` unless a custom + ``sort`` is given. + + Args: + stream_id (str): External ID of the stream to query. + stream_type (Literal["immutable", "mutable"]): Type of the stream. Defaults to "immutable". + last_updated_time (TimeRange | None): Filter by last-updated time. **Required for + immutable streams** (must include a lower bound). + filter (Filter | None): Filter expression (see :mod:`cognite.client.data_classes.filters`). + sources (Sequence[RecordSourceSelector] | None): Which container properties to return. + sort (Sequence[InstanceSort] | InstanceSort | None): Sort specification(s); up to 5. + limit (int): Maximum number of records to return (1-1000). Defaults to 10. + include_typing (bool): If True, include property type information on the returned + list's ``typing`` attribute. + + Returns: + RecordList: The matching records. + + Examples: + + List records updated since a given timestamp: + + >>> from cognite.client import CogniteClient + >>> from cognite.client.data_classes.data_modeling.records import TimeRange + >>> client = CogniteClient() + >>> res = client.data_modeling.records.list( + ... stream_id="my-stream", + ... last_updated_time=TimeRange(gt=1705341600000), + ... limit=100, + ... ) + """ + self._warning.warn() + other_params: dict[str, Any] = {} + if last_updated_time is not None: + other_params["lastUpdatedTime"] = last_updated_time.dump() + if sources is not None: + other_params["sources"] = [source.dump() for source in sources] + if sort is not None: + sort_list = [sort] if isinstance(sort, InstanceSort) else list(sort) + other_params["sort"] = [spec.dump() for spec in sort_list] + if include_typing: + other_params["includeTyping"] = True + + return await self._list( + list_cls=RecordList, + resource_cls=Record, + method="POST", + resource_path=self._records_url(stream_id), + url_path=self._records_url(stream_id, "/filter"), + limit=limit, + filter=filter.dump(camel_case_property=False) if isinstance(filter, Filter) else filter, + other_params=other_params, + settings_forcing_raw_response_loading=[f"{include_typing=}"] if include_typing else None, + override_semaphore=self._get_semaphore("read", stream_type), ) diff --git a/cognite/client/_sync_api/data_modeling/records.py b/cognite/client/_sync_api/data_modeling/records.py index 9897313c9d..f8dfe8cc14 100644 --- a/cognite/client/_sync_api/data_modeling/records.py +++ b/cognite/client/_sync_api/data_modeling/records.py @@ -1,6 +1,6 @@ """ =============================================================================== -f86364d61385123f12bc60dd004ea1c2 +edfc27eee5ba5a131ccdddff1c6061f0 This file is auto-generated from the Async API modules, - do not edit manually! =============================================================================== """ @@ -12,7 +12,15 @@ from cognite.client import AsyncCogniteClient from cognite.client._sync_api_client import SyncAPIClient -from cognite.client.data_classes.data_modeling.records import RecordId, RecordWrite +from cognite.client.data_classes.data_modeling.instances import InstanceSort +from cognite.client.data_classes.data_modeling.records import ( + RecordId, + RecordList, + RecordSourceSelector, + RecordWrite, + TimeRange, +) +from cognite.client.data_classes.filters import Filter from cognite.client.utils._async_helpers import run_sync if TYPE_CHECKING: @@ -26,7 +34,12 @@ def __init__(self, async_client: AsyncCogniteClient) -> None: self.__async_client = async_client def delete( - self, items: RecordId | Sequence[RecordId], *, stream_id: str, ignore_unknown_ids: Literal[True] = True + self, + items: RecordId | Sequence[RecordId], + *, + stream_id: str, + stream_type: Literal["immutable", "mutable"] = "immutable", + ignore_unknown_ids: Literal[True] = True, ) -> None: """ `Delete records from a stream `_. @@ -37,6 +50,7 @@ def delete( Args: items (RecordId | Sequence[RecordId]): Records to delete. stream_id (str): External ID of the stream to delete from. + stream_type (Literal["immutable", "mutable"]): Type of the stream. Defaults to "immutable". ignore_unknown_ids (Literal[True]): Currently only True is supported Examples: @@ -56,11 +70,17 @@ def delete( """ return run_sync( self.__async_client.data_modeling.records.delete( - items=items, stream_id=stream_id, ignore_unknown_ids=ignore_unknown_ids + items=items, stream_id=stream_id, stream_type=stream_type, ignore_unknown_ids=ignore_unknown_ids ) ) - def ingest(self, items: RecordWrite | Sequence[RecordWrite], *, stream_id: str) -> None: + def ingest( + self, + items: RecordWrite | Sequence[RecordWrite], + *, + stream_id: str, + stream_type: Literal["immutable", "mutable"] = "immutable", + ) -> None: """ `Ingest records into a stream `_. @@ -72,6 +92,7 @@ def ingest(self, items: RecordWrite | Sequence[RecordWrite], *, stream_id: str) Args: items (RecordWrite | Sequence[RecordWrite]): One or more records to ingest. stream_id (str): External ID of the stream to ingest into. + stream_type (Literal["immutable", "mutable"]): Type of the stream. Defaults to "immutable". Examples: @@ -100,10 +121,17 @@ def ingest(self, items: RecordWrite | Sequence[RecordWrite], *, stream_id: str) ... stream_id="my-stream", ... ) """ - return run_sync(self.__async_client.data_modeling.records.ingest(items=items, stream_id=stream_id)) + return run_sync( + self.__async_client.data_modeling.records.ingest(items=items, stream_id=stream_id, stream_type=stream_type) + ) def upsert( - self, items: RecordWrite | Sequence[RecordWrite], *, stream_id: str, upsert_mode: Literal["replace"] = "replace" + self, + items: RecordWrite | Sequence[RecordWrite], + *, + stream_id: str, + stream_type: Literal["immutable", "mutable"] = "immutable", + upsert_mode: Literal["replace"] = "replace", ) -> None: """ `Upsert records into a stream `_. @@ -116,6 +144,7 @@ def upsert( Args: items (RecordWrite | Sequence[RecordWrite]): One or more records to upsert. stream_id (str): External ID of the stream to upsert into. + stream_type (Literal["immutable", "mutable"]): Type of the stream. Defaults to "immutable". upsert_mode (Literal['replace']): How existing records are updated. Currently only ``"replace"`` is supported, which fully replaces the existing record. Defaults to ``"replace"``. Examples: @@ -146,5 +175,66 @@ def upsert( ... ) """ return run_sync( - self.__async_client.data_modeling.records.upsert(items=items, stream_id=stream_id, upsert_mode=upsert_mode) + self.__async_client.data_modeling.records.upsert( + items=items, stream_id=stream_id, stream_type=stream_type, upsert_mode=upsert_mode + ) + ) + + def list( + self, + stream_id: str, + *, + stream_type: Literal["immutable", "mutable"] = "immutable", + last_updated_time: TimeRange | None = None, + filter: Filter | None = None, + sources: Sequence[RecordSourceSelector] | None = None, + sort: Sequence[InstanceSort] | InstanceSort | None = None, + limit: int = 10, + include_typing: bool = False, + ) -> RecordList: + """ + `Filter records in a stream `_. + + Returns records matching the given filters, sorted by ``lastUpdatedTime`` unless a custom + ``sort`` is given. + + Args: + stream_id (str): External ID of the stream to query. + stream_type (Literal["immutable", "mutable"]): Type of the stream. Defaults to "immutable". + last_updated_time (TimeRange | None): Filter by last-updated time. **Required for + immutable streams** (must include a lower bound). + filter (Filter | None): Filter expression (see :mod:`cognite.client.data_classes.filters`). + sources (Sequence[RecordSourceSelector] | None): Which container properties to return. + sort (Sequence[InstanceSort] | InstanceSort | None): Sort specification(s); up to 5. + limit (int): Maximum number of records to return (1-1000). Defaults to 10. + include_typing (bool): If True, include property type information on the returned + list's ``typing`` attribute. + + Returns: + RecordList: The matching records. + + Examples: + + List records updated since a given timestamp: + + >>> from cognite.client import CogniteClient + >>> from cognite.client.data_classes.data_modeling.records import TimeRange + >>> client = CogniteClient() + >>> res = client.data_modeling.records.list( + ... stream_id="my-stream", + ... last_updated_time=TimeRange(gt=1705341600000), + ... limit=100, + ... ) + """ + return run_sync( + self.__async_client.data_modeling.records.list( + stream_id=stream_id, + stream_type=stream_type, + last_updated_time=last_updated_time, + filter=filter, + sources=sources, + sort=sort, + limit=limit, + include_typing=include_typing, + ) ) diff --git a/cognite/client/data_classes/data_modeling/__init__.py b/cognite/client/data_classes/data_modeling/__init__.py index 20c4831bf7..1a80dab6a3 100644 --- a/cognite/client/data_classes/data_modeling/__init__.py +++ b/cognite/client/data_classes/data_modeling/__init__.py @@ -120,11 +120,15 @@ UnionAll, ) from cognite.client.data_classes.data_modeling.records import ( + Record, RecordContainerId, RecordId, + RecordList, RecordSource, + RecordSourceSelector, RecordWrite, RecordWriteList, + TimeRange, ) from cognite.client.data_classes.data_modeling.spaces import Space, SpaceApply, SpaceApplyList, SpaceList from cognite.client.data_classes.data_modeling.streams import ( @@ -249,9 +253,12 @@ "Query", "QueryResult", "QuerySync", + "Record", "RecordContainerId", "RecordId", + "RecordList", "RecordSource", + "RecordSourceSelector", "RecordWrite", "RecordWriteList", "RequiresConstraint", @@ -279,6 +286,7 @@ "StreamWrite", "SubscriptionContext", "Text", + "TimeRange", "TimeSeriesReference", "Timestamp", "TranslatedQuery", diff --git a/cognite/client/data_classes/data_modeling/records.py b/cognite/client/data_classes/data_modeling/records.py index deb7967b0f..4508552fab 100644 --- a/cognite/client/data_classes/data_modeling/records.py +++ b/cognite/client/data_classes/data_modeling/records.py @@ -10,11 +10,24 @@ CogniteResource, CogniteResourceList, WriteableCogniteResource, + WriteableCogniteResourceList, ) from cognite.client.data_classes.data_modeling.ids import ContainerId +from cognite.client.data_classes.data_modeling.instances import TypeInformation from cognite.client.utils._identifier import IdentifierSequenceCore, RecordId -__all__ = ["RecordContainerId", "RecordId", "RecordIdSequence", "RecordSource", "RecordWrite", "RecordWriteList"] +__all__ = [ + "Record", + "RecordContainerId", + "RecordId", + "RecordIdSequence", + "RecordList", + "RecordSource", + "RecordSourceSelector", + "RecordWrite", + "RecordWriteList", + "TimeRange", +] class RecordIdSequence(IdentifierSequenceCore[RecordId]): @@ -106,3 +119,154 @@ class RecordWriteList(CogniteResourceList[RecordWrite]): def as_ids(self) -> list[RecordId]: return [v.as_id() for v in self] + + +class Record(WriteableCogniteResource["RecordWrite"]): + """A record returned from the stream records API. + + This is the read version of :class:`RecordWrite`. + + Args: + space (str): Space the record belongs to. + external_id (str): External ID of the record. + created_time (int): Creation time in milliseconds since epoch. + last_updated_time (int): Last updated time in milliseconds since epoch. + properties (dict[str, dict[str, dict[str, Any]]] | None): Property values keyed by + ``{space: {container_external_id: {property_id: value}}}``. + """ + + def __init__( + self, + space: str, + external_id: str, + created_time: int, + last_updated_time: int, + properties: dict[str, dict[str, dict[str, Any]]] | None = None, + ) -> None: + self.space = space + self.external_id = external_id + self.created_time = created_time + self.last_updated_time = last_updated_time + self.properties = properties + + @classmethod + def _load(cls, resource: dict[str, Any]) -> Self: + return cls( + space=resource["space"], + external_id=resource["externalId"], + created_time=resource["createdTime"], + last_updated_time=resource["lastUpdatedTime"], + properties=resource.get("properties"), + ) + + def dump(self, camel_case: bool = True) -> dict[str, Any]: + output: dict[str, Any] = { + "space": self.space, + "externalId" if camel_case else "external_id": self.external_id, + "createdTime" if camel_case else "created_time": self.created_time, + "lastUpdatedTime" if camel_case else "last_updated_time": self.last_updated_time, + } + if self.properties is not None: + output["properties"] = self.properties + return output + + def as_id(self) -> RecordId: + return RecordId(space=self.space, external_id=self.external_id) + + def as_write(self) -> RecordWrite: + """Reconstruct the :class:`RecordWrite` by grouping read properties back into sources.""" + sources = [ + RecordSource( + source=RecordContainerId(space=space, external_id=container), + properties=dict(props), + ) + for space, containers in (self.properties or {}).items() + for container, props in containers.items() + ] + return RecordWrite(space=self.space, external_id=self.external_id, sources=sources) + + +class RecordList(WriteableCogniteResourceList[RecordWrite, Record]): + """A list of :class:`Record` objects. + + Args: + resources (Sequence[Record]): The records. + typing (TypeInformation | None): Property type information, present when the request + was made with ``include_typing=True``. + """ + + _RESOURCE = Record + + def __init__(self, resources: Sequence[Record], typing: TypeInformation | None = None) -> None: + super().__init__(resources) + self.typing = typing + + def as_ids(self) -> list[RecordId]: + return [record.as_id() for record in self] + + def as_write(self) -> RecordWriteList: + return RecordWriteList([record.as_write() for record in self]) + + @classmethod + def _load_raw_api_response(cls, responses: list[dict[str, Any]]) -> Self: + typing = next((TypeInformation._load(resp["typing"]) for resp in responses if "typing" in resp), None) + resources = [cls._RESOURCE._load(item) for response in responses for item in response.get("items", [])] + return cls(resources, typing) + + +class TimeRange(CogniteResource): + """A time range filter on ``lastUpdatedTime``. + + Bounds are either milliseconds since the Unix epoch (int) or an ISO-8601 string. At least a + lower bound (``gte`` or ``gt``) is required for immutable streams; specifying two lower or two + upper bounds is not allowed. + + Args: + gte (int | str | None): Greater than or equal to. + gt (int | str | None): Greater than. + lte (int | str | None): Less than or equal to. + lt (int | str | None): Less than. + """ + + def __init__( + self, + gte: int | str | None = None, + gt: int | str | None = None, + lte: int | str | None = None, + lt: int | str | None = None, + ) -> None: + self.gte = gte + self.gt = gt + self.lte = lte + self.lt = lt + + @classmethod + def _load(cls, resource: dict[str, Any]) -> Self: + return cls(gte=resource.get("gte"), gt=resource.get("gt"), lte=resource.get("lte"), lt=resource.get("lt")) + + def dump(self, camel_case: bool = True) -> dict[str, Any]: + return { + key: value + for key, value in {"gte": self.gte, "gt": self.gt, "lte": self.lte, "lt": self.lt}.items() + if value is not None + } + + +class RecordSourceSelector(CogniteResource): + """Selects which container properties to return for a record. + + Args: + source (RecordContainerId): The container to select properties from. + properties (list[str]): Property identifiers to return; use ``["*"]`` to return all. + """ + + def __init__(self, source: RecordContainerId, properties: list[str]) -> None: + self.source = source + self.properties = properties + + @classmethod + def _load(cls, resource: dict[str, Any]) -> Self: + return cls(source=RecordContainerId.load(resource["source"]), properties=resource["properties"]) + + def dump(self, camel_case: bool = True) -> dict[str, Any]: + return {"source": self.source.dump(camel_case=camel_case), "properties": self.properties} diff --git a/cognite/client/utils/_concurrency.py b/cognite/client/utils/_concurrency.py index b5df86a34b..fc468defaa 100644 --- a/cognite/client/utils/_concurrency.py +++ b/cognite/client/utils/_concurrency.py @@ -223,6 +223,7 @@ def __repr__(self) -> str: class RecordsConcurrencyOperation(Enum): + READ = "read" WRITE = "write" @@ -233,15 +234,17 @@ class RecordsGlobalConcurrencyConfig(ConcurrencyConfig): Args: concurrency_settings (ConcurrencySettings): Reference to the parent settings object. - write (int): Maximum concurrent write requests (ingest, delete). + read (int): Maximum concurrent read requests (list/filter, sync). + write (int): Maximum concurrent write requests (ingest, upsert, delete). """ def __init__( self, concurrency_settings: ConcurrencySettings, + read: int, write: int, ) -> None: - super().__init__(concurrency_settings, "records", read=0, write=write, delete=0) + super().__init__(concurrency_settings, "records", read=read, write=write, delete=0) def _semaphore_factory(self, operation: RecordsConcurrencyOperation, project: str) -> asyncio.BoundedSemaphore: key = (operation.value, project, asyncio.get_running_loop()) @@ -252,6 +255,8 @@ def _semaphore_factory(self, operation: RecordsConcurrencyOperation, project: st global_config.concurrency_settings._freeze() match operation: + case RecordsConcurrencyOperation.READ: + sem = asyncio.BoundedSemaphore(self._read) case RecordsConcurrencyOperation.WRITE: sem = asyncio.BoundedSemaphore(self._write) case _: @@ -260,7 +265,7 @@ def _semaphore_factory(self, operation: RecordsConcurrencyOperation, project: st return sem def __repr__(self) -> str: - return f"Concurrency[records](write={self._write})" + return f"Concurrency[records](read={self._read}, write={self._write})" class FileConcurrencyConfig(ConcurrencyConfig): @@ -425,7 +430,7 @@ def __init__(self) -> None: write_schema=1, ) self._files = FileConcurrencyConfig(self, read=4, write=2, upload=5, download=5, delete=2, open_files=15) - self._records = RecordsGlobalConcurrencyConfig(self, write=20) + self._records = RecordsGlobalConcurrencyConfig(self, read=10, write=20) @functools.cached_property def _all_concurrency_configs(self) -> list[ConcurrencyConfig]: diff --git a/scripts/sync_client_codegen/create_sync_client.py b/scripts/sync_client_codegen/create_sync_client.py index 866410ac60..a531e1b26f 100644 --- a/scripts/sync_client_codegen/create_sync_client.py +++ b/scripts/sync_client_codegen/create_sync_client.py @@ -27,9 +27,9 @@ def create_sync_cognite_client( override_api_name = foolish_cls_name_rewrite(api) all_apis.append(f"self.{attr} = Sync{override_api_name}(async_client)\n") - import_path = path_as_importable( - SYNC_API_DIR / Path(file_path_lookup[api]).relative_to(ASYNC_API_DIR.resolve()) - ).replace(".__init__", "") + api_file = Path(file_path_lookup[api]) + api_dir = next(p for p in api_file.parents if p.name == ASYNC_API_DIR.name) + import_path = path_as_importable(SYNC_API_DIR / api_file.relative_to(api_dir)).replace(".__init__", "") all_imports.append(f"from {import_path} import Sync{override_api_name}") return COGNITE_CLIENT_TEMPLATE.format( diff --git a/tests/tests_unit/test_api/test_data_modeling/test_records.py b/tests/tests_unit/test_api/test_data_modeling/test_records.py index 27ed46d90c..7b787ea584 100644 --- a/tests/tests_unit/test_api/test_data_modeling/test_records.py +++ b/tests/tests_unit/test_api/test_data_modeling/test_records.py @@ -6,11 +6,16 @@ from pytest_httpx import HTTPXMock from cognite.client import AsyncCogniteClient, CogniteClient +from cognite.client.data_classes.data_modeling.instances import InstanceSort, TypeInformation from cognite.client.data_classes.data_modeling.records import ( + Record, RecordContainerId, RecordId, + RecordList, RecordSource, + RecordSourceSelector, RecordWrite, + TimeRange, ) from tests.utils import jsgz_load @@ -55,6 +60,27 @@ def mock_upsert(httpx_mock: HTTPXMock, upsert_url_pattern: re.Pattern) -> None: httpx_mock.add_response(method="POST", url=upsert_url_pattern, status_code=202) +@pytest.fixture +def filter_url_pattern(records_base_url: str) -> re.Pattern: + return re.compile(re.escape(records_base_url) + r"/filter$") + + +@pytest.fixture +def record_response() -> dict: + return { + "space": "sp", + "externalId": "rec-1", + "createdTime": 100, + "lastUpdatedTime": 200, + "properties": {"sp": {"container-x": {"temp": 22.5}}}, + } + + +@pytest.fixture +def mock_filter(httpx_mock: HTTPXMock, filter_url_pattern: re.Pattern, record_response: dict) -> None: + httpx_mock.add_response(method="POST", url=filter_url_pattern, status_code=200, json={"items": [record_response]}) + + @pytest.fixture def write_item() -> RecordWrite: return RecordWrite( @@ -226,6 +252,94 @@ def test_upsert_chunks( assert len(jsgz_load(requests[1].content)["items"]) == 1 +class TestRecordsAPIList: + def test_list_returns_record_list( + self, + cognite_client: CogniteClient, + httpx_mock: HTTPXMock, + mock_filter: None, + stream_id: str, + ) -> None: + result = cognite_client.data_modeling.records.list(stream_id=stream_id) + assert isinstance(result, RecordList) + assert len(result) == 1 + assert result[0].external_id == "rec-1" + assert result[0].properties == {"sp": {"container-x": {"temp": 22.5}}} + request = httpx_mock.get_requests()[0] + assert request.url.path.endswith(f"/streams/{stream_id}/records/filter") + + def test_list_default_limit_is_10( + self, + cognite_client: CogniteClient, + httpx_mock: HTTPXMock, + mock_filter: None, + stream_id: str, + ) -> None: + cognite_client.data_modeling.records.list(stream_id=stream_id) + body = jsgz_load(httpx_mock.get_requests()[0].content) + assert body == {"limit": 10} + + def test_list_sends_last_updated_time_and_limit( + self, + cognite_client: CogniteClient, + httpx_mock: HTTPXMock, + mock_filter: None, + stream_id: str, + ) -> None: + cognite_client.data_modeling.records.list( + stream_id=stream_id, last_updated_time=TimeRange(gte=1_000_000), limit=50 + ) + body = jsgz_load(httpx_mock.get_requests()[0].content) + assert body["lastUpdatedTime"] == {"gte": 1_000_000} + assert body["limit"] == 50 + + def test_list_sources_body_shape( + self, + cognite_client: CogniteClient, + httpx_mock: HTTPXMock, + mock_filter: None, + stream_id: str, + ) -> None: + cognite_client.data_modeling.records.list( + stream_id=stream_id, + sources=[RecordSourceSelector(RecordContainerId(space="sp", external_id="container-x"), ["*"])], + ) + body = jsgz_load(httpx_mock.get_requests()[0].content) + assert body["sources"] == [ + {"source": {"type": "container", "space": "sp", "externalId": "container-x"}, "properties": ["*"]} + ] + + def test_list_sort_body_shape( + self, + cognite_client: CogniteClient, + httpx_mock: HTTPXMock, + mock_filter: None, + stream_id: str, + ) -> None: + cognite_client.data_modeling.records.list( + stream_id=stream_id, sort=InstanceSort(property=["sp", "container-x", "temp"], direction="descending") + ) + body = jsgz_load(httpx_mock.get_requests()[0].content) + assert body["sort"] == [{"property": ["sp", "container-x", "temp"], "direction": "descending"}] + + def test_list_include_typing( + self, + cognite_client: CogniteClient, + httpx_mock: HTTPXMock, + filter_url_pattern: re.Pattern, + record_response: dict, + stream_id: str, + ) -> None: + typing = {"sp": {"container-x": {"temp": {"type": {"type": "float64", "list": False}, "nullable": True}}}} + httpx_mock.add_response( + method="POST", url=filter_url_pattern, status_code=200, json={"items": [record_response], "typing": typing} + ) + result = cognite_client.data_modeling.records.list(stream_id=stream_id, include_typing=True) + body = jsgz_load(httpx_mock.get_requests()[0].content) + assert body["includeTyping"] is True + assert isinstance(result.typing, TypeInformation) + + class TestRecordDTOs: def test_record_write_as_id(self, write_item: RecordWrite) -> None: rid = write_item.as_id() @@ -256,3 +370,57 @@ def test_record_source_dump(self) -> None: d = src.dump() assert d["source"]["type"] == "container" assert d["properties"] == {"x": 1} + + def test_record_load_dump_round_trip(self) -> None: + payload = { + "space": "sp", + "externalId": "rec-1", + "createdTime": 100, + "lastUpdatedTime": 200, + "properties": {"sp": {"c": {"temp": 22.5}}}, + } + record = Record._load(payload) + assert record.created_time == 100 + assert record.last_updated_time == 200 + assert record.dump() == payload + + def test_record_as_id(self) -> None: + record = Record(space="sp", external_id="rec-1", created_time=1, last_updated_time=2) + rid = record.as_id() + assert isinstance(rid, RecordId) + assert (rid.space, rid.external_id) == ("sp", "rec-1") + + def test_record_as_write_reconstructs_sources(self) -> None: + record = Record( + space="sp", + external_id="rec-1", + created_time=1, + last_updated_time=2, + properties={"sp": {"c": {"temp": 22.5}}}, + ) + write = record.as_write() + assert isinstance(write, RecordWrite) + assert write.dump()["sources"] == [ + {"source": {"type": "container", "space": "sp", "externalId": "c"}, "properties": {"temp": 22.5}} + ] + + def test_record_list_as_ids_and_as_write(self) -> None: + records = RecordList( + [ + Record(space="sp", external_id="rec-1", created_time=1, last_updated_time=2), + Record(space="sp", external_id="rec-2", created_time=1, last_updated_time=2), + ] + ) + assert records.as_ids() == [RecordId("sp", "rec-1"), RecordId("sp", "rec-2")] + assert [w.external_id for w in records.as_write()] == ["rec-1", "rec-2"] + + def test_time_range_dump_omits_none(self) -> None: + assert TimeRange(gte=1, lt=5).dump() == {"gte": 1, "lt": 5} + assert TimeRange().dump() == {} + + def test_record_source_selector_dump(self) -> None: + selector = RecordSourceSelector(RecordContainerId(space="sp", external_id="c"), ["temp", "pressure"]) + assert selector.dump() == { + "source": {"type": "container", "space": "sp", "externalId": "c"}, + "properties": ["temp", "pressure"], + } diff --git a/tests/tests_unit/test_docstring_examples.py b/tests/tests_unit/test_docstring_examples.py index 3858afbc4f..6876b26f78 100644 --- a/tests/tests_unit/test_docstring_examples.py +++ b/tests/tests_unit/test_docstring_examples.py @@ -34,6 +34,7 @@ data_models, graphql, instances, + records, spaces, statistics, streams, @@ -137,6 +138,7 @@ def test_data_modeling(self) -> None: run_docstring_tests(graphql) run_docstring_tests(statistics) run_docstring_tests(streams) + run_docstring_tests(records) run_docstring_tests(dm_time_series) def test_datapoint_subscriptions(self) -> None: