11import json
22import logging
3+ import time
34from collections .abc import Generator
45from datetime import datetime
56from pathlib import Path
6- from typing import Annotated , Optional
7+ from typing import Annotated , Any , Optional
78
89import typer
910from httpx import HTTPError , HTTPStatusError , ReadTimeout
1011from pydantic import BaseModel , ValidationError
12+ from rich_toolkit import RichToolkit
1113
1214from fastapi_cloud_cli .utils .api import APIClient
13- from fastapi_cloud_cli .utils .apps import get_app_config
15+ from fastapi_cloud_cli .utils .apps import AppConfig , get_app_config
1416from fastapi_cloud_cli .utils .auth import is_logged_in
1517from fastapi_cloud_cli .utils .cli import get_rich_toolkit
1618
logger = logging.getLogger(__name__)

# Upper bound on consecutive reconnection attempts in follow mode before
# the stream is abandoned.
MAX_RECONNECT_ATTEMPTS = 10

# Seconds to sleep between reconnection attempts.
RECONNECT_DELAY_SECONDS = 1

# Per-level indicator colors, kept in sync with the web UI's log viewer.
# Both "warning"/"warn" and "critical"/"fatal" aliases map to one color.
LOG_LEVEL_COLORS = {
    "debug": "blue",
    "info": "cyan",
    "warning": "yellow",
    "warn": "yellow",
    "error": "red",
    "critical": "magenta",
    "fatal": "magenta",
}
37+
1938
2039class LogEntry (BaseModel ):
2140 timestamp : datetime
@@ -30,7 +49,7 @@ def _stream_logs(
3049 follow : bool ,
3150) -> Generator [str , None , None ]:
3251 """Stream logs from the API."""
33- params : dict [str , str | int | bool ] = {
52+ params : dict [str , Any ] = {
3453 "tail" : tail ,
3554 "since" : since ,
3655 "follow" : follow ,
@@ -48,18 +67,6 @@ def _stream_logs(
4867 yield from response .iter_lines ()
4968
5069
51- # Colors matching the UI log level indicators
52- LOG_LEVEL_COLORS = {
53- "debug" : "blue" ,
54- "info" : "cyan" ,
55- "warning" : "yellow" ,
56- "warn" : "yellow" ,
57- "error" : "red" ,
58- "critical" : "magenta" ,
59- "fatal" : "magenta" ,
60- }
61-
62-
6370def _format_log_line (log : LogEntry ) -> str :
6471 """Format a log entry for display with a colored indicator matching the UI."""
6572 timestamp_str = log .timestamp .strftime ("%Y-%m-%dT%H:%M:%S.%f" )[:- 3 ] + "Z"
@@ -69,6 +76,110 @@ def _format_log_line(log: LogEntry) -> str:
6976 return f"[dim]┃[/dim] [dim]{ timestamp_str } [/dim] { log .message } "
7077
7178
def _process_log_stream(
    toolkit: RichToolkit,
    app_config: AppConfig,
    tail: int,
    since: str,
    follow: bool,
) -> None:
    """Consume the log stream, reconnecting transparently in follow mode.

    Each streamed line is parsed as JSON; heartbeats are dropped, server-side
    error payloads abort with exit code 1, and valid entries are rendered via
    _format_log_line. On transport failures while following, the stream is
    re-opened up to MAX_RECONNECT_ATTEMPTS times, resuming from the last
    timestamp that was displayed.
    """
    emitted = 0
    last_seen: datetime | None = None
    next_since = since
    next_tail = tail
    retries = 0

    while True:
        try:
            for raw_line in _stream_logs(
                app_id=app_config.app_id,
                tail=next_tail,
                since=next_since,
                follow=follow,
            ):
                if not raw_line:  # pragma: no cover
                    continue

                try:
                    payload = json.loads(raw_line)
                except json.JSONDecodeError:
                    logger.debug("Failed to parse log line: %s", raw_line)
                    continue

                kind = payload.get("type")

                # Heartbeats only keep the connection alive; nothing to show.
                if kind == "heartbeat":  # pragma: no cover
                    continue

                # The server reported a problem with the stream itself.
                if kind == "error":
                    toolkit.print(
                        f"Error: {payload.get('message', 'Unknown error')}",
                    )
                    raise typer.Exit(1)

                try:
                    entry = LogEntry.model_validate(payload)
                except ValidationError as exc:  # pragma: no cover
                    logger.debug("Failed to parse log entry: %s - %s", payload, exc)
                    continue

                toolkit.print(_format_log_line(entry))
                emitted += 1
                last_seen = entry.timestamp
                # A successfully received entry proves the link is healthy
                # again, so the retry budget starts over.
                retries = 0

            # The stream closed normally (only happens with --no-follow).
            if not follow and emitted == 0:
                toolkit.print("No logs found for the specified time range.")
            break

        except KeyboardInterrupt:  # pragma: no cover
            toolkit.print_line()
            break
        except (ReadTimeout, HTTPError) as exc:
            recoverable = follow and not isinstance(exc, HTTPStatusError)

            if recoverable:
                retries += 1
                if retries >= MAX_RECONNECT_ATTEMPTS:
                    toolkit.print(
                        "Lost connection to log stream. Please try again later.",
                    )
                    raise typer.Exit(1) from None

                logger.debug(
                    "Connection lost, reconnecting (attempt %d/%d)...",
                    retries,
                    MAX_RECONNECT_ATTEMPTS,
                )

                # Resume just past the last entry we displayed. The API uses a
                # strict > comparison on `since`, so entries sharing that exact
                # timestamp are filtered out and nothing is duplicated.
                if last_seen is not None:
                    next_since = last_seen.isoformat()
                    next_tail = 0  # historical logs were already shown

                time.sleep(RECONNECT_DELAY_SECONDS)
                continue

            # Non-recoverable: map the failure to a user-facing message.
            if isinstance(exc, HTTPStatusError) and exc.response.status_code in (401, 403):
                toolkit.print(
                    "The specified token is not valid. Use [blue]`fastapi login`[/] to generate a new token.",
                )
            elif isinstance(exc, ReadTimeout):
                toolkit.print(
                    "The request timed out. Please try again later.",
                )
            else:
                toolkit.print(
                    "Failed to fetch logs. Please try again later.",
                )
            raise typer.Exit(1) from None
182+
72183def logs (
73184 path : Annotated [
74185 Optional [Path ],
@@ -129,65 +240,10 @@ def logs(
129240 )
130241 toolkit .print_line ()
131242
132- try :
133- log_count = 0
134- for line in _stream_logs (
135- app_id = app_config .app_id ,
136- tail = tail ,
137- since = since ,
138- follow = follow ,
139- ):
140- if not line : # pragma: no cover
141- continue
142-
143- try :
144- data = json .loads (line )
145- except json .JSONDecodeError :
146- logger .debug ("Failed to parse log line: %s" , line )
147- continue
148-
149- # Skip heartbeat messages
150- if data .get ("type" ) == "heartbeat" : # pragma: no cover
151- continue
152-
153- # Handle error messages from the server
154- if data .get ("type" ) == "error" :
155- toolkit .print (
156- f"Error: { data .get ('message' , 'Unknown error' )} " ,
157- )
158- raise typer .Exit (1 )
159-
160- # Parse and display log entry
161- try :
162- log_entry = LogEntry .model_validate (data )
163- toolkit .print (_format_log_line (log_entry ))
164- log_count += 1
165- except ValidationError as e : # pragma: no cover
166- logger .debug ("Failed to parse log entry: %s - %s" , data , e )
167- continue
168-
169- if not follow and log_count == 0 :
170- toolkit .print ("No logs found for the specified time range." )
171-
172- except KeyboardInterrupt : # pragma: no cover
173- toolkit .print_line ()
174- except ReadTimeout :
175- toolkit .print (
176- "The request timed out. Please try again later." ,
177- )
178- raise typer .Exit (1 ) from None
179- except HTTPStatusError as e :
180- if e .response .status_code in (401 , 403 ):
181- toolkit .print (
182- "The specified token is not valid. Use [blue]`fastapi login`[/] to generate a new token." ,
183- )
184- else :
185- toolkit .print (
186- "Failed to fetch logs. Please try again later." ,
187- )
188- raise typer .Exit (1 ) from None
189- except HTTPError :
190- toolkit .print (
191- "Failed to fetch logs. Please try again later." ,
192- )
193- raise typer .Exit (1 ) from None
243+ _process_log_stream (
244+ toolkit = toolkit ,
245+ app_config = app_config ,
246+ tail = tail ,
247+ since = since ,
248+ follow = follow ,
249+ )
0 commit comments