-
Notifications
You must be signed in to change notification settings - Fork 1.2k
Expand file tree
/
Copy pathdeployment_progress.py
More file actions
119 lines (99 loc) · 4.31 KB
/
deployment_progress.py
File metadata and controls
119 lines (99 loc) · 4.31 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
from rich.console import Console
from rich.panel import Panel
from rich.progress import Progress, SpinnerColumn, TextColumn, TimeElapsedColumn
from rich.status import Status
from rich.console import Group
from rich.live import Live
from rich.style import Style
class EndpointDeploymentProgress:
    """Rich live panel that tracks the progress of an endpoint deployment.

    Intended to be used as a context manager: entering starts a
    live-refreshing panel made of a spinner / elapsed-time row and a status
    line, and exiting stops it again.
    """

    def __init__(self, endpoint_name: str):
        """Build the renderables for *endpoint_name*; nothing is drawn yet."""
        self.endpoint_name = endpoint_name
        self.console = Console()
        self.current_status = "Creating"
        # Set by __enter__; stays None while the live display is not started.
        self.live = None

        # Row with an animated spinner, the task description and a timer.
        columns = (
            SpinnerColumn("bouncingBar"),
            TextColumn("{task.description}"),
            TimeElapsedColumn(),
        )
        self.progress = Progress(*columns)
        self.progress.add_task("Waiting for Endpoint...")

        # Status line rendered together with the progress row.
        self.status = Status("Current status: Creating")

    def __enter__(self):
        """Start the live panel and return this tracker."""
        body = Group(self.progress, self.status)
        frame = Panel(
            body,
            title="Wait Log Panel",
            border_style=Style(color="blue"),
        )
        # Share this tracker's console and refresh a few times per second so
        # the spinner and the elapsed timer keep animating.
        self.live = Live(frame, console=self.console, refresh_per_second=4)
        self.live.start()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Stop the live panel if one was started.

        Returns None, so an exception raised inside the ``with`` body is not
        suppressed.
        """
        live = self.live
        if not live:
            return
        live.stop()

    def log(self, message: str):
        """Print *message* through the console shared with the live panel."""
        # NOTE(review): the message is printed with the console's default
        # markup handling; raw log lines containing bracketed text may be
        # interpreted as Rich markup — confirm whether markup=False is wanted.
        self.console.print(message)

    def update_status(self, status: str):
        """Remember *status* and show it on the status line."""
        self.current_status = status
        status_line = self.status
        if not status_line:
            return
        status_line.update(f"Current status: [bold]{status}")
def _deploy_done_with_progress(sagemaker_client, endpoint_name, progress_tracker=None):
    """Describe the endpoint once and report whether deployment has finished.

    Args:
        sagemaker_client: Client exposing ``describe_endpoint(EndpointName=...)``.
        endpoint_name: Name of the endpoint to check.
        progress_tracker: Optional object with ``update_status(status)``. When
            falsy, a single marker character is printed to stdout instead.

    Returns:
        None while the endpoint status is "Creating" or "Updating"; otherwise
        the ``describe_endpoint`` response.
    """
    description = sagemaker_client.describe_endpoint(EndpointName=endpoint_name)
    endpoint_status = description["EndpointStatus"]
    still_deploying = endpoint_status in ("Creating", "Updating")

    if not progress_tracker:
        # No tracker: fall back to a one-character marker without a newline
        # ("-" while in progress, "!" once the endpoint left those states).
        marker = "-" if still_deploying else "!"
        print(marker, end="", flush=True)
    else:
        progress_tracker.update_status(endpoint_status)

    if still_deploying:
        return None
    return description
def _live_logging_deploy_done_with_progress(
    sagemaker_client,
    endpoint_name,
    paginator,
    paginator_config,
    poll,
    progress_tracker=None,
):
    """Poll the endpoint once, routing its log events to a progress tracker.

    Args:
        sagemaker_client: Client exposing ``describe_endpoint(EndpointName=...)``.
        endpoint_name: Name of the endpoint being deployed.
        paginator: Object exposing ``paginate(logGroupName=...,
            logStreamNamePrefix=..., PaginationConfig=...)`` that yields pages
            with an ``"events"`` list and an optional ``"nextToken"``.
        paginator_config: Mutable pagination config. Its ``"StartingToken"``
            entry is updated in place from each page's ``"nextToken"``.
        poll: Seconds to sleep before returning when the endpoint has left
            "Creating" with a status other than "InService".
        progress_tracker: Optional object with ``log(message)`` and
            ``update_status(status)``. When falsy, nothing is reported.

    Returns:
        The ``describe_endpoint`` response once the status is no longer
        "Creating". None while it is still "Creating", and also when the
        describe call fails with ``ValidationException`` or the log fetch
        fails with ``ResourceNotFoundException``.

    Raises:
        botocore.exceptions.ClientError: For any other client error from the
            describe call or the log pagination.
    """
    import time

    from botocore.exceptions import ClientError

    try:
        desc = sagemaker_client.describe_endpoint(EndpointName=endpoint_name)
    except ClientError as e:
        # NOTE(review): presumably the endpoint is not visible yet right after
        # creation was requested — confirm; treated as "not done, poll again".
        if e.response["Error"]["Code"] == "ValidationException":
            return None
        raise
    endpoint_status = desc["EndpointStatus"]

    # Publish the status before anything else so the tracker also shows the
    # terminal state (e.g. "InService" / "Failed"), and so the update is not
    # lost when the log fetch below bails out early.
    if progress_tracker:
        progress_tracker.update_status(endpoint_status)

    if endpoint_status != "Creating":
        if endpoint_status == "InService":
            if progress_tracker:
                progress_tracker.log(f"✅ Created endpoint with name {endpoint_name}")
        else:
            time.sleep(poll)
        # Return immediately when the endpoint is no longer creating.
        # Log fetching below is best-effort and must not block completion.
        return desc

    # Fetch log events and route them to the tracker (only while Creating).
    try:
        pages = paginator.paginate(
            logGroupName=f"/aws/sagemaker/Endpoints/{endpoint_name}",
            logStreamNamePrefix="AllTraffic/",
            PaginationConfig=paginator_config,
        )
        for page in pages:
            # Remember where this page ended so the next poll resumes there.
            if "nextToken" in page:
                paginator_config["StartingToken"] = page["nextToken"]
            for event in page["events"]:
                if progress_tracker:
                    progress_tracker.log(event["message"])
    except ClientError as e:
        # NOTE(review): presumably the log group does not exist until the
        # container starts writing — confirm; treated as "no logs yet".
        if e.response["Error"]["Code"] == "ResourceNotFoundException":
            return None
        raise

    return None