diff --git a/data_drifter/README.md b/data_drifter/README.md
index b1c756d..cb8706e 100644
--- a/data_drifter/README.md
+++ b/data_drifter/README.md
@@ -16,6 +16,11 @@ Simulates a fleet of sailboats racing and visualizes their positions in real-tim
 - Databricks workspace with Apps enabled
 - [Databricks CLI](https://docs.databricks.com/en/dev-tools/cli/install.html) installed
 - Unity Catalog access with permission to create tables
+- A Databricks **service principal** with OAuth credentials (client ID + secret) for Zerobus Ingest authentication. To create one:
+  1. Go to **Workspace Settings → Identity and access → Service principals → Add**
+  2. Create a new service principal
+  3. Under the service principal's settings, generate an **OAuth secret**
+  4. Save the **client ID** and **client secret** — you'll need them in Step 5
 
 ### Step 1: Clone and Install Dependencies
 
@@ -34,8 +39,8 @@ pip install -r requirements.txt
 ### Step 2: Configure Databricks CLI
 
 ```bash
-# Configure authentication (if not already done)
-databricks configure --token
+# Configure a named profile (recommended)
+databricks configure --token --profile MY_PROFILE
 
 # You'll be prompted for:
 # - Databricks Host: https://your-workspace.cloud.databricks.com
@@ -66,6 +71,10 @@ sql_warehouse_id = "your-warehouse-id"
 ### Step 4: Deploy Everything with One Command
 
 ```bash
+# Using a named profile (recommended)
+DATABRICKS_CONFIG_PROFILE=MY_PROFILE ./deploy.sh
+
+# Or with the DEFAULT profile
 ./deploy.sh
 ```
 
@@ -88,17 +97,18 @@ sql_warehouse_id = "your-warehouse-id"
    URL: https://your-app-url.cloud.databricks.com
 
 📊 Analysis Job:
-   Run with: databricks bundle run sailboat_analysis
+   Run with: databricks bundle run sailboat_analysis --profile MY_PROFILE
 
 🚀 Next Steps:
    1. Start telemetry generator: python3 main.py --client-id <client-id> --client-secret <client-secret>
+      (use the service principal credentials you created in the Prerequisites)
    2. Open app in browser (URL above)
 ```
 
 ### Step 5: Start the Race!
 
 ```bash
-# Generate sailboat telemetry (pass OAuth credentials as CLI arguments)
+# Start the telemetry generator using your service principal credentials
+# (a quick way to sanity-check these credentials is sketched at the end of this section)
 python3 main.py \
   --client-id your-service-principal-client-id \
   --client-secret your-service-principal-secret
@@ -129,7 +139,7 @@ Open the app URL from deployment output in your browser. You'll see:
 After the race, run the analysis notebooks:
 
 ```bash
-databricks bundle run sailboat_analysis
+databricks bundle run sailboat_analysis --profile MY_PROFILE
 ```
 
 This runs 4 notebooks in sequence:
@@ -138,6 +148,8 @@ This runs 4 notebooks in sequence:
 3. **Race Progress** - Position changes throughout race, mark rounding analysis
 4. **Race Summary** - Executive summary with key insights
 
+Analysis results are persisted as Delta tables in your Unity Catalog schema (e.g. `your_catalog.your_schema.finished_boats_summary`), so they can be queried independently after the job completes.
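+
+As a quick check, you can read one of these result tables straight from Python with `databricks-sql-connector` (the same driver the app itself uses). A minimal sketch — the hostname, warehouse ID, token, and table name below are placeholders, not values shipped with this repo:
+
+```python
+from databricks import sql  # pip install databricks-sql-connector
+
+# All connection values below are placeholders; substitute your own.
+with sql.connect(
+    server_hostname="your-workspace.cloud.databricks.com",
+    http_path="/sql/1.0/warehouses/your-warehouse-id",
+    access_token="your-personal-access-token",
+) as conn:
+    with conn.cursor() as cursor:
+        cursor.execute(
+            "SELECT * FROM your_catalog.your_schema.finished_boats_summary LIMIT 10"
+        )
+        for row in cursor.fetchall():
+            print(row)
+```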
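+
+Similarly, before Step 5 you can sanity-check the service principal credentials from the Prerequisites with the SDK's OAuth M2M helper (the same `oauth_service_principal` call the app uses for its own warehouse connection). This is a sketch with placeholder host and credentials; it assumes the SDK returns a header factory for OAuth-capable workspaces:
+
+```python
+from databricks.sdk.core import Config, oauth_service_principal
+
+# Placeholders: use your workspace URL and the credentials from the Prerequisites.
+cfg = Config(
+    host="https://your-workspace.cloud.databricks.com",
+    client_id="your-service-principal-client-id",
+    client_secret="your-service-principal-secret",
+)
+
+header_factory = oauth_service_principal(cfg)
+headers = header_factory()  # forces a token fetch; raises if the credentials are rejected
+print("OAuth M2M token acquired:", "Authorization" in headers)
+```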
+
 ---
 
 ## 🏗️ Architecture
 
@@ -145,9 +157,9 @@ This runs 4 notebooks in sequence:
 ### Data Flow
 
 ```
-Sailboat Fleet (20 boats)                 Weather Station (IoT device)
-        ↓ (gRPC/SDK)                              ↓ (REST API)
-        └──────────→ Zerobus Ingest ←─────────┘
+Sailboat Fleet (configurable, default=6)  Weather Station (IoT device)
+        ↓ (gRPC/SDK)                              ↓ (REST API)
+        └──────────→ Zerobus Ingest ←──────────────┘
                          ↓
          Delta Lake Tables (Unity Catalog)
@@ -174,6 +186,13 @@ Sailboat Fleet (20 boats)                 Weather Station (IoT device)
 
 ## 🔧 Advanced Usage
 
+### Environment Variables
+
+| Variable | Default | Description |
+|----------|---------|-------------|
+| `DATABRICKS_CONFIG_PROFILE` | `DEFAULT` | Databricks CLI profile to use |
+| `DATABRICKS_TARGET` | `dev` | Deployment target (dev/prod) |
+
 ### Customize Race Configuration
 
 Edit `config.toml` to change:
@@ -189,7 +208,7 @@ See inline comments in `config.toml` for all options.
 Tables are created automatically by `deploy.sh`, but if needed:
 
 ```bash
-databricks bundle run create_tables
+databricks bundle run create_tables --profile MY_PROFILE
 ```
 
 ### Manual Permissions Grant (Optional)
 
@@ -198,10 +217,10 @@ Permissions are granted automatically, but if needed:
 
 ```bash
 # Get app service principal ID
-APP_SP_ID=$(databricks apps get data-drifter-regatta --output json | jq -r '.service_principal_client_id')
+APP_SP_ID=$(databricks apps get data-drifter-regatta-v3 --profile MY_PROFILE --output json | jq -r '.service_principal_client_id')
 
 # Grant permissions
-databricks bundle run grant_permissions --var app_service_principal_id=$APP_SP_ID
+databricks bundle run grant_permissions --var app_service_principal_id=$APP_SP_ID --profile MY_PROFILE
 ```
 
 ### View Deployed Resources
 
 ```bash
@@ -211,10 +230,10 @@ databricks bundle run grant_permissions --var app_service_principal_id=$APP_SP_I
 databricks bundle resources --target dev
 
 # View app details
-databricks apps get data-drifter-regatta
+databricks apps get data-drifter-regatta-v3 --profile MY_PROFILE
 
 # View jobs
-databricks jobs list
+databricks jobs list --profile MY_PROFILE
 ```
 
 ---
 
@@ -232,19 +251,22 @@ databricks jobs list
 **Problem:** `deploy.sh` fails with "table_name not found"
 - **Solution:** Check `[zerobus]` section in `config.toml` - ensure `table_name` and `weather_station_table_name` are set
 
+**Problem:** `deploy.sh` fails with "no module named toml"
+- **Solution:** Activate the virtual environment first: `source venv/bin/activate && ./deploy.sh`
+
 **Problem:** Permission errors during deployment
 - **Solution:** Ensure you have CREATE TABLE permissions in Unity Catalog
 
 **Problem:** App service principal ID not found
 - **Solution:** Wait a few moments for app to initialize, then re-run:
   ```bash
-  databricks bundle run grant_permissions --var app_service_principal_id=<APP_SP_ID>
+  databricks bundle run grant_permissions --var app_service_principal_id=<APP_SP_ID> --profile MY_PROFILE
   ```
 
 ### Telemetry Generator Issues
 
 **Problem:** Missing required argument: --client-id or --client-secret
-- **Solution:** Pass OAuth credentials as command-line arguments:
+- **Solution:** These are OAuth credentials for a Databricks service principal (see Prerequisites):
   ```bash
   python3 main.py --client-id <client-id> --client-secret <client-secret>
   ```
diff --git a/data_drifter/app/app.py b/data_drifter/app/app.py
index 1dd022a..ce21167 100644
--- a/data_drifter/app/app.py
+++ b/data_drifter/app/app.py
@@ -1,5 +1,5 @@
 """
-🌊 DATA DRIFTER REGATTA 🌊
+DATA DRIFTER REGATTA
 
 Real-Time Sailboat Race Visualization
 Powered by Databricks Zerobus Ingest & Lakeflow
Connect @@ -11,14 +11,7 @@ import time import sys from datetime import datetime -import json import logging -import threading -from databricks.sdk.core import Config, oauth_service_principal -import folium -from folium import plugins -from streamlit_folium import st_folium -import uuid # Handle TOML for different Python versions if sys.version_info >= (3, 11): @@ -33,17 +26,10 @@ ) logger = logging.getLogger(__name__) -# Import databricks-sql-connector with error handling -try: - from databricks import sql - from databricks.sdk import WorkspaceClient -except ImportError as e: - st.error(f"❌ Failed to import databricks modules: {str(e)}") - st.info("Please ensure databricks-sql-connector and databricks-sdk are installed in requirements.txt") - st.stop() +from streamlit_folium import st_folium -# Import navigation utilities -from navigation_utils import calculate_distance +import db_connection +import components # Load configuration from config.toml file @st.cache_resource @@ -60,27 +46,27 @@ def load_config(): try: with open(config_path, "rb") as f: config_data = tomllib.load(f) - logger.info(f"βœ“ Configuration loaded successfully from: {config_path}") + logger.info(f"Configuration loaded successfully from: {config_path}") except FileNotFoundError: error_msg = f"Config file not found: {config_path}" - st.error(f"❌ {error_msg}") + st.error(f"{error_msg}") logger.error(error_msg) # Show current working directory for debugging import os cwd = os.getcwd() - st.info(f"ℹ️ Current working directory: {cwd}") + st.info(f"Current working directory: {cwd}") logger.error(f"Current working directory: {cwd}") # List files in current directory try: files = os.listdir(".") - st.info(f"ℹ️ Files in current directory: {', '.join(files[:10])}") + st.info(f"Files in current directory: {', '.join(files[:10])}") logger.error(f"Files in current directory: {files}") except: pass - st.warning("πŸ’‘ Ensure deploy.sh successfully copied config.toml to app/ directory") + st.warning("Ensure deploy.sh successfully copied config.toml to app/ directory") return None # Extract and structure configuration @@ -102,11 +88,11 @@ def load_config(): return config except KeyError as e: - st.error(f"❌ Missing required configuration key: {e}") + st.error(f"Missing required configuration key: {e}") logger.error(f"Config key error: {e}", exc_info=True) return None except Exception as e: - st.error(f"❌ Failed to load configuration: {str(e)}") + st.error(f"Failed to load configuration: {str(e)}") logger.error(f"Config load error: {str(e)}", exc_info=True) return None @@ -180,885 +166,9 @@ def load_config(): '#FD79A8', '#FDCB6E', '#E17055', '#00B894', '#00CEC9' ] -def get_connection(): - """Create Databricks SQL connection using workspace authentication""" - logger.info("Starting SQL warehouse connection attempt") - debug_mode = os.getenv("DEBUG", "false").lower() == "true" - start_time = time.time() - - try: - if debug_mode: - st.write("πŸ” Debug - Step 1: Checking environment variables") - st.write("Available DATABRICKS/DEFAULT env vars:") - for key in sorted(os.environ.keys()): - if 'DATABRICKS' in key or 'DEFAULT' in key: - value = os.environ[key] - if 'TOKEN' in key or 'SECRET' in key or 'PASSWORD' in key: - value = '***' if value else 'None' - else: - value = value[:80] if value and len(value) > 80 else value - st.write(f" {key} = {value}") - - # Get warehouse ID from config - warehouse_id = config["warehouse_id"] - - # Extract server hostname from workspace URL (remove https:// prefix) - workspace_url = 
config.get("workspace_url", os.getenv("DATABRICKS_HOST", "")) - if workspace_url.startswith("https://"): - server_hostname = workspace_url.replace("https://", "") - elif workspace_url.startswith("http://"): - server_hostname = workspace_url.replace("http://", "") - else: - server_hostname = workspace_url - - if debug_mode: - st.write(f"πŸ” Debug - Step 2: Connection parameters") - st.write(f" Server hostname: {server_hostname}") - st.write(f" Warehouse ID: {warehouse_id}") - st.write(f" HTTP Path: /sql/1.0/warehouses/{warehouse_id}") - - # Method 1: Try using OAuth M2M with client credentials (recommended for Databricks Apps) - client_id = os.getenv("DATABRICKS_CLIENT_ID") - client_secret = os.getenv("DATABRICKS_CLIENT_SECRET") - - if client_id and client_secret: - logger.info("Attempting OAuth M2M authentication (Method 1)") - if debug_mode: - st.write("πŸ” Debug - Step 3: Attempting connection with OAuth M2M (client credentials)...") - st.write(f" βœ… DATABRICKS_CLIENT_ID found: {client_id[:8]}...") - st.write(" βœ… DATABRICKS_CLIENT_SECRET found") - - try: - credential_provider = lambda: oauth_service_principal(Config( - host = f"https://{server_hostname}", - client_id = client_id, - client_secret = client_secret)) - - connection =sql.connect( - server_hostname = server_hostname, - http_path = f"/sql/1.0/warehouses/{warehouse_id}", - credentials_provider = credential_provider) - - if debug_mode: - st.write(" Connection object created, testing query...") - - # Test the connection - cursor = connection.cursor() - cursor.execute("SELECT 'test' as status, current_database() as db") - result = cursor.fetchone() - cursor.close() - - if debug_mode: - st.success(f"βœ… OAuth M2M authentication successful! Result: {result}") - - logger.info(f"OAuth M2M authentication successful in {time.time() - start_time:.2f}s") - return connection - - except Exception as oauth_error: - logger.warning(f"OAuth M2M auth failed: {str(oauth_error)}") - if debug_mode: - st.error(f"❌ OAuth M2M auth failed: {str(oauth_error)}") - st.write(" Falling back to alternative auth methods...") - else: - if debug_mode: - st.write("πŸ” Debug - Step 3: Client credentials not found in environment") - if not client_id: - st.write(" ❌ DATABRICKS_CLIENT_ID not set") - if not client_secret: - st.write(" ❌ DATABRICKS_CLIENT_SECRET not set") - st.write(" Attempting connection with SDK default auth...") - - # Method 2: Try SDK automatic authentication - # In Databricks Apps, use the SDK's automatic authentication - # The app runs as a service principal and credentials are handled automatically - logger.info("Attempting SDK OAuth authentication (Method 2)") - try: - if debug_mode: - st.write(" Initializing WorkspaceClient for auth...") - - # Initialize WorkspaceClient - it will automatically use the service principal - from databricks.sdk import WorkspaceClient - from databricks.sdk.core import ApiClient - - # Create config that will auto-detect credentials - cfg = Config() - - if debug_mode: - st.write(f" Config host: {cfg.host if cfg.host else 'auto-detect'}") - st.write(" Creating API client...") - - # Create API client for making authenticated requests - api_client = ApiClient(cfg) - - if debug_mode: - st.write(" Getting auth headers...") - - # Get authentication headers - def get_token(): - headers = api_client.do("GET", "/api/2.0/preview/scim/v2/Me", - headers={}, data={}, raw=True).headers - # Extract token from Authorization header - auth_header = headers.get('Authorization', '') - if auth_header.startswith('Bearer '): - 
return auth_header[7:] - return None - - if debug_mode: - st.write(" Connecting to SQL Warehouse...") - - # Connect using the SDK's auth mechanism - connection = sql.connect( - server_hostname=server_hostname, - http_path=f"/sql/1.0/warehouses/{warehouse_id}", - auth_type="databricks-oauth", # Use OAuth authentication - _socket_timeout=30 - ) - - if debug_mode: - st.write(" Connection object created, testing query...") - - # Test the connection - cursor = connection.cursor() - cursor.execute("SELECT 'test' as status, current_database() as db") - result = cursor.fetchone() - cursor.close() - - if debug_mode: - st.success(f"βœ… Connection successful! Result: {result}") - - logger.info(f"SDK OAuth authentication successful in {time.time() - start_time:.2f}s") - return connection - - except Exception as sdk_error: - logger.warning(f"SDK OAuth auth failed: {str(sdk_error)}") - if debug_mode: - st.error(f"❌ SDK OAuth auth method failed: {str(sdk_error)}") - st.write(f" Error type: {type(sdk_error).__name__}") - st.exception(sdk_error) - - # Method 3: Try WorkspaceClient with better error handling - logger.info("Attempting WorkspaceClient authentication (Method 3)") - if debug_mode: - st.write("πŸ” Method 3: Trying WorkspaceClient authentication...") - - try: - # Initialize WorkspaceClient with explicit host - w = WorkspaceClient(host=f"https://{server_hostname}") - - if debug_mode: - st.write(f" WorkspaceClient created") - st.write(f" Host: {w.config.host}") - - # Try to get auth method info - try: - auth_details = str(w.config) - st.write(f" Config: {auth_details[:200]}") - except: - pass - - # Try to authenticate and get token - if debug_mode: - st.write(" Attempting authentication...") - - try: - # Call authenticate to get credentials - credentials = w.config.authenticate() - if debug_mode: - st.write(f" Authentication successful, got credentials") - - # Use the credentials with SQL connector - connection = sql.connect( - server_hostname=server_hostname, - http_path=f"/sql/1.0/warehouses/{warehouse_id}", - credentials_provider=lambda: credentials, - _socket_timeout=30 - ) - - if debug_mode: - st.write(" SQL Connection created, testing...") - - # Test the connection - cursor = connection.cursor() - cursor.execute("SELECT 1 as test") - result = cursor.fetchone() - cursor.close() - - if debug_mode: - st.success(f"βœ… Method 3 successful! 
Test query returned: {result}") - - logger.info(f"WorkspaceClient authentication successful in {time.time() - start_time:.2f}s") - return connection - - except Exception as auth_error: - if debug_mode: - st.error(f" Authentication failed: {str(auth_error)}") - st.write(f" Error type: {type(auth_error).__name__}") - raise - - except Exception as wc_error: - logger.warning(f"WorkspaceClient auth failed: {str(wc_error)}") - if debug_mode: - st.error(f"❌ Method 3 failed: {str(wc_error)}") - st.write(f" Error type: {type(wc_error).__name__}") - st.exception(wc_error) - - # Method 4: Try OAuth U2M flow (for apps with attached resources) - logger.info("Attempting OAuth U2M flow (Method 4)") - if debug_mode: - st.write("πŸ” Method 4: Trying OAuth U2M flow...") - - try: - # For Databricks Apps with SQL Warehouse resources, use OAuth U2M - connection = sql.connect( - server_hostname=server_hostname, - http_path=f"/sql/1.0/warehouses/{warehouse_id}", - auth_type="databricks-oauth", - _socket_timeout=30 - ) - - if debug_mode: - st.write(" OAuth connection created, testing...") - - # Test the connection - cursor = connection.cursor() - cursor.execute("SELECT 1 as test") - result = cursor.fetchone() - cursor.close() - - if debug_mode: - st.success(f"βœ… Method 4 successful! Test query returned: {result}") - - logger.info(f"OAuth U2M authentication successful in {time.time() - start_time:.2f}s") - return connection - - except Exception as oauth_error: - logger.warning(f"OAuth U2M auth failed: {str(oauth_error)}") - if debug_mode: - st.error(f"❌ Method 4 failed: {str(oauth_error)}") - st.write(f" Error type: {type(oauth_error).__name__}") - st.exception(oauth_error) - - # All methods failed - logger.error(f"All connection methods failed after {time.time() - start_time:.2f}s") - if debug_mode: - st.error("❌ All connection methods failed!") - st.write("Troubleshooting suggestions:") - st.write("1. Check that the SQL Warehouse resource is properly attached") - st.write("2. Verify the warehouse is running and accessible") - st.write("3. Ensure the app has CAN_USE permission on the warehouse") - st.write("4. Check that environment variables are being set correctly") - - raise Exception("Unable to connect to Databricks SQL Warehouse. 
All authentication methods failed.") - - except Exception as e: - st.error(f"❌ Failed to connect to Databricks SQL: {str(e)}") - st.info("πŸ’‘ Troubleshooting tips:") - st.info("- Ensure the app has a SQL Warehouse resource with CAN_USE permission") - st.info("- Check that the warehouse is running and accessible") - st.info(f"- Warehouse ID: {warehouse_id}") - if debug_mode: - st.write("πŸ” Full error details:") - st.exception(e) - return None - -def execute_query_with_timeout(cursor, query, timeout_seconds=60): - """Execute query with a timeout using threading""" - result = [None] - error = [None] - - def run_query(): - try: - cursor.execute(query) - result[0] = True - except Exception as e: - error[0] = e - - thread = threading.Thread(target=run_query) - thread.daemon = True - thread.start() - thread.join(timeout=timeout_seconds) - - if thread.is_alive(): - raise TimeoutError(f"Query execution exceeded {timeout_seconds} seconds timeout") - if error[0]: - raise error[0] - - return result[0] - -def query_telemetry(limit=10000): - """Query latest telemetry data from table""" - logger.info(f"Starting telemetry query with limit={limit}") - debug_mode = os.getenv("DEBUG", "false").lower() == "true" - query_start = time.time() - - try: - # Step 1: Get connection - logger.info("Step 1: Establishing connection") - if debug_mode: - st.write("πŸ” Step 1: Getting connection...") - - conn = get_connection() - if not conn: - logger.error("Failed to establish connection") - st.warning("⚠️ Could not establish database connection") - return None - - logger.info(f"Step 1 complete in {time.time() - query_start:.2f}s") - - # Step 2: Count rows in table to verify we can query it - logger.info(f"Step 2: Counting rows in {TABLE_NAME}") - step2_start = time.time() - - try: - count_query = f"SELECT COUNT(*) as row_count FROM {TABLE_NAME}" - cursor = conn.cursor() - cursor.execute(count_query) - count_result = cursor.fetchone() - row_count = count_result[0] if count_result else 0 - cursor.close() - - logger.info(f"Step 2 complete: {row_count:,} rows found in {time.time() - step2_start:.2f}s") - - if row_count == 0: - logger.warning("Table is empty, no data available") - st.warning("⚠️ Table is empty. 
No telemetry data available yet.") - st.info("πŸ’‘ Run `python main.py` to start generating telemetry data.") - return None - - except Exception as count_error: - logger.error(f"Failed to count rows: {str(count_error)}") - st.error(f"❌ Failed to count rows in table: {str(count_error)}") - st.info("πŸ’‘ Check that:") - st.info(" - The table exists") - st.info(" - The service principal has SELECT permission on the table") - st.info(f" - Table name is correct: {TABLE_NAME}") - if debug_mode: - st.exception(count_error) - return None - - if debug_mode: - st.write(f"πŸ” Step 3: Querying table: {TABLE_NAME}") - - query = f""" - SELECT - boat_id, - boat_name, - boat_type, - timestamp, - latitude, - longitude, - speed_over_ground_knots, - heading_degrees, - wind_speed_knots, - wind_direction_degrees, - distance_traveled_nm, - distance_to_destination_nm, - vmg_knots, - current_mark_index, - marks_rounded, - total_marks, - has_started, - has_finished, - race_status - FROM {TABLE_NAME} - ORDER BY timestamp DESC - LIMIT {limit} - """ - - # Step 3: Execute query with timeout - logger.info(f"Step 3: Querying {TABLE_NAME} for {limit} rows") - step3_start = time.time() - - if debug_mode: - st.write("πŸ” Step 4: Executing query with 60 second timeout...") - - cursor = conn.cursor() - - try: - # Execute query with timeout - execute_query_with_timeout(cursor, query, timeout_seconds=60) - execution_time = time.time() - step3_start - - logger.info(f"Step 3 query execution complete in {execution_time:.2f}s") - - if debug_mode: - st.write(f" βœ… Query executed in {execution_time:.2f} seconds") - - except TimeoutError as te: - logger.error(f"Query timeout: {str(te)}") - st.error(f"❌ Query execution timeout: {str(te)}") - st.warning("πŸ’‘ Troubleshooting suggestions:") - st.info("β€’ The SQL warehouse may be slow or overloaded") - st.info("β€’ Try reducing the data limit or adding filters") - st.info("β€’ Check SQL warehouse status in Databricks UI") - st.info(f"β€’ Table: {TABLE_NAME}") - cursor.close() - conn.close() - return None - - # Step 4-5: Fetch results - logger.info("Step 4-5: Fetching results as DataFrame") - fetch_start = time.time() - - if debug_mode: - st.write("πŸ” Step 5: Fetching results...") - - df = cursor.fetchall_arrow().to_pandas() - cursor.close() - - fetch_time = time.time() - fetch_start - logger.info(f"Fetch complete: {len(df)} rows in {fetch_time:.2f}s") - - total_time = time.time() - query_start - logger.info(f"Query completed successfully in {total_time:.2f}s total") - - if debug_mode: - st.write(f"πŸ” Step 6: Retrieved {len(df)} rows") - - return df - - except TimeoutError as te: - logger.error(f"Query timeout after {time.time() - query_start:.2f}s: {str(te)}") - # Error already displayed above - return None - except Exception as e: - logger.error(f"Query failed after {time.time() - query_start:.2f}s: {str(e)}", exc_info=True) - st.error(f"❌ Failed to query data: {str(e)}") - st.info("πŸ’‘ Check:") - st.info("β€’ Table exists and has data") - st.info("β€’ Service principal has SELECT permission") - st.info(f"β€’ Table name: {TABLE_NAME}") - if debug_mode: - st.exception(e) - return None - -def query_weather_station(): - """Query latest weather station data""" - logger.info("Querying weather station data") - - # Check if weather station table is configured - weather_table = config.get("weather_table_name") - if not weather_table: - logger.warning("Weather station table not configured") - return None - - try: - conn = get_connection() - if not conn: - logger.error("Failed to 
establish connection for weather station query") - return None - - # Query latest weather station reading - query = f""" - SELECT - station_id, - station_name, - station_location, - timestamp, - wind_speed_knots, - wind_direction_degrees, - event_type, - in_transition, - time_in_state_seconds, - next_change_in_seconds - FROM {weather_table} - ORDER BY timestamp DESC - LIMIT 1 - """ - - cursor = conn.cursor() - cursor.execute(query) - result = cursor.fetchall_arrow().to_pandas() - cursor.close() - - if result.empty: - logger.warning("No weather station data found") - return None - - logger.info(f"Weather station data retrieved: {len(result)} record") - return result.iloc[0] # Return the latest record as a Series - - except Exception as e: - logger.error(f"Failed to query weather station: {str(e)}", exc_info=True) - return None - -def get_race_course_config(): - """Get race course configuration (marks, start/finish) from config""" - return { - "start_lat": config["race_course_start_lat"], - "start_lon": config["race_course_start_lon"], - "marks": config["race_course_marks"] - } - -def calculate_total_remaining_distance(lat, lon, current_mark_index, marks): - """ - Calculate total remaining distance along the race course in nautical miles. - - Args: - lat: Current latitude - lon: Current longitude - current_mark_index: Index of the next mark to round (0-based) - marks: List of all marks [[lat1, lon1], [lat2, lon2], ...] where last mark is finish - - Returns: - Total distance remaining along the course in nautical miles - """ - if not marks or len(marks) == 0: - return 0.0 - - total_distance = 0.0 - - # Marks to round are all except the last one (which is the finish line) - marks_to_round = marks[:-1] if len(marks) > 1 else [] - finish_lat, finish_lon = marks[-1][0], marks[-1][1] - - # If all marks are rounded, just return distance to finish - if current_mark_index >= len(marks_to_round): - return calculate_distance(lat, lon, finish_lat, finish_lon) - - # Add distance to next mark - next_mark = marks_to_round[current_mark_index] - total_distance += calculate_distance(lat, lon, next_mark[0], next_mark[1]) - - # Add distances between subsequent marks - for i in range(current_mark_index, len(marks_to_round) - 1): - mark1 = marks_to_round[i] - mark2 = marks_to_round[i + 1] - total_distance += calculate_distance(mark1[0], mark1[1], mark2[0], mark2[1]) - - # Add distance from last mark to finish - if len(marks_to_round) > 0: - last_mark = marks_to_round[-1] - total_distance += calculate_distance(last_mark[0], last_mark[1], finish_lat, finish_lon) - - return total_distance - -def create_race_map(df): - """Create interactive race map with boat tracks using Folium""" - if df is None or len(df) == 0: - st.warning("No telemetry data available") - return None - - # Convert timestamp from epoch microseconds to datetime - df['datetime'] = pd.to_datetime(df['timestamp'], unit='us') - - # Get race course configuration - course_config = get_race_course_config() - - # Get race course marks - mark_lats = [m[0] for m in course_config['marks']] - mark_lons = [m[1] for m in course_config['marks']] - - # Calculate center of the map - center_lat = sum(mark_lats) / len(mark_lats) - center_lon = sum(mark_lons) / len(mark_lons) - - # Create Folium map - m = folium.Map( - location=[center_lat, center_lon], - zoom_start=9, - tiles='OpenStreetMap', - prefer_canvas=True - ) - - # Add race course line connecting marks - course_coords = [[lat, lon] for lat, lon in zip(mark_lats, mark_lons)] - folium.PolyLine( - course_coords, 
- color='gray', - weight=2, - opacity=0.7, - dash_array='10, 5', - popup='Race Course' - ).add_to(m) - - # Add markers for each race course mark - for idx, (lat, lon) in enumerate(zip(mark_lats, mark_lons)): - folium.Marker( - location=[lat, lon], - popup=f'Mark {idx + 1}', - tooltip=f'Mark {idx + 1}', - icon=folium.Icon(color='orange', icon='flag', prefix='fa') - ).add_to(m) - - # Add start line marker - folium.Marker( - location=[course_config['start_lat'], course_config['start_lon']], - popup='START', - tooltip='Start Line', - icon=folium.Icon(color='green', icon='play', prefix='fa') - ).add_to(m) - - # Add finish line marker (last mark) - finish_lat, finish_lon = course_config['marks'][-1] - folium.Marker( - location=[finish_lat, finish_lon], - popup='FINISH', - tooltip='Finish Line', - icon=folium.Icon(color='red', icon='stop', prefix='fa') - ).add_to(m) - - # Group by boat and plot tracks - boats = df.groupby('boat_id') - - for idx, (boat_id, boat_df) in enumerate(boats): - boat_df = boat_df.sort_values('timestamp') - - boat_name = boat_df['boat_name'].iloc[0] - boat_type = boat_df['boat_type'].iloc[0] - race_status = boat_df['race_status'].iloc[0] - - color = BOAT_COLORS[idx % len(BOAT_COLORS)] - - # Create boat track coordinates - track_coords = [[row['latitude'], row['longitude']] for _, row in boat_df.iterrows()] - - # Add boat track as polyline - folium.PolyLine( - track_coords, - color=color, - weight=3, - opacity=0.8, - popup=boat_name, - tooltip=boat_name - ).add_to(m) - - # Add current position marker (most recent point - first in DESC order) - latest = boat_df.iloc[-1] # Last point after sorting by timestamp ASC - - # Create popup content with boat information - popup_html = f""" -
-            <b>{boat_name}</b><br>
-            Status: {race_status}<br>
-            Speed: {latest['speed_over_ground_knots']:.1f} knots<br>
-            Heading: {latest['heading_degrees']:.0f}°<br>
-            Distance: {latest['distance_traveled_nm']:.1f} nm<br>
-            VMG: {latest['vmg_knots']:.1f} knots<br>
-            Marks: {latest['marks_rounded']}/{latest['total_marks']}<br>
-            Time: {pd.to_datetime(latest['timestamp'], unit='us').strftime('%Y-%m-%d %H:%M:%S')}
-        </div>
- """ - - # Get heading for boat marker - heading = latest['heading_degrees'] - - # Use BoatMarker for racing boats, regular markers for finished/DNF - if race_status == 'finished': - folium.Marker( - location=[latest['latitude'], latest['longitude']], - popup=folium.Popup(popup_html, max_width=250), - tooltip=boat_name.split("'s")[0], - icon=folium.Icon(color='lightgreen', icon='star', prefix='fa') - ).add_to(m) - elif race_status == 'dnf': - folium.Marker( - location=[latest['latitude'], latest['longitude']], - popup=folium.Popup(popup_html, max_width=250), - tooltip=boat_name.split("'s")[0], - icon=folium.Icon(color='red', icon='times', prefix='fa') - ).add_to(m) - else: - # Use BoatMarker for racing boats with rounded SVG shape - plugins.BoatMarker( - location=[latest['latitude'], latest['longitude']], - heading=heading, - wind_heading=latest.get('wind_direction_degrees', heading + 45), - wind_speed=latest.get('wind_speed_knots', 10), - color=color, - popup=folium.Popup(popup_html, max_width=250), - tooltip=boat_name.split("'s")[0] - ).add_to(m) - - # Add wind indicator in top left corner - # Get average wind conditions from latest positions - latest_positions_for_wind = df.sort_values('timestamp').groupby('boat_id').last() - avg_wind_speed = latest_positions_for_wind['wind_speed_knots'].mean() - avg_wind_direction = latest_positions_for_wind['wind_direction_degrees'].mean() - - # Create wind direction name - def get_wind_direction_name(degrees): - directions = ['N', 'NNE', 'NE', 'ENE', 'E', 'ESE', 'SE', 'SSE', 'S', 'SSW', 'SW', 'WSW', 'W', 'WNW', 'NW', 'NNW'] - index = int((degrees + 11.25) / 22.5) % 16 - return directions[index] - - wind_dir_name = get_wind_direction_name(avg_wind_direction) - - # Create custom HTML for wind indicator - wind_html = f""" -
-        <div style="position: fixed; top: 80px; left: 60px; z-index: 9999;">
-            <div>
-                🌬️ WIND
-            </div>
-            <div>
-                ↓
-            </div>
-            <div>
-                {avg_wind_speed:.1f} kts
-            </div>
-            <div>
-                {wind_dir_name} ({avg_wind_direction:.0f}°)
-            </div>
-        </div>
- """ - - # Add the wind indicator to the map - m.get_root().html.add_child(folium.Element(wind_html)) - - return m - -def display_leaderboard(df): - """Display leaderboard with all boats""" - if df is None or len(df) == 0: - st.info("No race data available") - return - - # Get latest position for each boat - latest_positions = df.sort_values('timestamp').groupby('boat_id').last().reset_index() - - # Get race course marks - course_config = get_race_course_config() - marks = course_config['marks'] - - # Calculate total remaining distance along the race course for each boat - latest_positions['total_remaining_distance'] = latest_positions.apply( - lambda row: calculate_total_remaining_distance( - row['latitude'], row['longitude'], row['current_mark_index'], marks - ), - axis=1 - ) - - # Sort by total remaining distance (less distance = better rank) - # DNF boats go to the end - latest_positions['sort_key'] = latest_positions.apply( - lambda row: ( - float('inf') if row['race_status'] == 'dnf' else row['total_remaining_distance'] - ), - axis=1 - ) - latest_positions = latest_positions.sort_values('sort_key') - - # Display leaderboard header - st.markdown("##### Race Leaderboard") - - # Create container for scrollable leaderboard - leaderboard_container = st.container(height=600) - - with leaderboard_container: - # Display each boat with rank, name, and distance - for rank, (idx, row) in enumerate(latest_positions.iterrows(), start=1): - boat_id = row['boat_id'] - boat_name = row['boat_name'] - distance = row['distance_traveled_nm'] - - # Create columns for rank and boat info - col1, col2 = st.columns([0.5, 4.5]) - - with col1: - # Rank with medal emoji for top 3 - if rank == 1: - st.markdown(f"### πŸ₯‡") - elif rank == 2: - st.markdown(f"### πŸ₯ˆ") - elif rank == 3: - st.markdown(f"### πŸ₯‰") - else: - st.markdown(f"### **{rank}**") - - with col2: - # Boat name and distance - st.markdown(f"**{boat_name}**") - st.caption(f"{distance:.1f} nm") - - # Add divider between boats - if rank < len(latest_positions): - st.divider() - -def display_boat_stats(df): - """Display boat statistics table""" - if df is None or len(df) == 0: - return - - # Get latest position for each boat - latest_positions = df.sort_values('timestamp').groupby('boat_id').last().reset_index() - - # Get race course marks - course_config = get_race_course_config() - marks = course_config['marks'] - - # Calculate total remaining distance along the race course for each boat - latest_positions['total_remaining_distance'] = latest_positions.apply( - lambda row: calculate_total_remaining_distance( - row['latitude'], row['longitude'], row['current_mark_index'], marks - ), - axis=1 - ) - - # Sort by total remaining distance (ascending) - boat with least distance is in 1st place - # DNF boats go to the end - latest_positions['sort_key'] = latest_positions.apply( - lambda row: ( - float('inf') if row['race_status'] == 'dnf' else row['total_remaining_distance'] - ), - axis=1 - ) - latest_positions = latest_positions.sort_values('sort_key') - - # Create display dataframe with the new total_remaining_distance column - display_df = latest_positions[[ - 'boat_name', 'boat_type', 'race_status', 'speed_over_ground_knots', - 'distance_traveled_nm', 'total_remaining_distance', 'vmg_knots', - 'marks_rounded', 'total_marks' - ]].copy() - - display_df.columns = [ - 'Boat', 'Type', 'Status', 'Speed (kt)', 'Distance (nm)', - 'To Dest (nm)', 'VMG (kt)', 'Marks', 'Total Marks' - ] - - # Add position column - display_df.insert(0, 'Pos', range(1, 
len(display_df) + 1)) - - # Format marks column - display_df['Marks'] = display_df.apply(lambda row: f"{row['Marks']}/{row['Total Marks']}", axis=1) - display_df = display_df.drop('Total Marks', axis=1) - - # Round numeric columns - display_df['Speed (kt)'] = display_df['Speed (kt)'].round(1) - display_df['Distance (nm)'] = display_df['Distance (nm)'].round(1) - display_df['To Dest (nm)'] = display_df['To Dest (nm)'].round(1) - display_df['VMG (kt)'] = display_df['VMG (kt)'].round(1) - - st.dataframe( - display_df, - use_container_width=True, - hide_index=True, - column_config={ - "Status": st.column_config.TextColumn( - "Status", - help="Current race status", - ) - } - ) +# Initialize modules +db_connection.init(config, TABLE_NAME) +components.init(config, BOAT_COLORS) # Main app def main(): @@ -1068,15 +178,141 @@ def main(): # Sidebar with st.sidebar: - st.header("βš™οΈ Settings") + st.header("βš™οΈ Race Controls") + + # Auto-refresh toggle + auto_refresh = st.toggle("Auto-refresh", value=True) - # Manual refresh only - auto_refresh = False - st.info("πŸ”„ Manual refresh mode - click 'Refresh Now' to update data") + # Refresh interval slider (only shown when auto-refresh is on) + if auto_refresh: + refresh_interval = st.slider("Refresh interval (seconds)", min_value=5, max_value=60, value=REFRESH_INTERVAL, step=5) + else: + refresh_interval = REFRESH_INTERVAL # Manual refresh button - if st.button("πŸ”„ Refresh Now"): - st.rerun() + if not auto_refresh: + if st.button("πŸ”„ Refresh Now"): + st.rerun() + + # Start new race + if st.button("πŸš€ Start New Race", type="primary"): + with st.spinner("Clearing all tables..."): + try: + conn = db_connection.get_connection() + if conn: + schema_prefix = ".".join(TABLE_NAME.split(".")[:2]) + truncated = 0 + errors = [] + # Get all tables in the schema and truncate them (except race_control) + cursor = conn.cursor() + cursor.execute(f"SHOW TABLES IN {schema_prefix}") + tables = cursor.fetchall() + cursor.close() + for row in tables: + tbl = row[1] if len(row) > 1 else row[0] + if tbl == "race_control": + continue + full_name = f"{schema_prefix}.{tbl}" + try: + c = conn.cursor() + c.execute(f"TRUNCATE TABLE {full_name}") + c.close() + truncated += 1 + except Exception as e: + errors.append(f"{tbl}: {e}") + # Reset speed control to 1.0x + try: + c = conn.cursor() + c.execute(f""" + MERGE INTO {schema_prefix}.race_control AS target + USING (SELECT 1.0 AS speed_multiplier) AS source + ON 1=1 + WHEN MATCHED THEN UPDATE SET + speed_multiplier = source.speed_multiplier, + updated_at = current_timestamp(), + updated_by = 'app' + WHEN NOT MATCHED THEN INSERT + (speed_multiplier, updated_at, updated_by) + VALUES (source.speed_multiplier, current_timestamp(), 'app') + """) + c.close() + except Exception as e: + errors.append(f"race_control reset: {e}") + conn.close() + if errors: + st.warning(f"Truncated {truncated} tables, {len(errors)} errors: {'; '.join(errors)}") + else: + st.success(f"Cleared {truncated} tables! 
Run `python3 main.py` to start a new race.") + time.sleep(2) + st.rerun() + else: + st.error("Could not connect to database") + except Exception as e: + st.error(f"Failed to clear tables: {e}") + + st.divider() + + # Speed controls + st.subheader("⚑ Race Speed") + speed_msg = None + row1_col1, row1_col2 = st.columns(2) + row2_col1, row2_col2 = st.columns(2) + speed_layout = [(row1_col1, "0.5x", 0.5), (row1_col2, "1x", 1.0), + (row2_col1, "2x", 2.0), (row2_col2, "4x", 4.0)] + + for col, label, multiplier in speed_layout: + with col: + if st.button(label, use_container_width=True): + try: + conn = db_connection.get_connection() + if conn: + schema_prefix = ".".join(TABLE_NAME.split(".")[:2]) + cursor = conn.cursor() + cursor.execute(f""" + MERGE INTO {schema_prefix}.race_control AS target + USING (SELECT {multiplier} AS speed_multiplier) AS source + ON 1=1 + WHEN MATCHED THEN UPDATE SET + speed_multiplier = source.speed_multiplier, + updated_at = current_timestamp(), + updated_by = 'app' + WHEN NOT MATCHED THEN INSERT + (speed_multiplier, updated_at, updated_by) + VALUES (source.speed_multiplier, current_timestamp(), 'app') + """) + cursor.close() + conn.close() + speed_msg = ("success", f"Speed set to {label}") + except Exception as e: + speed_msg = ("error", f"Failed to set speed: {e}") + + # Show speed message below all buttons (full width) + if speed_msg: + if speed_msg[0] == "success": + st.success(speed_msg[1]) + else: + st.error(speed_msg[1]) + + # Current effective speed (under buttons) + speed_multiplier = 1.0 + try: + conn_speed = db_connection.get_connection() + if conn_speed: + schema_prefix = ".".join(TABLE_NAME.split(".")[:2]) + c = conn_speed.cursor() + c.execute(f"SELECT speed_multiplier FROM {schema_prefix}.race_control LIMIT 1") + row = c.fetchone() + if row: + speed_multiplier = float(row[0]) + c.close() + conn_speed.close() + except Exception: + pass + effective_speed = TIME_ACCELERATION * speed_multiplier + if speed_multiplier != 1.0: + st.caption(f"Current: {effective_speed:.0f}x ({TIME_ACCELERATION:.0f}x base Γ— {speed_multiplier:.1f}x)") + else: + st.caption(f"Current: {TIME_ACCELERATION:.0f}x") st.divider() @@ -1096,9 +332,6 @@ def main(): real_minutes = REAL_TIME_DURATION_SECONDS // 60 st.markdown(f"**Playback:** {real_minutes} min (real time)") - # Time acceleration - st.markdown(f"**Speed:** {TIME_ACCELERATION:.0f}x acceleration") - st.markdown(f"**Course Marks:** {len(config['race_course_marks'])}") st.markdown(f"**Fleet Size:** {config['num_boats']} boats") @@ -1106,7 +339,10 @@ def main(): # Data source st.header("πŸ“Š Data Source") - st.markdown(f"**Table:** `{TABLE_NAME}`") + st.markdown(f"**Telemetry:** `{TABLE_NAME}`") + weather_table = config.get("weather_table_name", "") + if weather_table: + st.markdown(f"**Weather:** `{weather_table}`") # Last update time st.markdown(f"**Last Update:** {datetime.now().strftime('%H:%M:%S')}") @@ -1124,218 +360,219 @@ def main(): ]) st.markdown(f"**DB Credentials:** {'βœ“' if has_credentials else 'βœ—'}") - # Query data - with st.spinner("Loading race data..."): - df = query_telemetry() + # Use st.fragment for partial refresh - only the race data section re-renders + @st.fragment(run_every=refresh_interval if auto_refresh else None) + def race_dashboard(): + # Query data + with st.spinner("Loading race data..."): + df = db_connection.query_telemetry() - if df is not None and len(df) > 0: - # Convert timestamp for display - df['datetime'] = pd.to_datetime(df['timestamp'], unit='us') + if df is not None and len(df) > 0: + # 
Convert timestamp for display + df['datetime'] = pd.to_datetime(df['timestamp'], unit='us') - # Get latest data for each boat - latest_positions = df.sort_values('timestamp').groupby('boat_id').last().reset_index() + # Get latest data for each boat + latest_positions = df.sort_values('timestamp').groupby('boat_id').last().reset_index() - # Get latest timestamp - latest_time = df['datetime'].max() + # Get latest timestamp + latest_time = df['datetime'].max() - # Calculate race progress - elapsed_race_seconds = (latest_time.timestamp() - RACE_START_TIME.timestamp()) - race_progress_percent = min(100, (elapsed_race_seconds / RACE_DURATION_SECONDS) * 100) if RACE_DURATION_SECONDS > 0 else 0 + # Calculate race progress + elapsed_race_seconds = (latest_time.timestamp() - RACE_START_TIME.timestamp()) + race_progress_percent = min(100, (elapsed_race_seconds / RACE_DURATION_SECONDS) * 100) if RACE_DURATION_SECONDS > 0 else 0 - # Combined Race Progress & Fleet Status in one line - st.subheader("🏁 Race Progress & Fleet Status β›΅") + # Combined Race Progress & Fleet Status in one line + st.subheader("🏁 Race Progress & Fleet Status β›΅") - # Calculate fleet stats - total_boats = df['boat_id'].nunique() - racing = len(latest_positions[latest_positions['race_status'] == 'racing']) - finished = len(latest_positions[latest_positions['race_status'] == 'finished']) - dnf = len(latest_positions[latest_positions['race_status'] == 'dnf']) - not_started = len(latest_positions[latest_positions['race_status'] == 'not_started']) + # Calculate fleet stats + total_boats = df['boat_id'].nunique() + racing = len(latest_positions[latest_positions['race_status'] == 'racing']) + finished = len(latest_positions[latest_positions['race_status'] == 'finished']) + dnf = len(latest_positions[latest_positions['race_status'] == 'dnf']) + not_started = len(latest_positions[latest_positions['race_status'] == 'not_started']) - # Display race time - elapsed_days = int(elapsed_race_seconds // 86400) - elapsed_hours = int((elapsed_race_seconds % 86400) // 3600) - elapsed_minutes = int((elapsed_race_seconds % 3600) // 60) - time_str = f"{elapsed_days}d {elapsed_hours}h {elapsed_minutes}m" if elapsed_days > 0 else f"{elapsed_hours}h {elapsed_minutes}m" + # Display race time + elapsed_days = int(elapsed_race_seconds // 86400) + elapsed_hours = int((elapsed_race_seconds % 86400) // 3600) + elapsed_minutes = int((elapsed_race_seconds % 3600) // 60) + time_str = f"{elapsed_days}d {elapsed_hours}h {elapsed_minutes}m" if elapsed_days > 0 else f"{elapsed_hours}h {elapsed_minutes}m" - # Single row with progress bar and all fleet metrics - col1, col2, col3, col4, col5, col6, col7 = st.columns([3, 1, 1, 1, 1, 1, 1]) + # Single row with progress bar and all fleet metrics + col1, col2, col3, col4, col5, col6, col7 = st.columns([3, 1, 1, 1, 1, 1, 1]) - with col1: - st.progress(race_progress_percent / 100.0) - st.caption(f"{time_str} elapsed | {latest_time.strftime('%Y-%m-%d %H:%M UTC')}") + with col1: + st.progress(race_progress_percent / 100.0) + st.caption(f"{time_str} elapsed | {latest_time.strftime('%Y-%m-%d %H:%M UTC')}") - with col2: - st.metric("Progress", f"{race_progress_percent:.1f}%") + with col2: + st.metric("Progress", f"{race_progress_percent:.1f}%") - with col3: - st.metric("Total", total_boats) + with col3: + st.metric("Total", total_boats) - with col4: - st.metric("πŸƒ Racing", racing) + with col4: + st.metric("πŸƒ Racing", racing) - with col5: - st.metric("βœ… Done", finished) + with col5: + st.metric("βœ… Done", finished) - with 
col6: - st.metric("❌ DNF", dnf) + with col6: + st.metric("❌ DNF", dnf) - with col7: - st.metric("⏸️ Waiting", not_started) + with col7: + st.metric("⏸️ Waiting", not_started) - st.divider() + st.divider() - # Weather Station Display - st.subheader("🌀️ Captain's Weather Watch") - - # Query weather station data - weather_data = query_weather_station() - - if weather_data is not None: - # Weather station data available - wind_col1, wind_col2, wind_col3, wind_col4, wind_col5 = st.columns(5) - - # Wind direction name helper - def get_wind_direction_name(degrees): - directions = ['N', 'NNE', 'NE', 'ENE', 'E', 'ESE', 'SE', 'SSE', 'S', 'SSW', 'SW', 'WSW', 'W', 'WNW', 'NW', 'NNW'] - idx = int((degrees % 360) / 22.5 + 0.5) % 16 - return directions[idx] - - with wind_col1: - st.metric("Wind Speed", f"{weather_data['wind_speed_knots']:.1f} kt") - - with wind_col2: - wind_dir = weather_data['wind_direction_degrees'] - st.metric("Wind Direction", f"{wind_dir:.0f}Β° ({get_wind_direction_name(wind_dir)})") - - with wind_col3: - # Format event type for display - event_type = weather_data['event_type'].replace('_', ' ').title() - # Add emoji based on event type - event_emoji = { - 'Stable': '😌', - 'Gradual Shift': 'πŸŒ€', - 'Frontal Passage': 'β›ˆοΈ', - 'Gust': 'πŸ’¨' - }.get(event_type, '🌬️') - st.metric("Conditions", f"{event_emoji} {event_type}") - - with wind_col4: - if weather_data['in_transition']: - st.metric("Status", "⚠️ Changing") - else: - time_in_state = weather_data['time_in_state_seconds'] - if time_in_state < 60: - st.metric("Status", f"βœ… Stable ({time_in_state:.0f}s)") - elif time_in_state < 3600: - st.metric("Status", f"βœ… Stable ({time_in_state/60:.0f}m)") + # Weather Station Display + st.subheader("🌀️ Captain's Weather Watch") + + # Query weather station data + weather_data = db_connection.query_weather_station() + + if weather_data is not None: + # Weather station data available + wind_col1, wind_col2, wind_col3, wind_col4, wind_col5 = st.columns(5) + + # Wind direction name helper + def get_wind_direction_name(degrees): + directions = ['N', 'NNE', 'NE', 'ENE', 'E', 'ESE', 'SE', 'SSE', 'S', 'SSW', 'SW', 'WSW', 'W', 'WNW', 'NW', 'NNW'] + idx = int((degrees % 360) / 22.5 + 0.5) % 16 + return directions[idx] + + with wind_col1: + st.metric("Wind Speed", f"{weather_data['wind_speed_knots']:.1f} kt") + + with wind_col2: + wind_dir = weather_data['wind_direction_degrees'] + st.metric("Wind Direction", f"{wind_dir:.0f}Β° ({get_wind_direction_name(wind_dir)})") + + with wind_col3: + # Format event type for display + event_type = weather_data['event_type'].replace('_', ' ').title() + # Add emoji based on event type + event_emoji = { + 'Stable': '😌', + 'Gradual Shift': 'πŸŒ€', + 'Frontal Passage': 'β›ˆοΈ', + 'Gust': 'πŸ’¨' + }.get(event_type, '🌬️') + st.metric("Conditions", f"{event_emoji} {event_type}") + + with wind_col4: + if weather_data['in_transition']: + st.metric("Status", "⚠️ Changing") else: - st.metric("Status", f"βœ… Stable ({time_in_state/3600:.1f}h)") - - with wind_col5: - if weather_data['in_transition']: - st.metric("Next Change", "In progress") - else: - next_change = weather_data['next_change_in_seconds'] - if next_change < 60: - st.metric("Next Change", f"~{next_change:.0f}s") - elif next_change < 3600: - st.metric("Next Change", f"~{next_change/60:.0f}m") + time_in_state = weather_data['time_in_state_seconds'] + if time_in_state < 60: + st.metric("Status", f"βœ… Stable ({time_in_state:.0f}s)") + elif time_in_state < 3600: + st.metric("Status", f"βœ… Stable 
({time_in_state/60:.0f}m)") + else: + st.metric("Status", f"βœ… Stable ({time_in_state/3600:.1f}h)") + + with wind_col5: + if weather_data['in_transition']: + st.metric("Next Change", "In progress") else: - st.metric("Next Change", f"~{next_change/3600:.1f}h") + next_change = weather_data['next_change_in_seconds'] + if next_change < 60: + st.metric("Next Change", f"~{next_change:.0f}s") + elif next_change < 3600: + st.metric("Next Change", f"~{next_change/60:.0f}m") + else: + st.metric("Next Change", f"~{next_change/3600:.1f}h") - # Show weather station info - st.caption(f"πŸ“‘ {weather_data['station_name']} β€’ {weather_data['station_location']}") + # Show weather station info + st.caption(f"πŸ“‘ {weather_data['station_name']} β€’ {weather_data['station_location']}") - else: - # Fallback to telemetry-based wind display - st.info("πŸ“‘ Weather station data not available - showing approximate conditions from boat telemetry") + else: + # Fallback to telemetry-based wind display + st.info("πŸ“‘ Weather station data not available - showing approximate conditions from boat telemetry") - wind_col1, wind_col2, wind_col3, wind_col4 = st.columns(4) + wind_col1, wind_col2, wind_col3, wind_col4 = st.columns(4) - # Get average wind conditions from latest telemetry - avg_wind_speed = latest_positions['wind_speed_knots'].mean() - avg_wind_direction = latest_positions['wind_direction_degrees'].mean() - min_wind = latest_positions['wind_speed_knots'].min() - max_wind = latest_positions['wind_speed_knots'].max() + # Get average wind conditions from latest telemetry + avg_wind_speed = latest_positions['wind_speed_knots'].mean() + avg_wind_direction = latest_positions['wind_direction_degrees'].mean() + min_wind = latest_positions['wind_speed_knots'].min() + max_wind = latest_positions['wind_speed_knots'].max() - # Wind direction name - def get_wind_direction_name(degrees): - directions = ['N', 'NNE', 'NE', 'ENE', 'E', 'ESE', 'SE', 'SSE', 'S', 'SSW', 'SW', 'WSW', 'W', 'WNW', 'NW', 'NNW'] - idx = int((degrees % 360) / 22.5 + 0.5) % 16 - return directions[idx] + # Wind direction name + def get_wind_direction_name(degrees): + directions = ['N', 'NNE', 'NE', 'ENE', 'E', 'ESE', 'SE', 'SSE', 'S', 'SSW', 'SW', 'WSW', 'W', 'WNW', 'NW', 'NNW'] + idx = int((degrees % 360) / 22.5 + 0.5) % 16 + return directions[idx] - with wind_col1: - st.metric("Avg Wind Speed", f"{avg_wind_speed:.1f} kt") + with wind_col1: + st.metric("Avg Wind Speed", f"{avg_wind_speed:.1f} kt") - with wind_col2: - st.metric("Wind Direction", f"{avg_wind_direction:.0f}Β° ({get_wind_direction_name(avg_wind_direction)})") + with wind_col2: + st.metric("Wind Direction", f"{avg_wind_direction:.0f}Β° ({get_wind_direction_name(avg_wind_direction)})") - with wind_col3: - st.metric("Min Wind", f"{min_wind:.1f} kt") + with wind_col3: + st.metric("Min Wind", f"{min_wind:.1f} kt") - with wind_col4: - st.metric("Max Wind", f"{max_wind:.1f} kt") + with wind_col4: + st.metric("Max Wind", f"{max_wind:.1f} kt") - # Performance Statistics - st.subheader("πŸ“ˆ Performance Statistics") - perf_col1, perf_col2, perf_col3, perf_col4 = st.columns(4) + # Performance Statistics + st.subheader("πŸ“ˆ Performance Statistics") + perf_col1, perf_col2, perf_col3, perf_col4 = st.columns(4) - with perf_col1: - avg_speed = latest_positions['speed_over_ground_knots'].mean() - st.metric("Avg Speed", f"{avg_speed:.1f} kt") + with perf_col1: + avg_speed = latest_positions['speed_over_ground_knots'].mean() + st.metric("Avg Speed", f"{avg_speed:.1f} kt") - with perf_col2: - avg_vmg = 
latest_positions['vmg_knots'].mean() - st.metric("Avg VMG", f"{avg_vmg:.1f} kt") + with perf_col2: + avg_vmg = latest_positions['vmg_knots'].mean() + st.metric("Avg VMG", f"{avg_vmg:.1f} kt") - with perf_col3: - total_distance = latest_positions['distance_traveled_nm'].max() - st.metric("Max Distance", f"{total_distance:.1f} nm") + with perf_col3: + total_distance = latest_positions['distance_traveled_nm'].max() + st.metric("Max Distance", f"{total_distance:.1f} nm") - with perf_col4: - avg_marks = latest_positions['marks_rounded'].mean() - total_marks = latest_positions['total_marks'].iloc[0] - st.metric("Avg Progress", f"{avg_marks:.1f}/{total_marks} marks") + with perf_col4: + avg_marks = latest_positions['marks_rounded'].mean() + total_marks = latest_positions['total_marks'].iloc[0] + st.metric("Avg Progress", f"{avg_marks:.1f}/{total_marks} marks") - st.divider() + st.divider() - # Two-column layout: Race Map (left) and Leaderboard (right) - map_col, leaderboard_col = st.columns([2, 1]) + # Two-column layout: Race Map (left) and Leaderboard (right) + map_col, leaderboard_col = st.columns([2, 1]) - with map_col: - st.subheader("πŸ—ΊοΈ Race Map") - folium_map = create_race_map(df) - if folium_map: - # Disable map interaction returns to prevent reruns on zoom/pan - st_folium(folium_map, width=None, height=700, returned_objects=[]) + with map_col: + st.subheader("πŸ—ΊοΈ Race Map") + folium_map = components.create_race_map(df) + if folium_map: + # Disable map interaction returns to prevent reruns on zoom/pan + st_folium(folium_map, width=None, height=700, returned_objects=[]) - with leaderboard_col: - st.subheader("πŸ† Leaderboard") - display_leaderboard(df) + with leaderboard_col: + st.subheader("πŸ† Leaderboard") + components.display_leaderboard(df) - st.divider() + st.divider() - # Full boat statistics table - st.subheader("πŸ“‹ Boat Positions & Statistics") - display_boat_stats(df) + # Full boat statistics table + st.subheader("πŸ“‹ Boat Positions & Statistics") + components.display_boat_stats(df) - # Race timeline info - st.caption(f"Race time: {latest_time.strftime('%Y-%m-%d %H:%M:%S UTC')} | Total telemetry records: {len(df):,}") + # Race timeline info + st.caption(f"Race time: {latest_time.strftime('%Y-%m-%d %H:%M:%S UTC')} | Total telemetry records: {len(df):,}") - else: - st.warning("⚠️ No race data available. Make sure the telemetry generator is running and sending data to the table.") - st.info("Run `python main.py` to start the race simulation.") + else: + st.warning("⚠️ No race data available. Make sure the telemetry generator is running and sending data to the table.") + st.info("Run `python main.py` to start the race simulation.") - # Auto-refresh - always refresh regardless of data availability - if auto_refresh: - time.sleep(REFRESH_INTERVAL) - st.rerun() + # Render the auto-refreshing fragment + race_dashboard() except Exception as e: - st.error(f"❌ Application Error: {str(e)}") + st.error(f"Application Error: {str(e)}") st.exception(e) - st.info("πŸ”„ Click 'Refresh Now' in the sidebar to retry...") + st.info("Click 'Refresh Now' in the sidebar to retry...") if __name__ == "__main__": try: diff --git a/data_drifter/app/components.py b/data_drifter/app/components.py new file mode 100644 index 0000000..1ae83dc --- /dev/null +++ b/data_drifter/app/components.py @@ -0,0 +1,433 @@ +""" +UI rendering components for the Data Drifter Regatta app. 
+ +Usage: + import components + components.init(config, boat_colors) + folium_map = components.create_race_map(df) + components.display_leaderboard(df) + components.display_boat_stats(df) +""" + +import streamlit as st +import pandas as pd +import folium +from folium import plugins +from navigation_utils import calculate_distance + +# Module-level state set by init() +_config = None +_boat_colors = None + + +def init(config, boat_colors): + """Initialize the components module with config and boat colors.""" + global _config, _boat_colors + _config = config + _boat_colors = boat_colors + + +def get_race_course_config(): + """Get race course configuration (marks, start/finish) from config""" + return { + "start_lat": _config["race_course_start_lat"], + "start_lon": _config["race_course_start_lon"], + "marks": _config["race_course_marks"] + } + + +def calculate_total_remaining_distance(lat, lon, current_mark_index, marks): + """ + Calculate total remaining distance along the race course in nautical miles. + + Args: + lat: Current latitude + lon: Current longitude + current_mark_index: Index of the next mark to round (0-based) + marks: List of all marks [[lat1, lon1], [lat2, lon2], ...] where last mark is finish + + Returns: + Total distance remaining along the course in nautical miles + """ + if not marks or len(marks) == 0: + return 0.0 + + total_distance = 0.0 + + # Marks to round are all except the last one (which is the finish line) + marks_to_round = marks[:-1] if len(marks) > 1 else [] + finish_lat, finish_lon = marks[-1][0], marks[-1][1] + + # If all marks are rounded, just return distance to finish + if current_mark_index >= len(marks_to_round): + return calculate_distance(lat, lon, finish_lat, finish_lon) + + # Add distance to next mark + next_mark = marks_to_round[current_mark_index] + total_distance += calculate_distance(lat, lon, next_mark[0], next_mark[1]) + + # Add distances between subsequent marks + for i in range(current_mark_index, len(marks_to_round) - 1): + mark1 = marks_to_round[i] + mark2 = marks_to_round[i + 1] + total_distance += calculate_distance(mark1[0], mark1[1], mark2[0], mark2[1]) + + # Add distance from last mark to finish + if len(marks_to_round) > 0: + last_mark = marks_to_round[-1] + total_distance += calculate_distance(last_mark[0], last_mark[1], finish_lat, finish_lon) + + return total_distance + + +def create_race_map(df): + """Create interactive race map with boat tracks using Folium""" + if df is None or len(df) == 0: + st.warning("No telemetry data available") + return None + + # Convert timestamp from epoch microseconds to datetime + df['datetime'] = pd.to_datetime(df['timestamp'], unit='us') + + # Get race course configuration + course_config = get_race_course_config() + + # Get race course marks + mark_lats = [m[0] for m in course_config['marks']] + mark_lons = [m[1] for m in course_config['marks']] + + # Center map on current boat positions (centroid of fleet) + latest_positions = df.sort_values('timestamp').groupby('boat_id').last().reset_index() + racing_boats = latest_positions[latest_positions['race_status'].isin(['racing', 'not_started'])] + if len(racing_boats) > 0: + center_lat = racing_boats['latitude'].mean() + center_lon = racing_boats['longitude'].mean() + else: + # All finished/DNF β€” center on all boats + center_lat = latest_positions['latitude'].mean() + center_lon = latest_positions['longitude'].mean() + + # Auto-fit zoom to show all current boat positions + lat_spread = latest_positions['latitude'].max() - 
latest_positions['latitude'].min() + lon_spread = latest_positions['longitude'].max() - latest_positions['longitude'].min() + spread = max(lat_spread, lon_spread) + if spread < 0.05: + zoom = 13 + elif spread < 0.2: + zoom = 11 + elif spread < 0.5: + zoom = 10 + elif spread < 1.5: + zoom = 9 + elif spread < 3.0: + zoom = 8 + else: + zoom = 7 + + # Create Folium map + m = folium.Map( + location=[center_lat, center_lon], + zoom_start=zoom, + tiles='OpenStreetMap', + prefer_canvas=True + ) + + # Add race course line connecting marks + course_coords = [[lat, lon] for lat, lon in zip(mark_lats, mark_lons)] + folium.PolyLine( + course_coords, + color='gray', + weight=2, + opacity=0.7, + dash_array='10, 5', + popup='Race Course' + ).add_to(m) + + # Add markers for each race course mark + for idx, (lat, lon) in enumerate(zip(mark_lats, mark_lons)): + folium.Marker( + location=[lat, lon], + popup=f'Mark {idx + 1}', + tooltip=f'Mark {idx + 1}', + icon=folium.Icon(color='orange', icon='flag', prefix='fa') + ).add_to(m) + + # Add start line marker + folium.Marker( + location=[course_config['start_lat'], course_config['start_lon']], + popup='START', + tooltip='Start Line', + icon=folium.Icon(color='green', icon='play', prefix='fa') + ).add_to(m) + + # Add finish line marker (last mark) + finish_lat, finish_lon = course_config['marks'][-1] + folium.Marker( + location=[finish_lat, finish_lon], + popup='FINISH', + tooltip='Finish Line', + icon=folium.Icon(color='red', icon='stop', prefix='fa') + ).add_to(m) + + # Group by boat and plot tracks + boats = df.groupby('boat_id') + + for idx, (boat_id, boat_df) in enumerate(boats): + boat_df = boat_df.sort_values('timestamp') + + boat_name = boat_df['boat_name'].iloc[0] + boat_type = boat_df['boat_type'].iloc[0] + race_status = boat_df['race_status'].iloc[0] + + color = _boat_colors[idx % len(_boat_colors)] + + # Create boat track coordinates + track_coords = [[row['latitude'], row['longitude']] for _, row in boat_df.iterrows()] + + # Add boat track as polyline + folium.PolyLine( + track_coords, + color=color, + weight=3, + opacity=0.8, + popup=boat_name, + tooltip=boat_name + ).add_to(m) + + # Add current position marker (most recent point - first in DESC order) + latest = boat_df.iloc[-1] # Last point after sorting by timestamp ASC + + # Create popup content with boat information + popup_html = f""" +
+ <div style="font-family: sans-serif; font-size: 12px;">
+ <b>{boat_name}</b><br>
+ Status: {race_status}<br>
+ Speed: {latest['speed_over_ground_knots']:.1f} knots<br>
+ Heading: {latest['heading_degrees']:.0f}&deg;<br>
+ Distance: {latest['distance_traveled_nm']:.1f} nm<br>
+ VMG: {latest['vmg_knots']:.1f} knots<br>
+ Marks: {latest['marks_rounded']}/{latest['total_marks']}<br>
+ Time: {pd.to_datetime(latest['timestamp'], unit='us').strftime('%Y-%m-%d %H:%M:%S')}
+ </div>
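+ <!-- All popup fields come from this boat's most recent telemetry row -->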
+ """ + + # Get heading for boat marker + heading = latest['heading_degrees'] + + # Use BoatMarker for racing boats, regular markers for finished/DNF + if race_status == 'finished': + folium.Marker( + location=[latest['latitude'], latest['longitude']], + popup=folium.Popup(popup_html, max_width=250), + tooltip=boat_name.split("'s")[0], + icon=folium.Icon(color='lightgreen', icon='star', prefix='fa') + ).add_to(m) + elif race_status == 'dnf': + folium.Marker( + location=[latest['latitude'], latest['longitude']], + popup=folium.Popup(popup_html, max_width=250), + tooltip=boat_name.split("'s")[0], + icon=folium.Icon(color='red', icon='times', prefix='fa') + ).add_to(m) + else: + # Use BoatMarker for racing boats with rounded SVG shape + plugins.BoatMarker( + location=[latest['latitude'], latest['longitude']], + heading=heading, + wind_heading=latest.get('wind_direction_degrees', heading + 45), + wind_speed=latest.get('wind_speed_knots', 10), + color=color, + popup=folium.Popup(popup_html, max_width=250), + tooltip=boat_name.split("'s")[0] + ).add_to(m) + + # Add wind indicator in top left corner + latest_positions_for_wind = df.sort_values('timestamp').groupby('boat_id').last() + avg_wind_speed = latest_positions_for_wind['wind_speed_knots'].mean() + avg_wind_direction = latest_positions_for_wind['wind_direction_degrees'].mean() + + # Create wind direction name + def get_wind_direction_name(degrees): + directions = ['N', 'NNE', 'NE', 'ENE', 'E', 'ESE', 'SE', 'SSE', 'S', 'SSW', 'SW', 'WSW', 'W', 'WNW', 'NW', 'NNW'] + index = int((degrees + 11.25) / 22.5) % 16 + return directions[index] + + wind_dir_name = get_wind_direction_name(avg_wind_direction) + + # Create custom HTML for wind indicator + wind_html = f""" +
+ <div style="position: fixed; top: 80px; left: 50px; z-index: 9999;
+ background-color: rgba(255, 255, 255, 0.9); border: 2px solid #333;
+ border-radius: 8px; padding: 10px; text-align: center; font-family: sans-serif;">
+ <div style="font-weight: bold; font-size: 12px;">
+ WIND
+ </div>
+ <div style="font-size: 28px; transform: rotate({avg_wind_direction}deg);">
+ ↓
+ </div>
+ <div style="font-size: 14px; font-weight: bold;">
+ {avg_wind_speed:.1f} kts
+ </div>
+ <div style="font-size: 11px;">
+ {wind_dir_name} ({avg_wind_direction:.0f}&deg;)
+ </div>
+ </div>
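+ <!-- Static overlay; injected below via folium.Element. Wind values are
+ fleet-wide averages of each boat's latest telemetry row. -->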
+ """ + + # Add the wind indicator to the map + m.get_root().html.add_child(folium.Element(wind_html)) + + return m + + +def display_leaderboard(df): + """Display leaderboard with all boats""" + if df is None or len(df) == 0: + st.info("No race data available") + return + + # Get latest position for each boat + latest_positions = df.sort_values('timestamp').groupby('boat_id').last().reset_index() + + # Get race course marks + course_config = get_race_course_config() + marks = course_config['marks'] + + # Calculate total remaining distance along the race course for each boat + latest_positions['total_remaining_distance'] = latest_positions.apply( + lambda row: calculate_total_remaining_distance( + row['latitude'], row['longitude'], row['current_mark_index'], marks + ), + axis=1 + ) + + # Sort by total remaining distance (less distance = better rank) + # DNF boats go to the end + latest_positions['sort_key'] = latest_positions.apply( + lambda row: ( + float('inf') if row['race_status'] == 'dnf' else row['total_remaining_distance'] + ), + axis=1 + ) + latest_positions = latest_positions.sort_values('sort_key') + + # Display leaderboard header + st.markdown("##### Race Leaderboard") + + # Create container for scrollable leaderboard + leaderboard_container = st.container(height=600) + + with leaderboard_container: + # Display each boat with rank, name, and distance + for rank, (idx, row) in enumerate(latest_positions.iterrows(), start=1): + boat_id = row['boat_id'] + boat_name = row['boat_name'] + distance = row['distance_traveled_nm'] + + # Create columns for rank and boat info + col1, col2 = st.columns([0.5, 4.5]) + + with col1: + # Rank with medal emoji for top 3 + if rank == 1: + st.markdown(f"### πŸ₯‡") + elif rank == 2: + st.markdown(f"### πŸ₯ˆ") + elif rank == 3: + st.markdown(f"### πŸ₯‰") + else: + st.markdown(f"### **{rank}**") + + with col2: + # Boat name and distance + st.markdown(f"**{boat_name}**") + st.caption(f"{distance:.1f} nm") + + # Add divider between boats + if rank < len(latest_positions): + st.divider() + + +def display_boat_stats(df): + """Display boat statistics table""" + if df is None or len(df) == 0: + return + + # Get latest position for each boat + latest_positions = df.sort_values('timestamp').groupby('boat_id').last().reset_index() + + # Get race course marks + course_config = get_race_course_config() + marks = course_config['marks'] + + # Calculate total remaining distance along the race course for each boat + latest_positions['total_remaining_distance'] = latest_positions.apply( + lambda row: calculate_total_remaining_distance( + row['latitude'], row['longitude'], row['current_mark_index'], marks + ), + axis=1 + ) + + # Sort by total remaining distance (ascending) - boat with least distance is in 1st place + # DNF boats go to the end + latest_positions['sort_key'] = latest_positions.apply( + lambda row: ( + float('inf') if row['race_status'] == 'dnf' else row['total_remaining_distance'] + ), + axis=1 + ) + latest_positions = latest_positions.sort_values('sort_key') + + # Create display dataframe with the new total_remaining_distance column + display_df = latest_positions[[ + 'boat_name', 'boat_type', 'race_status', 'speed_over_ground_knots', + 'distance_traveled_nm', 'total_remaining_distance', 'vmg_knots', + 'marks_rounded', 'total_marks' + ]].copy() + + display_df.columns = [ + 'Boat', 'Type', 'Status', 'Speed (kt)', 'Distance (nm)', + 'To Dest (nm)', 'VMG (kt)', 'Marks', 'Total Marks' + ] + + # Add position column + display_df.insert(0, 'Pos', range(1, 
len(display_df) + 1)) + + # Format marks column + display_df['Marks'] = display_df.apply(lambda row: f"{row['Marks']}/{row['Total Marks']}", axis=1) + display_df = display_df.drop('Total Marks', axis=1) + + # Round numeric columns + display_df['Speed (kt)'] = display_df['Speed (kt)'].round(1) + display_df['Distance (nm)'] = display_df['Distance (nm)'].round(1) + display_df['To Dest (nm)'] = display_df['To Dest (nm)'].round(1) + display_df['VMG (kt)'] = display_df['VMG (kt)'].round(1) + + st.dataframe( + display_df, + use_container_width=True, + hide_index=True, + column_config={ + "Status": st.column_config.TextColumn( + "Status", + help="Current race status", + ) + } + ) diff --git a/data_drifter/app/db_connection.py b/data_drifter/app/db_connection.py new file mode 100644 index 0000000..ab02690 --- /dev/null +++ b/data_drifter/app/db_connection.py @@ -0,0 +1,522 @@ +""" +Database connection and query logic for the Data Drifter Regatta app. + +Usage: + import db_connection + db_connection.init(config, table_name) + df = db_connection.query_telemetry() +""" + +import os +import time +import logging +import threading + +import streamlit as st +from databricks.sdk.core import Config, oauth_service_principal + +# Import databricks-sql-connector with error handling +try: + from databricks import sql + from databricks.sdk import WorkspaceClient +except ImportError as e: + st.error(f"Failed to import databricks modules: {str(e)}") + st.info("Please ensure databricks-sql-connector and databricks-sdk are installed in requirements.txt") + st.stop() + +logger = logging.getLogger(__name__) + +# Module-level state set by init() +_config = None +_table_name = None + + +def init(config, table_name): + """Initialize the db_connection module with config and table name.""" + global _config, _table_name + _config = config + _table_name = table_name + + +def get_connection(): + """Create Databricks SQL connection using workspace authentication""" + logger.info("Starting SQL warehouse connection attempt") + debug_mode = os.getenv("DEBUG", "false").lower() == "true" + start_time = time.time() + + try: + if debug_mode: + st.write("Debug - Step 1: Checking environment variables") + st.write("Available DATABRICKS/DEFAULT env vars:") + for key in sorted(os.environ.keys()): + if 'DATABRICKS' in key or 'DEFAULT' in key: + value = os.environ[key] + if 'TOKEN' in key or 'SECRET' in key or 'PASSWORD' in key: + value = '***' if value else 'None' + else: + value = value[:80] if value and len(value) > 80 else value + st.write(f" {key} = {value}") + + # Get warehouse ID from config + warehouse_id = _config["warehouse_id"] + + # Extract server hostname from workspace URL (remove https:// prefix) + workspace_url = _config.get("workspace_url", os.getenv("DATABRICKS_HOST", "")) + if workspace_url.startswith("https://"): + server_hostname = workspace_url.replace("https://", "") + elif workspace_url.startswith("http://"): + server_hostname = workspace_url.replace("http://", "") + else: + server_hostname = workspace_url + + if debug_mode: + st.write(f"Debug - Step 2: Connection parameters") + st.write(f" Server hostname: {server_hostname}") + st.write(f" Warehouse ID: {warehouse_id}") + st.write(f" HTTP Path: /sql/1.0/warehouses/{warehouse_id}") + + # Method 1: Try using OAuth M2M with client credentials (recommended for Databricks Apps) + client_id = os.getenv("DATABRICKS_CLIENT_ID") + client_secret = os.getenv("DATABRICKS_CLIENT_SECRET") + + if client_id and client_secret: + logger.info("Attempting OAuth M2M authentication 
(Method 1)") + if debug_mode: + st.write("Debug - Step 3: Attempting connection with OAuth M2M (client credentials)...") + st.write(f" DATABRICKS_CLIENT_ID found: {client_id[:8]}...") + st.write(" DATABRICKS_CLIENT_SECRET found") + + try: + credential_provider = lambda: oauth_service_principal(Config( + host = f"https://{server_hostname}", + client_id = client_id, + client_secret = client_secret)) + + connection = sql.connect( + server_hostname = server_hostname, + http_path = f"/sql/1.0/warehouses/{warehouse_id}", + credentials_provider = credential_provider) + + if debug_mode: + st.write(" Connection object created, testing query...") + + # Test the connection + cursor = connection.cursor() + cursor.execute("SELECT 'test' as status, current_database() as db") + result = cursor.fetchone() + cursor.close() + + if debug_mode: + st.success(f"OAuth M2M authentication successful! Result: {result}") + + logger.info(f"OAuth M2M authentication successful in {time.time() - start_time:.2f}s") + return connection + + except Exception as oauth_error: + logger.warning(f"OAuth M2M auth failed: {str(oauth_error)}") + if debug_mode: + st.error(f"OAuth M2M auth failed: {str(oauth_error)}") + st.write(" Falling back to alternative auth methods...") + else: + if debug_mode: + st.write("Debug - Step 3: Client credentials not found in environment") + if not client_id: + st.write(" DATABRICKS_CLIENT_ID not set") + if not client_secret: + st.write(" DATABRICKS_CLIENT_SECRET not set") + st.write(" Attempting connection with SDK default auth...") + + # Method 2: Try SDK automatic authentication + logger.info("Attempting SDK OAuth authentication (Method 2)") + try: + if debug_mode: + st.write(" Initializing WorkspaceClient for auth...") + + from databricks.sdk import WorkspaceClient + from databricks.sdk.core import ApiClient + + cfg = Config() + + if debug_mode: + st.write(f" Config host: {cfg.host if cfg.host else 'auto-detect'}") + st.write(" Creating API client...") + + api_client = ApiClient(cfg) + + if debug_mode: + st.write(" Getting auth headers...") + + def get_token(): + headers = api_client.do("GET", "/api/2.0/preview/scim/v2/Me", + headers={}, data={}, raw=True).headers + auth_header = headers.get('Authorization', '') + if auth_header.startswith('Bearer '): + return auth_header[7:] + return None + + if debug_mode: + st.write(" Connecting to SQL Warehouse...") + + connection = sql.connect( + server_hostname=server_hostname, + http_path=f"/sql/1.0/warehouses/{warehouse_id}", + auth_type="databricks-oauth", + _socket_timeout=30 + ) + + if debug_mode: + st.write(" Connection object created, testing query...") + + cursor = connection.cursor() + cursor.execute("SELECT 'test' as status, current_database() as db") + result = cursor.fetchone() + cursor.close() + + if debug_mode: + st.success(f"Connection successful! 
Result: {result}") + + logger.info(f"SDK OAuth authentication successful in {time.time() - start_time:.2f}s") + return connection + + except Exception as sdk_error: + logger.warning(f"SDK OAuth auth failed: {str(sdk_error)}") + if debug_mode: + st.error(f"SDK OAuth auth method failed: {str(sdk_error)}") + st.write(f" Error type: {type(sdk_error).__name__}") + st.exception(sdk_error) + + # Method 3: Try WorkspaceClient with better error handling + logger.info("Attempting WorkspaceClient authentication (Method 3)") + if debug_mode: + st.write("Method 3: Trying WorkspaceClient authentication...") + + try: + w = WorkspaceClient(host=f"https://{server_hostname}") + + if debug_mode: + st.write(f" WorkspaceClient created") + st.write(f" Host: {w.config.host}") + try: + auth_details = str(w.config) + st.write(f" Config: {auth_details[:200]}") + except: + pass + + if debug_mode: + st.write(" Attempting authentication...") + + try: + credentials = w.config.authenticate() + if debug_mode: + st.write(f" Authentication successful, got credentials") + + connection = sql.connect( + server_hostname=server_hostname, + http_path=f"/sql/1.0/warehouses/{warehouse_id}", + credentials_provider=lambda: credentials, + _socket_timeout=30 + ) + + if debug_mode: + st.write(" SQL Connection created, testing...") + + cursor = connection.cursor() + cursor.execute("SELECT 1 as test") + result = cursor.fetchone() + cursor.close() + + if debug_mode: + st.success(f"Method 3 successful! Test query returned: {result}") + + logger.info(f"WorkspaceClient authentication successful in {time.time() - start_time:.2f}s") + return connection + + except Exception as auth_error: + if debug_mode: + st.error(f" Authentication failed: {str(auth_error)}") + st.write(f" Error type: {type(auth_error).__name__}") + raise + + except Exception as wc_error: + logger.warning(f"WorkspaceClient auth failed: {str(wc_error)}") + if debug_mode: + st.error(f"Method 3 failed: {str(wc_error)}") + st.write(f" Error type: {type(wc_error).__name__}") + st.exception(wc_error) + + # Method 4: Try OAuth U2M flow (for apps with attached resources) + logger.info("Attempting OAuth U2M flow (Method 4)") + if debug_mode: + st.write("Method 4: Trying OAuth U2M flow...") + + try: + connection = sql.connect( + server_hostname=server_hostname, + http_path=f"/sql/1.0/warehouses/{warehouse_id}", + auth_type="databricks-oauth", + _socket_timeout=30 + ) + + if debug_mode: + st.write(" OAuth connection created, testing...") + + cursor = connection.cursor() + cursor.execute("SELECT 1 as test") + result = cursor.fetchone() + cursor.close() + + if debug_mode: + st.success(f"Method 4 successful! Test query returned: {result}") + + logger.info(f"OAuth U2M authentication successful in {time.time() - start_time:.2f}s") + return connection + + except Exception as oauth_error: + logger.warning(f"OAuth U2M auth failed: {str(oauth_error)}") + if debug_mode: + st.error(f"Method 4 failed: {str(oauth_error)}") + st.write(f" Error type: {type(oauth_error).__name__}") + st.exception(oauth_error) + + # All methods failed + logger.error(f"All connection methods failed after {time.time() - start_time:.2f}s") + if debug_mode: + st.error("All connection methods failed!") + st.write("Troubleshooting suggestions:") + st.write("1. Check that the SQL Warehouse resource is properly attached") + st.write("2. Verify the warehouse is running and accessible") + st.write("3. Ensure the app has CAN_USE permission on the warehouse") + st.write("4. 
Check that environment variables are being set correctly") + + raise Exception("Unable to connect to Databricks SQL Warehouse. All authentication methods failed.") + + except Exception as e: + st.error(f"Failed to connect to Databricks SQL: {str(e)}") + st.info("Troubleshooting tips:") + st.info("- Ensure the app has a SQL Warehouse resource with CAN_USE permission") + st.info("- Check that the warehouse is running and accessible") + st.info(f"- Warehouse ID: {warehouse_id}") + if debug_mode: + st.write("Full error details:") + st.exception(e) + return None + + +def execute_query_with_timeout(cursor, query, timeout_seconds=60): + """Execute query with a timeout using threading""" + result = [None] + error = [None] + + def run_query(): + try: + cursor.execute(query) + result[0] = True + except Exception as e: + error[0] = e + + thread = threading.Thread(target=run_query) + thread.daemon = True + thread.start() + thread.join(timeout=timeout_seconds) + + if thread.is_alive(): + raise TimeoutError(f"Query execution exceeded {timeout_seconds} seconds timeout") + if error[0]: + raise error[0] + + return result[0] + + +def query_telemetry(limit=10000): + """Query latest telemetry data from table""" + logger.info(f"Starting telemetry query with limit={limit}") + debug_mode = os.getenv("DEBUG", "false").lower() == "true" + query_start = time.time() + + try: + # Step 1: Get connection + logger.info("Step 1: Establishing connection") + if debug_mode: + st.write("Step 1: Getting connection...") + + conn = get_connection() + if not conn: + logger.error("Failed to establish connection") + st.warning("Could not establish database connection") + return None + + logger.info(f"Step 1 complete in {time.time() - query_start:.2f}s") + + # Step 2: Count rows in table to verify we can query it + logger.info(f"Step 2: Counting rows in {_table_name}") + step2_start = time.time() + + try: + count_query = f"SELECT COUNT(*) as row_count FROM {_table_name}" + cursor = conn.cursor() + cursor.execute(count_query) + count_result = cursor.fetchone() + row_count = count_result[0] if count_result else 0 + cursor.close() + + logger.info(f"Step 2 complete: {row_count:,} rows found in {time.time() - step2_start:.2f}s") + + if row_count == 0: + logger.warning("Table is empty, no data available") + st.warning("Table is empty. 
No telemetry data available yet.") + st.info("Run `python main.py` to start generating telemetry data.") + return None + + except Exception as count_error: + logger.error(f"Failed to count rows: {str(count_error)}") + st.error(f"Failed to count rows in table: {str(count_error)}") + st.info("Check that:") + st.info(" - The table exists") + st.info(" - The service principal has SELECT permission on the table") + st.info(f" - Table name is correct: {_table_name}") + if debug_mode: + st.exception(count_error) + return None + + if debug_mode: + st.write(f"Step 3: Querying table: {_table_name}") + + query = f""" + SELECT + boat_id, + boat_name, + boat_type, + timestamp, + latitude, + longitude, + speed_over_ground_knots, + heading_degrees, + wind_speed_knots, + wind_direction_degrees, + distance_traveled_nm, + distance_to_destination_nm, + vmg_knots, + current_mark_index, + marks_rounded, + total_marks, + has_started, + has_finished, + race_status + FROM {_table_name} + ORDER BY timestamp DESC + LIMIT {limit} + """ + + # Step 3: Execute query with timeout + logger.info(f"Step 3: Querying {_table_name} for {limit} rows") + step3_start = time.time() + + if debug_mode: + st.write("Step 4: Executing query with 60 second timeout...") + + cursor = conn.cursor() + + try: + execute_query_with_timeout(cursor, query, timeout_seconds=60) + execution_time = time.time() - step3_start + + logger.info(f"Step 3 query execution complete in {execution_time:.2f}s") + + if debug_mode: + st.write(f" Query executed in {execution_time:.2f} seconds") + + except TimeoutError as te: + logger.error(f"Query timeout: {str(te)}") + st.error(f"Query execution timeout: {str(te)}") + st.warning("Troubleshooting suggestions:") + st.info("- The SQL warehouse may be slow or overloaded") + st.info("- Try reducing the data limit or adding filters") + st.info("- Check SQL warehouse status in Databricks UI") + st.info(f"- Table: {_table_name}") + cursor.close() + conn.close() + return None + + # Step 4-5: Fetch results + logger.info("Step 4-5: Fetching results as DataFrame") + fetch_start = time.time() + + if debug_mode: + st.write("Step 5: Fetching results...") + + df = cursor.fetchall_arrow().to_pandas() + cursor.close() + + fetch_time = time.time() - fetch_start + logger.info(f"Fetch complete: {len(df)} rows in {fetch_time:.2f}s") + + total_time = time.time() - query_start + logger.info(f"Query completed successfully in {total_time:.2f}s total") + + if debug_mode: + st.write(f"Step 6: Retrieved {len(df)} rows") + + return df + + except TimeoutError as te: + logger.error(f"Query timeout after {time.time() - query_start:.2f}s: {str(te)}") + return None + except Exception as e: + logger.error(f"Query failed after {time.time() - query_start:.2f}s: {str(e)}", exc_info=True) + st.error(f"Failed to query data: {str(e)}") + st.info("Check:") + st.info(f"- Table exists and has data") + st.info("- Service principal has SELECT permission") + st.info(f"- Table name: {_table_name}") + if debug_mode: + st.exception(e) + return None + + +def query_weather_station(): + """Query latest weather station data""" + logger.info("Querying weather station data") + + weather_table = _config.get("weather_table_name") + if not weather_table: + logger.warning("Weather station table not configured") + return None + + try: + conn = get_connection() + if not conn: + logger.error("Failed to establish connection for weather station query") + return None + + query = f""" + SELECT + station_id, + station_name, + station_location, + timestamp, + wind_speed_knots, + 
wind_direction_degrees, + event_type, + in_transition, + time_in_state_seconds, + next_change_in_seconds + FROM {weather_table} + ORDER BY timestamp DESC + LIMIT 1 + """ + + cursor = conn.cursor() + cursor.execute(query) + result = cursor.fetchall_arrow().to_pandas() + cursor.close() + + if result.empty: + logger.warning("No weather station data found") + return None + + logger.info(f"Weather station data retrieved: {len(result)} record") + return result.iloc[0] + + except Exception as e: + logger.error(f"Failed to query weather station: {str(e)}", exc_info=True) + return None diff --git a/data_drifter/config.toml b/data_drifter/config.toml index 2b31502..efb6bbe 100644 --- a/data_drifter/config.toml +++ b/data_drifter/config.toml @@ -114,7 +114,7 @@ race_duration_seconds = 172800 # 600 = watch 4-day race in 10 minutes (576x speed) # 1800 = watch 4-day race in 30 minutes (192x speed) # 3600 = watch 4-day race in 1 hour (96x speed) -real_time_duration_seconds = 300 +real_time_duration_seconds = 120 # How often to emit telemetry in RACE TIME (seconds) # This is the interval between telemetry records in race time @@ -130,4 +130,14 @@ stats_interval_seconds = 5.0 [warehouse] # SQL Warehouse ID for app resource permissions # The app will be granted CAN_USE permission on this warehouse -sql_warehouse_id = "" +sql_warehouse_id = "" + +[analysis] +# Wind speed category thresholds (knots) +wind_light_threshold_knots = 8 +wind_moderate_threshold_knots = 15 + +# Consistency rating thresholds (coefficient of variation) +consistency_very_consistent = 0.1 +consistency_consistent = 0.2 +consistency_variable = 0.3 diff --git a/data_drifter/databricks.yml b/data_drifter/databricks.yml index cbd17d6..365b82e 100644 --- a/data_drifter/databricks.yml +++ b/data_drifter/databricks.yml @@ -30,6 +30,14 @@ variables: description: "Schema name within catalog" default: "" + config_file_path: + description: "Path to config.toml in workspace" + default: "${workspace.file_path}/app/config.toml" + + sql_warehouse_id: + description: "SQL Warehouse ID for app resource permissions" + default: "" + # Service Principal (populated after app creation for permissions) app_service_principal_id: description: "Service principal ID of the deployed app (for permissions)" @@ -71,7 +79,7 @@ resources: resources: - name: sql-warehouse sql_warehouse: - id: dd43ee29fedd958d # Hardcoded from config.toml + id: ${var.sql_warehouse_id} permission: CAN_USE # App permissions @@ -109,6 +117,15 @@ resources: depends_on: - task_key: create_telemetry_table + - task_key: create_control_table + notebook_task: + notebook_path: ./notebooks/create_control_table.py + source: WORKSPACE + base_parameters: + telemetry_table: ${var.telemetry_table} + depends_on: + - task_key: create_weather_table + # Job permissions permissions: - level: CAN_MANAGE @@ -190,8 +207,6 @@ targets: dev: mode: development default: true - workspace: - host: https://your-dev-workspace.cloud.databricks.com # Production target (if needed in future) # prod: diff --git a/data_drifter/deploy.sh b/data_drifter/deploy.sh index 8dbed56..bd512a6 100644 --- a/data_drifter/deploy.sh +++ b/data_drifter/deploy.sh @@ -1,7 +1,13 @@ #!/bin/bash set -e -export DATABRICKS_CONFIG_PROFILE="${DATABRICKS_CONFIG_PROFILE:-default}" +# Validate prerequisites +command -v databricks >/dev/null 2>&1 || { echo "❌ databricks CLI not found. Install: https://docs.databricks.com/en/dev-tools/cli/install.html"; exit 1; } +command -v jq >/dev/null 2>&1 || { echo "❌ jq not found. 
Install: brew install jq"; exit 1; } +command -v python3 >/dev/null 2>&1 || { echo "❌ python3 not found"; exit 1; } +python3 -c "import toml" 2>/dev/null || python3 -c "import tomllib" 2>/dev/null || python3 -c "import tomli" 2>/dev/null || { echo "❌ No TOML parser available. Run: pip install toml (or activate your venv first)"; exit 1; } + +export DATABRICKS_CONFIG_PROFILE="${DATABRICKS_CONFIG_PROFILE:-DEFAULT}" TARGET="${DATABRICKS_TARGET:-dev}" APP_NAME="data-drifter-regatta-v3" @@ -34,16 +40,18 @@ echo "πŸ“¦ Deploying bundle..." databricks bundle deploy \ --target "$TARGET" \ + --profile="$DATABRICKS_CONFIG_PROFILE" \ --var="telemetry_table=$TELEMETRY_TABLE" \ --var="weather_table=$WEATHER_TABLE" \ --var="catalog_name=$CATALOG_NAME" \ - --var="schema_name=$SCHEMA_NAME" + --var="schema_name=$SCHEMA_NAME" \ + --var="sql_warehouse_id=$WAREHOUSE_ID" echo "βœ… Bundle deployed" # Create tables echo "πŸ—οΈ Creating tables..." -if databricks bundle run create_tables --target "$TARGET"; then +if databricks bundle run create_tables --target "$TARGET" --profile="$DATABRICKS_CONFIG_PROFILE"; then echo "βœ… Tables created" else echo "❌ Table creation failed" @@ -53,15 +61,15 @@ fi # Deploy app echo "πŸ“± Deploying app..." -CURRENT_USER=$(databricks current-user me --output json 2>/dev/null | jq -r '.userName') +CURRENT_USER=$(databricks current-user me --profile="$DATABRICKS_CONFIG_PROFILE" --output json 2>/dev/null | jq -r '.userName') BUNDLE_NAME="data_drifter_regatta" APP_SOURCE_PATH="/Workspace/Users/$CURRENT_USER/.bundle/$BUNDLE_NAME/$TARGET/files/app" -if databricks apps get "$APP_NAME" --output json 2>/dev/null | jq -e '.name' > /dev/null; then +if databricks apps get "$APP_NAME" --profile="$DATABRICKS_CONFIG_PROFILE" --output json 2>/dev/null | jq -e '.name' > /dev/null; then # Wait for any active deployment to complete WAIT_COUNT=0 - while [ $WAIT_COUNT -lt 12 ]; do - DEPLOYMENT_STATE=$(databricks apps get "$APP_NAME" --output json 2>/dev/null | jq -r '.active_deployment.deployment_state // "NONE"') + while [ $WAIT_COUNT -lt 24 ]; do + DEPLOYMENT_STATE=$(databricks apps get "$APP_NAME" --profile="$DATABRICKS_CONFIG_PROFILE" --output json 2>/dev/null | jq -r '.active_deployment.deployment_state // "NONE"') if [ "$DEPLOYMENT_STATE" = "NONE" ] || [ "$DEPLOYMENT_STATE" = "null" ]; then break fi @@ -70,20 +78,20 @@ if databricks apps get "$APP_NAME" --output json 2>/dev/null | jq -e '.name' > / WAIT_COUNT=$((WAIT_COUNT + 1)) done - COMPUTE_STATUS=$(databricks apps get "$APP_NAME" --output json 2>/dev/null | jq -r '.compute_status.state') + COMPUTE_STATUS=$(databricks apps get "$APP_NAME" --profile="$DATABRICKS_CONFIG_PROFILE" --output json 2>/dev/null | jq -r '.compute_status.state') if [ "$COMPUTE_STATUS" = "ACTIVE" ] || [ "$COMPUTE_STATUS" = "STARTING" ]; then echo "Updating running app..." - databricks apps deploy "$APP_NAME" --source-code-path "$APP_SOURCE_PATH" + databricks apps deploy "$APP_NAME" --source-code-path "$APP_SOURCE_PATH" --profile="$DATABRICKS_CONFIG_PROFILE" else echo "Deploying and starting app..." - databricks apps deploy "$APP_NAME" --source-code-path "$APP_SOURCE_PATH" - databricks apps start "$APP_NAME" + databricks apps deploy "$APP_NAME" --source-code-path "$APP_SOURCE_PATH" --profile="$DATABRICKS_CONFIG_PROFILE" + databricks apps start "$APP_NAME" --profile="$DATABRICKS_CONFIG_PROFILE" fi else echo "Creating new app..." 
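+ # First-time path: the app does not exist yet, so upload the source and
+ # start its compute explicitly (the update path above only redeploys).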
- databricks apps deploy "$APP_NAME" --source-code-path "$APP_SOURCE_PATH" - databricks apps start "$APP_NAME" + databricks apps deploy "$APP_NAME" --source-code-path "$APP_SOURCE_PATH" --profile="$DATABRICKS_CONFIG_PROFILE" + databricks apps start "$APP_NAME" --profile="$DATABRICKS_CONFIG_PROFILE" fi echo "βœ… App deployed" @@ -92,8 +100,8 @@ echo "βœ… App deployed" echo "πŸ” Granting permissions..." sleep 2 -if APP_SP_ID=$(databricks apps get "$APP_NAME" --output json 2>/dev/null | jq -r '.service_principal_client_id'); then - if databricks bundle run grant_permissions --notebook-params="app_service_principal_id=$APP_SP_ID"; then +if APP_SP_ID=$(databricks apps get "$APP_NAME" --profile="$DATABRICKS_CONFIG_PROFILE" --output json 2>/dev/null | jq -r '.service_principal_client_id'); then + if databricks bundle run grant_permissions --notebook-params="app_service_principal_id=$APP_SP_ID" --profile="$DATABRICKS_CONFIG_PROFILE"; then echo "βœ… Permissions granted" else echo "⚠️ Permission grant failed" @@ -107,7 +115,7 @@ fi echo "" echo "βœ… Deployment complete!" echo "" -if APP_URL=$(databricks apps get "$APP_NAME" --output json 2>/dev/null | jq -r '.url'); then +if APP_URL=$(databricks apps get "$APP_NAME" --profile="$DATABRICKS_CONFIG_PROFILE" --output json 2>/dev/null | jq -r '.url'); then echo "App URL: $APP_URL" fi echo "" diff --git a/data_drifter/main.py b/data_drifter/main.py index b068397..3f117af 100644 --- a/data_drifter/main.py +++ b/data_drifter/main.py @@ -27,6 +27,8 @@ from weather_station import WeatherStation from zerobus.sdk.aio import ZerobusSdk from zerobus.sdk.shared import RecordType, StreamConfigurationOptions, TableProperties +from databricks import sql as dbsql +from databricks.sdk.core import Config, oauth_service_principal # Configure logging logging.basicConfig( @@ -45,10 +47,10 @@ def load_config(config_path: str = "config.toml") -> dict: return config except FileNotFoundError: logger.error(f"βœ— Configuration file not found: {config_path}") - sys.exit(1) + raise except Exception as e: logger.error(f"βœ— Failed to load configuration: {e}") - sys.exit(1) + raise class RaceSimulator: @@ -86,6 +88,13 @@ def __init__(self, fleet: SailboatFleet, stream: Any, config: Dict[str, Any], we self.time_acceleration = 1.0 self.emission_interval_real_time = self.emission_interval_race_time + # Speed control from control table + self.speed_multiplier = 1.0 + self.last_control_check = 0 + self.control_check_interval = 5.0 # Check every 5 seconds of real time + self.control_table = None + self.sql_connection = None + # Statistics tracking self.records_sent = 0 self.records_failed = 0 @@ -93,6 +102,60 @@ def __init__(self, fleet: SailboatFleet, stream: Any, config: Dict[str, Any], we self.last_stats_time = None self.elapsed_race_time = 0.0 + def _init_control_table(self, config, client_id, client_secret): + """Initialize SQL connection for reading the speed control table""" + try: + schema_prefix = ".".join(config["zerobus"]["table_name"].split(".")[:2]) + self.control_table = f"{schema_prefix}.race_control" + + workspace_url = config["zerobus"]["workspace_url"] + server_hostname = workspace_url.replace("https://", "").replace("http://", "") + warehouse_id = config["warehouse"]["sql_warehouse_id"] + + def credential_provider(): + config_obj = Config( + host=workspace_url, + client_id=client_id, + client_secret=client_secret, + ) + return oauth_service_principal(config_obj) + + self.sql_connection = dbsql.connect( + server_hostname=server_hostname, + 
http_path=f"/sql/1.0/warehouses/{warehouse_id}", + credentials_provider=credential_provider, + ) + logger.info(f"βœ“ Connected to control table: {self.control_table}") + except Exception as e: + logger.warning(f"⚠️ Could not connect to control table: {e}") + logger.warning(" Speed control from app will not be available") + self.control_table = None + + def _check_speed_control(self): + """Read speed multiplier from control table""" + if not self.control_table or not self.sql_connection: + return + + current_time = time.time() + if current_time - self.last_control_check < self.control_check_interval: + return + + self.last_control_check = current_time + try: + cursor = self.sql_connection.cursor() + cursor.execute(f"SELECT speed_multiplier FROM {self.control_table} LIMIT 1") + row = cursor.fetchone() + cursor.close() + if row and row[0] != self.speed_multiplier: + old = self.speed_multiplier + self.speed_multiplier = float(row[0]) + if self.speed_multiplier <= 0: + self.speed_multiplier = 1.0 + logger.info(f"⚑ Speed changed: {old:.1f}x β†’ {self.speed_multiplier:.1f}x") + except Exception as e: + # Silently ignore - control table might not exist yet + pass + async def run(self): """Run the race simulation""" logger.info(f"\nStarting race simulation...\n") @@ -137,8 +200,12 @@ async def run(self): self._print_stats() self.last_stats_time = current_real_time - # Sleep for real time interval before next emission - await asyncio.sleep(self.emission_interval_real_time) + # Check for speed control updates from the app + self._check_speed_control() + + # Apply speed multiplier to sleep interval + adjusted_interval = self.emission_interval_real_time / self.speed_multiplier + await asyncio.sleep(adjusted_interval) except KeyboardInterrupt: logger.info("\n\nInterrupted by user") @@ -155,8 +222,6 @@ async def _send_telemetry(self, fleet_telemetry: list): except Exception as e: self.records_failed += 1 logger.error(f"βœ— Failed to send record from {telemetry['boat_name']}: {e}") - logger.error(e) - exit(1) def _all_boats_finished(self) -> bool: """Check if all boats have finished the race""" @@ -197,7 +262,8 @@ def _print_stats(self): logger.info(f"Records sent: {self.records_sent}") logger.info(f"Records failed: {self.records_failed}") logger.info(f"Success rate: {(self.records_sent/(self.records_sent+self.records_failed)*100) if (self.records_sent+self.records_failed) > 0 else 0:.1f}%") - logger.info(f"Throughput: {rate:.2f} records/sec (real time)") + speed_str = f" | Speed: {self.speed_multiplier:.1f}x" if self.speed_multiplier != 1.0 else "" + logger.info(f"Throughput: {rate:.2f} records/sec (real time){speed_str}") logger.info("-" * 60 + "\n") def _print_final_summary(self): @@ -281,7 +347,7 @@ async def main(): Real-Time Sailing Competition β›΅ Presented by Lakeflow Connect β›΅ - Made possbile with Zeorbus Ingest + Made possible with Zerobus Ingest ════════════════════════════════════════════════════════════ """) @@ -463,6 +529,7 @@ async def main(): # Create and run the race simulator simulator = RaceSimulator(fleet, stream, config, weather_station) + simulator._init_control_table(config, CLIENT_ID, CLIENT_SECRET) await simulator.run() # Close stream @@ -473,6 +540,12 @@ async def main(): except Exception as e: logger.error(f"βœ— Error closing stream: {e}") + if simulator.sql_connection: + try: + simulator.sql_connection.close() + except: + pass + return 0 diff --git a/data_drifter/notebooks/01_boat_performance.py b/data_drifter/notebooks/01_boat_performance.py index ea1fc7a..f63f1a2 100644 --- 
a/data_drifter/notebooks/01_boat_performance.py +++ b/data_drifter/notebooks/01_boat_performance.py @@ -21,7 +21,7 @@ import pyspark.sql.functions as F from pyspark.sql.window import Window -from race_utils import load_race_course_config, add_remaining_distance_column, load_race_data +from race_utils import load_race_course_config, add_remaining_distance_column, load_race_data, get_schema_prefix # Load race course configuration from config.toml TABLE_NAME, marks, config = load_race_course_config() @@ -151,13 +151,16 @@ # COMMAND ---------- -# Save boat performance summary -boat_performance.createOrReplaceTempView("boat_performance_summary") -finished_boats.createOrReplaceTempView("finished_boats_summary") +# Derive schema from TABLE_NAME (catalog.schema.table -> catalog.schema) +SCHEMA_PREFIX = get_schema_prefix(TABLE_NAME) -print("Boat performance results saved to temp views:") -print(" - boat_performance_summary") -print(" - finished_boats_summary") +# Save boat performance summary as tables +boat_performance.write.mode("overwrite").saveAsTable(f"{SCHEMA_PREFIX}.boat_performance_summary") +finished_boats.write.mode("overwrite").saveAsTable(f"{SCHEMA_PREFIX}.finished_boats_summary") + +print("Boat performance results saved to tables:") +print(f" - {SCHEMA_PREFIX}.boat_performance_summary") +print(f" - {SCHEMA_PREFIX}.finished_boats_summary") # COMMAND ---------- diff --git a/data_drifter/notebooks/02_wind_conditions.py b/data_drifter/notebooks/02_wind_conditions.py index 4ccc197..ebadb66 100644 --- a/data_drifter/notebooks/02_wind_conditions.py +++ b/data_drifter/notebooks/02_wind_conditions.py @@ -21,7 +21,11 @@ import pyspark.sql.functions as F from pyspark.sql.window import Window -from race_utils import load_race_course_config, load_race_data +from race_utils import load_race_course_config, load_race_data, get_schema_prefix, categorize_wind +from pyspark.sql.functions import udf +from pyspark.sql.types import StringType + +categorize_wind_udf = udf(lambda speed: categorize_wind(speed), StringType()) # Load race course configuration from config.toml TABLE_NAME, marks, config = load_race_course_config() @@ -38,9 +42,7 @@ # Categorize wind conditions df_with_wind = df.withColumn("wind_category", - F.when(F.col("wind_speed_knots") < 8, "Light") - .when(F.col("wind_speed_knots") < 15, "Moderate") - .otherwise("Heavy") + categorize_wind_udf(F.col("wind_speed_knots")) ) # Check distribution @@ -146,13 +148,16 @@ # COMMAND ---------- -# Save wind condition analysis results -performance_by_wind.createOrReplaceTempView("performance_by_wind") -top_by_wind.createOrReplaceTempView("top_performers_by_wind") +# Derive schema from TABLE_NAME (catalog.schema.table -> catalog.schema) +SCHEMA_PREFIX = get_schema_prefix(TABLE_NAME) + +# Save wind condition analysis results as tables +performance_by_wind.write.mode("overwrite").saveAsTable(f"{SCHEMA_PREFIX}.performance_by_wind") +top_by_wind.write.mode("overwrite").saveAsTable(f"{SCHEMA_PREFIX}.top_performers_by_wind") -print("Wind condition analysis results saved to temp views:") -print(" - performance_by_wind") -print(" - top_performers_by_wind") +print("Wind condition analysis results saved to tables:") +print(f" - {SCHEMA_PREFIX}.performance_by_wind") +print(f" - {SCHEMA_PREFIX}.top_performers_by_wind") # COMMAND ---------- @@ -319,9 +324,7 @@ # Analyze by both wind condition and point of sail combined_analysis = df_with_sail.withColumn("wind_category", - F.when(F.col("wind_speed_knots") < 8, "Light") - .when(F.col("wind_speed_knots") < 15, 
"Moderate") - .otherwise("Heavy") + categorize_wind_udf(F.col("wind_speed_knots")) ).groupBy("boat_id", "boat_name", "wind_category", "point_of_sail").agg( F.avg("vmg_knots").alias("avg_vmg"), F.avg("speed_over_ground_knots").alias("avg_speed"), @@ -365,18 +368,18 @@ # COMMAND ---------- -# Save point of sail analysis results -performance_by_sail.createOrReplaceTempView("performance_by_point_of_sail") -top_by_sail.createOrReplaceTempView("top_performers_by_sail") -boat_specialization.createOrReplaceTempView("boat_specializations") -combined_analysis.createOrReplaceTempView("performance_wind_and_sail") -distance_by_sail.createOrReplaceTempView("distance_by_point_of_sail") -distance_by_sail_boat.createOrReplaceTempView("distance_by_point_of_sail_boat") +# Save point of sail analysis results as tables +performance_by_sail.write.mode("overwrite").saveAsTable(f"{SCHEMA_PREFIX}.performance_by_point_of_sail") +top_by_sail.write.mode("overwrite").saveAsTable(f"{SCHEMA_PREFIX}.top_performers_by_sail") +boat_specialization.write.mode("overwrite").saveAsTable(f"{SCHEMA_PREFIX}.boat_specializations") +combined_analysis.write.mode("overwrite").saveAsTable(f"{SCHEMA_PREFIX}.performance_wind_and_sail") +distance_by_sail.write.mode("overwrite").saveAsTable(f"{SCHEMA_PREFIX}.distance_by_point_of_sail") +distance_by_sail_boat.write.mode("overwrite").saveAsTable(f"{SCHEMA_PREFIX}.distance_by_point_of_sail_boat") -print("Point of sail analysis results saved to temp views:") -print(" - performance_by_point_of_sail") -print(" - top_performers_by_sail") -print(" - boat_specializations") +print("Point of sail analysis results saved to tables:") +print(f" - {SCHEMA_PREFIX}.performance_by_point_of_sail") +print(f" - {SCHEMA_PREFIX}.top_performers_by_sail") +print(f" - {SCHEMA_PREFIX}.boat_specializations") print(" - performance_wind_and_sail") print(" - distance_by_point_of_sail") print(" - distance_by_point_of_sail_boat") @@ -655,7 +658,7 @@ display(vmg_changes) # Save for use in other notebooks - vmg_changes.createOrReplaceTempView("performance_change_by_weather_event") + vmg_changes.write.mode("overwrite").saveAsTable(f"{SCHEMA_PREFIX}.performance_change_by_weather_event") # COMMAND ---------- @@ -665,16 +668,16 @@ # COMMAND ---------- if weather_df is not None: - # Save analysis results - weather_df.createOrReplaceTempView("weather_events") - performance_by_weather_event.createOrReplaceTempView("performance_by_weather_event") - top_by_event.createOrReplaceTempView("top_performers_by_weather_event") - adaptability.createOrReplaceTempView("boat_adaptability_to_weather") - - print("Weather event analysis results saved to temp views:") - print(" - weather_events") - print(" - performance_by_weather_event") - print(" - top_performers_by_weather_event") + # Save analysis results as tables + weather_df.write.mode("overwrite").saveAsTable(f"{SCHEMA_PREFIX}.weather_events") + performance_by_weather_event.write.mode("overwrite").saveAsTable(f"{SCHEMA_PREFIX}.performance_by_weather_event") + top_by_event.write.mode("overwrite").saveAsTable(f"{SCHEMA_PREFIX}.top_performers_by_weather_event") + adaptability.write.mode("overwrite").saveAsTable(f"{SCHEMA_PREFIX}.boat_adaptability_to_weather") + + print("Weather event analysis results saved to tables:") + print(f" - {SCHEMA_PREFIX}.weather_events") + print(f" - {SCHEMA_PREFIX}.performance_by_weather_event") + print(f" - {SCHEMA_PREFIX}.top_performers_by_weather_event") print(" - boat_adaptability_to_weather") print(" - performance_change_by_weather_event") diff --git 
a/data_drifter/notebooks/03_race_progress.py b/data_drifter/notebooks/03_race_progress.py index a5798cf..aa6143c 100644 --- a/data_drifter/notebooks/03_race_progress.py +++ b/data_drifter/notebooks/03_race_progress.py @@ -21,7 +21,7 @@ import pyspark.sql.functions as F from pyspark.sql.window import Window -from race_utils import load_race_course_config, add_remaining_distance_column, load_race_data +from race_utils import load_race_course_config, add_remaining_distance_column, load_race_data, get_schema_prefix # Load race course configuration from config.toml TABLE_NAME, marks, config = load_race_course_config() @@ -162,6 +162,11 @@ # COMMAND ---------- +# Read consistency thresholds from config +_very_consistent = config.get("analysis", {}).get("consistency_very_consistent", 0.1) +_consistent = config.get("analysis", {}).get("consistency_consistent", 0.2) +_variable = config.get("analysis", {}).get("consistency_variable", 0.3) + # Calculate VMG consistency across legs consistency = vmg_by_leg.groupBy("boat_id", "boat_name").agg( F.avg("avg_vmg").alias("mean_vmg"), @@ -172,9 +177,9 @@ F.col("stddev_vmg") / F.abs(F.col("mean_vmg")) ).withColumn( "consistency_rating", - F.when(F.col("coefficient_of_variation") < 0.1, "Very Consistent") - .when(F.col("coefficient_of_variation") < 0.2, "Consistent") - .when(F.col("coefficient_of_variation") < 0.3, "Moderate") + F.when(F.col("coefficient_of_variation") < _very_consistent, "Very Consistent") + .when(F.col("coefficient_of_variation") < _consistent, "Consistent") + .when(F.col("coefficient_of_variation") < _variable, "Moderate") .otherwise("Variable") ).orderBy("coefficient_of_variation") @@ -215,17 +220,20 @@ # COMMAND ---------- -# Save race progress analysis results -positions.createOrReplaceTempView("race_positions_over_time") -vmg_by_leg.createOrReplaceTempView("race_leg_vmg") -leg_trend.createOrReplaceTempView("race_leg_trends") -consistency.createOrReplaceTempView("race_leg_consistency") +# Derive schema from TABLE_NAME (catalog.schema.table -> catalog.schema) +SCHEMA_PREFIX = get_schema_prefix(TABLE_NAME) + +# Save race progress analysis results as tables +positions.write.mode("overwrite").saveAsTable(f"{SCHEMA_PREFIX}.race_positions_over_time") +vmg_by_leg.write.mode("overwrite").saveAsTable(f"{SCHEMA_PREFIX}.race_leg_vmg") +leg_trend.write.mode("overwrite").saveAsTable(f"{SCHEMA_PREFIX}.race_leg_trends") +consistency.write.mode("overwrite").saveAsTable(f"{SCHEMA_PREFIX}.race_leg_consistency") -print("Race progress analysis results saved to temp views:") -print(" - race_positions_over_time (includes race_position and remaining_distance_nm)") -print(" - race_leg_vmg") -print(" - race_leg_trends") -print(" - race_leg_consistency") +print("Race progress analysis results saved to tables:") +print(f" - {SCHEMA_PREFIX}.race_positions_over_time") +print(f" - {SCHEMA_PREFIX}.race_leg_vmg") +print(f" - {SCHEMA_PREFIX}.race_leg_trends") +print(f" - {SCHEMA_PREFIX}.race_leg_consistency") # COMMAND ---------- diff --git a/data_drifter/notebooks/04_race_summary.py b/data_drifter/notebooks/04_race_summary.py index e3debae..71a5ab3 100644 --- a/data_drifter/notebooks/04_race_summary.py +++ b/data_drifter/notebooks/04_race_summary.py @@ -22,11 +22,18 @@ import pyspark.sql.functions as F from pyspark.sql.window import Window -from race_utils import load_race_course_config, load_race_data, get_finished_boats +from race_utils import load_race_course_config, load_race_data, get_finished_boats, get_schema_prefix, categorize_wind +from pyspark.sql.functions 
import udf +from pyspark.sql.types import StringType + +categorize_wind_udf = udf(lambda speed: categorize_wind(speed), StringType()) # Load race course configuration from config.toml TABLE_NAME, marks, config = load_race_course_config() +# Derive schema from TABLE_NAME (catalog.schema.table -> catalog.schema) +SCHEMA_PREFIX = get_schema_prefix(TABLE_NAME) + # Load telemetry data using utility function df = load_race_data(TABLE_NAME) @@ -69,16 +76,16 @@ # MAGIC %md # MAGIC ## 1. Overall Boat Rankings # MAGIC -# MAGIC Load results from boat_performance_summary temp view created by 01_boat_performance notebook +# MAGIC Load results from boat_performance_summary table created by 01_boat_performance notebook # COMMAND ---------- -# Try to load from temp view, fallback to calculation if not available +# Try to load from table, fallback to calculation if not available try: - boat_performance = spark.table("finished_boats_summary") - print("βœ“ Loaded boat rankings from finished_boats_summary temp view") + boat_performance = spark.table(f"{SCHEMA_PREFIX}.finished_boats_summary") + print(f"βœ“ Loaded boat rankings from {SCHEMA_PREFIX}.finished_boats_summary table") except: - print("⚠️ Temp view not found, calculating boat rankings...") + print("⚠️ Table not found, calculating boat rankings...") boat_performance = get_finished_boats(df) print("=" * 80) @@ -96,25 +103,23 @@ # MAGIC %md # MAGIC ## 2. Performance by Wind Condition # MAGIC -# MAGIC Load results from performance_by_wind temp view created by 02_wind_conditions notebook +# MAGIC Load results from performance_by_wind table created by 02_wind_conditions notebook # COMMAND ---------- -# Try to load from temp view, fallback to calculation if not available +# Try to load from table, fallback to calculation if not available try: - vmg_by_wind = spark.table("performance_by_wind") - print("βœ“ Loaded wind condition performance from performance_by_wind temp view") + vmg_by_wind = spark.table(f"{SCHEMA_PREFIX}.performance_by_wind") + print(f"βœ“ Loaded wind condition performance from {SCHEMA_PREFIX}.performance_by_wind table") wind_winners = vmg_by_wind.withColumn( "rank", F.row_number().over(Window.partitionBy("wind_category").orderBy(F.desc("avg_vmg"))) ).filter(F.col("rank") == 1) except: - print("⚠️ Temp view not found, calculating wind condition performance...") + print("⚠️ Table not found, calculating wind condition performance...") df_conditions = df.withColumn("wind_category", - F.when(F.col("wind_speed_knots") < 8, "Light") - .when(F.col("wind_speed_knots") < 15, "Moderate") - .otherwise("Heavy") + categorize_wind_udf(F.col("wind_speed_knots")) ) vmg_by_wind = df_conditions.groupBy("boat_id", "boat_name", "wind_category").agg( @@ -138,21 +143,21 @@ # MAGIC %md # MAGIC ## 3. 
Performance by Point of Sail # MAGIC -# MAGIC Load results from performance_by_point_of_sail temp view created by 02_wind_conditions notebook +# MAGIC Load results from performance_by_point_of_sail table created by 02_wind_conditions notebook # COMMAND ---------- -# Try to load from temp view, fallback to calculation if not available +# Try to load from table, fallback to calculation if not available try: - vmg_by_sail = spark.table("performance_by_point_of_sail") - print("βœ“ Loaded point of sail performance from performance_by_point_of_sail temp view") + vmg_by_sail = spark.table(f"{SCHEMA_PREFIX}.performance_by_point_of_sail") + print(f"βœ“ Loaded point of sail performance from {SCHEMA_PREFIX}.performance_by_point_of_sail table") sail_winners = vmg_by_sail.withColumn( "rank", F.row_number().over(Window.partitionBy("point_of_sail").orderBy(F.desc("avg_vmg"))) ).filter(F.col("rank") == 1) except: - print("⚠️ Temp view not found, calculating point of sail performance...") + print("⚠️ Table not found, calculating point of sail performance...") from race_utils import add_point_of_sail_column df_with_sail = add_point_of_sail_column(df) @@ -178,11 +183,11 @@ # MAGIC %md # MAGIC ## 3.5. Weather Event Analysis # MAGIC -# MAGIC Load results from performance_by_weather_event temp view created by 02_wind_conditions notebook +# MAGIC Load results from performance_by_weather_event table created by 02_wind_conditions notebook # COMMAND ---------- -# Try to load from temp view first +# Try to load from table first from race_utils import load_weather_station_data, summarize_weather_events weather_df = load_weather_station_data(config) @@ -207,17 +212,17 @@ pct = (count / weather_summary['total_events']) * 100 print(f" {event_type.replace('_', ' ').title():20s}: {count:3d} events ({pct:5.1f}%)") - # Try to load from temp view, fallback to calculation if not available + # Try to load from table, fallback to calculation if not available try: - vmg_by_weather = spark.table("performance_by_weather_event") - print("\nβœ“ Loaded weather event performance from performance_by_weather_event temp view") + vmg_by_weather = spark.table(f"{SCHEMA_PREFIX}.performance_by_weather_event") + print(f"\nβœ“ Loaded weather event performance from {SCHEMA_PREFIX}.performance_by_weather_event") weather_winners = vmg_by_weather.withColumn( "rank", F.row_number().over(Window.partitionBy("weather_event_type").orderBy(F.desc("avg_vmg"))) ).filter(F.col("rank") == 1) except: - print("\n⚠️ Temp view not found, calculating weather event performance...") + print("\n⚠️ Table not found, calculating weather event performance...") from race_utils import join_telemetry_with_weather df_with_weather = join_telemetry_with_weather(df, weather_df) @@ -239,15 +244,9 @@ print(f"\n{event_name}: {row['boat_name']}") print(f" Average VMG: {row['avg_vmg']:.2f} knots") - # Calculate fleet average VMG by weather event - fleet_vmg_by_weather = df_with_weather.groupBy("weather_event_type").agg( - F.avg("vmg_knots").alias("fleet_avg_vmg"), - F.count("*").alias("observations") - ).filter(F.col("weather_event_type").isNotNull()).orderBy(F.desc("fleet_avg_vmg")) - - # Calculate fleet average VMG by weather event (only if not using temp view) + # Calculate fleet average VMG by weather event from the persisted table try: - fleet_vmg_by_weather = spark.table("performance_by_weather_event").groupBy("weather_event_type").agg( + fleet_vmg_by_weather = spark.table(f"{SCHEMA_PREFIX}.performance_by_weather_event").groupBy("weather_event_type").agg( 
F.avg("avg_vmg").alias("fleet_avg_vmg"), F.sum("observations").alias("observations") ).filter(F.col("weather_event_type").isNotNull()).orderBy(F.desc("fleet_avg_vmg")) @@ -307,18 +306,18 @@ # MAGIC %md # MAGIC ## 4. Position Changes Throughout Race # MAGIC -# MAGIC **Note**: Position change analysis requires full race data. Loading from race_positions_over_time temp view if available. +# MAGIC **Note**: Position change analysis requires full race data. Loading from race_positions_over_time table if available. # COMMAND ---------- -# Try to load from temp view created by 03_race_progress notebook +# Try to load from table created by 03_race_progress notebook try: print("βœ“ Position analysis available - see 03_race_progress notebook for detailed position tracking") - print(" Temp views available:") - print(" - race_positions_over_time") - print(" - race_leg_vmg") - print(" - race_leg_trends") - print(" - race_leg_consistency") + print(" Tables available:") + print(f" - {SCHEMA_PREFIX}.race_positions_over_time") + print(f" - {SCHEMA_PREFIX}.race_leg_vmg") + print(f" - {SCHEMA_PREFIX}.race_leg_trends") + print(f" - {SCHEMA_PREFIX}.race_leg_consistency") except: print("⚠️ Run 03_race_progress notebook first for detailed position analysis") diff --git a/data_drifter/notebooks/create_control_table.py b/data_drifter/notebooks/create_control_table.py new file mode 100644 index 0000000..b6947af --- /dev/null +++ b/data_drifter/notebooks/create_control_table.py @@ -0,0 +1,39 @@ +# Databricks notebook source +# MAGIC %md +# MAGIC # Create Race Control Table +# MAGIC +# MAGIC Simple control table that allows the app to send speed commands to the telemetry generator. +# MAGIC The generator polls this table periodically and adjusts its playback speed. +# MAGIC +# MAGIC **Parameters:** +# MAGIC - `telemetry_table`: Full telemetry table name (used to derive schema prefix) + +# COMMAND ---------- + +telemetry_table = dbutils.widgets.get("telemetry_table") +schema_prefix = ".".join(telemetry_table.split(".")[:2]) +control_table = f"{schema_prefix}.race_control" + +print(f"Creating control table: {control_table}") + +# COMMAND ---------- + +spark.sql(f""" +CREATE TABLE IF NOT EXISTS {control_table} ( + speed_multiplier DOUBLE COMMENT 'Playback speed multiplier (0.5 = half speed, 2.0 = double speed)', + updated_at TIMESTAMP COMMENT 'When the speed was last changed', + updated_by STRING COMMENT 'Who changed the speed (app or manual)' +) +USING DELTA +COMMENT 'Race control table - allows app to control telemetry generator speed' +""") + +# Insert default row if table is empty +count = spark.sql(f"SELECT COUNT(*) as cnt FROM {control_table}").collect()[0].cnt +if count == 0: + spark.sql(f""" + INSERT INTO {control_table} VALUES (1.0, current_timestamp(), 'system') + """) + print("Inserted default speed_multiplier = 1.0") + +print(f"Control table created: {control_table}") diff --git a/data_drifter/notebooks/grant_permissions.py b/data_drifter/notebooks/grant_permissions.py index dec65e5..58ae343 100644 --- a/data_drifter/notebooks/grant_permissions.py +++ b/data_drifter/notebooks/grant_permissions.py @@ -3,13 +3,14 @@ # MAGIC # Grant Permissions to App Service Principal # MAGIC # MAGIC This notebook grants Unity Catalog permissions for the Streamlit app to access tables. -# MAGIC The app service principal needs SELECT on telemetry table and SELECT/MODIFY on weather table. 
+# MAGIC The app needs SELECT/MODIFY at the schema level so it can read all tables and truncate them
+# MAGIC when "Start New Race" is clicked (telemetry, weather, and analysis tables).
 # MAGIC
 # MAGIC **Parameters:**
 # MAGIC - `catalog_name`: Unity Catalog name
 # MAGIC - `schema_name`: Schema name within catalog
-# MAGIC - `telemetry_table`: Full table name for telemetry data
-# MAGIC - `weather_table`: Full table name for weather station data
+# MAGIC - `telemetry_table`: Full table name for telemetry data (unused, kept for compatibility)
+# MAGIC - `weather_table`: Full table name for weather station data (unused, kept for compatibility)
 # MAGIC - `app_service_principal_id`: Service principal ID of the deployed app

# COMMAND ----------
@@ -24,8 +25,6 @@
 print(f"Granting permissions to service principal: {app_service_principal_id}")
 print(f"Catalog: {catalog_name}")
 print(f"Schema: {schema_name}")
-print(f"Telemetry table: {telemetry_table}")
-print(f"Weather table: {weather_table}")

# COMMAND ----------
@@ -45,19 +44,13 @@

-# Grant SELECT on telemetry table (read-only for app)
+# Grant SELECT and MODIFY on the entire schema
+# This covers telemetry, weather, and all analysis tables created by the notebooks
+# MODIFY is needed for "Start New Race" which truncates all tables
 spark.sql(f"""
-GRANT SELECT ON TABLE {telemetry_table} TO `{app_service_principal_id}`
+GRANT SELECT, MODIFY ON SCHEMA {catalog_name}.{schema_name} TO `{app_service_principal_id}`
 """)
-print(f"βœ… Granted SELECT on {telemetry_table}")
-
-# COMMAND ----------
-
-# Grant SELECT and MODIFY on weather station table (app may write weather data)
-spark.sql(f"""
-GRANT SELECT, MODIFY ON TABLE {weather_table} TO `{app_service_principal_id}`
-""")
-print(f"βœ… Granted SELECT, MODIFY on {weather_table}")
+print(f"βœ… Granted SELECT, MODIFY on schema {catalog_name}.{schema_name}")

# COMMAND ----------
diff --git a/data_drifter/notebooks/race_utils.py b/data_drifter/notebooks/race_utils.py
index 8d80bf5..1a48e2c 100644
--- a/data_drifter/notebooks/race_utils.py
+++ b/data_drifter/notebooks/race_utils.py
@@ -442,3 +442,28 @@ def get_finished_boats(df):
 ).orderBy("rank")

 return finished
+
+
+def get_schema_prefix(table_name):
+ """Extract catalog.schema prefix from a fully-qualified table name (catalog.schema.table)."""
+ return ".".join(table_name.split(".")[:2])
+
+
+def categorize_wind(wind_speed_knots, config=None):
+ """Categorize wind speed into Light/Moderate/Heavy.
+
+ Thresholds come from the config.toml [analysis] section if available,
+ otherwise they default to 8/15 knots.
+ """
+ if config and "analysis" in config:
+ light = config["analysis"].get("wind_light_threshold_knots", 8)
+ moderate = config["analysis"].get("wind_moderate_threshold_knots", 15)
+ else:
+ light = 8
+ moderate = 15
+
+ if wind_speed_knots < light:
+ return "Light"
+ elif wind_speed_knots < moderate:
+ return "Moderate"
+ return "Heavy"
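+
+
+# Illustrative usage of the helpers above (not called by the notebooks; the
+# table name is a made-up example, and categorize_wind falls back to the
+# 8/15-knot defaults when no config dict is passed):
+#
+#   get_schema_prefix("main.regatta.boat_telemetry")  # -> "main.regatta"
+#   categorize_wind(6.0)    # -> "Light"     (< 8 kt)
+#   categorize_wind(12.0)   # -> "Moderate"  (8-15 kt)
+#   categorize_wind(18.0)   # -> "Heavy"     (>= 15 kt)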