Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 29 additions & 8 deletions src/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,8 @@ import { isEmptyObj } from './internal/utils/values';

const WORKLOAD_IDENTITY_API_KEY_PLACEHOLDER = 'workload-identity-auth';

type FetchWithTimeoutResult = { response: Response; cleanup: () => void };

export type ApiKeySetter = () => Promise<string>;

export interface ClientOptions {
Expand Down Expand Up @@ -741,10 +743,11 @@ export class OpenAI {

const security = options.__security ?? { bearerAuth: true };
const controller = new AbortController();
const response = await this.fetchWithAuth(url, req, timeout, controller, security).catch(castToError);
const fetchResult = await this.fetchWithAuth(url, req, timeout, controller, security).catch(castToError);
const headersTime = Date.now();

if (response instanceof globalThis.Error) {
if (fetchResult instanceof globalThis.Error) {
const response = fetchResult;
const retryMessage = `retrying, ${retriesRemaining} attempts remaining`;
if (options.signal?.aborted) {
throw new Errors.APIUserAbortError();
Expand Down Expand Up @@ -795,6 +798,8 @@ export class OpenAI {
});
}

const { response, cleanup } = fetchResult;

const specialHeaders = [...response.headers.entries()]
.filter(([name]) => name === 'x-request-id')
.map(([name, value]) => ', ' + name + ': ' + JSON.stringify(value))
Expand All @@ -812,6 +817,7 @@ export class OpenAI {
!options.__metadata?.['workloadIdentityTokenRefreshed']
) {
await Shims.CancelReadableStream(response.body);
cleanup();
this._workloadIdentityAuth.invalidateToken();

return this.makeRequest(
Expand All @@ -833,6 +839,7 @@ export class OpenAI {

// We don't need the body of this response.
await Shims.CancelReadableStream(response.body);
cleanup();
loggerFor(this).info(`${responseInfo} - ${retryMessage}`);
loggerFor(this).debug(
`[${requestLogID}] response error (${retryMessage})`,
Expand All @@ -856,7 +863,10 @@ export class OpenAI {

loggerFor(this).info(`${responseInfo} - ${retryMessage}`);

const errText = await response.text().catch((err: any) => castToError(err).message);
const errText = await response
.text()
.catch((err: any) => castToError(err).message)
.finally(cleanup);
const errJSON = safeJSON(errText) as any;
const errMessage = errJSON ? undefined : errText;

Expand Down Expand Up @@ -888,7 +898,7 @@ export class OpenAI {
}),
);

return { response, options, controller, requestLogID, retryOfRequestLogID, startTime };
return { response, options, controller, requestLogID, retryOfRequestLogID, startTime, cleanup };
}

getAPIList<Item, PageClass extends Pagination.AbstractPage<Item> = Pagination.AbstractPage<Item>>(
Expand Down Expand Up @@ -924,7 +934,7 @@ export class OpenAI {
bearerAuth: true,
adminAPIKeyAuth: true,
},
): Promise<Response> {
): Promise<FetchWithTimeoutResult> {
if (this._workloadIdentityAuth && schemes.bearerAuth) {
const headers = init.headers as Headers;
const authHeader = headers.get('Authorization');
Expand All @@ -944,12 +954,19 @@ export class OpenAI {
init: RequestInit | undefined,
ms: number,
controller: AbortController,
): Promise<Response> {
): Promise<FetchWithTimeoutResult> {
const { signal, method, ...options } = init || {};
const abort = this._makeAbort(controller);
if (signal) signal.addEventListener('abort', abort, { once: true });

const timeout = setTimeout(abort, ms);
let cleanedUp = false;
const cleanup = () => {
if (cleanedUp) return;
cleanedUp = true;
clearTimeout(timeout);
if (signal) signal.removeEventListener('abort', abort);
};

const isReadableBody =
((globalThis as any).ReadableStream && options.body instanceof (globalThis as any).ReadableStream) ||
Expand All @@ -969,9 +986,13 @@ export class OpenAI {

try {
// use undefined this binding; fetch errors if bound to something else in browser/cloudflare
return await this.fetch.call(undefined, url, fetchOptions);
} finally {
const response = await this.fetch.call(undefined, url, fetchOptions);
clearTimeout(timeout);
if (!response.body) cleanup();
return { response, cleanup };
} catch (err) {
cleanup();
throw err;
}
}

Expand Down
2 changes: 2 additions & 0 deletions src/core/streaming.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ export class Stream<Item> implements AsyncIterable<Item> {
controller: AbortController,
client?: OpenAI,
synthesizeEventData?: boolean,
cleanup?: () => void,
): Stream<Item> {
let consumed = false;
const logger = client ? loggerFor(client) : console;
Expand Down Expand Up @@ -95,6 +96,7 @@ export class Stream<Item> implements AsyncIterable<Item> {
} finally {
// If the user `break`s, abort the ongoing request.
if (!done) controller.abort();
cleanup?.();
}
}

Expand Down
103 changes: 80 additions & 23 deletions src/internal/parse.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,46 +13,60 @@ export type APIResponseProps = {
requestLogID: string;
retryOfRequestLogID: string | undefined;
startTime: number;
cleanup?: () => void;
};

export async function defaultParseResponse<T>(
client: OpenAI,
props: APIResponseProps,
): Promise<WithRequestID<T>> {
const { response, requestLogID, retryOfRequestLogID, startTime } = props;
const body = await (async () => {
if (props.options.stream) {
loggerFor(client).debug('response', response.status, response.url, response.headers, response.body);

// Note: there is an invariant here that isn't represented in the type system
// that if you set `stream: true` the response type must also be `Stream<T>`

if (props.options.__streamClass) {
return props.options.__streamClass.fromSSEResponse(
response,
props.controller,
client,
props.options.__synthesizeEventData,
) as any;
}
const { response, requestLogID, retryOfRequestLogID, startTime, cleanup } = props;
if (props.options.stream) {
loggerFor(client).debug('response', response.status, response.url, response.headers, response.body);

// Note: there is an invariant here that isn't represented in the type system
// that if you set `stream: true` the response type must also be `Stream<T>`

return Stream.fromSSEResponse(
if (props.options.__streamClass) {
return props.options.__streamClass.fromSSEResponse(
response,
props.controller,
client,
props.options.__synthesizeEventData,
cleanup,
) as any;
}

return Stream.fromSSEResponse(
response,
props.controller,
client,
props.options.__synthesizeEventData,
cleanup,
) as any;
}

if (props.options.__binaryResponse) {
const body = wrapResponseBodyWithCleanup(response, cleanup);
loggerFor(client).debug(
`[${requestLogID}] response parsed`,
formatRequestDetails({
retryOfRequestLogID,
url: response.url,
status: response.status,
body,
durationMs: Date.now() - startTime,
}),
);
return body as WithRequestID<T>;
}

const body = await (async () => {
// fetch refuses to read the body when the status code is 204.
if (response.status === 204) {
return null as T;
}

if (props.options.__binaryResponse) {
return response as unknown as T;
}

const contentType = response.headers.get('content-type');
const mediaType = contentType?.split(';')[0]?.trim();
const isJSON = mediaType?.includes('application/json') || mediaType?.endsWith('+json');
Expand All @@ -69,7 +83,7 @@ export async function defaultParseResponse<T>(

const text = await response.text();
return text as unknown as T;
})();
})().finally(() => cleanup?.());

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Preserve abort forwarding for binary responses

For __binaryResponse endpoints such as file downloads and audio speech, this finally runs immediately after returning the raw Response, before the caller has consumed response.body with blob(), arrayBuffer(), or a reader. Because fetchWithTimeout() only passes the SDK controller signal to fetch, removing the caller-signal listener here means aborting the user-provided signal during the subsequent binary body read no longer aborts the underlying response body; cleanup needs to be deferred until the returned Response is consumed/cancelled (or handled separately for raw responses).

Useful? React with 👍 / 👎.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed in dd0e115. Binary/raw responses now return a Response whose body is wrapped with the same idempotent cleanup callback, so the caller abort listener is not removed when the raw Response is handed back. It stays active until the binary body is consumed, cancelled, or errors, preserving caller abort forwarding during text()/arrayBuffer()/reader consumption while still removing the listener afterward. Added regression coverage for successful binary body consumption and abort during binary body reading.

loggerFor(client).debug(
`[${requestLogID}] response parsed`,
formatRequestDetails({
Expand All @@ -80,14 +94,57 @@ export async function defaultParseResponse<T>(
durationMs: Date.now() - startTime,
}),
);
return body;
return body as WithRequestID<T>;
}

export type WithRequestID<T> =
T extends Array<any> | Response | AbstractPage<any> ? T
: T extends Record<string, any> ? T & { _request_id?: string | null }
: T;

function wrapResponseBodyWithCleanup(response: Response, cleanup: (() => void) | undefined): Response {
if (!cleanup) return response;
if (!response.body) {
cleanup();
return response;
}

const reader = response.body.getReader();
const body = new ReadableStream({
async pull(controller) {
try {
const { done, value } = await reader.read();
if (done) {
cleanup();
controller.close();
return;
}
controller.enqueue(value);
} catch (err) {
cleanup();
controller.error(err);
}
},
async cancel(reason) {
cleanup();
await reader.cancel(reason);
},
});
const wrapped = new Response(body, response);

try {
Object.defineProperties(wrapped, {
redirected: { value: response.redirected },
type: { value: response.type },
url: { value: response.url },
});
} catch {
// Some fetch implementations may expose non-configurable Response fields.
}

return wrapped;
}

export function addRequestID<T>(value: T, response: Response): WithRequestID<T> {
if (!value || typeof value !== 'object' || Array.isArray(value)) {
return value as WithRequestID<T>;
Expand Down
Loading