Skip to content

Commit 78e18fd

Browse files
Enable DrioDataSource to fetch data from warm path (#61)
* allo fetching from warmpath * add tests * black * set archive as default * update docstring, add experimental for warm storage
1 parent 2a6bdf5 commit 78e18fd

2 files changed

Lines changed: 145 additions & 5 deletions

File tree

fourinsight/engineroom/utils/_datamanage.py

Lines changed: 29 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -510,8 +510,11 @@ class DrioDataSource(BaseDataSource):
510510
----------
511511
drio_client : obj
512512
DataReservoir.io client.
513-
lables : dict
513+
labels : dict
514514
Labels and timeseries IDs as key/value pairs.
515+
storage : str, optional
516+
Where to fetch data from. If 'archive' (default), data is fetched from the archive.
517+
If 'warm' (experimental), data is fetched from warm storage, which supports aggregated queries.
515518
index_type : str or obj
516519
Index type (see Notes). Should be 'datetime', 'integer' or an `index converter`
517520
object.
@@ -558,12 +561,16 @@ class DrioDataSource(BaseDataSource):
558561
be given as a dtype that the :meth:`index_converter.to_universal_delta` can
559562
parse.
560563
564+
- The 'warm' storage option is experimental and its behavior may change in
565+
future releases.
566+
561567
"""
562568

563569
def __init__(
564570
self,
565571
drio_client,
566572
labels,
573+
storage="archive",
567574
index_type="datetime",
568575
index_sync=False,
569576
tolerance=None,
@@ -572,6 +579,7 @@ def __init__(
572579
**get_kwargs,
573580
):
574581
self._drio_client = drio_client
582+
self.storage = storage
575583
self._get_kwargs = get_kwargs
576584

577585
self._labels = {lab: id.strip() for lab, id in labels.items()}
@@ -586,6 +594,9 @@ def __init__(
586594
else:
587595
raise ValueError("'index_type' should be 'datetime' or 'integer'.")
588596

597+
if storage not in ["warm", "archive"]:
598+
raise ValueError("storage must be either 'warm' or 'archive'")
599+
589600
super().__init__(
590601
index_converter,
591602
index_sync=index_sync,
@@ -622,10 +633,24 @@ def _get(self, start, end):
622633
Label and data as key/value pairs. The data is returned as ``pandas.Series``
623634
objects.
624635
"""
636+
if self.storage == "warm":
637+
638+
def get_fun(ts_id, start, end, **kwargs):
639+
aggregation_period = kwargs.pop("aggregation_period", "tick")
640+
aggregation_function = kwargs.pop("aggregation_function", "mean")
641+
return self._drio_client.get_samples_aggregate(
642+
ts_id,
643+
start=start,
644+
end=end,
645+
aggregation_period=aggregation_period,
646+
aggregation_function=aggregation_function,
647+
)
648+
649+
elif self.storage == "archive":
650+
get_fun = self._drio_client.get
651+
625652
return {
626-
label: self._drio_client.get(
627-
ts_id, start=start, end=end, **self._get_kwargs
628-
)
653+
label: get_fun(ts_id, start=start, end=end, **self._get_kwargs)
629654
for label, ts_id in self._labels.items()
630655
}
631656

tests/test_datamanage.py

Lines changed: 116 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1079,7 +1079,7 @@ def test__get(self):
10791079
"c": "timeseriesid-c",
10801080
}
10811081
source = DrioDataSource(
1082-
drio_client, labels, convert_date=True, raise_empty=False
1082+
drio_client, labels, storage="archive", convert_date=True, raise_empty=False
10831083
)
10841084

10851085
data_out = source._get("<start-time>", "<end-time>")
@@ -1120,6 +1120,121 @@ def test__get(self):
11201120
]
11211121
)
11221122

1123+
def test__get_warm_storage(self):
1124+
# Mock the drio_client
1125+
drio_client = Mock()
1126+
drio_client.get_samples_aggregate.return_value = pd.Series(
1127+
data=[1.1, 1.2, 1.3], index=[1.0, 2.0, 3.0]
1128+
)
1129+
1130+
# Labels to request
1131+
labels = {
1132+
"a": "timeseriesid-a",
1133+
"b": "timeseriesid-b",
1134+
"c": "timeseriesid-c",
1135+
}
1136+
1137+
# Instantiate DrioDataSource with storage="warm" and constructor kwargs
1138+
source = DrioDataSource(
1139+
drio_client,
1140+
labels,
1141+
storage="warm",
1142+
convert_date=True,
1143+
raise_empty=False,
1144+
aggregation_period="1h",
1145+
aggregation_function="min",
1146+
)
1147+
1148+
# Call _get (no kwargs allowed per your current design)
1149+
data_out = source._get("<start-time>", "<end-time>")
1150+
1151+
# Expected output
1152+
data_expect = {
1153+
"a": pd.Series(data=[1.1, 1.2, 1.3], index=[1.0, 2.0, 3.0]),
1154+
"b": pd.Series(data=[1.1, 1.2, 1.3], index=[1.0, 2.0, 3.0]),
1155+
"c": pd.Series(data=[1.1, 1.2, 1.3], index=[1.0, 2.0, 3.0]),
1156+
}
1157+
1158+
# Assert returned data is correct
1159+
assert data_out.keys() == data_expect.keys()
1160+
for key, series_expect in data_expect.items():
1161+
pd.testing.assert_series_equal(series_expect, data_out[key])
1162+
1163+
# Assert get_samples_aggregate called with correct parameters
1164+
drio_client.get_samples_aggregate.assert_has_calls(
1165+
[
1166+
call(
1167+
"timeseriesid-a",
1168+
start="<start-time>",
1169+
end="<end-time>",
1170+
aggregation_period="1h",
1171+
aggregation_function="min",
1172+
),
1173+
call(
1174+
"timeseriesid-b",
1175+
start="<start-time>",
1176+
end="<end-time>",
1177+
aggregation_period="1h",
1178+
aggregation_function="min",
1179+
),
1180+
call(
1181+
"timeseriesid-c",
1182+
start="<start-time>",
1183+
end="<end-time>",
1184+
aggregation_period="1h",
1185+
aggregation_function="min",
1186+
),
1187+
]
1188+
)
1189+
1190+
def test__get_warm_defaults(self):
1191+
# Mock the drio_client
1192+
drio_client = Mock()
1193+
drio_client.get_samples_aggregate.return_value = pd.Series(
1194+
data=[10, 20, 30], index=[1, 2, 3]
1195+
)
1196+
1197+
labels = {
1198+
"x": "ts-x",
1199+
"y": "ts-y",
1200+
}
1201+
1202+
# Instantiate DrioDataSource with storage="warm", no overrides
1203+
source = DrioDataSource(
1204+
drio_client, labels, storage="warm", convert_date=True, raise_empty=False
1205+
)
1206+
1207+
# Call _get with defaults
1208+
data_out = source._get("<start>", "<end>")
1209+
1210+
# Expected series
1211+
expected = pd.Series([10, 20, 30], index=[1, 2, 3])
1212+
1213+
# Assert keys and values
1214+
assert data_out.keys() == labels.keys()
1215+
for key in labels:
1216+
pd.testing.assert_series_equal(data_out[key], expected)
1217+
1218+
# Assert get_samples_aggregate called with default aggregation values
1219+
drio_client.get_samples_aggregate.assert_has_calls(
1220+
[
1221+
call(
1222+
"ts-x",
1223+
start="<start>",
1224+
end="<end>",
1225+
aggregation_period="tick",
1226+
aggregation_function="mean",
1227+
),
1228+
call(
1229+
"ts-y",
1230+
start="<start>",
1231+
end="<end>",
1232+
aggregation_period="tick",
1233+
aggregation_function="mean",
1234+
),
1235+
]
1236+
)
1237+
11231238

11241239
class Test_NullDataSource:
11251240
def test__init__(self):

0 commit comments

Comments
 (0)