Skip to content

Commit 7a50961

Browse files
allo fetching from warmpath
1 parent 2a6bdf5 commit 7a50961

2 files changed

Lines changed: 21 additions & 4 deletions

File tree

fourinsight/engineroom/utils/_datamanage.py

Lines changed: 20 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -512,6 +512,8 @@ class DrioDataSource(BaseDataSource):
512512
DataReservoir.io client.
513513
lables : dict
514514
Labels and timeseries IDs as key/value pairs.
515+
storage : str, optional
516+
If 'warm' (default), drio is fetched from warm storage. If 'archive', data is fetched from the archive.
515517
index_type : str or obj
516518
Index type (see Notes). Should be 'datetime', 'integer' or an `index converter`
517519
object.
@@ -564,6 +566,7 @@ def __init__(
564566
self,
565567
drio_client,
566568
labels,
569+
storage="warm",
567570
index_type="datetime",
568571
index_sync=False,
569572
tolerance=None,
@@ -572,6 +575,7 @@ def __init__(
572575
**get_kwargs,
573576
):
574577
self._drio_client = drio_client
578+
self.storage = storage
575579
self._get_kwargs = get_kwargs
576580

577581
self._labels = {lab: id.strip() for lab, id in labels.items()}
@@ -586,6 +590,9 @@ def __init__(
586590
else:
587591
raise ValueError("'index_type' should be 'datetime' or 'integer'.")
588592

593+
if storage not in ["warm", "archive"]:
594+
raise ValueError("storage must be either 'warm' or 'archive'")
595+
589596
super().__init__(
590597
index_converter,
591598
index_sync=index_sync,
@@ -622,10 +629,20 @@ def _get(self, start, end):
622629
Label and data as key/value pairs. The data is returned as ``pandas.Series``
623630
objects.
624631
"""
632+
if self.storage == "warm":
633+
634+
def get_fun(ts_id, start, end, **kwargs):
635+
aggregation_period = kwargs.pop("aggregation_period", "tick")
636+
aggregation_function = kwargs.pop("aggregation_function", "mean")
637+
return self._drio_client.get_samples_aggregate(
638+
ts_id, start=start, end=end, aggregation_period=aggregation_period, aggregation_function=aggregation_function
639+
)
640+
641+
elif self.storage == "archive":
642+
get_fun = self._drio_client.get
643+
625644
return {
626-
label: self._drio_client.get(
627-
ts_id, start=start, end=end, **self._get_kwargs
628-
)
645+
label: get_fun(ts_id, start=start, end=end, **self._get_kwargs)
629646
for label, ts_id in self._labels.items()
630647
}
631648

tests/test_datamanage.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1079,7 +1079,7 @@ def test__get(self):
10791079
"c": "timeseriesid-c",
10801080
}
10811081
source = DrioDataSource(
1082-
drio_client, labels, convert_date=True, raise_empty=False
1082+
drio_client, labels, storage="archive", convert_date=True, raise_empty=False
10831083
)
10841084

10851085
data_out = source._get("<start-time>", "<end-time>")

0 commit comments

Comments
 (0)