-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathutils.py
More file actions
100 lines (83 loc) · 3.59 KB
/
Copy pathutils.py
File metadata and controls
100 lines (83 loc) · 3.59 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
import random
import os, sys
import numpy as np
import torch
def seed_everything(seed=42):
    """Seed the Python, NumPy and PyTorch random number generators.

    Also exports ``PYTHONHASHSEED`` into ``os.environ`` and switches cuDNN to
    deterministic mode with benchmarking turned off.

    Args:
        seed: Value handed to every seeding function (default 42).
    """
    random.seed(seed)
    os.environ['PYTHONHASHSEED'] = str(seed)

    # NumPy first, then the CPU and CUDA generators of PyTorch.
    seeders = (
        np.random.seed,
        torch.manual_seed,
        torch.cuda.manual_seed,
        torch.cuda.manual_seed_all,
    )
    for apply_seed in seeders:
        apply_seed(seed)

    cudnn = torch.backends.cudnn
    cudnn.deterministic = True
    cudnn.benchmark = False
def get_mlp_layers(lyr_str: str, n_layers: int):
    """Turn the 'mlp_layers' option string into a list of layer indices.

    Args:
        lyr_str: ``"all"`` (every layer), ``"none"`` (no layer), or a
            whitespace-separated list of integer layer indices, e.g. ``"0 2"``.
        n_layers: Total number of layers; valid indices are ``0 .. n_layers-1``.

    Returns:
        A list of integer layer indices, in the order they were given.

    Raises:
        AssertionError: If any listed index lies outside ``[0, n_layers-1]``.
        ValueError: If a token in ``lyr_str`` cannot be parsed by ``int``.
    """
    if lyr_str == "all":
        return list(range(n_layers))
    if lyr_str == "none":
        return []

    mlp_layers_list = [int(token) for token in lyr_str.split()]
    if not all(0 <= idx < n_layers for idx in mlp_layers_list):
        # Raised explicitly instead of via `assert` so the check still runs
        # under `python -O`; AssertionError is kept so existing callers that
        # catch it keep working.
        raise AssertionError(
            f"Some specified layer for the parameter 'mlp_layers' is out of bounds [0, {n_layers-1}]."
        )
    return mlp_layers_list
def get_layer_norm_list(layer_norm: str):
    """Turn the 'layer_norm' option string into a list of layer-norm names.

    Args:
        layer_norm: ``"all"``, ``"none"``, or a 3-character string of ``0``/``1``
            flags that switch on ``"attn"``, ``"mlp"`` and ``"final"`` in that
            order (e.g. ``"101"`` -> ``["attn", "final"]``).

    Returns:
        The selected names, always ordered as ``attn``, ``mlp``, ``final``.

    Raises:
        AssertionError: If the flag string is not exactly 3 characters long or
            contains anything other than ``0`` and ``1``.
    """
    ln_names = ["attn", "mlp", "final"]
    if layer_norm == "all":
        return ln_names
    if layer_norm == "none":
        return []

    flags = list(layer_norm)
    # Explicit raises (not `assert`) so validation still runs under
    # `python -O`; AssertionError is kept for backward compatibility.
    if len(flags) != 3:
        raise AssertionError("The parameter 'layer_norm' should be a string of length 3.")
    if any(flag not in ("0", "1") for flag in flags):
        raise AssertionError("The parameter 'layer_norm' should be a string of 0s and 1s.")

    return [name for flag, name in zip(flags, ln_names) if flag == "1"]
def is_model_equal(model1, model2):
    """
    Checks if two models are equal or not.

    Two models are considered equal when they expose the same number of
    parameters and every pair of parameters, taken in the order yielded by
    ``parameters()``, has the same shape and the same values. Only
    ``parameters()`` is inspected; nothing else on the models is compared.

    Args:
        model1: First model; must provide a ``parameters()`` iterable of tensors.
        model2: Second model, same requirement.

    Returns:
        ``True`` if all parameters match, ``False`` otherwise.
    """
    params1 = list(model1.parameters())
    params2 = list(model2.parameters())

    # A bare zip() would silently drop the surplus parameters of the larger
    # model and could report two different architectures as equal.
    if len(params1) != len(params2):
        return False

    for p1, p2 in zip(params1, params2):
        # Check shapes first: an elementwise comparison would otherwise
        # broadcast compatible shapes or raise on incompatible ones.
        if p1.shape != p2.shape:
            return False
        if bool(p1.data.ne(p2.data).any()):
            return False
    return True
def acclerator_load_model(accelerator, model, checkpoint_path, **load_model_func_kwargs):
    """Restore a model from a checkpoint through ``accelerator``.

    The model is passed through ``accelerator.prepare``, optionally moved to
    ``map_location``, then ``accelerator.load_state`` is called on
    ``checkpoint_path`` and the unwrapped model is returned.

    Args:
        accelerator: Object providing ``prepare``, ``load_state`` and
            ``unwrap_model`` (presumably a Hugging Face Accelerate
            ``Accelerator`` -- confirm against callers).
        model: The model to prepare and load into.
        checkpoint_path: Location handed to ``accelerator.load_state``.
        **load_model_func_kwargs: Forwarded unchanged to
            ``accelerator.load_state``. If it contains ``map_location``, the
            prepared model is also moved there with ``.to(...)``.

    Returns:
        The result of ``accelerator.unwrap_model`` on the prepared model.
    """
    # Prepare other objects (such as optimizers, LR schedulers etc.) here as
    # well if you want to load them too.
    model = accelerator.prepare(model)

    # `map_location` is optional: indexing the kwargs directly raised KeyError
    # whenever the caller did not pass it.
    map_location = load_model_func_kwargs.get('map_location')
    if map_location is not None:
        model = model.to(map_location)

    accelerator.load_state(checkpoint_path, **load_model_func_kwargs)
    return accelerator.unwrap_model(model)
class HiddenPrints:
    """
    Context manager that sends everything written to ``sys.stdout`` to
    ``os.devnull`` for the duration of the ``with`` block, then restores the
    previous ``sys.stdout``.

    https://stackoverflow.com/a/45669280
    Usage:
        with HiddenPrints():
            print("This will not be printed")
        print("This will be printed as before")
    """

    def __enter__(self):
        self._original_stdout = sys.stdout
        # Keep our own handle so __exit__ closes exactly the file opened here,
        # even if the body of the `with` block rebinds sys.stdout.
        self._devnull = open(os.devnull, 'w')
        sys.stdout = self._devnull

    def __exit__(self, exc_type, exc_val, exc_tb):
        try:
            self._devnull.close()
        finally:
            # Always restore stdout, even if closing the sink fails.
            sys.stdout = self._original_stdout
# Position encoding utils
def get_position_embeds(enc_type, max_len, pos_emb_dim):
    """Build the position-embedding table for the requested encoding type.

    Args:
        enc_type: Name of the encoding; only ``"sinusoidal"`` produces a table.
        max_len: Number of positions, passed on as ``token_sequence_size``.
        pos_emb_dim: Embedding width, passed on as ``token_embedding_dim``.

    Returns:
        The result of ``sinusoidal_positional_embedding`` (with ``n=10000.0``)
        when ``enc_type`` is ``"sinusoidal"``; ``None`` for any other value.
    """
    if enc_type != "sinusoidal":
        return None
    return sinusoidal_positional_embedding(
        token_sequence_size=max_len,
        token_embedding_dim=pos_emb_dim,
        n=10000.0,
    )
def sinusoidal_positional_embedding(token_sequence_size, token_embedding_dim, n=10000.0):
    """Create a sinusoidal positional-embedding table.

    For position ``pos`` and pair index ``i`` the table holds
    ``sin(pos / n^(2i/d))`` in the even column ``2i`` and
    ``cos(pos / n^(2i/d))`` in the odd column ``2i+1``, where ``d`` is
    ``token_embedding_dim``.

    Args:
        token_sequence_size: Number of positions (rows) to generate.
        token_embedding_dim: Embedding width; must be even.
        n: Base of the frequency term (default 10000.0).

    Returns:
        Tensor of shape ``(1, token_sequence_size, token_embedding_dim)``.

    Raises:
        ValueError: If ``token_embedding_dim`` is odd.
    """
    if token_embedding_dim % 2:
        raise ValueError("Sinusoidal positional embedding cannot apply to odd token embedding dim (got dim={:d})".format(token_embedding_dim))

    seq_len, dim = token_sequence_size, token_embedding_dim

    # Column vector of positions, broadcast against one scale per sin/cos pair.
    pos = torch.arange(0, seq_len).unsqueeze(1)
    exponents = 2 * torch.arange(0, dim // 2) / dim  # 2i/d_model
    angles = pos / torch.pow(n, exponents)  # pos / n^(2i/d_model)

    table = torch.zeros(seq_len, dim)
    table[:, 0::2] = torch.sin(angles)  # even columns
    table[:, 1::2] = torch.cos(angles)  # odd columns
    return table.unsqueeze(0)  # 1 x max_len x pos_emb_dim