Skip to content
168 changes: 157 additions & 11 deletions cognite/client/_api/datapoint_tasks.py

Large diffs are not rendered by default.

152 changes: 138 additions & 14 deletions cognite/client/_api/datapoints.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@
LatestDatapointQuery,
)
from cognite.client.data_classes.data_modeling import NodeId
from cognite.client.data_classes.datapoint_aggregates import Aggregate
from cognite.client.data_classes.datapoint_aggregates import STATE_AGGREGATES_CAMEL, Aggregate
from cognite.client.exceptions import CogniteAPIError, CogniteNotFoundError
from cognite.client.utils import _json_extended as _json
from cognite.client.utils._auxiliary import (
Expand All @@ -59,6 +59,7 @@
unpack_items_in_payload,
)
from cognite.client.utils._concurrency import AsyncSDKTask, execute_async_tasks
from cognite.client.utils._experimental import FeaturePreviewWarning
from cognite.client.utils._identifier import Identifier, IdentifierSequence, IdentifierSequenceCore
from cognite.client.utils._importing import local_import
from cognite.client.utils._time import (
Expand Down Expand Up @@ -532,6 +533,36 @@ def __init__(self, config: ClientConfig, api_version: str | None, cognite_client
self._POST_DPS_OBJECTS_LIMIT = 10_000

self.query_validator = _DpsQueryValidator(dps_limit_raw=self._DPS_LIMIT_RAW, dps_limit_agg=self._DPS_LIMIT_AGG)
self._state_time_series_warning = FeaturePreviewWarning(
api_maturity="beta",
sdk_maturity="alpha",
feature_name="State time series",
)

# Warns once per DatapointsAPI instance when state time series features are touched
# via queries (state aggregates), results (returned series of type "state") or inserts
# (handled by DatapointsPoster after validation).
def _maybe_warn_state(
self,
*,
queries: list[DatapointsQuery] | None = None,
results: Datapoints | DatapointsArray | DatapointsList | DatapointsArrayList | None = None,
) -> None:
if queries is not None:
for q in queries:
if q.aggregates and STATE_AGGREGATES_CAMEL.intersection(q.aggs_camel_case):
self._state_time_series_warning.warn()
return
if results is None:
return
if isinstance(results, (Datapoints, DatapointsArray)):
if results.type == "state":
self._state_time_series_warning.warn()
return
for item in results.data:
if item.type == "state":
self._state_time_series_warning.warn()
return

def _get_semaphore(self, operation: Literal["read", "write", "delete"]) -> asyncio.BoundedSemaphore:
from cognite.client import global_config
Expand Down Expand Up @@ -1209,7 +1240,9 @@ async def retrieve(
treat_uncertain_as_bad=treat_uncertain_as_bad,
)
self.query_validator(parsed_queries := query.parse_into_queries())
self._maybe_warn_state(queries=parsed_queries)
dps_lst = await self._select_dps_fetch_strategy(parsed_queries)(self, parsed_queries).fetch_all_datapoints()
self._maybe_warn_state(results=dps_lst)

if not query.is_single_identifier:
return dps_lst
Expand Down Expand Up @@ -1457,9 +1490,11 @@ async def retrieve_arrays(
treat_uncertain_as_bad=treat_uncertain_as_bad,
)
self.query_validator(parsed_queries := query.parse_into_queries())
self._maybe_warn_state(queries=parsed_queries)
dps_lst = await self._select_dps_fetch_strategy(parsed_queries)(
self, parsed_queries
).fetch_all_datapoints_numpy()
self._maybe_warn_state(results=dps_lst)

if not query.is_single_identifier:
return dps_lst
Expand Down Expand Up @@ -1605,10 +1640,12 @@ async def retrieve_dataframe(
treat_uncertain_as_bad=treat_uncertain_as_bad,
)
self.query_validator(parsed_queries := query.parse_into_queries())
self._maybe_warn_state(queries=parsed_queries)
fetcher = self._select_dps_fetch_strategy(parsed_queries)(self, parsed_queries)

if not uniform_index:
result = await fetcher.fetch_all_datapoints_numpy()
self._maybe_warn_state(results=result)
return result.to_pandas(
include_aggregate_name=include_aggregate_name,
include_granularity_name=include_granularity_name,
Expand All @@ -1626,6 +1663,7 @@ async def retrieve_dataframe(
"OR when timezone is used OR when a calendar granularity is used (e.g. month/quarter/year)"
)
result = await fetcher.fetch_all_datapoints_numpy()
self._maybe_warn_state(results=result)
df = result.to_pandas(
include_aggregate_name=include_aggregate_name,
include_granularity_name=include_granularity_name,
Expand Down Expand Up @@ -2307,24 +2345,44 @@ def _select_dps_fetch_strategy(self, queries: list[DatapointsQuery]) -> type[Dps

class _InsertDatapoint(NamedTuple):
ts: int | datetime.datetime
value: str | float
value: str | float | None = None
status_code: int | None = None
status_symbol: str | None = None
kind: Literal["raw", "state"] = "raw"
numeric_value: int | None = None
string_value: str | None = None

@classmethod
def from_dict(cls, dct: dict[str, Any]) -> Self:
if "numericValue" in dct or "stringValue" in dct:
if status := dct.get("status"):
return cls(
dct["timestamp"],
None,
status.get("code"),
status.get("symbol"),
"state",
dct.get("numericValue"),
dct.get("stringValue"),
)
return cls(dct["timestamp"], None, None, None, "state", dct.get("numericValue"), dct.get("stringValue"))
if status := dct.get("status"):
return cls(dct["timestamp"], dct["value"], status.get("code"), status.get("symbol"))
return cls(dct["timestamp"], dct["value"])
return cls(dct["timestamp"], dct["value"], status.get("code"), status.get("symbol"), "raw", None, None)
return cls(dct["timestamp"], dct["value"], None, None, "raw", None, None)

def dump(self) -> dict[str, Any]:
dumped: dict[str, Any] = {"timestamp": timestamp_to_ms(self.ts), "value": self.value}
dumped: dict[str, Any] = {"timestamp": timestamp_to_ms(self.ts)}
if self.kind == "state":
if self.numeric_value is not None:
dumped["numericValue"] = self.numeric_value
if self.string_value is not None:
dumped["stringValue"] = self.string_value
else:
dumped["value"] = _json.convert_nonfinite_float_to_str(self.value)
if self.status_code: # also skip if 0
dumped["status"] = {"code": self.status_code}
if self.status_symbol and self.status_symbol != "Good":
dumped.setdefault("status", {})["symbol"] = self.status_symbol
# Out-of-range float values must be passed as strings:
dumped["value"] = _json.convert_nonfinite_float_to_str(dumped["value"])
return dumped


Expand Down Expand Up @@ -2360,6 +2418,8 @@ def _verify_and_prepare_dps_objects(
continue
identifier = validate_user_input_dict_with_identifier(obj, required_keys={"datapoints"})
validated_dps = self._parse_and_validate_dps(obj["datapoints"])
if any(dp.kind == "state" for dp in validated_dps):
self.dps_client._state_time_series_warning.warn()
dps_to_insert[identifier].extend(validated_dps)
return list(dps_to_insert.items())

Expand All @@ -2376,14 +2436,25 @@ def _parse_and_validate_dps(self, dps: Datapoints | DatapointsArray | list[tuple
if self._dps_are_insert_ready(dps):
return dps # Internal SDK shortcut to avoid casting
elif self._dps_are_tuples(dps):
return [_InsertDatapoint(*tpl) for tpl in dps]
out: list[_InsertDatapoint] = []
for tpl in dps:
match len(tpl):
case 2:
out.append(_InsertDatapoint(tpl[0], tpl[1]))
case 3:
ts, val, code = tpl
out.append(_InsertDatapoint(ts, val, int(code), None))
case _:
raise TypeError(f"Unsupported datapoint tuple length {len(tpl)}: {tpl!r}")
return out
elif self._dps_are_dicts(dps):
try:
return [_InsertDatapoint.from_dict(dp) for dp in dps]
except KeyError:
except KeyError as e:
raise KeyError(
"A datapoint is missing one or both keys ['value', 'timestamp']. Note: 'status' is optional."
)
"A datapoint dict must include 'timestamp' and either 'value' or at least one of "
"['numericValue', 'stringValue'] for state time series. 'status' is optional."
) from e
raise TypeError(
"Datapoints to be inserted must be of type Datapoints or DatapointsArray (with raw datapoints), "
f"or be a list containing tuples or dicts, not {type(dps[0])}"
Expand Down Expand Up @@ -2450,9 +2521,26 @@ def _split_datapoints(lst: list[_T], n_first: int, n: int) -> Iterator[tuple[lis

@staticmethod
def _verify_dps_object_for_insertion(dps: Datapoints | DatapointsArray) -> None:
if dps.value is None:
n_ts = len(dps.timestamp)
if dps.type == "state":
if dps.numeric_value is None and dps.string_value is None:
raise ValueError(
f"Only raw datapoints are supported when inserting data from ``{type(dps).__name__}`` "
"(state series require numeric_value and/or string_value arrays)"
)
if dps.numeric_value is not None and len(dps.numeric_value) != n_ts:
raise ValueError(
f"Number of timestamps ({n_ts}) does not match number of numeric state values "
f"({len(dps.numeric_value)}) to insert"
)
if dps.string_value is not None and len(dps.string_value) != n_ts:
raise ValueError(
f"Number of timestamps ({n_ts}) does not match number of string state values "
f"({len(dps.string_value)}) to insert"
)
elif dps.value is None:
raise ValueError(f"Only raw datapoints are supported when inserting data from ``{type(dps).__name__}``")
if (n_ts := len(dps.timestamp)) != (n_dps := len(dps.value)):
elif n_ts != (n_dps := len(dps.value)):
raise ValueError(f"Number of timestamps ({n_ts}) does not match number of datapoints ({n_dps}) to insert")

if dps.status_code is not None and dps.status_symbol is not None:
Expand All @@ -2465,14 +2553,50 @@ def _verify_dps_object_for_insertion(dps: Datapoints | DatapointsArray) -> None:
raise ValueError("One of status code/symbol is missing on datapoints object")

def _extract_raw_data_from_datapoints(self, dps: Datapoints) -> list[_InsertDatapoint]:
if dps.type == "state":
n = len(dps.timestamp)
nums = list(dps.numeric_value) if dps.numeric_value is not None else [None] * n
strs = list(dps.string_value) if dps.string_value is not None else [None] * n
if dps.status_code is None:
return [
_InsertDatapoint(ts, None, None, None, "state", nv, sv)
for ts, nv, sv in zip(dps.timestamp, nums, strs)
]
return [
_InsertDatapoint(ts, None, c, s, "state", nv, sv)
for ts, nv, sv, c, s in zip(dps.timestamp, nums, strs, dps.status_code, dps.status_symbol) # type: ignore [arg-type]
]
if dps.status_code is None:
return list(map(_InsertDatapoint, dps.timestamp, dps.value)) # type: ignore [arg-type]
return list(map(_InsertDatapoint, dps.timestamp, dps.value, dps.status_code)) # type: ignore [arg-type]

def _extract_raw_data_from_datapoints_array(self, dps: DatapointsArray) -> list[_InsertDatapoint]:
# Using `tolist()` converts to the nearest compatible built-in Python type (in C code):
values = dps.value.tolist() # type: ignore [union-attr]
timestamps = dps.timestamp.astype("datetime64[ms]").astype("int64").tolist()
if dps.type == "state":
n = len(timestamps)
nums = dps.numeric_value.tolist() if dps.numeric_value is not None else [None] * n
strs = dps.string_value.tolist() if dps.string_value is not None else [None] * n
if dps.null_timestamps:
nums = [None if ts in dps.null_timestamps else nv for ts, nv in zip(timestamps, nums)]
strs = [None if ts in dps.null_timestamps else sv for ts, sv in zip(timestamps, strs)]
if dps.status_code is None:
return [
_InsertDatapoint(ts, None, None, None, "state", nv, sv)
for ts, nv, sv in zip(timestamps, nums, strs)
]
return [
_InsertDatapoint(ts, None, c, s, "state", nv, sv)
for ts, nv, sv, c, s in zip(
timestamps,
nums,
strs,
dps.status_code.tolist(),
dps.status_symbol.tolist(), # type: ignore [union-attr]
)
]

values = dps.value.tolist() # type: ignore [union-attr]

if dps.null_timestamps:
# 'Missing' and NaN can not be differentiated when we read from numpy arrays:
Expand Down
8 changes: 4 additions & 4 deletions cognite/client/_proto/data_point_insertion_request_pb2.py

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 4 additions & 2 deletions cognite/client/_proto/data_point_insertion_request_pb2.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -7,18 +7,20 @@ from typing import ClassVar as _ClassVar, Iterable as _Iterable, Mapping as _Map
DESCRIPTOR: _descriptor.FileDescriptor

class DataPointInsertionItem(_message.Message):
__slots__ = ("id", "externalId", "instanceId", "numericDatapoints", "stringDatapoints")
__slots__ = ("id", "externalId", "instanceId", "numericDatapoints", "stringDatapoints", "stateDatapoints")
ID_FIELD_NUMBER: _ClassVar[int]
EXTERNALID_FIELD_NUMBER: _ClassVar[int]
INSTANCEID_FIELD_NUMBER: _ClassVar[int]
NUMERICDATAPOINTS_FIELD_NUMBER: _ClassVar[int]
STRINGDATAPOINTS_FIELD_NUMBER: _ClassVar[int]
STATEDATAPOINTS_FIELD_NUMBER: _ClassVar[int]
id: int
externalId: str
instanceId: _data_points_pb2.InstanceId
numericDatapoints: _data_points_pb2.NumericDatapoints
stringDatapoints: _data_points_pb2.StringDatapoints
def __init__(self, id: _Optional[int] = ..., externalId: _Optional[str] = ..., instanceId: _Optional[_Union[_data_points_pb2.InstanceId, _Mapping]] = ..., numericDatapoints: _Optional[_Union[_data_points_pb2.NumericDatapoints, _Mapping]] = ..., stringDatapoints: _Optional[_Union[_data_points_pb2.StringDatapoints, _Mapping]] = ...) -> None: ...
stateDatapoints: _data_points_pb2.StateDatapoints
def __init__(self, id: _Optional[int] = ..., externalId: _Optional[str] = ..., instanceId: _Optional[_Union[_data_points_pb2.InstanceId, _Mapping]] = ..., numericDatapoints: _Optional[_Union[_data_points_pb2.NumericDatapoints, _Mapping]] = ..., stringDatapoints: _Optional[_Union[_data_points_pb2.StringDatapoints, _Mapping]] = ..., stateDatapoints: _Optional[_Union[_data_points_pb2.StateDatapoints, _Mapping]] = ...) -> None: ...

class DataPointInsertionRequest(_message.Message):
__slots__ = ("items",)
Expand Down
Loading
Loading