diff --git a/databricks_job_executor/streamlit_app/components/job_interface.py b/databricks_job_executor/streamlit_app/components/job_interface.py index 18c691c..bcfc460 100644 --- a/databricks_job_executor/streamlit_app/components/job_interface.py +++ b/databricks_job_executor/streamlit_app/components/job_interface.py @@ -5,6 +5,7 @@ import streamlit as st from typing import Dict, Any from streamlit_app.utils.job_manager import get_job_manager +from streamlit_app.utils.formatters import format_duration, format_timestamp, get_status_emoji class JobInterface: @@ -96,11 +97,13 @@ def _execute_job(self, job_id: int, job_name: str): run_id = self.job_manager.run_job(job_id) if run_id: - # Store running job info + # Store running job info with start time in milliseconds for duration calculation + current_time_ms = int(time.time() * 1000) st.session_state.running_jobs[run_id] = { 'job_id': job_id, 'job_name': job_name, 'start_time': time.strftime('%Y-%m-%d %H:%M:%S'), + 'start_time_ms': current_time_ms, 'status': 'PENDING', 'run_id': run_id } @@ -138,9 +141,19 @@ def render_running_jobs(self): st.metric("Start Time", job_info.get('start_time', 'N/A')) with col3: - duration = job_info.get('execution_duration', 'N/A') - if duration and duration != 'N/A': - duration = f"{duration // 1000}s" # Convert ms to seconds + # Calculate duration based on status + if status in ['PENDING', 'RUNNING']: + # For running jobs, calculate elapsed time + start_time_ms = job_info.get('start_time_ms') + if start_time_ms: + elapsed_ms = int(time.time() * 1000) - start_time_ms + duration = format_duration(elapsed_ms) + else: + duration = 'N/A' + else: + # For completed jobs, use execution_duration from API + execution_duration = job_info.get('execution_duration') + duration = format_duration(execution_duration) st.metric("Duration", duration) with col4: @@ -188,6 +201,11 @@ def _update_running_jobs_status(self): # Update job info st.session_state.running_jobs[run_id]['status'] = status + + # 
def render_last_completed_job(self):
    """Render a collapsed summary of the most recent finished run.

    Does nothing when no job manager is attached or when no
    ``databricks_job_id`` is stored in the Streamlit session state.
    Otherwise asks the job manager for the last completed run of that job
    and shows its status, completion time and duration inside an expander,
    together with a button that displays the run's logs.
    """
    if not self.job_manager:
        return

    job_id = st.session_state.get('databricks_job_id')
    if not job_id:
        return

    st.markdown("📜 **Last Completed Job**")

    last_run = self.job_manager.get_last_completed_run(job_id)
    if not last_run:
        st.info("No completed job runs found.")
        return

    run_id = last_run['run_id']
    status = last_run.get('status', 'UNKNOWN')
    status_emoji = get_status_emoji(status)

    with st.expander(f"{status_emoji} Run {run_id}: {last_run['job_name']} - {status}", expanded=False):
        col1, col2, col3 = st.columns(3)

        with col1:
            st.metric("Status", status)

        with col2:
            # "Completed At" must come from the run's end timestamp; the
            # start timestamp would show when the run began instead.
            # format_timestamp renders a missing value as 'N/A'.
            completed_at = format_timestamp(last_run.get('end_time_ms'))
            st.metric("Completed At", completed_at)

        with col3:
            duration = format_duration(last_run.get('execution_duration'))
            st.metric("Duration", duration)

        # The key embeds the run id so the widget stays unique per run.
        if st.button("📋 View Logs", key=f"last_logs_{run_id}"):
            self._show_job_logs(run_id)
def _restore_active_runs():
    """Repopulate ``st.session_state.running_jobs`` with the job's active runs.

    Reads ``databricks_job_id`` from the session state, asks the job manager
    for the active runs of that job and stores one entry per run, keyed by
    run id, in the same shape used for runs started from the UI.

    This is a best-effort step: any failure is logged and otherwise ignored
    so that state initialization is never blocked.
    """
    # Function-scope imports, as in the original (presumably to avoid a
    # circular import at module load time - confirm before hoisting).
    import logging

    from streamlit_app.utils.job_manager import get_job_manager
    from streamlit_app.utils.formatters import format_timestamp

    job_id = st.session_state.get('databricks_job_id')
    if not job_id:
        return

    try:
        job_manager = get_job_manager()
        if not job_manager:
            return

        for run_info in job_manager.get_active_runs(job_id):
            run_id = run_info['run_id']
            start_time_ms = run_info.get('start_time_ms')

            st.session_state.running_jobs[run_id] = {
                'job_id': run_info['job_id'],
                'job_name': run_info['job_name'],
                # Human-readable form for display; the raw milliseconds are
                # kept alongside for elapsed-time calculation.
                'start_time': format_timestamp(start_time_ms),
                'start_time_ms': start_time_ms,
                'status': run_info['status'],
                'run_id': run_id,
            }
    except Exception:
        # Still non-fatal, but leave a trace instead of failing invisibly.
        logging.getLogger(__name__).warning(
            "Could not restore active runs for job %s", job_id, exc_info=True
        )
"""
Display-formatting helpers for the Databricks Job Executor UI.
"""
import time
from typing import Optional

# Upper-cased status name -> emoji shown next to it in the UI.
_STATUS_EMOJI = {
    'SUCCESS': '✅',
    'FAILED': '❌',
    'CANCELLED': '⚠️',
    'CANCELED': '⚠️',
    'RUNNING': '🏃',
    'PENDING': '⏳',
}

# Emoji used for any status that has no entry in the table above.
_FALLBACK_EMOJI = '🔘'


def format_duration(duration_ms: Optional[int]) -> str:
    """
    Render a millisecond duration as a compact human-readable string.

    Args:
        duration_ms: Duration in milliseconds

    Returns:
        'N/A' for a missing, zero or negative duration; otherwise a string
        such as "45s", "2m 30s" or "1h 2m 5s" (sub-second part is dropped)
    """
    if not duration_ms:
        return 'N/A'
    if duration_ms < 0:
        return 'N/A'

    whole_seconds = duration_ms // 1000
    hours, remainder = divmod(whole_seconds, 3600)
    minutes, seconds = divmod(remainder, 60)

    # Leading units are omitted while they are zero.
    if hours > 0:
        return f"{hours}h {minutes}m {seconds}s"
    if minutes > 0:
        return f"{minutes}m {seconds}s"
    return f"{seconds}s"


def format_timestamp(timestamp_ms: Optional[int]) -> str:
    """
    Render an epoch timestamp in milliseconds as a local date-time string.

    Args:
        timestamp_ms: Timestamp in milliseconds since epoch

    Returns:
        'N/A' for a missing or zero timestamp; otherwise the local time
        formatted as "YYYY-MM-DD HH:MM:SS"
    """
    if not timestamp_ms:
        return 'N/A'

    seconds_since_epoch = timestamp_ms / 1000
    local_time = time.localtime(seconds_since_epoch)
    return time.strftime('%Y-%m-%d %H:%M:%S', local_time)


def get_status_emoji(status: str) -> str:
    """
    Look up the emoji that represents a job status.

    Args:
        status: Job status string (matched case-insensitively)

    Returns:
        The emoji for the status, or a neutral fallback emoji when the
        status is not one of the known values
    """
    return _STATUS_EMOJI.get(status.upper(), _FALLBACK_EMOJI)
def get_last_completed_run(self, job_id: int) -> Optional[Dict[str, Any]]:
    """
    Get the last completed run for a specific job.

    Walks the runs returned by the Jobs API in the order it yields them and
    returns the first one whose lifecycle state is ``TERMINATED``.

    Args:
        job_id: The job ID to check for completed runs

    Returns:
        Metadata dictionary (as built by ``extract_run_metadata``) for the
        first terminated run found, or None when the client is unavailable,
        no terminated run exists, or the API call fails
    """
    if not self._ensure_client():
        return None

    try:
        # Iterate the result directly instead of wrapping it in list():
        # only the first terminated run is needed, so there is no reason to
        # pull every remaining run before inspecting the first one.
        # NOTE(review): assumes the API yields the most recent runs first -
        # confirm against the Jobs API reference.
        for run in self.client.jobs.list_runs(job_id=job_id, limit=10):
            if not getattr(run, 'run_id', None):
                continue

            # Runs that are still pending/running (or in any other
            # lifecycle state) are not considered completed here.
            if parse_run_state(run)['lifecycle_state'] == 'TERMINATED':
                return extract_run_metadata(run, job_id)

        return None

    except Exception:
        # Deliberately best-effort, matching the sibling lookups: a failed
        # query is reported to the caller as "nothing found".
        return None
"""
Job run data parsing utilities for Databricks Job Executor.

Turns run objects returned by the Databricks API into plain values and
dictionaries that the UI can consume.
"""
from typing import Dict, Any, Optional


def _state_name(state_value) -> str:
    """Return a state as text, preferring its ``value`` attribute if present."""
    if hasattr(state_value, 'value'):
        return str(state_value.value)
    return str(state_value)


def parse_run_state(run) -> Dict[str, Optional[str]]:
    """
    Read the lifecycle and result state from a run object.

    Args:
        run: Databricks run object

    Returns:
        Dictionary with 'lifecycle_state' and 'result_state'; each is None
        when the run has no (truthy) value for it
    """
    parsed: Dict[str, Optional[str]] = {
        'lifecycle_state': None,
        'result_state': None,
    }

    state = getattr(run, 'state', None)
    if not state:
        return parsed

    life_cycle = getattr(state, 'life_cycle_state', None)
    if life_cycle:
        parsed['lifecycle_state'] = _state_name(life_cycle)

    result = getattr(state, 'result_state', None)
    if result:
        parsed['result_state'] = _state_name(result)

    return parsed


def calculate_duration(run) -> Optional[int]:
    """
    Work out how long a run took.

    A truthy ``execution_duration`` on the run wins; otherwise the duration
    is derived from ``end_time - start_time`` when both are truthy.

    Args:
        run: Databricks run object

    Returns:
        Duration in milliseconds, or None if unavailable
    """
    reported = getattr(run, 'execution_duration', None)
    if reported:
        return reported

    started = getattr(run, 'start_time', None)
    ended = getattr(run, 'end_time', None)
    if not (started and ended):
        return None
    return ended - started


def extract_run_metadata(run, job_id: int) -> Dict[str, Any]:
    """
    Build the standardized metadata dictionary for a run object.

    Args:
        run: Databricks run object
        job_id: Job ID associated with the run

    Returns:
        Dictionary with run/job identifiers, job name, display status, raw
        lifecycle/result states, start/end timestamps and duration
    """
    state = parse_run_state(run)
    lifecycle_state = state['lifecycle_state']
    result_state = state['result_state']

    # A terminated run is labelled by its result; anything else by its
    # lifecycle state, with a placeholder when neither is known.
    if lifecycle_state == 'TERMINATED':
        status = result_state or 'COMPLETED'
    else:
        status = lifecycle_state or 'UNKNOWN'

    # Fall back to a generic label when the run carries no name.
    job_name = getattr(run, 'run_name', None) or f"Job {job_id}"

    return {
        'run_id': getattr(run, 'run_id', None),
        'job_id': job_id,
        'job_name': job_name,
        'status': status,
        'lifecycle_state': lifecycle_state,
        'result_state': result_state,
        'start_time_ms': getattr(run, 'start_time', None),
        'end_time_ms': getattr(run, 'end_time', None),
        'execution_duration': calculate_duration(run),
    }


def get_display_status(lifecycle_state: Optional[str], result_state: Optional[str]) -> str:
    """
    Map lifecycle and result states to a user-friendly status.

    Args:
        lifecycle_state: Lifecycle state from Databricks
        result_state: Result state from Databricks

    Returns:
        For terminated runs: 'SUCCESS', 'FAILED', 'CANCELLED' (either
        spelling of the result) or 'TERMINATED' for any other result.
        Otherwise the lifecycle state itself, or 'UNKNOWN' when it is empty.
    """
    if lifecycle_state != 'TERMINATED':
        return lifecycle_state or 'UNKNOWN'

    if result_state in ('SUCCESS', 'FAILED'):
        return result_state
    if result_state in ('CANCELLED', 'CANCELED'):
        return 'CANCELLED'
    return 'TERMINATED'