Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 68 additions & 2 deletions cognite/client/_api/data_modeling/records.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,18 @@
from __future__ import annotations

import asyncio
from collections.abc import Sequence
from collections.abc import Mapping, Sequence
from typing import TYPE_CHECKING, Any, Literal

from cognite.client._api_client import APIClient
from cognite.client.data_classes.data_modeling.records import RecordId, RecordIdSequence, RecordWrite
from cognite.client.data_classes.data_modeling.records import (
RecordId,
RecordIdSequence,
RecordsAggregation,
RecordWrite,
_dump_aggregate_value,
)
from cognite.client.data_classes.filters import Filter
from cognite.client.utils._experimental import FeaturePreviewWarning
from cognite.client.utils._url import interpolate_and_url_encode

Expand Down Expand Up @@ -177,3 +184,62 @@ async def upsert(
resource_path=self._records_url(stream_id, "/upsert"),
no_response=True,
)

async def aggregate(
self,
aggregates: Mapping[str, Any],
*,
stream_id: str,
last_updated_time: Mapping[str, Any] | None = None,
filter: Filter | dict[str, Any] | None = None,
target_units: Mapping[str, Any] | None = None,
include_typing: bool = False,
) -> RecordsAggregation:
"""`Aggregate records from a stream <https://api-docs.cognite.com/20230101/tag/Records/operation/aggregateRecords>`_.

Args:
aggregates (Mapping[str, Any]): Aggregate request tree keyed by client-defined aggregate IDs.
stream_id (str): External ID of the stream to aggregate from.
last_updated_time (Mapping[str, Any] | None): Filter records by last-updated time.
**Required** for immutable streams (must include a lower bound).
filter (Filter | dict[str, Any] | None): Filter expression.
target_units (Mapping[str, Any] | None): Unit conversion specification.
include_typing (bool): Include property type metadata in the response.

Returns:
RecordsAggregation: Aggregate results keyed by the requested aggregate IDs.

Examples:

Aggregate average temperature:

>>> from cognite.client import CogniteClient
>>> client = CogniteClient()
>>> res = client.data_modeling.records.aggregate(
... stream_id="my-stream",
... aggregates={
... "avg_temperature": {
... "avg": {"property": ["my-space", "sensor", "temperature"]}
... }
... },
... )
>>> res.aggregates["avg_temperature"]["avg"]
22.5
"""
self._warning.warn()
body: dict[str, Any] = {"aggregates": _dump_aggregate_value(aggregates)}
if last_updated_time is not None:
body["lastUpdatedTime"] = _dump_aggregate_value(last_updated_time)
if filter is not None:
body["filter"] = filter.dump() if isinstance(filter, Filter) else filter
if target_units is not None:
body["targetUnits"] = _dump_aggregate_value(target_units)
if include_typing:
body["includeTyping"] = True

res = await self._post(
url_path=self._records_url(stream_id, "/aggregate"),
json=body,
semaphore=self._get_semaphore("read"),
)
return RecordsAggregation._load(res.json())
66 changes: 62 additions & 4 deletions cognite/client/_sync_api/data_modeling/records.py

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

40 changes: 40 additions & 0 deletions cognite/client/data_classes/data_modeling/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,11 +120,31 @@
UnionAll,
)
from cognite.client.data_classes.data_modeling.records import (
Avg,
Count,
FilterAggregateResult,
Filters,
Max,
MetricAggregateResult,
Min,
MovingFunction,
MovingFunctionAggregateResult,
NumberHistogram,
NumberHistogramAggregateResult,
RecordContainerId,
RecordId,
RecordsAggregate,
RecordsAggregateResult,
RecordsAggregation,
RecordsBucket,
RecordSource,
RecordWrite,
RecordWriteList,
Sum,
TimeHistogram,
TimeHistogramAggregateResult,
UniqueValues,
UniqueValuesAggregateResult,
)
from cognite.client.data_classes.data_modeling.spaces import Space, SpaceApply, SpaceApplyList, SpaceList
from cognite.client.data_classes.data_modeling.streams import (
Expand Down Expand Up @@ -161,6 +181,7 @@
__all__ = [
"AggregatedValue",
"Aggregation",
"Avg",
"BTreeIndex",
"BTreeIndexApply",
"Boolean",
Expand All @@ -177,6 +198,7 @@
"ContainerProperty",
"ContainerPropertyApply",
"ContainerUsedFor",
"Count",
"DataModel",
"DataModelApply",
"DataModelApplyList",
Expand Down Expand Up @@ -207,6 +229,8 @@
"ExecutionPlan",
"FileReference",
"Filter",
"FilterAggregateResult",
"Filters",
"Float32",
"Float64",
"Index",
Expand All @@ -227,6 +251,11 @@
"Json",
"MappedProperty",
"MappedPropertyApply",
"Max",
"MetricAggregateResult",
"Min",
"MovingFunction",
"MovingFunctionAggregateResult",
"MultiEdgeConnection",
"MultiEdgeConnectionApply",
"MultiReverseDirectRelation",
Expand All @@ -243,6 +272,8 @@
"NodeOrEdgeResultSetExpression",
"NodeResultSetExpression",
"NodeResultSetExpressionSync",
"NumberHistogram",
"NumberHistogramAggregateResult",
"PropertyId",
"PropertyOptions",
"PropertyType",
Expand All @@ -254,6 +285,10 @@
"RecordSource",
"RecordWrite",
"RecordWriteList",
"RecordsAggregate",
"RecordsAggregateResult",
"RecordsAggregation",
"RecordsBucket",
"RequiresConstraint",
"RequiresConstraintApply",
"ResultSetExpression",
Expand All @@ -278,7 +313,10 @@
"StreamTemplateWriteSettings",
"StreamWrite",
"SubscriptionContext",
"Sum",
"Text",
"TimeHistogram",
"TimeHistogramAggregateResult",
"TimeSeriesReference",
"Timestamp",
"TranslatedQuery",
Expand All @@ -288,6 +326,8 @@
"TypedNodeApply",
"Union",
"UnionAll",
"UniqueValues",
"UniqueValuesAggregateResult",
"UniquenessConstraint",
"UniquenessConstraintApply",
"VersionedDataModelingId",
Expand Down
Loading
Loading