Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 26 additions & 0 deletions quantflow_tests/test_ai.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import pytest
from mcp.server.fastmcp import FastMCP

from quantflow.ai import server as ai_server
from quantflow.ai.tools import charts, crypto, fred, stocks, vault
from quantflow.ai.tools.base import McpTool
from quantflow.data.vault import Vault
Expand Down Expand Up @@ -459,3 +460,28 @@ async def test_ascii_chart_empty(
with patch("quantflow.ai.tools.base.FMP", return_value=mock_fmp):
result = await charts_server.call_tool("ascii_chart", {"symbol": "FAKE"})
assert "No price data" in text(result)


def test_create_server_registers_all_tools() -> None:
fake_tool = MagicMock()
with (
patch("quantflow.ai.server.McpTool", return_value=fake_tool),
patch("quantflow.ai.server.vault.register") as vault_register,
patch("quantflow.ai.server.crypto.register") as crypto_register,
patch("quantflow.ai.server.stocks.register") as stocks_register,
patch("quantflow.ai.server.fred.register") as fred_register,
patch("quantflow.ai.server.charts.register") as charts_register,
):
mcp = ai_server.create_server()
vault_register.assert_called_once_with(mcp, fake_tool)
crypto_register.assert_called_once_with(mcp, fake_tool)
stocks_register.assert_called_once_with(mcp, fake_tool)
fred_register.assert_called_once_with(mcp, fake_tool)
charts_register.assert_called_once_with(mcp, fake_tool)


def test_main_runs_server() -> None:
mock_server = MagicMock()
with patch("quantflow.ai.server.create_server", return_value=mock_server):
ai_server.main()
mock_server.run.assert_called_once_with()
37 changes: 37 additions & 0 deletions quantflow_tests/test_data.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
from datetime import date
from typing import AsyncIterator
from unittest.mock import AsyncMock

import pytest

Expand Down Expand Up @@ -59,3 +61,38 @@ async def __test_fiscal_data() -> None:
assert df is not None
assert df.shape[0] > 0
assert df.shape[1] == 2


async def test_fiscal_securities_builds_previous_month_filter() -> None:
fd = FiscalData()
fd.get_all = AsyncMock(return_value=[{"a": 1}]) # type: ignore[method-assign]
df = await fd.securities(record_date=date(2024, 3, 15))
fd.get_all.assert_awaited_once_with(
"/v1/debt/mspd/mspd_table_3_market",
{"filter": "record_date:eq:2024-02-29"},
)
assert len(df) == 1


async def test_fiscal_get_all_multi_page() -> None:
fd = FiscalData()
fd.get = AsyncMock( # type: ignore[method-assign]
side_effect=[
{
"data": [{"id": 1}],
"links": {"next": "/v1/debt/mspd/mspd_table_3_market?page=2"},
},
{"data": [{"id": 2}], "links": {"next": None}},
]
)
data = await fd.get_all("/v1/debt/mspd/mspd_table_3_market", {"a": "b"})
assert data == [{"id": 1}, {"id": 2}]
assert fd.get.await_count == 2


async def test_fiscal_get_all_single_page_without_links() -> None:
fd = FiscalData()
fd.get = AsyncMock(return_value={"data": [{"id": 7}]}) # type: ignore[method-assign]
data = await fd.get_all("/v1/debt/mspd/mspd_table_3_market", {"a": "b"})
assert data == [{"id": 7}]
fd.get.assert_awaited_once()
69 changes: 67 additions & 2 deletions quantflow_tests/test_divfm.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,12 @@
try:
import torch

from quantflow.options.divfm.network import DIVFMNetwork
from quantflow.options.divfm.trainer import DayData, DIVFMTrainer
from quantflow.options.divfm.network import (
DIVFMNetwork,
)
from quantflow.options.divfm.network import _extract_subnet as extract_subnet_torch
from quantflow.options.divfm.network import _make_subnet as make_subnet_torch
from quantflow.options.divfm.trainer import DayData, DIVFMTrainer, _day_loss

has_torch = True
except ImportError:
Expand Down Expand Up @@ -191,6 +195,12 @@ def test_network_default_construction() -> None:
assert net.extra_features == 0


@pytest.mark.skipif(not has_torch, reason="torch not installed")
def test_network_minimum_factors_validation() -> None:
with pytest.raises(ValueError, match="at least 3"):
DIVFMNetwork(num_factors=2)


@pytest.mark.skipif(not has_torch, reason="torch not installed")
def test_network_forward_shape() -> None:
net = DIVFMNetwork(num_factors=NUM_FACTORS, hidden_size=HIDDEN_SIZE)
Expand All @@ -202,6 +212,28 @@ def test_network_forward_shape() -> None:
assert (out[:, 0] == 1.0).all() # f_1 = 1


@pytest.mark.skipif(not has_torch, reason="torch not installed")
def test_make_subnet_layout() -> None:
subnet = make_subnet_torch(2, 4, 2, 3)
modules = list(subnet.children())
assert isinstance(modules[0], torch.nn.Linear)
assert isinstance(modules[1], torch.nn.Sigmoid)
assert isinstance(modules[2], torch.nn.BatchNorm1d)
assert isinstance(modules[-1], torch.nn.BatchNorm1d)
assert modules[-1].affine is False


@pytest.mark.skipif(not has_torch, reason="torch not installed")
def test_extract_subnet_output_structure() -> None:
subnet = make_subnet_torch(2, 4, 1, 1)
subnet.eval()
extracted = extract_subnet_torch(subnet)
assert isinstance(extracted, SubnetWeights)
assert len(extracted.layers) == 2
assert extracted.layers[0].apply_activation is True
assert extracted.layers[-1].apply_activation is False


@pytest.mark.skipif(not has_torch, reason="torch not installed")
def test_to_weights_forward_matches_network() -> None:
net = DIVFMNetwork(num_factors=NUM_FACTORS, hidden_size=HIDDEN_SIZE)
Expand All @@ -220,6 +252,14 @@ def test_to_weights_forward_matches_network() -> None:
np.testing.assert_allclose(torch_out, numpy_out, atol=1e-5)


@pytest.mark.skipif(not has_torch, reason="torch not installed")
def test_to_weights_without_joint_subnet() -> None:
net = DIVFMNetwork(num_factors=3, hidden_size=HIDDEN_SIZE)
weights = net.to_weights()
assert weights.subnet_joint is None
assert weights.num_factors == 3


# ---------------------------------------------------------------------------
# DIVFMTrainer tests (requires torch)
# ---------------------------------------------------------------------------
Expand Down Expand Up @@ -252,6 +292,14 @@ def test_trainer_construction() -> None:
assert trainer.network is net


@pytest.mark.skipif(not has_torch, reason="torch not installed")
def test_day_loss_non_negative() -> None:
net = DIVFMNetwork(num_factors=NUM_FACTORS, hidden_size=HIDDEN_SIZE)
day = _make_days(num_days=1)[0]
loss = _day_loss(net, day, ridge=1e-6)
assert float(loss.detach().item()) >= 0.0


@pytest.mark.skipif(not has_torch, reason="torch not installed")
def test_trainer_step_returns_loss() -> None:
net = DIVFMNetwork(num_factors=NUM_FACTORS, hidden_size=HIDDEN_SIZE)
Expand All @@ -272,6 +320,13 @@ def test_trainer_evaluate() -> None:
assert val_loss >= 0.0


@pytest.mark.skipif(not has_torch, reason="torch not installed")
def test_trainer_evaluate_empty_days() -> None:
net = DIVFMNetwork(num_factors=NUM_FACTORS, hidden_size=HIDDEN_SIZE)
trainer = DIVFMTrainer(net)
assert trainer.evaluate([]) == 0.0


@pytest.mark.skipif(not has_torch, reason="torch not installed")
def test_trainer_fit_loss_decreases() -> None:
"""Loss should decrease over training steps on a structured IV surface.
Expand All @@ -296,6 +351,16 @@ def test_trainer_fit_loss_decreases() -> None:
assert np.mean(losses[-10:]) < np.mean(losses[:10])


@pytest.mark.skipif(not has_torch, reason="torch not installed")
def test_trainer_fit_with_validation_days() -> None:
torch.manual_seed(1)
net = DIVFMNetwork(num_factors=NUM_FACTORS, hidden_size=HIDDEN_SIZE)
trainer = DIVFMTrainer(net, lr=1e-2, batch_days=4)
days = _make_days(num_days=8)
losses = trainer.fit(days, num_steps=5, val_days=days[:2], log_every=2)
assert len(losses) == 5


@pytest.mark.skipif(not has_torch, reason="torch not installed")
def test_trainer_to_weights_produces_pricer() -> None:
net = DIVFMNetwork(num_factors=NUM_FACTORS, hidden_size=HIDDEN_SIZE)
Expand Down
75 changes: 75 additions & 0 deletions quantflow_tests/test_fmp_unit.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
from __future__ import annotations

from datetime import date
from unittest.mock import AsyncMock

import pandas as pd

from quantflow.data.fmp import FMP, nice_sector_performance, summary_sector_performance


def test_freq_crate_and_join_and_params() -> None:
assert FMP.freq.crate(None) == FMP.freq.daily
assert FMP.freq.crate("1hour") == FMP.freq.one_hour
assert FMP.freq.crate("bad") == FMP.freq.daily

fmp = FMP(key="k")
assert fmp.join("AAPL", "MSFT") == "AAPL,MSFT"
assert fmp.params({"a": 1}) == {"params": {"a": 1, "apikey": "k"}}


async def test_prices_daily_and_intraday_paths() -> None:
fmp = FMP(key="k")
fmp.get_path = AsyncMock(return_value=[{"date": "2024-01-01", "close": 100}]) # type: ignore[method-assign]
df = await fmp.prices("AAPL", frequency=None, convert_to_date=True)
assert isinstance(df, pd.DataFrame)
assert str(df["date"].dtype).startswith("datetime64")
fmp.get_path.assert_awaited_with(
"historical-price-eod/full",
params={"symbol": "AAPL"},
)

fmp.get_path = AsyncMock(return_value={"historical": [{"date": "2024-01-01"}]}) # type: ignore[method-assign]
await fmp.prices("AAPL", frequency="1hour")
fmp.get_path.assert_awaited_with(
"historical-chart/1hour",
params={"frequency": "1hour", "symbol": "AAPL"},
)


async def test_sector_performance_summary_and_timeseries() -> None:
fmp = FMP(key="k")
fmp.get_path = AsyncMock( # type: ignore[method-assign]
side_effect=[
[{"sector": "Tech", "changesPercentage": "1.23%"}],
[
{"date": "2024-01-01", "technologyChangesPercentage": 1.0},
{"date": "2024-01-02", "technologyChangesPercentage": 2.0},
],
]
)
snapshot = await fmp.sector_performance()
assert isinstance(snapshot, dict)
assert str(snapshot["Tech"]) == "1.23"

summary = await fmp.sector_performance(from_date=date(2024, 1, 1), summary=True)
assert isinstance(summary, dict)
assert str(summary["Technology"]) == "3.02"


def test_sector_helpers() -> None:
nice = dict(
nice_sector_performance(
{"date": "2024-01-01", "consumerStaplesChangesPercentage": 1.5}
)
)
assert nice["date"] == date(2024, 1, 1)
assert nice["Consumer Staples"] == 1.5

summary = summary_sector_performance(
[
{"date": date(2024, 1, 1), "Tech": 1.0},
{"date": date(2024, 1, 2), "Tech": 2.0},
]
)
assert str(summary["Tech"]) == "3.02"
95 changes: 95 additions & 0 deletions quantflow_tests/test_heston_calibration.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
from __future__ import annotations

import numpy as np

from quantflow.options.calibration import (
DoubleHestonCalibration,
DoubleHestonJCalibration,
HestonCalibration,
HestonJCalibration,
)
from quantflow.options.pricer import OptionPricer
from quantflow.sp.heston import DoubleHeston, DoubleHestonJ, Heston, HestonJ
from quantflow.utils.distributions import DoubleExponential


def test_heston_calibration_get_set_and_penalize(vol_surface) -> None:
cal: HestonCalibration = HestonCalibration(
pricer=OptionPricer(
model=Heston.create(vol=0.4, kappa=2.0, sigma=0.6, rho=-0.4)
),
vol_surface=vol_surface,
)
params = cal.get_params()
assert len(params) == 5
updated = np.array([0.1, 0.1, 1.0, 1.0, -0.2], dtype=float)
cal.set_params(updated)
assert np.allclose(cal.get_params(), updated)
assert cal.penalize() >= 0.0
bounds = cal.get_bounds()
assert len(bounds.lb) == 5
assert len(bounds.ub) == 5


def test_hestonj_calibration_get_set_and_bounds(vol_surface) -> None:
model = HestonJ.create(
DoubleExponential,
vol=0.3,
kappa=1.5,
sigma=0.4,
rho=-0.3,
jump_fraction=0.2,
jump_asymmetry=0.1,
)
cal: HestonJCalibration = HestonJCalibration(
pricer=OptionPricer(model=model), vol_surface=vol_surface
)
params = cal.get_params()
cal.set_params(params)
assert np.allclose(cal.get_params(), params)
bounds = cal.get_bounds()
assert len(bounds.lb) == len(bounds.ub) == len(params)


def test_double_heston_calibration_param_logic(vol_surface) -> None:
model = DoubleHeston(
heston1=Heston.create(vol=0.3, kappa=2.0, sigma=0.4, rho=-0.2),
heston2=Heston.create(vol=0.25, kappa=1.0, sigma=0.3, rho=-0.4),
)
cal: DoubleHestonCalibration = DoubleHestonCalibration(
pricer=OptionPricer(model=model), vol_surface=vol_surface
)
params = cal.get_params()
assert len(params) == 10
cal.set_params(params)
assert (
cal.model.heston1.variance_process.kappa
>= cal.model.heston2.variance_process.kappa
)
assert cal.penalize() >= 0.0
assert len(cal.feller_residuals()) == 2
assert cal.maturity_split() > 0.0


def test_double_hestonj_calibration_get_set_and_bounds(vol_surface) -> None:
model = DoubleHestonJ(
heston1=HestonJ.create(
DoubleExponential,
vol=0.3,
kappa=2.5,
sigma=0.4,
rho=-0.2,
jump_fraction=0.2,
jump_asymmetry=0.1,
),
heston2=Heston.create(vol=0.25, kappa=1.2, sigma=0.3, rho=-0.4),
)
cal: DoubleHestonJCalibration = DoubleHestonJCalibration(
pricer=OptionPricer(model=model),
vol_surface=vol_surface,
)
params = cal.get_params()
cal.set_params(params)
assert np.allclose(cal.get_params(), params)
bounds = cal.get_bounds()
assert len(bounds.lb) == len(bounds.ub) == len(params)
Loading
Loading