-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathSupport Vector Machine
More file actions
184 lines (130 loc) · 5.41 KB
/
Support Vector Machine
File metadata and controls
184 lines (130 loc) · 5.41 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
## Feature engineering
# For each of the top cointegrated pairs, build a supervised dataset:
# features X (daily returns, spread MA, rolling volatilities) and the
# target y (log-price spread).
features_dict = {}
for pair, _ in top_10_pairs:
    stock1, stock2 = pair
    # Work in log-price space so the spread is the log price ratio.
    lp1 = np.log(dji_df_pivot[stock1])
    lp2 = np.log(dji_df_pivot[stock2])
    spread = lp1 - lp2
    # Daily log returns plus rolling statistics as predictors.
    ret1 = lp1.diff()
    ret2 = lp2.diff()
    feature_cols = {
        'Asset1Returns': ret1,
        'Asset2Returns': ret2,
        'SpreadMA5': spread.rolling(5).mean(),
        'Asset1Volatility': ret1.rolling(20).std(),
        'Asset2Volatility': ret2.rolling(20).std(),
    }
    # NOTE(review): the leading NaNs produced by diff()/rolling() are
    # zero-filled rather than dropped — confirm this is intended.
    X = pd.DataFrame(feature_cols).fillna(0)
    y = spread.fillna(0)
    features_dict[pair] = (X, y)
    print(f"Features for pair {stock1}-{stock2} created.")

# Keep one example dataset handy for inspection.
first_pair = top_10_pairs[0][0]
X_sample, y_sample = features_dict[first_pair]
# Chronological 80/20 split per pair (no shuffling: this is time-series data,
# so the test window strictly follows the training window).
train_test_split_dict = {}  # Store the training and test sets for each pair of stocks
for pair in top_10_pairs:
    stock_pair = pair[0]
    X, y = features_dict[stock_pair]
    cut = int(len(y) * 0.8)
    train_test_split_dict[stock_pair] = {
        'train_X': X.iloc[:cut],
        'test_X': X.iloc[cut:],
        'train_y': y.iloc[:cut],
        'test_y': y.iloc[cut:],
    }
    print(f"Data split for pair {stock_pair[0]}-{stock_pair[1]} complete.")
# Model training and testing using SVM
# Fit one linear-kernel SVR per pair to regress the spread on the features,
# then record in-sample and out-of-sample RMSE alongside the fitted model.
svm_results = {}
for pair in top_10_pairs:
    stock_pair = pair[0]
    split = train_test_split_dict[stock_pair]
    model = SVR(kernel='linear')
    model.fit(split['train_X'], split['train_y'])
    fitted = model.predict(split['train_X'])
    predicted = model.predict(split['test_X'])
    train_rmse = np.sqrt(mean_squared_error(split['train_y'], fitted))
    test_rmse = np.sqrt(mean_squared_error(split['test_y'], predicted))
    svm_results[stock_pair] = {
        'train_rmse': train_rmse,
        'test_rmse': test_rmse,
        'model': model,
        'test_pred': predicted,
    }
    print(f"{stock_pair}: Train RMSE = {train_rmse:.4f}, Test RMSE = {test_rmse:.4f}")
def score_fn(model, test_X, spread, type="non_neural_net"):
    """Backtest a z-score pairs-trading strategy over the test window.

    Parameters
    ----------
    model : regressor with ``.predict`` (when ``type == "non_neural_net"``)
        or a callable torch module (any other ``type``).
    test_X : pd.DataFrame containing at least 'Asset1Returns' and
        'Asset2Returns' (daily LOG returns) plus the model's features.
    spread : pd.Series of the realized log-price spread, same index as
        ``test_X``.
    type : selects how the spread prediction is obtained; keyword name kept
        for caller compatibility (shadows the builtin).

    Returns
    -------
    (final_cumulative_return, cumulative_returns_series)
    """
    # Predict the spread over the test window.
    if type == "non_neural_net":
        test_pred = model.predict(test_X)
    else:
        test_pred = model(torch.Tensor(test_X.values)).detach().numpy()
    # Standardize the realized spread against the predicted spread's
    # distribution. NOTE(review): a constant prediction (std == 0) makes
    # this divide by zero — assumed precluded upstream; confirm.
    zscore = (spread - test_pred.mean()) / test_pred.std()
    entry_threshold = 2.0
    exit_threshold = 1.0
    stock1_position = pd.Series(data=0, index=zscore.index)
    stock2_position = pd.Series(data=0, index=zscore.index)
    for i in range(1, len(zscore)):
        if zscore.iloc[i] < -entry_threshold and stock1_position.iloc[i-1] == 0:
            # Spread unusually low: long stock1, short stock2.
            stock1_position.iloc[i] = 1
            stock2_position.iloc[i] = -1
        elif zscore.iloc[i] > entry_threshold and stock2_position.iloc[i-1] == 0:
            # Spread unusually high: short stock1, long stock2.
            stock1_position.iloc[i] = -1
            stock2_position.iloc[i] = 1
        elif abs(zscore.iloc[i]) < exit_threshold:
            # Spread reverted toward the mean: flatten both legs.
            stock1_position.iloc[i] = 0
            stock2_position.iloc[i] = 0
        else:
            # In the dead zone: carry yesterday's position.
            stock1_position.iloc[i] = stock1_position.iloc[i-1]
            stock2_position.iloc[i] = stock2_position.iloc[i-1]
    # Apply yesterday's position to today's SIMPLE return exp(r) - 1.
    # BUGFIX: the original multiplied by np.exp(r) (the GROSS return, ~1.0),
    # which credited roughly +100% on every in-position day and made the
    # cumulative-return curve meaningless.
    stock1_returns = ((np.exp(test_X['Asset1Returns']) - 1) * stock1_position.shift(1)).fillna(0)
    stock2_returns = ((np.exp(test_X['Asset2Returns']) - 1) * stock2_position.shift(1)).fillna(0)
    total_returns = stock1_returns + stock2_returns
    cumulative_returns = (1 + total_returns).cumprod()
    return cumulative_returns.iloc[-1], cumulative_returns
# Backtest each pair's fitted SVR on its held-out window and annualize.
svm_strategy_returns = {}
for pair in top_10_pairs:
    stocks = pair[0]
    split = train_test_split_dict[stocks]
    model = svm_results[stocks]['model']
    # The realized test-set spread is the series being z-scored inside
    # score_fn; the cumulative-return curve comes back alongside its end value.
    final_value, cum_returns = score_fn(model, split['test_X'], split['test_y'], type="non_neural_net")
    # Annualize assuming 252 trading days per year.
    n_days = len(cum_returns)
    annualized_return_percent = ((final_value ** (252 / n_days)) - 1) * 100
    svm_strategy_returns[stocks] = annualized_return_percent
    print(f"{stocks}: Annualized Return = {annualized_return_percent:.2f}%")

# Compute the mean annualized return for all 10 pairs
mean_annualized_return = np.mean(list(svm_strategy_returns.values()))
print(f"Mean Annualized Return of the SVM Model for the Top 10 Pairs: {mean_annualized_return:.2f}%")
# Visualization - SVM
# Bar chart of each pair's annualized return with the cross-pair mean marked.
pair_labels = [f"{p[0]}-{p[1]}" for p, _ in top_10_pairs]
returns = list(svm_strategy_returns.values())
x = np.arange(len(pair_labels))

plt.figure(figsize=(12, 6))
bars = plt.bar(x, returns, color='skyblue', edgecolor='black')
plt.axhline(mean_annualized_return, color='red', linestyle='--', label=f'Mean = {mean_annualized_return:.2f}%')
# Annotate each bar with its value just above the top.
for bar in bars:
    h = bar.get_height()
    plt.text(bar.get_x() + bar.get_width() / 2, h + 0.5, f'{h:.2f}%', ha='center', va='bottom')
# NOTE(review): tick positions sit 0.2 left of the bar centers, presumably
# to align the 45-degree rotated labels — confirm this offset is intentional.
plt.xticks(ticks=x - 0.2, labels=pair_labels, rotation=45, ha='right')
plt.title('Annualized Return of SVM Strategy for Top 10 Pairs')
plt.ylabel('Annualized Return (%)')
plt.legend()
plt.tight_layout()
plt.show()