Skip to content

Commit f06dbab

Browse files
committed
[UPDATE] Support data fetching density from trace server
Previously, TMLL retrieved data (XY and TIMEGRAPH) from the trace server by requesting the experiment's full start-to-end range with a fixed 65536 items for that time range. Now, based on the indicated resampling frequency (e.g., 1s, 5ms, 100us, etc.), the client determines how many data points need to be fetched from the server. Signed-off-by: Kaveh Shahedi <kaveh.shahedi@ericsson.com>
1 parent 67fe0e1 commit f06dbab

2 files changed

Lines changed: 98 additions & 65 deletions

File tree

tmll/ml/modules/base_module.py

Lines changed: 35 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -41,12 +41,12 @@ def _plot(self, plots: List[Dict[str, Any]], plot_size: Tuple[float, float] = (1
4141
Plot the given plots.
4242
4343
:param plots: The plots to plot. Each plot should be a dictionary with the following keys:
44-
- plot_type (PLOT_TYPES): The type of the plot (e.g., 'time_series' or 'scatter')
44+
- plot_type (PLOT_TYPES): The type of the plot (e.g., "time_series" or "scatter")
4545
- data (Any): The data to plot
4646
- x (str): The name of the x-axis column
4747
- y (str): The name of the y-axis column
4848
- hue (str, optional): The name of the hue column. Default is None
49-
- color (str, optional): The color of the plot. Default is 'blue'
49+
- color (str, optional): The color of the plot. Default is "blue"
5050
:type plots: List[Dict[str, Any]]
5151
:param plot_size: The size of the plot. Default is (15, 10)
5252
:type plot_size: Tuple[int, int], optional
@@ -62,31 +62,31 @@ def _plot(self, plots: List[Dict[str, Any]], plot_size: Tuple[float, float] = (1
6262
:return: None
6363
"""
6464
# Create a new figure and axis
65-
fig, ax = plt.subplots(figsize=plot_size, dpi=kwargs.get('dpi', 100))
65+
fig, ax = plt.subplots(figsize=plot_size, dpi=kwargs.get("dpi", 100))
6666

6767
# Plot each plot
6868
for plot_info in plots:
69-
plot_type = plot_info.get('plot_type', None)
70-
data = plot_info.get('data', None)
69+
plot_type = plot_info.get("plot_type", None)
70+
data = plot_info.get("data", None)
7171

7272
# Create the plot
7373
plot_strategy = PlotFactory.create_plot(plot_type)
74-
plot_strategy.plot(ax, data, **{k: v for k, v in plot_info.items() if k not in ['plot_type', 'data']})
74+
plot_strategy.plot(ax, data, **{k: v for k, v in plot_info.items() if k not in ["plot_type", "data"]})
7575

7676
# Set the title, x-axis label, and y-axis label of the plot
77-
ax.set_title(kwargs.get('fig_title', ''))
78-
ax.set_xlabel(kwargs.get('fig_xlabel', ''))
79-
ax.set_ylabel(kwargs.get('fig_ylabel', ''))
77+
ax.set_title(kwargs.get("fig_title", ""))
78+
ax.set_xlabel(kwargs.get("fig_xlabel", ""))
79+
ax.set_ylabel(kwargs.get("fig_ylabel", ""))
8080

81-
if kwargs.get('fig_xticks', None) is not None:
82-
ax.set_xticks(kwargs.get('fig_xticks')) # type: ignore
81+
if kwargs.get("fig_xticks", None) is not None:
82+
ax.set_xticks(kwargs.get("fig_xticks")) # type: ignore
8383

84-
if kwargs.get('fig_yticks', None) is not None:
85-
ax.set_yticks(kwargs.get('fig_yticks')) # type: ignore
84+
if kwargs.get("fig_yticks", None) is not None:
85+
ax.set_yticks(kwargs.get("fig_yticks")) # type: ignore
8686
else:
8787
y_min, y_max = ax.get_ylim()
8888
if isinstance(y_min, (int, float)) and isinstance(y_max, (int, float)):
89-
num_yticks = kwargs.get('fig_num_yticks', 6)
89+
num_yticks = kwargs.get("fig_num_yticks", 6)
9090
yticks = PlotUtils.get_formatted_ticks(y_min, y_max, num_yticks)
9191
ax.set_yticks(yticks)
9292

@@ -95,27 +95,27 @@ def _plot(self, plots: List[Dict[str, Any]], plot_size: Tuple[float, float] = (1
9595
ax.set_ylim(yticks[0] - padding, yticks[-1] + padding)
9696
ax.set_yticklabels([f"{val:.2f}{unit}" for tick in yticks for val, unit in [Formatter.format_large_number(tick)]])
9797

98-
if kwargs.get('fig_xticklabels', None) is not None:
99-
ax.set_xticklabels(kwargs.get('fig_xticklabels')) # type: ignore
100-
if kwargs.get('fig_yticklabels', None) is not None:
101-
ax.set_yticklabels(kwargs.get('fig_yticklabels')) # type: ignore
102-
if kwargs.get('fig_xticklabels_rotation', None) is not None:
98+
if kwargs.get("fig_xticklabels", None) is not None:
99+
ax.set_xticklabels(kwargs.get("fig_xticklabels")) # type: ignore
100+
if kwargs.get("fig_yticklabels", None) is not None:
101+
ax.set_yticklabels(kwargs.get("fig_yticklabels")) # type: ignore
102+
if kwargs.get("fig_xticklabels_rotation", None) is not None:
103103
ax.set_xticks(ax.get_xticks())
104-
ax.set_xticklabels(ax.get_xticklabels(), rotation=kwargs.get('fig_xticklabels_rotation'))
105-
if kwargs.get('fig_yticklabels_rotation', None) is not None:
104+
ax.set_xticklabels(ax.get_xticklabels(), rotation=kwargs.get("fig_xticklabels_rotation"))
105+
if kwargs.get("fig_yticklabels_rotation", None) is not None:
106106
ax.set_yticks(ax.get_yticks())
107-
ax.set_yticklabels(ax.get_yticklabels(), rotation=kwargs.get('fig_yticklabels_rotation'))
107+
ax.set_yticklabels(ax.get_yticklabels(), rotation=kwargs.get("fig_yticklabels_rotation"))
108108

109109
# Add the legend to the plot (remove duplicates)
110-
if kwargs.get('legend', True):
110+
if kwargs.get("legend", True):
111111
handles, labels = plt.gca().get_legend_handles_labels()
112112
by_label = dict(zip(labels, handles))
113-
PlotUtils.set_standard_legend_style(ax, by_label.values(), by_label.keys(), title=kwargs.get('legend_title', None))
113+
PlotUtils.set_standard_legend_style(ax, by_label.values(), by_label.keys(), title=kwargs.get("legend_title", None))
114114
else:
115115
if ax.get_legend():
116116
ax.get_legend().remove()
117117

118-
ax.grid(kwargs.get('grid', True))
118+
ax.grid(kwargs.get("grid", True))
119119

120120
# Display the plot
121121
plt.tight_layout()
@@ -134,7 +134,8 @@ def _process(self, outputs: Optional[List[Output]] = None, **kwargs) -> None:
134134
data, outputs = self.data_fetcher.fetch_data(
135135
experiment=self.experiment,
136136
target_outputs=outputs,
137-
**kwargs.get('fetch_params', {}))
137+
resample_freq=kwargs.get("resample_freq", "1s"),
138+
**kwargs.get("fetch_params", {}))
138139

139140
if data is None:
140141
self.logger.error("No data fetched")
@@ -153,26 +154,26 @@ def _process(self, outputs: Optional[List[Output]] = None, **kwargs) -> None:
153154

154155
if converted and converted.type == "TIME_GRAPH":
155156
df = df.rename({"start_time": "timestamp"}, axis=1)
156-
df['end_time'] = pd.to_datetime(df['end_time'])
157+
df["end_time"] = pd.to_datetime(df["end_time"])
157158

158159
# Apply common preprocessing steps
159-
if kwargs.get('normalize', True):
160+
if kwargs.get("normalize", True):
160161
df = self.data_preprocessor.normalize(df)
161-
if kwargs.get('convert_datetime', True):
162+
if kwargs.get("convert_datetime", True):
162163
df = self.data_preprocessor.convert_to_datetime(df)
163-
if kwargs.get('resample', True):
164-
df = self.data_preprocessor.resample(df, frequency=kwargs.get('resample_freq', '1s'))
165-
if kwargs.get('remove_minimum', False):
164+
if kwargs.get("resample", True):
165+
df = self.data_preprocessor.resample(df, frequency=kwargs.get("resample_freq", "1s"))
166+
if kwargs.get("remove_minimum", False):
166167
df = self.data_preprocessor.remove_minimum(df)
167168

168169
self.dataframes[shortened] = df
169170

170171
# Filter out dataframes with less than min_size instances
171-
min_size = kwargs.get('min_size', 1)
172+
min_size = kwargs.get("min_size", 1)
172173
self.dataframes = {k: v for k, v in self.dataframes.items() if len(v) >= min_size}
173174

174175
# Align timestamps if needed
175-
if kwargs.get('align_timestamps', True) and self.dataframes:
176+
if kwargs.get("align_timestamps", True) and self.dataframes:
176177
self.dataframes, self.timestamps = DataPreprocessor.align_timestamps(self.dataframes)
177178

178179
# Call module-specific post-processing

tmll/tmll_client.py

Lines changed: 63 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
import pandas as pd
99
import numpy as np
1010

11-
from typing import Dict, List, Optional, Union, cast
11+
from typing import Dict, List, Optional, Tuple, Union, cast
1212

1313
from tmll.common.models.data.table.column import TableDataColumn
1414
from tmll.common.models.data.table.table import TableData
@@ -324,16 +324,18 @@ def fetch_data(self, experiment: Experiment, outputs: List[Dict[str, Union[Outpu
324324
item_ids = list(map(int, [node.id for node in o_tree.nodes]))
325325
time_range_start = kwargs.get("start", experiment.start)
326326
time_range_end = kwargs.get("end", experiment.end)
327-
time_range_num_times = kwargs.get("num_times", 65536)
327+
resample_freq = kwargs.get("resample_freq", "1s")
328328

329-
while True:
329+
intervals = self._calculate_intervals(time_range_start, time_range_end, resample_freq)
330+
331+
for interval in intervals:
330332
parameters = {
331333
TspClient.PARAMETERS_KEY: {
332334
TspClient.REQUESTED_ITEM_KEY: item_ids,
333335
TspClient.REQUESTED_TIME_RANGE_KEY: {
334-
TspClient.REQUESTED_TIME_RANGE_START_KEY: time_range_start,
335-
TspClient.REQUESTED_TIME_RANGE_END_KEY: time_range_end,
336-
TspClient.REQUESTED_TIME_RANGE_NUM_TIMES_KEY: time_range_num_times
336+
TspClient.REQUESTED_TIME_RANGE_START_KEY: interval[0],
337+
TspClient.REQUESTED_TIME_RANGE_END_KEY: interval[1],
338+
TspClient.REQUESTED_TIME_RANGE_NUM_TIMES_KEY: interval[2]
337339
}
338340
}
339341
}
@@ -374,31 +376,23 @@ def fetch_data(self, experiment: Experiment, outputs: List[Dict[str, Union[Outpu
374376
datasets[o_output.id] = datasets.get(o_output.id, {})
375377
datasets[o_output.id][series_name] = pd.concat([datasets[o_output.id].get(series_name, pd.DataFrame()), dataset])
376378

377-
# Update the time_range_start for the next iteration
378-
if x and len(x) > 0:
379-
time_range_start = x[-1] + 1 # Start from the next timestamp after the last received
380-
381-
# Check if the time_range_start is greater than the time_range_end
382-
if time_range_start > time_range_end:
383-
break
384-
else:
385-
break # Exit if no data was received in this iteration
386-
387379
case "TIME_GRAPH":
388380
items = list(map(int, [node.id for node in o_tree.nodes]))
389381
time_range_start = kwargs.get("start", experiment.start)
390382
time_range_end = kwargs.get("end", experiment.end)
391-
time_range_num_times = kwargs.get("num_times", 65536)
383+
resample_freq = kwargs.get("resample_freq", "1s")
392384
strategy = kwargs.get("strategy", "DEEP")
393385

394-
while True:
386+
intervals = self._calculate_intervals(time_range_start, time_range_end, resample_freq)
387+
388+
for interval in intervals:
395389
parameters = {
396390
TspClient.PARAMETERS_KEY: {
397391
TspClient.REQUESTED_ITEM_KEY: items,
398392
TspClient.REQUESTED_TIME_RANGE_KEY: {
399-
TspClient.REQUESTED_TIME_RANGE_START_KEY: time_range_start,
400-
TspClient.REQUESTED_TIME_RANGE_END_KEY: time_range_end,
401-
TspClient.REQUESTED_TIME_RANGE_NUM_TIMES_KEY: time_range_num_times
393+
TspClient.REQUESTED_TIME_RANGE_START_KEY: interval[0],
394+
TspClient.REQUESTED_TIME_RANGE_END_KEY: interval[1],
395+
TspClient.REQUESTED_TIME_RANGE_NUM_TIMES_KEY: interval[2]
402396
},
403397
"filter_query_parameters": {
404398
"strategy": strategy
@@ -442,16 +436,6 @@ def fetch_data(self, experiment: Experiment, outputs: List[Dict[str, Union[Outpu
442436

443437
datasets[o_output.id] = pd.concat([datasets[o_output.id], dataset])
444438

445-
# Update the time_range_start for the next iteration
446-
if data and len(data) > 0:
447-
time_range_start = data[-1]["end_time"] + 1
448-
449-
# Check if the time_range_start is greater than the time_range_end
450-
if time_range_start > time_range_end:
451-
break
452-
else:
453-
break
454-
455439
case "TABLE" | "DATA_TREE":
456440
columns = self.tsp_client.fetch_virtual_table_columns(exp_uuid=experiment.uuid, output_id=o_output.id)
457441
if columns.status_code != 200:
@@ -532,6 +516,54 @@ def fetch_data(self, experiment: Experiment, outputs: List[Dict[str, Union[Outpu
532516

533517
return datasets
534518

519+
def _calculate_intervals(self, start: int, end: int, resample_freq: str,
                         max_items_per_request: int = 65536) -> List[Tuple[int, int, int]]:
    """
    Split the time range [start, end] into request-sized intervals for XY/TIMEGRAPH data.

    The number of items to request is derived from the resampling frequency: one item
    per resampling period. Timestamps are expressed in nanoseconds, so for a range of
    1000 seconds (start=0, end=1_000_000_000_000) and a frequency of "1s", 1000 items
    are requested.

    The number of items is computed as:
        num_items = ceil((end - start) / resample_freq)
    The result is rounded up so that a trailing partial period (or a range that is
    shorter than a single period) is still covered instead of being silently dropped.

    A single request is limited to ``max_items_per_request`` items, so larger ranges
    are split into consecutive intervals. For example, 100000 items with the default
    limit yield two intervals:
        [(start1, end1, 65536), (start2, end2, 34464)]
    where each interval starts where the previous one ended and the last interval
    never extends past ``end``.

    :param start: Start timestamp in nanoseconds
    :type start: int
    :param end: End timestamp in nanoseconds
    :type end: int
    :param resample_freq: Resampling frequency as a pandas-parsable duration (e.g., "1s", "1ms", "1us")
    :type resample_freq: str
    :param max_items_per_request: Maximum number of items per interval. Default is 65536
    :type max_items_per_request: int, optional
    :return: List of (interval_start, interval_end, num_items) tuples; empty if end <= start
    :rtype: List[Tuple[int, int, int]]
    :raises ValueError: If the resampling frequency or the per-request limit is not positive
    """
    freq_ns = pd.to_timedelta(resample_freq).value
    if freq_ns <= 0:
        raise ValueError(f"resample_freq must be a positive duration, got {resample_freq!r}")
    if max_items_per_request <= 0:
        # A non-positive limit would make the batching loop below never terminate
        raise ValueError(f"max_items_per_request must be positive, got {max_items_per_request!r}")

    span = end - start
    if span <= 0:
        return []

    # Ceiling division in integer arithmetic: exact for nanosecond timestamps and
    # keeps the tail of the range that a floor division would drop
    remaining = int(-(-span // freq_ns))

    intervals = []
    cursor = start
    while remaining > 0:
        batch_size = min(remaining, max_items_per_request)
        # Clamp so that the last (possibly partial) period does not exceed the requested end
        interval_end = min(cursor + batch_size * freq_ns, end)
        intervals.append((cursor, interval_end, batch_size))
        cursor = interval_end
        remaining -= batch_size

    return intervals
566+
535567
@staticmethod
536568
def enable_instrumentation(instrumentation_file: Optional[str] = None, instrument_kernel: bool = False, verbose: bool = True) -> None:
537569
"""

0 commit comments

Comments
 (0)