Skip to content

Commit 4864863

Browse files
[UI] Query logs using descending #2892
- [x] Allowed `descending` in `PollLogsRequest` - [x] Supported `descending` in `FileLogStorage` (implemented an algorithm that reads lines from the end of the file) - [x] Updated `CloudWatchLogStorage` to support `MAX_RETRIES` (to skip empty pages)
1 parent de1073f commit 4864863

5 files changed

Lines changed: 850 additions & 125 deletions

File tree

frontend/src/pages/Runs/Details/Logs/index.tsx

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ import { IProps } from './types';
1212

1313
import styles from './styles.module.scss';
1414

15-
const LIMIT_LOG_ROWS = 1000;
15+
const LIMIT_LOG_ROWS = 100;
1616
const LOADING_SCROLL_GAP = 300;
1717

1818
export const Logs: React.FC<IProps> = ({ className, projectName, runName, jobSubmissionId }) => {
Lines changed: 1 addition & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
from datetime import datetime
22
from typing import Optional
33

4-
from pydantic import UUID4, Field, validator
4+
from pydantic import UUID4, Field
55

66
from dstack._internal.core.models.common import CoreModel
77

@@ -15,11 +15,3 @@ class PollLogsRequest(CoreModel):
1515
next_token: Optional[str] = None
1616
limit: int = Field(100, ge=0, le=1000)
1717
diagnose: bool = False
18-
19-
@validator("descending")
20-
@classmethod
21-
def validate_descending(cls, v):
22-
# Descending is not supported until we migrate from base64-encoded logs to plain text logs.
23-
if v is True:
24-
raise ValueError("descending: true is not supported")
25-
return v

src/dstack/_internal/server/services/logs/aws.py

Lines changed: 43 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,8 @@ class CloudWatchLogStorage(LogStorage):
5555
PAST_EVENT_MAX_DELTA = int((timedelta(days=14)).total_seconds()) * 1000 - CLOCK_DRIFT
5656
# "None of the log events in the batch can be more than 2 hours in the future."
5757
FUTURE_EVENT_MAX_DELTA = int((timedelta(hours=2)).total_seconds()) * 1000 - CLOCK_DRIFT
58+
# Maximum number of retries when polling for log events to skip empty pages.
59+
MAX_RETRIES = 10
5860

5961
def __init__(self, *, group: str, region: Optional[str] = None) -> None:
6062
with self._wrap_boto_errors():
@@ -80,7 +82,7 @@ def poll_logs(self, project: ProjectModel, request: PollLogsRequest) -> JobSubmi
8082
next_token: Optional[str] = None
8183
with self._wrap_boto_errors():
8284
try:
83-
cw_events, next_token = self._get_log_events(stream, request)
85+
cw_events, next_token = self._get_log_events_with_retry(stream, request)
8486
except botocore.exceptions.ClientError as e:
8587
if not self._is_resource_not_found_exception(e):
8688
raise
@@ -101,7 +103,45 @@ def poll_logs(self, project: ProjectModel, request: PollLogsRequest) -> JobSubmi
101103
)
102104
for cw_event in cw_events
103105
]
104-
return JobSubmissionLogs(logs=logs, next_token=next_token if len(logs) > 0 else None)
106+
return JobSubmissionLogs(logs=logs, next_token=next_token)
107+
108+
def _get_log_events_with_retry(
    self, stream: str, request: PollLogsRequest
) -> Tuple[List[_CloudWatchLogEvent], Optional[str]]:
    """Fetch log events, retrying past empty pages.

    CloudWatch can return an empty page together with a pagination token.
    Keep following tokens (up to ``MAX_RETRIES`` pages) until a non-empty
    page is found, the tokens run out, or a token repeats (which CloudWatch
    uses to signal the end of the stream).

    Returns a tuple of (events, next_token). On exhaustion the token is
    propagated only for descending reads, so the client can resume;
    ascending reads get ``None``.
    """
    poll_request = request
    seen_token = request.next_token

    for _attempt in range(self.MAX_RETRIES):
        events, token = self._get_log_events(stream, poll_request)
        if events:
            return events, token
        # No more pages, or the token did not advance: end of stream.
        if not token or token == seen_token:
            return [], None
        seen_token = token
        # Re-issue the same query, advanced to the next page.
        poll_request = PollLogsRequest(
            run_name=request.run_name,
            job_submission_id=request.job_submission_id,
            start_time=request.start_time,
            end_time=request.end_time,
            descending=request.descending,
            next_token=token,
            limit=request.limit,
            diagnose=request.diagnose,
        )

    if not request.descending:
        logger.debug(
            "Stream %s: exhausted %d retries without finding logs, returning empty response",
            stream,
            self.MAX_RETRIES,
        )
    return [], token if request.descending else None
105145

106146
def _get_log_events(
107147
self, stream: str, request: PollLogsRequest
@@ -115,7 +155,7 @@ def _get_log_events(
115155
}
116156

117157
if request.start_time:
118-
parameters["startTime"] = datetime_to_unix_time_ms(request.start_time) + 1
158+
parameters["startTime"] = datetime_to_unix_time_ms(request.start_time)
119159

120160
if request.end_time:
121161
parameters["endTime"] = datetime_to_unix_time_ms(request.end_time)

src/dstack/_internal/server/services/logs/filelog.py

Lines changed: 103 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
1+
import os
12
from pathlib import Path
2-
from typing import List, Union
3+
from typing import Generator, List, Optional, Tuple, Union
34
from uuid import UUID
45

56
from dstack._internal.core.errors import ServerClientError
@@ -37,6 +38,14 @@ def poll_logs(self, project: ProjectModel, request: PollLogsRequest) -> JobSubmi
3738
producer=log_producer,
3839
)
3940

41+
if request.descending:
42+
return self._poll_logs_descending(log_file_path, request)
43+
else:
44+
return self._poll_logs_ascending(log_file_path, request)
45+
46+
def _poll_logs_ascending(
47+
self, log_file_path: Path, request: PollLogsRequest
48+
) -> JobSubmissionLogs:
4049
start_line = 0
4150
if request.next_token:
4251
try:
@@ -94,6 +103,99 @@ def poll_logs(self, project: ProjectModel, request: PollLogsRequest) -> JobSubmi
94103

95104
return JobSubmissionLogs(logs=logs, next_token=next_token)
96105

106+
def _poll_logs_descending(
    self, log_file_path: Path, request: PollLogsRequest
) -> JobSubmissionLogs:
    """Return up to ``request.limit`` log events read backwards from the file.

    ``next_token`` is a byte offset into the log file: the start of the
    oldest line returned on the previous page. Lines at or after that
    offset have already been served and are excluded.

    Raises ServerClientError if ``next_token`` is not a non-negative integer.
    """
    start_offset = None
    if request.next_token:
        try:
            start_offset = int(request.next_token)
            if start_offset < 0:
                raise ValueError("Offset must be non-negative")
        except (ValueError, TypeError):
            raise ServerClientError(
                f"Invalid next_token for descending read: {request.next_token}. "
                f"Must be a non-negative integer offset."
            )

    # limit may be 0 (the schema allows ge=0). Without this guard the
    # next_token computation below would index candidate_logs[-1] and emit
    # a token pointing past a log that was never returned, permanently
    # skipping it. Return no logs and keep the caller's position instead.
    if request.limit <= 0:
        return JobSubmissionLogs(logs=[], next_token=request.next_token)

    candidate_logs = []

    try:
        # Read one more line than the limit so we can tell whether another
        # page exists without a second pass over the file.
        for line_bytes, line_start_offset in self._read_lines_reversed(
            log_file_path, start_offset
        ):
            try:
                line_str = line_bytes.decode("utf-8")
                log_event = LogEvent.__response__.parse_raw(line_str)
            except Exception:
                continue  # Skip malformed lines

            # Newer than the requested window: keep scanning backwards.
            if request.end_time is not None and log_event.timestamp > request.end_time:
                continue
            # Older than the window: everything before this is older too.
            if request.start_time and log_event.timestamp <= request.start_time:
                break

            candidate_logs.append((log_event, line_start_offset))

            if len(candidate_logs) > request.limit:
                break
    except FileNotFoundError:
        return JobSubmissionLogs(logs=[], next_token=None)

    logs = [log for log, _offset in candidate_logs[: request.limit]]
    next_token = None
    if len(candidate_logs) > request.limit:
        # We fetched one more than the limit, so there are more pages.
        # The next token points to the start of the last log we return.
        _last_log_event, last_log_offset = candidate_logs[request.limit - 1]
        next_token = str(last_log_offset)

    return JobSubmissionLogs(logs=logs, next_token=next_token)
154+
155+
@staticmethod
156+
def _read_lines_reversed(
157+
filepath: Path, start_offset: Optional[int] = None, chunk_size: int = 8192
158+
) -> Generator[Tuple[bytes, int], None, None]:
159+
"""
160+
A generator that yields lines from a file in reverse order, along with the byte
161+
offset of the start of each line. This is memory-efficient for large files.
162+
"""
163+
with open(filepath, "rb") as f:
164+
if start_offset is None:
165+
f.seek(0, os.SEEK_END)
166+
cursor = f.tell()
167+
else:
168+
f.seek(0, os.SEEK_END)
169+
file_size = f.tell()
170+
cursor = file_size
171+
172+
buffer = b""
173+
174+
while cursor > 0:
175+
seek_pos = max(0, cursor - chunk_size)
176+
amount_to_read = cursor - seek_pos
177+
f.seek(seek_pos)
178+
chunk = f.read(amount_to_read)
179+
cursor = seek_pos
180+
181+
buffer = chunk + buffer
182+
183+
while b"\n" in buffer:
184+
newline_pos = buffer.rfind(b"\n")
185+
line = buffer[newline_pos + 1 :]
186+
line_start_offset = cursor + newline_pos + 1
187+
188+
# Skip lines that start at or after the start_offset
189+
if start_offset is None or line_start_offset < start_offset:
190+
yield line, line_start_offset
191+
192+
buffer = buffer[:newline_pos]
193+
194+
# The remaining buffer is the first line of the file.
195+
# Only yield it if we're not using start_offset or if it starts before start_offset
196+
if buffer and (start_offset is None or 0 < start_offset):
197+
yield buffer, 0
198+
97199
def write_logs(
98200
self,
99201
project: ProjectModel,

0 commit comments

Comments
 (0)