|
| 1 | +import numpy as np |
| 2 | +from unittest import TestCase |
| 3 | +import pandas as pd |
| 4 | +import polars as pl |
| 5 | +from pyindicators import bollinger_bands, bollinger_width, bollinger_overshoot |
| 6 | + |
| 7 | + |
| 8 | +class TestBollingerBands(TestCase): |
| 9 | + |
| 10 | + def test_bollinger_bands_pandas(self): |
| 11 | + """Test basic Bollinger Bands calculation with pandas.""" |
| 12 | + df = pd.DataFrame({ |
| 13 | + "Close": [100, 102, 104, 103, 105, 107, 106, 108, 110, 109, |
| 14 | + 111, 113, 112, 114, 116, 115, 117, 119, 118, 120] |
| 15 | + }) |
| 16 | + result = bollinger_bands(df.copy(), period=10, std_dev=2) |
| 17 | + |
| 18 | + self.assertIn('bollinger_middle', result.columns) |
| 19 | + self.assertIn('bollinger_upper', result.columns) |
| 20 | + self.assertIn('bollinger_lower', result.columns) |
| 21 | + |
| 22 | + # First 9 values should be NaN (period=10) |
| 23 | + self.assertTrue(pd.isna(result['bollinger_middle'].iloc[8])) |
| 24 | + self.assertFalse(pd.isna(result['bollinger_middle'].iloc[9])) |
| 25 | + |
| 26 | + # Upper band should be greater than middle, middle greater than lower |
| 27 | + valid_idx = 10 |
| 28 | + self.assertGreater( |
| 29 | + result['bollinger_upper'].iloc[valid_idx], |
| 30 | + result['bollinger_middle'].iloc[valid_idx] |
| 31 | + ) |
| 32 | + self.assertGreater( |
| 33 | + result['bollinger_middle'].iloc[valid_idx], |
| 34 | + result['bollinger_lower'].iloc[valid_idx] |
| 35 | + ) |
| 36 | + |
| 37 | + def test_bollinger_bands_polars(self): |
| 38 | + """Test basic Bollinger Bands calculation with polars.""" |
| 39 | + df = pl.DataFrame({ |
| 40 | + "Close": [100, 102, 104, 103, 105, 107, 106, 108, 110, 109, |
| 41 | + 111, 113, 112, 114, 116, 115, 117, 119, 118, 120] |
| 42 | + }) |
| 43 | + result = bollinger_bands(df, period=10, std_dev=2) |
| 44 | + |
| 45 | + self.assertIn('bollinger_middle', result.columns) |
| 46 | + self.assertIn('bollinger_upper', result.columns) |
| 47 | + self.assertIn('bollinger_lower', result.columns) |
| 48 | + |
| 49 | + |
| 50 | +class TestBollingerWidth(TestCase): |
| 51 | + |
| 52 | + def test_bollinger_width_pandas(self): |
| 53 | + """Test Bollinger Width calculation with pandas.""" |
| 54 | + df = pd.DataFrame({ |
| 55 | + "Close": [100, 102, 104, 103, 105, 107, 106, 108, 110, 109, |
| 56 | + 111, 113, 112, 114, 116, 115, 117, 119, 118, 120] |
| 57 | + }) |
| 58 | + result = bollinger_width(df.copy(), period=10, std_dev=2) |
| 59 | + |
| 60 | + self.assertIn('Bollinger_Width', result.columns) |
| 61 | + # Width should be positive |
| 62 | + valid_width = result['Bollinger_Width'].dropna() |
| 63 | + self.assertTrue(all(valid_width > 0)) |
| 64 | + |
| 65 | + def test_bollinger_width_polars(self): |
| 66 | + """Test Bollinger Width calculation with polars.""" |
| 67 | + df = pl.DataFrame({ |
| 68 | + "Close": [100, 102, 104, 103, 105, 107, 106, 108, 110, 109, |
| 69 | + 111, 113, 112, 114, 116, 115, 117, 119, 118, 120] |
| 70 | + }) |
| 71 | + result = bollinger_width(df, period=10, std_dev=2) |
| 72 | + |
| 73 | + self.assertIn('Bollinger_Width', result.columns) |
| 74 | + |
| 75 | + |
| 76 | +class TestBollingerOvershoot(TestCase): |
| 77 | + |
| 78 | + def test_overshoot_within_bands_pandas(self): |
| 79 | + """Test that overshoot is 0 when price is within bands.""" |
| 80 | + # Stable prices with minimal volatility - price should stay within bands |
| 81 | + df = pd.DataFrame({ |
| 82 | + "Close": [100.0] * 30 |
| 83 | + }) |
| 84 | + result = bollinger_overshoot(df.copy(), period=10, std_dev=2) |
| 85 | + |
| 86 | + self.assertIn('bollinger_overshoot', result.columns) |
| 87 | + # All valid values should be 0 (price exactly at middle band) |
| 88 | + valid_overshoot = result['bollinger_overshoot'].iloc[10:] |
| 89 | + self.assertTrue(all(valid_overshoot == 0)) |
| 90 | + |
| 91 | + def test_overshoot_above_upper_band_pandas(self): |
| 92 | + """Test positive overshoot when price exceeds upper band.""" |
| 93 | + # Start with stable prices, then spike up |
| 94 | + prices = [100.0] * 20 + [100.0, 100.0, 100.0, 100.0, 100.0, 150.0] |
| 95 | + df = pd.DataFrame({"Close": prices}) |
| 96 | + |
| 97 | + result = bollinger_overshoot(df.copy(), period=10, std_dev=2) |
| 98 | + |
| 99 | + # The last value should have a positive overshoot (price spiked above band) |
| 100 | + last_overshoot = result['bollinger_overshoot'].iloc[-1] |
| 101 | + self.assertGreater(last_overshoot, 0) |
| 102 | + |
| 103 | + def test_overshoot_below_lower_band_pandas(self): |
| 104 | + """Test negative overshoot when price drops below lower band.""" |
| 105 | + # Start with stable prices, then drop sharply |
| 106 | + prices = [100.0] * 20 + [100.0, 100.0, 100.0, 100.0, 100.0, 50.0] |
| 107 | + df = pd.DataFrame({"Close": prices}) |
| 108 | + |
| 109 | + result = bollinger_overshoot(df.copy(), period=10, std_dev=2) |
| 110 | + |
| 111 | + # The last value should have a negative overshoot (price dropped below band) |
| 112 | + last_overshoot = result['bollinger_overshoot'].iloc[-1] |
| 113 | + self.assertLess(last_overshoot, 0) |
| 114 | + |
| 115 | + def test_overshoot_calculation_accuracy_pandas(self): |
| 116 | + """Test that overshoot percentage is calculated correctly.""" |
| 117 | + # Create a scenario with stable prices followed by a significant spike |
| 118 | + # The spike should create a measurable overshoot |
| 119 | + prices = [100.0] * 25 + [130.0] # 30% spike after stable period |
| 120 | + df = pd.DataFrame({"Close": prices}) |
| 121 | + |
| 122 | + result = bollinger_overshoot(df.copy(), period=10, std_dev=2) |
| 123 | + |
| 124 | + # Also calculate bands to verify manually |
| 125 | + bands = bollinger_bands(df.copy(), period=10, std_dev=2) |
| 126 | + |
| 127 | + last_price = bands['Close'].iloc[-1] |
| 128 | + last_upper = bands['bollinger_upper'].iloc[-1] |
| 129 | + last_middle = bands['bollinger_middle'].iloc[-1] |
| 130 | + half_band_width = last_upper - last_middle |
| 131 | + |
| 132 | + # Manual calculation |
| 133 | + expected_overshoot = ((last_price - last_upper) / half_band_width) * 100 |
| 134 | + |
| 135 | + actual_overshoot = result['bollinger_overshoot'].iloc[-1] |
| 136 | + |
| 137 | + # Verify the calculation matches |
| 138 | + self.assertAlmostEqual(actual_overshoot, expected_overshoot, places=5) |
| 139 | + # Should be positive (above upper band) |
| 140 | + self.assertGreater(actual_overshoot, 0) |
| 141 | + |
| 142 | + def test_overshoot_with_datetime_index_pandas(self): |
| 143 | + """Test that overshoot works correctly with DatetimeIndex.""" |
| 144 | + dates = pd.date_range('2024-01-01', periods=30, freq='D') |
| 145 | + prices = [100.0] * 20 + [100.0, 100.0, 100.0, 100.0, 100.0, 150.0, 100.0, 100.0, 50.0, 100.0] |
| 146 | + df = pd.DataFrame({"Close": prices}, index=dates) |
| 147 | + |
| 148 | + result = bollinger_overshoot(df.copy(), period=10, std_dev=2) |
| 149 | + |
| 150 | + self.assertIn('bollinger_overshoot', result.columns) |
| 151 | + # Should have both positive and negative overshoots |
| 152 | + self.assertTrue(any(result['bollinger_overshoot'] > 0)) |
| 153 | + self.assertTrue(any(result['bollinger_overshoot'] < 0)) |
| 154 | + |
| 155 | + def test_overshoot_polars(self): |
| 156 | + """Test overshoot calculation with polars DataFrame.""" |
| 157 | + # Start with stable prices, then spike up, then drop |
| 158 | + prices = [100.0] * 20 + [100.0, 100.0, 150.0, 100.0, 50.0] |
| 159 | + df = pl.DataFrame({"Close": prices}) |
| 160 | + |
| 161 | + result = bollinger_overshoot(df, period=10, std_dev=2) |
| 162 | + |
| 163 | + self.assertIn('bollinger_overshoot', result.columns) |
| 164 | + |
| 165 | + overshoot_values = result['bollinger_overshoot'].to_list() |
| 166 | + # Should have both positive and negative overshoots |
| 167 | + self.assertTrue(any(v > 0 for v in overshoot_values if v is not None)) |
| 168 | + self.assertTrue(any(v < 0 for v in overshoot_values if v is not None)) |
| 169 | + |
| 170 | + def test_overshoot_custom_column_names(self): |
| 171 | + """Test overshoot with custom source and result column names.""" |
| 172 | + df = pd.DataFrame({ |
| 173 | + "Price": [100.0] * 20 + [150.0] |
| 174 | + }) |
| 175 | + |
| 176 | + result = bollinger_overshoot( |
| 177 | + df.copy(), |
| 178 | + source_column='Price', |
| 179 | + period=10, |
| 180 | + std_dev=2, |
| 181 | + result_column='my_overshoot' |
| 182 | + ) |
| 183 | + |
| 184 | + self.assertIn('my_overshoot', result.columns) |
| 185 | + self.assertNotIn('bollinger_overshoot', result.columns) |
| 186 | + |
| 187 | + def test_overshoot_different_std_dev(self): |
| 188 | + """Test that different std_dev values affect overshoot.""" |
| 189 | + prices = [100.0] * 20 + [120.0] |
| 190 | + df = pd.DataFrame({"Close": prices}) |
| 191 | + |
| 192 | + # With std_dev=1, bands are tighter, so overshoot should be larger |
| 193 | + result_std1 = bollinger_overshoot(df.copy(), period=10, std_dev=1) |
| 194 | + # With std_dev=3, bands are wider, so overshoot should be smaller |
| 195 | + result_std3 = bollinger_overshoot(df.copy(), period=10, std_dev=3) |
| 196 | + |
| 197 | + overshoot_std1 = result_std1['bollinger_overshoot'].iloc[-1] |
| 198 | + overshoot_std3 = result_std3['bollinger_overshoot'].iloc[-1] |
| 199 | + |
| 200 | + # Overshoot with tighter bands should be larger (or equal if within bands) |
| 201 | + self.assertGreaterEqual(overshoot_std1, overshoot_std3) |
| 202 | + |
| 203 | + def test_overshoot_no_temp_columns_remain(self): |
| 204 | + """Test that temporary columns are removed from result.""" |
| 205 | + df = pd.DataFrame({ |
| 206 | + "Close": [100.0] * 25 |
| 207 | + }) |
| 208 | + |
| 209 | + result = bollinger_overshoot(df.copy(), period=10, std_dev=2) |
| 210 | + |
| 211 | + # Temporary columns should not be in result |
| 212 | + self.assertNotIn('BB_middle_temp', result.columns) |
| 213 | + self.assertNotIn('BB_upper_temp', result.columns) |
| 214 | + self.assertNotIn('BB_lower_temp', result.columns) |
| 215 | + |
| 216 | + def test_overshoot_preserves_original_columns(self): |
| 217 | + """Test that original DataFrame columns are preserved.""" |
| 218 | + df = pd.DataFrame({ |
| 219 | + "Close": [100.0] * 25, |
| 220 | + "Volume": [1000] * 25, |
| 221 | + "Open": [99.0] * 25 |
| 222 | + }) |
| 223 | + |
| 224 | + result = bollinger_overshoot(df.copy(), period=10, std_dev=2) |
| 225 | + |
| 226 | + self.assertIn('Close', result.columns) |
| 227 | + self.assertIn('Volume', result.columns) |
| 228 | + self.assertIn('Open', result.columns) |
| 229 | + self.assertIn('bollinger_overshoot', result.columns) |
| 230 | + |
| 231 | + def test_overshoot_real_world_scenario(self): |
| 232 | + """Test overshoot with a realistic price movement scenario.""" |
| 233 | + # Simulate: trending up, then sharp spike (like silver 40% overshoot) |
| 234 | + np.random.seed(42) |
| 235 | + base_trend = np.linspace(100, 120, 50) |
| 236 | + noise = np.random.normal(0, 1, 50) |
| 237 | + prices = base_trend + noise |
| 238 | + |
| 239 | + # Add a sharp spike at the end |
| 240 | + prices[-1] = prices[-2] + 15 # Sharp spike |
| 241 | + |
| 242 | + df = pd.DataFrame({"Close": prices}) |
| 243 | + result = bollinger_overshoot(df.copy(), period=20, std_dev=2) |
| 244 | + |
| 245 | + # Should detect significant positive overshoot at the spike |
| 246 | + last_overshoot = result['bollinger_overshoot'].iloc[-1] |
| 247 | + self.assertGreater(last_overshoot, 0) |
| 248 | + print(f"Real-world scenario overshoot: {last_overshoot:.2f}%") |
| 249 | + |
0 commit comments