diff --git a/quantflow_tests/test_ai.py b/quantflow_tests/test_ai.py index 41d09996..9de7bfec 100644 --- a/quantflow_tests/test_ai.py +++ b/quantflow_tests/test_ai.py @@ -10,6 +10,7 @@ import pytest from mcp.server.fastmcp import FastMCP +from quantflow.ai import server as ai_server from quantflow.ai.tools import charts, crypto, fred, stocks, vault from quantflow.ai.tools.base import McpTool from quantflow.data.vault import Vault @@ -459,3 +460,28 @@ async def test_ascii_chart_empty( with patch("quantflow.ai.tools.base.FMP", return_value=mock_fmp): result = await charts_server.call_tool("ascii_chart", {"symbol": "FAKE"}) assert "No price data" in text(result) + + +def test_create_server_registers_all_tools() -> None: + fake_tool = MagicMock() + with ( + patch("quantflow.ai.server.McpTool", return_value=fake_tool), + patch("quantflow.ai.server.vault.register") as vault_register, + patch("quantflow.ai.server.crypto.register") as crypto_register, + patch("quantflow.ai.server.stocks.register") as stocks_register, + patch("quantflow.ai.server.fred.register") as fred_register, + patch("quantflow.ai.server.charts.register") as charts_register, + ): + mcp = ai_server.create_server() + vault_register.assert_called_once_with(mcp, fake_tool) + crypto_register.assert_called_once_with(mcp, fake_tool) + stocks_register.assert_called_once_with(mcp, fake_tool) + fred_register.assert_called_once_with(mcp, fake_tool) + charts_register.assert_called_once_with(mcp, fake_tool) + + +def test_main_runs_server() -> None: + mock_server = MagicMock() + with patch("quantflow.ai.server.create_server", return_value=mock_server): + ai_server.main() + mock_server.run.assert_called_once_with() diff --git a/quantflow_tests/test_data.py b/quantflow_tests/test_data.py index 055e3f9e..ff749d77 100644 --- a/quantflow_tests/test_data.py +++ b/quantflow_tests/test_data.py @@ -1,4 +1,6 @@ +from datetime import date from typing import AsyncIterator +from unittest.mock import AsyncMock import pytest @@ -59,3 +61,38 @@ async def __test_fiscal_data() -> None: assert df is not None assert df.shape[0] > 0 assert df.shape[1] == 2 + + +async def test_fiscal_securities_builds_previous_month_filter() -> None: + fd = FiscalData() + fd.get_all = AsyncMock(return_value=[{"a": 1}]) # type: ignore[method-assign] + df = await fd.securities(record_date=date(2024, 3, 15)) + fd.get_all.assert_awaited_once_with( + "/v1/debt/mspd/mspd_table_3_market", + {"filter": "record_date:eq:2024-02-29"}, + ) + assert len(df) == 1 + + +async def test_fiscal_get_all_multi_page() -> None: + fd = FiscalData() + fd.get = AsyncMock( # type: ignore[method-assign] + side_effect=[ + { + "data": [{"id": 1}], + "links": {"next": "/v1/debt/mspd/mspd_table_3_market?page=2"}, + }, + {"data": [{"id": 2}], "links": {"next": None}}, + ] + ) + data = await fd.get_all("/v1/debt/mspd/mspd_table_3_market", {"a": "b"}) + assert data == [{"id": 1}, {"id": 2}] + assert fd.get.await_count == 2 + + +async def test_fiscal_get_all_single_page_without_links() -> None: + fd = FiscalData() + fd.get = AsyncMock(return_value={"data": [{"id": 7}]}) # type: ignore[method-assign] + data = await fd.get_all("/v1/debt/mspd/mspd_table_3_market", {"a": "b"}) + assert data == [{"id": 7}] + fd.get.assert_awaited_once() diff --git a/quantflow_tests/test_divfm.py b/quantflow_tests/test_divfm.py index bb0b8411..7b121f28 100644 --- a/quantflow_tests/test_divfm.py +++ b/quantflow_tests/test_divfm.py @@ -10,8 +10,12 @@ try: import torch - from quantflow.options.divfm.network import DIVFMNetwork - from quantflow.options.divfm.trainer import DayData, DIVFMTrainer + from quantflow.options.divfm.network import ( + DIVFMNetwork, + ) + from quantflow.options.divfm.network import _extract_subnet as extract_subnet_torch + from quantflow.options.divfm.network import _make_subnet as make_subnet_torch + from quantflow.options.divfm.trainer import DayData, DIVFMTrainer, _day_loss has_torch = True except ImportError: @@ -191,6 +195,12 @@ def test_network_default_construction() -> None: assert net.extra_features == 0 +@pytest.mark.skipif(not has_torch, reason="torch not installed") +def test_network_minimum_factors_validation() -> None: + with pytest.raises(ValueError, match="at least 3"): + DIVFMNetwork(num_factors=2) + + @pytest.mark.skipif(not has_torch, reason="torch not installed") def test_network_forward_shape() -> None: net = DIVFMNetwork(num_factors=NUM_FACTORS, hidden_size=HIDDEN_SIZE) @@ -202,6 +212,28 @@ def test_network_forward_shape() -> None: assert (out[:, 0] == 1.0).all() # f_1 = 1 +@pytest.mark.skipif(not has_torch, reason="torch not installed") +def test_make_subnet_layout() -> None: + subnet = make_subnet_torch(2, 4, 2, 3) + modules = list(subnet.children()) + assert isinstance(modules[0], torch.nn.Linear) + assert isinstance(modules[1], torch.nn.Sigmoid) + assert isinstance(modules[2], torch.nn.BatchNorm1d) + assert isinstance(modules[-1], torch.nn.BatchNorm1d) + assert modules[-1].affine is False + + +@pytest.mark.skipif(not has_torch, reason="torch not installed") +def test_extract_subnet_output_structure() -> None: + subnet = make_subnet_torch(2, 4, 1, 1) + subnet.eval() + extracted = extract_subnet_torch(subnet) + assert isinstance(extracted, SubnetWeights) + assert len(extracted.layers) == 2 + assert extracted.layers[0].apply_activation is True + assert extracted.layers[-1].apply_activation is False + + @pytest.mark.skipif(not has_torch, reason="torch not installed") def test_to_weights_forward_matches_network() -> None: net = DIVFMNetwork(num_factors=NUM_FACTORS, hidden_size=HIDDEN_SIZE) @@ -220,6 +252,14 @@ def test_to_weights_forward_matches_network() -> None: np.testing.assert_allclose(torch_out, numpy_out, atol=1e-5) +@pytest.mark.skipif(not has_torch, reason="torch not installed") +def test_to_weights_without_joint_subnet() -> None: + net = DIVFMNetwork(num_factors=3, hidden_size=HIDDEN_SIZE) + weights = net.to_weights() + assert weights.subnet_joint is None + assert weights.num_factors == 3 + + # --------------------------------------------------------------------------- # DIVFMTrainer tests (requires torch) # --------------------------------------------------------------------------- @@ -252,6 +292,14 @@ def test_trainer_construction() -> None: assert trainer.network is net +@pytest.mark.skipif(not has_torch, reason="torch not installed") +def test_day_loss_non_negative() -> None: + net = DIVFMNetwork(num_factors=NUM_FACTORS, hidden_size=HIDDEN_SIZE) + day = _make_days(num_days=1)[0] + loss = _day_loss(net, day, ridge=1e-6) + assert float(loss.detach().item()) >= 0.0 + + @pytest.mark.skipif(not has_torch, reason="torch not installed") def test_trainer_step_returns_loss() -> None: net = DIVFMNetwork(num_factors=NUM_FACTORS, hidden_size=HIDDEN_SIZE) @@ -272,6 +320,13 @@ def test_trainer_evaluate() -> None: assert val_loss >= 0.0 +@pytest.mark.skipif(not has_torch, reason="torch not installed") +def test_trainer_evaluate_empty_days() -> None: + net = DIVFMNetwork(num_factors=NUM_FACTORS, hidden_size=HIDDEN_SIZE) + trainer = DIVFMTrainer(net) + assert trainer.evaluate([]) == 0.0 + + @pytest.mark.skipif(not has_torch, reason="torch not installed") def test_trainer_fit_loss_decreases() -> None: """Loss should decrease over training steps on a structured IV surface. @@ -296,6 +351,16 @@ def test_trainer_fit_loss_decreases() -> None: assert np.mean(losses[-10:]) < np.mean(losses[:10]) +@pytest.mark.skipif(not has_torch, reason="torch not installed") +def test_trainer_fit_with_validation_days() -> None: + torch.manual_seed(1) + net = DIVFMNetwork(num_factors=NUM_FACTORS, hidden_size=HIDDEN_SIZE) + trainer = DIVFMTrainer(net, lr=1e-2, batch_days=4) + days = _make_days(num_days=8) + losses = trainer.fit(days, num_steps=5, val_days=days[:2], log_every=2) + assert len(losses) == 5 + + @pytest.mark.skipif(not has_torch, reason="torch not installed") def test_trainer_to_weights_produces_pricer() -> None: net = DIVFMNetwork(num_factors=NUM_FACTORS, hidden_size=HIDDEN_SIZE) diff --git a/quantflow_tests/test_fmp_unit.py b/quantflow_tests/test_fmp_unit.py new file mode 100644 index 00000000..4f0200d6 --- /dev/null +++ b/quantflow_tests/test_fmp_unit.py @@ -0,0 +1,75 @@ +from __future__ import annotations + +from datetime import date +from unittest.mock import AsyncMock + +import pandas as pd + +from quantflow.data.fmp import FMP, nice_sector_performance, summary_sector_performance + + +def test_freq_crate_and_join_and_params() -> None: + assert FMP.freq.crate(None) == FMP.freq.daily + assert FMP.freq.crate("1hour") == FMP.freq.one_hour + assert FMP.freq.crate("bad") == FMP.freq.daily + + fmp = FMP(key="k") + assert fmp.join("AAPL", "MSFT") == "AAPL,MSFT" + assert fmp.params({"a": 1}) == {"params": {"a": 1, "apikey": "k"}} + + +async def test_prices_daily_and_intraday_paths() -> None: + fmp = FMP(key="k") + fmp.get_path = AsyncMock(return_value=[{"date": "2024-01-01", "close": 100}]) # type: ignore[method-assign] + df = await fmp.prices("AAPL", frequency=None, convert_to_date=True) + assert isinstance(df, pd.DataFrame) + assert str(df["date"].dtype).startswith("datetime64") + fmp.get_path.assert_awaited_with( + "historical-price-eod/full", + params={"symbol": "AAPL"}, + ) + + fmp.get_path = AsyncMock(return_value={"historical": [{"date": "2024-01-01"}]}) # type: ignore[method-assign] + await fmp.prices("AAPL", frequency="1hour") + fmp.get_path.assert_awaited_with( + "historical-chart/1hour", + params={"frequency": "1hour", "symbol": "AAPL"}, + ) + + +async def test_sector_performance_summary_and_timeseries() -> None: + fmp = FMP(key="k") + fmp.get_path = AsyncMock( # type: ignore[method-assign] + side_effect=[ + [{"sector": "Tech", "changesPercentage": "1.23%"}], + [ + {"date": "2024-01-01", "technologyChangesPercentage": 1.0}, + {"date": "2024-01-02", "technologyChangesPercentage": 2.0}, + ], + ] + ) + snapshot = await fmp.sector_performance() + assert isinstance(snapshot, dict) + assert str(snapshot["Tech"]) == "1.23" + + summary = await fmp.sector_performance(from_date=date(2024, 1, 1), summary=True) + assert isinstance(summary, dict) + assert str(summary["Technology"]) == "3.02" + + +def test_sector_helpers() -> None: + nice = dict( + nice_sector_performance( + {"date": "2024-01-01", "consumerStaplesChangesPercentage": 1.5} + ) + ) + assert nice["date"] == date(2024, 1, 1) + assert nice["Consumer Staples"] == 1.5 + + summary = summary_sector_performance( + [ + {"date": date(2024, 1, 1), "Tech": 1.0}, + {"date": date(2024, 1, 2), "Tech": 2.0}, + ] + ) + assert str(summary["Tech"]) == "3.02" diff --git a/quantflow_tests/test_heston_calibration.py b/quantflow_tests/test_heston_calibration.py new file mode 100644 index 00000000..1c841db5 --- /dev/null +++ b/quantflow_tests/test_heston_calibration.py @@ -0,0 +1,95 @@ +from __future__ import annotations + +import numpy as np + +from quantflow.options.calibration import ( + DoubleHestonCalibration, + DoubleHestonJCalibration, + HestonCalibration, + HestonJCalibration, +) +from quantflow.options.pricer import OptionPricer +from quantflow.sp.heston import DoubleHeston, DoubleHestonJ, Heston, HestonJ +from quantflow.utils.distributions import DoubleExponential + + +def test_heston_calibration_get_set_and_penalize(vol_surface) -> None: + cal: HestonCalibration = HestonCalibration( + pricer=OptionPricer( + model=Heston.create(vol=0.4, kappa=2.0, sigma=0.6, rho=-0.4) + ), + vol_surface=vol_surface, + ) + params = cal.get_params() + assert len(params) == 5 + updated = np.array([0.1, 0.1, 1.0, 1.0, -0.2], dtype=float) + cal.set_params(updated) + assert np.allclose(cal.get_params(), updated) + assert cal.penalize() >= 0.0 + bounds = cal.get_bounds() + assert len(bounds.lb) == 5 + assert len(bounds.ub) == 5 + + +def test_hestonj_calibration_get_set_and_bounds(vol_surface) -> None: + model = HestonJ.create( + DoubleExponential, + vol=0.3, + kappa=1.5, + sigma=0.4, + rho=-0.3, + jump_fraction=0.2, + jump_asymmetry=0.1, + ) + cal: HestonJCalibration = HestonJCalibration( + pricer=OptionPricer(model=model), vol_surface=vol_surface + ) + params = cal.get_params() + cal.set_params(params) + assert np.allclose(cal.get_params(), params) + bounds = cal.get_bounds() + assert len(bounds.lb) == len(bounds.ub) == len(params) + + +def test_double_heston_calibration_param_logic(vol_surface) -> None: + model = DoubleHeston( + heston1=Heston.create(vol=0.3, kappa=2.0, sigma=0.4, rho=-0.2), + heston2=Heston.create(vol=0.25, kappa=1.0, sigma=0.3, rho=-0.4), + ) + cal: DoubleHestonCalibration = DoubleHestonCalibration( + pricer=OptionPricer(model=model), vol_surface=vol_surface + ) + params = cal.get_params() + assert len(params) == 10 + cal.set_params(params) + assert ( + cal.model.heston1.variance_process.kappa + >= cal.model.heston2.variance_process.kappa + ) + assert cal.penalize() >= 0.0 + assert len(cal.feller_residuals()) == 2 + assert cal.maturity_split() > 0.0 + + +def test_double_hestonj_calibration_get_set_and_bounds(vol_surface) -> None: + model = DoubleHestonJ( + heston1=HestonJ.create( + DoubleExponential, + vol=0.3, + kappa=2.5, + sigma=0.4, + rho=-0.2, + jump_fraction=0.2, + jump_asymmetry=0.1, + ), + heston2=Heston.create(vol=0.25, kappa=1.2, sigma=0.3, rho=-0.4), + ) + cal: DoubleHestonJCalibration = DoubleHestonJCalibration( + pricer=OptionPricer(model=model), + vol_surface=vol_surface, + ) + params = cal.get_params() + cal.set_params(params) + assert np.allclose(cal.get_params(), params) + bounds = cal.get_bounds() + assert len(bounds.lb) == len(bounds.ub) == len(params) diff --git a/quantflow_tests/test_parity.py b/quantflow_tests/test_parity.py new file mode 100644 index 00000000..2136b2bf --- /dev/null +++ b/quantflow_tests/test_parity.py @@ -0,0 +1,83 @@ +from __future__ import annotations + +from decimal import Decimal + +import pytest + +from quantflow.options.parity import PutCallParities, PutCallParity +from quantflow.utils.price import Price + + +def _parity(strike: float, cp_mid: float, inverse: bool = False) -> PutCallParity: + call = Price(bid=Decimal("1.0"), ask=Decimal("1.0")) + put_value = Decimal(str(1.0 - cp_mid)) + put = Price(bid=put_value, ask=put_value) + return PutCallParity( + strike=Decimal(str(strike)), call=call, put=put, inverse=inverse + ) + + +def test_regressand_and_regressor_direct() -> None: + parities = PutCallParities.from_parities( + [_parity(90, 0.2), _parity(110, 0.0)], spot=100, ttm=1 + ) + y = parities.regressand() + x = parities.regressor() + assert y[0] == pytest.approx(0.002) + assert y[1] == pytest.approx(0.0) + assert x[0] == pytest.approx(0.9) + assert x[1] == pytest.approx(1.1) + + +def test_regressand_inverse() -> None: + parities = PutCallParities.from_parities( + [_parity(90, 0.2, inverse=True), _parity(110, 0.0, inverse=True)], + spot=100, + ttm=1, + ) + y = parities.regressand() + assert y[0] == pytest.approx(0.2) + assert y[1] == pytest.approx(0.0) + + +def test_fit_discounts_with_fixed_values() -> None: + parities = PutCallParities.from_parities( + [_parity(90, 12.5), _parity(110, -6.5)], + 100, + 1, + ) + fitted_both = parities.fit_discounts(dq=0.95, da=0.98) + assert fitted_both is not None + assert fitted_both.quote_discount == pytest.approx(0.95) + assert fitted_both.asset_discount == pytest.approx(0.98) + + fitted_da = parities.fit_discounts(dq=0.95) + assert fitted_da is not None + assert fitted_da.asset_discount == pytest.approx(0.98) + + fitted_dq = parities.fit_discounts(da=0.98) + assert fitted_dq is not None + assert fitted_dq.quote_discount == pytest.approx(0.95) + + +def test_fit_discounts_constrained_branch() -> None: + da_true = 0.98 + dq_true = 0.95 + spot = 100 + strikes = [90, 100, 110, 120] + mids = [spot * (da_true - dq_true * (k / spot)) for k in strikes] + parities = PutCallParities.from_parities( + [_parity(k, m) for k, m in zip(strikes, mids)], spot=spot, ttm=1 + ) + fitted = parities.fit_discounts() + assert fitted is not None + assert fitted.asset_discount == pytest.approx(da_true, abs=1e-6) + assert fitted.quote_discount == pytest.approx(dq_true, abs=1e-6) + + +def test_fit_discounts_invalid_or_empty_returns_none() -> None: + empty = PutCallParities.from_parities([], spot=100, ttm=1) + assert empty.fit_discounts() is None + + parities = PutCallParities.from_parities([_parity(100, 2.0)], spot=100, ttm=1) + assert parities.fit_discounts(dq=1.0, min_rate_q=0.1, min_rate_a=0.1) is None diff --git a/quantflow_tests/test_rates_options.py b/quantflow_tests/test_rates_options.py new file mode 100644 index 00000000..11f8b64b --- /dev/null +++ b/quantflow_tests/test_rates_options.py @@ -0,0 +1,136 @@ +from __future__ import annotations + +from typing import Literal + +import numpy as np +import pytest +from numpy.typing import ArrayLike +from scipy.optimize import Bounds + +from quantflow.rates.options import OptionsDiscountingCalibration, YieldCurveCalibration +from quantflow.rates.yield_curve import YieldCurve +from quantflow.utils.types import FloatArray, FloatArrayLike, maybe_float + + +class ExponentialCurve(YieldCurve): + curve_type: Literal["exp_curve"] = "exp_curve" + rate: float = 0.05 + + def instantaneous_forward_rate(self, ttm: FloatArrayLike) -> FloatArrayLike: + arr = np.asarray(ttm, dtype=float) + result = np.full_like(arr, self.rate) + return maybe_float(result) + + def discount_factor(self, ttm: FloatArrayLike) -> FloatArrayLike: + arr = np.asarray(ttm, dtype=float) + result = np.exp(-self.rate * arr) + return maybe_float(result) + + def jacobian(self, ttm: FloatArrayLike) -> FloatArray | None: + arr = np.asarray(ttm, dtype=float) + return (-arr * np.exp(-self.rate * arr)).reshape(-1, 1) + + @classmethod + def calibrate(cls, ttm: ArrayLike, rates: ArrayLike) -> "ExponentialCurve": + return cls(rate=float(np.mean(np.asarray(rates, dtype=float)))) + + +class ExponentialCurveCalibration(YieldCurveCalibration[ExponentialCurve]): + def get_params(self) -> FloatArray: + return np.array([self.yield_curve.rate], dtype=float) + + def set_params(self, params: FloatArray) -> None: + self.yield_curve.rate = float(params[0]) + + def get_bounds(self) -> Bounds: + return Bounds([0.0], [1.0]) + + +def _ttm_strikes_cp( + asset_rate: float, quote_rate: float +) -> tuple[np.ndarray, np.ndarray, np.ndarray]: + ttm = np.array([0.5, 1.0, 2.0, 3.0], dtype=float) + strikes = np.array([0.9, 1.0, 1.1, 1.2], dtype=float) + da = np.exp(-asset_rate * ttm) + dq = np.exp(-quote_rate * ttm) + cp = da - dq * strikes + return ttm, strikes, cp + + +def test_yield_curve_calibration_base_calibrate() -> None: + ttm = np.array([0.5, 1.0, 2.0, 3.0], dtype=float) + true_rate = 0.03 + target = np.exp(-true_rate * ttm) + cal = ExponentialCurveCalibration(yield_curve=ExponentialCurve(rate=0.10)) + fitted = cal.calibrate(ttm, target) + assert fitted.rate == pytest.approx(true_rate, abs=1e-4) + + +def test_options_discounting_joint_calibration() -> None: + ttm, strikes, cp = _ttm_strikes_cp(asset_rate=0.02, quote_rate=0.04) + asset_cal = ExponentialCurveCalibration(yield_curve=ExponentialCurve(rate=0.08)) + quote_cal = ExponentialCurveCalibration(yield_curve=ExponentialCurve(rate=0.09)) + calibration = OptionsDiscountingCalibration( + asset_curve=asset_cal, + quote_curve=quote_cal, + cp=cp, + strikes=strikes, + ttm=ttm, + ) + asset_curve, quote_curve = calibration.calibrate() + assert isinstance(asset_curve, ExponentialCurve) + assert isinstance(quote_curve, ExponentialCurve) + assert asset_curve.rate == pytest.approx(0.02, abs=1e-3) + assert quote_curve.rate == pytest.approx(0.04, abs=1e-3) + + +def test_options_discounting_asset_only_calibration() -> None: + ttm, strikes, cp = _ttm_strikes_cp(asset_rate=0.02, quote_rate=0.04) + asset_cal = ExponentialCurveCalibration(yield_curve=ExponentialCurve(rate=0.12)) + fixed_quote = ExponentialCurve(rate=0.04) + calibration = OptionsDiscountingCalibration( + asset_curve=asset_cal, + quote_curve=fixed_quote, + cp=cp, + strikes=strikes, + ttm=ttm, + ) + asset_curve, quote_curve = calibration.calibrate() + assert isinstance(asset_curve, ExponentialCurve) + assert isinstance(quote_curve, ExponentialCurve) + assert asset_curve.rate == pytest.approx(0.02, abs=1e-3) + assert quote_curve.rate == pytest.approx(0.04, abs=1e-9) + + +def test_options_discounting_quote_only_calibration() -> None: + ttm, strikes, cp = _ttm_strikes_cp(asset_rate=0.02, quote_rate=0.04) + fixed_asset = ExponentialCurve(rate=0.02) + quote_cal = ExponentialCurveCalibration(yield_curve=ExponentialCurve(rate=0.10)) + calibration = OptionsDiscountingCalibration( + asset_curve=fixed_asset, + quote_curve=quote_cal, + cp=cp, + strikes=strikes, + ttm=ttm, + ) + asset_curve, quote_curve = calibration.calibrate() + assert isinstance(asset_curve, ExponentialCurve) + assert isinstance(quote_curve, ExponentialCurve) + assert asset_curve.rate == pytest.approx(0.02, abs=1e-9) + assert quote_curve.rate == pytest.approx(0.04, abs=1e-3) + + +def test_options_discounting_both_fixed_returns_inputs() -> None: + ttm, strikes, cp = _ttm_strikes_cp(asset_rate=0.02, quote_rate=0.04) + fixed_asset = ExponentialCurve(rate=0.02) + fixed_quote = ExponentialCurve(rate=0.04) + calibration = OptionsDiscountingCalibration( + asset_curve=fixed_asset, + quote_curve=fixed_quote, + cp=cp, + strikes=strikes, + ttm=ttm, + ) + asset_curve, quote_curve = calibration.calibrate() + assert asset_curve is fixed_asset + assert quote_curve is fixed_quote diff --git a/quantflow_tests/test_ta_filters.py b/quantflow_tests/test_ta_filters.py new file mode 100644 index 00000000..8b5b47e6 --- /dev/null +++ b/quantflow_tests/test_ta_filters.py @@ -0,0 +1,81 @@ +from __future__ import annotations + +import pytest + +from quantflow.ta.ewma import EWMA +from quantflow.ta.kalman import KalmanFilter +from quantflow.ta.supersmoother import SuperSmoother + + +def test_ewma_initialization_and_alpha() -> None: + ewma = EWMA(period=10) + assert ewma.current_value is None + assert 0.0 < ewma.alpha < 1.0 + + +def test_ewma_first_and_second_update() -> None: + ewma = EWMA(period=5) + first = ewma.update(10.0) + second = ewma.update(20.0) + assert first == 10.0 + assert 10.0 < second < 20.0 + assert ewma.current_value == second + + +def test_ewma_asymmetric_tau_branch() -> None: + ewma = EWMA(period=4, tau=1.0) + ewma.update(10.0) + down = ewma.update(0.0) + up = ewma.update(20.0) + assert down == 10.0 + assert up > down + + +def test_ewma_factory_methods() -> None: + ewma_half_life = EWMA.from_half_life(half_life=2.0) + ewma_alpha = EWMA.from_alpha(alpha=0.5) + assert ewma_half_life.period >= 1 + assert ewma_alpha.period >= 1 + + +def test_kalman_initialization_and_first_update() -> None: + kf = KalmanFilter(R=1.0, Q=0.1) + assert kf.value() is None + first = kf.update(5.0) + assert first == 5.0 + assert kf.value() == 5.0 + assert kf.error_covariance == 1.0 + + +def test_kalman_steady_updates_and_properties() -> None: + kf = KalmanFilter(R=1.0, Q=0.1) + kf.update(10.0) + updated = kf.update(12.0) + assert 10.0 < updated < 12.0 + assert 0.0 < kf.kalman_gain < 1.0 + assert kf.error_covariance > 0.0 + + +def test_supersmoother_initialization() -> None: + smoother = SuperSmoother(period=10) + assert smoother.raw_value() is None + assert smoother.value() is None + + +def test_supersmoother_first_two_updates() -> None: + smoother = SuperSmoother(period=10) + first = smoother.update(1.0) + second = smoother.update(3.0) + assert first == 1.0 + assert second == pytest.approx(2.0) + assert smoother.raw_value() == 3.0 + assert smoother.value() == second + + +def test_supersmoother_steady_updates() -> None: + smoother = SuperSmoother(period=8) + smoother.update(1.0) + smoother.update(3.0) + third = smoother.update(5.0) + assert isinstance(third, float) + assert smoother.value() == third diff --git a/quantflow_tests/test_utils_dates_bins.py b/quantflow_tests/test_utils_dates_bins.py new file mode 100644 index 00000000..d0b01299 --- /dev/null +++ b/quantflow_tests/test_utils_dates_bins.py @@ -0,0 +1,53 @@ +from __future__ import annotations + +from datetime import date, datetime, timezone + +import numpy as np +import pandas as pd +import pytest + +from quantflow.utils.bins import event_density, pdf +from quantflow.utils.dates import as_date, as_utc, isoformat, start_of_day, to_date_iso + + +def test_pdf_with_default_bins() -> None: + data = np.array([0.0, 1.0, 2.0, 3.0]) + result = pdf(data) + assert "pdf" in result.columns + assert len(result) > 0 + + +def test_pdf_with_delta_and_symmetric() -> None: + data = np.array([-1.0, -0.5, 0.5, 1.0]) + result = pdf(data, delta=0.5, symmetric=0.0) + assert result.index.min() < 0.0 + assert result.index.max() > 0.0 + + +def test_pdf_invalid_num_bins_and_delta() -> None: + with pytest.raises(ValueError, match="Cannot specify both"): + pdf(np.array([1.0, 2.0]), num_bins=10, delta=0.1) + with pytest.raises(ValueError, match="greater than 1"): + pdf(np.array([1.0, 2.0]), num_bins=1) + + +def test_event_density() -> None: + df = pd.DataFrame({"a": [0, 1, 1, 2], "b": [1, 1, 2, 2]}) + density = event_density(df, columns=["a", "b"], num=3) + assert np.array_equal(density["n"], np.array([0, 1, 2])) + assert np.isclose(np.sum(density["a"]), 1.0) + assert np.isclose(np.sum(density["b"]), 1.0) + + +def test_date_helpers() -> None: + dt = datetime(2024, 1, 2, 15, 30, tzinfo=timezone.utc) + assert as_utc(dt) == dt + assert as_utc(date(2024, 1, 2)) == datetime(2024, 1, 2, tzinfo=timezone.utc) + assert isoformat("2024-01-02") == "2024-01-02" + assert isoformat(date(2024, 1, 2)) == "2024-01-02" + assert start_of_day(dt) == datetime(2024, 1, 2, tzinfo=timezone.utc) + assert as_date(dt) == date(2024, 1, 2) + assert as_date(date(2024, 1, 3)) == date(2024, 1, 3) + assert to_date_iso(date(2024, 1, 2)) == "2024-01-02" + assert to_date_iso("2024-01-02") == "2024-01-02" + assert to_date_iso(None) is None diff --git a/quantflow_tests/test_vasicek_curve.py b/quantflow_tests/test_vasicek_curve.py new file mode 100644 index 00000000..6c9234d6 --- /dev/null +++ b/quantflow_tests/test_vasicek_curve.py @@ -0,0 +1,53 @@ +from __future__ import annotations + +from decimal import Decimal + +import numpy as np +import pytest + +from quantflow.rates.vasicek import VasicekCurve + + +def test_vasicek_process_mapping() -> None: + curve = VasicekCurve( + rate=Decimal("0.03"), + kappa=Decimal("1.2"), + theta=Decimal("0.04"), + sigma=Decimal("0.1"), + ) + process = curve.process() + assert process.rate == pytest.approx(0.03) + assert process.kappa == pytest.approx(1.2) + assert process.theta == pytest.approx(0.04) + assert process.bdlp.sigma == pytest.approx(0.1) + + +def test_vasicek_forward_and_discount_shapes() -> None: + curve = VasicekCurve( + rate=Decimal("0.03"), + kappa=Decimal("0.8"), + theta=Decimal("0.05"), + sigma=Decimal("0.1"), + ) + ttms = np.array([0.0, 0.5, 1.0, 2.0]) + fwd = np.asarray(curve.instantaneous_forward_rate(ttms)) + df = np.asarray(curve.discount_factor(ttms)) + assert fwd.shape == ttms.shape + assert df.shape == ttms.shape + assert df[0] == pytest.approx(1.0) + assert df[-1] < df[1] + + +def test_vasicek_calibrate_recovers_curve() -> None: + true_curve = VasicekCurve( + rate=Decimal("0.03"), + kappa=Decimal("1.5"), + theta=Decimal("0.04"), + sigma=Decimal("0.06"), + ) + ttm = np.array([0.25, 0.5, 1.0, 2.0, 3.0, 5.0], dtype=float) + rates = -np.log(np.asarray(true_curve.discount_factor(ttm))) / ttm + fitted = VasicekCurve.calibrate(ttm, rates) + fitted_df = np.asarray(fitted.discount_factor(ttm)) + true_df = np.asarray(true_curve.discount_factor(ttm)) + np.testing.assert_allclose(fitted_df, true_df, atol=3e-3)