Skip to content

Commit 8075d94

Browse files
committed
Add Codex usage indicator
1 parent f7748a0 commit 8075d94

30 files changed

Lines changed: 1142 additions & 5 deletions

apps/desktop/src/clientPersistence.test.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ function makeSecretStorage(available: boolean): DesktopSecretStorage {
5050

5151
const clientSettings: ClientSettings = {
5252
autoOpenPlanSidebar: false,
53+
codexUsageIndicatorMode: "five-hour",
5354
confirmThreadArchive: true,
5455
confirmThreadDelete: false,
5556
diffIgnoreWhitespace: true,

apps/server/src/orchestration/Layers/CheckpointReactor.test.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,7 @@ function createProviderServiceHarness(
114114
continuationKey: `${providerName}:instance:${instanceId}`,
115115
},
116116
}),
117+
getCodexUsage: () => Effect.succeed(null),
117118
rollbackConversation,
118119
get streamEvents() {
119120
return Stream.fromPubSub(runtimeEventPubSub);

apps/server/src/orchestration/Layers/ProviderCommandReactor.test.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -304,6 +304,7 @@ describe("ProviderCommandReactor", () => {
304304
},
305305
});
306306
},
307+
getCodexUsage: () => Effect.succeed(null),
307308
rollbackConversation: () => unsupported(),
308309
get streamEvents() {
309310
return Stream.fromPubSub(runtimeEventPubSub);

apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.test.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,7 @@ function createProviderServiceHarness() {
113113
},
114114
});
115115
},
116+
getCodexUsage: () => Effect.succeed(null),
116117
rollbackConversation: () => unsupported(),
117118
get streamEvents() {
118119
return Stream.fromPubSub(runtimeEventPubSub);

apps/server/src/provider/Layers/CodexAdapter.test.ts

Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import { it, vi } from "@effect/vitest";
2323

2424
import { Context, Effect, Exit, Fiber, Layer, Option, Queue, Schema, Scope, Stream } from "effect";
2525
import * as CodexErrors from "effect-codex-app-server/errors";
26+
import type * as EffectCodexSchema from "effect-codex-app-server/schema";
2627

2728
import { ServerConfig } from "../../config.ts";
2829
import { ServerSettingsService } from "../../serverSettings.ts";
@@ -92,6 +93,15 @@ class FakeCodexRuntime implements CodexSessionRuntimeShape {
9293
}),
9394
);
9495

96+
public readonly readAccountRateLimitsImpl = vi.fn(
97+
(): Promise<EffectCodexSchema.V2GetAccountRateLimitsResponse> =>
98+
Promise.resolve({
99+
rateLimits: {
100+
primary: { usedPercent: 25, windowDurationMins: 300 },
101+
},
102+
}),
103+
);
104+
95105
public readonly respondToRequestImpl = vi.fn(
96106
(_requestId: ApprovalRequestId, _decision: ProviderApprovalDecision): Promise<void> =>
97107
Promise.resolve(undefined),
@@ -130,6 +140,8 @@ class FakeCodexRuntime implements CodexSessionRuntimeShape {
130140
return Effect.promise(() => this.rollbackThreadImpl(numTurns));
131141
}
132142

143+
readAccountRateLimits = Effect.promise(() => this.readAccountRateLimitsImpl());
144+
133145
respondToRequest(requestId: ApprovalRequestId, decision: ProviderApprovalDecision) {
134146
return Effect.promise(() => this.respondToRequestImpl(requestId, decision));
135147
}
@@ -159,6 +171,7 @@ function makeRuntimeFactory() {
159171

160172
return {
161173
factory,
174+
runtimes,
162175
get lastRuntime(): FakeCodexRuntime | undefined {
163176
return runtimes.at(-1);
164177
},
@@ -348,6 +361,95 @@ sessionErrorLayer("CodexAdapterLive session errors", (it) => {
348361
}),
349362
);
350363

364+
it.effect("reads and normalizes account rate limits through the active runtime", () =>
365+
Effect.gen(function* () {
366+
const adapter = yield* CodexAdapter;
367+
yield* adapter.startSession({
368+
provider: ProviderDriverKind.make("codex"),
369+
threadId: asThreadId("usage-thread"),
370+
runtimeMode: "full-access",
371+
});
372+
const runtime = sessionRuntimeFactory.lastRuntime;
373+
assert.ok(runtime);
374+
runtime.readAccountRateLimitsImpl.mockResolvedValueOnce({
375+
rateLimits: {
376+
primary: { usedPercent: 30, windowDurationMins: 300 },
377+
secondary: { usedPercent: 80, windowDurationMins: 10_080 },
378+
},
379+
});
380+
381+
const snapshot = yield* adapter.readCodexUsage!();
382+
383+
assert.equal(runtime.readAccountRateLimitsImpl.mock.calls.length, 1);
384+
assert.deepStrictEqual(
385+
snapshot?.windows.map((window) => ({
386+
kind: window.kind,
387+
remainingPercent: window.remainingPercent,
388+
})),
389+
[
390+
{ kind: "five-hour", remainingPercent: 70 },
391+
{ kind: "weekly", remainingPercent: 20 },
392+
],
393+
);
394+
}),
395+
);
396+
397+
it.effect("keeps cached account rate limits when an active read has no displayable windows", () =>
398+
Effect.gen(function* () {
399+
const adapter = yield* CodexAdapter;
400+
yield* adapter.startSession({
401+
provider: ProviderDriverKind.make("codex"),
402+
threadId: asThreadId("usage-cache-thread"),
403+
runtimeMode: "full-access",
404+
});
405+
const runtime = sessionRuntimeFactory.lastRuntime;
406+
assert.ok(runtime);
407+
runtime.readAccountRateLimitsImpl.mockResolvedValueOnce({
408+
rateLimits: {
409+
primary: { usedPercent: 45, windowDurationMins: 300 },
410+
},
411+
});
412+
yield* adapter.readCodexUsage!();
413+
runtime.readAccountRateLimitsImpl.mockResolvedValueOnce({
414+
rateLimits: {},
415+
});
416+
417+
const snapshot = yield* adapter.readCodexUsage!();
418+
419+
assert.equal(runtime.readAccountRateLimitsImpl.mock.calls.length, 2);
420+
assert.equal(snapshot?.source, "cache");
421+
assert.deepStrictEqual(snapshot?.windows[0], {
422+
kind: "five-hour",
423+
usedPercent: 45,
424+
remainingPercent: 55,
425+
resetsAt: null,
426+
windowDurationMins: 300,
427+
});
428+
}),
429+
);
430+
431+
it.effect("reads account rate limits even before a Codex thread session exists", () =>
432+
Effect.gen(function* () {
433+
const adapter = yield* CodexAdapter;
434+
yield* adapter.stopAll();
435+
const snapshot = yield* adapter.readCodexUsage!();
436+
const runtime = sessionRuntimeFactory.lastRuntime;
437+
438+
assert.ok(runtime);
439+
assert.equal(runtime.options.threadId, asThreadId("codex-usage"));
440+
assert.equal(runtime.startImpl.mock.calls.length, 1);
441+
assert.equal(runtime.readAccountRateLimitsImpl.mock.calls.length, 1);
442+
assert.equal(runtime.closeImpl.mock.calls.length, 1);
443+
assert.deepStrictEqual(snapshot?.windows[0], {
444+
kind: "five-hour",
445+
usedPercent: 25,
446+
remainingPercent: 75,
447+
resetsAt: null,
448+
windowDurationMins: 300,
449+
});
450+
}),
451+
);
452+
351453
it.effect("maps codex model options for the adapter's bound custom instance id", () => {
352454
const customInstanceId = ProviderInstanceId.make("codex_personal");
353455
const customRuntimeFactory = makeRuntimeFactory();

apps/server/src/provider/Layers/CodexAdapter.ts

Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ import {
1717
type ProviderRuntimeEvent,
1818
type ProviderRequestKind,
1919
type ThreadTokenUsageSnapshot,
20+
type CodexUsageSnapshot,
2021
type ProviderUserInputAnswers,
2122
RuntimeItemId,
2223
RuntimeRequestId,
@@ -54,6 +55,7 @@ import {
5455
type CodexSessionRuntimeShape,
5556
} from "./CodexSessionRuntime.ts";
5657
import { type EventNdjsonLogger, makeEventNdjsonLogger } from "./EventNdjsonLogger.ts";
58+
import { normalizeCodexUsageSnapshot } from "../codexUsage.ts";
5759

5860
const PROVIDER = ProviderDriverKind.make("codex");
5961

@@ -1350,6 +1352,7 @@ export const makeCodexAdapter = Effect.fn("makeCodexAdapter")(function* (
13501352
options?.nativeEventLogger === undefined ? nativeEventLogger : undefined;
13511353
const runtimeEventQueue = yield* Queue.unbounded<ProviderRuntimeEvent>();
13521354
const sessions = new Map<ThreadId, CodexAdapterSessionContext>();
1355+
let cachedCodexUsage: CodexUsageSnapshot | null = null;
13531356

13541357
const startSession: CodexAdapterShape["startSession"] = (input) =>
13551358
Effect.scoped(
@@ -1409,6 +1412,19 @@ export const makeCodexAdapter = Effect.fn("makeCodexAdapter")(function* (
14091412
const eventFiber = yield* Stream.runForEach(runtime.events, (event) =>
14101413
Effect.gen(function* () {
14111414
yield* writeNativeEvent(event);
1415+
if (event.method === "account/rateLimits/updated") {
1416+
const payload = readPayload(
1417+
EffectCodexSchema.V2AccountRateLimitsUpdatedNotification,
1418+
event.payload,
1419+
);
1420+
if (payload) {
1421+
cachedCodexUsage = normalizeCodexUsageSnapshot({
1422+
providerInstanceId: boundInstanceId,
1423+
payload,
1424+
source: "notification",
1425+
});
1426+
}
1427+
}
14121428
const runtimeEvents = mapToRuntimeEvents(event, event.threadId);
14131429
if (runtimeEvents.length === 0) {
14141430
yield* Effect.logDebug("ignoring unhandled Codex provider event", {
@@ -1644,6 +1660,90 @@ export const makeCodexAdapter = Effect.fn("makeCodexAdapter")(function* (
16441660
const hasSession: CodexAdapterShape["hasSession"] = (threadId) =>
16451661
Effect.succeed(Boolean(sessions.get(threadId) && !sessions.get(threadId)?.stopped));
16461662

1663+
const readCodexUsageWithoutSession = Effect.fn("readCodexUsageWithoutSession")(function* () {
1664+
const usageThreadId = ThreadId.make("codex-usage");
1665+
const createRuntime = options?.makeRuntime ?? makeCodexSessionRuntime;
1666+
return yield* Effect.acquireUseRelease(
1667+
Scope.make("sequential"),
1668+
(usageScope) =>
1669+
Effect.gen(function* () {
1670+
const runtime = yield* createRuntime({
1671+
threadId: usageThreadId,
1672+
providerInstanceId: boundInstanceId,
1673+
cwd: process.cwd(),
1674+
binaryPath: codexConfig.binaryPath,
1675+
...(options?.environment ? { environment: options.environment } : {}),
1676+
...(codexConfig.homePath ? { homePath: codexConfig.homePath } : {}),
1677+
runtimeMode: "full-access",
1678+
}).pipe(
1679+
Effect.provideService(Scope.Scope, usageScope),
1680+
Effect.provideService(ChildProcessSpawner.ChildProcessSpawner, childProcessSpawner),
1681+
Effect.mapError(
1682+
(cause) =>
1683+
new ProviderAdapterProcessError({
1684+
provider: PROVIDER,
1685+
threadId: usageThreadId,
1686+
detail: cause.message,
1687+
cause,
1688+
}),
1689+
),
1690+
);
1691+
const payload = yield* runtime.start().pipe(
1692+
Effect.mapError(
1693+
(cause) =>
1694+
new ProviderAdapterProcessError({
1695+
provider: PROVIDER,
1696+
threadId: usageThreadId,
1697+
detail: cause.message,
1698+
cause,
1699+
}),
1700+
),
1701+
Effect.andThen(
1702+
runtime.readAccountRateLimits.pipe(
1703+
Effect.mapError((cause) =>
1704+
mapCodexRuntimeError(usageThreadId, "account/rateLimits/read", cause),
1705+
),
1706+
),
1707+
),
1708+
Effect.ensuring(runtime.close),
1709+
);
1710+
return normalizeCodexUsageSnapshot({
1711+
providerInstanceId: boundInstanceId,
1712+
payload,
1713+
source: "read",
1714+
});
1715+
}),
1716+
(usageScope) => Scope.close(usageScope, Exit.void),
1717+
);
1718+
});
1719+
1720+
const readCodexUsage: CodexAdapterShape["readCodexUsage"] = Effect.fn("readCodexUsage")(
1721+
function* () {
1722+
const session = Array.from(sessions.values()).findLast((candidate) => !candidate.stopped);
1723+
if (!session) {
1724+
const snapshot = yield* readCodexUsageWithoutSession();
1725+
cachedCodexUsage = snapshot ?? cachedCodexUsage;
1726+
return (
1727+
snapshot ?? (cachedCodexUsage ? { ...cachedCodexUsage, source: "cache" as const } : null)
1728+
);
1729+
}
1730+
const payload = yield* session.runtime.readAccountRateLimits.pipe(
1731+
Effect.mapError((cause) =>
1732+
mapCodexRuntimeError(session.threadId, "account/rateLimits/read", cause),
1733+
),
1734+
);
1735+
const snapshot = normalizeCodexUsageSnapshot({
1736+
providerInstanceId: boundInstanceId,
1737+
payload,
1738+
source: "read",
1739+
});
1740+
cachedCodexUsage = snapshot ?? cachedCodexUsage;
1741+
return (
1742+
snapshot ?? (cachedCodexUsage ? { ...cachedCodexUsage, source: "cache" as const } : null)
1743+
);
1744+
},
1745+
);
1746+
16471747
const stopAll: CodexAdapterShape["stopAll"] = () =>
16481748
Effect.forEach(Array.from(sessions.values()), stopSessionInternal, {
16491749
concurrency: 1,
@@ -1673,6 +1773,7 @@ export const makeCodexAdapter = Effect.fn("makeCodexAdapter")(function* (
16731773
stopSession,
16741774
listSessions,
16751775
hasSession,
1776+
readCodexUsage,
16761777
stopAll,
16771778
get streamEvents() {
16781779
return Stream.fromQueue(runtimeEventQueue);

apps/server/src/provider/Layers/CodexSessionRuntime.ts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -120,6 +120,10 @@ export interface CodexSessionRuntimeShape {
120120
readonly rollbackThread: (
121121
numTurns: number,
122122
) => Effect.Effect<CodexThreadSnapshot, CodexSessionRuntimeError>;
123+
readonly readAccountRateLimits: Effect.Effect<
124+
EffectCodexSchema.V2GetAccountRateLimitsResponse,
125+
CodexSessionRuntimeError
126+
>;
123127
readonly respondToRequest: (
124128
requestId: ApprovalRequestId,
125129
decision: ProviderApprovalDecision,
@@ -1286,6 +1290,7 @@ export const makeCodexSessionRuntime = (
12861290
});
12871291
return parseThreadSnapshot(response);
12881292
}),
1293+
readAccountRateLimits: client.request("account/rateLimits/read", undefined),
12891294
respondToRequest: (requestId, decision) =>
12901295
Effect.gen(function* () {
12911296
const pending = (yield* Ref.get(pendingApprovalsRef)).get(requestId);

0 commit comments

Comments
 (0)