Skip to content

Commit f1d324c

Browse files
committed
Added arima
1 parent 788d95b commit f1d324c

File tree

1 file changed

+357
-0
lines changed

1 file changed

+357
-0
lines changed

machine_learning/arima.py

Lines changed: 357 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,357 @@
1+
import matplotlib.pyplot as plt
2+
import numpy as np
3+
from numpy.typing import NDArray
4+
5+
"""
6+
This program implements an ARIMA (AutoRegressive Integrated Moving Average) model
7+
from scratch in Python.
8+
9+
References:
10+
Wikipedia page on ARIMA:
11+
https://en.wikipedia.org/wiki/Autoregressive_integrated_moving_average
12+
13+
Author: Debanuj Roy
14+
Email: debanujroy1234@gmail.com
15+
"""
16+
17+
18+
class ARIMA:
19+
def __init__(self, p=1, d=1, q=1, lr=0.001, epochs=1000) -> None:
20+
"""
21+
Initializes the ARIMA model.
22+
23+
Args:
24+
p: AR lag order (uses past y values).
25+
d: Differencing order (makes data stationary).
26+
q: MA lag order (uses past errors).
27+
lr: Learning rate for Gradient Descent.
28+
epochs: Number of training cycles.
29+
"""
30+
# We need to make sure p, d, and q are sensible numbers (positive integers).
31+
if not all(isinstance(x, int) and x >= 0 for x in [p, d, q]):
32+
raise ValueError("p, d, and q must be non-negative integers")
33+
# Learning rate and epochs should also be positive.
34+
if lr <= 0 or epochs <= 0:
35+
raise ValueError("lr and epochs must be positive")
36+
37+
self.p = p
38+
self.d = d
39+
self.q = q
40+
self.lr = lr
41+
self.epochs = epochs
42+
43+
# These are the parameters our model will learn. We'll initialize them at zero.
44+
# phi -> The weights for the AR (past values) part.
45+
self.phi = np.zeros(p)
46+
# theta -> The weights for the MA (past errors) part.
47+
self.theta = np.zeros(q)
48+
# c -> A constant or intercept, like a baseline value.
49+
self.c = 0.0
50+
51+
# Store info after model training
52+
self.is_fitted = False # Flag for training status
53+
self.train_last = 0.0 # Last value from training data
54+
# Type hints for optional attributes
55+
self.diff_data: NDArray[np.float64] | None = None
56+
self.errors: NDArray[np.float64] | None = None
57+
self.n_train: int | None = None
58+
self.sigma_err: float | None = None
59+
60+
def difference(self, data) -> NDArray[np.float64]:
61+
"""
62+
Makes the time series stationary by differencing.
63+
64+
Args:
65+
data: Original time series data.
66+
Returns:
67+
Differenced data.
68+
"""
69+
diff = np.copy(data)
70+
# We loop 'd' times, applying the differencing each time.
71+
for _ in range(self.d):
72+
diff = np.diff(diff) # np.diff is a handy function that does exactly this.
73+
return diff
74+
75+
def inverse_difference(
76+
self, last_obs: float, diff_forecast: list[float]
77+
) -> list[float]:
78+
"""
79+
Converts differenced data back to the original scale.
80+
81+
Args:
82+
last_obs: Last value from the original data.
83+
diff_forecast: Predictions on differenced data.
84+
Returns:
85+
Forecasts in the original scale.
86+
"""
87+
forecast = []
88+
# We start with the last known value from the original data.
89+
prev = last_obs
90+
# For each predicted *change*, we add it to the last value to get the next one.
91+
for val in diff_forecast:
92+
next_val = prev + val
93+
forecast.append(next_val)
94+
# Store current value for next iteration
95+
prev = next_val
96+
return forecast
97+
98+
def _compute_residuals(
99+
self,
100+
diff_data: NDArray[np.float64],
101+
phi: NDArray[np.float64],
102+
theta: NDArray[np.float64],
103+
c: float,
104+
) -> tuple[NDArray[np.float64], NDArray[np.float64]]:
105+
"""
106+
Computes residuals for given parameters.
107+
108+
Args:
109+
diff_data: Differenced data.
110+
phi, theta, c: Model parameters.
111+
Returns:
112+
Tuple of predictions and residuals.
113+
"""
114+
n = len(diff_data)
115+
# Need initial values to get started,we begin after the max of p or q.
116+
start = max(self.p, self.q)
117+
preds = np.zeros(n)
118+
errors = np.zeros(n)
119+
120+
# Loop through the data from the starting point.
121+
for t in range(start, n):
122+
# AR part: a weighted sum of the last 'p' actual values.
123+
# We reverse the data slice [::-1] so that the most recent value is first.
124+
ar_term = (
125+
np.dot(phi, diff_data[t - self.p : t][::-1]) if self.p > 0 else 0.0
126+
)
127+
128+
# MA part: a weighted sum of the last 'q' prediction errors.
129+
ma_term = np.dot(theta, errors[t - self.q : t][::-1]) if self.q > 0 else 0.0
130+
131+
# Combine everything to make the one-step-ahead prediction.
132+
preds[t] = c + ar_term + ma_term
133+
# Calculate error
134+
errors[t] = diff_data[t] - preds[t]
135+
136+
return preds, errors
137+
138+
def fit(self, data: list[float] | NDArray[np.float64]) -> "ARIMA":
139+
"""
140+
Trains the ARIMA model.
141+
142+
Args:
143+
data: Time series data to train on.
144+
Returns:
145+
The fitted ARIMA model.
146+
"""
147+
data = np.asarray(data, dtype=float)
148+
# Check if we have enough data to even build the model.
149+
if len(data) < max(self.p, self.q) + self.d + 5:
150+
raise ValueError("Not enough data to train. You need more samples.")
151+
152+
# Store last value of the original data.We'll need it to forecast later.
153+
self.train_last = float(data[-1])
154+
155+
# Step 1: Make the data stationary.
156+
diff_data = self.difference(data)
157+
n = len(diff_data)
158+
start = max(self.p, self.q)
159+
160+
# Another check to make sure we have enough data AFTER differencing.
161+
if n <= start:
162+
raise ValueError("Not enough data after differencing. Try reducing 'd'.")
163+
164+
# Step 2: Call the chosen training method to find the best parameters.
165+
self._fit_gradient_descent(diff_data, start)
166+
# All done! Mark the model as fitted and ready to forecast.
167+
self.is_fitted = True
168+
self.diff_data = diff_data # Ensure diff_data is assigned correctly
169+
self.errors = np.zeros(len(diff_data)) # Initialize errors as a numpy array
170+
self.n_train = len(diff_data) # Assign n_train as an integer
171+
return self
172+
173+
def _fit_gradient_descent(self, diff_data: NDArray[np.float64], start: int) -> None:
174+
"""
175+
Trains the model using Gradient Descent.
176+
177+
Args:
178+
diff_data: Differenced data.
179+
start: Starting index for training.
180+
"""
181+
n = len(diff_data)
182+
errors = np.zeros(n)
183+
preds = np.zeros(n)
184+
m = max(1, n - start) # Number of points we can calculate error on.
185+
186+
# The main training loop. We repeat this 'epochs' times.
187+
for epoch in range(self.epochs):
188+
# First, calculate the predictions and errors with the current parameters.
189+
for t in range(start, n):
190+
ar_term = (
191+
np.dot(self.phi, diff_data[t - self.p : t][::-1])
192+
if self.p > 0
193+
else 0.0
194+
)
195+
ma_term = (
196+
np.dot(self.theta, errors[t - self.q : t][::-1])
197+
if self.q > 0
198+
else 0.0
199+
)
200+
preds[t] = self.c + ar_term + ma_term
201+
errors[t] = diff_data[t] - preds[t]
202+
203+
# Calculate the "gradient"-which direction we should nudge our parameters
204+
# to reduce the error. It's based on the partial derivatives of the MSE.
205+
dc = -2 * np.sum(errors[start:]) / m
206+
dphi = np.zeros_like(self.phi)
207+
208+
# Calculate AR gradients
209+
for i in range(self.p):
210+
err_idx = slice(start - i - 1, n - i - 1)
211+
error_term = errors[start:] * diff_data[err_idx]
212+
dphi[i] = -2 * np.sum(error_term) / m
213+
214+
# Calculate MA gradients
215+
dtheta = np.zeros_like(self.theta)
216+
for j in range(self.q):
217+
err_idx = slice(start - j - 1, n - j - 1)
218+
error_term = errors[start:] * errors[err_idx]
219+
dtheta[j] = -2 * np.sum(error_term) / m
220+
221+
# Update parameters by taking steps
222+
# in the opposite direction of the gradient.
223+
self.phi -= self.lr * dphi
224+
self.theta -= self.lr * dtheta
225+
self.c -= self.lr * dc
226+
227+
if epoch % 100 == 0:
228+
mse = np.mean(errors[start:] ** 2)
229+
print(f"Epoch {epoch}: MSE={mse:.6f}, c={self.c:.6f}")
230+
231+
# After training, store the final results.
232+
self.errors = errors # Ensure errors is assigned correctly
233+
self.diff_data = diff_data # Ensure diff_data is assigned correctly
234+
self.n_train = n # Ensure n_train is assigned as an integer
235+
sigma_term = np.sum(self.errors[start:] ** 2) / m
236+
self.sigma_err = float(np.sqrt(sigma_term))
237+
msg = f"Fitted params (GD): phi={self.phi},theta={self.theta},c={self.c:.6f}\n"
238+
print(msg)
239+
240+
def forecast(
241+
self, steps: int, simulate_errors: bool = False, random_state: int | None = None
242+
) -> NDArray[np.float64]:
243+
"""
244+
Generates future predictions.
245+
246+
Args:
247+
steps: Number of steps to predict.
248+
simulate_errors: Adds randomness to forecasts if True.
249+
random_state: Seed for reproducibility.
250+
Returns:
251+
Forecasted values.
252+
"""
253+
msg = "Model must be fitted before forecasting. Call fit() first."
254+
if not self.is_fitted:
255+
raise ValueError(msg)
256+
257+
if self.diff_data is None or self.errors is None or self.sigma_err is None:
258+
raise ValueError(msg)
259+
260+
# We'll be adding to these lists, so let's work on copies.
261+
diff_data = np.copy(self.diff_data)
262+
errors = np.copy(self.errors)
263+
forecasts_diff = []
264+
265+
# A tool for generating random numbers if we need them.
266+
rng = np.random.default_rng(random_state)
267+
268+
# Generate one forecast at a time.
269+
for _ in range(steps):
270+
# AR part: Use the last 'p' values (from previous y-values).
271+
ar_slice = slice(-self.p, None)
272+
ar_term = np.dot(self.phi, diff_data[ar_slice][::-1]) if self.p > 0 else 0.0
273+
# MA part:Use the last 'q' errors(from prediction errors).
274+
ma_slice = slice(-self.q, None)
275+
ma_term = np.dot(self.theta, errors[ma_slice][::-1]) if self.q > 0 else 0.0
276+
277+
# The next predicted value (on the differenced scale).
278+
next_diff_forecast = self.c + ar_term + ma_term
279+
forecasts_diff.append(next_diff_forecast)
280+
281+
# For the next loop, we need a value for the "next error".
282+
# If we're simulating, we draw a random error from a normal distribution
283+
# with the same standard deviation as our past errors.
284+
next_error = rng.normal(0.0, self.sigma_err) if simulate_errors else 0.0
285+
286+
# Now, append our new prediction and error to the history.
287+
# This is crucial: our next forecast will use these values we just made up!
288+
diff_data = np.append(diff_data, next_diff_forecast)
289+
errors = np.append(errors, next_error)
290+
291+
# Finally, "un-difference" the forecasts to get them back to the original scale.
292+
final_forecasts = self.inverse_difference(self.train_last, forecasts_diff)
293+
return np.array(final_forecasts, dtype=np.float64)
294+
295+
296+
# This part of the code only runs if you execute this script directly.
if __name__ == "__main__":
    # Create some synthetic data that follows an ARIMA(2,1,2) pattern.
    rng = np.random.default_rng(42)  # Modern replacement for np.random.seed
    n = 500
    ar_params = [0.5, -0.3]
    ma_params = [0.4, 0.2]

    # Start with some random noise.
    noise = rng.standard_normal(n)  # Modern replacement for np.random.randn
    data = np.zeros(n)

    # Build the ARMA part first.
    for t in range(2, n):
        data[t] = (
            ar_params[0] * data[t - 1]
            + ar_params[1] * data[t - 2]
            + noise[t]
            + ma_params[0] * noise[t - 1]
            + ma_params[1] * noise[t - 2]
        )
    # Now, "integrate" it by taking the cumulative sum.
    data = np.cumsum(data)

    # Train the model on the first 400 data points
    # (the original comment claimed 300, contradicting train_end below).
    train_end = 400
    model = ARIMA(p=4, d=2, q=4)
    model.fit(data[:train_end])

    # Forecast the next 100 steps
    # (the original comment claimed 200, contradicting forecast_steps below).
    forecast_steps = 100
    forecast = model.forecast(forecast_steps, simulate_errors=True)

    # Compare forecast to the actual data held out of training.
    true_future = data[train_end : train_end + forecast_steps]
    rmse = np.sqrt(np.mean((np.array(forecast) - true_future) ** 2))
    print(f"Forecast RMSE: {rmse:.6f}\n")

    # Visualize results.
    plt.figure(figsize=(14, 6))
    plt.plot(range(len(data)), data, label="Original Data", linewidth=1.5)
    plt.plot(
        range(train_end, train_end + forecast_steps),
        forecast,
        label="Forecast",
        color="red",
        linewidth=1.5,
    )
    split_label = "Split"
    plt.axvline(
        train_end - 1,
        color="gray",
        linestyle=":",
        alpha=0.7,
        label=f"Train/Test {split_label}",
    )
    plt.xlabel("Time Step")
    plt.ylabel("Value")
    # Derive the title from the model actually fitted above; the old title
    # hard-coded "ARIMA(2,1,2)" while the model is ARIMA(4,2,4).
    plt.title(f"ARIMA({model.p},{model.d},{model.q}) Model: Training Data and Forecast")
    plt.legend()
    plt.tight_layout()
    plt.show()

0 commit comments

Comments
 (0)