Skip to content

Commit a3268ee

Browse files
authored
fix: Turn any log streaming related errors to warnings (#876)
### Description Errors in log streaming should not propagate further. Just log them as warnings. Such errors should be fixed on API level: apify/apify-core#26653 ### Issues Closes: #873
1 parent 6117a95 commit a3268ee

3 files changed

Lines changed: 36 additions & 6 deletions

File tree

src/resource_clients/log.ts

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ import type { Readable } from 'node:stream';
44
import c from 'ansi-colors';
55

66
import type { Log } from '@apify/log';
7-
import { Logger, LogLevel } from '@apify/log';
7+
import log, { Logger, LogLevel } from '@apify/log';
88

99
import type { ApifyApiError } from '../apify_api_error';
1010
import type { ApiClientSubResourceOptions } from '../base/api_client';
@@ -197,11 +197,15 @@ export class StreamedLog {
197197
if (!logStream) {
198198
return;
199199
}
200-
const lastChunkRemainder = await this.logStreamChunks(logStream);
201-
// Process whatever is left when exiting. Maybe it is incomplete, maybe it is last log without EOL.
202-
const lastMessage = Buffer.from(lastChunkRemainder).toString().trim();
203-
if (lastMessage.length) {
204-
this.destinationLog.info(lastMessage);
200+
try {
201+
const lastChunkRemainder = await this.logStreamChunks(logStream);
202+
// Process whatever is left when exiting. Maybe it is incomplete, maybe it is last log without EOL.
203+
const lastMessage = Buffer.from(lastChunkRemainder).toString().trim();
204+
if (lastMessage.length) {
205+
this.destinationLog.info(lastMessage);
206+
}
207+
} catch (err) {
208+
log.warning(`Log redirection stopped due to error`, err as Error);
205209
}
206210
}
207211

test/mock_server/server.ts

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,18 @@ export function createDefaultApp(v2Router = express.Router()) {
106106
res.json({ data: { id: 'redirect-run-id', actId: 'redirect-actor-id', status: 'SUCCEEDED' } });
107107
});
108108

109+
v2Router.use('/actor-runs/econnreset-run-id/log', async (req: express.Request, res: express.Response) => {
110+
res.write(MOCKED_ACTOR_LOGS[0]);
111+
(res as any).flush();
112+
await new Promise<void>((resolve) => {
113+
setTimeout(resolve, 10);
114+
});
115+
req.socket.destroy();
116+
});
117+
v2Router.use('/actor-runs/econnreset-run-id', async (_, res) => {
118+
res.json({ data: { id: 'econnreset-run-id', actId: 'redirect-actor-id', status: 'SUCCEEDED' } });
119+
});
120+
109121
v2Router.use('/actor-runs', runs);
110122
v2Router.use('/actor-tasks', tasks);
111123
v2Router.use('/users', users);

test/runs.test.ts

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -447,4 +447,18 @@ describe('Redirect run logs', () => {
447447
logSpy.mockRestore();
448448
});
449449
});
450+
451+
describe('run.getStreamedLog ECONNRESET', () => {
452+
test('logs warning instead of throwing on error', async () => {
453+
const warnSpy = vi.spyOn(console, 'warn').mockImplementation(() => {});
454+
const streamedLog = await client.run('econnreset-run-id').getStreamedLog({ fromStart: true });
455+
streamedLog?.start();
456+
await setTimeoutNode(500);
457+
await expect(streamedLog?.stop()).resolves.not.toThrow();
458+
expect(
459+
warnSpy.mock.calls.some(([msg]: [string]) => msg?.includes('Log redirection stopped due to error')),
460+
).toBe(true);
461+
warnSpy.mockRestore();
462+
});
463+
});
450464
});

0 commit comments

Comments
 (0)