Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
84 changes: 55 additions & 29 deletions containers/api-proxy/proxy-request.js
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,30 @@ function isValidRequestId(id) {
return typeof id === 'string' && id.length <= 128 && /^[\w\-\.]+$/.test(id);
}

function handleRequestError(err, {
res,
requestId,
provider,
req,
targetHost,
startTime,
statusCode,
clientMessage,
extraMetrics,
}) {
const duration = Date.now() - startTime;
metrics.gaugeDec('active_requests', { provider });
metrics.increment('requests_errors_total', { provider });
if (extraMetrics) extraMetrics(duration);
logRequest('error', 'request_error', {
request_id: requestId, provider, method: req.method,
path: sanitizeForLog(req.url), duration_ms: duration,
error: sanitizeForLog(err.message), upstream_host: targetHost,
Comment on lines +134 to +141
});
if (!res.headersSent) res.writeHead(statusCode, { 'Content-Type': 'application/json' });
res.end(JSON.stringify({ error: clientMessage, message: err.message }));
}

const checkRateLimit = createRateLimitChecker({
limiter,
metrics,
Expand Down Expand Up @@ -201,16 +225,16 @@ function proxyRequest(req, res, targetHost, injectHeaders, provider, basePath =
req.on('error', (err) => {
if (errored) return;
errored = true;
const duration = Date.now() - startTime;
metrics.gaugeDec('active_requests', { provider });
metrics.increment('requests_errors_total', { provider });
logRequest('error', 'request_error', {
request_id: requestId, provider, method: req.method,
path: sanitizeForLog(req.url), duration_ms: duration,
error: sanitizeForLog(err.message), upstream_host: targetHost,
handleRequestError(err, {
res,
requestId,
provider,
req,
targetHost,
startTime,
statusCode: 400,
clientMessage: 'Client error',
});
if (!res.headersSent) res.writeHead(400, { 'Content-Type': 'application/json' });
res.end(JSON.stringify({ error: 'Client error', message: err.message }));
});

req.on('data', chunk => {
Expand Down Expand Up @@ -356,16 +380,16 @@ function proxyRequest(req, res, targetHost, injectHeaders, provider, basePath =
proxyRes.on('data', (chunk) => { responseBytes += chunk.length; });

proxyRes.on('error', (err) => {
const duration = Date.now() - startTime;
metrics.gaugeDec('active_requests', { provider });
metrics.increment('requests_errors_total', { provider });
logRequest('error', 'request_error', {
request_id: requestId, provider, method: req.method,
path: sanitizeForLog(req.url), duration_ms: duration,
error: sanitizeForLog(err.message), upstream_host: targetHost,
handleRequestError(err, {
res,
requestId,
provider,
req,
targetHost,
startTime,
statusCode: 502,
clientMessage: 'Response stream error',
});
Comment on lines 387 to 400
if (!res.headersSent) res.writeHead(502, { 'Content-Type': 'application/json' });
res.end(JSON.stringify({ error: 'Response stream error', message: err.message }));
});

const billingInfo = extractBillingHeaders(proxyRes.headers);
Expand Down Expand Up @@ -419,18 +443,20 @@ function proxyRequest(req, res, targetHost, injectHeaders, provider, basePath =
});

proxyReq.on('error', (err) => {
const duration = Date.now() - startTime;
metrics.gaugeDec('active_requests', { provider });
metrics.increment('requests_errors_total', { provider });
metrics.increment('requests_total', { provider, method: req.method, status_class: '5xx' });
metrics.observe('request_duration_ms', duration, { provider });
logRequest('error', 'request_error', {
request_id: requestId, provider, method: req.method,
path: sanitizeForLog(req.url), duration_ms: duration,
error: sanitizeForLog(err.message), upstream_host: targetHost,
handleRequestError(err, {
res,
requestId,
provider,
req,
targetHost,
startTime,
statusCode: 502,
clientMessage: 'Proxy error',
extraMetrics: (duration) => {
metrics.increment('requests_total', { provider, method: req.method, status_class: '5xx' });
metrics.observe('request_duration_ms', duration, { provider });
},
});
if (!res.headersSent) res.writeHead(502, { 'Content-Type': 'application/json' });
res.end(JSON.stringify({ error: 'Proxy error', message: err.message }));
});

if (body.length > 0) proxyReq.write(body);
Expand Down
89 changes: 89 additions & 0 deletions containers/api-proxy/server.proxy.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -472,6 +472,95 @@ describe('proxyRequest X-Initiator injection', () => {
});
});

describe('proxyRequest error handling', () => {
function makeReq(headers = {}) {
const req = new EventEmitter();
req.url = '/v1/chat/completions';
req.method = 'POST';
req.headers = { 'content-type': 'application/json', ...headers };
return req;
}

function makeRes() {
return {
headersSent: false,
setHeader: jest.fn(),
writeHead: jest.fn(),
end: jest.fn(),
Comment on lines +485 to +492
};
}

afterEach(() => {
jest.restoreAllMocks();
});

it('returns 400 when the client request stream errors', () => {
const req = makeReq();
const res = makeRes();

proxyRequest(req, res, 'api.openai.com', { Authorization: 'Bearer token' }, 'openai');
req.emit('error', new Error('client stream failed'));

expect(res.writeHead).toHaveBeenCalledWith(400, { 'Content-Type': 'application/json' });
expect(JSON.parse(res.end.mock.calls[0][0])).toEqual({
error: 'Client error',
message: 'client stream failed',
});
});

it('returns 502 when the upstream response stream errors', () => {
let responseHandler;
const upstreamRequest = new EventEmitter();
upstreamRequest.end = jest.fn();
upstreamRequest.write = jest.fn();
upstreamRequest.destroy = jest.fn();

jest.spyOn(https, 'request').mockImplementation((_options, cb) => {
responseHandler = cb;
return upstreamRequest;
});

const req = makeReq();
const res = makeRes();
proxyRequest(req, res, 'api.openai.com', { Authorization: 'Bearer token' }, 'openai');
req.emit('end');

const proxyRes = new EventEmitter();
proxyRes.statusCode = 200;
proxyRes.headers = {};
proxyRes.pipe = jest.fn();
responseHandler(proxyRes);
proxyRes.emit('error', new Error('upstream stream failed'));

expect(res.writeHead).toHaveBeenCalledWith(502, { 'Content-Type': 'application/json' });
expect(JSON.parse(res.end.mock.calls[0][0])).toEqual({
error: 'Response stream error',
message: 'upstream stream failed',
});
});

it('returns 502 when the upstream proxy request errors', () => {
const upstreamRequest = new EventEmitter();
upstreamRequest.end = jest.fn();
upstreamRequest.write = jest.fn();
upstreamRequest.destroy = jest.fn();

jest.spyOn(https, 'request').mockImplementation(() => upstreamRequest);

const req = makeReq();
const res = makeRes();
proxyRequest(req, res, 'api.openai.com', { Authorization: 'Bearer token' }, 'openai');
req.emit('end');
upstreamRequest.emit('error', new Error('upstream connect failed'));

expect(res.writeHead).toHaveBeenCalledWith(502, { 'Content-Type': 'application/json' });
expect(JSON.parse(res.end.mock.calls[0][0])).toEqual({
error: 'Proxy error',
message: 'upstream connect failed',
});
});
});

describe('proxyRequest effective token guard', () => {
function makeReq(headers = {}) {
const req = new EventEmitter();
Expand Down
Loading