-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathutils.py
More file actions
320 lines (263 loc) · 8.9 KB
/
utils.py
File metadata and controls
320 lines (263 loc) · 8.9 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
"""
Copyright 2025 Universitat Politècnica de Catalunya
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
import os
os.environ["CUDA_VISIBLE_DEVICES"] = "-1"
import tensorflow as tf
import pickle
from enum import Enum
from typing import List
class AvgAccumulator:
    """Running arithmetic mean of every value passed to ``update``.

    Attributes
    ----------
    value : float
        Mean of the values seen so far (0.0 before the first update).
    count : int
        Number of values seen so far.
    """

    def __init__(self):
        self.count = 0
        self.value = 0.0

    def update(self, val):
        """Fold ``val`` into the running mean.

        Parameters
        ----------
        val : number
            New sample to include in the mean.
        """
        seen = self.count + 1
        self.count = seen
        # Incremental mean: move the current mean towards the new sample by
        # 1/seen of the gap, so no separate running sum has to be stored.
        self.value += (val - self.value) / seen
class CustomEarlyStop(tf.keras.callbacks.Callback):
    """Callback that stops training when a low enough learning rate is reached.

    The learning rate is read from the ``logs`` dict handed to
    ``on_epoch_end``. It is looked up under ``"lr"`` first and then under
    ``"learning_rate"`` (the key used by newer Keras releases), so some other
    callback must have written it there before this callback runs.

    Parameters
    ----------
    min_lr : float
        Minimum learning rate before stopping training.
    """

    def __init__(self, min_lr=1e-6):
        super().__init__()
        self.min_lr = min_lr

    def on_epoch_end(self, epoch, logs=None):
        """Request a training stop once the logged learning rate drops below ``min_lr``.

        Parameters
        ----------
        epoch : int
            Index of the epoch that just finished (unused).
        logs : dict, optional
            Epoch metrics. May be None, in which case nothing happens.
        """
        # ``logs`` defaults to None; subscripting it directly would raise.
        logs = logs or {}
        current_lr = logs.get("lr")
        if current_lr is None:
            current_lr = logs.get("learning_rate")
        if current_lr is None:
            # Without a logged learning rate there is nothing to compare against.
            # Warn instead of failing so that training is not aborted.
            import warnings

            warnings.warn(
                "CustomEarlyStop: no learning rate found in logs; "
                "early stopping check skipped.",
                RuntimeWarning,
            )
            return
        if current_lr < self.min_lr:
            self.model.stop_training = True
class DatasetIteratorWrapper:
    """Serve the samples of a re-iterable dataset one epoch at a time.

    Parameters
    ----------
    ds : iterable
        Dataset to draw samples from. It must support being iterated more
        than once (``iter(ds)`` is called again whenever it is exhausted).
    steps_per_epoch : int, optional
        If given, every epoch yields exactly this many samples, continuing
        from where the previous epoch stopped and restarting the dataset when
        it runs out. If None (default), every epoch is one full pass over
        ``ds``.
    """

    def __init__(self, ds, steps_per_epoch=None):
        self.ds = ds
        self.repeating = steps_per_epoch is not None
        self.steps_per_epoch = steps_per_epoch
        # Persistent iterator so repeating epochs resume mid-dataset.
        self.iterator = iter(ds)

    def _non_repeating_generator(self):
        """Yield every element of the dataset once, from the start."""
        yield from self.ds

    def _repeating_generator(self):
        """Yield ``steps_per_epoch`` elements, wrapping around the dataset.

        Raises
        ------
        RuntimeError
            If the dataset yields no elements at all.
        """
        remaining = self.steps_per_epoch
        while remaining > 0:
            try:
                element = next(self.iterator)
            except StopIteration:
                # Dataset exhausted: restart it and keep going.
                self.iterator = iter(self.ds)
                try:
                    element = next(self.iterator)
                except StopIteration:
                    # A StopIteration escaping a generator body is turned into
                    # an opaque "generator raised StopIteration" RuntimeError;
                    # raise the same exception type with a useful message.
                    raise RuntimeError(
                        "DatasetIteratorWrapper: dataset is empty, cannot "
                        "produce steps_per_epoch samples."
                    ) from None
            yield element
            remaining -= 1

    def get_epoch_samples(self):
        """Return a generator over the samples of the next epoch."""
        if self.repeating:
            return self._repeating_generator()
        return self._non_repeating_generator()
class FINETUNE_OPTIONS(Enum):
    """Per-layer fine-tuning modes.

    The member notes below describe how ``load_model_with_ckpt`` treats each
    mode after the donor checkpoint has been loaded.
    """

    # Keep the donor weights and mark the layer as non-trainable.
    FREEZE = 0
    # Keep the donor weights and leave the layer trainable.
    FINETUNE = 1
    # Restore the weights the layer had before loading; layer stays trainable.
    RETRAIN = 2
def _seg_to_global_reshape(tensor, num_dims=3):
"""Function that modifies the shape of the tensor, flattenging the window dimension
Parameters
----------
tensor : tf.tensor
Input tensor
num_dims : int, optional
Size of last dimension, by default 3
Returns
-------
tf.tensor
Reshaped tensor
"""
assert num_dims > 1
perms = [1, 0] + list(range(2, num_dims))
total_flows = tf.shape(tensor)[0] * tf.shape(tensor)[1]
if num_dims == 2:
new_shape = (total_flows,)
else:
new_shape = tf.concat([[total_flows], tf.shape(tensor)[2:]], axis=0)
return tf.reshape(tf.transpose(tensor, perms), new_shape)
def get_experiment_path(
    experiment_name,
    ds_name,
    model_name,
    target,
    fine_tune_options=None,
    variant=None,
    og_ds_name=None,
):
    """Build the "/"-separated path identifying an experiment.

    The segments appear in this order, optional ones only when provided:
    ``experiment_name / ds_name / [og_ds_<og_ds_name>] / model_name /
    [fine_tune_options] / [variant] / target``.

    Parameters
    ----------
    experiment_name : str
        Experiment batch name
    ds_name : str
        Dataset name
    model_name : str
        Model name
    target : str
        Performance metric to be predicted by the model
    fine_tune_options : str, optional
        Descriptor string indicating fine tune operations. Skipped when None
        or empty, by default None
    variant : str, optional
        Additional experiment discriminating descriptor. Skipped when None or
        empty, by default None
    og_ds_name : str, optional
        If fine tuning, the donor dataset name. Skipped only when None, by
        default None

    Returns
    -------
    str
        Relative experiment path built from the given parameters.
    """
    segments = [f"{experiment_name}", f"{ds_name}"]
    if og_ds_name is not None:
        segments.append(f"og_ds_{og_ds_name}")
    segments.append(f"{model_name}")
    segments.extend(
        f"{extra}" for extra in (fine_tune_options, variant) if extra not in (None, "")
    )
    segments.append(f"{target}")
    return "/".join(segments)
def get_positional_denorm_mape(pos, name):
    """Build a metric function computing the denormalized MAPE of one output.

    The returned function expects two-dimensional tensors (batch, feature),
    picks column ``pos`` from both, exponentiates them and returns the mean
    absolute percentage error between the results.

    Parameters
    ----------
    pos : int
        Position of the target variable in the output.
    name : str
        Name of the target variable. It is embedded in the ``__name__`` of
        the returned function (to be used in tensorboard).

    Returns
    -------
    Callable
        Function ``(y_true, y_pred) -> tf.Tensor`` named
        ``denorm_mape_<name>_metric``.
    """

    def denorm_mape(y_true, y_pred):
        # Exponentiate the selected column of each tensor before comparing.
        truth = tf.math.exp(y_true[:, pos])
        truth = tf.expand_dims(truth, 1)
        estimate = tf.math.exp(y_pred[:, pos])
        estimate = tf.expand_dims(estimate, 1)
        relative_error = tf.abs((truth - estimate) / truth)
        return tf.reduce_mean(relative_error) * 100

    denorm_mape.__name__ = f"denorm_mape_{name}_metric"
    return denorm_mape
def load_and_copy_z_scores(
    params,
    donor_res_path,
    new_res_path,
    check_existing=False,
):
    """
    Reuse the normalization statistics of another experiment. Works by copying
    the pickled statistics dict of the donor experiment to the receiver
    experiment. Meant for transfer learning.

    Parameters
    ----------
    params: List[str]
        Input features to be normalized. Each one must be a key of the donor
        dict.
    donor_res_path: str
        Path to normalization results of the donor experiment
    new_res_path: str
        Path to store the normalization results of the receiver experiment.
        Missing parent directories are created; a bare file name (no
        directory part) is written to the current working directory.
    check_existing: bool
        If True, check if the new_res_path exists and return the dict if so.

    Returns
    -------
    dict
        Normalization statistics keyed by parameter, exactly as stored by the
        donor experiment (or as already stored at ``new_res_path`` when
        ``check_existing`` applies).

    Raises
    ------
    AssertionError
        If any of ``params`` is missing from the donor dict.
    """
    # NOTE: pickle.load can execute arbitrary code. Only point these paths at
    # files produced by trusted experiments.
    # If check_existing is True, check if the file exists and return the dict (if so)
    if check_existing and os.path.exists(new_res_path):
        with open(new_res_path, "rb") as ff:
            return pickle.load(ff)
    # Load the donor dict
    with open(donor_res_path, "rb") as ff:
        donor_dict = pickle.load(ff)
    # Check the dict
    assert all(
        kk in donor_dict for kk in params
    ), "Some parameters are missing in the donor dict."
    # Store the dict. os.makedirs("") raises FileNotFoundError, so only create
    # the parent directory when the path actually has one.
    store_res_dir = os.path.dirname(new_res_path)
    if store_res_dir:
        os.makedirs(store_res_dir, exist_ok=True)
    with open(new_res_path, "wb") as ff:
        pickle.dump(donor_dict, ff)
    return donor_dict
def load_model_with_ckpt(
    model: tf.keras.Model, ckpt_path: str, layer_options: List[FINETUNE_OPTIONS]
) -> None:
    """Load donor weights into a model and configure each layer for fine tuning.

    Layers and options are paired in order. If ``layer_options`` is shorter
    than ``model.layers``, the unpaired layers keep the donor weights and
    their current ``trainable`` flag.

    Parameters
    ----------
    model : tf.keras.Model
        Receiver model. Modified in place.
    ckpt_path : str
        Path to donor checkpoint
    layer_options : List[FINETUNE_OPTIONS]
        Fine tuning options per layer
    """
    # Snapshot the current (pre-load) weights; RETRAIN layers get them back.
    initial_weights = [layer.get_weights() for layer in model.layers]

    # Overwrite the model weights with the donor checkpoint.
    model.load_weights(ckpt_path)

    # Apply the requested mode to each layer.
    for layer, option, pre_load in zip(model.layers, layer_options, initial_weights):
        if option == FINETUNE_OPTIONS.FREEZE:
            layer.trainable = False
            continue
        if option == FINETUNE_OPTIONS.FINETUNE:
            layer.trainable = True
            continue
        if option == FINETUNE_OPTIONS.RETRAIN:
            layer.trainable = True
            layer.set_weights(pre_load)
def log_transform(x, y):
    """Replace the output variable by its natural logarithm.

    Parameters
    ----------
    x: dict
        Predictor variables, passed through untouched
    y: tf.Tensor
        Output variable

    Returns
    -------
    dict
        Predictor variables
    tf.Tensor
        Transformed output variable
    """
    log_y = tf.math.log(y)
    return x, log_y
def prepare_targets_and_mask(targets, mask, output_dim=1):
    """Pre-process the samples by selecting the target variables and applying the mask.
    The mask is used to select only the valid flows (with generated packets) per window.

    Parameters
    ----------
    targets : List[str]
        List of targets (keys of the sample's feature dict)
    mask : str
        Mask feature (key of the sample's feature dict)
    output_dim : int, optional
        Number of output dimensions, must be greater than 0, by default 1

    Returns
    -------
    Function
        Function to be mapped to the tf.data.Dataset for the processing to take place.
        It takes ``(x, y)`` and returns ``(x, new_y)``. The incoming ``y`` is
        ignored and ``new_y`` is built from the masked ``x[target]`` tensors,
        concatenated along axis 1.

    Raises
    ------
    AssertionError
        If ``output_dim`` is not greater than 0.
    """
    assert output_dim > 0, "output_dim must be greater than 0"

    def modified_target_map(x, y):
        # Flatten the mask (reshaped with num_dims=2) and add a trailing axis.
        reshaped_mask = tf.expand_dims(_seg_to_global_reshape(x[mask], num_dims=2), 1)
        if output_dim > 1:
            # Repeat the mask column once per output dimension.
            reshaped_mask = tf.tile(reshaped_mask, [1, output_dim])
        # NOTE(review): assumes each x[target] has rank 3 with a last axis of
        # size output_dim (default num_dims=3 is used below) — confirm.
        selected = [
            tf.reshape(
                tf.boolean_mask(_seg_to_global_reshape(x[target]), reshaped_mask),
                (-1, output_dim),
            )
            for target in targets
        ]
        return x, tf.concat(selected, axis=1)

    return modified_target_map