-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathablation_experiment.py
More file actions
95 lines (78 loc) · 3.45 KB
/
ablation_experiment.py
File metadata and controls
95 lines (78 loc) · 3.45 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
import time
import torch
import pandas as pd
import numpy as np
from torch.utils.data import Dataset, DataLoader
import os
# Reuse the project's existing Dataset definition
class TimeSeriesDataset(Dataset):
    """Sliding-window dataset over a time-ordered sequence.

    Sample ``i`` is a pair ``(x, y)``: ``x`` is the ``seq_len`` consecutive
    rows of ``data`` starting at row ``i`` and ``y`` is the ``pred_len`` rows
    that immediately follow. Both are returned as ``float32`` tensors.

    Args:
        data: Sliceable sequence of rows (e.g. a 2-D array or list of lists).
        seq_len: Number of rows in the input window.
        pred_len: Number of rows in the target window.
    """

    def __init__(self, data, seq_len, pred_len):
        self.data = data
        self.seq_len = seq_len
        self.pred_len = pred_len

    def __len__(self):
        """Return the number of complete (input, target) windows."""
        # Clamp at zero: a series shorter than one full window would otherwise
        # yield a negative value, which makes len() raise ValueError.
        return max(0, len(self.data) - self.seq_len - self.pred_len + 1)

    def __getitem__(self, index):
        """Return the ``(input, target)`` tensors for window ``index``.

        Negative indices count from the end, as for a list.

        Raises:
            IndexError: If ``index`` is outside ``[-len(self), len(self))``.
        """
        n_windows = len(self)
        if index < 0:
            index += n_windows
        # Without this check an out-of-range index silently returns truncated
        # or empty tensors, and plain iteration over the dataset never stops.
        if not 0 <= index < n_windows:
            raise IndexError(
                f"window index out of range for dataset with {n_windows} windows"
            )

        s_begin = index
        s_end = s_begin + self.seq_len
        r_begin = s_end
        r_end = r_begin + self.pred_len
        seq_x = torch.tensor(self.data[s_begin:s_end], dtype=torch.float32)
        seq_y = torch.tensor(self.data[r_begin:r_end], dtype=torch.float32)
        return seq_x, seq_y
# Simplified LSTM test model, focused on testing data-loading and transfer speed
class DummyModel(torch.nn.Module):
    """Minimal LSTM regressor used as a fixed compute load for benchmarks.

    A single-layer, batch-first LSTM followed by a linear layer that maps the
    hidden state of the last time step back to ``input_size`` features.

    Args:
        input_size: Number of features per time step (also the output width).
        hidden_size: Width of the LSTM hidden state. Defaults to 64.
    """

    def __init__(self, input_size, hidden_size=64):
        super().__init__()
        self.lstm = torch.nn.LSTM(input_size, hidden_size, batch_first=True)
        self.fc = torch.nn.Linear(hidden_size, input_size)

    def forward(self, x):
        """Return a prediction from the last time step of ``x``.

        ``x`` is indexed as ``[batch, time, feature]`` (the LSTM is built with
        ``batch_first=True``); the result has one row per batch element.
        """
        out, _ = self.lstm(x)
        # Only the final time step's hidden state feeds the output layer.
        return self.fc(out[:, -1, :])
def _sync_device(device):
    """Block until all work queued on ``device`` has finished (no-op on CPU)."""
    if device.type == "cuda":
        torch.cuda.synchronize(device)
    elif device.type == "mps" and hasattr(torch, "mps"):
        torch.mps.synchronize()


def run_ablation(pin_mem, non_block, dataset_path="data/Weather.csv"):
    """Time one training epoch under a given host-to-device transfer setup.

    Loads the numeric columns of the CSV at ``dataset_path``, builds a
    sliding-window dataset (96 input rows, 96 target rows), runs a short
    warm-up, then times one full epoch of training ``DummyModel``.

    Args:
        pin_mem: Passed to ``DataLoader(pin_memory=...)``.
        non_block: Passed as ``non_blocking=...`` to every ``Tensor.to`` call.
        dataset_path: Path to the CSV file to load.

    Returns:
        Tuple ``(epoch_time, throughput)``: elapsed seconds for the timed
        epoch and samples processed per second.

    Raises:
        ValueError: If the CSV has fewer rows than one full window.
    """
    seq_len, pred_len = 96, 96
    warmup_batches = 7

    df = pd.read_csv(dataset_path)
    numeric_cols = df.select_dtypes(include=[np.number]).columns
    data_values = df[numeric_cols].values
    if len(data_values) < seq_len + pred_len:
        raise ValueError(
            f"{dataset_path} has {len(data_values)} rows; at least "
            f"{seq_len + pred_len} are required for one window"
        )

    dataset = TimeSeriesDataset(data_values, seq_len=seq_len, pred_len=pred_len)

    # Fixed batch_size=32 so every configuration is compared like for like.
    loader = DataLoader(dataset, batch_size=32, shuffle=True, pin_memory=pin_mem, num_workers=4)

    device = torch.device("cuda" if torch.cuda.is_available() else ("mps" if torch.backends.mps.is_available() else "cpu"))
    model = DummyModel(input_size=len(numeric_cols)).to(device)
    optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
    criterion = torch.nn.MSELoss()

    print(f"\n测试配置: pin_memory={pin_mem}, non_blocking={non_block}")

    # Warm up the device so one-time initialisation cost is not measured.
    model.train()
    for i, (x, y) in enumerate(loader):
        x, y = x.to(device, non_blocking=non_block), y.to(device, non_blocking=non_block)
        outputs = model(x)
        # The model predicts a single step, so compare with the last target row.
        loss = criterion(outputs, y[:, -1, :])
        loss.backward()
        if i + 1 >= warmup_batches:
            break

    # Device kernels and non-blocking copies are queued asynchronously; drain
    # the queue so leftover warm-up work is not charged to the timed epoch.
    _sync_device(device)

    # Time exactly one full epoch.
    start_time = time.perf_counter()
    sample_count = 0

    for batch_x, batch_y in loader:
        batch_x = batch_x.to(device, non_blocking=non_block)
        batch_y = batch_y.to(device, non_blocking=non_block)

        optimizer.zero_grad()
        outputs = model(batch_x)
        loss = criterion(outputs, batch_y[:, -1, :])
        loss.backward()
        optimizer.step()

        sample_count += batch_x.size(0)

    # Wait for the last queued batches to finish before stopping the clock;
    # otherwise the measurement only covers how fast work was enqueued.
    _sync_device(device)
    epoch_time = time.perf_counter() - start_time
    throughput = sample_count / epoch_time if epoch_time > 0 else 0.0

    print(f"✅ 单个Epoch耗时: {epoch_time:.2f} 秒")
    print(f"✅ 系统吞吐率: {throughput:.2f} 样本/秒")
    return epoch_time, throughput
if __name__ == "__main__":
    # The comparison is only meaningful when an accelerator (CUDA or MPS) is
    # present; on a CPU-only machine just warn and exit.
    accelerator_present = torch.cuda.is_available() or torch.backends.mps.is_available()

    if accelerator_present:
        print("开始执行第4章硬件优化消融实验 (以 Weather 数据集为例)...")

        # (pin_memory, non_blocking) pairs, run in this order:
        #   control      - both off
        #   experiment A - pinned memory only
        #   experiment B - both on
        configurations = (
            (False, False),
            (True, False),
            (True, True),
        )
        for use_pinned, use_async in configurations:
            run_ablation(pin_mem=use_pinned, non_block=use_async)
    else:
        print("警告:未检测到GPU,消融实验需要在GPU环境下运行才能看出差异!")