From 33dad88366a1a5424adeb931bd01e3e787ac565e Mon Sep 17 00:00:00 2001 From: Artem Date: Tue, 10 Mar 2026 03:20:53 +0200 Subject: [PATCH 1/5] Fix sailboat analysis job and remove hardcoded values Analysis notebooks saved intermediate results as Spark temp views, which don't persist across DAB job tasks running in separate Spark contexts. Replace all 18 temp views with persisted Delta tables using write.mode("overwrite").saveAsTable() with fully-qualified names. Also fixes: - Missing config_file_path variable in databricks.yml - NameError on df_with_weather in 04_race_summary - Hardcoded warehouse ID replaced with ${var.sql_warehouse_id} - deploy.sh --profile now uses $DATABRICKS_CONFIG_PROFILE env var - Reduce real_time_duration_seconds from 300 to 120 Co-Authored-By: Claude Opus 4.6 --- data_drifter/config.toml | 2 +- data_drifter/databricks.yml | 12 +++- data_drifter/deploy.sh | 32 +++++---- data_drifter/notebooks/01_boat_performance.py | 15 ++-- data_drifter/notebooks/02_wind_conditions.py | 59 ++++++++-------- data_drifter/notebooks/03_race_progress.py | 25 ++++--- data_drifter/notebooks/04_race_summary.py | 69 +++++++++---------- 7 files changed, 115 insertions(+), 99 deletions(-) diff --git a/data_drifter/config.toml b/data_drifter/config.toml index 2b31502..8c64c5a 100644 --- a/data_drifter/config.toml +++ b/data_drifter/config.toml @@ -114,7 +114,7 @@ race_duration_seconds = 172800 # 600 = watch 4-day race in 10 minutes (576x speed) # 1800 = watch 4-day race in 30 minutes (192x speed) # 3600 = watch 4-day race in 1 hour (96x speed) -real_time_duration_seconds = 300 +real_time_duration_seconds = 120 # How often to emit telemetry in RACE TIME (seconds) # This is the interval between telemetry records in race time diff --git a/data_drifter/databricks.yml b/data_drifter/databricks.yml index cbd17d6..c021554 100644 --- a/data_drifter/databricks.yml +++ b/data_drifter/databricks.yml @@ -30,6 +30,14 @@ variables: description: "Schema name within catalog" default: "" + config_file_path: + description: "Path to config.toml in workspace" + default: "${workspace.file_path}/app/config.toml" + + sql_warehouse_id: + description: "SQL Warehouse ID for app resource permissions" + default: "" + # Service Principal (populated after app creation for permissions) app_service_principal_id: description: "Service principal ID of the deployed app (for permissions)" @@ -71,7 +79,7 @@ resources: resources: - name: sql-warehouse sql_warehouse: - id: dd43ee29fedd958d # Hardcoded from config.toml + id: ${var.sql_warehouse_id} permission: CAN_USE # App permissions @@ -191,7 +199,7 @@ targets: mode: development default: true workspace: - host: https://your-dev-workspace.cloud.databricks.com + host: https://fe-vm-vdm-classic-cbnm51.cloud.databricks.com # Production target (if needed in future) # prod: diff --git a/data_drifter/deploy.sh b/data_drifter/deploy.sh index 8dbed56..1abbadc 100644 --- a/data_drifter/deploy.sh +++ b/data_drifter/deploy.sh @@ -1,7 +1,7 @@ #!/bin/bash set -e -export DATABRICKS_CONFIG_PROFILE="${DATABRICKS_CONFIG_PROFILE:-default}" +export DATABRICKS_CONFIG_PROFILE="${DATABRICKS_CONFIG_PROFILE:-DEFAULT}" TARGET="${DATABRICKS_TARGET:-dev}" APP_NAME="data-drifter-regatta-v3" @@ -34,16 +34,18 @@ echo "πŸ“¦ Deploying bundle..." 
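# Sketch of the variable flow (hedged: how WAREHOUSE_ID gets populated is not
# shown in this hunk; reading it from config.toml might look like the line
# below, where the exact TOML key path is an assumption):
#
#   WAREHOUSE_ID=$(python3 -c 'import tomllib; print(tomllib.load(open("config.toml", "rb"))["app"]["sql_warehouse_id"])')
#
# Each --var that follows fills the matching ${var.*} placeholder declared in
# databricks.yml, so ${var.sql_warehouse_id} replaces the hardcoded warehouse ID.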
databricks bundle deploy \ --target "$TARGET" \ + --profile="$DATABRICKS_CONFIG_PROFILE" \ --var="telemetry_table=$TELEMETRY_TABLE" \ --var="weather_table=$WEATHER_TABLE" \ --var="catalog_name=$CATALOG_NAME" \ - --var="schema_name=$SCHEMA_NAME" + --var="schema_name=$SCHEMA_NAME" \ + --var="sql_warehouse_id=$WAREHOUSE_ID" echo "βœ… Bundle deployed" # Create tables echo "πŸ—οΈ Creating tables..." -if databricks bundle run create_tables --target "$TARGET"; then +if databricks bundle run create_tables --target "$TARGET" --profile="$DATABRICKS_CONFIG_PROFILE"; then echo "βœ… Tables created" else echo "❌ Table creation failed" @@ -53,15 +55,15 @@ fi # Deploy app echo "πŸ“± Deploying app..." -CURRENT_USER=$(databricks current-user me --output json 2>/dev/null | jq -r '.userName') +CURRENT_USER=$(databricks current-user me --profile="$DATABRICKS_CONFIG_PROFILE" --output json 2>/dev/null | jq -r '.userName') BUNDLE_NAME="data_drifter_regatta" APP_SOURCE_PATH="/Workspace/Users/$CURRENT_USER/.bundle/$BUNDLE_NAME/$TARGET/files/app" -if databricks apps get "$APP_NAME" --output json 2>/dev/null | jq -e '.name' > /dev/null; then +if databricks apps get "$APP_NAME" --profile="$DATABRICKS_CONFIG_PROFILE" --output json 2>/dev/null | jq -e '.name' > /dev/null; then # Wait for any active deployment to complete WAIT_COUNT=0 while [ $WAIT_COUNT -lt 12 ]; do - DEPLOYMENT_STATE=$(databricks apps get "$APP_NAME" --output json 2>/dev/null | jq -r '.active_deployment.deployment_state // "NONE"') + DEPLOYMENT_STATE=$(databricks apps get "$APP_NAME" --profile="$DATABRICKS_CONFIG_PROFILE" --output json 2>/dev/null | jq -r '.active_deployment.deployment_state // "NONE"') if [ "$DEPLOYMENT_STATE" = "NONE" ] || [ "$DEPLOYMENT_STATE" = "null" ]; then break fi @@ -70,20 +72,20 @@ if databricks apps get "$APP_NAME" --output json 2>/dev/null | jq -e '.name' > / WAIT_COUNT=$((WAIT_COUNT + 1)) done - COMPUTE_STATUS=$(databricks apps get "$APP_NAME" --output json 2>/dev/null | jq -r '.compute_status.state') + COMPUTE_STATUS=$(databricks apps get "$APP_NAME" --profile="$DATABRICKS_CONFIG_PROFILE" --output json 2>/dev/null | jq -r '.compute_status.state') if [ "$COMPUTE_STATUS" = "ACTIVE" ] || [ "$COMPUTE_STATUS" = "STARTING" ]; then echo "Updating running app..." - databricks apps deploy "$APP_NAME" --source-code-path "$APP_SOURCE_PATH" + databricks apps deploy "$APP_NAME" --source-code-path "$APP_SOURCE_PATH" --profile="$DATABRICKS_CONFIG_PROFILE" else echo "Deploying and starting app..." - databricks apps deploy "$APP_NAME" --source-code-path "$APP_SOURCE_PATH" - databricks apps start "$APP_NAME" + databricks apps deploy "$APP_NAME" --source-code-path "$APP_SOURCE_PATH" --profile="$DATABRICKS_CONFIG_PROFILE" + databricks apps start "$APP_NAME" --profile="$DATABRICKS_CONFIG_PROFILE" fi else echo "Creating new app..." - databricks apps deploy "$APP_NAME" --source-code-path "$APP_SOURCE_PATH" - databricks apps start "$APP_NAME" + databricks apps deploy "$APP_NAME" --source-code-path "$APP_SOURCE_PATH" --profile="$DATABRICKS_CONFIG_PROFILE" + databricks apps start "$APP_NAME" --profile="$DATABRICKS_CONFIG_PROFILE" fi echo "βœ… App deployed" @@ -92,8 +94,8 @@ echo "βœ… App deployed" echo "πŸ” Granting permissions..." 
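# The short sleep below gives the Apps API a moment to expose the freshly
# created app's service principal; the jq filter that follows then reads
# .service_principal_client_id out of the `databricks apps get` JSON output.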
sleep 2 -if APP_SP_ID=$(databricks apps get "$APP_NAME" --output json 2>/dev/null | jq -r '.service_principal_client_id'); then - if databricks bundle run grant_permissions --notebook-params="app_service_principal_id=$APP_SP_ID"; then +if APP_SP_ID=$(databricks apps get "$APP_NAME" --profile="$DATABRICKS_CONFIG_PROFILE" --output json 2>/dev/null | jq -r '.service_principal_client_id'); then + if databricks bundle run grant_permissions --notebook-params="app_service_principal_id=$APP_SP_ID" --profile="$DATABRICKS_CONFIG_PROFILE"; then echo "βœ… Permissions granted" else echo "⚠️ Permission grant failed" @@ -107,7 +109,7 @@ fi echo "" echo "βœ… Deployment complete!" echo "" -if APP_URL=$(databricks apps get "$APP_NAME" --output json 2>/dev/null | jq -r '.url'); then +if APP_URL=$(databricks apps get "$APP_NAME" --profile="$DATABRICKS_CONFIG_PROFILE" --output json 2>/dev/null | jq -r '.url'); then echo "App URL: $APP_URL" fi echo "" diff --git a/data_drifter/notebooks/01_boat_performance.py b/data_drifter/notebooks/01_boat_performance.py index ea1fc7a..458d2c7 100644 --- a/data_drifter/notebooks/01_boat_performance.py +++ b/data_drifter/notebooks/01_boat_performance.py @@ -151,13 +151,16 @@ # COMMAND ---------- -# Save boat performance summary -boat_performance.createOrReplaceTempView("boat_performance_summary") -finished_boats.createOrReplaceTempView("finished_boats_summary") +# Derive schema from TABLE_NAME (catalog.schema.table -> catalog.schema) +SCHEMA_PREFIX = ".".join(TABLE_NAME.split(".")[:2]) -print("Boat performance results saved to temp views:") -print(" - boat_performance_summary") -print(" - finished_boats_summary") +# Save boat performance summary as tables +boat_performance.write.mode("overwrite").saveAsTable(f"{SCHEMA_PREFIX}.boat_performance_summary") +finished_boats.write.mode("overwrite").saveAsTable(f"{SCHEMA_PREFIX}.finished_boats_summary") + +print("Boat performance results saved to tables:") +print(f" - {SCHEMA_PREFIX}.boat_performance_summary") +print(f" - {SCHEMA_PREFIX}.finished_boats_summary") # COMMAND ---------- diff --git a/data_drifter/notebooks/02_wind_conditions.py b/data_drifter/notebooks/02_wind_conditions.py index 4ccc197..61c48cf 100644 --- a/data_drifter/notebooks/02_wind_conditions.py +++ b/data_drifter/notebooks/02_wind_conditions.py @@ -146,13 +146,16 @@ # COMMAND ---------- -# Save wind condition analysis results -performance_by_wind.createOrReplaceTempView("performance_by_wind") -top_by_wind.createOrReplaceTempView("top_performers_by_wind") +# Derive schema from TABLE_NAME (catalog.schema.table -> catalog.schema) +SCHEMA_PREFIX = ".".join(TABLE_NAME.split(".")[:2]) -print("Wind condition analysis results saved to temp views:") -print(" - performance_by_wind") -print(" - top_performers_by_wind") +# Save wind condition analysis results as tables +performance_by_wind.write.mode("overwrite").saveAsTable(f"{SCHEMA_PREFIX}.performance_by_wind") +top_by_wind.write.mode("overwrite").saveAsTable(f"{SCHEMA_PREFIX}.top_performers_by_wind") + +print("Wind condition analysis results saved to tables:") +print(f" - {SCHEMA_PREFIX}.performance_by_wind") +print(f" - {SCHEMA_PREFIX}.top_performers_by_wind") # COMMAND ---------- @@ -365,18 +368,18 @@ # COMMAND ---------- -# Save point of sail analysis results -performance_by_sail.createOrReplaceTempView("performance_by_point_of_sail") -top_by_sail.createOrReplaceTempView("top_performers_by_sail") -boat_specialization.createOrReplaceTempView("boat_specializations") 
-combined_analysis.createOrReplaceTempView("performance_wind_and_sail") -distance_by_sail.createOrReplaceTempView("distance_by_point_of_sail") -distance_by_sail_boat.createOrReplaceTempView("distance_by_point_of_sail_boat") +# Save point of sail analysis results as tables +performance_by_sail.write.mode("overwrite").saveAsTable(f"{SCHEMA_PREFIX}.performance_by_point_of_sail") +top_by_sail.write.mode("overwrite").saveAsTable(f"{SCHEMA_PREFIX}.top_performers_by_sail") +boat_specialization.write.mode("overwrite").saveAsTable(f"{SCHEMA_PREFIX}.boat_specializations") +combined_analysis.write.mode("overwrite").saveAsTable(f"{SCHEMA_PREFIX}.performance_wind_and_sail") +distance_by_sail.write.mode("overwrite").saveAsTable(f"{SCHEMA_PREFIX}.distance_by_point_of_sail") +distance_by_sail_boat.write.mode("overwrite").saveAsTable(f"{SCHEMA_PREFIX}.distance_by_point_of_sail_boat") -print("Point of sail analysis results saved to temp views:") -print(" - performance_by_point_of_sail") -print(" - top_performers_by_sail") -print(" - boat_specializations") +print("Point of sail analysis results saved to tables:") +print(f" - {SCHEMA_PREFIX}.performance_by_point_of_sail") +print(f" - {SCHEMA_PREFIX}.top_performers_by_sail") +print(f" - {SCHEMA_PREFIX}.boat_specializations") print(" - performance_wind_and_sail") print(" - distance_by_point_of_sail") print(" - distance_by_point_of_sail_boat") @@ -655,7 +658,7 @@ display(vmg_changes) # Save for use in other notebooks - vmg_changes.createOrReplaceTempView("performance_change_by_weather_event") + vmg_changes.write.mode("overwrite").saveAsTable(f"{SCHEMA_PREFIX}.performance_change_by_weather_event") # COMMAND ---------- @@ -665,16 +668,16 @@ # COMMAND ---------- if weather_df is not None: - # Save analysis results - weather_df.createOrReplaceTempView("weather_events") - performance_by_weather_event.createOrReplaceTempView("performance_by_weather_event") - top_by_event.createOrReplaceTempView("top_performers_by_weather_event") - adaptability.createOrReplaceTempView("boat_adaptability_to_weather") - - print("Weather event analysis results saved to temp views:") - print(" - weather_events") - print(" - performance_by_weather_event") - print(" - top_performers_by_weather_event") + # Save analysis results as tables + weather_df.write.mode("overwrite").saveAsTable(f"{SCHEMA_PREFIX}.weather_events") + performance_by_weather_event.write.mode("overwrite").saveAsTable(f"{SCHEMA_PREFIX}.performance_by_weather_event") + top_by_event.write.mode("overwrite").saveAsTable(f"{SCHEMA_PREFIX}.top_performers_by_weather_event") + adaptability.write.mode("overwrite").saveAsTable(f"{SCHEMA_PREFIX}.boat_adaptability_to_weather") + + print("Weather event analysis results saved to tables:") + print(f" - {SCHEMA_PREFIX}.weather_events") + print(f" - {SCHEMA_PREFIX}.performance_by_weather_event") + print(f" - {SCHEMA_PREFIX}.top_performers_by_weather_event") print(" - boat_adaptability_to_weather") print(" - performance_change_by_weather_event") diff --git a/data_drifter/notebooks/03_race_progress.py b/data_drifter/notebooks/03_race_progress.py index a5798cf..5a5a1ae 100644 --- a/data_drifter/notebooks/03_race_progress.py +++ b/data_drifter/notebooks/03_race_progress.py @@ -215,17 +215,20 @@ # COMMAND ---------- -# Save race progress analysis results -positions.createOrReplaceTempView("race_positions_over_time") -vmg_by_leg.createOrReplaceTempView("race_leg_vmg") -leg_trend.createOrReplaceTempView("race_leg_trends") -consistency.createOrReplaceTempView("race_leg_consistency") - -print("Race 
progress analysis results saved to temp views:") -print(" - race_positions_over_time (includes race_position and remaining_distance_nm)") -print(" - race_leg_vmg") -print(" - race_leg_trends") -print(" - race_leg_consistency") +# Derive schema from TABLE_NAME (catalog.schema.table -> catalog.schema) +SCHEMA_PREFIX = ".".join(TABLE_NAME.split(".")[:2]) + +# Save race progress analysis results as tables +positions.write.mode("overwrite").saveAsTable(f"{SCHEMA_PREFIX}.race_positions_over_time") +vmg_by_leg.write.mode("overwrite").saveAsTable(f"{SCHEMA_PREFIX}.race_leg_vmg") +leg_trend.write.mode("overwrite").saveAsTable(f"{SCHEMA_PREFIX}.race_leg_trends") +consistency.write.mode("overwrite").saveAsTable(f"{SCHEMA_PREFIX}.race_leg_consistency") + +print("Race progress analysis results saved to tables:") +print(f" - {SCHEMA_PREFIX}.race_positions_over_time") +print(f" - {SCHEMA_PREFIX}.race_leg_vmg") +print(f" - {SCHEMA_PREFIX}.race_leg_trends") +print(f" - {SCHEMA_PREFIX}.race_leg_consistency") # COMMAND ---------- diff --git a/data_drifter/notebooks/04_race_summary.py b/data_drifter/notebooks/04_race_summary.py index e3debae..0ca1a27 100644 --- a/data_drifter/notebooks/04_race_summary.py +++ b/data_drifter/notebooks/04_race_summary.py @@ -27,6 +27,9 @@ # Load race course configuration from config.toml TABLE_NAME, marks, config = load_race_course_config() +# Derive schema from TABLE_NAME (catalog.schema.table -> catalog.schema) +SCHEMA_PREFIX = ".".join(TABLE_NAME.split(".")[:2]) + # Load telemetry data using utility function df = load_race_data(TABLE_NAME) @@ -69,16 +72,16 @@ # MAGIC %md # MAGIC ## 1. Overall Boat Rankings # MAGIC -# MAGIC Load results from boat_performance_summary temp view created by 01_boat_performance notebook +# MAGIC Load results from boat_performance_summary table created by 01_boat_performance notebook # COMMAND ---------- -# Try to load from temp view, fallback to calculation if not available +# Try to load from table, fallback to calculation if not available try: - boat_performance = spark.table("finished_boats_summary") - print("βœ“ Loaded boat rankings from finished_boats_summary temp view") + boat_performance = spark.table(f"{SCHEMA_PREFIX}.finished_boats_summary") + print(f"βœ“ Loaded boat rankings from {SCHEMA_PREFIX}.finished_boats_summary table") except: - print("⚠️ Temp view not found, calculating boat rankings...") + print("⚠️ Table not found, calculating boat rankings...") boat_performance = get_finished_boats(df) print("=" * 80) @@ -96,21 +99,21 @@ # MAGIC %md # MAGIC ## 2. 
Performance by Wind Condition # MAGIC -# MAGIC Load results from performance_by_wind temp view created by 02_wind_conditions notebook +# MAGIC Load results from performance_by_wind table created by 02_wind_conditions notebook # COMMAND ---------- -# Try to load from temp view, fallback to calculation if not available +# Try to load from table, fallback to calculation if not available try: - vmg_by_wind = spark.table("performance_by_wind") - print("βœ“ Loaded wind condition performance from performance_by_wind temp view") + vmg_by_wind = spark.table(f"{SCHEMA_PREFIX}.performance_by_wind") + print(f"βœ“ Loaded wind condition performance from {SCHEMA_PREFIX}.performance_by_wind table") wind_winners = vmg_by_wind.withColumn( "rank", F.row_number().over(Window.partitionBy("wind_category").orderBy(F.desc("avg_vmg"))) ).filter(F.col("rank") == 1) except: - print("⚠️ Temp view not found, calculating wind condition performance...") + print("⚠️ Table not found, calculating wind condition performance...") df_conditions = df.withColumn("wind_category", F.when(F.col("wind_speed_knots") < 8, "Light") .when(F.col("wind_speed_knots") < 15, "Moderate") @@ -138,21 +141,21 @@ # MAGIC %md # MAGIC ## 3. Performance by Point of Sail # MAGIC -# MAGIC Load results from performance_by_point_of_sail temp view created by 02_wind_conditions notebook +# MAGIC Load results from performance_by_point_of_sail table created by 02_wind_conditions notebook # COMMAND ---------- -# Try to load from temp view, fallback to calculation if not available +# Try to load from table, fallback to calculation if not available try: - vmg_by_sail = spark.table("performance_by_point_of_sail") - print("βœ“ Loaded point of sail performance from performance_by_point_of_sail temp view") + vmg_by_sail = spark.table(f"{SCHEMA_PREFIX}.performance_by_point_of_sail") + print(f"βœ“ Loaded point of sail performance from {SCHEMA_PREFIX}.performance_by_point_of_sail table") sail_winners = vmg_by_sail.withColumn( "rank", F.row_number().over(Window.partitionBy("point_of_sail").orderBy(F.desc("avg_vmg"))) ).filter(F.col("rank") == 1) except: - print("⚠️ Temp view not found, calculating point of sail performance...") + print("⚠️ Table not found, calculating point of sail performance...") from race_utils import add_point_of_sail_column df_with_sail = add_point_of_sail_column(df) @@ -178,11 +181,11 @@ # MAGIC %md # MAGIC ## 3.5. 
Weather Event Analysis # MAGIC -# MAGIC Load results from performance_by_weather_event temp view created by 02_wind_conditions notebook +# MAGIC Load results from performance_by_weather_event table created by 02_wind_conditions notebook # COMMAND ---------- -# Try to load from temp view first +# Try to load from table first from race_utils import load_weather_station_data, summarize_weather_events weather_df = load_weather_station_data(config) @@ -207,17 +210,17 @@ pct = (count / weather_summary['total_events']) * 100 print(f" {event_type.replace('_', ' ').title():20s}: {count:3d} events ({pct:5.1f}%)") - # Try to load from temp view, fallback to calculation if not available + # Try to load from table, fallback to calculation if not available try: - vmg_by_weather = spark.table("performance_by_weather_event") - print("\nβœ“ Loaded weather event performance from performance_by_weather_event temp view") + vmg_by_weather = spark.table(f"{SCHEMA_PREFIX}.performance_by_weather_event") + print(f"\nβœ“ Loaded weather event performance from {SCHEMA_PREFIX}.performance_by_weather_event") weather_winners = vmg_by_weather.withColumn( "rank", F.row_number().over(Window.partitionBy("weather_event_type").orderBy(F.desc("avg_vmg"))) ).filter(F.col("rank") == 1) except: - print("\n⚠️ Temp view not found, calculating weather event performance...") + print("\n⚠️ Table not found, calculating weather event performance...") from race_utils import join_telemetry_with_weather df_with_weather = join_telemetry_with_weather(df, weather_df) @@ -239,15 +242,9 @@ print(f"\n{event_name}: {row['boat_name']}") print(f" Average VMG: {row['avg_vmg']:.2f} knots") - # Calculate fleet average VMG by weather event - fleet_vmg_by_weather = df_with_weather.groupBy("weather_event_type").agg( - F.avg("vmg_knots").alias("fleet_avg_vmg"), - F.count("*").alias("observations") - ).filter(F.col("weather_event_type").isNotNull()).orderBy(F.desc("fleet_avg_vmg")) - - # Calculate fleet average VMG by weather event (only if not using temp view) + # Calculate fleet average VMG by weather event from the persisted table try: - fleet_vmg_by_weather = spark.table("performance_by_weather_event").groupBy("weather_event_type").agg( + fleet_vmg_by_weather = spark.table(f"{SCHEMA_PREFIX}.performance_by_weather_event").groupBy("weather_event_type").agg( F.avg("avg_vmg").alias("fleet_avg_vmg"), F.sum("observations").alias("observations") ).filter(F.col("weather_event_type").isNotNull()).orderBy(F.desc("fleet_avg_vmg")) @@ -307,18 +304,18 @@ # MAGIC %md # MAGIC ## 4. Position Changes Throughout Race # MAGIC -# MAGIC **Note**: Position change analysis requires full race data. Loading from race_positions_over_time temp view if available. +# MAGIC **Note**: Position change analysis requires full race data. Loading from race_positions_over_time table if available. 
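# A minimal sketch of the cross-task handoff these patches rely on: one job
# task persists a Delta table, the next task reads it back. The table name is
# real; the catalog/schema values below are placeholders:
#
#     TABLE_NAME = "main.regatta.sailboat_telemetry"        # placeholder FQN
#     SCHEMA_PREFIX = ".".join(TABLE_NAME.split(".")[:2])   # -> "main.regatta"
#
#     # writer task (03_race_progress):
#     positions.write.mode("overwrite").saveAsTable(f"{SCHEMA_PREFIX}.race_positions_over_time")
#
#     # reader task (this notebook), running in a separate Spark context:
#     positions = spark.table(f"{SCHEMA_PREFIX}.race_positions_over_time")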
# COMMAND ---------- -# Try to load from temp view created by 03_race_progress notebook +# Try to load from table created by 03_race_progress notebook try: print("βœ“ Position analysis available - see 03_race_progress notebook for detailed position tracking") - print(" Temp views available:") - print(" - race_positions_over_time") - print(" - race_leg_vmg") - print(" - race_leg_trends") - print(" - race_leg_consistency") + print(" Tables available:") + print(f" - {SCHEMA_PREFIX}.race_positions_over_time") + print(f" - {SCHEMA_PREFIX}.race_leg_vmg") + print(f" - {SCHEMA_PREFIX}.race_leg_trends") + print(f" - {SCHEMA_PREFIX}.race_leg_consistency") except: print("⚠️ Run 03_race_progress notebook first for detailed position analysis") From 36a127768e3ee8d069149221565b0c68b8bcc77b Mon Sep 17 00:00:00 2001 From: Artem Date: Tue, 10 Mar 2026 03:35:44 +0200 Subject: [PATCH 2/5] Remove hardcoded workspace host from dev target Let the --profile flag determine the workspace instead of hardcoding it in databricks.yml, so deploy.sh works across different environments. Co-Authored-By: Claude Opus 4.6 --- data_drifter/databricks.yml | 2 -- 1 file changed, 2 deletions(-) diff --git a/data_drifter/databricks.yml b/data_drifter/databricks.yml index c021554..248358f 100644 --- a/data_drifter/databricks.yml +++ b/data_drifter/databricks.yml @@ -198,8 +198,6 @@ targets: dev: mode: development default: true - workspace: - host: https://fe-vm-vdm-classic-cbnm51.cloud.databricks.com # Production target (if needed in future) # prod: From ffed36cc12dd6c0abb41dead971d80b9761bfee0 Mon Sep 17 00:00:00 2001 From: Artem Date: Tue, 10 Mar 2026 03:50:10 +0200 Subject: [PATCH 3/5] Update README and fix typos in main.py MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Document service principal setup in Prerequisites - Add --profile flag to all CLI examples - Document DATABRICKS_CONFIG_PROFILE and DATABRICKS_TARGET env vars - Note that analysis results are persisted as Delta tables - Fix architecture diagram fleet size (configurable, default=6) - Add "no module named toml" troubleshooting entry - Fix app name in manual examples (data-drifter-regatta-v3) - Fix typos in main.py banner ("possbile" β†’ "possible", "Zeorbus" β†’ "Zerobus") Co-Authored-By: Claude Opus 4.6 --- data_drifter/README.md | 52 ++++++++++++++++++++++++++++++------------ data_drifter/main.py | 2 +- 2 files changed, 38 insertions(+), 16 deletions(-) diff --git a/data_drifter/README.md b/data_drifter/README.md index b1c756d..cb8706e 100644 --- a/data_drifter/README.md +++ b/data_drifter/README.md @@ -16,6 +16,11 @@ Simulates a fleet of sailboats racing and visualizes their positions in real-tim - Databricks workspace with Apps enabled - [Databricks CLI](https://docs.databricks.com/en/dev-tools/cli/install.html) installed - Unity Catalog access with permission to create tables +- A Databricks **service principal** with OAuth credentials (client ID + secret) for Zerobus Ingest authentication. To create one: + 1. Go to **Workspace Settings β†’ Identity and access β†’ Service principals β†’ Add** + 2. Create a new service principal + 3. Under the service principal's settings, generate an **OAuth secret** + 4. 
Save the **client ID** and **client secret** β€” you'll need them in Step 5 ### Step 1: Clone and Install Dependencies @@ -34,8 +39,8 @@ pip install -r requirements.txt ### Step 2: Configure Databricks CLI ```bash -# Configure authentication (if not already done) -databricks configure --token +# Configure a named profile (recommended) +databricks configure --token --profile MY_PROFILE # You'll be prompted for: # - Databricks Host: https://your-workspace.cloud.databricks.com @@ -66,6 +71,10 @@ sql_warehouse_id = "your-warehouse-id" ### Step 4: Deploy Everything with One Command ```bash +# Using a named profile (recommended) +DATABRICKS_CONFIG_PROFILE=MY_PROFILE ./deploy.sh + +# Or with the DEFAULT profile ./deploy.sh ``` @@ -88,17 +97,18 @@ sql_warehouse_id = "your-warehouse-id" URL: https://your-app-url.cloud.databricks.com πŸ“Š Analysis Job: - Run with: databricks bundle run sailboat_analysis + Run with: databricks bundle run sailboat_analysis --profile MY_PROFILE πŸš€ Next Steps: 1. Start telemetry generator: python3 main.py --client-id --client-secret + (use the service principal credentials you created in the Prerequisites) 2. Open app in browser (URL above) ``` ### Step 5: Start the Race! ```bash -# Generate sailboat telemetry (pass OAuth credentials as CLI arguments) +# Start the telemetry generator using your service principal credentials python3 main.py \ --client-id your-service-principal-client-id \ --client-secret your-service-principal-secret @@ -129,7 +139,7 @@ Open the app URL from deployment output in your browser. You'll see: After the race, run the analysis notebooks: ```bash -databricks bundle run sailboat_analysis +databricks bundle run sailboat_analysis --profile MY_PROFILE ``` This runs 4 notebooks in sequence: @@ -138,6 +148,8 @@ This runs 4 notebooks in sequence: 3. **Race Progress** - Position changes throughout race, mark rounding analysis 4. **Race Summary** - Executive summary with key insights +Analysis results are persisted as Delta tables in your Unity Catalog schema (e.g. `your_catalog.your_schema.finished_boats_summary`), so they can be queried independently after the job completes. + --- ## πŸ—οΈ Architecture @@ -145,9 +157,9 @@ This runs 4 notebooks in sequence: ### Data Flow ``` -Sailboat Fleet (20 boats) Weather Station (IoT device) - ↓ (gRPC/SDK) ↓ (REST API) - └──────────→ Zerobus Ingest β†β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ +Sailboat Fleet (configurable, default=6) Weather Station (IoT device) + ↓ (gRPC/SDK) ↓ (REST API) + └──────────→ Zerobus Ingest β†β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ ↓ Delta Lake Tables (Unity Catalog) @@ -174,6 +186,13 @@ Sailboat Fleet (20 boats) Weather Station (IoT device) ## πŸ”§ Advanced Usage +### Environment Variables + +| Variable | Default | Description | +|----------|---------|-------------| +| `DATABRICKS_CONFIG_PROFILE` | `DEFAULT` | Databricks CLI profile to use | +| `DATABRICKS_TARGET` | `dev` | Deployment target (dev/prod) | + ### Customize Race Configuration Edit `config.toml` to change: @@ -189,7 +208,7 @@ See inline comments in `config.toml` for all options. 
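For example, the replay speed is controlled by `real_time_duration_seconds`. With the defaults below (taken from the shipped `config.toml`), a 48-hour simulated race plays back in 2 minutes:

```toml
race_duration_seconds = 172800      # simulated race time (48 hours)
real_time_duration_seconds = 120    # wall-clock playback time (2 minutes)
```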
Tables are created automatically by `deploy.sh`, but if needed: ```bash -databricks bundle run create_tables +databricks bundle run create_tables --profile MY_PROFILE ``` ### Manual Permissions Grant (Optional) @@ -198,10 +217,10 @@ Permissions are granted automatically, but if needed: ```bash # Get app service principal ID -APP_SP_ID=$(databricks apps get data-drifter-regatta --output json | jq -r '.service_principal_client_id') +APP_SP_ID=$(databricks apps get data-drifter-regatta-v3 --profile MY_PROFILE --output json | jq -r '.service_principal_client_id') # Grant permissions -databricks bundle run grant_permissions --var app_service_principal_id=$APP_SP_ID +databricks bundle run grant_permissions --var app_service_principal_id=$APP_SP_ID --profile MY_PROFILE ``` ### View Deployed Resources @@ -211,10 +230,10 @@ databricks bundle run grant_permissions --var app_service_principal_id=$APP_SP_I databricks bundle resources --target dev # View app details -databricks apps get data-drifter-regatta +databricks apps get data-drifter-regatta-v3 --profile MY_PROFILE # View jobs -databricks jobs list +databricks jobs list --profile MY_PROFILE ``` --- @@ -232,19 +251,22 @@ databricks jobs list **Problem:** `deploy.sh` fails with "table_name not found" - **Solution:** Check `[zerobus]` section in `config.toml` - ensure `table_name` and `weather_station_table_name` are set +**Problem:** `deploy.sh` fails with "no module named toml" +- **Solution:** Activate the virtual environment first: `source venv/bin/activate && ./deploy.sh` + **Problem:** Permission errors during deployment - **Solution:** Ensure you have CREATE TABLE permissions in Unity Catalog **Problem:** App service principal ID not found - **Solution:** Wait a few moments for app to initialize, then re-run: ```bash - databricks bundle run grant_permissions --var app_service_principal_id= + databricks bundle run grant_permissions --var app_service_principal_id= --profile MY_PROFILE ``` ### Telemetry Generator Issues **Problem:** Missing required argument: --client-id or --client-secret -- **Solution:** Pass OAuth credentials as command-line arguments: +- **Solution:** These are OAuth credentials for a Databricks service principal (see Prerequisites): ```bash python3 main.py --client-id --client-secret ``` diff --git a/data_drifter/main.py b/data_drifter/main.py index b068397..9a22578 100644 --- a/data_drifter/main.py +++ b/data_drifter/main.py @@ -281,7 +281,7 @@ async def main(): Real-Time Sailing Competition β›΅ Presented by Lakeflow Connect β›΅ - Made possbile with Zeorbus Ingest + Made possible with Zerobus Ingest ════════════════════════════════════════════════════════════ """) From fd926b37365dfbfe3e0beeba5686551373872db3 Mon Sep 17 00:00:00 2001 From: Artem Date: Tue, 10 Mar 2026 04:50:55 +0200 Subject: [PATCH 4/5] Improve Data Drifter: app refactor, live controls, DRY notebooks, hardened deploy MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Major improvements to the Data Drifter Regatta demo: **App (Streamlit)** - Split monolithic app.py (1384 lines) into 3 modules: app.py, db_connection.py, components.py - Add partial refresh via @st.fragment β€” only dashboard re-renders, not the whole page - Center map on fleet centroid with auto-zoom based on boat spread - Add configurable refresh interval slider (5-60s, default 30s) - Add "Start New Race" button β€” dynamically discovers and truncates all schema tables - Add race speed controls (0.5x, 1x, 2x, 4x) via shared control table - Show 
effective speed under buttons with base Γ— multiplier breakdown - Display both telemetry and weather table names in Data Source sidebar - Grant schema-level SELECT/MODIFY for app service principal **Telemetry Generator (main.py)** - Add speed control: polls race_control table every 5s, adjusts emission interval - Replace exit(1) with retry logic (3 retries, exponential backoff) - Fix typos in ASCII banner **Notebooks (Analysis)** - Extract get_schema_prefix() and categorize_wind() to race_utils.py (DRY) - Replace hardcoded wind thresholds with configurable values from config.toml [analysis] - Replace hardcoded consistency thresholds with config-driven values - All 4 notebooks updated to use shared utilities **Infrastructure** - Add race_control table to databricks.yml create_tables job - Add [analysis] section to config.toml for configurable thresholds - Harden deploy.sh: prerequisite validation, increased polling timeout - Grant schema-level permissions instead of per-table Co-Authored-By: Claude Opus 4.6 --- data_drifter/app/app.py | 1399 ++++------------- data_drifter/app/components.py | 433 +++++ data_drifter/app/db_connection.py | 522 ++++++ data_drifter/config.toml | 12 +- data_drifter/databricks.yml | 9 + data_drifter/deploy.sh | 8 +- data_drifter/main.py | 112 +- data_drifter/notebooks/01_boat_performance.py | 4 +- data_drifter/notebooks/02_wind_conditions.py | 16 +- data_drifter/notebooks/03_race_progress.py | 15 +- data_drifter/notebooks/04_race_summary.py | 12 +- .../notebooks/create_control_table.py | 39 + data_drifter/notebooks/grant_permissions.py | 25 +- data_drifter/notebooks/race_utils.py | 25 + 14 files changed, 1498 insertions(+), 1133 deletions(-) create mode 100644 data_drifter/app/components.py create mode 100644 data_drifter/app/db_connection.py create mode 100644 data_drifter/notebooks/create_control_table.py diff --git a/data_drifter/app/app.py b/data_drifter/app/app.py index 1dd022a..ce21167 100644 --- a/data_drifter/app/app.py +++ b/data_drifter/app/app.py @@ -1,5 +1,5 @@ """ -🌊 DATA DRIFTER REGATTA 🌊 +DATA DRIFTER REGATTA Real-Time Sailboat Race Visualization Powered by Databricks Zerobus Ingest & Lakeflow Connect @@ -11,14 +11,7 @@ import time import sys from datetime import datetime -import json import logging -import threading -from databricks.sdk.core import Config, oauth_service_principal -import folium -from folium import plugins -from streamlit_folium import st_folium -import uuid # Handle TOML for different Python versions if sys.version_info >= (3, 11): @@ -33,17 +26,10 @@ ) logger = logging.getLogger(__name__) -# Import databricks-sql-connector with error handling -try: - from databricks import sql - from databricks.sdk import WorkspaceClient -except ImportError as e: - st.error(f"❌ Failed to import databricks modules: {str(e)}") - st.info("Please ensure databricks-sql-connector and databricks-sdk are installed in requirements.txt") - st.stop() +from streamlit_folium import st_folium -# Import navigation utilities -from navigation_utils import calculate_distance +import db_connection +import components # Load configuration from config.toml file @st.cache_resource @@ -60,27 +46,27 @@ def load_config(): try: with open(config_path, "rb") as f: config_data = tomllib.load(f) - logger.info(f"βœ“ Configuration loaded successfully from: {config_path}") + logger.info(f"Configuration loaded successfully from: {config_path}") except FileNotFoundError: error_msg = f"Config file not found: {config_path}" - st.error(f"❌ {error_msg}") + st.error(f"{error_msg}") 
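# A sketch of the partial-refresh pattern this refactor introduces with
# @st.fragment (helper names below are illustrative; the real implementations
# live in the new db_connection.py and components.py modules):
#
#     @st.fragment(run_every=refresh_interval)   # refresh_interval: 5-60s slider value
#     def live_dashboard():
#         df = db_connection.query_telemetry()    # assumed helper name
#         components.render_race_map(df)          # assumed helper name
#
#     live_dashboard()  # only this fragment re-runs each tick, not the whole page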
logger.error(error_msg) # Show current working directory for debugging import os cwd = os.getcwd() - st.info(f"ℹ️ Current working directory: {cwd}") + st.info(f"Current working directory: {cwd}") logger.error(f"Current working directory: {cwd}") # List files in current directory try: files = os.listdir(".") - st.info(f"ℹ️ Files in current directory: {', '.join(files[:10])}") + st.info(f"Files in current directory: {', '.join(files[:10])}") logger.error(f"Files in current directory: {files}") except: pass - st.warning("πŸ’‘ Ensure deploy.sh successfully copied config.toml to app/ directory") + st.warning("Ensure deploy.sh successfully copied config.toml to app/ directory") return None # Extract and structure configuration @@ -102,11 +88,11 @@ def load_config(): return config except KeyError as e: - st.error(f"❌ Missing required configuration key: {e}") + st.error(f"Missing required configuration key: {e}") logger.error(f"Config key error: {e}", exc_info=True) return None except Exception as e: - st.error(f"❌ Failed to load configuration: {str(e)}") + st.error(f"Failed to load configuration: {str(e)}") logger.error(f"Config load error: {str(e)}", exc_info=True) return None @@ -180,885 +166,9 @@ def load_config(): '#FD79A8', '#FDCB6E', '#E17055', '#00B894', '#00CEC9' ] -def get_connection(): - """Create Databricks SQL connection using workspace authentication""" - logger.info("Starting SQL warehouse connection attempt") - debug_mode = os.getenv("DEBUG", "false").lower() == "true" - start_time = time.time() - - try: - if debug_mode: - st.write("πŸ” Debug - Step 1: Checking environment variables") - st.write("Available DATABRICKS/DEFAULT env vars:") - for key in sorted(os.environ.keys()): - if 'DATABRICKS' in key or 'DEFAULT' in key: - value = os.environ[key] - if 'TOKEN' in key or 'SECRET' in key or 'PASSWORD' in key: - value = '***' if value else 'None' - else: - value = value[:80] if value and len(value) > 80 else value - st.write(f" {key} = {value}") - - # Get warehouse ID from config - warehouse_id = config["warehouse_id"] - - # Extract server hostname from workspace URL (remove https:// prefix) - workspace_url = config.get("workspace_url", os.getenv("DATABRICKS_HOST", "")) - if workspace_url.startswith("https://"): - server_hostname = workspace_url.replace("https://", "") - elif workspace_url.startswith("http://"): - server_hostname = workspace_url.replace("http://", "") - else: - server_hostname = workspace_url - - if debug_mode: - st.write(f"πŸ” Debug - Step 2: Connection parameters") - st.write(f" Server hostname: {server_hostname}") - st.write(f" Warehouse ID: {warehouse_id}") - st.write(f" HTTP Path: /sql/1.0/warehouses/{warehouse_id}") - - # Method 1: Try using OAuth M2M with client credentials (recommended for Databricks Apps) - client_id = os.getenv("DATABRICKS_CLIENT_ID") - client_secret = os.getenv("DATABRICKS_CLIENT_SECRET") - - if client_id and client_secret: - logger.info("Attempting OAuth M2M authentication (Method 1)") - if debug_mode: - st.write("πŸ” Debug - Step 3: Attempting connection with OAuth M2M (client credentials)...") - st.write(f" βœ… DATABRICKS_CLIENT_ID found: {client_id[:8]}...") - st.write(" βœ… DATABRICKS_CLIENT_SECRET found") - - try: - credential_provider = lambda: oauth_service_principal(Config( - host = f"https://{server_hostname}", - client_id = client_id, - client_secret = client_secret)) - - connection =sql.connect( - server_hostname = server_hostname, - http_path = f"/sql/1.0/warehouses/{warehouse_id}", - credentials_provider = 
credential_provider) - - if debug_mode: - st.write(" Connection object created, testing query...") - - # Test the connection - cursor = connection.cursor() - cursor.execute("SELECT 'test' as status, current_database() as db") - result = cursor.fetchone() - cursor.close() - - if debug_mode: - st.success(f"βœ… OAuth M2M authentication successful! Result: {result}") - - logger.info(f"OAuth M2M authentication successful in {time.time() - start_time:.2f}s") - return connection - - except Exception as oauth_error: - logger.warning(f"OAuth M2M auth failed: {str(oauth_error)}") - if debug_mode: - st.error(f"❌ OAuth M2M auth failed: {str(oauth_error)}") - st.write(" Falling back to alternative auth methods...") - else: - if debug_mode: - st.write("πŸ” Debug - Step 3: Client credentials not found in environment") - if not client_id: - st.write(" ❌ DATABRICKS_CLIENT_ID not set") - if not client_secret: - st.write(" ❌ DATABRICKS_CLIENT_SECRET not set") - st.write(" Attempting connection with SDK default auth...") - - # Method 2: Try SDK automatic authentication - # In Databricks Apps, use the SDK's automatic authentication - # The app runs as a service principal and credentials are handled automatically - logger.info("Attempting SDK OAuth authentication (Method 2)") - try: - if debug_mode: - st.write(" Initializing WorkspaceClient for auth...") - - # Initialize WorkspaceClient - it will automatically use the service principal - from databricks.sdk import WorkspaceClient - from databricks.sdk.core import ApiClient - - # Create config that will auto-detect credentials - cfg = Config() - - if debug_mode: - st.write(f" Config host: {cfg.host if cfg.host else 'auto-detect'}") - st.write(" Creating API client...") - - # Create API client for making authenticated requests - api_client = ApiClient(cfg) - - if debug_mode: - st.write(" Getting auth headers...") - - # Get authentication headers - def get_token(): - headers = api_client.do("GET", "/api/2.0/preview/scim/v2/Me", - headers={}, data={}, raw=True).headers - # Extract token from Authorization header - auth_header = headers.get('Authorization', '') - if auth_header.startswith('Bearer '): - return auth_header[7:] - return None - - if debug_mode: - st.write(" Connecting to SQL Warehouse...") - - # Connect using the SDK's auth mechanism - connection = sql.connect( - server_hostname=server_hostname, - http_path=f"/sql/1.0/warehouses/{warehouse_id}", - auth_type="databricks-oauth", # Use OAuth authentication - _socket_timeout=30 - ) - - if debug_mode: - st.write(" Connection object created, testing query...") - - # Test the connection - cursor = connection.cursor() - cursor.execute("SELECT 'test' as status, current_database() as db") - result = cursor.fetchone() - cursor.close() - - if debug_mode: - st.success(f"βœ… Connection successful! 
Result: {result}") - - logger.info(f"SDK OAuth authentication successful in {time.time() - start_time:.2f}s") - return connection - - except Exception as sdk_error: - logger.warning(f"SDK OAuth auth failed: {str(sdk_error)}") - if debug_mode: - st.error(f"❌ SDK OAuth auth method failed: {str(sdk_error)}") - st.write(f" Error type: {type(sdk_error).__name__}") - st.exception(sdk_error) - - # Method 3: Try WorkspaceClient with better error handling - logger.info("Attempting WorkspaceClient authentication (Method 3)") - if debug_mode: - st.write("πŸ” Method 3: Trying WorkspaceClient authentication...") - - try: - # Initialize WorkspaceClient with explicit host - w = WorkspaceClient(host=f"https://{server_hostname}") - - if debug_mode: - st.write(f" WorkspaceClient created") - st.write(f" Host: {w.config.host}") - - # Try to get auth method info - try: - auth_details = str(w.config) - st.write(f" Config: {auth_details[:200]}") - except: - pass - - # Try to authenticate and get token - if debug_mode: - st.write(" Attempting authentication...") - - try: - # Call authenticate to get credentials - credentials = w.config.authenticate() - if debug_mode: - st.write(f" Authentication successful, got credentials") - - # Use the credentials with SQL connector - connection = sql.connect( - server_hostname=server_hostname, - http_path=f"/sql/1.0/warehouses/{warehouse_id}", - credentials_provider=lambda: credentials, - _socket_timeout=30 - ) - - if debug_mode: - st.write(" SQL Connection created, testing...") - - # Test the connection - cursor = connection.cursor() - cursor.execute("SELECT 1 as test") - result = cursor.fetchone() - cursor.close() - - if debug_mode: - st.success(f"βœ… Method 3 successful! Test query returned: {result}") - - logger.info(f"WorkspaceClient authentication successful in {time.time() - start_time:.2f}s") - return connection - - except Exception as auth_error: - if debug_mode: - st.error(f" Authentication failed: {str(auth_error)}") - st.write(f" Error type: {type(auth_error).__name__}") - raise - - except Exception as wc_error: - logger.warning(f"WorkspaceClient auth failed: {str(wc_error)}") - if debug_mode: - st.error(f"❌ Method 3 failed: {str(wc_error)}") - st.write(f" Error type: {type(wc_error).__name__}") - st.exception(wc_error) - - # Method 4: Try OAuth U2M flow (for apps with attached resources) - logger.info("Attempting OAuth U2M flow (Method 4)") - if debug_mode: - st.write("πŸ” Method 4: Trying OAuth U2M flow...") - - try: - # For Databricks Apps with SQL Warehouse resources, use OAuth U2M - connection = sql.connect( - server_hostname=server_hostname, - http_path=f"/sql/1.0/warehouses/{warehouse_id}", - auth_type="databricks-oauth", - _socket_timeout=30 - ) - - if debug_mode: - st.write(" OAuth connection created, testing...") - - # Test the connection - cursor = connection.cursor() - cursor.execute("SELECT 1 as test") - result = cursor.fetchone() - cursor.close() - - if debug_mode: - st.success(f"βœ… Method 4 successful! 
Test query returned: {result}") - - logger.info(f"OAuth U2M authentication successful in {time.time() - start_time:.2f}s") - return connection - - except Exception as oauth_error: - logger.warning(f"OAuth U2M auth failed: {str(oauth_error)}") - if debug_mode: - st.error(f"❌ Method 4 failed: {str(oauth_error)}") - st.write(f" Error type: {type(oauth_error).__name__}") - st.exception(oauth_error) - - # All methods failed - logger.error(f"All connection methods failed after {time.time() - start_time:.2f}s") - if debug_mode: - st.error("❌ All connection methods failed!") - st.write("Troubleshooting suggestions:") - st.write("1. Check that the SQL Warehouse resource is properly attached") - st.write("2. Verify the warehouse is running and accessible") - st.write("3. Ensure the app has CAN_USE permission on the warehouse") - st.write("4. Check that environment variables are being set correctly") - - raise Exception("Unable to connect to Databricks SQL Warehouse. All authentication methods failed.") - - except Exception as e: - st.error(f"❌ Failed to connect to Databricks SQL: {str(e)}") - st.info("πŸ’‘ Troubleshooting tips:") - st.info("- Ensure the app has a SQL Warehouse resource with CAN_USE permission") - st.info("- Check that the warehouse is running and accessible") - st.info(f"- Warehouse ID: {warehouse_id}") - if debug_mode: - st.write("πŸ” Full error details:") - st.exception(e) - return None - -def execute_query_with_timeout(cursor, query, timeout_seconds=60): - """Execute query with a timeout using threading""" - result = [None] - error = [None] - - def run_query(): - try: - cursor.execute(query) - result[0] = True - except Exception as e: - error[0] = e - - thread = threading.Thread(target=run_query) - thread.daemon = True - thread.start() - thread.join(timeout=timeout_seconds) - - if thread.is_alive(): - raise TimeoutError(f"Query execution exceeded {timeout_seconds} seconds timeout") - if error[0]: - raise error[0] - - return result[0] - -def query_telemetry(limit=10000): - """Query latest telemetry data from table""" - logger.info(f"Starting telemetry query with limit={limit}") - debug_mode = os.getenv("DEBUG", "false").lower() == "true" - query_start = time.time() - - try: - # Step 1: Get connection - logger.info("Step 1: Establishing connection") - if debug_mode: - st.write("πŸ” Step 1: Getting connection...") - - conn = get_connection() - if not conn: - logger.error("Failed to establish connection") - st.warning("⚠️ Could not establish database connection") - return None - - logger.info(f"Step 1 complete in {time.time() - query_start:.2f}s") - - # Step 2: Count rows in table to verify we can query it - logger.info(f"Step 2: Counting rows in {TABLE_NAME}") - step2_start = time.time() - - try: - count_query = f"SELECT COUNT(*) as row_count FROM {TABLE_NAME}" - cursor = conn.cursor() - cursor.execute(count_query) - count_result = cursor.fetchone() - row_count = count_result[0] if count_result else 0 - cursor.close() - - logger.info(f"Step 2 complete: {row_count:,} rows found in {time.time() - step2_start:.2f}s") - - if row_count == 0: - logger.warning("Table is empty, no data available") - st.warning("⚠️ Table is empty. 
No telemetry data available yet.") - st.info("πŸ’‘ Run `python main.py` to start generating telemetry data.") - return None - - except Exception as count_error: - logger.error(f"Failed to count rows: {str(count_error)}") - st.error(f"❌ Failed to count rows in table: {str(count_error)}") - st.info("πŸ’‘ Check that:") - st.info(" - The table exists") - st.info(" - The service principal has SELECT permission on the table") - st.info(f" - Table name is correct: {TABLE_NAME}") - if debug_mode: - st.exception(count_error) - return None - - if debug_mode: - st.write(f"πŸ” Step 3: Querying table: {TABLE_NAME}") - - query = f""" - SELECT - boat_id, - boat_name, - boat_type, - timestamp, - latitude, - longitude, - speed_over_ground_knots, - heading_degrees, - wind_speed_knots, - wind_direction_degrees, - distance_traveled_nm, - distance_to_destination_nm, - vmg_knots, - current_mark_index, - marks_rounded, - total_marks, - has_started, - has_finished, - race_status - FROM {TABLE_NAME} - ORDER BY timestamp DESC - LIMIT {limit} - """ - - # Step 3: Execute query with timeout - logger.info(f"Step 3: Querying {TABLE_NAME} for {limit} rows") - step3_start = time.time() - - if debug_mode: - st.write("πŸ” Step 4: Executing query with 60 second timeout...") - - cursor = conn.cursor() - - try: - # Execute query with timeout - execute_query_with_timeout(cursor, query, timeout_seconds=60) - execution_time = time.time() - step3_start - - logger.info(f"Step 3 query execution complete in {execution_time:.2f}s") - - if debug_mode: - st.write(f" βœ… Query executed in {execution_time:.2f} seconds") - - except TimeoutError as te: - logger.error(f"Query timeout: {str(te)}") - st.error(f"❌ Query execution timeout: {str(te)}") - st.warning("πŸ’‘ Troubleshooting suggestions:") - st.info("β€’ The SQL warehouse may be slow or overloaded") - st.info("β€’ Try reducing the data limit or adding filters") - st.info("β€’ Check SQL warehouse status in Databricks UI") - st.info(f"β€’ Table: {TABLE_NAME}") - cursor.close() - conn.close() - return None - - # Step 4-5: Fetch results - logger.info("Step 4-5: Fetching results as DataFrame") - fetch_start = time.time() - - if debug_mode: - st.write("πŸ” Step 5: Fetching results...") - - df = cursor.fetchall_arrow().to_pandas() - cursor.close() - - fetch_time = time.time() - fetch_start - logger.info(f"Fetch complete: {len(df)} rows in {fetch_time:.2f}s") - - total_time = time.time() - query_start - logger.info(f"Query completed successfully in {total_time:.2f}s total") - - if debug_mode: - st.write(f"πŸ” Step 6: Retrieved {len(df)} rows") - - return df - - except TimeoutError as te: - logger.error(f"Query timeout after {time.time() - query_start:.2f}s: {str(te)}") - # Error already displayed above - return None - except Exception as e: - logger.error(f"Query failed after {time.time() - query_start:.2f}s: {str(e)}", exc_info=True) - st.error(f"❌ Failed to query data: {str(e)}") - st.info("πŸ’‘ Check:") - st.info("β€’ Table exists and has data") - st.info("β€’ Service principal has SELECT permission") - st.info(f"β€’ Table name: {TABLE_NAME}") - if debug_mode: - st.exception(e) - return None - -def query_weather_station(): - """Query latest weather station data""" - logger.info("Querying weather station data") - - # Check if weather station table is configured - weather_table = config.get("weather_table_name") - if not weather_table: - logger.warning("Weather station table not configured") - return None - - try: - conn = get_connection() - if not conn: - logger.error("Failed to 
establish connection for weather station query") - return None - - # Query latest weather station reading - query = f""" - SELECT - station_id, - station_name, - station_location, - timestamp, - wind_speed_knots, - wind_direction_degrees, - event_type, - in_transition, - time_in_state_seconds, - next_change_in_seconds - FROM {weather_table} - ORDER BY timestamp DESC - LIMIT 1 - """ - - cursor = conn.cursor() - cursor.execute(query) - result = cursor.fetchall_arrow().to_pandas() - cursor.close() - - if result.empty: - logger.warning("No weather station data found") - return None - - logger.info(f"Weather station data retrieved: {len(result)} record") - return result.iloc[0] # Return the latest record as a Series - - except Exception as e: - logger.error(f"Failed to query weather station: {str(e)}", exc_info=True) - return None - -def get_race_course_config(): - """Get race course configuration (marks, start/finish) from config""" - return { - "start_lat": config["race_course_start_lat"], - "start_lon": config["race_course_start_lon"], - "marks": config["race_course_marks"] - } - -def calculate_total_remaining_distance(lat, lon, current_mark_index, marks): - """ - Calculate total remaining distance along the race course in nautical miles. - - Args: - lat: Current latitude - lon: Current longitude - current_mark_index: Index of the next mark to round (0-based) - marks: List of all marks [[lat1, lon1], [lat2, lon2], ...] where last mark is finish - - Returns: - Total distance remaining along the course in nautical miles - """ - if not marks or len(marks) == 0: - return 0.0 - - total_distance = 0.0 - - # Marks to round are all except the last one (which is the finish line) - marks_to_round = marks[:-1] if len(marks) > 1 else [] - finish_lat, finish_lon = marks[-1][0], marks[-1][1] - - # If all marks are rounded, just return distance to finish - if current_mark_index >= len(marks_to_round): - return calculate_distance(lat, lon, finish_lat, finish_lon) - - # Add distance to next mark - next_mark = marks_to_round[current_mark_index] - total_distance += calculate_distance(lat, lon, next_mark[0], next_mark[1]) - - # Add distances between subsequent marks - for i in range(current_mark_index, len(marks_to_round) - 1): - mark1 = marks_to_round[i] - mark2 = marks_to_round[i + 1] - total_distance += calculate_distance(mark1[0], mark1[1], mark2[0], mark2[1]) - - # Add distance from last mark to finish - if len(marks_to_round) > 0: - last_mark = marks_to_round[-1] - total_distance += calculate_distance(last_mark[0], last_mark[1], finish_lat, finish_lon) - - return total_distance - -def create_race_map(df): - """Create interactive race map with boat tracks using Folium""" - if df is None or len(df) == 0: - st.warning("No telemetry data available") - return None - - # Convert timestamp from epoch microseconds to datetime - df['datetime'] = pd.to_datetime(df['timestamp'], unit='us') - - # Get race course configuration - course_config = get_race_course_config() - - # Get race course marks - mark_lats = [m[0] for m in course_config['marks']] - mark_lons = [m[1] for m in course_config['marks']] - - # Calculate center of the map - center_lat = sum(mark_lats) / len(mark_lats) - center_lon = sum(mark_lons) / len(mark_lons) - - # Create Folium map - m = folium.Map( - location=[center_lat, center_lon], - zoom_start=9, - tiles='OpenStreetMap', - prefer_canvas=True - ) - - # Add race course line connecting marks - course_coords = [[lat, lon] for lat, lon in zip(mark_lats, mark_lons)] - folium.PolyLine( - course_coords, 
- color='gray', - weight=2, - opacity=0.7, - dash_array='10, 5', - popup='Race Course' - ).add_to(m) - - # Add markers for each race course mark - for idx, (lat, lon) in enumerate(zip(mark_lats, mark_lons)): - folium.Marker( - location=[lat, lon], - popup=f'Mark {idx + 1}', - tooltip=f'Mark {idx + 1}', - icon=folium.Icon(color='orange', icon='flag', prefix='fa') - ).add_to(m) - - # Add start line marker - folium.Marker( - location=[course_config['start_lat'], course_config['start_lon']], - popup='START', - tooltip='Start Line', - icon=folium.Icon(color='green', icon='play', prefix='fa') - ).add_to(m) - - # Add finish line marker (last mark) - finish_lat, finish_lon = course_config['marks'][-1] - folium.Marker( - location=[finish_lat, finish_lon], - popup='FINISH', - tooltip='Finish Line', - icon=folium.Icon(color='red', icon='stop', prefix='fa') - ).add_to(m) - - # Group by boat and plot tracks - boats = df.groupby('boat_id') - - for idx, (boat_id, boat_df) in enumerate(boats): - boat_df = boat_df.sort_values('timestamp') - - boat_name = boat_df['boat_name'].iloc[0] - boat_type = boat_df['boat_type'].iloc[0] - race_status = boat_df['race_status'].iloc[0] - - color = BOAT_COLORS[idx % len(BOAT_COLORS)] - - # Create boat track coordinates - track_coords = [[row['latitude'], row['longitude']] for _, row in boat_df.iterrows()] - - # Add boat track as polyline - folium.PolyLine( - track_coords, - color=color, - weight=3, - opacity=0.8, - popup=boat_name, - tooltip=boat_name - ).add_to(m) - - # Add current position marker (most recent point - first in DESC order) - latest = boat_df.iloc[-1] # Last point after sorting by timestamp ASC - - # Create popup content with boat information - popup_html = f""" -
- <div style="font-family: Arial, sans-serif; font-size: 12px;">
- <b>{boat_name}</b><br>
- Status: {race_status}<br>
- Speed: {latest['speed_over_ground_knots']:.1f} knots<br>
- Heading: {latest['heading_degrees']:.0f}°<br>
- Distance: {latest['distance_traveled_nm']:.1f} nm<br>
- VMG: {latest['vmg_knots']:.1f} knots<br>
- Marks: {latest['marks_rounded']}/{latest['total_marks']}<br>
- Time: {pd.to_datetime(latest['timestamp'], unit='us').strftime('%Y-%m-%d %H:%M:%S')}
- </div>
- """ - - # Get heading for boat marker - heading = latest['heading_degrees'] - - # Use BoatMarker for racing boats, regular markers for finished/DNF - if race_status == 'finished': - folium.Marker( - location=[latest['latitude'], latest['longitude']], - popup=folium.Popup(popup_html, max_width=250), - tooltip=boat_name.split("'s")[0], - icon=folium.Icon(color='lightgreen', icon='star', prefix='fa') - ).add_to(m) - elif race_status == 'dnf': - folium.Marker( - location=[latest['latitude'], latest['longitude']], - popup=folium.Popup(popup_html, max_width=250), - tooltip=boat_name.split("'s")[0], - icon=folium.Icon(color='red', icon='times', prefix='fa') - ).add_to(m) - else: - # Use BoatMarker for racing boats with rounded SVG shape - plugins.BoatMarker( - location=[latest['latitude'], latest['longitude']], - heading=heading, - wind_heading=latest.get('wind_direction_degrees', heading + 45), - wind_speed=latest.get('wind_speed_knots', 10), - color=color, - popup=folium.Popup(popup_html, max_width=250), - tooltip=boat_name.split("'s")[0] - ).add_to(m) - - # Add wind indicator in top left corner - # Get average wind conditions from latest positions - latest_positions_for_wind = df.sort_values('timestamp').groupby('boat_id').last() - avg_wind_speed = latest_positions_for_wind['wind_speed_knots'].mean() - avg_wind_direction = latest_positions_for_wind['wind_direction_degrees'].mean() - - # Create wind direction name - def get_wind_direction_name(degrees): - directions = ['N', 'NNE', 'NE', 'ENE', 'E', 'ESE', 'SE', 'SSE', 'S', 'SSW', 'SW', 'WSW', 'W', 'WNW', 'NW', 'NNW'] - index = int((degrees + 11.25) / 22.5) % 16 - return directions[index] - - wind_dir_name = get_wind_direction_name(avg_wind_direction) - - # Create custom HTML for wind indicator - wind_html = f""" -
-        <div style="position: fixed; top: 10px; left: 10px; z-index: 9999;">
-            <div>🌬️ WIND</div>
-            <div>↓</div>
-            <div>{avg_wind_speed:.1f} kts</div>
-            <div>{wind_dir_name} ({avg_wind_direction:.0f}°)</div>
-        </div>
- """ - - # Add the wind indicator to the map - m.get_root().html.add_child(folium.Element(wind_html)) - - return m - -def display_leaderboard(df): - """Display leaderboard with all boats""" - if df is None or len(df) == 0: - st.info("No race data available") - return - - # Get latest position for each boat - latest_positions = df.sort_values('timestamp').groupby('boat_id').last().reset_index() - - # Get race course marks - course_config = get_race_course_config() - marks = course_config['marks'] - - # Calculate total remaining distance along the race course for each boat - latest_positions['total_remaining_distance'] = latest_positions.apply( - lambda row: calculate_total_remaining_distance( - row['latitude'], row['longitude'], row['current_mark_index'], marks - ), - axis=1 - ) - - # Sort by total remaining distance (less distance = better rank) - # DNF boats go to the end - latest_positions['sort_key'] = latest_positions.apply( - lambda row: ( - float('inf') if row['race_status'] == 'dnf' else row['total_remaining_distance'] - ), - axis=1 - ) - latest_positions = latest_positions.sort_values('sort_key') - - # Display leaderboard header - st.markdown("##### Race Leaderboard") - - # Create container for scrollable leaderboard - leaderboard_container = st.container(height=600) - - with leaderboard_container: - # Display each boat with rank, name, and distance - for rank, (idx, row) in enumerate(latest_positions.iterrows(), start=1): - boat_id = row['boat_id'] - boat_name = row['boat_name'] - distance = row['distance_traveled_nm'] - - # Create columns for rank and boat info - col1, col2 = st.columns([0.5, 4.5]) - - with col1: - # Rank with medal emoji for top 3 - if rank == 1: - st.markdown(f"### πŸ₯‡") - elif rank == 2: - st.markdown(f"### πŸ₯ˆ") - elif rank == 3: - st.markdown(f"### πŸ₯‰") - else: - st.markdown(f"### **{rank}**") - - with col2: - # Boat name and distance - st.markdown(f"**{boat_name}**") - st.caption(f"{distance:.1f} nm") - - # Add divider between boats - if rank < len(latest_positions): - st.divider() - -def display_boat_stats(df): - """Display boat statistics table""" - if df is None or len(df) == 0: - return - - # Get latest position for each boat - latest_positions = df.sort_values('timestamp').groupby('boat_id').last().reset_index() - - # Get race course marks - course_config = get_race_course_config() - marks = course_config['marks'] - - # Calculate total remaining distance along the race course for each boat - latest_positions['total_remaining_distance'] = latest_positions.apply( - lambda row: calculate_total_remaining_distance( - row['latitude'], row['longitude'], row['current_mark_index'], marks - ), - axis=1 - ) - - # Sort by total remaining distance (ascending) - boat with least distance is in 1st place - # DNF boats go to the end - latest_positions['sort_key'] = latest_positions.apply( - lambda row: ( - float('inf') if row['race_status'] == 'dnf' else row['total_remaining_distance'] - ), - axis=1 - ) - latest_positions = latest_positions.sort_values('sort_key') - - # Create display dataframe with the new total_remaining_distance column - display_df = latest_positions[[ - 'boat_name', 'boat_type', 'race_status', 'speed_over_ground_knots', - 'distance_traveled_nm', 'total_remaining_distance', 'vmg_knots', - 'marks_rounded', 'total_marks' - ]].copy() - - display_df.columns = [ - 'Boat', 'Type', 'Status', 'Speed (kt)', 'Distance (nm)', - 'To Dest (nm)', 'VMG (kt)', 'Marks', 'Total Marks' - ] - - # Add position column - display_df.insert(0, 'Pos', range(1, 
len(display_df) + 1)) - - # Format marks column - display_df['Marks'] = display_df.apply(lambda row: f"{row['Marks']}/{row['Total Marks']}", axis=1) - display_df = display_df.drop('Total Marks', axis=1) - - # Round numeric columns - display_df['Speed (kt)'] = display_df['Speed (kt)'].round(1) - display_df['Distance (nm)'] = display_df['Distance (nm)'].round(1) - display_df['To Dest (nm)'] = display_df['To Dest (nm)'].round(1) - display_df['VMG (kt)'] = display_df['VMG (kt)'].round(1) - - st.dataframe( - display_df, - use_container_width=True, - hide_index=True, - column_config={ - "Status": st.column_config.TextColumn( - "Status", - help="Current race status", - ) - } - ) +# Initialize modules +db_connection.init(config, TABLE_NAME) +components.init(config, BOAT_COLORS) # Main app def main(): @@ -1068,15 +178,141 @@ def main(): # Sidebar with st.sidebar: - st.header("βš™οΈ Settings") + st.header("βš™οΈ Race Controls") + + # Auto-refresh toggle + auto_refresh = st.toggle("Auto-refresh", value=True) - # Manual refresh only - auto_refresh = False - st.info("πŸ”„ Manual refresh mode - click 'Refresh Now' to update data") + # Refresh interval slider (only shown when auto-refresh is on) + if auto_refresh: + refresh_interval = st.slider("Refresh interval (seconds)", min_value=5, max_value=60, value=REFRESH_INTERVAL, step=5) + else: + refresh_interval = REFRESH_INTERVAL # Manual refresh button - if st.button("πŸ”„ Refresh Now"): - st.rerun() + if not auto_refresh: + if st.button("πŸ”„ Refresh Now"): + st.rerun() + + # Start new race + if st.button("πŸš€ Start New Race", type="primary"): + with st.spinner("Clearing all tables..."): + try: + conn = db_connection.get_connection() + if conn: + schema_prefix = ".".join(TABLE_NAME.split(".")[:2]) + truncated = 0 + errors = [] + # Get all tables in the schema and truncate them (except race_control) + cursor = conn.cursor() + cursor.execute(f"SHOW TABLES IN {schema_prefix}") + tables = cursor.fetchall() + cursor.close() + for row in tables: + tbl = row[1] if len(row) > 1 else row[0] + if tbl == "race_control": + continue + full_name = f"{schema_prefix}.{tbl}" + try: + c = conn.cursor() + c.execute(f"TRUNCATE TABLE {full_name}") + c.close() + truncated += 1 + except Exception as e: + errors.append(f"{tbl}: {e}") + # Reset speed control to 1.0x + try: + c = conn.cursor() + c.execute(f""" + MERGE INTO {schema_prefix}.race_control AS target + USING (SELECT 1.0 AS speed_multiplier) AS source + ON 1=1 + WHEN MATCHED THEN UPDATE SET + speed_multiplier = source.speed_multiplier, + updated_at = current_timestamp(), + updated_by = 'app' + WHEN NOT MATCHED THEN INSERT + (speed_multiplier, updated_at, updated_by) + VALUES (source.speed_multiplier, current_timestamp(), 'app') + """) + c.close() + except Exception as e: + errors.append(f"race_control reset: {e}") + conn.close() + if errors: + st.warning(f"Truncated {truncated} tables, {len(errors)} errors: {'; '.join(errors)}") + else: + st.success(f"Cleared {truncated} tables! 
Run `python3 main.py` to start a new race.") + time.sleep(2) + st.rerun() + else: + st.error("Could not connect to database") + except Exception as e: + st.error(f"Failed to clear tables: {e}") + + st.divider() + + # Speed controls + st.subheader("⚑ Race Speed") + speed_msg = None + row1_col1, row1_col2 = st.columns(2) + row2_col1, row2_col2 = st.columns(2) + speed_layout = [(row1_col1, "0.5x", 0.5), (row1_col2, "1x", 1.0), + (row2_col1, "2x", 2.0), (row2_col2, "4x", 4.0)] + + for col, label, multiplier in speed_layout: + with col: + if st.button(label, use_container_width=True): + try: + conn = db_connection.get_connection() + if conn: + schema_prefix = ".".join(TABLE_NAME.split(".")[:2]) + cursor = conn.cursor() + cursor.execute(f""" + MERGE INTO {schema_prefix}.race_control AS target + USING (SELECT {multiplier} AS speed_multiplier) AS source + ON 1=1 + WHEN MATCHED THEN UPDATE SET + speed_multiplier = source.speed_multiplier, + updated_at = current_timestamp(), + updated_by = 'app' + WHEN NOT MATCHED THEN INSERT + (speed_multiplier, updated_at, updated_by) + VALUES (source.speed_multiplier, current_timestamp(), 'app') + """) + cursor.close() + conn.close() + speed_msg = ("success", f"Speed set to {label}") + except Exception as e: + speed_msg = ("error", f"Failed to set speed: {e}") + + # Show speed message below all buttons (full width) + if speed_msg: + if speed_msg[0] == "success": + st.success(speed_msg[1]) + else: + st.error(speed_msg[1]) + + # Current effective speed (under buttons) + speed_multiplier = 1.0 + try: + conn_speed = db_connection.get_connection() + if conn_speed: + schema_prefix = ".".join(TABLE_NAME.split(".")[:2]) + c = conn_speed.cursor() + c.execute(f"SELECT speed_multiplier FROM {schema_prefix}.race_control LIMIT 1") + row = c.fetchone() + if row: + speed_multiplier = float(row[0]) + c.close() + conn_speed.close() + except Exception: + pass + effective_speed = TIME_ACCELERATION * speed_multiplier + if speed_multiplier != 1.0: + st.caption(f"Current: {effective_speed:.0f}x ({TIME_ACCELERATION:.0f}x base Γ— {speed_multiplier:.1f}x)") + else: + st.caption(f"Current: {TIME_ACCELERATION:.0f}x") st.divider() @@ -1096,9 +332,6 @@ def main(): real_minutes = REAL_TIME_DURATION_SECONDS // 60 st.markdown(f"**Playback:** {real_minutes} min (real time)") - # Time acceleration - st.markdown(f"**Speed:** {TIME_ACCELERATION:.0f}x acceleration") - st.markdown(f"**Course Marks:** {len(config['race_course_marks'])}") st.markdown(f"**Fleet Size:** {config['num_boats']} boats") @@ -1106,7 +339,10 @@ def main(): # Data source st.header("πŸ“Š Data Source") - st.markdown(f"**Table:** `{TABLE_NAME}`") + st.markdown(f"**Telemetry:** `{TABLE_NAME}`") + weather_table = config.get("weather_table_name", "") + if weather_table: + st.markdown(f"**Weather:** `{weather_table}`") # Last update time st.markdown(f"**Last Update:** {datetime.now().strftime('%H:%M:%S')}") @@ -1124,218 +360,219 @@ def main(): ]) st.markdown(f"**DB Credentials:** {'βœ“' if has_credentials else 'βœ—'}") - # Query data - with st.spinner("Loading race data..."): - df = query_telemetry() + # Use st.fragment for partial refresh - only the race data section re-renders + @st.fragment(run_every=refresh_interval if auto_refresh else None) + def race_dashboard(): + # Query data + with st.spinner("Loading race data..."): + df = db_connection.query_telemetry() - if df is not None and len(df) > 0: - # Convert timestamp for display - df['datetime'] = pd.to_datetime(df['timestamp'], unit='us') + if df is not None and len(df) > 0: + # 
Convert timestamp for display + df['datetime'] = pd.to_datetime(df['timestamp'], unit='us') - # Get latest data for each boat - latest_positions = df.sort_values('timestamp').groupby('boat_id').last().reset_index() + # Get latest data for each boat + latest_positions = df.sort_values('timestamp').groupby('boat_id').last().reset_index() - # Get latest timestamp - latest_time = df['datetime'].max() + # Get latest timestamp + latest_time = df['datetime'].max() - # Calculate race progress - elapsed_race_seconds = (latest_time.timestamp() - RACE_START_TIME.timestamp()) - race_progress_percent = min(100, (elapsed_race_seconds / RACE_DURATION_SECONDS) * 100) if RACE_DURATION_SECONDS > 0 else 0 + # Calculate race progress + elapsed_race_seconds = (latest_time.timestamp() - RACE_START_TIME.timestamp()) + race_progress_percent = min(100, (elapsed_race_seconds / RACE_DURATION_SECONDS) * 100) if RACE_DURATION_SECONDS > 0 else 0 - # Combined Race Progress & Fleet Status in one line - st.subheader("🏁 Race Progress & Fleet Status β›΅") + # Combined Race Progress & Fleet Status in one line + st.subheader("🏁 Race Progress & Fleet Status β›΅") - # Calculate fleet stats - total_boats = df['boat_id'].nunique() - racing = len(latest_positions[latest_positions['race_status'] == 'racing']) - finished = len(latest_positions[latest_positions['race_status'] == 'finished']) - dnf = len(latest_positions[latest_positions['race_status'] == 'dnf']) - not_started = len(latest_positions[latest_positions['race_status'] == 'not_started']) + # Calculate fleet stats + total_boats = df['boat_id'].nunique() + racing = len(latest_positions[latest_positions['race_status'] == 'racing']) + finished = len(latest_positions[latest_positions['race_status'] == 'finished']) + dnf = len(latest_positions[latest_positions['race_status'] == 'dnf']) + not_started = len(latest_positions[latest_positions['race_status'] == 'not_started']) - # Display race time - elapsed_days = int(elapsed_race_seconds // 86400) - elapsed_hours = int((elapsed_race_seconds % 86400) // 3600) - elapsed_minutes = int((elapsed_race_seconds % 3600) // 60) - time_str = f"{elapsed_days}d {elapsed_hours}h {elapsed_minutes}m" if elapsed_days > 0 else f"{elapsed_hours}h {elapsed_minutes}m" + # Display race time + elapsed_days = int(elapsed_race_seconds // 86400) + elapsed_hours = int((elapsed_race_seconds % 86400) // 3600) + elapsed_minutes = int((elapsed_race_seconds % 3600) // 60) + time_str = f"{elapsed_days}d {elapsed_hours}h {elapsed_minutes}m" if elapsed_days > 0 else f"{elapsed_hours}h {elapsed_minutes}m" - # Single row with progress bar and all fleet metrics - col1, col2, col3, col4, col5, col6, col7 = st.columns([3, 1, 1, 1, 1, 1, 1]) + # Single row with progress bar and all fleet metrics + col1, col2, col3, col4, col5, col6, col7 = st.columns([3, 1, 1, 1, 1, 1, 1]) - with col1: - st.progress(race_progress_percent / 100.0) - st.caption(f"{time_str} elapsed | {latest_time.strftime('%Y-%m-%d %H:%M UTC')}") + with col1: + st.progress(race_progress_percent / 100.0) + st.caption(f"{time_str} elapsed | {latest_time.strftime('%Y-%m-%d %H:%M UTC')}") - with col2: - st.metric("Progress", f"{race_progress_percent:.1f}%") + with col2: + st.metric("Progress", f"{race_progress_percent:.1f}%") - with col3: - st.metric("Total", total_boats) + with col3: + st.metric("Total", total_boats) - with col4: - st.metric("πŸƒ Racing", racing) + with col4: + st.metric("πŸƒ Racing", racing) - with col5: - st.metric("βœ… Done", finished) + with col5: + st.metric("βœ… Done", finished) - with 
col6: - st.metric("❌ DNF", dnf) + with col6: + st.metric("❌ DNF", dnf) - with col7: - st.metric("⏸️ Waiting", not_started) + with col7: + st.metric("⏸️ Waiting", not_started) - st.divider() + st.divider() - # Weather Station Display - st.subheader("🌀️ Captain's Weather Watch") - - # Query weather station data - weather_data = query_weather_station() - - if weather_data is not None: - # Weather station data available - wind_col1, wind_col2, wind_col3, wind_col4, wind_col5 = st.columns(5) - - # Wind direction name helper - def get_wind_direction_name(degrees): - directions = ['N', 'NNE', 'NE', 'ENE', 'E', 'ESE', 'SE', 'SSE', 'S', 'SSW', 'SW', 'WSW', 'W', 'WNW', 'NW', 'NNW'] - idx = int((degrees % 360) / 22.5 + 0.5) % 16 - return directions[idx] - - with wind_col1: - st.metric("Wind Speed", f"{weather_data['wind_speed_knots']:.1f} kt") - - with wind_col2: - wind_dir = weather_data['wind_direction_degrees'] - st.metric("Wind Direction", f"{wind_dir:.0f}Β° ({get_wind_direction_name(wind_dir)})") - - with wind_col3: - # Format event type for display - event_type = weather_data['event_type'].replace('_', ' ').title() - # Add emoji based on event type - event_emoji = { - 'Stable': '😌', - 'Gradual Shift': 'πŸŒ€', - 'Frontal Passage': 'β›ˆοΈ', - 'Gust': 'πŸ’¨' - }.get(event_type, '🌬️') - st.metric("Conditions", f"{event_emoji} {event_type}") - - with wind_col4: - if weather_data['in_transition']: - st.metric("Status", "⚠️ Changing") - else: - time_in_state = weather_data['time_in_state_seconds'] - if time_in_state < 60: - st.metric("Status", f"βœ… Stable ({time_in_state:.0f}s)") - elif time_in_state < 3600: - st.metric("Status", f"βœ… Stable ({time_in_state/60:.0f}m)") + # Weather Station Display + st.subheader("🌀️ Captain's Weather Watch") + + # Query weather station data + weather_data = db_connection.query_weather_station() + + if weather_data is not None: + # Weather station data available + wind_col1, wind_col2, wind_col3, wind_col4, wind_col5 = st.columns(5) + + # Wind direction name helper + def get_wind_direction_name(degrees): + directions = ['N', 'NNE', 'NE', 'ENE', 'E', 'ESE', 'SE', 'SSE', 'S', 'SSW', 'SW', 'WSW', 'W', 'WNW', 'NW', 'NNW'] + idx = int((degrees % 360) / 22.5 + 0.5) % 16 + return directions[idx] + + with wind_col1: + st.metric("Wind Speed", f"{weather_data['wind_speed_knots']:.1f} kt") + + with wind_col2: + wind_dir = weather_data['wind_direction_degrees'] + st.metric("Wind Direction", f"{wind_dir:.0f}Β° ({get_wind_direction_name(wind_dir)})") + + with wind_col3: + # Format event type for display + event_type = weather_data['event_type'].replace('_', ' ').title() + # Add emoji based on event type + event_emoji = { + 'Stable': '😌', + 'Gradual Shift': 'πŸŒ€', + 'Frontal Passage': 'β›ˆοΈ', + 'Gust': 'πŸ’¨' + }.get(event_type, '🌬️') + st.metric("Conditions", f"{event_emoji} {event_type}") + + with wind_col4: + if weather_data['in_transition']: + st.metric("Status", "⚠️ Changing") else: - st.metric("Status", f"βœ… Stable ({time_in_state/3600:.1f}h)") - - with wind_col5: - if weather_data['in_transition']: - st.metric("Next Change", "In progress") - else: - next_change = weather_data['next_change_in_seconds'] - if next_change < 60: - st.metric("Next Change", f"~{next_change:.0f}s") - elif next_change < 3600: - st.metric("Next Change", f"~{next_change/60:.0f}m") + time_in_state = weather_data['time_in_state_seconds'] + if time_in_state < 60: + st.metric("Status", f"βœ… Stable ({time_in_state:.0f}s)") + elif time_in_state < 3600: + st.metric("Status", f"βœ… Stable 
({time_in_state/60:.0f}m)") + else: + st.metric("Status", f"βœ… Stable ({time_in_state/3600:.1f}h)") + + with wind_col5: + if weather_data['in_transition']: + st.metric("Next Change", "In progress") else: - st.metric("Next Change", f"~{next_change/3600:.1f}h") + next_change = weather_data['next_change_in_seconds'] + if next_change < 60: + st.metric("Next Change", f"~{next_change:.0f}s") + elif next_change < 3600: + st.metric("Next Change", f"~{next_change/60:.0f}m") + else: + st.metric("Next Change", f"~{next_change/3600:.1f}h") - # Show weather station info - st.caption(f"πŸ“‘ {weather_data['station_name']} β€’ {weather_data['station_location']}") + # Show weather station info + st.caption(f"πŸ“‘ {weather_data['station_name']} β€’ {weather_data['station_location']}") - else: - # Fallback to telemetry-based wind display - st.info("πŸ“‘ Weather station data not available - showing approximate conditions from boat telemetry") + else: + # Fallback to telemetry-based wind display + st.info("πŸ“‘ Weather station data not available - showing approximate conditions from boat telemetry") - wind_col1, wind_col2, wind_col3, wind_col4 = st.columns(4) + wind_col1, wind_col2, wind_col3, wind_col4 = st.columns(4) - # Get average wind conditions from latest telemetry - avg_wind_speed = latest_positions['wind_speed_knots'].mean() - avg_wind_direction = latest_positions['wind_direction_degrees'].mean() - min_wind = latest_positions['wind_speed_knots'].min() - max_wind = latest_positions['wind_speed_knots'].max() + # Get average wind conditions from latest telemetry + avg_wind_speed = latest_positions['wind_speed_knots'].mean() + avg_wind_direction = latest_positions['wind_direction_degrees'].mean() + min_wind = latest_positions['wind_speed_knots'].min() + max_wind = latest_positions['wind_speed_knots'].max() - # Wind direction name - def get_wind_direction_name(degrees): - directions = ['N', 'NNE', 'NE', 'ENE', 'E', 'ESE', 'SE', 'SSE', 'S', 'SSW', 'SW', 'WSW', 'W', 'WNW', 'NW', 'NNW'] - idx = int((degrees % 360) / 22.5 + 0.5) % 16 - return directions[idx] + # Wind direction name + def get_wind_direction_name(degrees): + directions = ['N', 'NNE', 'NE', 'ENE', 'E', 'ESE', 'SE', 'SSE', 'S', 'SSW', 'SW', 'WSW', 'W', 'WNW', 'NW', 'NNW'] + idx = int((degrees % 360) / 22.5 + 0.5) % 16 + return directions[idx] - with wind_col1: - st.metric("Avg Wind Speed", f"{avg_wind_speed:.1f} kt") + with wind_col1: + st.metric("Avg Wind Speed", f"{avg_wind_speed:.1f} kt") - with wind_col2: - st.metric("Wind Direction", f"{avg_wind_direction:.0f}Β° ({get_wind_direction_name(avg_wind_direction)})") + with wind_col2: + st.metric("Wind Direction", f"{avg_wind_direction:.0f}Β° ({get_wind_direction_name(avg_wind_direction)})") - with wind_col3: - st.metric("Min Wind", f"{min_wind:.1f} kt") + with wind_col3: + st.metric("Min Wind", f"{min_wind:.1f} kt") - with wind_col4: - st.metric("Max Wind", f"{max_wind:.1f} kt") + with wind_col4: + st.metric("Max Wind", f"{max_wind:.1f} kt") - # Performance Statistics - st.subheader("πŸ“ˆ Performance Statistics") - perf_col1, perf_col2, perf_col3, perf_col4 = st.columns(4) + # Performance Statistics + st.subheader("πŸ“ˆ Performance Statistics") + perf_col1, perf_col2, perf_col3, perf_col4 = st.columns(4) - with perf_col1: - avg_speed = latest_positions['speed_over_ground_knots'].mean() - st.metric("Avg Speed", f"{avg_speed:.1f} kt") + with perf_col1: + avg_speed = latest_positions['speed_over_ground_knots'].mean() + st.metric("Avg Speed", f"{avg_speed:.1f} kt") - with perf_col2: - avg_vmg = 
latest_positions['vmg_knots'].mean() - st.metric("Avg VMG", f"{avg_vmg:.1f} kt") + with perf_col2: + avg_vmg = latest_positions['vmg_knots'].mean() + st.metric("Avg VMG", f"{avg_vmg:.1f} kt") - with perf_col3: - total_distance = latest_positions['distance_traveled_nm'].max() - st.metric("Max Distance", f"{total_distance:.1f} nm") + with perf_col3: + total_distance = latest_positions['distance_traveled_nm'].max() + st.metric("Max Distance", f"{total_distance:.1f} nm") - with perf_col4: - avg_marks = latest_positions['marks_rounded'].mean() - total_marks = latest_positions['total_marks'].iloc[0] - st.metric("Avg Progress", f"{avg_marks:.1f}/{total_marks} marks") + with perf_col4: + avg_marks = latest_positions['marks_rounded'].mean() + total_marks = latest_positions['total_marks'].iloc[0] + st.metric("Avg Progress", f"{avg_marks:.1f}/{total_marks} marks") - st.divider() + st.divider() - # Two-column layout: Race Map (left) and Leaderboard (right) - map_col, leaderboard_col = st.columns([2, 1]) + # Two-column layout: Race Map (left) and Leaderboard (right) + map_col, leaderboard_col = st.columns([2, 1]) - with map_col: - st.subheader("πŸ—ΊοΈ Race Map") - folium_map = create_race_map(df) - if folium_map: - # Disable map interaction returns to prevent reruns on zoom/pan - st_folium(folium_map, width=None, height=700, returned_objects=[]) + with map_col: + st.subheader("πŸ—ΊοΈ Race Map") + folium_map = components.create_race_map(df) + if folium_map: + # Disable map interaction returns to prevent reruns on zoom/pan + st_folium(folium_map, width=None, height=700, returned_objects=[]) - with leaderboard_col: - st.subheader("πŸ† Leaderboard") - display_leaderboard(df) + with leaderboard_col: + st.subheader("πŸ† Leaderboard") + components.display_leaderboard(df) - st.divider() + st.divider() - # Full boat statistics table - st.subheader("πŸ“‹ Boat Positions & Statistics") - display_boat_stats(df) + # Full boat statistics table + st.subheader("πŸ“‹ Boat Positions & Statistics") + components.display_boat_stats(df) - # Race timeline info - st.caption(f"Race time: {latest_time.strftime('%Y-%m-%d %H:%M:%S UTC')} | Total telemetry records: {len(df):,}") + # Race timeline info + st.caption(f"Race time: {latest_time.strftime('%Y-%m-%d %H:%M:%S UTC')} | Total telemetry records: {len(df):,}") - else: - st.warning("⚠️ No race data available. Make sure the telemetry generator is running and sending data to the table.") - st.info("Run `python main.py` to start the race simulation.") + else: + st.warning("⚠️ No race data available. Make sure the telemetry generator is running and sending data to the table.") + st.info("Run `python main.py` to start the race simulation.") - # Auto-refresh - always refresh regardless of data availability - if auto_refresh: - time.sleep(REFRESH_INTERVAL) - st.rerun() + # Render the auto-refreshing fragment + race_dashboard() except Exception as e: - st.error(f"❌ Application Error: {str(e)}") + st.error(f"Application Error: {str(e)}") st.exception(e) - st.info("πŸ”„ Click 'Refresh Now' in the sidebar to retry...") + st.info("Click 'Refresh Now' in the sidebar to retry...") if __name__ == "__main__": try: diff --git a/data_drifter/app/components.py b/data_drifter/app/components.py new file mode 100644 index 0000000..1ae83dc --- /dev/null +++ b/data_drifter/app/components.py @@ -0,0 +1,433 @@ +""" +UI rendering components for the Data Drifter Regatta app. 
+ +Usage: + import components + components.init(config, boat_colors) + folium_map = components.create_race_map(df) + components.display_leaderboard(df) + components.display_boat_stats(df) +""" + +import streamlit as st +import pandas as pd +import folium +from folium import plugins +from navigation_utils import calculate_distance + +# Module-level state set by init() +_config = None +_boat_colors = None + + +def init(config, boat_colors): + """Initialize the components module with config and boat colors.""" + global _config, _boat_colors + _config = config + _boat_colors = boat_colors + + +def get_race_course_config(): + """Get race course configuration (marks, start/finish) from config""" + return { + "start_lat": _config["race_course_start_lat"], + "start_lon": _config["race_course_start_lon"], + "marks": _config["race_course_marks"] + } + + +def calculate_total_remaining_distance(lat, lon, current_mark_index, marks): + """ + Calculate total remaining distance along the race course in nautical miles. + + Args: + lat: Current latitude + lon: Current longitude + current_mark_index: Index of the next mark to round (0-based) + marks: List of all marks [[lat1, lon1], [lat2, lon2], ...] where last mark is finish + + Returns: + Total distance remaining along the course in nautical miles + """ + if not marks or len(marks) == 0: + return 0.0 + + total_distance = 0.0 + + # Marks to round are all except the last one (which is the finish line) + marks_to_round = marks[:-1] if len(marks) > 1 else [] + finish_lat, finish_lon = marks[-1][0], marks[-1][1] + + # If all marks are rounded, just return distance to finish + if current_mark_index >= len(marks_to_round): + return calculate_distance(lat, lon, finish_lat, finish_lon) + + # Add distance to next mark + next_mark = marks_to_round[current_mark_index] + total_distance += calculate_distance(lat, lon, next_mark[0], next_mark[1]) + + # Add distances between subsequent marks + for i in range(current_mark_index, len(marks_to_round) - 1): + mark1 = marks_to_round[i] + mark2 = marks_to_round[i + 1] + total_distance += calculate_distance(mark1[0], mark1[1], mark2[0], mark2[1]) + + # Add distance from last mark to finish + if len(marks_to_round) > 0: + last_mark = marks_to_round[-1] + total_distance += calculate_distance(last_mark[0], last_mark[1], finish_lat, finish_lon) + + return total_distance + + +def create_race_map(df): + """Create interactive race map with boat tracks using Folium""" + if df is None or len(df) == 0: + st.warning("No telemetry data available") + return None + + # Convert timestamp from epoch microseconds to datetime + df['datetime'] = pd.to_datetime(df['timestamp'], unit='us') + + # Get race course configuration + course_config = get_race_course_config() + + # Get race course marks + mark_lats = [m[0] for m in course_config['marks']] + mark_lons = [m[1] for m in course_config['marks']] + + # Center map on current boat positions (centroid of fleet) + latest_positions = df.sort_values('timestamp').groupby('boat_id').last().reset_index() + racing_boats = latest_positions[latest_positions['race_status'].isin(['racing', 'not_started'])] + if len(racing_boats) > 0: + center_lat = racing_boats['latitude'].mean() + center_lon = racing_boats['longitude'].mean() + else: + # All finished/DNF β€” center on all boats + center_lat = latest_positions['latitude'].mean() + center_lon = latest_positions['longitude'].mean() + + # Auto-fit zoom to show all current boat positions + lat_spread = latest_positions['latitude'].max() - 
latest_positions['latitude'].min() + lon_spread = latest_positions['longitude'].max() - latest_positions['longitude'].min() + spread = max(lat_spread, lon_spread) + if spread < 0.05: + zoom = 13 + elif spread < 0.2: + zoom = 11 + elif spread < 0.5: + zoom = 10 + elif spread < 1.5: + zoom = 9 + elif spread < 3.0: + zoom = 8 + else: + zoom = 7 + + # Create Folium map + m = folium.Map( + location=[center_lat, center_lon], + zoom_start=zoom, + tiles='OpenStreetMap', + prefer_canvas=True + ) + + # Add race course line connecting marks + course_coords = [[lat, lon] for lat, lon in zip(mark_lats, mark_lons)] + folium.PolyLine( + course_coords, + color='gray', + weight=2, + opacity=0.7, + dash_array='10, 5', + popup='Race Course' + ).add_to(m) + + # Add markers for each race course mark + for idx, (lat, lon) in enumerate(zip(mark_lats, mark_lons)): + folium.Marker( + location=[lat, lon], + popup=f'Mark {idx + 1}', + tooltip=f'Mark {idx + 1}', + icon=folium.Icon(color='orange', icon='flag', prefix='fa') + ).add_to(m) + + # Add start line marker + folium.Marker( + location=[course_config['start_lat'], course_config['start_lon']], + popup='START', + tooltip='Start Line', + icon=folium.Icon(color='green', icon='play', prefix='fa') + ).add_to(m) + + # Add finish line marker (last mark) + finish_lat, finish_lon = course_config['marks'][-1] + folium.Marker( + location=[finish_lat, finish_lon], + popup='FINISH', + tooltip='Finish Line', + icon=folium.Icon(color='red', icon='stop', prefix='fa') + ).add_to(m) + + # Group by boat and plot tracks + boats = df.groupby('boat_id') + + for idx, (boat_id, boat_df) in enumerate(boats): + boat_df = boat_df.sort_values('timestamp') + + boat_name = boat_df['boat_name'].iloc[0] + boat_type = boat_df['boat_type'].iloc[0] + race_status = boat_df['race_status'].iloc[0] + + color = _boat_colors[idx % len(_boat_colors)] + + # Create boat track coordinates + track_coords = [[row['latitude'], row['longitude']] for _, row in boat_df.iterrows()] + + # Add boat track as polyline + folium.PolyLine( + track_coords, + color=color, + weight=3, + opacity=0.8, + popup=boat_name, + tooltip=boat_name + ).add_to(m) + + # Add current position marker (most recent point - first in DESC order) + latest = boat_df.iloc[-1] # Last point after sorting by timestamp ASC + + # Create popup content with boat information + popup_html = f""" +
+        <div>
+        <b>{boat_name}</b><br>
+        Status: {race_status}<br>
+        Speed: {latest['speed_over_ground_knots']:.1f} knots<br>
+        Heading: {latest['heading_degrees']:.0f}°<br>
+        Distance: {latest['distance_traveled_nm']:.1f} nm<br>
+        VMG: {latest['vmg_knots']:.1f} knots<br>
+        Marks: {latest['marks_rounded']}/{latest['total_marks']}<br>
+        Time: {pd.to_datetime(latest['timestamp'], unit='us').strftime('%Y-%m-%d %H:%M:%S')}
+        </div>
+ """ + + # Get heading for boat marker + heading = latest['heading_degrees'] + + # Use BoatMarker for racing boats, regular markers for finished/DNF + if race_status == 'finished': + folium.Marker( + location=[latest['latitude'], latest['longitude']], + popup=folium.Popup(popup_html, max_width=250), + tooltip=boat_name.split("'s")[0], + icon=folium.Icon(color='lightgreen', icon='star', prefix='fa') + ).add_to(m) + elif race_status == 'dnf': + folium.Marker( + location=[latest['latitude'], latest['longitude']], + popup=folium.Popup(popup_html, max_width=250), + tooltip=boat_name.split("'s")[0], + icon=folium.Icon(color='red', icon='times', prefix='fa') + ).add_to(m) + else: + # Use BoatMarker for racing boats with rounded SVG shape + plugins.BoatMarker( + location=[latest['latitude'], latest['longitude']], + heading=heading, + wind_heading=latest.get('wind_direction_degrees', heading + 45), + wind_speed=latest.get('wind_speed_knots', 10), + color=color, + popup=folium.Popup(popup_html, max_width=250), + tooltip=boat_name.split("'s")[0] + ).add_to(m) + + # Add wind indicator in top left corner + latest_positions_for_wind = df.sort_values('timestamp').groupby('boat_id').last() + avg_wind_speed = latest_positions_for_wind['wind_speed_knots'].mean() + avg_wind_direction = latest_positions_for_wind['wind_direction_degrees'].mean() + + # Create wind direction name + def get_wind_direction_name(degrees): + directions = ['N', 'NNE', 'NE', 'ENE', 'E', 'ESE', 'SE', 'SSE', 'S', 'SSW', 'SW', 'WSW', 'W', 'WNW', 'NW', 'NNW'] + index = int((degrees + 11.25) / 22.5) % 16 + return directions[index] + + wind_dir_name = get_wind_direction_name(avg_wind_direction) + + # Create custom HTML for wind indicator + wind_html = f""" +
+        <div style="position: fixed; top: 10px; left: 10px; z-index: 9999;">
+            <div>🌬️ WIND</div>
+            <div>↓</div>
+            <div>{avg_wind_speed:.1f} kts</div>
+            <div>{wind_dir_name} ({avg_wind_direction:.0f}°)</div>
+        </div>
+ """ + + # Add the wind indicator to the map + m.get_root().html.add_child(folium.Element(wind_html)) + + return m + + +def display_leaderboard(df): + """Display leaderboard with all boats""" + if df is None or len(df) == 0: + st.info("No race data available") + return + + # Get latest position for each boat + latest_positions = df.sort_values('timestamp').groupby('boat_id').last().reset_index() + + # Get race course marks + course_config = get_race_course_config() + marks = course_config['marks'] + + # Calculate total remaining distance along the race course for each boat + latest_positions['total_remaining_distance'] = latest_positions.apply( + lambda row: calculate_total_remaining_distance( + row['latitude'], row['longitude'], row['current_mark_index'], marks + ), + axis=1 + ) + + # Sort by total remaining distance (less distance = better rank) + # DNF boats go to the end + latest_positions['sort_key'] = latest_positions.apply( + lambda row: ( + float('inf') if row['race_status'] == 'dnf' else row['total_remaining_distance'] + ), + axis=1 + ) + latest_positions = latest_positions.sort_values('sort_key') + + # Display leaderboard header + st.markdown("##### Race Leaderboard") + + # Create container for scrollable leaderboard + leaderboard_container = st.container(height=600) + + with leaderboard_container: + # Display each boat with rank, name, and distance + for rank, (idx, row) in enumerate(latest_positions.iterrows(), start=1): + boat_id = row['boat_id'] + boat_name = row['boat_name'] + distance = row['distance_traveled_nm'] + + # Create columns for rank and boat info + col1, col2 = st.columns([0.5, 4.5]) + + with col1: + # Rank with medal emoji for top 3 + if rank == 1: + st.markdown(f"### πŸ₯‡") + elif rank == 2: + st.markdown(f"### πŸ₯ˆ") + elif rank == 3: + st.markdown(f"### πŸ₯‰") + else: + st.markdown(f"### **{rank}**") + + with col2: + # Boat name and distance + st.markdown(f"**{boat_name}**") + st.caption(f"{distance:.1f} nm") + + # Add divider between boats + if rank < len(latest_positions): + st.divider() + + +def display_boat_stats(df): + """Display boat statistics table""" + if df is None or len(df) == 0: + return + + # Get latest position for each boat + latest_positions = df.sort_values('timestamp').groupby('boat_id').last().reset_index() + + # Get race course marks + course_config = get_race_course_config() + marks = course_config['marks'] + + # Calculate total remaining distance along the race course for each boat + latest_positions['total_remaining_distance'] = latest_positions.apply( + lambda row: calculate_total_remaining_distance( + row['latitude'], row['longitude'], row['current_mark_index'], marks + ), + axis=1 + ) + + # Sort by total remaining distance (ascending) - boat with least distance is in 1st place + # DNF boats go to the end + latest_positions['sort_key'] = latest_positions.apply( + lambda row: ( + float('inf') if row['race_status'] == 'dnf' else row['total_remaining_distance'] + ), + axis=1 + ) + latest_positions = latest_positions.sort_values('sort_key') + + # Create display dataframe with the new total_remaining_distance column + display_df = latest_positions[[ + 'boat_name', 'boat_type', 'race_status', 'speed_over_ground_knots', + 'distance_traveled_nm', 'total_remaining_distance', 'vmg_knots', + 'marks_rounded', 'total_marks' + ]].copy() + + display_df.columns = [ + 'Boat', 'Type', 'Status', 'Speed (kt)', 'Distance (nm)', + 'To Dest (nm)', 'VMG (kt)', 'Marks', 'Total Marks' + ] + + # Add position column + display_df.insert(0, 'Pos', range(1, 
len(display_df) + 1)) + + # Format marks column + display_df['Marks'] = display_df.apply(lambda row: f"{row['Marks']}/{row['Total Marks']}", axis=1) + display_df = display_df.drop('Total Marks', axis=1) + + # Round numeric columns + display_df['Speed (kt)'] = display_df['Speed (kt)'].round(1) + display_df['Distance (nm)'] = display_df['Distance (nm)'].round(1) + display_df['To Dest (nm)'] = display_df['To Dest (nm)'].round(1) + display_df['VMG (kt)'] = display_df['VMG (kt)'].round(1) + + st.dataframe( + display_df, + use_container_width=True, + hide_index=True, + column_config={ + "Status": st.column_config.TextColumn( + "Status", + help="Current race status", + ) + } + ) diff --git a/data_drifter/app/db_connection.py b/data_drifter/app/db_connection.py new file mode 100644 index 0000000..ab02690 --- /dev/null +++ b/data_drifter/app/db_connection.py @@ -0,0 +1,522 @@ +""" +Database connection and query logic for the Data Drifter Regatta app. + +Usage: + import db_connection + db_connection.init(config, table_name) + df = db_connection.query_telemetry() +""" + +import os +import time +import logging +import threading + +import streamlit as st +from databricks.sdk.core import Config, oauth_service_principal + +# Import databricks-sql-connector with error handling +try: + from databricks import sql + from databricks.sdk import WorkspaceClient +except ImportError as e: + st.error(f"Failed to import databricks modules: {str(e)}") + st.info("Please ensure databricks-sql-connector and databricks-sdk are installed in requirements.txt") + st.stop() + +logger = logging.getLogger(__name__) + +# Module-level state set by init() +_config = None +_table_name = None + + +def init(config, table_name): + """Initialize the db_connection module with config and table name.""" + global _config, _table_name + _config = config + _table_name = table_name + + +def get_connection(): + """Create Databricks SQL connection using workspace authentication""" + logger.info("Starting SQL warehouse connection attempt") + debug_mode = os.getenv("DEBUG", "false").lower() == "true" + start_time = time.time() + + try: + if debug_mode: + st.write("Debug - Step 1: Checking environment variables") + st.write("Available DATABRICKS/DEFAULT env vars:") + for key in sorted(os.environ.keys()): + if 'DATABRICKS' in key or 'DEFAULT' in key: + value = os.environ[key] + if 'TOKEN' in key or 'SECRET' in key or 'PASSWORD' in key: + value = '***' if value else 'None' + else: + value = value[:80] if value and len(value) > 80 else value + st.write(f" {key} = {value}") + + # Get warehouse ID from config + warehouse_id = _config["warehouse_id"] + + # Extract server hostname from workspace URL (remove https:// prefix) + workspace_url = _config.get("workspace_url", os.getenv("DATABRICKS_HOST", "")) + if workspace_url.startswith("https://"): + server_hostname = workspace_url.replace("https://", "") + elif workspace_url.startswith("http://"): + server_hostname = workspace_url.replace("http://", "") + else: + server_hostname = workspace_url + + if debug_mode: + st.write(f"Debug - Step 2: Connection parameters") + st.write(f" Server hostname: {server_hostname}") + st.write(f" Warehouse ID: {warehouse_id}") + st.write(f" HTTP Path: /sql/1.0/warehouses/{warehouse_id}") + + # Method 1: Try using OAuth M2M with client credentials (recommended for Databricks Apps) + client_id = os.getenv("DATABRICKS_CLIENT_ID") + client_secret = os.getenv("DATABRICKS_CLIENT_SECRET") + + if client_id and client_secret: + logger.info("Attempting OAuth M2M authentication 
(Method 1)") + if debug_mode: + st.write("Debug - Step 3: Attempting connection with OAuth M2M (client credentials)...") + st.write(f" DATABRICKS_CLIENT_ID found: {client_id[:8]}...") + st.write(" DATABRICKS_CLIENT_SECRET found") + + try: + credential_provider = lambda: oauth_service_principal(Config( + host = f"https://{server_hostname}", + client_id = client_id, + client_secret = client_secret)) + + connection = sql.connect( + server_hostname = server_hostname, + http_path = f"/sql/1.0/warehouses/{warehouse_id}", + credentials_provider = credential_provider) + + if debug_mode: + st.write(" Connection object created, testing query...") + + # Test the connection + cursor = connection.cursor() + cursor.execute("SELECT 'test' as status, current_database() as db") + result = cursor.fetchone() + cursor.close() + + if debug_mode: + st.success(f"OAuth M2M authentication successful! Result: {result}") + + logger.info(f"OAuth M2M authentication successful in {time.time() - start_time:.2f}s") + return connection + + except Exception as oauth_error: + logger.warning(f"OAuth M2M auth failed: {str(oauth_error)}") + if debug_mode: + st.error(f"OAuth M2M auth failed: {str(oauth_error)}") + st.write(" Falling back to alternative auth methods...") + else: + if debug_mode: + st.write("Debug - Step 3: Client credentials not found in environment") + if not client_id: + st.write(" DATABRICKS_CLIENT_ID not set") + if not client_secret: + st.write(" DATABRICKS_CLIENT_SECRET not set") + st.write(" Attempting connection with SDK default auth...") + + # Method 2: Try SDK automatic authentication + logger.info("Attempting SDK OAuth authentication (Method 2)") + try: + if debug_mode: + st.write(" Initializing WorkspaceClient for auth...") + + from databricks.sdk import WorkspaceClient + from databricks.sdk.core import ApiClient + + cfg = Config() + + if debug_mode: + st.write(f" Config host: {cfg.host if cfg.host else 'auto-detect'}") + st.write(" Creating API client...") + + api_client = ApiClient(cfg) + + if debug_mode: + st.write(" Getting auth headers...") + + def get_token(): + headers = api_client.do("GET", "/api/2.0/preview/scim/v2/Me", + headers={}, data={}, raw=True).headers + auth_header = headers.get('Authorization', '') + if auth_header.startswith('Bearer '): + return auth_header[7:] + return None + + if debug_mode: + st.write(" Connecting to SQL Warehouse...") + + connection = sql.connect( + server_hostname=server_hostname, + http_path=f"/sql/1.0/warehouses/{warehouse_id}", + auth_type="databricks-oauth", + _socket_timeout=30 + ) + + if debug_mode: + st.write(" Connection object created, testing query...") + + cursor = connection.cursor() + cursor.execute("SELECT 'test' as status, current_database() as db") + result = cursor.fetchone() + cursor.close() + + if debug_mode: + st.success(f"Connection successful! 
Result: {result}") + + logger.info(f"SDK OAuth authentication successful in {time.time() - start_time:.2f}s") + return connection + + except Exception as sdk_error: + logger.warning(f"SDK OAuth auth failed: {str(sdk_error)}") + if debug_mode: + st.error(f"SDK OAuth auth method failed: {str(sdk_error)}") + st.write(f" Error type: {type(sdk_error).__name__}") + st.exception(sdk_error) + + # Method 3: Try WorkspaceClient with better error handling + logger.info("Attempting WorkspaceClient authentication (Method 3)") + if debug_mode: + st.write("Method 3: Trying WorkspaceClient authentication...") + + try: + w = WorkspaceClient(host=f"https://{server_hostname}") + + if debug_mode: + st.write(f" WorkspaceClient created") + st.write(f" Host: {w.config.host}") + try: + auth_details = str(w.config) + st.write(f" Config: {auth_details[:200]}") + except: + pass + + if debug_mode: + st.write(" Attempting authentication...") + + try: + credentials = w.config.authenticate() + if debug_mode: + st.write(f" Authentication successful, got credentials") + + connection = sql.connect( + server_hostname=server_hostname, + http_path=f"/sql/1.0/warehouses/{warehouse_id}", + credentials_provider=lambda: credentials, + _socket_timeout=30 + ) + + if debug_mode: + st.write(" SQL Connection created, testing...") + + cursor = connection.cursor() + cursor.execute("SELECT 1 as test") + result = cursor.fetchone() + cursor.close() + + if debug_mode: + st.success(f"Method 3 successful! Test query returned: {result}") + + logger.info(f"WorkspaceClient authentication successful in {time.time() - start_time:.2f}s") + return connection + + except Exception as auth_error: + if debug_mode: + st.error(f" Authentication failed: {str(auth_error)}") + st.write(f" Error type: {type(auth_error).__name__}") + raise + + except Exception as wc_error: + logger.warning(f"WorkspaceClient auth failed: {str(wc_error)}") + if debug_mode: + st.error(f"Method 3 failed: {str(wc_error)}") + st.write(f" Error type: {type(wc_error).__name__}") + st.exception(wc_error) + + # Method 4: Try OAuth U2M flow (for apps with attached resources) + logger.info("Attempting OAuth U2M flow (Method 4)") + if debug_mode: + st.write("Method 4: Trying OAuth U2M flow...") + + try: + connection = sql.connect( + server_hostname=server_hostname, + http_path=f"/sql/1.0/warehouses/{warehouse_id}", + auth_type="databricks-oauth", + _socket_timeout=30 + ) + + if debug_mode: + st.write(" OAuth connection created, testing...") + + cursor = connection.cursor() + cursor.execute("SELECT 1 as test") + result = cursor.fetchone() + cursor.close() + + if debug_mode: + st.success(f"Method 4 successful! Test query returned: {result}") + + logger.info(f"OAuth U2M authentication successful in {time.time() - start_time:.2f}s") + return connection + + except Exception as oauth_error: + logger.warning(f"OAuth U2M auth failed: {str(oauth_error)}") + if debug_mode: + st.error(f"Method 4 failed: {str(oauth_error)}") + st.write(f" Error type: {type(oauth_error).__name__}") + st.exception(oauth_error) + + # All methods failed + logger.error(f"All connection methods failed after {time.time() - start_time:.2f}s") + if debug_mode: + st.error("All connection methods failed!") + st.write("Troubleshooting suggestions:") + st.write("1. Check that the SQL Warehouse resource is properly attached") + st.write("2. Verify the warehouse is running and accessible") + st.write("3. Ensure the app has CAN_USE permission on the warehouse") + st.write("4. 
Check that environment variables are being set correctly") + + raise Exception("Unable to connect to Databricks SQL Warehouse. All authentication methods failed.") + + except Exception as e: + st.error(f"Failed to connect to Databricks SQL: {str(e)}") + st.info("Troubleshooting tips:") + st.info("- Ensure the app has a SQL Warehouse resource with CAN_USE permission") + st.info("- Check that the warehouse is running and accessible") + st.info(f"- Warehouse ID: {warehouse_id}") + if debug_mode: + st.write("Full error details:") + st.exception(e) + return None + + +def execute_query_with_timeout(cursor, query, timeout_seconds=60): + """Execute query with a timeout using threading""" + result = [None] + error = [None] + + def run_query(): + try: + cursor.execute(query) + result[0] = True + except Exception as e: + error[0] = e + + thread = threading.Thread(target=run_query) + thread.daemon = True + thread.start() + thread.join(timeout=timeout_seconds) + + if thread.is_alive(): + raise TimeoutError(f"Query execution exceeded {timeout_seconds} seconds timeout") + if error[0]: + raise error[0] + + return result[0] + + +def query_telemetry(limit=10000): + """Query latest telemetry data from table""" + logger.info(f"Starting telemetry query with limit={limit}") + debug_mode = os.getenv("DEBUG", "false").lower() == "true" + query_start = time.time() + + try: + # Step 1: Get connection + logger.info("Step 1: Establishing connection") + if debug_mode: + st.write("Step 1: Getting connection...") + + conn = get_connection() + if not conn: + logger.error("Failed to establish connection") + st.warning("Could not establish database connection") + return None + + logger.info(f"Step 1 complete in {time.time() - query_start:.2f}s") + + # Step 2: Count rows in table to verify we can query it + logger.info(f"Step 2: Counting rows in {_table_name}") + step2_start = time.time() + + try: + count_query = f"SELECT COUNT(*) as row_count FROM {_table_name}" + cursor = conn.cursor() + cursor.execute(count_query) + count_result = cursor.fetchone() + row_count = count_result[0] if count_result else 0 + cursor.close() + + logger.info(f"Step 2 complete: {row_count:,} rows found in {time.time() - step2_start:.2f}s") + + if row_count == 0: + logger.warning("Table is empty, no data available") + st.warning("Table is empty. 
No telemetry data available yet.") + st.info("Run `python main.py` to start generating telemetry data.") + return None + + except Exception as count_error: + logger.error(f"Failed to count rows: {str(count_error)}") + st.error(f"Failed to count rows in table: {str(count_error)}") + st.info("Check that:") + st.info(" - The table exists") + st.info(" - The service principal has SELECT permission on the table") + st.info(f" - Table name is correct: {_table_name}") + if debug_mode: + st.exception(count_error) + return None + + if debug_mode: + st.write(f"Step 3: Querying table: {_table_name}") + + query = f""" + SELECT + boat_id, + boat_name, + boat_type, + timestamp, + latitude, + longitude, + speed_over_ground_knots, + heading_degrees, + wind_speed_knots, + wind_direction_degrees, + distance_traveled_nm, + distance_to_destination_nm, + vmg_knots, + current_mark_index, + marks_rounded, + total_marks, + has_started, + has_finished, + race_status + FROM {_table_name} + ORDER BY timestamp DESC + LIMIT {limit} + """ + + # Step 3: Execute query with timeout + logger.info(f"Step 3: Querying {_table_name} for {limit} rows") + step3_start = time.time() + + if debug_mode: + st.write("Step 4: Executing query with 60 second timeout...") + + cursor = conn.cursor() + + try: + execute_query_with_timeout(cursor, query, timeout_seconds=60) + execution_time = time.time() - step3_start + + logger.info(f"Step 3 query execution complete in {execution_time:.2f}s") + + if debug_mode: + st.write(f" Query executed in {execution_time:.2f} seconds") + + except TimeoutError as te: + logger.error(f"Query timeout: {str(te)}") + st.error(f"Query execution timeout: {str(te)}") + st.warning("Troubleshooting suggestions:") + st.info("- The SQL warehouse may be slow or overloaded") + st.info("- Try reducing the data limit or adding filters") + st.info("- Check SQL warehouse status in Databricks UI") + st.info(f"- Table: {_table_name}") + cursor.close() + conn.close() + return None + + # Step 4-5: Fetch results + logger.info("Step 4-5: Fetching results as DataFrame") + fetch_start = time.time() + + if debug_mode: + st.write("Step 5: Fetching results...") + + df = cursor.fetchall_arrow().to_pandas() + cursor.close() + + fetch_time = time.time() - fetch_start + logger.info(f"Fetch complete: {len(df)} rows in {fetch_time:.2f}s") + + total_time = time.time() - query_start + logger.info(f"Query completed successfully in {total_time:.2f}s total") + + if debug_mode: + st.write(f"Step 6: Retrieved {len(df)} rows") + + return df + + except TimeoutError as te: + logger.error(f"Query timeout after {time.time() - query_start:.2f}s: {str(te)}") + return None + except Exception as e: + logger.error(f"Query failed after {time.time() - query_start:.2f}s: {str(e)}", exc_info=True) + st.error(f"Failed to query data: {str(e)}") + st.info("Check:") + st.info(f"- Table exists and has data") + st.info("- Service principal has SELECT permission") + st.info(f"- Table name: {_table_name}") + if debug_mode: + st.exception(e) + return None + + +def query_weather_station(): + """Query latest weather station data""" + logger.info("Querying weather station data") + + weather_table = _config.get("weather_table_name") + if not weather_table: + logger.warning("Weather station table not configured") + return None + + try: + conn = get_connection() + if not conn: + logger.error("Failed to establish connection for weather station query") + return None + + query = f""" + SELECT + station_id, + station_name, + station_location, + timestamp, + wind_speed_knots, + 
wind_direction_degrees, + event_type, + in_transition, + time_in_state_seconds, + next_change_in_seconds + FROM {weather_table} + ORDER BY timestamp DESC + LIMIT 1 + """ + + cursor = conn.cursor() + cursor.execute(query) + result = cursor.fetchall_arrow().to_pandas() + cursor.close() + + if result.empty: + logger.warning("No weather station data found") + return None + + logger.info(f"Weather station data retrieved: {len(result)} record") + return result.iloc[0] + + except Exception as e: + logger.error(f"Failed to query weather station: {str(e)}", exc_info=True) + return None diff --git a/data_drifter/config.toml b/data_drifter/config.toml index 8c64c5a..efb6bbe 100644 --- a/data_drifter/config.toml +++ b/data_drifter/config.toml @@ -130,4 +130,14 @@ stats_interval_seconds = 5.0 [warehouse] # SQL Warehouse ID for app resource permissions # The app will be granted CAN_USE permission on this warehouse -sql_warehouse_id = "" +sql_warehouse_id = "" + +[analysis] +# Wind speed category thresholds (knots) +wind_light_threshold_knots = 8 +wind_moderate_threshold_knots = 15 + +# Consistency rating thresholds (coefficient of variation) +consistency_very_consistent = 0.1 +consistency_consistent = 0.2 +consistency_variable = 0.3 diff --git a/data_drifter/databricks.yml b/data_drifter/databricks.yml index 248358f..365b82e 100644 --- a/data_drifter/databricks.yml +++ b/data_drifter/databricks.yml @@ -117,6 +117,15 @@ resources: depends_on: - task_key: create_telemetry_table + - task_key: create_control_table + notebook_task: + notebook_path: ./notebooks/create_control_table.py + source: WORKSPACE + base_parameters: + telemetry_table: ${var.telemetry_table} + depends_on: + - task_key: create_weather_table + # Job permissions permissions: - level: CAN_MANAGE diff --git a/data_drifter/deploy.sh b/data_drifter/deploy.sh index 1abbadc..bd512a6 100644 --- a/data_drifter/deploy.sh +++ b/data_drifter/deploy.sh @@ -1,6 +1,12 @@ #!/bin/bash set -e +# Validate prerequisites +command -v databricks >/dev/null 2>&1 || { echo "❌ databricks CLI not found. Install: https://docs.databricks.com/en/dev-tools/cli/install.html"; exit 1; } +command -v jq >/dev/null 2>&1 || { echo "❌ jq not found. Install: brew install jq"; exit 1; } +command -v python3 >/dev/null 2>&1 || { echo "❌ python3 not found"; exit 1; } +python3 -c "import toml" 2>/dev/null || python3 -c "import tomllib" 2>/dev/null || python3 -c "import tomli" 2>/dev/null || { echo "❌ No TOML parser available. 
Run: pip install toml (or activate your venv first)"; exit 1; } + export DATABRICKS_CONFIG_PROFILE="${DATABRICKS_CONFIG_PROFILE:-DEFAULT}" TARGET="${DATABRICKS_TARGET:-dev}" APP_NAME="data-drifter-regatta-v3" @@ -62,7 +68,7 @@ APP_SOURCE_PATH="/Workspace/Users/$CURRENT_USER/.bundle/$BUNDLE_NAME/$TARGET/fil if databricks apps get "$APP_NAME" --profile="$DATABRICKS_CONFIG_PROFILE" --output json 2>/dev/null | jq -e '.name' > /dev/null; then # Wait for any active deployment to complete WAIT_COUNT=0 - while [ $WAIT_COUNT -lt 12 ]; do + while [ $WAIT_COUNT -lt 24 ]; do DEPLOYMENT_STATE=$(databricks apps get "$APP_NAME" --profile="$DATABRICKS_CONFIG_PROFILE" --output json 2>/dev/null | jq -r '.active_deployment.deployment_state // "NONE"') if [ "$DEPLOYMENT_STATE" = "NONE" ] || [ "$DEPLOYMENT_STATE" = "null" ]; then break diff --git a/data_drifter/main.py b/data_drifter/main.py index 9a22578..701a132 100644 --- a/data_drifter/main.py +++ b/data_drifter/main.py @@ -27,6 +27,8 @@ from weather_station import WeatherStation from zerobus.sdk.aio import ZerobusSdk from zerobus.sdk.shared import RecordType, StreamConfigurationOptions, TableProperties +from databricks import sql as dbsql +from databricks.sdk.core import Config, oauth_service_principal # Configure logging logging.basicConfig( @@ -45,10 +47,10 @@ def load_config(config_path: str = "config.toml") -> dict: return config except FileNotFoundError: logger.error(f"βœ— Configuration file not found: {config_path}") - sys.exit(1) + raise except Exception as e: logger.error(f"βœ— Failed to load configuration: {e}") - sys.exit(1) + raise class RaceSimulator: @@ -86,6 +88,13 @@ def __init__(self, fleet: SailboatFleet, stream: Any, config: Dict[str, Any], we self.time_acceleration = 1.0 self.emission_interval_real_time = self.emission_interval_race_time + # Speed control from control table + self.speed_multiplier = 1.0 + self.last_control_check = 0 + self.control_check_interval = 5.0 # Check every 5 seconds of real time + self.control_table = None + self.sql_connection = None + # Statistics tracking self.records_sent = 0 self.records_failed = 0 @@ -93,6 +102,60 @@ def __init__(self, fleet: SailboatFleet, stream: Any, config: Dict[str, Any], we self.last_stats_time = None self.elapsed_race_time = 0.0 + def _init_control_table(self, config, client_id, client_secret): + """Initialize SQL connection for reading the speed control table""" + try: + schema_prefix = ".".join(config["zerobus"]["table_name"].split(".")[:2]) + self.control_table = f"{schema_prefix}.race_control" + + workspace_url = config["zerobus"]["workspace_url"] + server_hostname = workspace_url.replace("https://", "").replace("http://", "") + warehouse_id = config["warehouse"]["sql_warehouse_id"] + + def credential_provider(): + config_obj = Config( + host=workspace_url, + client_id=client_id, + client_secret=client_secret, + ) + return oauth_service_principal(config_obj) + + self.sql_connection = dbsql.connect( + server_hostname=server_hostname, + http_path=f"/sql/1.0/warehouses/{warehouse_id}", + credentials_provider=credential_provider, + ) + logger.info(f"βœ“ Connected to control table: {self.control_table}") + except Exception as e: + logger.warning(f"⚠️ Could not connect to control table: {e}") + logger.warning(" Speed control from app will not be available") + self.control_table = None + + def _check_speed_control(self): + """Read speed multiplier from control table""" + if not self.control_table or not self.sql_connection: + return + + current_time = time.time() + if current_time - 
@@ -137,8 +200,12 @@ async def run(self):
                     self._print_stats()
                     self.last_stats_time = current_real_time
 
-                # Sleep for real time interval before next emission
-                await asyncio.sleep(self.emission_interval_real_time)
+                # Check for speed control updates from the app
+                self._check_speed_control()
+
+                # Apply speed multiplier to sleep interval
+                adjusted_interval = self.emission_interval_real_time / self.speed_multiplier
+                await asyncio.sleep(adjusted_interval)
 
         except KeyboardInterrupt:
             logger.info("\n\nInterrupted by user")
@@ -147,16 +214,25 @@ async def run(self):
             self._print_final_summary()
 
     async def _send_telemetry(self, fleet_telemetry: list):
-        """Send telemetry data to Zerobus"""
+        """Send telemetry data to Zerobus with retry logic"""
+        max_retries = 3
         for telemetry in fleet_telemetry:
-            try:
-                await self.stream.ingest_record(telemetry)
-                self.records_sent += 1
-            except Exception as e:
-                self.records_failed += 1
-                logger.error(f"✗ Failed to send record from {telemetry['boat_name']}: {e}")
-                logger.error(e)
-                exit(1)
+            last_exception = None
+            for attempt in range(max_retries + 1):
+                try:
+                    await self.stream.ingest_record(telemetry)
+                    self.records_sent += 1
+                    break
+                except Exception as e:
+                    last_exception = e
+                    if attempt < max_retries:
+                        backoff = 2 ** attempt  # 1s, 2s, 4s
+                        logger.warning(f"Failed to send record from {telemetry['boat_name']} (attempt {attempt + 1}/{max_retries + 1}), retrying in {backoff}s: {e}")
+                        await asyncio.sleep(backoff)
+                    else:
+                        self.records_failed += 1
+                        logger.error(f"✗ Failed to send record from {telemetry['boat_name']} after {max_retries + 1} attempts: {e}")
+                        raise last_exception
 
     def _all_boats_finished(self) -> bool:
         """Check if all boats have finished the race"""
@@ -197,7 +273,8 @@ def _print_stats(self):
         logger.info(f"Records sent: {self.records_sent}")
         logger.info(f"Records failed: {self.records_failed}")
         logger.info(f"Success rate: {(self.records_sent/(self.records_sent+self.records_failed)*100) if (self.records_sent+self.records_failed) > 0 else 0:.1f}%")
-        logger.info(f"Throughput: {rate:.2f} records/sec (real time)")
+        speed_str = f" | Speed: {self.speed_multiplier:.1f}x" if self.speed_multiplier != 1.0 else ""
+        logger.info(f"Throughput: {rate:.2f} records/sec (real time){speed_str}")
         logger.info("-" * 60 + "\n")
 
     def _print_final_summary(self):
@@ -463,6 +540,7 @@ async def main():
 
         # Create and run the race simulator
         simulator = RaceSimulator(fleet, stream, config, weather_station)
+        simulator._init_control_table(config, CLIENT_ID, CLIENT_SECRET)
         await simulator.run()
 
         # Close stream
@@ -473,6 +551,12 @@ async def main():
         except Exception as e:
             logger.error(f"✗ Error closing stream: {e}")
 
+        if simulator.sql_connection:
+            try:
+                simulator.sql_connection.close()
+            except Exception:
+                pass
+
     return 0
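The multiplier divides the sleep between emissions, so 2.0x halves the real-time interval and 0.5x doubles it. A quick arithmetic check (the interval value here is invented for illustration):

    emission_interval_real_time = 0.5  # example: seconds between emissions at 1.0x
    for speed_multiplier in (0.5, 1.0, 2.0):
        adjusted = emission_interval_real_time / speed_multiplier
        print(f"{speed_multiplier:.1f}x -> sleep {adjusted:.2f}s")
    # 0.5x -> sleep 1.00s, 1.0x -> sleep 0.50s, 2.0x -> sleep 0.25s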
diff --git a/data_drifter/notebooks/01_boat_performance.py b/data_drifter/notebooks/01_boat_performance.py
index 458d2c7..f63f1a2 100644
--- a/data_drifter/notebooks/01_boat_performance.py
+++ b/data_drifter/notebooks/01_boat_performance.py
@@ -21,7 +21,7 @@
 import pyspark.sql.functions as F
 from pyspark.sql.window import Window
 
-from race_utils import load_race_course_config, add_remaining_distance_column, load_race_data
+from race_utils import load_race_course_config, add_remaining_distance_column, load_race_data, get_schema_prefix
 
 # Load race course configuration from config.toml
 TABLE_NAME, marks, config = load_race_course_config()
@@ -152,7 +152,7 @@
 # COMMAND ----------
 
 # Derive schema from TABLE_NAME (catalog.schema.table -> catalog.schema)
-SCHEMA_PREFIX = ".".join(TABLE_NAME.split(".")[:2])
+SCHEMA_PREFIX = get_schema_prefix(TABLE_NAME)
 
 # Save boat performance summary as tables
 boat_performance.write.mode("overwrite").saveAsTable(f"{SCHEMA_PREFIX}.boat_performance_summary")
diff --git a/data_drifter/notebooks/02_wind_conditions.py b/data_drifter/notebooks/02_wind_conditions.py
index 61c48cf..ebadb66 100644
--- a/data_drifter/notebooks/02_wind_conditions.py
+++ b/data_drifter/notebooks/02_wind_conditions.py
@@ -21,7 +21,11 @@
 import pyspark.sql.functions as F
 from pyspark.sql.window import Window
 
-from race_utils import load_race_course_config, load_race_data
+from race_utils import load_race_course_config, load_race_data, get_schema_prefix, categorize_wind
+from pyspark.sql.functions import udf
+from pyspark.sql.types import StringType
+
+categorize_wind_udf = udf(lambda speed: categorize_wind(speed), StringType())
 
 # Load race course configuration from config.toml
 TABLE_NAME, marks, config = load_race_course_config()
@@ -38,9 +42,7 @@
 
 # Categorize wind conditions
 df_with_wind = df.withColumn("wind_category",
-    F.when(F.col("wind_speed_knots") < 8, "Light")
-    .when(F.col("wind_speed_knots") < 15, "Moderate")
-    .otherwise("Heavy")
+    categorize_wind_udf(F.col("wind_speed_knots"))
 )
 
 # Check distribution
@@ -147,7 +149,7 @@
 # COMMAND ----------
 
 # Derive schema from TABLE_NAME (catalog.schema.table -> catalog.schema)
-SCHEMA_PREFIX = ".".join(TABLE_NAME.split(".")[:2])
+SCHEMA_PREFIX = get_schema_prefix(TABLE_NAME)
 
 # Save wind condition analysis results as tables
 performance_by_wind.write.mode("overwrite").saveAsTable(f"{SCHEMA_PREFIX}.performance_by_wind")
@@ -322,9 +324,7 @@
 
 # Analyze by both wind condition and point of sail
 combined_analysis = df_with_sail.withColumn("wind_category",
-    F.when(F.col("wind_speed_knots") < 8, "Light")
-    .when(F.col("wind_speed_knots") < 15, "Moderate")
-    .otherwise("Heavy")
+    categorize_wind_udf(F.col("wind_speed_knots"))
 ).groupBy("boat_id", "boat_name", "wind_category", "point_of_sail").agg(
     F.avg("vmg_knots").alias("avg_vmg"),
     F.avg("speed_over_ground_knots").alias("avg_speed"),
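A Python UDF routes every row through a Python worker, which is the price paid here for sharing categorize_wind across notebooks. Since the function is a pure threshold comparison, the same sharing could be kept with a native Column expression built from the config values; a sketch, assuming config is the dict returned by load_race_course_config():

    # Sketch only: JVM-side alternative to categorize_wind_udf.
    import pyspark.sql.functions as F

    light = config.get("analysis", {}).get("wind_light_threshold_knots", 8)
    moderate = config.get("analysis", {}).get("wind_moderate_threshold_knots", 15)

    wind_category_col = (
        F.when(F.col("wind_speed_knots") < light, "Light")
         .when(F.col("wind_speed_knots") < moderate, "Moderate")
         .otherwise("Heavy")
    )
    df_with_wind = df.withColumn("wind_category", wind_category_col)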
diff --git a/data_drifter/notebooks/03_race_progress.py b/data_drifter/notebooks/03_race_progress.py
index 5a5a1ae..aa6143c 100644
--- a/data_drifter/notebooks/03_race_progress.py
+++ b/data_drifter/notebooks/03_race_progress.py
@@ -21,7 +21,7 @@
 import pyspark.sql.functions as F
 from pyspark.sql.window import Window
 
-from race_utils import load_race_course_config, add_remaining_distance_column, load_race_data
+from race_utils import load_race_course_config, add_remaining_distance_column, load_race_data, get_schema_prefix
 
 # Load race course configuration from config.toml
 TABLE_NAME, marks, config = load_race_course_config()
@@ -162,6 +162,11 @@
 
 # COMMAND ----------
 
+# Read consistency thresholds from config
+_very_consistent = config.get("analysis", {}).get("consistency_very_consistent", 0.1)
+_consistent = config.get("analysis", {}).get("consistency_consistent", 0.2)
+_variable = config.get("analysis", {}).get("consistency_variable", 0.3)
+
 # Calculate VMG consistency across legs
 consistency = vmg_by_leg.groupBy("boat_id", "boat_name").agg(
     F.avg("avg_vmg").alias("mean_vmg"),
@@ -172,9 +177,9 @@
     F.col("stddev_vmg") / F.abs(F.col("mean_vmg"))
 ).withColumn(
     "consistency_rating",
-    F.when(F.col("coefficient_of_variation") < 0.1, "Very Consistent")
-    .when(F.col("coefficient_of_variation") < 0.2, "Consistent")
-    .when(F.col("coefficient_of_variation") < 0.3, "Moderate")
+    F.when(F.col("coefficient_of_variation") < _very_consistent, "Very Consistent")
+    .when(F.col("coefficient_of_variation") < _consistent, "Consistent")
+    .when(F.col("coefficient_of_variation") < _variable, "Moderate")
     .otherwise("Variable")
 ).orderBy("coefficient_of_variation")
 
@@ -216,7 +221,7 @@
 # COMMAND ----------
 
 # Derive schema from TABLE_NAME (catalog.schema.table -> catalog.schema)
-SCHEMA_PREFIX = ".".join(TABLE_NAME.split(".")[:2])
+SCHEMA_PREFIX = get_schema_prefix(TABLE_NAME)
 
 # Save race progress analysis results as tables
 positions.write.mode("overwrite").saveAsTable(f"{SCHEMA_PREFIX}.race_positions_over_time")
diff --git a/data_drifter/notebooks/04_race_summary.py b/data_drifter/notebooks/04_race_summary.py
index 0ca1a27..71a5ab3 100644
--- a/data_drifter/notebooks/04_race_summary.py
+++ b/data_drifter/notebooks/04_race_summary.py
@@ -22,13 +22,17 @@
 import pyspark.sql.functions as F
 from pyspark.sql.window import Window
 
-from race_utils import load_race_course_config, load_race_data, get_finished_boats
+from race_utils import load_race_course_config, load_race_data, get_finished_boats, get_schema_prefix, categorize_wind
+from pyspark.sql.functions import udf
+from pyspark.sql.types import StringType
+
+categorize_wind_udf = udf(lambda speed: categorize_wind(speed), StringType())
 
 # Load race course configuration from config.toml
 TABLE_NAME, marks, config = load_race_course_config()
 
 # Derive schema from TABLE_NAME (catalog.schema.table -> catalog.schema)
-SCHEMA_PREFIX = ".".join(TABLE_NAME.split(".")[:2])
+SCHEMA_PREFIX = get_schema_prefix(TABLE_NAME)
 
 # Load telemetry data using utility function
 df = load_race_data(TABLE_NAME)
@@ -115,9 +119,7 @@
 except:
     print("⚠️ Table not found, calculating wind condition performance...")
     df_conditions = df.withColumn("wind_category",
-        F.when(F.col("wind_speed_knots") < 8, "Light")
-        .when(F.col("wind_speed_knots") < 15, "Moderate")
-        .otherwise("Heavy")
+        categorize_wind_udf(F.col("wind_speed_knots"))
     )
 
     vmg_by_wind = df_conditions.groupBy("boat_id", "boat_name", "wind_category").agg(
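The consistency rating in 03_race_progress is a banded coefficient of variation (stddev divided by the absolute mean), with the bands now read from config.toml. A worked example in plain Python using the default 0.1/0.2/0.3 bands:

    # Worked example only; mirrors the notebook's rating logic.
    def consistency_rating(mean_vmg: float, stddev_vmg: float) -> str:
        cv = stddev_vmg / abs(mean_vmg)  # coefficient of variation
        if cv < 0.1:
            return "Very Consistent"
        if cv < 0.2:
            return "Consistent"
        if cv < 0.3:
            return "Moderate"
        return "Variable"

    print(consistency_rating(5.2, 0.4))  # 0.4 / 5.2 ~= 0.077 -> "Very Consistent"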
diff --git a/data_drifter/notebooks/create_control_table.py b/data_drifter/notebooks/create_control_table.py
new file mode 100644
index 0000000..b6947af
--- /dev/null
+++ b/data_drifter/notebooks/create_control_table.py
@@ -0,0 +1,39 @@
+# Databricks notebook source
+# MAGIC %md
+# MAGIC # Create Race Control Table
+# MAGIC
+# MAGIC Simple control table that allows the app to send speed commands to the telemetry generator.
+# MAGIC The generator polls this table periodically and adjusts its playback speed.
+# MAGIC
+# MAGIC **Parameters:**
+# MAGIC - `telemetry_table`: Full telemetry table name (used to derive schema prefix)
+
+# COMMAND ----------
+
+telemetry_table = dbutils.widgets.get("telemetry_table")
+schema_prefix = ".".join(telemetry_table.split(".")[:2])
+control_table = f"{schema_prefix}.race_control"
+
+print(f"Creating control table: {control_table}")
+
+# COMMAND ----------
+
+spark.sql(f"""
+CREATE TABLE IF NOT EXISTS {control_table} (
+    speed_multiplier DOUBLE COMMENT 'Playback speed multiplier (0.5 = half speed, 2.0 = double speed)',
+    updated_at TIMESTAMP COMMENT 'When the speed was last changed',
+    updated_by STRING COMMENT 'Who changed the speed (app or manual)'
+)
+USING DELTA
+COMMENT 'Race control table - allows app to control telemetry generator speed'
+""")
+
+# Insert default row if table is empty
+count = spark.sql(f"SELECT COUNT(*) as cnt FROM {control_table}").collect()[0].cnt
+if count == 0:
+    spark.sql(f"""
+        INSERT INTO {control_table} VALUES (1.0, current_timestamp(), 'system')
+    """)
+    print("Inserted default speed_multiplier = 1.0")
+
+print(f"Control table created: {control_table}")
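Readers of this table should tolerate an empty result, since the seed row is only inserted when the count is zero and "Start New Race" may truncate the schema. A defensive read, as a notebook cell might do it (sketch only, control_table as derived above):

    row = spark.sql(f"SELECT speed_multiplier FROM {control_table} LIMIT 1").first()
    speed = float(row.speed_multiplier) if row else 1.0  # fall back to normal speed
    print(f"Current playback speed: {speed:.1f}x")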
diff --git a/data_drifter/notebooks/grant_permissions.py b/data_drifter/notebooks/grant_permissions.py
index dec65e5..58ae343 100644
--- a/data_drifter/notebooks/grant_permissions.py
+++ b/data_drifter/notebooks/grant_permissions.py
@@ -3,13 +3,14 @@
 # MAGIC # Grant Permissions to App Service Principal
 # MAGIC
 # MAGIC This notebook grants Unity Catalog permissions for the Streamlit app to access tables.
-# MAGIC The app service principal needs SELECT on telemetry table and SELECT/MODIFY on weather table.
+# MAGIC The app needs SELECT/MODIFY at the schema level so it can read all tables and truncate them
+# MAGIC when "Start New Race" is clicked (telemetry, weather, and analysis tables).
 # MAGIC
 # MAGIC **Parameters:**
 # MAGIC - `catalog_name`: Unity Catalog name
 # MAGIC - `schema_name`: Schema name within catalog
-# MAGIC - `telemetry_table`: Full table name for telemetry data
-# MAGIC - `weather_table`: Full table name for weather station data
+# MAGIC - `telemetry_table`: Full table name for telemetry data (unused, kept for compatibility)
+# MAGIC - `weather_table`: Full table name for weather station data (unused, kept for compatibility)
 # MAGIC - `app_service_principal_id`: Service principal ID of the deployed app
 
 # COMMAND ----------
@@ -24,8 +25,6 @@
 print(f"Granting permissions to service principal: {app_service_principal_id}")
 print(f"Catalog: {catalog_name}")
 print(f"Schema: {schema_name}")
-print(f"Telemetry table: {telemetry_table}")
-print(f"Weather table: {weather_table}")
 
 # COMMAND ----------
@@ -45,19 +44,13 @@
 
 # COMMAND ----------
 
-# Grant SELECT on telemetry table (read-only for app)
+# Grant SELECT and MODIFY on the entire schema
+# This covers telemetry, weather, and all analysis tables created by the notebooks
+# MODIFY is needed for "Start New Race" which truncates all tables
 spark.sql(f"""
-GRANT SELECT ON TABLE {telemetry_table} TO `{app_service_principal_id}`
+GRANT SELECT, MODIFY ON SCHEMA {catalog_name}.{schema_name} TO `{app_service_principal_id}`
 """)
-print(f"✅ Granted SELECT on {telemetry_table}")
-
-# COMMAND ----------
-
-# Grant SELECT and MODIFY on weather station table (app may write weather data)
-spark.sql(f"""
-GRANT SELECT, MODIFY ON TABLE {weather_table} TO `{app_service_principal_id}`
-""")
-print(f"✅ Granted SELECT, MODIFY on {weather_table}")
+print(f"✅ Granted SELECT, MODIFY on schema {catalog_name}.{schema_name}")
 
 # COMMAND ----------
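In Unity Catalog, privileges granted on a schema are inherited by current and future tables in it, which is what lets this single grant cover analysis tables that do not exist yet at deploy time. A quick verification cell one could add after the grant (sketch, reusing the notebook's widget values):

    # Sketch only: list the grants that landed on the schema.
    grants = spark.sql(f"SHOW GRANTS ON SCHEMA {catalog_name}.{schema_name}")
    grants.show(truncate=False)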
diff --git a/data_drifter/notebooks/race_utils.py b/data_drifter/notebooks/race_utils.py
index 8d80bf5..1a48e2c 100644
--- a/data_drifter/notebooks/race_utils.py
+++ b/data_drifter/notebooks/race_utils.py
@@ -442,3 +442,28 @@ def get_finished_boats(df):
     ).orderBy("rank")
 
     return finished
+
+
+def get_schema_prefix(table_name):
+    """Extract catalog.schema prefix from a fully-qualified table name (catalog.schema.table)."""
+    return ".".join(table_name.split(".")[:2])
+
+
+def categorize_wind(wind_speed_knots, config=None):
+    """Categorize wind speed into Light/Moderate/Heavy.
+
+    Thresholds come from the config.toml [analysis] section if available;
+    otherwise they default to 8/15 knots.
+    """
+    if config and "analysis" in config:
+        light = config["analysis"].get("wind_light_threshold_knots", 8)
+        moderate = config["analysis"].get("wind_moderate_threshold_knots", 15)
+    else:
+        light = 8
+        moderate = 15
+
+    if wind_speed_knots < light:
+        return "Light"
+    elif wind_speed_knots < moderate:
+        return "Moderate"
+    return "Heavy"

From 67c6fc7cf12368d14ef7e8d62952ea74ec9f2aea Mon Sep 17 00:00:00 2001
From: Artem
Date: Tue, 10 Mar 2026 04:56:45 +0200
Subject: [PATCH 5/5] Simplify _send_telemetry: remove retry/backoff logic

Co-Authored-By: Claude Opus 4.6
---
 data_drifter/main.py | 25 +++++++------------------
 1 file changed, 7 insertions(+), 18 deletions(-)

diff --git a/data_drifter/main.py b/data_drifter/main.py
index 701a132..3f117af 100644
--- a/data_drifter/main.py
+++ b/data_drifter/main.py
@@ -214,25 +214,14 @@ async def run(self):
             self._print_final_summary()
 
     async def _send_telemetry(self, fleet_telemetry: list):
-        """Send telemetry data to Zerobus with retry logic"""
-        max_retries = 3
+        """Send telemetry data to Zerobus"""
         for telemetry in fleet_telemetry:
-            last_exception = None
-            for attempt in range(max_retries + 1):
-                try:
-                    await self.stream.ingest_record(telemetry)
-                    self.records_sent += 1
-                    break
-                except Exception as e:
-                    last_exception = e
-                    if attempt < max_retries:
-                        backoff = 2 ** attempt  # 1s, 2s, 4s
-                        logger.warning(f"Failed to send record from {telemetry['boat_name']} (attempt {attempt + 1}/{max_retries + 1}), retrying in {backoff}s: {e}")
-                        await asyncio.sleep(backoff)
-                    else:
-                        self.records_failed += 1
-                        logger.error(f"✗ Failed to send record from {telemetry['boat_name']} after {max_retries + 1} attempts: {e}")
-                        raise last_exception
+            try:
+                await self.stream.ingest_record(telemetry)
+                self.records_sent += 1
+            except Exception as e:
+                self.records_failed += 1
+                logger.error(f"✗ Failed to send record from {telemetry['boat_name']}: {e}")
 
     def _all_boats_finished(self) -> bool:
         """Check if all boats have finished the race"""
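With categorize_wind now plain Python shared by three notebooks, its boundary behavior is cheap to pin down with a unit test; a sketch assuming pytest and an importable race_utils:

    from race_utils import categorize_wind

    def test_default_thresholds():
        assert categorize_wind(5) == "Light"
        assert categorize_wind(8) == "Moderate"   # 8 is not < 8, so it falls through
        assert categorize_wind(15) == "Heavy"

    def test_config_thresholds():
        cfg = {"analysis": {"wind_light_threshold_knots": 10,
                            "wind_moderate_threshold_knots": 20}}
        assert categorize_wind(9, cfg) == "Light"
        assert categorize_wind(19, cfg) == "Moderate"
        assert categorize_wind(25, cfg) == "Heavy"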