-
Notifications
You must be signed in to change notification settings - Fork 37
feat(records): add list (filter) endpoint with read data classes #2680
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
4f3a737
39ae6e2
cd05ce9
b99c3d3
71fb912
7c6db37
0d88796
0a56f21
01e201b
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||
|---|---|---|---|---|---|---|
|
|
@@ -2,10 +2,20 @@ | |||||
|
|
||||||
| import asyncio | ||||||
| from collections.abc import Sequence | ||||||
| from typing import TYPE_CHECKING, ClassVar, Literal | ||||||
| from typing import TYPE_CHECKING, Any, ClassVar, Literal | ||||||
|
|
||||||
| from cognite.client._api_client import APIClient | ||||||
| from cognite.client.data_classes.data_modeling.records import RecordId, RecordIdSequence, RecordWrite | ||||||
| from cognite.client.data_classes.data_modeling.instances import InstanceSort | ||||||
| from cognite.client.data_classes.data_modeling.records import ( | ||||||
| Record, | ||||||
| RecordId, | ||||||
| RecordIdSequence, | ||||||
| RecordList, | ||||||
| RecordSourceSelector, | ||||||
| RecordWrite, | ||||||
| TimeRange, | ||||||
| ) | ||||||
| from cognite.client.data_classes.filters import Filter | ||||||
| from cognite.client.utils._concurrency import RecordsConcurrencyOperation | ||||||
| from cognite.client.utils._experimental import FeaturePreviewWarning | ||||||
| from cognite.client.utils._url import interpolate_and_url_encode | ||||||
|
|
@@ -14,7 +24,6 @@ | |||||
| from cognite.client import AsyncCogniteClient | ||||||
| from cognite.client.config import ClientConfig | ||||||
|
|
||||||
|
|
||||||
| class RecordsAPI(APIClient): | ||||||
| def __init__(self, config: ClientConfig, api_version: str | None, cognite_client: AsyncCogniteClient) -> None: | ||||||
| super().__init__(config, api_version, cognite_client) | ||||||
|
|
@@ -23,11 +32,14 @@ def __init__(self, config: ClientConfig, api_version: str | None, cognite_client | |||||
| ) | ||||||
|
|
||||||
| _OPERATION_TO_RATE_LIMIT: ClassVar[dict[str, RecordsConcurrencyOperation]] = { | ||||||
| "read": RecordsConcurrencyOperation.READ, | ||||||
| "write": RecordsConcurrencyOperation.WRITE, | ||||||
| "delete": RecordsConcurrencyOperation.WRITE, | ||||||
| } | ||||||
|
|
||||||
| def _get_semaphore(self, operation: Literal["write", "delete"]) -> asyncio.BoundedSemaphore: | ||||||
| def _get_semaphore( # type: ignore[override] | ||||||
| self, operation: Literal["read", "write", "delete"], stream_type: Literal["immutable", "mutable"] | ||||||
| ) -> asyncio.BoundedSemaphore: | ||||||
|
andersfylling marked this conversation as resolved.
|
||||||
| from cognite.client import global_config | ||||||
|
|
||||||
| return global_config.concurrency_settings.records._semaphore_factory( | ||||||
|
|
@@ -44,6 +56,7 @@ async def delete( | |||||
| items: RecordId | Sequence[RecordId], | ||||||
| *, | ||||||
| stream_id: str, | ||||||
| stream_type: Literal["immutable", "mutable"] = "immutable", | ||||||
| ignore_unknown_ids: Literal[True] = True, | ||||||
| ) -> None: | ||||||
| """`Delete records from a stream <https://api-docs.cognite.com/20230101/tag/Records/operation/deleteRecords>`_. | ||||||
|
|
@@ -54,6 +67,7 @@ async def delete( | |||||
| Args: | ||||||
| items (RecordId | Sequence[RecordId]): Records to delete. | ||||||
| stream_id (str): External ID of the stream to delete from. | ||||||
| stream_type (Literal["immutable", "mutable"]): Type of the stream. Defaults to "immutable". | ||||||
| ignore_unknown_ids (Literal[True]): Currently only True is supported | ||||||
|
|
||||||
| Examples: | ||||||
|
|
@@ -76,13 +90,15 @@ async def delete( | |||||
| identifiers=RecordIdSequence.load(items), | ||||||
| wrap_ids=True, | ||||||
| resource_path=self._records_url(stream_id), | ||||||
| override_semaphore=self._get_semaphore("delete", stream_type), | ||||||
|
andersfylling marked this conversation as resolved.
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If you can't infer
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don't think I quite understand, could you elaborate? |
||||||
| ) | ||||||
|
|
||||||
| async def ingest( | ||||||
| self, | ||||||
| items: RecordWrite | Sequence[RecordWrite], | ||||||
| *, | ||||||
| stream_id: str, | ||||||
| stream_type: Literal["immutable", "mutable"] = "immutable", | ||||||
| ) -> None: | ||||||
| """`Ingest records into a stream <https://api-docs.cognite.com/20230101/tag/Records/operation/ingestRecords>`_. | ||||||
|
|
||||||
|
|
@@ -94,6 +110,7 @@ async def ingest( | |||||
| Args: | ||||||
| items (RecordWrite | Sequence[RecordWrite]): One or more records to ingest. | ||||||
| stream_id (str): External ID of the stream to ingest into. | ||||||
| stream_type (Literal["immutable", "mutable"]): Type of the stream. Defaults to "immutable". | ||||||
|
|
||||||
| Examples: | ||||||
|
|
||||||
|
|
@@ -128,13 +145,15 @@ async def ingest( | |||||
| items=item_list, | ||||||
| resource_path=self._records_url(stream_id), | ||||||
| no_response=True, | ||||||
| override_semaphore=self._get_semaphore("write", stream_type), | ||||||
|
andersfylling marked this conversation as resolved.
|
||||||
| ) | ||||||
|
|
||||||
| async def upsert( | ||||||
| self, | ||||||
| items: RecordWrite | Sequence[RecordWrite], | ||||||
| *, | ||||||
| stream_id: str, | ||||||
| stream_type: Literal["immutable", "mutable"] = "immutable", | ||||||
| upsert_mode: Literal["replace"] = "replace", | ||||||
| ) -> None: | ||||||
| """`Upsert records into a stream <https://api-docs.cognite.com/20230101/tag/Records/operation/upsertRecords>`_. | ||||||
|
|
@@ -147,6 +166,7 @@ async def upsert( | |||||
| Args: | ||||||
| items (RecordWrite | Sequence[RecordWrite]): One or more records to upsert. | ||||||
| stream_id (str): External ID of the stream to upsert into. | ||||||
| stream_type (Literal["immutable", "mutable"]): Type of the stream. Defaults to "immutable". | ||||||
| upsert_mode (Literal['replace']): How existing records are updated. Currently only ``"replace"`` is supported, which fully replaces the existing record. Defaults to ``"replace"``. | ||||||
|
|
||||||
| Examples: | ||||||
|
|
@@ -182,4 +202,75 @@ async def upsert( | |||||
| items=item_list, | ||||||
| resource_path=self._records_url(stream_id, "/upsert"), | ||||||
| no_response=True, | ||||||
| override_semaphore=self._get_semaphore("write", stream_type), | ||||||
|
andersfylling marked this conversation as resolved.
|
||||||
| ) | ||||||
|
|
||||||
| async def list( | ||||||
| self, | ||||||
| stream_id: str, | ||||||
| *, | ||||||
| stream_type: Literal["immutable", "mutable"] = "immutable", | ||||||
| last_updated_time: TimeRange | None = None, | ||||||
| filter: Filter | None = None, | ||||||
| sources: Sequence[RecordSourceSelector] | None = None, | ||||||
| sort: Sequence[InstanceSort] | InstanceSort | None = None, | ||||||
| limit: int = 10, | ||||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We have a default constant that should be used |
||||||
| include_typing: bool = False, | ||||||
| ) -> RecordList: | ||||||
| """`Filter records in a stream <https://api-docs.cognite.com/20230101/tag/Records/operation/filterRecords>`_. | ||||||
|
|
||||||
| Returns records matching the given filters, sorted by ``lastUpdatedTime`` unless a custom | ||||||
| ``sort`` is given. | ||||||
|
|
||||||
| Args: | ||||||
| stream_id (str): External ID of the stream to query. | ||||||
| stream_type (Literal["immutable", "mutable"]): Type of the stream. Defaults to "immutable". | ||||||
| last_updated_time (TimeRange | None): Filter by last-updated time. **Required for | ||||||
| immutable streams** (must include a lower bound). | ||||||
| filter (Filter | None): Filter expression (see :mod:`cognite.client.data_classes.filters`). | ||||||
| sources (Sequence[RecordSourceSelector] | None): Which container properties to return. | ||||||
| sort (Sequence[InstanceSort] | InstanceSort | None): Sort specification(s); up to 5. | ||||||
| limit (int): Maximum number of records to return (1-1000). Defaults to 10. | ||||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||
| include_typing (bool): If True, include property type information on the returned | ||||||
| list's ``typing`` attribute. | ||||||
|
|
||||||
| Returns: | ||||||
| RecordList: The matching records. | ||||||
|
|
||||||
| Examples: | ||||||
|
|
||||||
| List records updated since a given timestamp: | ||||||
|
|
||||||
| >>> from cognite.client import CogniteClient | ||||||
| >>> from cognite.client.data_classes.data_modeling.records import TimeRange | ||||||
| >>> client = CogniteClient() | ||||||
| >>> res = client.data_modeling.records.list( | ||||||
| ... stream_id="my-stream", | ||||||
| ... last_updated_time=TimeRange(gt=1705341600000), | ||||||
| ... limit=100, | ||||||
| ... ) | ||||||
| """ | ||||||
| self._warning.warn() | ||||||
| other_params: dict[str, Any] = {} | ||||||
| if last_updated_time is not None: | ||||||
| other_params["lastUpdatedTime"] = last_updated_time.dump() | ||||||
| if sources is not None: | ||||||
| other_params["sources"] = [source.dump() for source in sources] | ||||||
| if sort is not None: | ||||||
| sort_list = [sort] if isinstance(sort, InstanceSort) else list(sort) | ||||||
| other_params["sort"] = [spec.dump() for spec in sort_list] | ||||||
| if include_typing: | ||||||
| other_params["includeTyping"] = True | ||||||
|
|
||||||
| return await self._list( | ||||||
| list_cls=RecordList, | ||||||
| resource_cls=Record, | ||||||
| method="POST", | ||||||
| resource_path=self._records_url(stream_id), | ||||||
| url_path=self._records_url(stream_id, "/filter"), | ||||||
| limit=limit, | ||||||
| filter=filter.dump(camel_case_property=False) if isinstance(filter, Filter) else filter, | ||||||
| other_params=other_params, | ||||||
| settings_forcing_raw_response_loading=[f"{include_typing=}"] if include_typing else None, | ||||||
| override_semaphore=self._get_semaphore("read", stream_type), | ||||||
|
andersfylling marked this conversation as resolved.
|
||||||
| ) | ||||||
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I still think this is an unnecessary indirection 😄
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm cleaning it up here: #2688
the concurrency setup is a bit complicated, so wrote something dumbed down in this PR.