Skip to content

Commit f25f35b

Browse files
authored
Merge pull request #37 from thisisqubika/feature/job-app-fixes
Fixes on running job tracing
2 parents 1788fbe + d663a5c commit f25f35b

5 files changed

Lines changed: 387 additions & 5 deletions

File tree

databricks_job_executor/streamlit_app/components/job_interface.py

Lines changed: 72 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import streamlit as st
66
from typing import Dict, Any
77
from streamlit_app.utils.job_manager import get_job_manager
8+
from streamlit_app.utils.formatters import format_duration, format_timestamp, get_status_emoji
89

910

1011
class JobInterface:
@@ -96,11 +97,13 @@ def _execute_job(self, job_id: int, job_name: str):
9697
run_id = self.job_manager.run_job(job_id)
9798

9899
if run_id:
99-
# Store running job info
100+
# Store running job info with start time in milliseconds for duration calculation
101+
current_time_ms = int(time.time() * 1000)
100102
st.session_state.running_jobs[run_id] = {
101103
'job_id': job_id,
102104
'job_name': job_name,
103105
'start_time': time.strftime('%Y-%m-%d %H:%M:%S'),
106+
'start_time_ms': current_time_ms,
104107
'status': 'PENDING',
105108
'run_id': run_id
106109
}
@@ -138,9 +141,19 @@ def render_running_jobs(self):
138141
st.metric("Start Time", job_info.get('start_time', 'N/A'))
139142

140143
with col3:
141-
duration = job_info.get('execution_duration', 'N/A')
142-
if duration and duration != 'N/A':
143-
duration = f"{duration // 1000}s" # Convert ms to seconds
144+
# Calculate duration based on status
145+
if status in ['PENDING', 'RUNNING']:
146+
# For running jobs, calculate elapsed time
147+
start_time_ms = job_info.get('start_time_ms')
148+
if start_time_ms:
149+
elapsed_ms = int(time.time() * 1000) - start_time_ms
150+
duration = format_duration(elapsed_ms)
151+
else:
152+
duration = 'N/A'
153+
else:
154+
# For completed jobs, use execution_duration from API
155+
execution_duration = job_info.get('execution_duration')
156+
duration = format_duration(execution_duration)
144157
st.metric("Duration", duration)
145158

146159
with col4:
@@ -188,6 +201,11 @@ def _update_running_jobs_status(self):
188201

189202
# Update job info
190203
st.session_state.running_jobs[run_id]['status'] = status
204+
205+
# Update start_time_ms from API if not already set (for restored runs)
206+
if status_info.get('start_time') and not st.session_state.running_jobs[run_id].get('start_time_ms'):
207+
st.session_state.running_jobs[run_id]['start_time_ms'] = status_info['start_time']
208+
191209
if status_info.get('execution_duration'):
192210
st.session_state.running_jobs[run_id]['execution_duration'] = status_info['execution_duration']
193211

@@ -231,6 +249,50 @@ def _show_job_logs(self, run_id: int):
231249
else:
232250
st.info("No logs available yet.")
233251

252+
def render_last_completed_job(self):
    """Render a collapsible summary of the most recent completed run.

    Reads the configured job id from ``st.session_state['databricks_job_id']``
    and asks the job manager for its last completed run. Nothing is rendered
    when there is no job manager or no configured job id. When a run is
    found, an expander shows its status, start time and execution duration,
    plus a button that displays the run's logs.
    """
    if not self.job_manager:
        return

    job_id = st.session_state.get('databricks_job_id')
    if not job_id:
        return

    st.markdown("📜 **Last Completed Job**")

    # Get last completed run
    last_run = self.job_manager.get_last_completed_run(job_id)

    if not last_run:
        st.info("No completed job runs found.")
        return

    run_id = last_run['run_id']
    status = last_run.get('status', 'UNKNOWN')

    # Get status emoji using utility
    status_emoji = get_status_emoji(status)

    with st.expander(f"{status_emoji} Run {run_id}: {last_run['job_name']} - {status}", expanded=False):
        col1, col2, col3 = st.columns(3)

        with col1:
            st.metric("Status", status)

        with col2:
            # The run metadata only carries the *start* timestamp
            # ('start_time_ms'), so label it as such rather than presenting
            # it as the completion time.
            started_at = format_timestamp(last_run.get('start_time_ms'))
            st.metric("Started At", started_at)

        with col3:
            # Format duration using utility
            duration = format_duration(last_run.get('execution_duration'))
            st.metric("Duration", duration)

        # Button to view logs; the key is scoped by run id so it cannot
        # collide with other buttons' widget keys.
        if st.button("📋 View Logs", key=f"last_logs_{run_id}"):
            self._show_job_logs(run_id)
295+
234296
def render(self):
235297
"""Render the complete job interface."""
236298
self.update_connection()
@@ -246,4 +308,9 @@ def render(self):
246308
# Running jobs status
247309
if st.session_state.running_jobs:
248310
st.divider()
249-
self.render_running_jobs()
311+
self.render_running_jobs()
312+
313+
# Last completed job (for viewing logs after job finishes)
314+
if st.session_state.get('databricks_job_id'):
315+
st.divider()
316+
self.render_last_completed_job()

databricks_job_executor/streamlit_app/components/ui/initializers.py

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,11 +56,46 @@ def initialize_job_state():
5656

5757
if 'running_jobs' not in st.session_state:
5858
st.session_state.running_jobs = {}
59+
# Restore active runs from Databricks on first load
60+
_restore_active_runs()
5961

6062
if 'job_logs' not in st.session_state:
6163
st.session_state.job_logs = {}
6264

6365

66+
def _restore_active_runs():
    """Repopulate ``st.session_state.running_jobs`` from Databricks.

    Looks up the configured job id in session state, asks the job manager
    for that job's active runs, and stores one entry per run keyed by run id.
    This is best-effort: any failure is logged and otherwise ignored so that
    state initialization is never blocked.
    """
    import logging

    # NOTE(review): these imports are function-local, presumably to avoid a
    # circular import at module load time - confirm before hoisting them.
    from streamlit_app.utils.job_manager import get_job_manager
    from streamlit_app.utils.formatters import format_timestamp

    job_id = st.session_state.get('databricks_job_id')
    if not job_id:
        return

    try:
        job_manager = get_job_manager()
        if not job_manager:
            return

        for run_info in job_manager.get_active_runs(job_id):
            run_id = run_info['run_id']
            start_time_ms = run_info.get('start_time_ms')

            st.session_state.running_jobs[run_id] = {
                'job_id': run_info['job_id'],
                'job_name': run_info['job_name'],
                # Human-readable form for display; the raw milliseconds are
                # kept alongside it for elapsed-time calculation.
                'start_time': format_timestamp(start_time_ms),
                'start_time_ms': start_time_ms,
                'status': run_info['status'],
                'run_id': run_id
            }
    except Exception:
        # Don't block initialization, but leave a trace instead of hiding
        # the failure completely.
        logging.getLogger(__name__).warning(
            "Could not restore active runs for job %s", job_id, exc_info=True
        )
97+
98+
6499
def initialize_environment_state(db_env: dict):
65100
"""Initialize environment-related state."""
66101
st.session_state.databricks_env = db_env
Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
"""
2+
Formatting utilities for display in the Databricks Job Executor UI.
3+
"""
4+
import time
5+
from typing import Optional
6+
7+
8+
def format_duration(duration_ms: Optional[int]) -> str:
    """
    Render a millisecond duration as a compact hours/minutes/seconds string.

    Args:
        duration_ms: Duration in milliseconds (may be None)

    Returns:
        'N/A' when the value is missing, zero or negative; otherwise a
        string such as "1h 2m 3s", "2m 30s" or "45s". Leading zero-valued
        units are omitted and sub-second remainders are dropped.
    """
    # None, 0 and negative values have no meaningful rendering.
    if not duration_ms or duration_ms < 0:
        return 'N/A'

    # Peel off seconds first, then split the remaining minutes into hours.
    whole_minutes, seconds = divmod(duration_ms // 1000, 60)
    hours, minutes = divmod(whole_minutes, 60)

    if hours > 0:
        return f"{hours}h {minutes}m {seconds}s"
    if minutes > 0:
        return f"{minutes}m {seconds}s"
    return f"{seconds}s"
32+
33+
34+
def format_timestamp(timestamp_ms: Optional[int]) -> str:
    """
    Convert an epoch timestamp in milliseconds to a local-time string.

    Args:
        timestamp_ms: Milliseconds since the epoch (may be None)

    Returns:
        The local date and time formatted as "YYYY-MM-DD HH:MM:SS", or
        'N/A' when the timestamp is missing or zero.
    """
    if timestamp_ms:
        # time.localtime expects seconds, so scale down from milliseconds.
        local_parts = time.localtime(timestamp_ms / 1000)
        return time.strftime('%Y-%m-%d %H:%M:%S', local_parts)

    return 'N/A'
48+
49+
50+
def get_status_emoji(status: Optional[str]) -> str:
    """
    Get emoji representation for job status.

    Matching is case-insensitive. Both spellings 'CANCELLED' and 'CANCELED'
    map to the same emoji.

    Args:
        status: Job status string; None or any non-string value is treated
            as an unknown status instead of raising.

    Returns:
        Emoji string representing the status, or '🔘' when the status is
        missing or not recognised
    """
    # Callers may pass a missing/None status (e.g. a dict key that is
    # present but null); fall back to the neutral marker rather than
    # failing on .upper().
    if not isinstance(status, str):
        return '🔘'

    emoji_by_status = {
        'SUCCESS': '✅',
        'FAILED': '❌',
        'CANCELLED': '⚠️',
        'CANCELED': '⚠️',
        'RUNNING': '🏃',
        'PENDING': '⏳',
    }
    return emoji_by_status.get(status.upper(), '🔘')

databricks_job_executor/streamlit_app/utils/job_manager.py

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import streamlit as st
66
from typing import List, Dict, Any, Optional, Tuple
77
from streamlit_app.utils.databricks_env import get_databricks_client, validate_connection
8+
from streamlit_app.utils.job_run_parser import extract_run_metadata, parse_run_state
89
from streamlit_app.components.ui.handlers import (
910
handle_api_error, handle_job_execution_error, handle_job_cancellation_error,
1011
show_logs_load_error
@@ -130,6 +131,70 @@ def run_job(self, job_id: int) -> Optional[int]:
130131
handle_api_error("job execution", str(e))
131132
return None
132133

134+
def get_active_runs(self, job_id: int) -> List[Dict[str, Any]]:
    """
    Collect metadata for the runs of a job that are currently active.

    Args:
        job_id: The job ID whose active runs should be listed

    Returns:
        One metadata dictionary (as produced by extract_run_metadata) per
        listed run that has a run id. An empty list is returned when no
        client is available or when anything goes wrong while listing.
    """
    if not self._ensure_client():
        return []

    try:
        listed = self.client.jobs.list_runs(job_id=job_id, active_only=True)
        # Entries without a usable run_id are skipped; the rest go through
        # the shared parser so every caller sees the same dictionary shape.
        return [
            extract_run_metadata(entry, job_id)
            for entry in listed
            if getattr(entry, 'run_id', None)
        ]
    except Exception:
        # Silently fail - don't disrupt UI
        return []
164+
165+
def get_last_completed_run(self, job_id: int) -> Optional[Dict[str, Any]]:
    """
    Get the last completed run for a specific job.

    The first listed run whose lifecycle state is 'TERMINATED' is returned,
    so "last" depends on the order in which list_runs yields runs.
    NOTE(review): presumably newest-first - confirm against the API docs.

    Args:
        job_id: The job ID to check for completed runs

    Returns:
        Last completed run information or None if not found (also None
        when no client is available or the lookup fails)
    """
    if not self._ensure_client():
        return None

    try:
        # Iterate the listing lazily instead of materialising it with
        # list(): we stop at the first terminated run, so there is no
        # reason to consume (and possibly page through) the rest.
        for run in self.client.jobs.list_runs(job_id=job_id, limit=10):
            if not getattr(run, 'run_id', None):
                continue

            # Only return terminated runs
            if parse_run_state(run)['lifecycle_state'] == 'TERMINATED':
                return extract_run_metadata(run, job_id)

        return None

    except Exception:
        # Best-effort lookup: the caller treats None as "nothing to show".
        return None
197+
133198
def get_run_status(self, run_id: int) -> Optional[Dict[str, Any]]:
134199
"""
135200
Get the status of a job run.
@@ -161,6 +226,7 @@ def get_run_status(self, run_id: int) -> Optional[Dict[str, Any]]:
161226
'result_state': result_state,
162227
},
163228
'execution_duration': run_info.execution_duration if hasattr(run_info, 'execution_duration') else None,
229+
'start_time': run_info.start_time if hasattr(run_info, 'start_time') else None,
164230
'tasks': run_info.tasks if hasattr(run_info, 'tasks') else [],
165231
}
166232

0 commit comments

Comments
 (0)