diff --git a/cognite/client/_api/data_modeling/records.py b/cognite/client/_api/data_modeling/records.py index 15d4cf856d..43d488e485 100644 --- a/cognite/client/_api/data_modeling/records.py +++ b/cognite/client/_api/data_modeling/records.py @@ -5,7 +5,17 @@ from typing import TYPE_CHECKING, Any, Literal from cognite.client._api_client import APIClient -from cognite.client.data_classes.data_modeling.records import RecordId, RecordIdSequence, RecordWrite +from cognite.client.data_classes.data_modeling.instances import InstanceSort +from cognite.client.data_classes.data_modeling.records import ( + Record, + RecordId, + RecordIdSequence, + RecordList, + RecordSourceSelector, + RecordWrite, + TimeRange, +) +from cognite.client.data_classes.filters import Filter from cognite.client.utils._experimental import FeaturePreviewWarning from cognite.client.utils._url import interpolate_and_url_encode @@ -177,3 +187,70 @@ async def upsert( resource_path=self._records_url(stream_id, "/upsert"), no_response=True, ) + + async def list( + self, + stream_id: str, + *, + last_updated_time: TimeRange | None = None, + filter: Filter | None = None, + sources: Sequence[RecordSourceSelector] | None = None, + sort: Sequence[InstanceSort] | InstanceSort | None = None, + limit: int = 10, + include_typing: bool = False, + ) -> RecordList: + """`Filter records in a stream `_. + + Returns records matching the given filters, sorted by ``lastUpdatedTime`` unless a custom + ``sort`` is given. + + Args: + stream_id (str): External ID of the stream to query. + last_updated_time (TimeRange | None): Filter by last-updated time. **Required for + immutable streams** (must include a lower bound). + filter (Filter | None): Filter expression (see :mod:`cognite.client.data_classes.filters`). + sources (Sequence[RecordSourceSelector] | None): Which container properties to return. + sort (Sequence[InstanceSort] | InstanceSort | None): Sort specification(s); up to 5. + limit (int): Maximum number of records to return (1-1000). + include_typing (bool): If True, include property type information on the returned + list's ``typing`` attribute. + + Returns: + RecordList: The matching records. + + Examples: + + List records updated since a given timestamp: + + >>> from cognite.client import CogniteClient + >>> from cognite.client.data_classes.data_modeling.records import TimeRange + >>> client = CogniteClient() + >>> res = client.data_modeling.records.list( + ... stream_id="my-stream", + ... last_updated_time=TimeRange(gt=1705341600000), + ... limit=100, + ... ) + """ + self._warning.warn() + other_params: dict[str, Any] = {} + if last_updated_time is not None: + other_params["lastUpdatedTime"] = last_updated_time.dump() + if sources is not None: + other_params["sources"] = [source.dump() for source in sources] + if sort is not None: + sort_list = [sort] if isinstance(sort, InstanceSort) else list(sort) + other_params["sort"] = [spec.dump() for spec in sort_list] + if include_typing: + other_params["includeTyping"] = True + + return await self._list( + list_cls=RecordList, + resource_cls=Record, + method="POST", + resource_path=self._records_url(stream_id), + url_path=self._records_url(stream_id, "/filter"), + limit=limit, + filter=filter.dump(camel_case_property=False) if isinstance(filter, Filter) else filter, + other_params=other_params, + settings_forcing_raw_response_loading=[f"{include_typing=}"] if include_typing else None, + ) diff --git a/cognite/client/_sync_api/data_modeling/records.py b/cognite/client/_sync_api/data_modeling/records.py index 30d29669c2..ad704c15ac 100644 --- a/cognite/client/_sync_api/data_modeling/records.py +++ b/cognite/client/_sync_api/data_modeling/records.py @@ -1,6 +1,6 @@ """ =============================================================================== -6fb9ded91a0d546955f9ed3109b468b6 +87a34ddba0646543b78ea05963291e8f This file is auto-generated from the Async API modules, - do not edit manually! =============================================================================== """ @@ -12,7 +12,15 @@ from cognite.client import AsyncCogniteClient from cognite.client._sync_api_client import SyncAPIClient -from cognite.client.data_classes.data_modeling.records import RecordId, RecordWrite +from cognite.client.data_classes.data_modeling.instances import InstanceSort +from cognite.client.data_classes.data_modeling.records import ( + RecordId, + RecordList, + RecordSourceSelector, + RecordWrite, + TimeRange, +) +from cognite.client.data_classes.filters import Filter from cognite.client.utils._async_helpers import run_sync if TYPE_CHECKING: @@ -148,3 +156,59 @@ def upsert( return run_sync( self.__async_client.data_modeling.records.upsert(items=items, stream_id=stream_id, upsert_mode=upsert_mode) ) + + def list( + self, + stream_id: str, + *, + last_updated_time: TimeRange | None = None, + filter: Filter | None = None, + sources: Sequence[RecordSourceSelector] | None = None, + sort: Sequence[InstanceSort] | InstanceSort | None = None, + limit: int = 10, + include_typing: bool = False, + ) -> RecordList: + """ + `Filter records in a stream `_. + + Returns records matching the given filters, sorted by ``lastUpdatedTime`` unless a custom + ``sort`` is given. + + Args: + stream_id (str): External ID of the stream to query. + last_updated_time (TimeRange | None): Filter by last-updated time. **Required for + immutable streams** (must include a lower bound). + filter (Filter | None): Filter expression (see :mod:`cognite.client.data_classes.filters`). + sources (Sequence[RecordSourceSelector] | None): Which container properties to return. + sort (Sequence[InstanceSort] | InstanceSort | None): Sort specification(s); up to 5. + limit (int): Maximum number of records to return (1-1000). + include_typing (bool): If True, include property type information on the returned + list's ``typing`` attribute. + + Returns: + RecordList: The matching records. + + Examples: + + List records updated since a given timestamp: + + >>> from cognite.client import CogniteClient + >>> from cognite.client.data_classes.data_modeling.records import TimeRange + >>> client = CogniteClient() + >>> res = client.data_modeling.records.list( + ... stream_id="my-stream", + ... last_updated_time=TimeRange(gt=1705341600000), + ... limit=100, + ... ) + """ + return run_sync( + self.__async_client.data_modeling.records.list( + stream_id=stream_id, + last_updated_time=last_updated_time, + filter=filter, + sources=sources, + sort=sort, + limit=limit, + include_typing=include_typing, + ) + ) diff --git a/cognite/client/data_classes/data_modeling/__init__.py b/cognite/client/data_classes/data_modeling/__init__.py index 20c4831bf7..1a80dab6a3 100644 --- a/cognite/client/data_classes/data_modeling/__init__.py +++ b/cognite/client/data_classes/data_modeling/__init__.py @@ -120,11 +120,15 @@ UnionAll, ) from cognite.client.data_classes.data_modeling.records import ( + Record, RecordContainerId, RecordId, + RecordList, RecordSource, + RecordSourceSelector, RecordWrite, RecordWriteList, + TimeRange, ) from cognite.client.data_classes.data_modeling.spaces import Space, SpaceApply, SpaceApplyList, SpaceList from cognite.client.data_classes.data_modeling.streams import ( @@ -249,9 +253,12 @@ "Query", "QueryResult", "QuerySync", + "Record", "RecordContainerId", "RecordId", + "RecordList", "RecordSource", + "RecordSourceSelector", "RecordWrite", "RecordWriteList", "RequiresConstraint", @@ -279,6 +286,7 @@ "StreamWrite", "SubscriptionContext", "Text", + "TimeRange", "TimeSeriesReference", "Timestamp", "TranslatedQuery", diff --git a/cognite/client/data_classes/data_modeling/records.py b/cognite/client/data_classes/data_modeling/records.py index deb7967b0f..4508552fab 100644 --- a/cognite/client/data_classes/data_modeling/records.py +++ b/cognite/client/data_classes/data_modeling/records.py @@ -10,11 +10,24 @@ CogniteResource, CogniteResourceList, WriteableCogniteResource, + WriteableCogniteResourceList, ) from cognite.client.data_classes.data_modeling.ids import ContainerId +from cognite.client.data_classes.data_modeling.instances import TypeInformation from cognite.client.utils._identifier import IdentifierSequenceCore, RecordId -__all__ = ["RecordContainerId", "RecordId", "RecordIdSequence", "RecordSource", "RecordWrite", "RecordWriteList"] +__all__ = [ + "Record", + "RecordContainerId", + "RecordId", + "RecordIdSequence", + "RecordList", + "RecordSource", + "RecordSourceSelector", + "RecordWrite", + "RecordWriteList", + "TimeRange", +] class RecordIdSequence(IdentifierSequenceCore[RecordId]): @@ -106,3 +119,154 @@ class RecordWriteList(CogniteResourceList[RecordWrite]): def as_ids(self) -> list[RecordId]: return [v.as_id() for v in self] + + +class Record(WriteableCogniteResource["RecordWrite"]): + """A record returned from the stream records API. + + This is the read version of :class:`RecordWrite`. + + Args: + space (str): Space the record belongs to. + external_id (str): External ID of the record. + created_time (int): Creation time in milliseconds since epoch. + last_updated_time (int): Last updated time in milliseconds since epoch. + properties (dict[str, dict[str, dict[str, Any]]] | None): Property values keyed by + ``{space: {container_external_id: {property_id: value}}}``. + """ + + def __init__( + self, + space: str, + external_id: str, + created_time: int, + last_updated_time: int, + properties: dict[str, dict[str, dict[str, Any]]] | None = None, + ) -> None: + self.space = space + self.external_id = external_id + self.created_time = created_time + self.last_updated_time = last_updated_time + self.properties = properties + + @classmethod + def _load(cls, resource: dict[str, Any]) -> Self: + return cls( + space=resource["space"], + external_id=resource["externalId"], + created_time=resource["createdTime"], + last_updated_time=resource["lastUpdatedTime"], + properties=resource.get("properties"), + ) + + def dump(self, camel_case: bool = True) -> dict[str, Any]: + output: dict[str, Any] = { + "space": self.space, + "externalId" if camel_case else "external_id": self.external_id, + "createdTime" if camel_case else "created_time": self.created_time, + "lastUpdatedTime" if camel_case else "last_updated_time": self.last_updated_time, + } + if self.properties is not None: + output["properties"] = self.properties + return output + + def as_id(self) -> RecordId: + return RecordId(space=self.space, external_id=self.external_id) + + def as_write(self) -> RecordWrite: + """Reconstruct the :class:`RecordWrite` by grouping read properties back into sources.""" + sources = [ + RecordSource( + source=RecordContainerId(space=space, external_id=container), + properties=dict(props), + ) + for space, containers in (self.properties or {}).items() + for container, props in containers.items() + ] + return RecordWrite(space=self.space, external_id=self.external_id, sources=sources) + + +class RecordList(WriteableCogniteResourceList[RecordWrite, Record]): + """A list of :class:`Record` objects. + + Args: + resources (Sequence[Record]): The records. + typing (TypeInformation | None): Property type information, present when the request + was made with ``include_typing=True``. + """ + + _RESOURCE = Record + + def __init__(self, resources: Sequence[Record], typing: TypeInformation | None = None) -> None: + super().__init__(resources) + self.typing = typing + + def as_ids(self) -> list[RecordId]: + return [record.as_id() for record in self] + + def as_write(self) -> RecordWriteList: + return RecordWriteList([record.as_write() for record in self]) + + @classmethod + def _load_raw_api_response(cls, responses: list[dict[str, Any]]) -> Self: + typing = next((TypeInformation._load(resp["typing"]) for resp in responses if "typing" in resp), None) + resources = [cls._RESOURCE._load(item) for response in responses for item in response.get("items", [])] + return cls(resources, typing) + + +class TimeRange(CogniteResource): + """A time range filter on ``lastUpdatedTime``. + + Bounds are either milliseconds since the Unix epoch (int) or an ISO-8601 string. At least a + lower bound (``gte`` or ``gt``) is required for immutable streams; specifying two lower or two + upper bounds is not allowed. + + Args: + gte (int | str | None): Greater than or equal to. + gt (int | str | None): Greater than. + lte (int | str | None): Less than or equal to. + lt (int | str | None): Less than. + """ + + def __init__( + self, + gte: int | str | None = None, + gt: int | str | None = None, + lte: int | str | None = None, + lt: int | str | None = None, + ) -> None: + self.gte = gte + self.gt = gt + self.lte = lte + self.lt = lt + + @classmethod + def _load(cls, resource: dict[str, Any]) -> Self: + return cls(gte=resource.get("gte"), gt=resource.get("gt"), lte=resource.get("lte"), lt=resource.get("lt")) + + def dump(self, camel_case: bool = True) -> dict[str, Any]: + return { + key: value + for key, value in {"gte": self.gte, "gt": self.gt, "lte": self.lte, "lt": self.lt}.items() + if value is not None + } + + +class RecordSourceSelector(CogniteResource): + """Selects which container properties to return for a record. + + Args: + source (RecordContainerId): The container to select properties from. + properties (list[str]): Property identifiers to return; use ``["*"]`` to return all. + """ + + def __init__(self, source: RecordContainerId, properties: list[str]) -> None: + self.source = source + self.properties = properties + + @classmethod + def _load(cls, resource: dict[str, Any]) -> Self: + return cls(source=RecordContainerId.load(resource["source"]), properties=resource["properties"]) + + def dump(self, camel_case: bool = True) -> dict[str, Any]: + return {"source": self.source.dump(camel_case=camel_case), "properties": self.properties} diff --git a/cognite/client/utils/_concurrency.py b/cognite/client/utils/_concurrency.py index 5d59a668d5..513fcc1ece 100644 --- a/cognite/client/utils/_concurrency.py +++ b/cognite/client/utils/_concurrency.py @@ -228,8 +228,8 @@ class RecordsGlobalConcurrencyConfig(ConcurrencyConfig): Args: concurrency_settings (ConcurrencySettings): Reference to the parent settings object. - read (int): Maximum concurrent read requests (list, retrieve, sync). - write (int): Maximum concurrent write requests (ingest, delete). + read (int): Maximum concurrent read requests (list/filter, sync). + write (int): Maximum concurrent write requests (ingest, upsert, delete). """ def __init__( diff --git a/scripts/sync_client_codegen/create_sync_client.py b/scripts/sync_client_codegen/create_sync_client.py index 866410ac60..a531e1b26f 100644 --- a/scripts/sync_client_codegen/create_sync_client.py +++ b/scripts/sync_client_codegen/create_sync_client.py @@ -27,9 +27,9 @@ def create_sync_cognite_client( override_api_name = foolish_cls_name_rewrite(api) all_apis.append(f"self.{attr} = Sync{override_api_name}(async_client)\n") - import_path = path_as_importable( - SYNC_API_DIR / Path(file_path_lookup[api]).relative_to(ASYNC_API_DIR.resolve()) - ).replace(".__init__", "") + api_file = Path(file_path_lookup[api]) + api_dir = next(p for p in api_file.parents if p.name == ASYNC_API_DIR.name) + import_path = path_as_importable(SYNC_API_DIR / api_file.relative_to(api_dir)).replace(".__init__", "") all_imports.append(f"from {import_path} import Sync{override_api_name}") return COGNITE_CLIENT_TEMPLATE.format( diff --git a/tests/tests_unit/test_api/test_data_modeling/test_records.py b/tests/tests_unit/test_api/test_data_modeling/test_records.py index 27ed46d90c..7b787ea584 100644 --- a/tests/tests_unit/test_api/test_data_modeling/test_records.py +++ b/tests/tests_unit/test_api/test_data_modeling/test_records.py @@ -6,11 +6,16 @@ from pytest_httpx import HTTPXMock from cognite.client import AsyncCogniteClient, CogniteClient +from cognite.client.data_classes.data_modeling.instances import InstanceSort, TypeInformation from cognite.client.data_classes.data_modeling.records import ( + Record, RecordContainerId, RecordId, + RecordList, RecordSource, + RecordSourceSelector, RecordWrite, + TimeRange, ) from tests.utils import jsgz_load @@ -55,6 +60,27 @@ def mock_upsert(httpx_mock: HTTPXMock, upsert_url_pattern: re.Pattern) -> None: httpx_mock.add_response(method="POST", url=upsert_url_pattern, status_code=202) +@pytest.fixture +def filter_url_pattern(records_base_url: str) -> re.Pattern: + return re.compile(re.escape(records_base_url) + r"/filter$") + + +@pytest.fixture +def record_response() -> dict: + return { + "space": "sp", + "externalId": "rec-1", + "createdTime": 100, + "lastUpdatedTime": 200, + "properties": {"sp": {"container-x": {"temp": 22.5}}}, + } + + +@pytest.fixture +def mock_filter(httpx_mock: HTTPXMock, filter_url_pattern: re.Pattern, record_response: dict) -> None: + httpx_mock.add_response(method="POST", url=filter_url_pattern, status_code=200, json={"items": [record_response]}) + + @pytest.fixture def write_item() -> RecordWrite: return RecordWrite( @@ -226,6 +252,94 @@ def test_upsert_chunks( assert len(jsgz_load(requests[1].content)["items"]) == 1 +class TestRecordsAPIList: + def test_list_returns_record_list( + self, + cognite_client: CogniteClient, + httpx_mock: HTTPXMock, + mock_filter: None, + stream_id: str, + ) -> None: + result = cognite_client.data_modeling.records.list(stream_id=stream_id) + assert isinstance(result, RecordList) + assert len(result) == 1 + assert result[0].external_id == "rec-1" + assert result[0].properties == {"sp": {"container-x": {"temp": 22.5}}} + request = httpx_mock.get_requests()[0] + assert request.url.path.endswith(f"/streams/{stream_id}/records/filter") + + def test_list_default_limit_is_10( + self, + cognite_client: CogniteClient, + httpx_mock: HTTPXMock, + mock_filter: None, + stream_id: str, + ) -> None: + cognite_client.data_modeling.records.list(stream_id=stream_id) + body = jsgz_load(httpx_mock.get_requests()[0].content) + assert body == {"limit": 10} + + def test_list_sends_last_updated_time_and_limit( + self, + cognite_client: CogniteClient, + httpx_mock: HTTPXMock, + mock_filter: None, + stream_id: str, + ) -> None: + cognite_client.data_modeling.records.list( + stream_id=stream_id, last_updated_time=TimeRange(gte=1_000_000), limit=50 + ) + body = jsgz_load(httpx_mock.get_requests()[0].content) + assert body["lastUpdatedTime"] == {"gte": 1_000_000} + assert body["limit"] == 50 + + def test_list_sources_body_shape( + self, + cognite_client: CogniteClient, + httpx_mock: HTTPXMock, + mock_filter: None, + stream_id: str, + ) -> None: + cognite_client.data_modeling.records.list( + stream_id=stream_id, + sources=[RecordSourceSelector(RecordContainerId(space="sp", external_id="container-x"), ["*"])], + ) + body = jsgz_load(httpx_mock.get_requests()[0].content) + assert body["sources"] == [ + {"source": {"type": "container", "space": "sp", "externalId": "container-x"}, "properties": ["*"]} + ] + + def test_list_sort_body_shape( + self, + cognite_client: CogniteClient, + httpx_mock: HTTPXMock, + mock_filter: None, + stream_id: str, + ) -> None: + cognite_client.data_modeling.records.list( + stream_id=stream_id, sort=InstanceSort(property=["sp", "container-x", "temp"], direction="descending") + ) + body = jsgz_load(httpx_mock.get_requests()[0].content) + assert body["sort"] == [{"property": ["sp", "container-x", "temp"], "direction": "descending"}] + + def test_list_include_typing( + self, + cognite_client: CogniteClient, + httpx_mock: HTTPXMock, + filter_url_pattern: re.Pattern, + record_response: dict, + stream_id: str, + ) -> None: + typing = {"sp": {"container-x": {"temp": {"type": {"type": "float64", "list": False}, "nullable": True}}}} + httpx_mock.add_response( + method="POST", url=filter_url_pattern, status_code=200, json={"items": [record_response], "typing": typing} + ) + result = cognite_client.data_modeling.records.list(stream_id=stream_id, include_typing=True) + body = jsgz_load(httpx_mock.get_requests()[0].content) + assert body["includeTyping"] is True + assert isinstance(result.typing, TypeInformation) + + class TestRecordDTOs: def test_record_write_as_id(self, write_item: RecordWrite) -> None: rid = write_item.as_id() @@ -256,3 +370,57 @@ def test_record_source_dump(self) -> None: d = src.dump() assert d["source"]["type"] == "container" assert d["properties"] == {"x": 1} + + def test_record_load_dump_round_trip(self) -> None: + payload = { + "space": "sp", + "externalId": "rec-1", + "createdTime": 100, + "lastUpdatedTime": 200, + "properties": {"sp": {"c": {"temp": 22.5}}}, + } + record = Record._load(payload) + assert record.created_time == 100 + assert record.last_updated_time == 200 + assert record.dump() == payload + + def test_record_as_id(self) -> None: + record = Record(space="sp", external_id="rec-1", created_time=1, last_updated_time=2) + rid = record.as_id() + assert isinstance(rid, RecordId) + assert (rid.space, rid.external_id) == ("sp", "rec-1") + + def test_record_as_write_reconstructs_sources(self) -> None: + record = Record( + space="sp", + external_id="rec-1", + created_time=1, + last_updated_time=2, + properties={"sp": {"c": {"temp": 22.5}}}, + ) + write = record.as_write() + assert isinstance(write, RecordWrite) + assert write.dump()["sources"] == [ + {"source": {"type": "container", "space": "sp", "externalId": "c"}, "properties": {"temp": 22.5}} + ] + + def test_record_list_as_ids_and_as_write(self) -> None: + records = RecordList( + [ + Record(space="sp", external_id="rec-1", created_time=1, last_updated_time=2), + Record(space="sp", external_id="rec-2", created_time=1, last_updated_time=2), + ] + ) + assert records.as_ids() == [RecordId("sp", "rec-1"), RecordId("sp", "rec-2")] + assert [w.external_id for w in records.as_write()] == ["rec-1", "rec-2"] + + def test_time_range_dump_omits_none(self) -> None: + assert TimeRange(gte=1, lt=5).dump() == {"gte": 1, "lt": 5} + assert TimeRange().dump() == {} + + def test_record_source_selector_dump(self) -> None: + selector = RecordSourceSelector(RecordContainerId(space="sp", external_id="c"), ["temp", "pressure"]) + assert selector.dump() == { + "source": {"type": "container", "space": "sp", "externalId": "c"}, + "properties": ["temp", "pressure"], + } diff --git a/tests/tests_unit/test_docstring_examples.py b/tests/tests_unit/test_docstring_examples.py index 3858afbc4f..6876b26f78 100644 --- a/tests/tests_unit/test_docstring_examples.py +++ b/tests/tests_unit/test_docstring_examples.py @@ -34,6 +34,7 @@ data_models, graphql, instances, + records, spaces, statistics, streams, @@ -137,6 +138,7 @@ def test_data_modeling(self) -> None: run_docstring_tests(graphql) run_docstring_tests(statistics) run_docstring_tests(streams) + run_docstring_tests(records) run_docstring_tests(dm_time_series) def test_datapoint_subscriptions(self) -> None: