Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
99 changes: 95 additions & 4 deletions cognite/client/_api/data_modeling/records.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,20 @@

import asyncio
from collections.abc import Sequence
from typing import TYPE_CHECKING, ClassVar, Literal
from typing import TYPE_CHECKING, Any, ClassVar, Literal

from cognite.client._api_client import APIClient
from cognite.client.data_classes.data_modeling.records import RecordId, RecordIdSequence, RecordWrite
from cognite.client.data_classes.data_modeling.instances import InstanceSort
from cognite.client.data_classes.data_modeling.records import (
Record,
RecordId,
RecordIdSequence,
RecordList,
RecordSourceSelector,
RecordWrite,
TimeRange,
)
from cognite.client.data_classes.filters import Filter
from cognite.client.utils._concurrency import RecordsConcurrencyOperation
from cognite.client.utils._experimental import FeaturePreviewWarning
from cognite.client.utils._url import interpolate_and_url_encode
Expand All @@ -14,7 +24,6 @@
from cognite.client import AsyncCogniteClient
from cognite.client.config import ClientConfig


class RecordsAPI(APIClient):
def __init__(self, config: ClientConfig, api_version: str | None, cognite_client: AsyncCogniteClient) -> None:
super().__init__(config, api_version, cognite_client)
Expand All @@ -23,11 +32,14 @@ def __init__(self, config: ClientConfig, api_version: str | None, cognite_client
)

_OPERATION_TO_RATE_LIMIT: ClassVar[dict[str, RecordsConcurrencyOperation]] = {
"read": RecordsConcurrencyOperation.READ,
"write": RecordsConcurrencyOperation.WRITE,
"delete": RecordsConcurrencyOperation.WRITE,
}
Comment on lines 34 to 38

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I still think this is an unnecessary indirection 😄

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm cleaning it up here: #2688

the concurrency setup is a bit complicated, so wrote something dumbed down in this PR.


def _get_semaphore(self, operation: Literal["write", "delete"]) -> asyncio.BoundedSemaphore:
def _get_semaphore( # type: ignore[override]
self, operation: Literal["read", "write", "delete"], stream_type: Literal["immutable", "mutable"]
) -> asyncio.BoundedSemaphore:
Comment thread
andersfylling marked this conversation as resolved.
from cognite.client import global_config

return global_config.concurrency_settings.records._semaphore_factory(
Expand All @@ -44,6 +56,7 @@ async def delete(
items: RecordId | Sequence[RecordId],
*,
stream_id: str,
stream_type: Literal["immutable", "mutable"] = "immutable",
ignore_unknown_ids: Literal[True] = True,
) -> None:
"""`Delete records from a stream <https://api-docs.cognite.com/20230101/tag/Records/operation/deleteRecords>`_.
Expand All @@ -54,6 +67,7 @@ async def delete(
Args:
items (RecordId | Sequence[RecordId]): Records to delete.
stream_id (str): External ID of the stream to delete from.
stream_type (Literal["immutable", "mutable"]): Type of the stream. Defaults to "immutable".
ignore_unknown_ids (Literal[True]): Currently only True is supported

Examples:
Expand All @@ -76,13 +90,15 @@ async def delete(
identifiers=RecordIdSequence.load(items),
wrap_ids=True,
resource_path=self._records_url(stream_id),
override_semaphore=self._get_semaphore("delete", stream_type),
Comment thread
andersfylling marked this conversation as resolved.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you can't infer stream_type, you need to collapse this to a single semaphore setting that works in all cases

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think I quite understand, could you elaborate?

)

async def ingest(
self,
items: RecordWrite | Sequence[RecordWrite],
*,
stream_id: str,
stream_type: Literal["immutable", "mutable"] = "immutable",
) -> None:
"""`Ingest records into a stream <https://api-docs.cognite.com/20230101/tag/Records/operation/ingestRecords>`_.

Expand All @@ -94,6 +110,7 @@ async def ingest(
Args:
items (RecordWrite | Sequence[RecordWrite]): One or more records to ingest.
stream_id (str): External ID of the stream to ingest into.
stream_type (Literal["immutable", "mutable"]): Type of the stream. Defaults to "immutable".

Examples:

Expand Down Expand Up @@ -128,13 +145,15 @@ async def ingest(
items=item_list,
resource_path=self._records_url(stream_id),
no_response=True,
override_semaphore=self._get_semaphore("write", stream_type),
Comment thread
andersfylling marked this conversation as resolved.
)

async def upsert(
self,
items: RecordWrite | Sequence[RecordWrite],
*,
stream_id: str,
stream_type: Literal["immutable", "mutable"] = "immutable",
upsert_mode: Literal["replace"] = "replace",
) -> None:
"""`Upsert records into a stream <https://api-docs.cognite.com/20230101/tag/Records/operation/upsertRecords>`_.
Expand All @@ -147,6 +166,7 @@ async def upsert(
Args:
items (RecordWrite | Sequence[RecordWrite]): One or more records to upsert.
stream_id (str): External ID of the stream to upsert into.
stream_type (Literal["immutable", "mutable"]): Type of the stream. Defaults to "immutable".
upsert_mode (Literal['replace']): How existing records are updated. Currently only ``"replace"`` is supported, which fully replaces the existing record. Defaults to ``"replace"``.

Examples:
Expand Down Expand Up @@ -182,4 +202,75 @@ async def upsert(
items=item_list,
resource_path=self._records_url(stream_id, "/upsert"),
no_response=True,
override_semaphore=self._get_semaphore("write", stream_type),
Comment thread
andersfylling marked this conversation as resolved.
)

async def list(
self,
stream_id: str,
*,
stream_type: Literal["immutable", "mutable"] = "immutable",
last_updated_time: TimeRange | None = None,
filter: Filter | None = None,
sources: Sequence[RecordSourceSelector] | None = None,
sort: Sequence[InstanceSort] | InstanceSort | None = None,
limit: int = 10,

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We have a default constant that should be used

include_typing: bool = False,
) -> RecordList:
"""`Filter records in a stream <https://api-docs.cognite.com/20230101/tag/Records/operation/filterRecords>`_.

Returns records matching the given filters, sorted by ``lastUpdatedTime`` unless a custom
``sort`` is given.

Args:
stream_id (str): External ID of the stream to query.
stream_type (Literal["immutable", "mutable"]): Type of the stream. Defaults to "immutable".
last_updated_time (TimeRange | None): Filter by last-updated time. **Required for
immutable streams** (must include a lower bound).
filter (Filter | None): Filter expression (see :mod:`cognite.client.data_classes.filters`).
sources (Sequence[RecordSourceSelector] | None): Which container properties to return.
sort (Sequence[InstanceSort] | InstanceSort | None): Sort specification(s); up to 5.
limit (int): Maximum number of records to return (1-1000). Defaults to 10.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
limit (int): Maximum number of records to return (1-1000). Defaults to 10.
limit (int): Maximum number of records to return (1-1000).

include_typing (bool): If True, include property type information on the returned
list's ``typing`` attribute.

Returns:
RecordList: The matching records.

Examples:

List records updated since a given timestamp:

>>> from cognite.client import CogniteClient
>>> from cognite.client.data_classes.data_modeling.records import TimeRange
>>> client = CogniteClient()
>>> res = client.data_modeling.records.list(
... stream_id="my-stream",
... last_updated_time=TimeRange(gt=1705341600000),
... limit=100,
... )
"""
self._warning.warn()
other_params: dict[str, Any] = {}
if last_updated_time is not None:
other_params["lastUpdatedTime"] = last_updated_time.dump()
if sources is not None:
other_params["sources"] = [source.dump() for source in sources]
if sort is not None:
sort_list = [sort] if isinstance(sort, InstanceSort) else list(sort)
other_params["sort"] = [spec.dump() for spec in sort_list]
if include_typing:
other_params["includeTyping"] = True

return await self._list(
list_cls=RecordList,
resource_cls=Record,
method="POST",
resource_path=self._records_url(stream_id),
url_path=self._records_url(stream_id, "/filter"),
limit=limit,
filter=filter.dump(camel_case_property=False) if isinstance(filter, Filter) else filter,
other_params=other_params,
settings_forcing_raw_response_loading=[f"{include_typing=}"] if include_typing else None,
override_semaphore=self._get_semaphore("read", stream_type),
Comment thread
andersfylling marked this conversation as resolved.
)
106 changes: 98 additions & 8 deletions cognite/client/_sync_api/data_modeling/records.py

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 8 additions & 0 deletions cognite/client/data_classes/data_modeling/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,11 +120,15 @@
UnionAll,
)
from cognite.client.data_classes.data_modeling.records import (
Record,
RecordContainerId,
RecordId,
RecordList,
RecordSource,
RecordSourceSelector,
RecordWrite,
RecordWriteList,
TimeRange,
)
from cognite.client.data_classes.data_modeling.spaces import Space, SpaceApply, SpaceApplyList, SpaceList
from cognite.client.data_classes.data_modeling.streams import (
Expand Down Expand Up @@ -249,9 +253,12 @@
"Query",
"QueryResult",
"QuerySync",
"Record",
"RecordContainerId",
"RecordId",
"RecordList",
"RecordSource",
"RecordSourceSelector",
"RecordWrite",
"RecordWriteList",
"RequiresConstraint",
Expand Down Expand Up @@ -279,6 +286,7 @@
"StreamWrite",
"SubscriptionContext",
"Text",
"TimeRange",
"TimeSeriesReference",
"Timestamp",
"TranslatedQuery",
Expand Down
Loading