diff --git a/docusaurus/docs/Getting Started/deployment.md b/docusaurus/docs/Getting Started/deployment.md index d7674061..e7943830 100644 --- a/docusaurus/docs/Getting Started/deployment.md +++ b/docusaurus/docs/Getting Started/deployment.md @@ -4,517 +4,195 @@ sidebar_position: 10 # Deployment -Learn how to deploy your trading algorithms to production environments. +Learn how to deploy your trading algorithms to AWS Lambda or Azure Functions using the built-in CLI. ## Overview -Deploying trading algorithms requires careful consideration of infrastructure, security, monitoring, and risk management. This guide covers best practices for moving from development to production. +The Investing Algorithm Framework includes a CLI tool (`iaf`) that handles scaffolding and deployment to cloud platforms. Two deployment targets are supported out of the box: -## Deployment Options +- **AWS Lambda** — Serverless deployment using boto3, with an S3 bucket for state persistence. +- **Azure Functions** — Serverless deployment using the Azure SDK, with blob storage for state persistence. -### 1. Local Deployment +## Scaffolding a Project -Run your algorithm on a local machine or server: +Use `iaf init` to generate a project skeleton for your chosen deployment target: -```python -from investing_algorithm_framework import create_app +```bash +# Default project (local execution) +iaf init + +# Project with a web interface +iaf init --type default_web -# Create production app -app = create_app(config_file="production.yaml") +# AWS Lambda project +iaf init --type aws_lambda -# Start the algorithm -if __name__ == "__main__": - app.start_trading() +# Azure Function project +iaf init --type azure_function ``` -**Pros:** -- Full control over environment -- Lower latency to exchanges -- Cost-effective for smaller operations +### Options -**Cons:** -- Single point of failure -- Requires manual monitoring -- Infrastructure management overhead +| Option | Default | Description | +|--------|---------|-------------| +| `--type` | `default` | Project type: `default`, `default_web`, `aws_lambda`, or `azure_function`. | +| `--path` | Current directory | Path to the directory where the project will be created. | +| `--replace` | `False` | If set, existing files will be overwritten. | -### 2. Cloud Deployment +Each template generates the appropriate entry point, requirements file, configuration files, and deployment scaffolding for the chosen platform. -Deploy to cloud platforms like AWS, Google Cloud, or Azure: +## Deploying to AWS Lambda -```dockerfile -# Dockerfile -FROM python:3.9-slim +The `iaf deploy-aws-lambda` command packages your project, creates (or updates) an AWS Lambda function, and sets up an S3 bucket for state persistence. -WORKDIR /app +### Prerequisites -# Copy requirements and install dependencies -COPY requirements.txt . -RUN pip install -r requirements.txt +- AWS credentials configured (via `aws configure` or environment variables) +- Python 3.10+ and `boto3` installed +- Docker installed (for building the deployment package) -# Copy application code -COPY . . +### Command -# Run the trading algorithm -CMD ["python", "main.py"] +```bash +iaf deploy-aws-lambda \ + --lambda_function_name my-trading-bot \ + --region us-east-1 ``` -**Pros:** -- High availability and scalability -- Managed infrastructure -- Built-in monitoring and logging - -**Cons:** -- Higher costs -- Potential latency issues -- Vendor lock-in - -### 3. VPS Deployment - -Use a Virtual Private Server for dedicated resources: - -```bash -# Example deployment script -#!/bin/bash +### Options -# Update system -sudo apt update && sudo apt upgrade -y +| Option | Required | Default | Description | +|--------|----------|---------|-------------| +| `--lambda_function_name` | Yes | — | Name of the Lambda function to create or update. | +| `--region` | Yes | — | AWS region (e.g., `us-east-1`, `eu-west-1`). | +| `--project_dir` | No | Current directory | Path to the project directory containing your code. | +| `--memory_size` | No | `3000` | Memory allocation in MB for the Lambda function. | +| `-e KEY VALUE` | No | — | Environment variables. Can be repeated: `-e API_KEY xxx -e SECRET yyy`. | -# Install Python and dependencies -sudo apt install python3 python3-pip git -y +### What It Does -# Clone your trading bot -git clone https://github.com/yourusername/your-trading-bot.git -cd your-trading-bot +1. Packages your project code into a deployment zip. +2. Creates an IAM role for Lambda execution (if it doesn't exist). +3. Creates an S3 bucket for state storage (named after the function). +4. Deploys the Lambda function with the specified memory and environment variables. +5. Sets the `AWS_S3_STATE_BUCKET_NAME` environment variable on the function automatically. -# Install dependencies -pip3 install -r requirements.txt +### Example -# Create systemd service -sudo systemctl enable trading-bot -sudo systemctl start trading-bot +```bash +# Deploy with environment variables for exchange credentials +iaf deploy-aws-lambda \ + --lambda_function_name btc-trading-bot \ + --region eu-west-1 \ + --memory_size 3000 \ + -e BITVAVO_API_KEY your_key \ + -e BITVAVO_API_SECRET your_secret ``` -## Production Configuration +## Deploying to Azure Functions -### Environment Configuration +The `iaf deploy-azure-function` command deploys your project as an Azure Function App, creating the necessary resource group, storage account, and function app. -Create separate configuration files for different environments: +### Prerequisites -```yaml -# production.yaml -environment: production -debug: false +- Azure CLI installed and authenticated (`az login`), or use `--skip_login` in CI/CD +- Azure Functions Core Tools installed (`npm install -g azure-functions-core-tools@4`) +- Python 3.10+ -database: - uri: "postgresql://user:pass@host:5432/trading_prod" - -exchanges: - binance: - api_key: "${BINANCE_API_KEY}" - api_secret: "${BINANCE_API_SECRET}" - sandbox: false +### Command -portfolio: - initial_balance: 10000 - risk_per_trade: 0.02 - max_drawdown: 0.15 - -logging: - level: INFO - file: "/var/log/trading-bot/app.log" -``` - -```yaml -# staging.yaml -environment: staging -debug: true - -database: - uri: "sqlite:///staging.db" - -exchanges: - binance: - api_key: "${BINANCE_TESTNET_KEY}" - api_secret: "${BINANCE_TESTNET_SECRET}" - sandbox: true - -portfolio: - initial_balance: 1000 - risk_per_trade: 0.05 - max_drawdown: 0.20 - -logging: - level: DEBUG - file: "staging.log" +```bash +iaf deploy-azure-function \ + --resource_group my-resource-group \ + --deployment_name my-trading-bot \ + --region westeurope ``` -### Environment Variables +### Options -Use environment variables for sensitive information: +| Option | Required | Default | Description | +|--------|----------|---------|-------------| +| `--resource_group` | Yes | — | Azure resource group name. | +| `--deployment_name` | Yes | — | Name for the Function App. | +| `--region` | Yes | — | Azure region (e.g., `westeurope`, `eastus`). | +| `--subscription_id` | No | Default subscription | Azure subscription ID. | +| `--storage_account_name` | No | Auto-generated | Name for the Azure Storage account. | +| `--container_name` | No | `iafcontainer` | Blob container name for state storage. | +| `--create_resource_group_if_not_exists` | No | `False` | Create the resource group if it doesn't exist. | +| `--skip_login` | No | `False` | Skip `az login` (useful for CI/CD pipelines). | -```python -import os -from investing_algorithm_framework import create_app +### What It Does -# Load configuration from environment -config = { - "database_uri": os.getenv("DATABASE_URI"), - "api_key": os.getenv("EXCHANGE_API_KEY"), - "api_secret": os.getenv("EXCHANGE_API_SECRET"), - "webhook_url": os.getenv("WEBHOOK_URL"), -} +1. Verifies Azure Functions Core Tools are installed. +2. Creates the resource group (if `--create_resource_group_if_not_exists` is set). +3. Creates or reuses a storage account and blob container for state. +4. Deploys the Function App using Azure Functions Core Tools. +5. Reads `.env` file from your project directory and sets those values as Function App configuration. -app = create_app(config=config) -``` +### Example ```bash -# .env file (never commit to git) -DATABASE_URI=postgresql://user:pass@localhost:5432/trading -EXCHANGE_API_KEY=your_api_key_here -EXCHANGE_API_SECRET=your_api_secret_here -WEBHOOK_URL=https://hooks.slack.com/your_webhook +# Deploy with a new resource group +iaf deploy-azure-function \ + --resource_group trading-bots-rg \ + --deployment_name btc-trader \ + --region westeurope \ + --create_resource_group_if_not_exists ``` -## Security Best Practices - -### API Key Management - -```python -from cryptography.fernet import Fernet - -class SecureConfig: - def __init__(self): - self.encryption_key = os.getenv("ENCRYPTION_KEY") - self.cipher = Fernet(self.encryption_key) - - def decrypt_api_key(self, encrypted_key): - return self.cipher.decrypt(encrypted_key.encode()).decode() - - def get_exchange_credentials(self): - return { - "api_key": self.decrypt_api_key(os.getenv("ENCRYPTED_API_KEY")), - "api_secret": self.decrypt_api_key(os.getenv("ENCRYPTED_API_SECRET")) - } -``` +## Project Templates -### Network Security - -```python -# Use HTTPS and verify SSL certificates -import requests -import ssl - -# Verify SSL certificates -ssl_context = ssl.create_default_context() -ssl_context.check_hostname = True -ssl_context.verify_mode = ssl.CERT_REQUIRED - -# Configure exchange with security settings -exchange_config = { - "enableRateLimit": True, - "timeout": 30000, - "ssl_verify": True, - "requests_session": requests.Session() -} -``` +When you run `iaf init`, the framework generates different files depending on the `--type`: -## Monitoring and Alerting - -### Health Checks - -```python -from flask import Flask, jsonify -import threading - -class HealthMonitor: - def __init__(self, app): - self.app = app - self.is_healthy = True - self.last_heartbeat = datetime.now() - - def start_health_endpoint(self): - """Start health check endpoint""" - health_app = Flask(__name__) - - @health_app.route("/health") - def health_check(): - return jsonify({ - "status": "healthy" if self.is_healthy else "unhealthy", - "last_heartbeat": self.last_heartbeat.isoformat(), - "uptime": (datetime.now() - self.app.start_time).total_seconds() - }) - - health_app.run(host="0.0.0.0", port=8080) - - def heartbeat(self): - """Update heartbeat timestamp""" - self.last_heartbeat = datetime.now() -``` +### Default (`default`) -### Logging Configuration - -```python -import logging -import logging.handlers - -def setup_production_logging(): - """Configure logging for production""" - - # Create logger - logger = logging.getLogger("trading_bot") - logger.setLevel(logging.INFO) - - # File handler with rotation - file_handler = logging.handlers.RotatingFileHandler( - "/var/log/trading-bot/app.log", - maxBytes=10*1024*1024, # 10MB - backupCount=5 - ) - - # Console handler - console_handler = logging.StreamHandler() - - # Formatter - formatter = logging.Formatter( - "%(asctime)s - %(name)s - %(levelname)s - %(message)s" - ) - - file_handler.setFormatter(formatter) - console_handler.setFormatter(formatter) - - logger.addHandler(file_handler) - logger.addHandler(console_handler) - - return logger -``` +- `app.py` — Main entry point with `create_app()` and `app.start()` +- `strategy.py` — Example `TradingStrategy` subclass +- `data_providers.py` — Example data provider setup +- `requirements.txt` — Python dependencies +- `.env.example` — Template for environment variables +- `.gitignore` — Standard Python gitignore -### Alerting System - -```python -import requests -import smtplib -from email.mime.text import MIMEText - -class AlertManager: - def __init__(self, slack_webhook=None, email_config=None): - self.slack_webhook = slack_webhook - self.email_config = email_config - - def send_alert(self, message, severity="INFO"): - """Send alert via multiple channels""" - - if severity == "CRITICAL": - self.send_slack_alert(f"🚨 CRITICAL: {message}") - self.send_email_alert(f"CRITICAL ALERT: {message}") - elif severity == "WARNING": - self.send_slack_alert(f"⚠️ WARNING: {message}") - else: - self.send_slack_alert(f"ℹ️ INFO: {message}") - - def send_slack_alert(self, message): - if self.slack_webhook: - payload = {"text": message} - requests.post(self.slack_webhook, json=payload) - - def send_email_alert(self, message): - if self.email_config: - msg = MIMEText(message) - msg['Subject'] = "Trading Bot Alert" - msg['From'] = self.email_config['from'] - msg['To'] = self.email_config['to'] - - # Send email logic here -``` +### AWS Lambda (`aws_lambda`) -## Performance Optimization +Everything from `default`, plus: +- `app.py` — Lambda handler entry point +- `Dockerfile` — Container image for Lambda deployment +- `.dockerignore` — Files to exclude from the Docker image +- `requirements.txt` — Includes `boto3` and framework dependencies +- `README.md` — Lambda-specific deployment instructions -### Database Optimization +### Azure Function (`azure_function`) -```python -from sqlalchemy import create_engine -from sqlalchemy.pool import QueuePool +Everything from `default`, plus: +- `function_app.py` — Azure Function entry point +- `host.json` — Azure Functions host configuration +- `local.settings.json` — Local development settings +- `requirements.txt` — Includes Azure SDK dependencies +- `.env.example` — Azure-specific environment variables -# Optimized database connection -engine = create_engine( - database_uri, - poolclass=QueuePool, - pool_size=20, - max_overflow=30, - pool_pre_ping=True, - pool_recycle=3600 -) -``` +## Environment Variables -### Caching Strategy - -```python -import redis -from functools import wraps - -class CacheManager: - def __init__(self, redis_url): - self.redis_client = redis.from_url(redis_url) - - def cache_market_data(self, symbol, timeframe, ttl=60): - """Cache market data with TTL""" - def decorator(func): - @wraps(func) - def wrapper(*args, **kwargs): - cache_key = f"market_data:{symbol}:{timeframe}" - - # Try to get from cache - cached_data = self.redis_client.get(cache_key) - if cached_data: - return json.loads(cached_data) - - # Get fresh data and cache it - data = func(*args, **kwargs) - self.redis_client.setex( - cache_key, - ttl, - json.dumps(data, default=str) - ) - return data - return wrapper - return decorator -``` +Both deployment targets support environment variables for sensitive configuration. Store exchange API keys and other secrets as environment variables rather than in code. -## Disaster Recovery - -### Backup Strategy - -```python -import shutil -import os -from datetime import datetime - -class BackupManager: - def __init__(self, backup_dir="/backups"): - self.backup_dir = backup_dir - os.makedirs(backup_dir, exist_ok=True) - - def backup_database(self, db_path): - """Backup database""" - timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") - backup_path = f"{self.backup_dir}/db_backup_{timestamp}.db" - shutil.copy2(db_path, backup_path) - return backup_path - - def backup_logs(self, log_dir): - """Backup logs""" - timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") - backup_path = f"{self.backup_dir}/logs_backup_{timestamp}" - shutil.copytree(log_dir, backup_path) - return backup_path -``` +For **AWS Lambda**, use the `-e` flag during deployment: -### Recovery Procedures - -```python -class RecoveryManager: - def __init__(self, app): - self.app = app - - def graceful_shutdown(self): - """Gracefully shutdown the application""" - - # Cancel all open orders - self.app.cancel_all_orders() - - # Save state - self.app.save_state() - - # Close database connections - self.app.close_database() - - print("Application shutdown complete") - - def emergency_stop(self): - """Emergency stop with position closure""" - - # Close all positions immediately - positions = self.app.get_positions() - for position in positions: - self.app.create_sell_order( - target_symbol=position.symbol.split('/')[0], - percentage=1.0, - order_type="MARKET" - ) - - # Cancel all orders - self.app.cancel_all_orders() - - print("Emergency stop executed") +```bash +iaf deploy-aws-lambda \ + --lambda_function_name my-bot \ + --region us-east-1 \ + -e BITVAVO_API_KEY your_key \ + -e BITVAVO_API_SECRET your_secret ``` -## Deployment Checklist - -### Pre-Deployment - -- [ ] **Strategy Testing**: Thorough backtesting and forward testing -- [ ] **Security Review**: API keys, encryption, network security -- [ ] **Configuration**: Production config files and environment variables -- [ ] **Monitoring**: Logging, health checks, and alerting setup -- [ ] **Backup**: Database and log backup procedures -- [ ] **Documentation**: Deployment and recovery procedures - -### Post-Deployment - -- [ ] **Health Verification**: Confirm all systems are operational -- [ ] **Monitor Initial Trades**: Watch first few trades closely -- [ ] **Performance Baseline**: Establish performance metrics -- [ ] **Alert Testing**: Verify alerting systems work -- [ ] **Backup Testing**: Test backup and recovery procedures - -## Scaling Considerations - -### Horizontal Scaling - -```python -# Multi-instance deployment with Redis coordination -class CoordinatedStrategy(TradingStrategy): - def __init__(self, instance_id, redis_client): - super().__init__() - self.instance_id = instance_id - self.redis_client = redis_client - - def apply_strategy(self, algorithm, market_data): - # Use distributed locking - lock_key = f"strategy_lock:{symbol}" - - with self.redis_client.lock(lock_key, timeout=10): - # Strategy logic here - pass -``` +For **Azure Functions**, add variables to your `.env` file in the project root. The deploy command reads this file and sets them as Function App configuration: -### Vertical Scaling - -```python -# Resource optimization -import psutil -import gc - -class ResourceManager: - def monitor_resources(self): - """Monitor system resources""" - cpu_percent = psutil.cpu_percent() - memory_percent = psutil.virtual_memory().percent - - if cpu_percent > 80: - print(f"High CPU usage: {cpu_percent}%") - - if memory_percent > 80: - print(f"High memory usage: {memory_percent}%") - gc.collect() # Force garbage collection +```bash +# .env +BITVAVO_API_KEY=your_key +BITVAVO_API_SECRET=your_secret ``` ## Next Steps -With your algorithm deployed, focus on: - -1. **Continuous Monitoring**: Watch performance and system health -2. **Regular Updates**: Update strategies based on market conditions -3. **Risk Management**: Monitor and adjust risk parameters -4. **Performance Analysis**: Regular strategy performance reviews - -Your trading algorithm is now ready for production! Remember to start with small position sizes and gradually scale up as you gain confidence in your deployment. +With your bot deployed, refer to the [Trading Strategies](strategies) and [Backtesting](backtesting) documentation to refine your algorithms before going live. diff --git a/docusaurus/docs/Getting Started/tasks.md b/docusaurus/docs/Getting Started/tasks.md index 102aa4b8..48ffb356 100644 --- a/docusaurus/docs/Getting Started/tasks.md +++ b/docusaurus/docs/Getting Started/tasks.md @@ -4,604 +4,229 @@ sidebar_position: 8 # Tasks -Learn how to create and schedule automated tasks to enhance your trading system. +Learn how to create and schedule automated tasks that run alongside your trading strategies. ## Overview -Tasks are automated functions that run independently of your trading strategies. They can be used for maintenance, data collection, reporting, monitoring, and other background operations that support your trading system. +Tasks are automated functions that run on a fixed schedule, independently of your trading strategies. They are useful for maintenance, monitoring, reporting, and other periodic background work. Tasks receive a `Context` object, giving them access to portfolio data, trades, orders, and positions. -## Creating Tasks +## Task Attributes -### Basic Task Structure +| Attribute | Type | Description | +|-----------|------|-------------| +| `time_unit` | `TimeUnit` | The time unit for the schedule: `SECOND`, `MINUTE`, `HOUR`, or `DAY`. | +| `interval` | `int` | How many time units between each run (e.g., `10` with `MINUTE` = every 10 minutes). | +| `worker_id` | `str` | Optional identifier. Defaults to the class name (class-based) or function name (decorator-based). | -```python -from investing_algorithm_framework import Task +## Creating Tasks -class DataCleanupTask(Task): - - def __init__(self): - super().__init__( - name="data_cleanup", - interval="daily", # Run daily - time="02:00" # Run at 2 AM - ) - - def run(self, algorithm): - """Task execution logic""" - print("Running data cleanup task...") - - # Cleanup old data - self.cleanup_old_market_data() - - # Compact database - self.compact_database() - - print("Data cleanup completed") - - def cleanup_old_market_data(self): - # Implementation for data cleanup - pass - - def compact_database(self): - # Implementation for database optimization - pass -``` +### Class-Based Task -### Registering Tasks +Subclass `Task` and implement the `run(self, context)` method. You can set the schedule using **class-level attributes** or by passing parameters to `__init__`: -```python -from investing_algorithm_framework import create_app +**Class-level attributes (recommended for simple tasks):** -app = create_app() +```python +from investing_algorithm_framework import Task, TimeUnit -# Register the task -app.add_task(DataCleanupTask()) +class LogOpenTrades(Task): + time_unit = TimeUnit.MINUTE + interval = 15 -# Start the app (tasks will run automatically) -app.start() + def run(self, context): + for trade in context.get_open_trades(): + print(f"[{trade.target_symbol}] net_gain={trade.net_gain}") ``` -## Task Scheduling +**Constructor parameters:** -### Schedule Types - -**Fixed Intervals:** ```python -# Run every 5 minutes -class MarketDataTask(Task): - def __init__(self): - super().__init__( - name="market_data_collection", - interval="5m" - ) +from investing_algorithm_framework import Task, TimeUnit -# Run every hour -class PortfolioReportTask(Task): - def __init__(self): - super().__init__( - name="portfolio_report", - interval="1h" - ) +class PortfolioLoggerTask(Task): -# Run daily -class BackupTask(Task): def __init__(self): super().__init__( - name="daily_backup", - interval="daily", - time="23:30" + time_unit=TimeUnit.HOUR, + interval=1 # Run every hour ) + + def run(self, context): + """Receives a Context object with access to trades, orders, positions.""" + open_trades = context.get_open_trades() + print(f"Currently {len(open_trades)} open trades") ``` -**Cron-style Scheduling:** +Both approaches are equivalent. Class-level attributes are simpler when the schedule is fixed; constructor parameters are useful when you need dynamic configuration. + +### Decorator-Based Task + +Use `@app.task()` to turn any function into a task: + ```python -class WeeklyReportTask(Task): - def __init__(self): - super().__init__( - name="weekly_report", - cron="0 9 * * MON" # Every Monday at 9 AM - ) +from investing_algorithm_framework import create_app, TimeUnit -class MonthlyRebalanceTask(Task): - def __init__(self): - super().__init__( - name="monthly_rebalance", - cron="0 0 1 * *" # First day of each month - ) +app = create_app() + +@app.task(time_unit=TimeUnit.MINUTE, interval=10) +def check_positions(context): + """Runs every 10 minutes.""" + positions = context.get_positions() + + for position in positions: + print(f"{position.symbol}: {position.get_amount()}") ``` -## Common Task Examples +## Registering Tasks + +### With `add_task` -### Market Data Collection +Register a class-based task (instance or class) using `app.add_task()`: ```python -class MarketDataCollector(Task): - - def __init__(self, symbols, data_provider): - super().__init__( - name="market_data_collector", - interval="1m" # Collect every minute - ) - self.symbols = symbols - self.data_provider = data_provider - - def run(self, algorithm): - """Collect market data for specified symbols""" - - for symbol in self.symbols: - try: - # Fetch latest data - data = self.data_provider.get_latest_data(symbol) - - # Store in database - algorithm.store_market_data(symbol, data) - - print(f"Collected data for {symbol}") - - except Exception as e: - print(f"Failed to collect data for {symbol}: {e}") -``` +from investing_algorithm_framework import create_app -### Portfolio Monitoring +app = create_app() -```python -class PortfolioMonitor(Task): - - def __init__(self, alert_manager): - super().__init__( - name="portfolio_monitor", - interval="5m" - ) - self.alert_manager = alert_manager - - def run(self, algorithm): - """Monitor portfolio health and send alerts""" - - portfolio = algorithm.get_portfolio() - positions = algorithm.get_positions() - - # Check total portfolio value - total_value = portfolio.get_total_value() - initial_value = portfolio.get_initial_value() - - pnl_percentage = (total_value - initial_value) / initial_value * 100 - - # Alert on significant changes - if pnl_percentage < -10: - self.alert_manager.send_alert( - f"Portfolio down {abs(pnl_percentage):.2f}%", - severity="WARNING" - ) - elif pnl_percentage > 20: - self.alert_manager.send_alert( - f"Portfolio up {pnl_percentage:.2f}%", - severity="INFO" - ) - - # Check individual positions - self.check_position_alerts(positions) - - def check_position_alerts(self, positions): - """Check for position-specific alerts""" - - for position in positions: - # Alert on large positions - if position.current_value > 5000: - print(f"Large position alert: {position.symbol} = ${position.current_value:.2f}") +# Pass an instance +app.add_task(PortfolioLoggerTask()) + +# Or pass the class — the framework will instantiate it +app.add_task(PortfolioLoggerTask) ``` -### Performance Reporting +### With `add_tasks` + +Register multiple tasks at once: ```python -class PerformanceReporter(Task): - - def __init__(self, report_email=None): - super().__init__( - name="performance_reporter", - interval="daily", - time="18:00" # 6 PM daily - ) - self.report_email = report_email - - def run(self, algorithm): - """Generate and send daily performance report""" - - # Calculate daily metrics - daily_metrics = self.calculate_daily_metrics(algorithm) - - # Generate report - report = self.generate_report(daily_metrics) - - # Send report - if self.report_email: - self.send_report_email(report) - - print("Daily performance report generated") - - def calculate_daily_metrics(self, algorithm): - """Calculate daily performance metrics""" - - portfolio = algorithm.get_portfolio() - trades = algorithm.get_trades() - - # Get today's trades - today = datetime.now().date() - today_trades = [ - t for t in trades - if t.created_at.date() == today - ] - - metrics = { - "portfolio_value": portfolio.get_total_value(), - "daily_trades": len(today_trades), - "daily_volume": sum(t.cost for t in today_trades), - "daily_fees": sum(t.fee for t in today_trades), - "open_positions": len(algorithm.get_positions()) - } - - return metrics - - def generate_report(self, metrics): - """Generate formatted report""" - - report = f""" - Daily Trading Report - {datetime.now().strftime('%Y-%m-%d')} - ===================================================== - - Portfolio Value: ${metrics['portfolio_value']:,.2f} - - Daily Activity: - - Trades: {metrics['daily_trades']} - - Volume: ${metrics['daily_volume']:,.2f} - - Fees: ${metrics['daily_fees']:,.2f} - - Open Positions: {metrics['open_positions']} - - Generated at: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')} - """ - - return report +app.add_tasks([PortfolioLoggerTask, AnotherTask()]) ``` -### Risk Management +## Schedule Examples ```python -class RiskManager(Task): - - def __init__(self, max_drawdown=0.15, max_position_size=0.1): - super().__init__( - name="risk_manager", - interval="1m" # Check risk every minute - ) - self.max_drawdown = max_drawdown - self.max_position_size = max_position_size - - def run(self, algorithm): - """Monitor and enforce risk limits""" - - # Check portfolio drawdown - self.check_drawdown(algorithm) - - # Check position sizes - self.check_position_sizes(algorithm) - - # Check correlation exposure - self.check_correlation_risk(algorithm) - - def check_drawdown(self, algorithm): - """Check if portfolio drawdown exceeds limit""" - - portfolio = algorithm.get_portfolio() - peak_value = portfolio.get_peak_value() - current_value = portfolio.get_total_value() - - drawdown = (peak_value - current_value) / peak_value - - if drawdown > self.max_drawdown: - # Emergency risk reduction - self.reduce_risk(algorithm, f"Drawdown {drawdown:.2%} exceeds limit") - - def check_position_sizes(self, algorithm): - """Check if any position is too large""" - - portfolio = algorithm.get_portfolio() - positions = algorithm.get_positions() - total_value = portfolio.get_total_value() - - for position in positions: - position_weight = position.current_value / total_value - - if position_weight > self.max_position_size: - # Reduce oversized position - target_symbol = position.symbol.split('/')[0] - excess_percentage = position_weight - self.max_position_size - - algorithm.create_sell_order( - target_symbol=target_symbol, - percentage=excess_percentage / position_weight, - order_type="MARKET" - ) - - print(f"Reduced oversized position: {position.symbol}") - - def reduce_risk(self, algorithm, reason): - """Emergency risk reduction""" - - print(f"RISK ALERT: {reason}") - print("Implementing risk reduction measures...") - - # Cancel all open orders - algorithm.cancel_all_orders() - - # Reduce position sizes by 50% - positions = algorithm.get_positions() - for position in positions: - target_symbol = position.symbol.split('/')[0] - algorithm.create_sell_order( - target_symbol=target_symbol, - percentage=0.5, - order_type="MARKET" - ) - - print("Risk reduction completed") -``` +from investing_algorithm_framework import Task, TimeUnit -### Database Maintenance +# Run every 30 seconds +class FrequentCheck(Task): + def __init__(self): + super().__init__(time_unit=TimeUnit.SECOND, interval=30) -```python -class DatabaseMaintenanceTask(Task): - + def run(self, context): + pass + +# Run every 5 minutes +class FiveMinuteTask(Task): def __init__(self): - super().__init__( - name="database_maintenance", - interval="daily", - time="03:00" # 3 AM daily - ) - - def run(self, algorithm): - """Perform database maintenance tasks""" - - print("Starting database maintenance...") - - # Archive old data - self.archive_old_data(algorithm) - - # Optimize database - self.optimize_database(algorithm) - - # Backup database - self.backup_database(algorithm) - - print("Database maintenance completed") - - def archive_old_data(self, algorithm): - """Archive old market data and trades""" - - cutoff_date = datetime.now() - timedelta(days=365) - - # Archive old trades - old_trades = algorithm.get_trades(before_date=cutoff_date) - if old_trades: - algorithm.archive_trades(old_trades) - print(f"Archived {len(old_trades)} old trades") - - # Archive old market data - algorithm.archive_market_data(before_date=cutoff_date) - - def optimize_database(self, algorithm): - """Optimize database performance""" - - # Rebuild indices - algorithm.rebuild_database_indices() - - # Update statistics - algorithm.update_database_statistics() - - print("Database optimization completed") - - def backup_database(self, algorithm): - """Create database backup""" - - timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") - backup_path = f"/backups/trading_db_backup_{timestamp}.sql" - - algorithm.backup_database(backup_path) - print(f"Database backed up to {backup_path}") -``` + super().__init__(time_unit=TimeUnit.MINUTE, interval=5) -## Task Management + def run(self, context): + pass -### Task Monitoring +# Run every 4 hours +class FourHourTask(Task): + def __init__(self): + super().__init__(time_unit=TimeUnit.HOUR, interval=4) -```python -class TaskMonitor(Task): - + def run(self, context): + pass + +# Run once per day +class DailyTask(Task): def __init__(self): - super().__init__( - name="task_monitor", - interval="10m" - ) - self.task_history = {} - - def run(self, algorithm): - """Monitor other tasks for failures""" - - # Get task execution history - task_status = algorithm.get_task_status() - - for task_name, status in task_status.items(): - if status.get('last_error'): - print(f"Task {task_name} failed: {status['last_error']}") - - # Check if task is overdue - last_run = status.get('last_run') - if last_run: - time_since_run = datetime.now() - last_run - if time_since_run > timedelta(hours=2): # Configurable threshold - print(f"Task {task_name} is overdue (last run: {last_run})") + super().__init__(time_unit=TimeUnit.DAY, interval=1) + + def run(self, context): + pass ``` -### Conditional Tasks +## Common Task Examples + +### Trade Monitoring ```python -class ConditionalRebalanceTask(Task): - +from investing_algorithm_framework import Task, TimeUnit + +class TradeMonitorTask(Task): + def __init__(self): - super().__init__( - name="conditional_rebalance", - interval="1h" - ) - - def run(self, algorithm): - """Only rebalance if conditions are met""" - - # Check if rebalancing is needed - if not self.should_rebalance(algorithm): - return - - print("Rebalancing conditions met - executing rebalance") - self.execute_rebalance(algorithm) - - def should_rebalance(self, algorithm): - """Check if rebalancing conditions are met""" - - positions = algorithm.get_positions() - portfolio = algorithm.get_portfolio() - total_value = portfolio.get_total_value() - - # Check if any position deviates more than 5% from target - target_weights = {"BTC": 0.5, "ETH": 0.3, "ADA": 0.2} - - for symbol, target_weight in target_weights.items(): - position = next( - (p for p in positions if p.symbol.startswith(symbol)), - None + super().__init__(time_unit=TimeUnit.MINUTE, interval=5) + + def run(self, context): + open_trades = context.get_open_trades() + + for trade in open_trades: + print( + f"Trade {trade.id}: {trade.target_symbol} " + f"opened at {trade.open_price}, " + f"net gain: {trade.net_gain}" ) - - current_weight = (position.current_value / total_value) if position else 0 - - if abs(current_weight - target_weight) > 0.05: - return True - - return False - - def execute_rebalance(self, algorithm): - """Execute portfolio rebalancing""" - # Implementation for rebalancing logic - pass ``` -## Best Practices - -### 1. Error Handling +### Portfolio Snapshot ```python -class RobustTask(Task): - +from investing_algorithm_framework import Task, TimeUnit + +class PortfolioSnapshotTask(Task): + def __init__(self): - super().__init__(name="robust_task", interval="5m") - self.max_retries = 3 - self.retry_delay = 30 # seconds - - def run(self, algorithm): - """Run task with error handling and retries""" - - for attempt in range(self.max_retries): - try: - self.execute_task_logic(algorithm) - return # Success, exit retry loop - - except Exception as e: - print(f"Task attempt {attempt + 1} failed: {e}") - - if attempt < self.max_retries - 1: - time.sleep(self.retry_delay) - else: - print(f"Task failed after {self.max_retries} attempts") - self.handle_task_failure(e) - - def execute_task_logic(self, algorithm): - """Main task logic that might fail""" - # Implementation here - pass - - def handle_task_failure(self, error): - """Handle permanent task failure""" - # Log error, send alerts, etc. - pass + super().__init__(time_unit=TimeUnit.HOUR, interval=1) + + def run(self, context): + portfolio = context.get_portfolio() + positions = context.get_positions() + + print(f"Portfolio net size: {portfolio.get_net_size()}") + print(f"Number of positions: {len(positions)}") + print(f"Open trades: {context.count_trades()}") ``` -### 2. Resource Management +### Trade Count Summary ```python -class ResourceAwareTask(Task): - - def run(self, algorithm): - """Task that monitors resource usage""" - - # Check system resources before running - if not self.has_sufficient_resources(): - print("Insufficient resources - skipping task execution") - return - - # Execute task logic - self.execute_heavy_computation() - - def has_sufficient_resources(self): - """Check if system has sufficient resources""" - import psutil - - # Check memory usage - memory_percent = psutil.virtual_memory().percent - if memory_percent > 90: - return False - - # Check CPU usage - cpu_percent = psutil.cpu_percent(interval=1) - if cpu_percent > 95: - return False - - return True +from investing_algorithm_framework import create_app, TimeUnit + +app = create_app() + +@app.task(time_unit=TimeUnit.DAY, interval=1) +def daily_summary(context): + """Logs a daily summary of trade counts.""" + total = context.count_trades() + closed = len(context.get_closed_trades()) + open_trades = len(context.get_open_trades()) + pending = len(context.get_pending_trades()) + + print(f"Total: {total}, Open: {open_trades}, Closed: {closed}, Pending: {pending}") ``` -### 3. Task Dependencies +## Full Example ```python -class DependentTask(Task): - - def __init__(self, prerequisite_tasks): - super().__init__(name="dependent_task", interval="1h") - self.prerequisite_tasks = prerequisite_tasks - - def run(self, algorithm): - """Only run if prerequisite tasks completed successfully""" - - # Check if prerequisites are met - if not self.prerequisites_met(algorithm): - print("Prerequisites not met - skipping task") - return - - # Execute task logic - self.execute_task(algorithm) - - def prerequisites_met(self, algorithm): - """Check if prerequisite tasks completed successfully""" - - task_status = algorithm.get_task_status() - - for prereq_task in self.prerequisite_tasks: - status = task_status.get(prereq_task, {}) - - if status.get('last_error'): - return False - - # Check if task ran recently - last_run = status.get('last_run') - if not last_run or (datetime.now() - last_run) > timedelta(hours=2): - return False - - return True +from investing_algorithm_framework import create_app, Task, TimeUnit + +app = create_app() + +# Class-based task +class LogOpenTrades(Task): + def __init__(self): + super().__init__(time_unit=TimeUnit.MINUTE, interval=15) + + def run(self, context): + for trade in context.get_open_trades(): + print(f"[{trade.target_symbol}] net_gain={trade.net_gain}") + +# Decorator-based task +@app.task(time_unit=TimeUnit.HOUR, interval=1) +def log_portfolio(context): + portfolio = context.get_portfolio() + print(f"Portfolio size: {portfolio.get_net_size()}") + +# Register class-based task +app.add_task(LogOpenTrades) ``` ## Next Steps -Tasks provide powerful automation capabilities for your trading system. Next, learn about [Backtesting](backtesting) to test your strategies and tasks against historical data before deploying them live. +Now that you understand tasks, learn about [Trades](trades) to see how the framework tracks your trading activity. diff --git a/docusaurus/docs/Getting Started/trades.md b/docusaurus/docs/Getting Started/trades.md index 74d9e791..df648df9 100644 --- a/docusaurus/docs/Getting Started/trades.md +++ b/docusaurus/docs/Getting Started/trades.md @@ -4,469 +4,260 @@ sidebar_position: 7 # Trades -Understand how individual trades work and how to track trading performance. +Understand how the framework tracks trades and how to access trade data through the Context API. ## Overview -Trades represent completed transactions in your trading system. Unlike orders (which are instructions to trade) or positions (which represent holdings), trades are the actual executed transactions that move money between your cash balance and asset holdings. +A **Trade** represents a round-trip position: it is opened by a buy order and closed by one or more sell orders. Unlike orders (instructions to buy/sell) or positions (current holdings), a trade tracks the full lifecycle from entry to exit, including net gain, stop losses, and take profits. -## Understanding Trades +## Trade Lifecycle -### Trade Lifecycle +1. **Created** — A buy order is placed, and a trade record is created with status `CREATED`. +2. **Open** — The buy order fills and the trade becomes `OPEN`. The trade has an `open_price`, `amount`, and `cost`. +3. **Closed** — A sell order fills against the trade, closing it. The `net_gain` is calculated, `closed_at` is set, and status becomes `CLOSED`. -1. **Order Placed**: You create a buy or sell order -2. **Order Executed**: The market fills your order -3. **Trade Created**: A trade record is generated -4. **Position Updated**: Your asset holdings are adjusted -5. **Portfolio Updated**: Your portfolio balance reflects the trade +A single sell order can close multiple trades, and a single trade can be closed by multiple partial sell orders. -### Trade Properties +## Trade Attributes + +| Attribute | Type | Description | +|-----------|------|-------------| +| `id` | `int` | Unique trade identifier. | +| `target_symbol` | `str` | The asset being traded (e.g., `"BTC"`). | +| `trading_symbol` | `str` | The quote currency (e.g., `"EUR"`). | +| `status` | `str` | One of `CREATED`, `OPEN`, or `CLOSED`. | +| `opened_at` | `datetime` | When the trade was opened. | +| `closed_at` | `datetime` | When the trade was closed (`None` if still open). | +| `open_price` | `float` | The price at which the trade was opened. | +| `amount` | `float` | The total amount of the trade. | +| `available_amount` | `float` | The amount still available (not yet sold). | +| `filled_amount` | `float` | The amount filled by the opening buy order. | +| `remaining` | `float` | The remaining unfilled amount from the buy order. | +| `cost` | `float` | The total cost of the trade (price × amount). | +| `net_gain` | `float` | Realized profit or loss. | +| `last_reported_price` | `float` | The most recent market price reported for this trade. | +| `orders` | `list` | Orders associated with this trade. | +| `stop_losses` | `list` | Stop loss rules attached to this trade. | +| `take_profits` | `list` | Take profit rules attached to this trade. | +| `metadata` | `dict` | Custom key-value data you can attach to a trade. | + +## Accessing Trades + +All trade access goes through the `Context` object, which is passed to your strategy's `run` method and to tasks. + +### Get a Single Trade ```python -def analyze_trade(self, trade): - """Analyze a completed trade""" - - print(f"Trade ID: {trade.id}") - print(f"Symbol: {trade.target_symbol}/{trade.trading_symbol}") - print(f"Side: {trade.side}") # BUY or SELL - print(f"Amount: {trade.amount}") - print(f"Price: ${trade.price:.2f}") - print(f"Total Cost: ${trade.cost:.2f}") - print(f"Fee: ${trade.fee:.2f}") - print(f"Timestamp: {trade.created_at}") - print(f"Status: {trade.status}") +# Get a trade by target symbol +trade = context.get_trade(target_symbol="BTC") + +# Get a trade by status +trade = context.get_trade(status="OPEN", target_symbol="ETH") + +# Get a trade by order ID +trade = context.get_trade(order_id=some_order_id) ``` -## Accessing Trade Data +**Parameters:** `target_symbol`, `trading_symbol`, `market`, `portfolio`, `status`, `order_id` — all optional filters. -### Get All Trades +### Get Multiple Trades ```python -def review_trading_history(self, algorithm): - """Review all completed trades""" - trades = algorithm.get_trades() - - total_trades = len(trades) - total_volume = sum(trade.cost for trade in trades) - total_fees = sum(trade.fee for trade in trades) - - print(f"Total Trades: {total_trades}") - print(f"Total Volume: ${total_volume:.2f}") - print(f"Total Fees: ${total_fees:.2f}") - - return trades +# All trades +trades = context.get_trades() + +# Trades for a specific symbol +btc_trades = context.get_trades(target_symbol="BTC") + +# Trades filtered by status +open_trades = context.get_trades(status="OPEN") ``` -### Filter Trades +### Get Open Trades ```python -def filter_trades(self, algorithm, symbol=None, days=None): - """Filter trades by symbol and time period""" - trades = algorithm.get_trades() - - # Filter by symbol - if symbol: - trades = [t for t in trades if t.target_symbol == symbol] - - # Filter by date range - if days: - cutoff_date = datetime.now() - timedelta(days=days) - trades = [t for t in trades if t.created_at >= cutoff_date] - - return trades - -# Examples -recent_btc_trades = self.filter_trades(algorithm, symbol="BTC", days=7) -all_recent_trades = self.filter_trades(algorithm, days=30) +# All open trades +open_trades = context.get_open_trades() + +# Open trades for a specific symbol +btc_open = context.get_open_trades(target_symbol="BTC") ``` -### Trade Statistics +### Get Closed Trades ```python -def calculate_trade_statistics(self, trades): - """Calculate trading performance statistics""" - - if not trades: - return {"error": "No trades found"} - - buy_trades = [t for t in trades if t.side == "BUY"] - sell_trades = [t for t in trades if t.side == "SELL"] - - stats = { - "total_trades": len(trades), - "buy_trades": len(buy_trades), - "sell_trades": len(sell_trades), - "total_volume": sum(t.cost for t in trades), - "total_fees": sum(t.fee for t in trades), - "average_trade_size": sum(t.cost for t in trades) / len(trades), - "largest_trade": max(t.cost for t in trades), - "smallest_trade": min(t.cost for t in trades), - } - - return stats +closed_trades = context.get_closed_trades() ``` -## Trade Performance Analysis +### Get Pending Trades -### Profit and Loss Tracking +Pending trades have status `CREATED` — the buy order hasn't filled yet: ```python -class TradeAnalyzer: - def __init__(self, algorithm): - self.algorithm = algorithm - - def match_buy_sell_trades(self, symbol): - """Match buy and sell trades to calculate P&L""" - trades = self.algorithm.get_trades() - symbol_trades = [t for t in trades if t.target_symbol == symbol] - - # Separate buy and sell trades - buys = [t for t in symbol_trades if t.side == "BUY"] - sells = [t for t in symbol_trades if t.side == "SELL"] - - # Sort by timestamp - buys.sort(key=lambda x: x.created_at) - sells.sort(key=lambda x: x.created_at) - - return self._calculate_fifo_pnl(buys, sells) - - def _calculate_fifo_pnl(self, buys, sells): - """Calculate P&L using FIFO method""" - realized_pnl = 0 - buy_queue = [(trade.amount, trade.price) for trade in buys] - - for sell_trade in sells: - sell_amount = sell_trade.amount - sell_price = sell_trade.price - - while sell_amount > 0 and buy_queue: - buy_amount, buy_price = buy_queue[0] - - # Calculate trade quantity - trade_qty = min(sell_amount, buy_amount) - - # Calculate P&L for this portion - pnl = trade_qty * (sell_price - buy_price) - realized_pnl += pnl - - # Update amounts - sell_amount -= trade_qty - buy_queue[0] = (buy_amount - trade_qty, buy_price) - - # Remove empty buy order - if buy_queue[0][0] == 0: - buy_queue.pop(0) - - return realized_pnl +pending = context.get_pending_trades() +pending_btc = context.get_pending_trades(target_symbol="BTC") ``` -### Win Rate Analysis +### Count Trades ```python -def calculate_win_rate(self, algorithm, symbol): - """Calculate win rate for a specific symbol""" - analyzer = TradeAnalyzer(algorithm) - - # Get all completed round trips (buy-sell pairs) - round_trips = analyzer.get_round_trip_trades(symbol) - - if not round_trips: - return {"error": "No completed round trips found"} - - winning_trades = [rt for rt in round_trips if rt['pnl'] > 0] - losing_trades = [rt for rt in round_trips if rt['pnl'] < 0] - - win_rate = len(winning_trades) / len(round_trips) * 100 - - avg_win = sum(rt['pnl'] for rt in winning_trades) / len(winning_trades) if winning_trades else 0 - avg_loss = sum(rt['pnl'] for rt in losing_trades) / len(losing_trades) if losing_trades else 0 - - return { - "total_round_trips": len(round_trips), - "winning_trades": len(winning_trades), - "losing_trades": len(losing_trades), - "win_rate": win_rate, - "average_win": avg_win, - "average_loss": avg_loss, - "profit_factor": abs(avg_win / avg_loss) if avg_loss != 0 else float('inf') - } +total = context.count_trades() +btc_count = context.count_trades(target_symbol="BTC") ``` -## Trade Monitoring - -### Real-time Trade Tracking +## Inspecting a Trade ```python -class TradeMonitor: - def __init__(self): - self.last_trade_count = 0 - - def check_new_trades(self, algorithm): - """Check for new trades and log them""" - current_trades = algorithm.get_trades() - current_count = len(current_trades) - - if current_count > self.last_trade_count: - # New trades detected - new_trades = current_trades[self.last_trade_count:] - - for trade in new_trades: - self.log_new_trade(trade) - - self.last_trade_count = current_count - - def log_new_trade(self, trade): - """Log details of a new trade""" - side_emoji = "🟢" if trade.side == "BUY" else "🔴" - - print(f"{side_emoji} NEW TRADE:") - print(f" Symbol: {trade.target_symbol}") - print(f" Side: {trade.side}") - print(f" Amount: {trade.amount}") - print(f" Price: ${trade.price:.2f}") - print(f" Total: ${trade.cost:.2f}") - print(f" Fee: ${trade.fee:.2f}") - print(f" Time: {trade.created_at}") - print("-" * 40) +def log_trade(trade): + print(f"Trade ID: {trade.id}") + print(f"Symbol: {trade.target_symbol}/{trade.trading_symbol}") + print(f"Status: {trade.status}") + print(f"Opened at: {trade.opened_at}") + print(f"Open price: {trade.open_price}") + print(f"Amount: {trade.amount}") + print(f"Available amount: {trade.available_amount}") + print(f"Cost: {trade.cost}") + print(f"Net gain: {trade.net_gain}") + print(f"Last reported price: {trade.last_reported_price}") + + if trade.closed_at: + print(f"Closed at: {trade.closed_at}") ``` -### Trade Alerts +## Stop Losses + +Add a stop loss to an open trade using `context.add_stop_loss()`. When the price drops below the stop loss level, the framework automatically sells. ```python -class TradeAlertSystem: - def __init__(self, alert_manager): - self.alert_manager = alert_manager - self.thresholds = { - "large_trade_amount": 1000, # Alert for trades > $1000 - "high_fee_percentage": 0.005, # Alert for fees > 0.5% - "rapid_trading": 5 # Alert if more than 5 trades in 1 minute - } - - def check_trade_alerts(self, trade): - """Check if trade triggers any alerts""" - - # Large trade alert - if trade.cost > self.thresholds["large_trade_amount"]: - self.alert_manager.send_alert( - f"Large trade executed: {trade.target_symbol} " - f"${trade.cost:.2f}", - severity="INFO" - ) - - # High fee alert - fee_percentage = trade.fee / trade.cost if trade.cost > 0 else 0 - if fee_percentage > self.thresholds["high_fee_percentage"]: - self.alert_manager.send_alert( - f"High fee trade: {trade.target_symbol} " - f"fee {fee_percentage:.3%}", - severity="WARNING" - ) +# Fixed stop loss: sell if price drops 5% below open price +context.add_stop_loss(trade, percentage=5) + +# Trailing stop loss: the stop level moves up as the price rises +context.add_stop_loss(trade, percentage=5, trailing=True) + +# Partial stop loss: sell only 50% of the position +context.add_stop_loss(trade, percentage=5, sell_percentage=50) ``` -## Trade Reporting +**Parameters:** -### Daily Trading Summary +| Parameter | Type | Default | Description | +|-----------|------|---------|-------------| +| `trade` | `Trade` | — | The trade to protect. | +| `percentage` | `float` | — | Percentage below open price (e.g., `5` for 5%). | +| `trailing` | `bool` | `False` | If `True`, the stop level trails the high water mark. | +| `sell_percentage` | `float` | `100` | Percentage of the trade to sell when triggered. | -```python -def generate_daily_summary(self, algorithm, date=None): - """Generate daily trading summary""" - - if date is None: - date = datetime.now().date() - - # Get trades for the day - start_of_day = datetime.combine(date, datetime.min.time()) - end_of_day = datetime.combine(date, datetime.max.time()) - - daily_trades = [ - t for t in algorithm.get_trades() - if start_of_day <= t.created_at <= end_of_day - ] - - if not daily_trades: - return f"No trades on {date}" - - # Calculate summary statistics - total_volume = sum(t.cost for t in daily_trades) - total_fees = sum(t.fee for t in daily_trades) - unique_symbols = set(t.target_symbol for t in daily_trades) - - summary = f""" - Daily Trading Summary - {date} - {'='*40} - Total Trades: {len(daily_trades)} - Total Volume: ${total_volume:.2f} - Total Fees: ${total_fees:.2f} - Symbols Traded: {', '.join(unique_symbols)} - Average Trade Size: ${total_volume/len(daily_trades):.2f} - Fee Rate: {(total_fees/total_volume)*100:.3f}% - """ - - return summary -``` +### How Fixed Stop Loss Works -### Trade Export +1. You buy BTC at $40,000. +2. You set a 5% stop loss → stop level at $38,000. +3. BTC rises to $42,000 → stop level stays at $38,000. +4. BTC drops to $38,000 → stop loss triggers, trade closes. -```python -import pandas as pd - -def export_trades_to_csv(self, algorithm, filename="trades.csv"): - """Export trades to CSV file""" - trades = algorithm.get_trades() - - # Convert to pandas DataFrame - trade_data = [] - for trade in trades: - trade_data.append({ - 'id': trade.id, - 'timestamp': trade.created_at, - 'symbol': f"{trade.target_symbol}/{trade.trading_symbol}", - 'side': trade.side, - 'amount': trade.amount, - 'price': trade.price, - 'cost': trade.cost, - 'fee': trade.fee, - 'status': trade.status - }) - - df = pd.DataFrame(trade_data) - df.to_csv(filename, index=False) - - print(f"Exported {len(trades)} trades to {filename}") - return df -``` +### How Trailing Stop Loss Works -## Advanced Trade Analysis +1. You buy BTC at $40,000. +2. You set a 5% trailing stop loss → initial stop at $38,000. +3. BTC rises to $42,000 → stop level adjusts to $39,900 (5% below new high). +4. BTC drops to $39,900 → stop loss triggers, trade closes. -### Trade Timing Analysis +## Take Profits + +Add a take profit to lock in gains when the price rises to a target level. ```python -def analyze_trade_timing(self, algorithm, symbol): - """Analyze timing patterns in trades""" - trades = [t for t in algorithm.get_trades() if t.target_symbol == symbol] - - if len(trades) < 2: - return "Insufficient data for timing analysis" - - # Calculate time between trades - trade_intervals = [] - for i in range(1, len(trades)): - interval = trades[i].created_at - trades[i-1].created_at - trade_intervals.append(interval.total_seconds()) - - avg_interval = sum(trade_intervals) / len(trade_intervals) - min_interval = min(trade_intervals) - max_interval = max(trade_intervals) - - return { - "average_interval_seconds": avg_interval, - "min_interval_seconds": min_interval, - "max_interval_seconds": max_interval, - "total_trades": len(trades), - "trading_period_days": (trades[-1].created_at - trades[0].created_at).days - } -``` +# Fixed take profit: sell if price rises 10% above open price +context.add_take_profit(trade, percentage=10) -### Trade Size Distribution +# Trailing take profit: locks in gains as price rises +context.add_take_profit(trade, percentage=10, trailing=True) -```python -import matplotlib.pyplot as plt - -def plot_trade_size_distribution(self, algorithm): - """Plot distribution of trade sizes""" - trades = algorithm.get_trades() - trade_sizes = [t.cost for t in trades] - - plt.figure(figsize=(10, 6)) - plt.hist(trade_sizes, bins=20, edgecolor='black', alpha=0.7) - plt.title('Trade Size Distribution') - plt.xlabel('Trade Size ($)') - plt.ylabel('Frequency') - plt.grid(True, alpha=0.3) - - # Add statistics - avg_size = sum(trade_sizes) / len(trade_sizes) - plt.axvline(avg_size, color='red', linestyle='--', - label=f'Average: ${avg_size:.2f}') - plt.legend() - - plt.tight_layout() - plt.show() +# Partial take profit: sell 50% of the position +context.add_take_profit(trade, percentage=10, sell_percentage=50) ``` -## Best Practices +**Parameters:** + +| Parameter | Type | Default | Description | +|-----------|------|---------|-------------| +| `trade` | `Trade` | — | The trade to set a target on. | +| `percentage` | `float` | — | Percentage above open price (e.g., `10` for 10%). | +| `trailing` | `bool` | `False` | If `True`, the take profit level trails the price upward. | +| `sell_percentage` | `float` | `100` | Percentage of the trade to sell when triggered. | + +### How Fixed Take Profit Works -### 1. Trade Validation +1. You buy BTC at $40,000. +2. You set a 5% take profit → target at $42,000. +3. BTC rises to $42,000 → take profit triggers, trade closes. + +### How Trailing Take Profit Works + +1. You buy BTC at $40,000. +2. You set a 5% trailing take profit → target initially at $42,000. +3. BTC rises to $45,000 → take profit adjusts to $42,750 (5% below new high). +4. BTC drops to $42,750 → take profit triggers, trade closes with profit. + +## Closing a Trade Manually + +You can close a trade programmatically via `context.close_trade()`: ```python -def validate_trade_execution(self, expected_trade, actual_trade): - """Validate that trade executed as expected""" - - tolerance = 0.01 # 1% tolerance for price differences - - checks = { - "symbol_match": expected_trade.symbol == actual_trade.target_symbol, - "side_match": expected_trade.side == actual_trade.side, - "amount_match": abs(expected_trade.amount - actual_trade.amount) < 0.001, - "price_reasonable": abs(expected_trade.price - actual_trade.price) / expected_trade.price < tolerance - } - - if not all(checks.values()): - print(f"Trade validation failed: {checks}") - return False - - return True +open_trades = context.get_open_trades(target_symbol="BTC") + +for trade in open_trades: + context.close_trade(trade) ``` -### 2. Trade Reconciliation +This creates a market sell order for the trade's available amount. + +## Example: Strategy with Stop Loss and Take Profit ```python -def reconcile_trades_with_exchange(self, algorithm, exchange): - """Reconcile internal trades with exchange records""" - - internal_trades = algorithm.get_trades() - exchange_trades = exchange.fetch_my_trades() - - # Compare trade counts and totals - internal_count = len(internal_trades) - exchange_count = len(exchange_trades) - - if internal_count != exchange_count: - print(f"Trade count mismatch: Internal={internal_count}, Exchange={exchange_count}") - - # Compare total volumes - internal_volume = sum(t.cost for t in internal_trades) - exchange_volume = sum(t['cost'] for t in exchange_trades) - - if abs(internal_volume - exchange_volume) > 1.0: # $1 tolerance - print(f"Volume mismatch: Internal=${internal_volume:.2f}, Exchange=${exchange_volume:.2f}") +from investing_algorithm_framework import TradingStrategy, TimeUnit + +class ManagedTradeStrategy(TradingStrategy): + time_unit = TimeUnit.HOUR + interval = 1 + + def run(self, context): + # Check for new open trades and attach risk management + open_trades = context.get_open_trades(target_symbol="BTC") + + for trade in open_trades: + # Only add stop/take if none exist yet + if not trade.stop_losses: + context.add_stop_loss(trade, percentage=5, trailing=True) + + if not trade.take_profits: + context.add_take_profit(trade, percentage=15) ``` -### 3. Trade History Maintenance +## Trade Statistics ```python -def archive_old_trades(self, algorithm, days_to_keep=365): - """Archive trades older than specified days""" - - cutoff_date = datetime.now() - timedelta(days=days_to_keep) - all_trades = algorithm.get_trades() - - old_trades = [t for t in all_trades if t.created_at < cutoff_date] - recent_trades = [t for t in all_trades if t.created_at >= cutoff_date] - - if old_trades: - # Export old trades before archiving - self.export_trades_to_csv( - old_trades, - f"archived_trades_{cutoff_date.strftime('%Y%m%d')}.csv" - ) - - # Remove from active database (implementation specific) - algorithm.archive_trades(old_trades) - - print(f"Archived {len(old_trades)} old trades") +def print_trade_summary(context): + total = context.count_trades() + open_trades = context.get_open_trades() + closed_trades = context.get_closed_trades() + + total_net_gain = sum(t.net_gain for t in closed_trades) + winners = [t for t in closed_trades if t.net_gain > 0] + losers = [t for t in closed_trades if t.net_gain <= 0] + + print(f"Total trades: {total}") + print(f"Open: {len(open_trades)}") + print(f"Closed: {len(closed_trades)}") + print(f"Winners: {len(winners)}") + print(f"Losers: {len(losers)}") + print(f"Total net gain: {total_net_gain:.2f}") + + if closed_trades: + win_rate = len(winners) / len(closed_trades) * 100 + print(f"Win rate: {win_rate:.1f}%") ``` ## Next Steps -Understanding trades is crucial for performance analysis and strategy optimization. Next, learn about [Tasks](tasks) to automate trade analysis and reporting processes. +Now that you understand trades, learn about [Tasks](tasks) to automate monitoring and reporting for your trading activity. diff --git a/docusaurus/docusaurus.config.js b/docusaurus/docusaurus.config.js index 32c9fa2e..b78b457a 100644 --- a/docusaurus/docusaurus.config.js +++ b/docusaurus/docusaurus.config.js @@ -1,8 +1,9 @@ // @ts-check // Note: type annotations allow type checking and IDEs autocompletion -const lightCodeTheme = require('prism-react-renderer/themes/github'); -const darkCodeTheme = require('prism-react-renderer/themes/dracula'); +const {themes} = require('prism-react-renderer'); +const lightCodeTheme = themes.github; +const darkCodeTheme = themes.dracula; /** @type {import('@docusaurus/types').Config} */ const config = { diff --git a/docusaurus/sidebar.js b/docusaurus/sidebar.js index 9496c276..96f9652e 100644 --- a/docusaurus/sidebar.js +++ b/docusaurus/sidebar.js @@ -61,11 +61,11 @@ const sidebars = { }, { type: 'doc', - id: 'Data/market-data-sources', + id: 'Data/data-sources', }, { type: 'doc', - id: 'Data/multiple-market-data-sources', + id: 'Data/backtest_data', }, ], }, diff --git a/investing_algorithm_framework/app/app.py b/investing_algorithm_framework/app/app.py index 763e25c8..2be92db3 100644 --- a/investing_algorithm_framework/app/app.py +++ b/investing_algorithm_framework/app/app.py @@ -2124,11 +2124,10 @@ def add_strategy(self, strategy, throw_exception=True) -> None: has_duplicates = False - for i in range(len(self._strategies)): - for j in range(i + 1, len(self._strategies)): - if self._strategies[i].worker_id == strategy.worker_id: - has_duplicates = True - break + for existing_strategy in self._strategies: + if existing_strategy.strategy_id == strategy.strategy_id: + has_duplicates = True + break if has_duplicates: raise OperationalException( diff --git a/show_docs.sh b/show_docs.sh new file mode 100755 index 00000000..02084f7f --- /dev/null +++ b/show_docs.sh @@ -0,0 +1,19 @@ +#!/usr/bin/env bash +set -e + +cd "$(dirname "$0")/docusaurus" + +if [ ! -d "node_modules" ]; then + echo "Installing dependencies..." + npm install +fi + +if [ "$1" = "--build" ] || [ "$1" = "-b" ]; then + echo "Building docs..." + npm run build + echo "Serving built docs..." + npm run serve +else + echo "Starting Docusaurus dev server..." + npm start +fi diff --git a/tests/app/test_eventloop.py b/tests/app/test_eventloop.py index 9bd07008..2e4e6a0a 100644 --- a/tests/app/test_eventloop.py +++ b/tests/app/test_eventloop.py @@ -1,250 +1,258 @@ -# from datetime import datetime, timezone, timedelta -# from typing import Any -# import os -# import shutil -# -# from investing_algorithm_framework import TradingStrategy, DataSource, \ -# DataType, MarketCredential, PortfolioConfiguration, Order, Trade, \ -# CCXTOHLCVDataProvider, BacktestDateRange, DataProvider, \ -# INDEX_DATETIME, OrderStatus -# from investing_algorithm_framework.app.eventloop import EventLoopService -# from investing_algorithm_framework.domain import ENVIRONMENT, Environment, \ -# BACKTESTING_START_DATE, LAST_SNAPSHOT_DATETIME, \ -# SNAPSHOT_INTERVAL, SnapshotInterval -# from investing_algorithm_framework.infrastructure import BacktestOrderExecutor -# from investing_algorithm_framework.services import DataProviderService, \ -# BacktestTradeOrderEvaluator -# from tests.resources import TestBase, OrderExecutorTest -# -# -# class CustomFeedDataProvider(DataProvider): -# -# def has_data(self, data_source: DataSource, start_date: datetime = None, -# end_date: datetime = None) -> bool: -# pass -# -# def prepare_backtest_data(self, backtest_start_date, -# backtest_end_date) -> None: -# pass -# -# def get_data(self, date: datetime = None, start_date: datetime = None, -# end_date: datetime = None, save: bool = False) -> Any: -# pass -# -# def get_backtest_data(self, backtest_index_date: datetime, -# backtest_start_date: datetime = None, -# backtest_end_date: datetime = None) -> Any: -# pass -# -# def copy(self, data_source: DataSource) -> "DataProvider": -# pass -# -# -# class StrategyForTesting(TradingStrategy): -# data_sources = [ -# DataSource( -# identifier="DOT/EUR_2h", -# data_type=DataType.OHLCV, -# window_size=200, -# symbol="DOT/EUR", -# time_frame="2h", -# market="bitvavo" -# ), -# DataSource( -# data_type=DataType.OHLCV, -# window_size=200, -# symbol="BTC/EUR", -# time_frame="2h", -# market="bitvavo" -# ), -# ] -# time_unit = "hour" -# interval = 2 -# -# def run_strategy(self, context, data): -# pass -# -# class StrategyForTestingTwo(TradingStrategy): -# data_sources = [ -# DataSource( -# data_type=DataType.OHLCV, -# window_size=200, -# symbol="ETH/EUR", -# time_frame="2h", -# market="bitvavo" -# ), -# DataSource( -# data_type=DataType.CUSTOM, -# data_provider_identifier="custom_feed_data" -# ), -# ] -# time_unit = "hour" -# interval = 4 -# -# def run_strategy(self, context, data): -# pass -# -# -# class StrategyForTestingThree(TradingStrategy): -# data_sources = [ -# DataSource( -# data_type=DataType.OHLCV, -# window_size=200, -# symbol="BTC/EUR", -# time_frame="2h", -# market="bitvavo" -# ), -# DataSource( -# data_type=DataType.CUSTOM, -# data_provider_identifier="twitter_data" -# ), -# ] -# time_unit = "day" -# interval = 1 -# -# def run_strategy(self, context, market_data): -# pass -# -# -# class TestEventloopService(TestBase): -# initialize = False -# storage_repo_type = "pandas" -# market_credentials = [ -# MarketCredential( -# market="bitvavo", -# api_key="api_key", -# secret_key="secret_key", -# ) -# ] -# external_balances = { -# "EUR": 1000 -# } -# portfolio_configurations = [ -# PortfolioConfiguration( -# market="bitvavo", -# trading_symbol="EUR", -# initial_balance=1000 -# ) -# ] -# -# -# def test_initialize(self): -# self.app.initialize_config() -# self.app.initialize_storage() -# self.app.initialize_services() -# self.app.initialize_portfolios() -# event_loop_service = EventLoopService( -# order_service=self.app.container.order_service(), -# portfolio_service=self.app.container.portfolio_service(), -# configuration_service=self.app.container.configuration_service(), -# data_provider_service=self.app.container.data_provider_service(), -# context=self.app.container.context(), -# trade_service=self.app.container.trade_service(), -# portfolio_snapshot_service=self.app.container.portfolio_snapshot_service(), -# ) -# self.app.add_strategy( -# StrategyForTesting(), -# ) -# self.app.add_strategy( -# StrategyForTestingTwo(), -# ) -# self.app.add_strategy( -# StrategyForTestingThree(), -# ) -# event_loop_service.initialize( -# trade_order_evaluator=BacktestTradeOrderEvaluator( -# trade_service=self.app.container.trade_service(), -# order_service=self.app.container.order_service(), -# ), -# algorithm=self.app.get_algorithm() -# ) -# self.assertEqual(len(event_loop_service.next_run_times), 3) -# self.assertEqual(len(event_loop_service.data_sources), 5) -# -# # Each next run time should be set to the current datatime -# # because no runs have been executed yet -# for strategy in event_loop_service.strategies: -# self.assertIn( -# strategy.strategy_id, -# event_loop_service.next_run_times -# ) -# self.assertAlmostEqual( -# event_loop_service\ -# .next_run_times[strategy.strategy_id]["next_run"], -# datetime.now(tz=timezone.utc), -# delta=timedelta(seconds=10) -# ) -# -# def test_get_data_sources_for_iteration(self): -# correct_data_sources = [ -# DataSource( -# data_type=DataType.OHLCV, -# window_size=200, -# symbol="ETH/EUR", -# time_frame="2h", -# market="bitvavo" -# ), -# DataSource( -# data_type=DataType.CUSTOM, -# data_provider_identifier="custom_feed_data" -# ), -# DataSource( -# data_type=DataType.OHLCV, -# window_size=200, -# symbol="DOT/EUR", -# time_frame="2h", -# market="bitvavo" -# ) -# ] -# -# data_sources = EventLoopService._get_data_sources_for_iteration( -# [ -# DataSource( -# data_type=DataType.OHLCV, -# window_size=200, -# symbol="DOT/EUR", -# time_frame="2h", -# market="bitvavo" -# ), -# DataSource( -# data_type=DataType.CUSTOM, -# data_provider_identifier="custom_feed_data" -# ), -# DataSource( -# data_type=DataType.OHLCV, -# window_size=200, -# symbol="ETH/EUR", -# time_frame="2h", -# market="bitvavo" -# ), -# DataSource( -# data_type=DataType.OHLCV, -# window_size=200, -# symbol="ETH/EUR", -# time_frame="2h", -# market="bitvavo" -# ), -# DataSource( -# data_type=DataType.CUSTOM, -# data_provider_identifier="custom_feed_data" -# ), -# ], -# ) -# -# self.assertEqual(data_sources, set(correct_data_sources)) -# -# def tearDown(self) -> None: -# super().tearDown() -# -# databases_directory = os.path.join(self.resource_directory, "databases") -# backtest_databases_directory = os.path.join(self.resource_directory, "backtest_databases") -# -# if os.path.exists(databases_directory): -# shutil.rmtree(databases_directory) -# -# if os.path.exists(backtest_databases_directory): -# shutil.rmtree(backtest_databases_directory) -# +from datetime import datetime, timezone, timedelta +from typing import Any +import os +import shutil + +from investing_algorithm_framework import TradingStrategy, DataSource, \ + DataType, MarketCredential, PortfolioConfiguration, Order, Trade, \ + CCXTOHLCVDataProvider, BacktestDateRange, DataProvider, \ + INDEX_DATETIME, OrderStatus +from investing_algorithm_framework.app.eventloop import EventLoopService +from investing_algorithm_framework.domain import ENVIRONMENT, Environment, \ + BACKTESTING_START_DATE, LAST_SNAPSHOT_DATETIME, \ + SNAPSHOT_INTERVAL, SnapshotInterval +from investing_algorithm_framework.infrastructure import BacktestOrderExecutor +from investing_algorithm_framework.services import DataProviderService, \ + BacktestTradeOrderEvaluator +from tests.resources import TestBase, OrderExecutorTest + + +class CustomFeedDataProvider(DataProvider): + + def has_data(self, data_source: DataSource, start_date: datetime = None, + end_date: datetime = None) -> bool: + pass + + def prepare_backtest_data(self, backtest_start_date, + backtest_end_date) -> None: + pass + + def get_data(self, date: datetime = None, start_date: datetime = None, + end_date: datetime = None, save: bool = False) -> Any: + pass + + def get_backtest_data(self, backtest_index_date: datetime, + backtest_start_date: datetime = None, + backtest_end_date: datetime = None) -> Any: + pass + + def copy(self, data_source: DataSource) -> "DataProvider": + pass + + +class StrategyForTesting(TradingStrategy): + data_sources = [ + DataSource( + identifier="DOT/EUR_2h", + data_type=DataType.OHLCV, + window_size=200, + symbol="DOT/EUR", + time_frame="2h", + market="bitvavo" + ), + DataSource( + data_type=DataType.OHLCV, + window_size=200, + symbol="BTC/EUR", + time_frame="2h", + market="bitvavo" + ), + ] + time_unit = "hour" + interval = 2 + + def run_strategy(self, context, data): + pass + +class StrategyForTestingTwo(TradingStrategy): + data_sources = [ + DataSource( + data_type=DataType.OHLCV, + window_size=200, + symbol="ETH/EUR", + time_frame="2h", + market="bitvavo" + ), + DataSource( + data_type=DataType.CUSTOM, + data_provider_identifier="custom_feed_data" + ), + ] + time_unit = "hour" + interval = 4 + + def run_strategy(self, context, data): + pass + + +class StrategyForTestingThree(TradingStrategy): + data_sources = [ + DataSource( + data_type=DataType.OHLCV, + window_size=200, + symbol="BTC/EUR", + time_frame="2h", + market="bitvavo" + ), + DataSource( + data_type=DataType.CUSTOM, + data_provider_identifier="twitter_data" + ), + ] + time_unit = "day" + interval = 1 + + def run_strategy(self, context, market_data): + pass + + +class TestEventloopService(TestBase): + initialize = False + market_credentials = [ + MarketCredential( + market="bitvavo", + api_key="api_key", + secret_key="secret_key", + ) + ] + external_balances = { + "EUR": 1000 + } + portfolio_configurations = [ + PortfolioConfiguration( + market="bitvavo", + trading_symbol="EUR", + initial_balance=1000 + ) + ] + + + def test_initialize(self): + self.app.initialize_config() + self.app.initialize_storage() + self.app.initialize_services() + self.app.initialize_portfolios() + event_loop_service = EventLoopService( + order_service=self.app.container.order_service(), + portfolio_service=self.app.container.portfolio_service(), + configuration_service=self.app.container.configuration_service(), + data_provider_service=self.app.container.data_provider_service(), + context=self.app.container.context(), + trade_service=self.app.container.trade_service(), + portfolio_snapshot_service=self.app.container.portfolio_snapshot_service(), + ) + self.app.add_strategy( + StrategyForTesting(), + ) + self.app.add_strategy( + StrategyForTestingTwo(), + ) + self.app.add_strategy( + StrategyForTestingThree(), + ) + event_loop_service.initialize( + trade_order_evaluator=BacktestTradeOrderEvaluator( + trade_service=self.app.container.trade_service(), + order_service=self.app.container.order_service(), + trade_stop_loss_service=self.app.container.trade_stop_loss_service(), + trade_take_profit_service=self.app.container.trade_take_profit_service(), + ), + algorithm=self.app.get_algorithm() + ) + self.assertEqual(len(event_loop_service.next_run_times), 3) + self.assertEqual(len(event_loop_service.data_sources), 5) + + # Each next run time should be set to the current datatime + # because no runs have been executed yet + for strategy in event_loop_service.strategies: + self.assertIn( + strategy.strategy_id, + event_loop_service.next_run_times + ) + self.assertAlmostEqual( + event_loop_service\ + .next_run_times[strategy.strategy_id]["next_run"], + datetime.now(tz=timezone.utc), + delta=timedelta(seconds=10) + ) + + def test_get_data_sources_for_iteration(self): + correct_data_sources = [ + DataSource( + data_type=DataType.OHLCV, + window_size=200, + symbol="ETH/EUR", + time_frame="2h", + market="bitvavo" + ), + DataSource( + data_type=DataType.CUSTOM, + data_provider_identifier="custom_feed_data" + ), + DataSource( + data_type=DataType.OHLCV, + window_size=200, + symbol="DOT/EUR", + time_frame="2h", + market="bitvavo" + ) + ] + + data_sources = EventLoopService._get_data_sources_for_iteration( + [ + DataSource( + data_type=DataType.OHLCV, + window_size=200, + symbol="DOT/EUR", + time_frame="2h", + market="bitvavo" + ), + DataSource( + data_type=DataType.CUSTOM, + data_provider_identifier="custom_feed_data" + ), + DataSource( + data_type=DataType.OHLCV, + window_size=200, + symbol="ETH/EUR", + time_frame="2h", + market="bitvavo" + ), + DataSource( + data_type=DataType.OHLCV, + window_size=200, + symbol="ETH/EUR", + time_frame="2h", + market="bitvavo" + ), + DataSource( + data_type=DataType.CUSTOM, + data_provider_identifier="custom_feed_data" + ), + ], + ) + + self.assertEqual(data_sources, set(correct_data_sources)) + + def tearDown(self) -> None: + super().tearDown() + + databases_directory = os.path.join(self.resource_directory, "databases") + backtest_databases_directory = os.path.join(self.resource_directory, "backtest_databases") + + if os.path.exists(databases_directory): + shutil.rmtree(databases_directory) + + if os.path.exists(backtest_databases_directory): + shutil.rmtree(backtest_databases_directory) + + # ------------------------------------------------------------------ + # The following backtest tests remain commented out because they + # depend on CCXTOHLCVDataProvider.prepare_backtest_data() which + # downloads live OHLCV data from the Bitvavo exchange. They cannot + # run in CI or without network access to the exchange API. + # ------------------------------------------------------------------ + # def test_backtest_loop(self): # self.app.initialize_config() # self.app.initialize_storage()