Skip to content

Commit 89da219

Browse files
authored
Fix CloudWatchLogStorage with sparse logs (#2501)
When there are time gaps in logs, CloudWatch API may return less events than requested or even no events at all. We should poll logs until either enough events are accumulated or we reach the end, which is indicated by the same token: https://docs.aws.amazon.com/AmazonCloudWatchLogs/latest/APIReference/API_GetLogEvents.html > Partially full or empty pages don't necessarily mean that pagination > is finished. As long as the nextBackwardToken or nextForwardToken > returned is NOT equal to the nextToken that you passed into the API > call, there might be more log events available. Happens with both startFromHead=false and startFromHead=true, but way more often when startFromHead=false. boto/boto3#3718 (comment) > The reason behind this behavior is that get_log_events returns > a paginated response, with pages being split by time. The responses > you're getting with no events are periods of time where no events > happened, and you need to keep paginating using the nextBackwardToken Fixes: #2500
1 parent 9cfd011 commit 89da219

File tree

2 files changed

+62
-33
lines changed

2 files changed

+62
-33
lines changed

src/dstack/_internal/server/services/logs/aws.py

Lines changed: 23 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -86,28 +86,22 @@ def poll_logs(self, project: ProjectModel, request: PollLogsRequest) -> JobSubmi
8686
raise
8787
logger.debug("Stream %s not found, returning dummy response", stream)
8888
cw_events = []
89-
cw_events_iter: Iterator[_CloudWatchLogEvent]
90-
if request.descending:
91-
# Regardless of the startFromHead value log events are arranged in chronological order,
92-
# from earliest to latest.
93-
cw_events_iter = reversed(cw_events)
94-
else:
95-
cw_events_iter = iter(cw_events)
9689
logs = [
9790
LogEvent(
9891
timestamp=unix_time_ms_to_datetime(cw_event["timestamp"]),
9992
log_source=LogEventSource.STDOUT,
10093
message=cw_event["message"],
10194
)
102-
for cw_event in cw_events_iter
95+
for cw_event in cw_events
10396
]
10497
return JobSubmissionLogs(logs=logs)
10598

10699
def _get_log_events(self, stream: str, request: PollLogsRequest) -> List[_CloudWatchLogEvent]:
100+
limit = request.limit
107101
parameters = {
108102
"logGroupName": self._group,
109103
"logStreamName": stream,
110-
"limit": request.limit,
104+
"limit": limit,
111105
}
112106
start_from_head = not request.descending
113107
parameters["startFromHead"] = start_from_head
@@ -119,25 +113,32 @@ def _get_log_events(self, stream: str, request: PollLogsRequest) -> List[_CloudW
119113
# No need to subtract one millisecond in this case, though, seems that endTime is
120114
# exclusive, that is, time interval boundaries are [startTime, endTime)
121115
parameters["endTime"] = datetime_to_unix_time_ms(request.end_time)
122-
response = self._client.get_log_events(**parameters)
123-
events: List[_CloudWatchLogEvent] = response["events"]
124-
if start_from_head or events:
125-
return events
126-
# Workaround for https://github.com/boto/boto3/issues/3718
127-
# Required only when startFromHead = false (the default value).
128-
next_token: str = response["nextBackwardToken"]
116+
# "Partially full or empty pages don't necessarily mean that pagination is finished.
117+
# As long as the nextBackwardToken or nextForwardToken returned is NOT equal to the
118+
# nextToken that you passed into the API call, there might be more log events available."
119+
events: List[_CloudWatchLogEvent] = []
120+
next_token: Optional[str] = None
121+
next_token_key = "nextForwardToken" if start_from_head else "nextBackwardToken"
129122
# Limit max tries to avoid a possible infinite loop if the API is misbehaving
130123
tries_left = 10
131124
while tries_left:
132-
parameters["nextToken"] = next_token
125+
if next_token is not None:
126+
parameters["nextToken"] = next_token
133127
response = self._client.get_log_events(**parameters)
134-
events = response["events"]
135-
if events or response["nextBackwardToken"] == next_token:
128+
if start_from_head:
129+
events.extend(response["events"])
130+
else:
131+
# Regardless of the startFromHead value log events are arranged in
132+
# chronological order, from earliest to latest.
133+
events.extend(reversed(response["events"]))
134+
if len(events) >= limit:
135+
return events[:limit]
136+
if response[next_token_key] == next_token:
136137
return events
137-
next_token = response["nextBackwardToken"]
138+
next_token = response[next_token_key]
138139
tries_left -= 1
139-
logger.warning("too many empty responses from stream %s, returning dummy response", stream)
140-
return []
140+
logger.warning("too many requests to stream %s, returning partial response", stream)
141+
return events
141142

142143
def write_logs(
143144
self,

src/tests/_internal/server/services/test_logs.py

Lines changed: 39 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@ def mock_client(self, monkeypatch: pytest.MonkeyPatch) -> Mock:
7272
mock.get_log_events.return_value = {
7373
"events": [],
7474
"nextBackwardToken": "bwd",
75-
"nextFormartToken": "fwd",
75+
"nextForwardToken": "fwd",
7676
}
7777
return mock
7878

@@ -183,6 +183,7 @@ async def test_poll_logs_non_empty_response(
183183
{"timestamp": 1696586513234, "message": "SGVsbG8="},
184184
{"timestamp": 1696586513235, "message": "V29ybGQ="},
185185
]
186+
poll_logs_request.limit = 2
186187
job_submission_logs = log_storage.poll_logs(project, poll_logs_request)
187188

188189
assert job_submission_logs.logs == [
@@ -199,20 +200,21 @@ async def test_poll_logs_non_empty_response(
199200
]
200201

201202
@pytest.mark.asyncio
203+
@pytest.mark.parametrize("descending", [False, True])
202204
async def test_poll_logs_empty_response(
203205
self,
204206
project: ProjectModel,
205207
log_storage: CloudWatchLogStorage,
206208
mock_client: Mock,
207209
poll_logs_request: PollLogsRequest,
210+
descending: bool,
208211
):
209-
# Check that we don't use the workaround when descending=False -> startFromHead=True
210-
# https://github.com/dstackai/dstack/issues/1647
211212
mock_client.get_log_events.return_value["events"] = []
213+
poll_logs_request.descending = descending
212214
job_submission_logs = log_storage.poll_logs(project, poll_logs_request)
213215

214216
assert job_submission_logs.logs == []
215-
mock_client.get_log_events.assert_called_once()
217+
assert mock_client.get_log_events.call_count == 2
216218

217219
@pytest.mark.asyncio
218220
async def test_poll_logs_descending_non_empty_response_on_first_call(
@@ -227,6 +229,7 @@ async def test_poll_logs_descending_non_empty_response_on_first_call(
227229
{"timestamp": 1696586513235, "message": "V29ybGQ="},
228230
]
229231
poll_logs_request.descending = True
232+
poll_logs_request.limit = 2
230233
job_submission_logs = log_storage.poll_logs(project, poll_logs_request)
231234

232235
assert job_submission_logs.logs == [
@@ -243,16 +246,18 @@ async def test_poll_logs_descending_non_empty_response_on_first_call(
243246
]
244247

245248
@pytest.mark.asyncio
246-
async def test_poll_logs_descending_two_first_calls_return_empty_response(
249+
async def test_poll_logs_descending_some_responses_are_empty(
247250
self,
248251
project: ProjectModel,
249252
log_storage: CloudWatchLogStorage,
250253
mock_client: Mock,
251254
poll_logs_request: PollLogsRequest,
252255
):
253256
# The first two calls return empty event lists, though the token is not the same, meaning
254-
# there are more events.
255-
# https://github.com/dstackai/dstack/issues/1647
257+
# there are more events, see: https://github.com/dstackai/dstack/issues/1647
258+
# As the third call returns less events than requested (2 < 3), we continue to poll until
259+
# accumulate enough events (2 + 2) and return exactly the requested number of events (3),
260+
# see: https://github.com/dstackai/dstack/issues/2500
256261
mock_client.get_log_events.side_effect = [
257262
{
258263
"events": [],
@@ -272,8 +277,22 @@ async def test_poll_logs_descending_two_first_calls_return_empty_response(
272277
"nextBackwardToken": "bwd3",
273278
"nextForwardToken": "fwd",
274279
},
280+
{
281+
"events": [],
282+
"nextBackwardToken": "bwd4",
283+
"nextForwardToken": "fwd",
284+
},
285+
{
286+
"events": [
287+
{"timestamp": 1696586513232, "message": "aW5pdCAx"},
288+
{"timestamp": 1696586513233, "message": "aW5pdCAy"},
289+
],
290+
"nextBackwardToken": "bwd5",
291+
"nextForwardToken": "fwd",
292+
},
275293
]
276294
poll_logs_request.descending = True
295+
poll_logs_request.limit = 3
277296
job_submission_logs = log_storage.poll_logs(project, poll_logs_request)
278297

279298
assert job_submission_logs.logs == [
@@ -287,8 +306,13 @@ async def test_poll_logs_descending_two_first_calls_return_empty_response(
287306
log_source=LogEventSource.STDOUT,
288307
message="SGVsbG8=",
289308
),
309+
LogEvent(
310+
timestamp=datetime(2023, 10, 6, 10, 1, 53, 233000, tzinfo=timezone.utc),
311+
log_source=LogEventSource.STDOUT,
312+
message="aW5pdCAy",
313+
),
290314
]
291-
assert mock_client.get_log_events.call_count == 3
315+
assert mock_client.get_log_events.call_count == 5
292316

293317
@pytest.mark.asyncio
294318
async def test_poll_logs_descending_empty_response_with_same_token(
@@ -352,7 +376,7 @@ def _response_producer(*args, **kwargs):
352376
job_submission_logs = log_storage.poll_logs(project, poll_logs_request)
353377

354378
assert job_submission_logs.logs == []
355-
assert mock_client.get_log_events.call_count == 11 # initial call + 10 tries
379+
assert mock_client.get_log_events.call_count == 10
356380

357381
@pytest.mark.asyncio
358382
async def test_poll_logs_request_params_asc_no_diag_no_dates(
@@ -366,11 +390,13 @@ async def test_poll_logs_request_params_asc_no_diag_no_dates(
366390
poll_logs_request.limit = 5
367391
poll_logs_request.diagnose = False
368392
log_storage.poll_logs(project, poll_logs_request)
369-
mock_client.get_log_events.assert_called_once_with(
393+
assert mock_client.get_log_events.call_count == 2
394+
mock_client.get_log_events.assert_called_with(
370395
logGroupName="test-group",
371396
logStreamName="test-proj/test-run/1b0e1b45-2f8c-4ab6-8010-a0d1a3e44e0e/job",
372397
limit=5,
373398
startFromHead=True,
399+
nextToken="fwd",
374400
)
375401

376402
@pytest.mark.asyncio
@@ -394,13 +420,15 @@ async def test_poll_logs_request_params_desc_diag_with_dates(
394420
poll_logs_request.limit = 10
395421
poll_logs_request.diagnose = True
396422
log_storage.poll_logs(project, poll_logs_request)
397-
mock_client.get_log_events.assert_called_once_with(
423+
assert mock_client.get_log_events.call_count == 2
424+
mock_client.get_log_events.assert_called_with(
398425
logGroupName="test-group",
399426
logStreamName="test-proj/test-run/1b0e1b45-2f8c-4ab6-8010-a0d1a3e44e0e/runner",
400427
limit=10,
401428
startFromHead=False,
402429
startTime=1696586513235,
403430
endTime=1696672913234,
431+
nextToken="bwd",
404432
)
405433

406434
@pytest.mark.asyncio

0 commit comments

Comments
 (0)