Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
77 changes: 72 additions & 5 deletions databricks_job_executor/streamlit_app/components/job_interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import streamlit as st
from typing import Dict, Any
from streamlit_app.utils.job_manager import get_job_manager
from streamlit_app.utils.formatters import format_duration, format_timestamp, get_status_emoji


class JobInterface:
Expand Down Expand Up @@ -96,11 +97,13 @@ def _execute_job(self, job_id: int, job_name: str):
run_id = self.job_manager.run_job(job_id)

if run_id:
# Store running job info
# Store running job info with start time in milliseconds for duration calculation
current_time_ms = int(time.time() * 1000)
st.session_state.running_jobs[run_id] = {
'job_id': job_id,
'job_name': job_name,
'start_time': time.strftime('%Y-%m-%d %H:%M:%S'),
'start_time_ms': current_time_ms,
'status': 'PENDING',
'run_id': run_id
}
Expand Down Expand Up @@ -138,9 +141,19 @@ def render_running_jobs(self):
st.metric("Start Time", job_info.get('start_time', 'N/A'))

with col3:
duration = job_info.get('execution_duration', 'N/A')
if duration and duration != 'N/A':
duration = f"{duration // 1000}s" # Convert ms to seconds
# Calculate duration based on status
if status in ['PENDING', 'RUNNING']:
# For running jobs, calculate elapsed time
start_time_ms = job_info.get('start_time_ms')
if start_time_ms:
elapsed_ms = int(time.time() * 1000) - start_time_ms
duration = format_duration(elapsed_ms)
else:
duration = 'N/A'
else:
# For completed jobs, use execution_duration from API
execution_duration = job_info.get('execution_duration')
duration = format_duration(execution_duration)
st.metric("Duration", duration)

with col4:
Expand Down Expand Up @@ -188,6 +201,11 @@ def _update_running_jobs_status(self):

# Update job info
st.session_state.running_jobs[run_id]['status'] = status

# Update start_time_ms from API if not already set (for restored runs)
if status_info.get('start_time') and not st.session_state.running_jobs[run_id].get('start_time_ms'):
st.session_state.running_jobs[run_id]['start_time_ms'] = status_info['start_time']

if status_info.get('execution_duration'):
st.session_state.running_jobs[run_id]['execution_duration'] = status_info['execution_duration']

Expand Down Expand Up @@ -231,6 +249,50 @@ def _show_job_logs(self, run_id: int):
else:
st.info("No logs available yet.")

def render_last_completed_job(self):
"""Render section for viewing logs from the last completed job."""
if not self.job_manager:
return

job_id = st.session_state.get('databricks_job_id')
if not job_id:
return

st.markdown("📜 **Last Completed Job**")

# Get last completed run
last_run = self.job_manager.get_last_completed_run(job_id)

if not last_run:
st.info("No completed job runs found.")
return

run_id = last_run['run_id']
status = last_run.get('status', 'UNKNOWN')

# Get status emoji using utility
status_emoji = get_status_emoji(status)

with st.expander(f"{status_emoji} Run {run_id}: {last_run['job_name']} - {status}", expanded=False):
col1, col2, col3 = st.columns(3)

with col1:
st.metric("Status", status)

with col2:
# Format timestamp using utility
completed_at = format_timestamp(last_run.get('start_time_ms'))
st.metric("Completed At", completed_at)

with col3:
# Format duration using utility
duration = format_duration(last_run.get('execution_duration'))
st.metric("Duration", duration)

# Button to view logs
if st.button("📋 View Logs", key=f"last_logs_{run_id}"):
self._show_job_logs(run_id)

def render(self):
"""Render the complete job interface."""
self.update_connection()
Expand All @@ -246,4 +308,9 @@ def render(self):
# Running jobs status
if st.session_state.running_jobs:
st.divider()
self.render_running_jobs()
self.render_running_jobs()

# Last completed job (for viewing logs after job finishes)
if st.session_state.get('databricks_job_id'):
st.divider()
self.render_last_completed_job()
Original file line number Diff line number Diff line change
Expand Up @@ -56,11 +56,46 @@ def initialize_job_state():

if 'running_jobs' not in st.session_state:
st.session_state.running_jobs = {}
# Restore active runs from Databricks on first load
_restore_active_runs()

if 'job_logs' not in st.session_state:
st.session_state.job_logs = {}


def _restore_active_runs():
"""Query Databricks for active runs of the configured job and restore them."""
from streamlit_app.utils.job_manager import get_job_manager
from streamlit_app.utils.formatters import format_timestamp

job_id = st.session_state.get('databricks_job_id')
if not job_id:
return

try:
job_manager = get_job_manager()
if not job_manager:
return

active_runs = job_manager.get_active_runs(job_id)

for run_info in active_runs:
run_id = run_info['run_id']
start_time_ms = run_info.get('start_time_ms')

st.session_state.running_jobs[run_id] = {
'job_id': run_info['job_id'],
'job_name': run_info['job_name'],
'start_time': format_timestamp(start_time_ms),
'start_time_ms': start_time_ms,
'status': run_info['status'],
'run_id': run_id
}
except Exception:
# Silently fail - don't block initialization
pass


def initialize_environment_state(db_env: dict):
"""Initialize environment-related state."""
st.session_state.databricks_env = db_env
Expand Down
73 changes: 73 additions & 0 deletions databricks_job_executor/streamlit_app/utils/formatters.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
"""
Formatting utilities for display in the Databricks Job Executor UI.
"""
import time
from typing import Optional


def format_duration(duration_ms: Optional[int]) -> str:
"""
Format duration in milliseconds to human-readable string.

Args:
duration_ms: Duration in milliseconds

Returns:
Human-readable duration string (e.g., "2m 30s")
"""
if not duration_ms or duration_ms < 0:
return 'N/A'

total_seconds = duration_ms // 1000
hours = total_seconds // 3600
minutes = (total_seconds % 3600) // 60
seconds = total_seconds % 60

if hours > 0:
return f"{hours}h {minutes}m {seconds}s"
elif minutes > 0:
return f"{minutes}m {seconds}s"
else:
return f"{seconds}s"


def format_timestamp(timestamp_ms: Optional[int]) -> str:
"""
Format timestamp in milliseconds to human-readable datetime string.

Args:
timestamp_ms: Timestamp in milliseconds since epoch

Returns:
Formatted datetime string (e.g., "2026-01-12 10:30:45")
"""
if not timestamp_ms:
return 'N/A'

return time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(timestamp_ms / 1000))


def get_status_emoji(status: str) -> str:
"""
Get emoji representation for job status.

Args:
status: Job status string

Returns:
Emoji string representing the status
"""
status_upper = status.upper()

if status_upper == 'SUCCESS':
return '✅'
elif status_upper == 'FAILED':
return '❌'
elif status_upper in ['CANCELLED', 'CANCELED']:
return '⚠️'
elif status_upper == 'RUNNING':
return '🏃'
elif status_upper == 'PENDING':
return '⏳'
else:
return '🔘'
66 changes: 66 additions & 0 deletions databricks_job_executor/streamlit_app/utils/job_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import streamlit as st
from typing import List, Dict, Any, Optional, Tuple
from streamlit_app.utils.databricks_env import get_databricks_client, validate_connection
from streamlit_app.utils.job_run_parser import extract_run_metadata, parse_run_state
from streamlit_app.components.ui.handlers import (
handle_api_error, handle_job_execution_error, handle_job_cancellation_error,
show_logs_load_error
Expand Down Expand Up @@ -130,6 +131,70 @@ def run_job(self, job_id: int) -> Optional[int]:
handle_api_error("job execution", str(e))
return None

def get_active_runs(self, job_id: int) -> List[Dict[str, Any]]:
"""
Get active (running or pending) runs for a specific job.

Args:
job_id: The job ID to check for active runs

Returns:
List of active run information dictionaries
"""
if not self._ensure_client():
return []

try:
runs = list(self.client.jobs.list_runs(job_id=job_id, active_only=True))

active_runs = []
for run in runs:
if not hasattr(run, 'run_id') or not run.run_id:
continue

# Use parser utility for consistent data extraction
run_data = extract_run_metadata(run, job_id)
active_runs.append(run_data)

return active_runs

except Exception:
# Silently fail - don't disrupt UI
return []

def get_last_completed_run(self, job_id: int) -> Optional[Dict[str, Any]]:
"""
Get the last completed run for a specific job.

Args:
job_id: The job ID to check for completed runs

Returns:
Last completed run information or None if not found
"""
if not self._ensure_client():
return None

try:
runs = list(self.client.jobs.list_runs(job_id=job_id, limit=10))

# Find the most recent terminated run
for run in runs:
if not hasattr(run, 'run_id') or not run.run_id:
continue

# Use parser to extract state
state = parse_run_state(run)

# Only return terminated runs
if state['lifecycle_state'] == 'TERMINATED':
return extract_run_metadata(run, job_id)

return None

except Exception:
return None

def get_run_status(self, run_id: int) -> Optional[Dict[str, Any]]:
"""
Get the status of a job run.
Expand Down Expand Up @@ -161,6 +226,7 @@ def get_run_status(self, run_id: int) -> Optional[Dict[str, Any]]:
'result_state': result_state,
},
'execution_duration': run_info.execution_duration if hasattr(run_info, 'execution_duration') else None,
'start_time': run_info.start_time if hasattr(run_info, 'start_time') else None,
'tasks': run_info.tasks if hasattr(run_info, 'tasks') else [],
}

Expand Down
Loading