Skip to content

Commit 6168b4a

Browse files
committed
feat(data structures, streams): variations of moving average
1 parent 156c662 commit 6168b4a

File tree

10 files changed

+224
-78
lines changed

10 files changed

+224
-78
lines changed
Lines changed: 11 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -1,45 +1,11 @@
1-
class BufferFullException(Exception):
2-
pass
3-
4-
5-
class BufferEmptyException(Exception):
6-
pass
7-
8-
9-
class CircularBuffer:
10-
def __init__(self, size):
11-
self.buffer = bytearray(size)
12-
self.read_point = 0
13-
self.write_point = 0
14-
15-
def _update_buffer(self, data):
16-
"""
17-
Protected helper method for Python 2/3
18-
"""
19-
try:
20-
self.buffer[self.write_point] = data
21-
except TypeError:
22-
self.buffer[self.write_point] = ord(data)
23-
24-
def clear(self):
25-
self.buffer = bytearray(len(self.buffer))
26-
27-
def write(self, data):
28-
if all(self.buffer):
29-
raise BufferFullException
30-
self._update_buffer(data)
31-
self.write_point = (self.write_point + 1) % len(self.buffer)
32-
33-
def read(self):
34-
if not any(self.buffer):
35-
raise BufferEmptyException
36-
data = chr(self.buffer[self.read_point])
37-
self.buffer[self.read_point] = 0
38-
self.read_point = (self.read_point + 1) % len(self.buffer)
39-
return data
40-
41-
def overwrite(self, data):
42-
self._update_buffer(data)
43-
if all(self.buffer) and self.read_point == self.write_point:
44-
self.read_point = (self.read_point + 1) % len(self.buffer)
45-
self.write_point = (self.write_point + 1) % len(self.buffer)
1+
from datastructures.circular_buffer.circular_buffer import CircularBuffer
2+
from datastructures.circular_buffer.exceptions import (
3+
BufferFullException,
4+
BufferEmptyException,
5+
)
6+
7+
__all__ = [
8+
"CircularBuffer",
9+
"BufferFullException",
10+
"BufferEmptyException",
11+
]
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
from datastructures.circular_buffer.exceptions import (
2+
BufferFullException,
3+
BufferEmptyException,
4+
)
5+
6+
7+
class CircularBuffer:
8+
def __init__(self, size):
9+
self.buffer = bytearray(size)
10+
self.read_point = 0
11+
self.write_point = 0
12+
13+
def _update_buffer(self, data):
14+
"""
15+
Protected helper method for Python 2/3
16+
"""
17+
try:
18+
self.buffer[self.write_point] = data
19+
except TypeError:
20+
self.buffer[self.write_point] = ord(data)
21+
22+
def clear(self):
23+
self.buffer = bytearray(len(self.buffer))
24+
25+
def write(self, data):
26+
if all(self.buffer):
27+
raise BufferFullException
28+
self._update_buffer(data)
29+
self.write_point = (self.write_point + 1) % len(self.buffer)
30+
31+
def read(self):
32+
if not any(self.buffer):
33+
raise BufferEmptyException
34+
data = chr(self.buffer[self.read_point])
35+
self.buffer[self.read_point] = 0
36+
self.read_point = (self.read_point + 1) % len(self.buffer)
37+
return data
38+
39+
def overwrite(self, data):
40+
self._update_buffer(data)
41+
if all(self.buffer) and self.read_point == self.write_point:
42+
self.read_point = (self.read_point + 1) % len(self.buffer)
43+
self.write_point = (self.write_point + 1) % len(self.buffer)
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
class BufferFullException(Exception):
2+
pass
3+
4+
5+
class BufferEmptyException(Exception):
6+
pass
File renamed without changes.
Lines changed: 5 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -1,34 +1,6 @@
1-
from typing import Deque
2-
from collections import deque
1+
from datastructures.streams.moving_average.moving_average_with_buffer import (
2+
MovingAverageWithBuffer,
3+
)
4+
from datastructures.streams.moving_average.moving_average import MovingAverage
35

4-
5-
class MovingAverage:
6-
def __init__(self, size):
7-
"""
8-
Initializes the moving average object
9-
Args:
10-
size (int): The size of the moving average
11-
"""
12-
self.queue: Deque[int] = deque()
13-
self.size: int = size
14-
self.window_sum: float = 0.0
15-
16-
def next(self, val: int) -> float:
17-
"""
18-
Adds a value to the stream and returns the moving average of the stream
19-
Args:
20-
val (int): The value to add to the stream
21-
Returns:
22-
float: The moving average of the stream
23-
"""
24-
if len(self.queue) == self.size:
25-
# remove oldest value
26-
oldest_value = self.queue.popleft()
27-
self.window_sum -= oldest_value
28-
29-
# add new value to queue
30-
self.queue.append(val)
31-
self.window_sum += val
32-
33-
# calculate average
34-
return self.window_sum / len(self.queue)
6+
__all__ = ["MovingAverage", "MovingAverageWithBuffer"]
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
class ExponentialMovingAverage:
2+
"""
3+
The Exponential Moving Average (EMA) is widely used in finance and signal processing because it reacts faster to
4+
recent price changes than a simple moving average.
5+
6+
The beauty of the EMA is its efficiency: it does not require a buffer or a window of previous values. You only need
7+
to store the previous EMA result. This makes the time and space complexity both O(1).
8+
9+
Why use EMA?
10+
1. Reduced Lag: Because it weights the most recent data most heavily, it catches trend reversals much sooner than an SMA.
11+
2. Memory Efficiency: You don't need to store a list of numbers; you only need to store one variable (self.ema).
12+
3. Smoothness: It creates a smooth curve that isn't as sensitive to an old "outlier" dropping out of the window (a common issue with SMA).
13+
"""
14+
15+
def __init__(self, size: int):
16+
self.size = size
17+
self.alpha = 2 / (size + 1)
18+
self.ema = None # Initialized with the first value received
19+
20+
def next(self, val: int) -> float:
21+
if self.ema is None:
22+
# The first value acts as the starting point
23+
self.ema = float(val)
24+
else:
25+
# Apply the EMA formula
26+
self.ema = (val * self.alpha) + (self.ema * (1 - self.alpha))
27+
28+
return self.ema
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
from typing import Deque
2+
from collections import deque
3+
4+
5+
class MovingAverage:
6+
def __init__(self, size: int):
7+
"""
8+
Initializes the moving average object
9+
Args:
10+
size (int): The size of the moving average
11+
"""
12+
self.queue: Deque[int] = deque()
13+
self.size: int = size
14+
self.window_sum: float = 0.0
15+
16+
def next(self, val: int) -> float:
17+
"""
18+
Adds a value to the stream and returns the moving average of the stream
19+
Args:
20+
val (int): The value to add to the stream
21+
Returns:
22+
float: The moving average of the stream
23+
"""
24+
if len(self.queue) == self.size:
25+
# remove oldest value
26+
oldest_value = self.queue.popleft()
27+
self.window_sum -= oldest_value
28+
29+
# add new value to queue
30+
self.queue.append(val)
31+
self.window_sum += val
32+
33+
# calculate average
34+
return self.window_sum / len(self.queue)
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
class MovingAverageWithBuffer:
2+
"""
3+
Using a Circular Buffer (implemented with a fixed-size list) is an excellent way to optimize memory. Instead of
4+
dynamically resizing or shifting elements, we use a fixed array and a pointer that "wraps around" using the modulo
5+
operator (index(modsize)).
6+
7+
This approach is often preferred in embedded systems or high-performance scenarios because it avoids the overhead of
8+
frequent memory allocations.
9+
10+
The expression self.head = (self.head + 1) % self.size ensures that if our size is 3, the pointer sequence will be:
11+
0 → 1 → 2 → 0 → 1...
12+
13+
This effectively "recycles" the array positions, making it behave like a continuous loop.
14+
"""
15+
16+
def __init__(self, size: int):
17+
# Pre-allocate a list of zeros
18+
self.size = size
19+
self.buffer = [0] * size
20+
self.head = 0 # Pointer to the next position to overwrite
21+
self.count = 0 # Track how many elements we've actually added
22+
self.current_sum = 0.0
23+
24+
def next(self, val: int) -> float:
25+
# If the buffer is full, subtract the value we are about to overwrite
26+
if self.count == self.size:
27+
self.current_sum -= self.buffer[self.head]
28+
else:
29+
self.count += 1
30+
31+
# Overwrite the old value at the head pointer
32+
self.buffer[self.head] = val
33+
self.current_sum += val
34+
35+
# Move the pointer to the next index, wrapping around if at the end
36+
self.head = (self.head + 1) % self.size
37+
38+
return self.current_sum / self.count

datastructures/streams/moving_average/test_moving_average.py

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,9 @@
22
from typing import List, Tuple
33
from parameterized import parameterized
44
from datastructures.streams.moving_average import MovingAverage
5+
from datastructures.streams.moving_average.moving_average_with_buffer import (
6+
MovingAverageWithBuffer,
7+
)
58

69

710
TEST_CASES = [
@@ -22,6 +25,16 @@ def test_moving_average(self, size: int, data_to_expected: List[Tuple[int, float
2225
round(actual, 5)
2326
self.assertEqual(expected, round(actual, 5))
2427

28+
@parameterized.expand(TEST_CASES)
29+
def test_moving_average_with_buffer(
30+
self, size: int, data_to_expected: List[Tuple[int, float]]
31+
):
32+
moving_average = MovingAverageWithBuffer(size)
33+
for data, expected in data_to_expected:
34+
actual = moving_average.next(data)
35+
round(actual, 5)
36+
self.assertEqual(expected, round(actual, 5))
37+
2538

2639
if __name__ == "__main__":
2740
unittest.main()
Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
class WeightedMovingAverage:
2+
"""
3+
Weight Assignment: The most recent value always gets the maximum weight (self.size), and weights
4+
decrease as we look further back in time.
5+
6+
The Denominator: Unlike the simple moving average where you divide by the count, here you divide by the sum of the
7+
weights applied.
8+
9+
For a full window of size 3, weights are 3, 2, and 1. Sum = 6.
10+
Before the window is full, we use a current_denominator to ensure accuracy.
11+
12+
The WMA is slightly more expensive (O(size) per next() call) because we have to re-sum the weighted values each time.
13+
If you need O(1) performance for a weighted average, you might look into an Exponential Moving Average (EMA), which
14+
uses a smoothing factor (α) to give more weight to recent data without needing to store the full history.
15+
"""
16+
17+
def __init__(self, size: int):
18+
self.size = size
19+
self.buffer = [0] * size
20+
self.head = 0
21+
self.count = 0
22+
# The denominator is the sum of weights: 1 + 2 + ... + size
23+
# Formula: (n * (n + 1)) / 2
24+
self.denominator = (size * (size + 1)) / 2
25+
26+
def next(self, val: int) -> float:
27+
# 1. Update the buffer
28+
self.buffer[self.head] = val
29+
self.head = (self.head + 1) % self.size
30+
if self.count < self.size:
31+
self.count += 1
32+
33+
# 2. Calculate Weighted Sum
34+
weighted_sum = 0.0
35+
current_denominator = 0
36+
37+
# Iterate backward from the most recent element
38+
for i in range(self.count):
39+
# Find index of elements from newest to oldest
40+
# (self.head - 1 - i) handles the circular wrap-around
41+
idx = (self.head - 1 - i) % self.size
42+
weight = self.size - i
43+
weighted_sum += self.buffer[idx] * weight
44+
current_denominator += weight
45+
46+
return weighted_sum / current_denominator

0 commit comments

Comments
 (0)