Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
79 changes: 78 additions & 1 deletion cognite/client/_api/data_modeling/records.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,17 @@
from typing import TYPE_CHECKING, Any, Literal

from cognite.client._api_client import APIClient
from cognite.client.data_classes.data_modeling.records import RecordId, RecordIdSequence, RecordWrite
from cognite.client.data_classes.data_modeling.instances import InstanceSort
from cognite.client.data_classes.data_modeling.records import (
Record,
RecordId,
RecordIdSequence,
RecordList,
RecordSourceSelector,
RecordWrite,
TimeRange,
)
from cognite.client.data_classes.filters import Filter
from cognite.client.utils._experimental import FeaturePreviewWarning
from cognite.client.utils._url import interpolate_and_url_encode

Expand Down Expand Up @@ -177,3 +187,70 @@ async def upsert(
resource_path=self._records_url(stream_id, "/upsert"),
no_response=True,
)

async def list(
    self,
    stream_id: str,
    *,
    last_updated_time: TimeRange | None = None,
    filter: Filter | None = None,
    sources: Sequence[RecordSourceSelector] | None = None,
    sort: Sequence[InstanceSort] | InstanceSort | None = None,
    limit: int = 10,
    include_typing: bool = False,
) -> RecordList:
    """`Filter records in a stream <https://api-docs.cognite.com/20230101/tag/Records/operation/filterRecords>`_.

    Returns records matching the given filters, sorted by ``lastUpdatedTime`` unless a custom
    ``sort`` is given.

    Args:
        stream_id (str): External ID of the stream to query.
        last_updated_time (TimeRange | None): Filter by last-updated time. **Required for
            immutable streams** (must include a lower bound).
        filter (Filter | None): Filter expression (see :mod:`cognite.client.data_classes.filters`).
        sources (Sequence[RecordSourceSelector] | None): Which container properties to return.
        sort (Sequence[InstanceSort] | InstanceSort | None): Sort specification(s); up to 5.
            Plain dicts that are already in API format are passed through unchanged.
        limit (int): Maximum number of records to return (1-1000).
        include_typing (bool): If True, include property type information on the returned
            list's ``typing`` attribute.

    Returns:
        RecordList: The matching records.

    Examples:

        List records updated since a given timestamp:

            >>> from cognite.client import CogniteClient
            >>> from cognite.client.data_classes.data_modeling.records import TimeRange
            >>> client = CogniteClient()
            >>> res = client.data_modeling.records.list(
            ...     stream_id="my-stream",
            ...     last_updated_time=TimeRange(gt=1705341600000),
            ...     limit=100,
            ... )
    """
    self._warning.warn()
    other_params: dict[str, Any] = {}
    if last_updated_time is not None:
        other_params["lastUpdatedTime"] = last_updated_time.dump()
    if sources is not None:
        other_params["sources"] = [source.dump() for source in sources]
    if sort is not None:
        # A bare dict has to be wrapped just like a single InstanceSort: calling list() on a
        # dict would iterate its keys, and .dump() would then fail on a str.
        sort_list = [sort] if isinstance(sort, (InstanceSort, dict)) else list(sort)
        # Only InstanceSort objects need serialising; dict specs are forwarded as given.
        other_params["sort"] = [spec.dump() if isinstance(spec, InstanceSort) else spec for spec in sort_list]
    if include_typing:
        other_params["includeTyping"] = True

    return await self._list(
        list_cls=RecordList,
        resource_cls=Record,
        method="POST",
        resource_path=self._records_url(stream_id),
        url_path=self._records_url(stream_id, "/filter"),
        limit=limit,
        filter=filter.dump(camel_case_property=False) if isinstance(filter, Filter) else filter,
        other_params=other_params,
        settings_forcing_raw_response_loading=[f"{include_typing=}"] if include_typing else None,
    )
68 changes: 66 additions & 2 deletions cognite/client/_sync_api/data_modeling/records.py

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 8 additions & 0 deletions cognite/client/data_classes/data_modeling/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,11 +120,15 @@
UnionAll,
)
from cognite.client.data_classes.data_modeling.records import (
Record,
RecordContainerId,
RecordId,
RecordList,
RecordSource,
RecordSourceSelector,
RecordWrite,
RecordWriteList,
TimeRange,
)
from cognite.client.data_classes.data_modeling.spaces import Space, SpaceApply, SpaceApplyList, SpaceList
from cognite.client.data_classes.data_modeling.streams import (
Expand Down Expand Up @@ -249,9 +253,12 @@
"Query",
"QueryResult",
"QuerySync",
"Record",
"RecordContainerId",
"RecordId",
"RecordList",
"RecordSource",
"RecordSourceSelector",
"RecordWrite",
"RecordWriteList",
"RequiresConstraint",
Expand Down Expand Up @@ -279,6 +286,7 @@
"StreamWrite",
"SubscriptionContext",
"Text",
"TimeRange",
"TimeSeriesReference",
"Timestamp",
"TranslatedQuery",
Expand Down
166 changes: 165 additions & 1 deletion cognite/client/data_classes/data_modeling/records.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,24 @@
CogniteResource,
CogniteResourceList,
WriteableCogniteResource,
WriteableCogniteResourceList,
)
from cognite.client.data_classes.data_modeling.ids import ContainerId
from cognite.client.data_classes.data_modeling.instances import TypeInformation
from cognite.client.utils._identifier import IdentifierSequenceCore, RecordId

__all__ = ["RecordContainerId", "RecordId", "RecordIdSequence", "RecordSource", "RecordWrite", "RecordWriteList"]
__all__ = [
"Record",
"RecordContainerId",
"RecordId",
"RecordIdSequence",
"RecordList",
"RecordSource",
"RecordSourceSelector",
"RecordWrite",
"RecordWriteList",
"TimeRange",
]


class RecordIdSequence(IdentifierSequenceCore[RecordId]):
Expand Down Expand Up @@ -106,3 +119,154 @@ class RecordWriteList(CogniteResourceList[RecordWrite]):

def as_ids(self) -> list[RecordId]:
    """Return the result of ``as_id()`` for each element, in list order."""
    return [item.as_id() for item in self]


class Record(WriteableCogniteResource["RecordWrite"]):
    """A single record as read back from the stream records API.

    Read-side counterpart of :class:`RecordWrite`.

    Args:
        space (str): Space the record belongs to.
        external_id (str): External ID of the record.
        created_time (int): Creation time in milliseconds since epoch.
        last_updated_time (int): Last updated time in milliseconds since epoch.
        properties (dict[str, dict[str, dict[str, Any]]] | None): Property values keyed by
            ``{space: {container_external_id: {property_id: value}}}``.
    """

    def __init__(
        self,
        space: str,
        external_id: str,
        created_time: int,
        last_updated_time: int,
        properties: dict[str, dict[str, dict[str, Any]]] | None = None,
    ) -> None:
        self.space = space
        self.external_id = external_id
        self.created_time = created_time
        self.last_updated_time = last_updated_time
        self.properties = properties

    @classmethod
    def _load(cls, resource: dict[str, Any]) -> Self:
        # The four identifying/timestamp keys are mandatory; "properties" may be absent.
        return cls(
            resource["space"],
            resource["externalId"],
            resource["createdTime"],
            resource["lastUpdatedTime"],
            resource.get("properties"),
        )

    def dump(self, camel_case: bool = True) -> dict[str, Any]:
        """Serialise the record to a plain dict.

        Args:
            camel_case (bool): Use camelCase keys if True, snake_case otherwise.

        Returns:
            dict[str, Any]: The dumped record; ``properties`` is only included when it is set.
        """
        if camel_case:
            id_key, created_key, updated_key = "externalId", "createdTime", "lastUpdatedTime"
        else:
            id_key, created_key, updated_key = "external_id", "created_time", "last_updated_time"

        dumped: dict[str, Any] = {
            "space": self.space,
            id_key: self.external_id,
            created_key: self.created_time,
            updated_key: self.last_updated_time,
        }
        if self.properties is None:
            return dumped
        dumped["properties"] = self.properties
        return dumped

    def as_id(self) -> RecordId:
        """Return the :class:`RecordId` built from this record's space and external ID."""
        return RecordId(space=self.space, external_id=self.external_id)

    def as_write(self) -> RecordWrite:
        """Reconstruct the :class:`RecordWrite` by grouping read properties back into sources."""
        write_sources: list[RecordSource] = []
        # One RecordSource per (space, container) pair found in the nested properties mapping.
        for space_name, by_container in (self.properties or {}).items():
            for container_xid, values in by_container.items():
                container_id = RecordContainerId(space=space_name, external_id=container_xid)
                write_sources.append(RecordSource(source=container_id, properties=dict(values)))
        return RecordWrite(space=self.space, external_id=self.external_id, sources=write_sources)


class RecordList(WriteableCogniteResourceList[RecordWrite, Record]):
    """A list of :class:`Record` objects.

    Args:
        resources (Sequence[Record]): The records.
        typing (TypeInformation | None): Property type information, present when the request
            was made with ``include_typing=True``.
    """

    _RESOURCE = Record

    def __init__(self, resources: Sequence[Record], typing: TypeInformation | None = None) -> None:
        super().__init__(resources)
        self.typing = typing

    def as_ids(self) -> list[RecordId]:
        """Return the :class:`RecordId` of each record, in list order."""
        return [record.as_id() for record in self]

    def as_write(self) -> RecordWriteList:
        """Convert every record to its write form and collect them in a :class:`RecordWriteList`."""
        return RecordWriteList([record.as_write() for record in self])

    @classmethod
    def _load_raw_api_response(cls, responses: list[dict[str, Any]]) -> Self:
        # Typing is taken from the first response that carries a usable value. A "typing" key
        # holding null is treated like a missing key, so None is never handed to
        # TypeInformation._load.
        typing = next(
            (TypeInformation._load(resp["typing"]) for resp in responses if resp.get("typing") is not None),
            None,
        )
        # Flatten the "items" of all responses, preserving response order.
        resources = [cls._RESOURCE._load(item) for response in responses for item in response.get("items", [])]
        return cls(resources, typing)


class TimeRange(CogniteResource):
    """A time range filter on ``lastUpdatedTime``.

    Each bound is given either as milliseconds since the Unix epoch (int) or as an ISO-8601
    string. Immutable streams require at least a lower bound (``gte`` or ``gt``); specifying two
    lower or two upper bounds is not allowed.

    Args:
        gte (int | str | None): Greater than or equal to.
        gt (int | str | None): Greater than.
        lte (int | str | None): Less than or equal to.
        lt (int | str | None): Less than.
    """

    def __init__(
        self,
        gte: int | str | None = None,
        gt: int | str | None = None,
        lte: int | str | None = None,
        lt: int | str | None = None,
    ) -> None:
        self.gte = gte
        self.gt = gt
        self.lte = lte
        self.lt = lt

    @classmethod
    def _load(cls, resource: dict[str, Any]) -> Self:
        # Any bound missing from the payload is loaded as None.
        return cls(**{bound: resource.get(bound) for bound in ("gte", "gt", "lte", "lt")})

    def dump(self, camel_case: bool = True) -> dict[str, Any]:
        """Serialise the range, leaving out bounds that are not set.

        Args:
            camel_case (bool): Has no effect on the output; the bound names are the same in both cases.

        Returns:
            dict[str, Any]: Mapping from bound name to value for every bound that is not None.
        """
        bounds = (("gte", self.gte), ("gt", self.gt), ("lte", self.lte), ("lt", self.lt))
        return {name: bound for name, bound in bounds if bound is not None}


class RecordSourceSelector(CogniteResource):
    """Picks the container properties that should be returned for a record.

    Args:
        source (RecordContainerId): The container to select properties from.
        properties (list[str]): Property identifiers to return; use ``["*"]`` to return all.
    """

    def __init__(self, source: RecordContainerId, properties: list[str]) -> None:
        self.source = source
        self.properties = properties

    @classmethod
    def _load(cls, resource: dict[str, Any]) -> Self:
        # Both keys are mandatory; the source is parsed into a RecordContainerId.
        container = RecordContainerId.load(resource["source"])
        return cls(source=container, properties=resource["properties"])

    def dump(self, camel_case: bool = True) -> dict[str, Any]:
        """Serialise the selector.

        Args:
            camel_case (bool): Forwarded to the dump of ``source``.

        Returns:
            dict[str, Any]: The dumped ``source`` together with the ``properties`` list.
        """
        dumped_source = self.source.dump(camel_case=camel_case)
        return {"source": dumped_source, "properties": self.properties}
Loading
Loading