diff --git a/cognite/client/_api/data_modeling/records.py b/cognite/client/_api/data_modeling/records.py index 15d4cf856d..6b12085cfc 100644 --- a/cognite/client/_api/data_modeling/records.py +++ b/cognite/client/_api/data_modeling/records.py @@ -1,11 +1,18 @@ from __future__ import annotations import asyncio -from collections.abc import Sequence +from collections.abc import Mapping, Sequence from typing import TYPE_CHECKING, Any, Literal from cognite.client._api_client import APIClient -from cognite.client.data_classes.data_modeling.records import RecordId, RecordIdSequence, RecordWrite +from cognite.client.data_classes.data_modeling.records import ( + RecordId, + RecordIdSequence, + RecordsAggregation, + RecordWrite, + _dump_aggregate_value, +) +from cognite.client.data_classes.filters import Filter from cognite.client.utils._experimental import FeaturePreviewWarning from cognite.client.utils._url import interpolate_and_url_encode @@ -177,3 +184,62 @@ async def upsert( resource_path=self._records_url(stream_id, "/upsert"), no_response=True, ) + + async def aggregate( + self, + aggregates: Mapping[str, Any], + *, + stream_id: str, + last_updated_time: Mapping[str, Any] | None = None, + filter: Filter | dict[str, Any] | None = None, + target_units: Mapping[str, Any] | None = None, + include_typing: bool = False, + ) -> RecordsAggregation: + """`Aggregate records from a stream `_. + + Args: + aggregates (Mapping[str, Any]): Aggregate request tree keyed by client-defined aggregate IDs. + stream_id (str): External ID of the stream to aggregate from. + last_updated_time (Mapping[str, Any] | None): Filter records by last-updated time. + **Required** for immutable streams (must include a lower bound). + filter (Filter | dict[str, Any] | None): Filter expression. + target_units (Mapping[str, Any] | None): Unit conversion specification. + include_typing (bool): Include property type metadata in the response. + + Returns: + RecordsAggregation: Aggregate results keyed by the requested aggregate IDs. + + Examples: + + Aggregate average temperature: + + >>> from cognite.client import CogniteClient + >>> client = CogniteClient() + >>> res = client.data_modeling.records.aggregate( + ... stream_id="my-stream", + ... aggregates={ + ... "avg_temperature": { + ... "avg": {"property": ["my-space", "sensor", "temperature"]} + ... } + ... }, + ... ) + >>> res.aggregates["avg_temperature"]["avg"] + 22.5 + """ + self._warning.warn() + body: dict[str, Any] = {"aggregates": _dump_aggregate_value(aggregates)} + if last_updated_time is not None: + body["lastUpdatedTime"] = _dump_aggregate_value(last_updated_time) + if filter is not None: + body["filter"] = filter.dump() if isinstance(filter, Filter) else filter + if target_units is not None: + body["targetUnits"] = _dump_aggregate_value(target_units) + if include_typing: + body["includeTyping"] = True + + res = await self._post( + url_path=self._records_url(stream_id, "/aggregate"), + json=body, + semaphore=self._get_semaphore("read"), + ) + return RecordsAggregation._load(res.json()) diff --git a/cognite/client/_sync_api/data_modeling/records.py b/cognite/client/_sync_api/data_modeling/records.py index 30d29669c2..55bf0c70e6 100644 --- a/cognite/client/_sync_api/data_modeling/records.py +++ b/cognite/client/_sync_api/data_modeling/records.py @@ -1,18 +1,23 @@ """ =============================================================================== -6fb9ded91a0d546955f9ed3109b468b6 +419d6a4367d905ade2f6d51bbbbf817b This file is auto-generated from the Async API modules, - do not edit manually! =============================================================================== """ from __future__ import annotations -from collections.abc import Sequence -from typing import TYPE_CHECKING, Literal +from collections.abc import Mapping, Sequence +from typing import TYPE_CHECKING, Any, Literal from cognite.client import AsyncCogniteClient from cognite.client._sync_api_client import SyncAPIClient -from cognite.client.data_classes.data_modeling.records import RecordId, RecordWrite +from cognite.client.data_classes.data_modeling.records import ( + RecordId, + RecordsAggregation, + RecordWrite, +) +from cognite.client.data_classes.filters import Filter from cognite.client.utils._async_helpers import run_sync if TYPE_CHECKING: @@ -148,3 +153,56 @@ def upsert( return run_sync( self.__async_client.data_modeling.records.upsert(items=items, stream_id=stream_id, upsert_mode=upsert_mode) ) + + def aggregate( + self, + aggregates: Mapping[str, Any], + *, + stream_id: str, + last_updated_time: Mapping[str, Any] | None = None, + filter: Filter | dict[str, Any] | None = None, + target_units: Mapping[str, Any] | None = None, + include_typing: bool = False, + ) -> RecordsAggregation: + """ + `Aggregate records from a stream `_. + + Args: + aggregates (Mapping[str, Any]): Aggregate request tree keyed by client-defined aggregate IDs. + stream_id (str): External ID of the stream to aggregate from. + last_updated_time (Mapping[str, Any] | None): Filter records by last-updated time. + **Required** for immutable streams (must include a lower bound). + filter (Filter | dict[str, Any] | None): Filter expression. + target_units (Mapping[str, Any] | None): Unit conversion specification. + include_typing (bool): Include property type metadata in the response. + + Returns: + RecordsAggregation: Aggregate results keyed by the requested aggregate IDs. + + Examples: + + Aggregate average temperature: + + >>> from cognite.client import CogniteClient + >>> client = CogniteClient() + >>> res = client.data_modeling.records.aggregate( + ... stream_id="my-stream", + ... aggregates={ + ... "avg_temperature": { + ... "avg": {"property": ["my-space", "sensor", "temperature"]} + ... } + ... }, + ... ) + >>> res.aggregates["avg_temperature"]["avg"] + 22.5 + """ + return run_sync( + self.__async_client.data_modeling.records.aggregate( + aggregates=aggregates, + stream_id=stream_id, + last_updated_time=last_updated_time, + filter=filter, + target_units=target_units, + include_typing=include_typing, + ) + ) diff --git a/cognite/client/data_classes/data_modeling/__init__.py b/cognite/client/data_classes/data_modeling/__init__.py index 20c4831bf7..5ba64e7344 100644 --- a/cognite/client/data_classes/data_modeling/__init__.py +++ b/cognite/client/data_classes/data_modeling/__init__.py @@ -120,11 +120,31 @@ UnionAll, ) from cognite.client.data_classes.data_modeling.records import ( + Avg, + Count, + FilterAggregateResult, + Filters, + Max, + MetricAggregateResult, + Min, + MovingFunction, + MovingFunctionAggregateResult, + NumberHistogram, + NumberHistogramAggregateResult, RecordContainerId, RecordId, + RecordsAggregate, + RecordsAggregateResult, + RecordsAggregation, + RecordsBucket, RecordSource, RecordWrite, RecordWriteList, + Sum, + TimeHistogram, + TimeHistogramAggregateResult, + UniqueValues, + UniqueValuesAggregateResult, ) from cognite.client.data_classes.data_modeling.spaces import Space, SpaceApply, SpaceApplyList, SpaceList from cognite.client.data_classes.data_modeling.streams import ( @@ -161,6 +181,7 @@ __all__ = [ "AggregatedValue", "Aggregation", + "Avg", "BTreeIndex", "BTreeIndexApply", "Boolean", @@ -177,6 +198,7 @@ "ContainerProperty", "ContainerPropertyApply", "ContainerUsedFor", + "Count", "DataModel", "DataModelApply", "DataModelApplyList", @@ -207,6 +229,8 @@ "ExecutionPlan", "FileReference", "Filter", + "FilterAggregateResult", + "Filters", "Float32", "Float64", "Index", @@ -227,6 +251,11 @@ "Json", "MappedProperty", "MappedPropertyApply", + "Max", + "MetricAggregateResult", + "Min", + "MovingFunction", + "MovingFunctionAggregateResult", "MultiEdgeConnection", "MultiEdgeConnectionApply", "MultiReverseDirectRelation", @@ -243,6 +272,8 @@ "NodeOrEdgeResultSetExpression", "NodeResultSetExpression", "NodeResultSetExpressionSync", + "NumberHistogram", + "NumberHistogramAggregateResult", "PropertyId", "PropertyOptions", "PropertyType", @@ -254,6 +285,10 @@ "RecordSource", "RecordWrite", "RecordWriteList", + "RecordsAggregate", + "RecordsAggregateResult", + "RecordsAggregation", + "RecordsBucket", "RequiresConstraint", "RequiresConstraintApply", "ResultSetExpression", @@ -278,7 +313,10 @@ "StreamTemplateWriteSettings", "StreamWrite", "SubscriptionContext", + "Sum", "Text", + "TimeHistogram", + "TimeHistogramAggregateResult", "TimeSeriesReference", "Timestamp", "TranslatedQuery", @@ -288,6 +326,8 @@ "TypedNodeApply", "Union", "UnionAll", + "UniqueValues", + "UniqueValuesAggregateResult", "UniquenessConstraint", "UniquenessConstraintApply", "VersionedDataModelingId", diff --git a/cognite/client/data_classes/data_modeling/records.py b/cognite/client/data_classes/data_modeling/records.py index deb7967b0f..61f529a3e5 100644 --- a/cognite/client/data_classes/data_modeling/records.py +++ b/cognite/client/data_classes/data_modeling/records.py @@ -1,8 +1,8 @@ from __future__ import annotations -from collections.abc import Sequence +from collections.abc import Mapping, Sequence from dataclasses import dataclass -from typing import Any +from typing import Any, ClassVar, Literal from typing_extensions import Self @@ -12,9 +12,241 @@ WriteableCogniteResource, ) from cognite.client.data_classes.data_modeling.ids import ContainerId +from cognite.client.data_classes.data_modeling.instances import TypeInformation +from cognite.client.data_classes.filters import Filter from cognite.client.utils._identifier import IdentifierSequenceCore, RecordId -__all__ = ["RecordContainerId", "RecordId", "RecordIdSequence", "RecordSource", "RecordWrite", "RecordWriteList"] +__all__ = [ + "Avg", + "Count", + "FilterAggregateResult", + "Filters", + "Max", + "MetricAggregateResult", + "Min", + "MovingFunction", + "MovingFunctionAggregateResult", + "NumberHistogram", + "NumberHistogramAggregateResult", + "RecordContainerId", + "RecordId", + "RecordIdSequence", + "RecordSource", + "RecordWrite", + "RecordWriteList", + "RecordsAggregate", + "RecordsAggregateResult", + "RecordsAggregation", + "RecordsBucket", + "Sum", + "TimeHistogram", + "TimeHistogramAggregateResult", + "UniqueValues", + "UniqueValuesAggregateResult", +] + + +def _dump_aggregate_value(value: Any) -> Any: + if isinstance(value, Mapping): + return {key: _dump_aggregate_value(val) for key, val in value.items()} + if isinstance(value, list | tuple): + return [_dump_aggregate_value(item) for item in value] + dump = getattr(value, "dump", None) + if callable(dump): + return _dump_aggregate_value(dump()) + return value + + +class RecordsAggregate(CogniteResource): + """Base class for typed Records aggregate request builders.""" + + _aggregate_name: ClassVar[str] + + @classmethod + def _load(cls, resource: dict[str, Any]) -> Self: + raise NotImplementedError(f"{cls.__name__} is a request builder and cannot be loaded from API responses") + + def _dump_body(self) -> dict[str, Any]: + raise NotImplementedError + + def dump(self, camel_case: bool = True) -> dict[str, Any]: + return {self._aggregate_name: _dump_aggregate_value(self._dump_body())} + + +class _PropertyAggregate(RecordsAggregate): + def __init__(self, property: Sequence[str]) -> None: + self.property = list(property) + + def _dump_body(self) -> dict[str, Any]: + return {"property": self.property} + + +class Avg(_PropertyAggregate): + """Average aggregate over a container property.""" + + _aggregate_name = "avg" + + +class Count(RecordsAggregate): + """Count records, or non-null values when ``property`` is provided.""" + + _aggregate_name = "count" + + def __init__(self, property: Sequence[str] | None = None) -> None: + self.property = list(property) if property is not None else None + + def _dump_body(self) -> dict[str, Any]: + return {"property": self.property} if self.property is not None else {} + + +class Min(_PropertyAggregate): + """Minimum aggregate over a property.""" + + _aggregate_name = "min" + + +class Max(_PropertyAggregate): + """Maximum aggregate over a property.""" + + _aggregate_name = "max" + + +class Sum(_PropertyAggregate): + """Sum aggregate over a container property.""" + + _aggregate_name = "sum" + + +class _NestedAggregate(RecordsAggregate): + def __init__(self, aggregates: Mapping[str, Any] | None = None) -> None: + self.aggregates = aggregates + + def _add_aggregates(self, body: dict[str, Any]) -> dict[str, Any]: + if self.aggregates is not None: + body["aggregates"] = self.aggregates + return body + + +class UniqueValues(_NestedAggregate): + """Bucket records by unique property values.""" + + _aggregate_name = "uniqueValues" + + def __init__(self, property: Sequence[str], aggregates: Mapping[str, Any] | None = None, size: int | None = None): + super().__init__(aggregates) + self.property = list(property) + self.size = size + + def _dump_body(self) -> dict[str, Any]: + body: dict[str, Any] = {"property": self.property} + self._add_aggregates(body) + if self.size is not None: + body["size"] = self.size + return body + + +class NumberHistogram(_NestedAggregate): + """Bucket numeric property values into fixed-width intervals.""" + + _aggregate_name = "numberHistogram" + + def __init__( + self, + property: Sequence[str], + interval: float, + aggregates: Mapping[str, Any] | None = None, + hard_bounds: Mapping[str, float] | None = None, + ) -> None: + super().__init__(aggregates) + self.property = list(property) + self.interval = interval + self.hard_bounds = hard_bounds + + def _dump_body(self) -> dict[str, Any]: + body: dict[str, Any] = {"property": self.property, "interval": self.interval} + self._add_aggregates(body) + if self.hard_bounds is not None: + body["hardBounds"] = self.hard_bounds + return body + + +class TimeHistogram(_NestedAggregate): + """Bucket timestamp values into calendar or fixed time intervals.""" + + _aggregate_name = "timeHistogram" + + def __init__( + self, + property: Sequence[str], + *, + calendar_interval: str | None = None, + fixed_interval: str | None = None, + aggregates: Mapping[str, Any] | None = None, + hard_bounds: Mapping[str, str] | None = None, + ) -> None: + if (calendar_interval is None) == (fixed_interval is None): + raise ValueError("Exactly one of calendar_interval or fixed_interval must be specified") + super().__init__(aggregates) + self.property = list(property) + self.calendar_interval = calendar_interval + self.fixed_interval = fixed_interval + self.hard_bounds = hard_bounds + + def _dump_body(self) -> dict[str, Any]: + body: dict[str, Any] = {"property": self.property} + self._add_aggregates(body) + if self.calendar_interval is not None: + body["calendarInterval"] = self.calendar_interval + if self.fixed_interval is not None: + body["fixedInterval"] = self.fixed_interval + if self.hard_bounds is not None: + body["hardBounds"] = self.hard_bounds + return body + + +class Filters(_NestedAggregate): + """Bucket records by a list of filter expressions.""" + + _aggregate_name = "filters" + + def __init__( + self, + filters: Sequence[Filter | dict[str, Any]], + aggregates: Mapping[str, Any] | None = None, + ) -> None: + super().__init__(aggregates) + self.filters = filters + + def _dump_body(self) -> dict[str, Any]: + body: dict[str, Any] = { + "filters": [filter.dump() if isinstance(filter, Filter) else filter for filter in self.filters] + } + return self._add_aggregates(body) + + +class MovingFunction(RecordsAggregate): + """Pipeline aggregate over a parent histogram bucket series.""" + + _aggregate_name = "movingFunction" + + def __init__( + self, + buckets_path: str, + window: int, + function: Literal[ + "MovingFunctions.max", + "MovingFunctions.min", + "MovingFunctions.sum", + "MovingFunctions.unweightedAvg", + "MovingFunctions.linearWeightedAvg", + ], + ) -> None: + self.buckets_path = buckets_path + self.window = window + self.function = function + + def _dump_body(self) -> dict[str, Any]: + return {"bucketsPath": self.buckets_path, "window": self.window, "function": self.function} class RecordIdSequence(IdentifierSequenceCore[RecordId]): @@ -106,3 +338,145 @@ class RecordWriteList(CogniteResourceList[RecordWrite]): def as_ids(self) -> list[RecordId]: return [v.as_id() for v in self] + + +class RecordsAggregateResult(CogniteResource): + """Base class for typed Records aggregate results.""" + + _raw_result: dict[str, Any] + + @classmethod + def _load(cls, resource: dict[str, Any]) -> RecordsAggregateResult: + for aggregate in ("avg", "count", "min", "max", "sum"): + if aggregate in resource: + return MetricAggregateResult(aggregate=aggregate, value=resource[aggregate], raw_result=resource) + if "fnValue" in resource: + return MovingFunctionAggregateResult(fn_value=resource["fnValue"], raw_result=resource) + for result_cls in ( + UniqueValuesAggregateResult, + NumberHistogramAggregateResult, + TimeHistogramAggregateResult, + FilterAggregateResult, + ): + if result_cls._buckets_key in resource: + return result_cls._load(resource) + return cls(resource) + + def __init__(self, raw_result: dict[str, Any]) -> None: + self._raw_result = raw_result + + def dump(self, camel_case: bool = True) -> dict[str, Any]: + return self._raw_result + + +class MetricAggregateResult(RecordsAggregateResult): + """Metric aggregate result such as ``avg``, ``count``, ``min``, ``max``, or ``sum``.""" + + def __init__(self, aggregate: str, value: Any, raw_result: dict[str, Any] | None = None) -> None: + super().__init__(raw_result or {aggregate: value}) + self.aggregate = aggregate + self.value = value + + +class MovingFunctionAggregateResult(RecordsAggregateResult): + """Pipeline moving function result.""" + + def __init__(self, fn_value: float, raw_result: dict[str, Any] | None = None) -> None: + super().__init__(raw_result or {"fnValue": fn_value}) + self.fn_value = fn_value + + +class RecordsBucket(CogniteResource): + """Bucket result from a Records bucket aggregate.""" + + def __init__(self, raw_bucket: dict[str, Any]) -> None: + self._raw_bucket = raw_bucket + self.count = raw_bucket["count"] + self.value = raw_bucket.get("value") + self.interval_start = raw_bucket.get("intervalStart") + self.aggregates = raw_bucket.get("aggregates", {}) + self.results = { + aggregate_id: RecordsAggregateResult._load(result) + for aggregate_id, result in self.aggregates.items() + if isinstance(result, dict) + } + + @classmethod + def _load(cls, resource: dict[str, Any]) -> Self: + return cls(resource) + + def dump(self, camel_case: bool = True) -> dict[str, Any]: + return self._raw_bucket + + +class _BucketAggregateResult(RecordsAggregateResult): + _buckets_key: ClassVar[str] + + def __init__(self, buckets: Sequence[RecordsBucket], raw_result: dict[str, Any] | None = None) -> None: + self.buckets = list(buckets) + super().__init__(raw_result or {self._buckets_key: [bucket.dump() for bucket in self.buckets]}) + + @classmethod + def _load(cls, resource: dict[str, Any]) -> Self: + return cls( + buckets=[RecordsBucket._load(bucket) for bucket in resource.get(cls._buckets_key, [])], + raw_result=resource, + ) + + +class UniqueValuesAggregateResult(_BucketAggregateResult): + """Result from a ``uniqueValues`` aggregate.""" + + _buckets_key = "uniqueValueBuckets" + + +class NumberHistogramAggregateResult(_BucketAggregateResult): + """Result from a ``numberHistogram`` aggregate.""" + + _buckets_key = "numberHistogramBuckets" + + +class TimeHistogramAggregateResult(_BucketAggregateResult): + """Result from a ``timeHistogram`` aggregate.""" + + _buckets_key = "timeHistogramBuckets" + + +class FilterAggregateResult(_BucketAggregateResult): + """Result from a ``filters`` aggregate.""" + + _buckets_key = "filterBuckets" + + +class RecordsAggregation(CogniteResource): + """Aggregate results returned from the Records aggregate endpoint. + + Args: + aggregates (dict[str, Any]): Aggregate results keyed by the client-defined aggregate IDs. + typing (TypeInformation | None): Optional property typing metadata. + """ + + def __init__(self, aggregates: dict[str, Any], typing: TypeInformation | None = None) -> None: + self.aggregates = aggregates + self.results = { + aggregate_id: RecordsAggregateResult._load(result) + for aggregate_id, result in aggregates.items() + if isinstance(result, dict) + } + self.typing = typing + + def __getitem__(self, aggregate_id: str) -> RecordsAggregateResult: + return self.results[aggregate_id] + + @classmethod + def _load(cls, resource: dict[str, Any]) -> Self: + return cls( + aggregates=resource["aggregates"], + typing=TypeInformation._load(resource["typing"]) if "typing" in resource else None, + ) + + def dump(self, camel_case: bool = True) -> dict[str, Any]: + output: dict[str, Any] = {"aggregates": _dump_aggregate_value(self.aggregates)} + if self.typing is not None: + output["typing"] = self.typing.dump(camel_case=camel_case) + return output diff --git a/tests/tests_unit/test_api/test_data_modeling/test_records.py b/tests/tests_unit/test_api/test_data_modeling/test_records.py index 27ed46d90c..c6d4fc4b0b 100644 --- a/tests/tests_unit/test_api/test_data_modeling/test_records.py +++ b/tests/tests_unit/test_api/test_data_modeling/test_records.py @@ -6,11 +6,28 @@ from pytest_httpx import HTTPXMock from cognite.client import AsyncCogniteClient, CogniteClient +from cognite.client.data_classes import filters from cognite.client.data_classes.data_modeling.records import ( + Avg, + Count, + FilterAggregateResult, + Filters, + Max, + MetricAggregateResult, + MovingFunction, + MovingFunctionAggregateResult, + NumberHistogram, + NumberHistogramAggregateResult, RecordContainerId, RecordId, + RecordsAggregation, RecordSource, RecordWrite, + Sum, + TimeHistogram, + TimeHistogramAggregateResult, + UniqueValues, + UniqueValuesAggregateResult, ) from tests.utils import jsgz_load @@ -226,6 +243,227 @@ def test_upsert_chunks( assert len(jsgz_load(requests[1].content)["items"]) == 1 +class TestRecordsAPIAggregate: + def test_aggregate_posts_request_and_returns_wrapper( + self, + cognite_client: CogniteClient, + httpx_mock: HTTPXMock, + records_base_url: str, + stream_id: str, + ) -> None: + httpx_mock.add_response( + method="POST", + url=re.compile(re.escape(records_base_url) + r"/aggregate$"), + json={"aggregates": {"avg_temp": {"avg": 22.5}}}, + ) + out = cognite_client.data_modeling.records.aggregate( + stream_id=stream_id, + aggregates={"avg_temp": {"avg": {"property": ["sp", "container-x", "temp"]}}}, + last_updated_time={"gte": 1_000_000}, + filter=filters.Equals(["space"], "sp"), + target_units={"unitSystemName": "SI"}, + include_typing=True, + ) + + assert isinstance(out, RecordsAggregation) + assert out.aggregates == {"avg_temp": {"avg": 22.5}} + body = jsgz_load(httpx_mock.get_requests()[0].content) + assert body == { + "aggregates": {"avg_temp": {"avg": {"property": ["sp", "container-x", "temp"]}}}, + "lastUpdatedTime": {"gte": 1_000_000}, + "filter": {"equals": {"property": ["space"], "value": "sp"}}, + "targetUnits": {"unitSystemName": "SI"}, + "includeTyping": True, + } + + def test_aggregate_accepts_dict_filter( + self, + cognite_client: CogniteClient, + httpx_mock: HTTPXMock, + records_base_url: str, + stream_id: str, + ) -> None: + httpx_mock.add_response( + method="POST", + url=re.compile(re.escape(records_base_url) + r"/aggregate$"), + json={"aggregates": {"total": {"count": 7}}}, + ) + cognite_client.data_modeling.records.aggregate( + stream_id=stream_id, + aggregates={"total": {"count": {}}}, + filter={"matchAll": {}}, + ) + + body = jsgz_load(httpx_mock.get_requests()[0].content) + assert body["filter"] == {"matchAll": {}} + + def test_aggregate_accepts_mixed_typed_and_dict_aggregates( + self, + cognite_client: CogniteClient, + httpx_mock: HTTPXMock, + records_base_url: str, + stream_id: str, + ) -> None: + httpx_mock.add_response( + method="POST", + url=re.compile(re.escape(records_base_url) + r"/aggregate$"), + json={"aggregates": {"total": {"count": 7}}}, + ) + + cognite_client.data_modeling.records.aggregate( + stream_id=stream_id, + aggregates={ + "by_day": TimeHistogram( + ["sp", "container-x", "timestamp"], + calendar_interval="1d", + aggregates={ + "avg_temp": Avg(["sp", "container-x", "temp"]), + "moving_count": MovingFunction( + buckets_path="_count", + window=3, + function="MovingFunctions.unweightedAvg", + ), + "raw_total": {"count": {}}, + }, + ), + "by_region": UniqueValues( + ["sp", "container-x", "region"], + aggregates={"max_temp": Max(["sp", "container-x", "temp"])}, + size=5, + ), + "salary_histogram": NumberHistogram( + ["sp", "container-x", "salary"], + interval=1000, + aggregates={"sum_salary": Sum(["sp", "container-x", "salary"])}, + hard_bounds={"min": 0, "max": 10000}, + ), + "by_filters": Filters( + filters=[ + filters.Range(["createdTime"], gte=1), + {"matchAll": {}}, + ], + aggregates={"total": Count()}, + ), + }, + ) + + body = jsgz_load(httpx_mock.get_requests()[0].content) + assert body["aggregates"] == { + "by_day": { + "timeHistogram": { + "property": ["sp", "container-x", "timestamp"], + "calendarInterval": "1d", + "aggregates": { + "avg_temp": {"avg": {"property": ["sp", "container-x", "temp"]}}, + "moving_count": { + "movingFunction": { + "bucketsPath": "_count", + "window": 3, + "function": "MovingFunctions.unweightedAvg", + } + }, + "raw_total": {"count": {}}, + }, + } + }, + "by_region": { + "uniqueValues": { + "property": ["sp", "container-x", "region"], + "aggregates": {"max_temp": {"max": {"property": ["sp", "container-x", "temp"]}}}, + "size": 5, + } + }, + "salary_histogram": { + "numberHistogram": { + "property": ["sp", "container-x", "salary"], + "interval": 1000, + "aggregates": {"sum_salary": {"sum": {"property": ["sp", "container-x", "salary"]}}}, + "hardBounds": {"min": 0, "max": 10000}, + } + }, + "by_filters": { + "filters": { + "filters": [ + {"range": {"property": ["createdTime"], "gte": 1}}, + {"matchAll": {}}, + ], + "aggregates": {"total": {"count": {}}}, + } + }, + } + + def test_records_aggregation_dump_round_trip(self) -> None: + raw = { + "aggregates": { + "by_space": { + "uniqueValueBuckets": [ + {"value": "sp", "count": 2, "aggregates": {"max_temp": {"max": 30.0}}}, + ] + } + } + } + loaded = RecordsAggregation._load(raw) + assert loaded.dump() == raw + + def test_records_aggregation_loads_typed_results(self) -> None: + loaded = RecordsAggregation._load( + { + "aggregates": { + "avg_temp": {"avg": 22.5}, + "by_region": { + "uniqueValueBuckets": [ + { + "value": "north", + "count": 2, + "aggregates": {"max_temp": {"max": 30.0}}, + } + ] + }, + "by_number": {"numberHistogramBuckets": [{"intervalStart": 0.0, "count": 1}]}, + "by_time": { + "timeHistogramBuckets": [ + { + "intervalStart": "2024-05-16T00:00:00Z", + "count": 3, + "aggregates": {"moving": {"fnValue": 7.5}}, + } + ] + }, + "by_filter": {"filterBuckets": [{"count": 4}]}, + "future": {"futureAggregateResult": 1}, + } + } + ) + + avg_temp = loaded["avg_temp"] + assert isinstance(avg_temp, MetricAggregateResult) + assert avg_temp.aggregate == "avg" + assert avg_temp.value == 22.5 + + by_region = loaded["by_region"] + assert isinstance(by_region, UniqueValuesAggregateResult) + assert by_region.buckets[0].value == "north" + max_temp = by_region.buckets[0].results["max_temp"] + assert isinstance(max_temp, MetricAggregateResult) + assert max_temp.value == 30.0 + + by_number = loaded["by_number"] + assert isinstance(by_number, NumberHistogramAggregateResult) + assert by_number.buckets[0].interval_start == 0.0 + + by_time = loaded["by_time"] + assert isinstance(by_time, TimeHistogramAggregateResult) + moving = by_time.buckets[0].results["moving"] + assert isinstance(moving, MovingFunctionAggregateResult) + assert moving.fn_value == 7.5 + + by_filter = loaded["by_filter"] + assert isinstance(by_filter, FilterAggregateResult) + assert by_filter.buckets[0].count == 4 + + assert loaded["future"].dump() == {"futureAggregateResult": 1} + + class TestRecordDTOs: def test_record_write_as_id(self, write_item: RecordWrite) -> None: rid = write_item.as_id()