-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathpreproccessing.py
More file actions
191 lines (155 loc) · 7.71 KB
/
preproccessing.py
File metadata and controls
191 lines (155 loc) · 7.71 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
"""
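Data handling for air-quality forecasting.

Reads a parquet/CSV dataset, bins coordinates into grid-cell locations,
builds sliding-window sequences and saves them as NumPy arrays.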
"""
import pandas as pd
import numpy as np
from sklearn.preprocessing import StandardScaler, LabelEncoder
from sklearn.model_selection import train_test_split
import gc
import pickle
import joblib
import argparse
import os
def _count_grid_cells(bins):
    """Return how many cells ``pd.cut`` produces for the given ``bins`` spec."""
    if np.isscalar(bins):
        # An integer bin spec is the number of equal-width cells itself.
        return int(bins)
    if isinstance(bins, pd.IntervalIndex):
        return len(bins)
    # A sequence of edges delimits one cell fewer than it has edges.
    return len(bins) - 1


def transform_data(file_path, file_format='parquet', lat_bins=None, lon_bins=None, le_location=None):
    """Transform raw data into features for model training/testing.

    Coordinates are bucketed onto a lat/lon grid, the resulting grid cell is
    label-encoded, the ``class`` column is binarised (1 for ``'Good'`` or
    ``'Moderate'``, 0 otherwise) and ``time`` is expanded into year / month /
    day / hour columns.

    Args:
        file_path: Path of the dataset to read.
        file_format: ``'parquet'`` or ``'csv'``.
        lat_bins: Latitude bins passed to ``pd.cut``. If either ``lat_bins``
            or ``lon_bins`` is ``None``, both are recomputed from the data as
            a 20 x 20 grid spanning the observed min/max.
        lon_bins: Longitude bins passed to ``pd.cut`` (see ``lat_bins``).
        le_location: Previously fitted location encoder exposing
            ``classes_``. When ``None`` a new ``LabelEncoder`` is fitted on
            this data. Locations absent from ``classes_`` are encoded as -1.

    Returns:
        Tuple ``(df, feature_columns, lat_bins, lon_bins, le_location)``.
        ``df`` is sorted by encoded location and time components;
        ``feature_columns`` lists the model input columns, ending with
        ``'location_encoded'``.

    Raises:
        ValueError: If ``file_format`` is neither ``'parquet'`` nor ``'csv'``.
    """
    # Read data based on file format
    if file_format == 'parquet':
        df = pd.read_parquet(file_path)
    elif file_format == 'csv':
        df = pd.read_csv(file_path)
    else:
        raise ValueError(f"Unsupported file format: {file_format}. Use 'parquet' or 'csv'.")

    # Use provided bins or compute from data
    if lat_bins is None or lon_bins is None:
        grid_size = 20
        lat_bins = np.linspace(df['lat'].min(), df['lat'].max(), grid_size + 1)
        lon_bins = np.linspace(df['lon'].min(), df['lon'].max(), grid_size + 1)

    # Create grid positions
    lat_positions = pd.cut(df['lat'], bins=lat_bins, labels=False, include_lowest=True)
    lon_positions = pd.cut(df['lon'], bins=lon_bins, labels=False, include_lowest=True)
    # Row-major cell id. The row width must come from the bins actually in
    # use, otherwise caller-supplied grids of another size get wrong ids.
    n_lon_cells = _count_grid_cells(lon_bins)
    df['location'] = lat_positions * n_lon_cells + lon_positions
    # pd.cut yields NaN for coordinates outside the bins; those rows are
    # assigned to cell 0.
    df['location'] = df['location'].fillna(0).astype(int)

    # Process class and time features
    df['class'] = df['class'].isin(['Good', 'Moderate']).astype(int)
    df['time'] = pd.to_datetime(df['time'])
    df['year'] = df['time'].dt.year
    df['month'] = df['time'].dt.month
    df['day'] = df['time'].dt.day
    df['hour'] = df['time'].dt.hour

    # Select columns
    list_of_columns = ['class', 'PM25_MERRA2', 'DUCMASS', 'TOTANGSTR', 'DUFLUXV', 'SSFLUXV',
                       'DUFLUXU', 'BCCMASS', 'SSSMASS25', 'location']
    selected_columns = list_of_columns + ['year', 'month', 'day', 'hour']
    # Explicit copy so the column assignments below modify an independent
    # frame rather than a slice of the loaded data.
    df = df[selected_columns].copy()

    # Encode location
    if le_location is None:
        le_location = LabelEncoder()
        df['location_encoded'] = le_location.fit_transform(df['location'])
    else:
        # Handle unseen locations in test data: get_indexer returns each
        # value's position in classes_ and -1 for values that are missing.
        known_locations = pd.Index(le_location.classes_)
        df['location_encoded'] = known_locations.get_indexer(df['location'].to_numpy())

    df = df.sort_values(['location_encoded', 'year', 'month', 'day', 'hour'])

    # Define feature columns
    non_feature_columns = {'class', 'location', 'year', 'month', 'day', 'hour', 'location_encoded'}
    feature_columns = [col for col in df.columns if col not in non_feature_columns]
    feature_columns.append('location_encoded')

    df = df.drop(columns='location')
    return df, feature_columns, lat_bins, lon_bins, le_location
def create_sequences_memory_efficient(df, feature_columns, scaler=None,
                                      input_len=168, output_len=72, stride=24):
    """Create sliding-window sequences for multi-step forecasting.

    For every distinct ``location_encoded`` value, windows of ``input_len``
    consecutive rows of scaled features are paired with the ``class`` values
    of the following ``output_len`` rows. Windows never span two locations.
    Rows are used in the order they appear in ``df``.

    Args:
        df: Frame holding ``feature_columns``, ``'class'`` and
            ``'location_encoded'`` columns.
            NOTE(review): presumably already time-sorted within each
            location by the caller - confirm.
        feature_columns: Names of the columns fed to the scaler / model.
        scaler: Fitted scaler exposing ``transform``. When ``None`` a new
            ``StandardScaler`` is fitted on ``df[feature_columns]``.
        input_len: Number of rows in each input window.
        output_len: Number of target rows following each input window.
        stride: Step between consecutive window starts.

    Returns:
        Tuple ``(X_sequences, y_sequences, location_indices, scaler)`` where
        ``X_sequences`` is float32 of shape ``(n, input_len, n_features)``,
        ``y_sequences`` is float32 of shape ``(n, output_len)`` and
        ``location_indices`` holds the location of each window.
    """
    print(f"Creating sequences with input length={input_len}, output length={output_len}...")

    if scaler is None:
        scaler = StandardScaler()
        scaled = scaler.fit_transform(df[feature_columns])
    else:
        scaled = scaler.transform(df[feature_columns])
    scaled = np.asarray(scaled, dtype=np.float32)

    # Work positionally on plain arrays so a non-unique DataFrame index
    # cannot duplicate rows during the per-location selection.
    location_codes = df['location_encoded'].to_numpy()
    targets = df['class'].to_numpy()
    window_span = input_len + output_len

    X_sequences, y_sequences, location_indices = [], [], []
    unique_locations = df['location_encoded'].unique()
    for i, loc in enumerate(unique_locations):
        rows = np.flatnonzero(location_codes == loc)
        loc_X = scaled[rows]
        loc_y = targets[rows]

        # The last admissible start is inclusive: a window starting there
        # ends exactly on the final row of this location.
        last_start = len(rows) - window_span
        for start in range(0, last_start + 1, stride):
            split = start + input_len
            X_sequences.append(loc_X[start:split])
            y_sequences.append(loc_y[split:split + output_len])
            location_indices.append(loc)

        if (i + 1) % 100 == 0:
            print(f"Processed location {i+1}/{len(unique_locations)}")

    if X_sequences:
        X_sequences = np.array(X_sequences, dtype=np.float32)
        y_sequences = np.array(y_sequences, dtype=np.float32)
        location_indices = np.array(location_indices)
    else:
        # Keep the documented rank even when no location is long enough.
        X_sequences = np.empty((0, input_len, len(feature_columns)), dtype=np.float32)
        y_sequences = np.empty((0, output_len), dtype=np.float32)
        location_indices = np.empty((0,), dtype=location_codes.dtype)

    print(f"Total sequences: {X_sequences.shape[0]}")
    print(f"Input sequence shape: {X_sequences.shape}")
    print(f"Output sequence shape: {y_sequences.shape}")
    return X_sequences, y_sequences, location_indices, scaler
def main():
    """Command-line entry point: preprocess a dataset and save NumPy arrays.

    Parses ``--data_path``, ``--file_format`` and ``--mode``, transforms the
    dataset, builds sequences (168 input rows, 72 target rows, stride 24) and
    writes the results as ``.npy`` files under ``data/Data files``.

    In ``train`` mode the sequences are split 60/20/20 into train / test /
    validation sets, stratified by location, and nine files are written. In
    ``test`` mode the three unsplit arrays are written.

    Raises:
        FileNotFoundError: If ``--data_path`` does not exist.
    """
    # Set up argument parser
    parser = argparse.ArgumentParser(description='Preprocess air quality data.')
    parser.add_argument('--data_path', type=str, required=True,
                        help='Path to the dataset file (parquet or csv format)')
    parser.add_argument('--file_format', type=str, choices=['parquet', 'csv'], default='parquet',
                        help='File format of the dataset: "parquet" or "csv" (default: parquet)')
    parser.add_argument('--mode', type=str, choices=['train', 'test'], default='train',
                        help='Processing mode: "train" to split data, "test" to transform without splitting')
    args = parser.parse_args()

    # Check if file exists
    if not os.path.exists(args.data_path):
        raise FileNotFoundError(f"Dataset file not found: {args.data_path}")

    # Create output directory if it doesn't exist
    # NOTE(review): nothing in this function writes to 'output'; it is still
    # created in case other pipeline stages rely on it - confirm.
    os.makedirs('output', exist_ok=True)
    # np.save does not create missing parent directories, so make sure the
    # directory the arrays are written to exists.
    save_dir = 'data/Data files'
    os.makedirs(save_dir, exist_ok=True)

    # NOTE(review): bins, location encoder and scaler are fitted on the file
    # being processed in both modes and are not persisted, so 'test' mode
    # does not reuse statistics from a training run - confirm this is intended.
    # Transform data
    df, feature_columns, lat_bins, lon_bins, le_location = transform_data(
        args.data_path, file_format=args.file_format
    )

    # Create sequences
    X_sequences, y_sequences, location_indices, scaler = create_sequences_memory_efficient(
        df, feature_columns, input_len=168, output_len=72, stride=24
    )

    if args.mode == 'train':
        # Split data: 60% train, then the remaining 40% is halved.
        X_train, X_temp, y_train, y_temp, loc_train, loc_temp = train_test_split(
            X_sequences, y_sequences, location_indices,
            test_size=0.4, random_state=42, stratify=location_indices
        )
        X_test, X_val, y_test, y_val, loc_test, loc_val = train_test_split(
            X_temp, y_temp, loc_temp,
            test_size=0.5, random_state=42, stratify=loc_temp
        )

        # Drop the unsplit arrays before saving.
        del X_sequences, y_sequences
        gc.collect()

        print(f"Training set shape: {X_train.shape}")
        print(f"Validation set shape: {X_val.shape}")
        print(f"Test set shape: {X_test.shape}")
        print(f"Training target shape: {y_train.shape}")
        print(f"Validation target shape: {y_val.shape}")
        print(f"Test target shape: {y_test.shape}")

        # Save split data
        split_arrays = {
            'X_train': X_train,
            'y_train': y_train,
            'loc_train': loc_train,
            'X_test': X_test,
            'y_test': y_test,
            'X_val': X_val,
            'y_val': y_val,
            'loc_val': loc_val,
            'loc_test': loc_test,
        }
        for name, array in split_arrays.items():
            np.save(os.path.join(save_dir, f'{name}.npy'), array)
        print(f"Saved {len(split_arrays)} files for train/test/val splits to {save_dir}/ directory")
    else:  # test mode
        print(f"Transformed data shape: {X_sequences.shape}")
        print(f"Target shape: {y_sequences.shape}")

        # Save transformed data without splitting
        full_arrays = {
            'X_sequences': X_sequences,
            'y_sequences': y_sequences,
            'location_indices': location_indices,
        }
        for name, array in full_arrays.items():
            np.save(os.path.join(save_dir, f'{name}.npy'), array)
        print(f"Saved {len(full_arrays)} files for transformed data to {save_dir}/ directory")


if __name__ == "__main__":
    main()