@@ -17,6 +17,7 @@ import {
1717 type ProviderRuntimeEvent ,
1818 type ProviderRequestKind ,
1919 type ThreadTokenUsageSnapshot ,
20+ type CodexUsageSnapshot ,
2021 type ProviderUserInputAnswers ,
2122 RuntimeItemId ,
2223 RuntimeRequestId ,
@@ -54,6 +55,7 @@ import {
5455 type CodexSessionRuntimeShape ,
5556} from "./CodexSessionRuntime.ts" ;
5657import { type EventNdjsonLogger , makeEventNdjsonLogger } from "./EventNdjsonLogger.ts" ;
58+ import { normalizeCodexUsageSnapshot } from "../codexUsage.ts" ;
5759
5860const PROVIDER = ProviderDriverKind . make ( "codex" ) ;
5961
@@ -1350,6 +1352,7 @@ export const makeCodexAdapter = Effect.fn("makeCodexAdapter")(function* (
13501352 options ?. nativeEventLogger === undefined ? nativeEventLogger : undefined ;
13511353 const runtimeEventQueue = yield * Queue . unbounded < ProviderRuntimeEvent > ( ) ;
13521354 const sessions = new Map < ThreadId , CodexAdapterSessionContext > ( ) ;
1355+ let cachedCodexUsage : CodexUsageSnapshot | null = null ;
13531356
13541357 const startSession : CodexAdapterShape [ "startSession" ] = ( input ) =>
13551358 Effect . scoped (
@@ -1409,6 +1412,19 @@ export const makeCodexAdapter = Effect.fn("makeCodexAdapter")(function* (
14091412 const eventFiber = yield * Stream . runForEach ( runtime . events , ( event ) =>
14101413 Effect . gen ( function * ( ) {
14111414 yield * writeNativeEvent ( event ) ;
1415+ if ( event . method === "account/rateLimits/updated" ) {
1416+ const payload = readPayload (
1417+ EffectCodexSchema . V2AccountRateLimitsUpdatedNotification ,
1418+ event . payload ,
1419+ ) ;
1420+ if ( payload ) {
1421+ cachedCodexUsage = normalizeCodexUsageSnapshot ( {
1422+ providerInstanceId : boundInstanceId ,
1423+ payload,
1424+ source : "notification" ,
1425+ } ) ;
1426+ }
1427+ }
14121428 const runtimeEvents = mapToRuntimeEvents ( event , event . threadId ) ;
14131429 if ( runtimeEvents . length === 0 ) {
14141430 yield * Effect . logDebug ( "ignoring unhandled Codex provider event" , {
@@ -1644,6 +1660,75 @@ export const makeCodexAdapter = Effect.fn("makeCodexAdapter")(function* (
16441660 const hasSession : CodexAdapterShape [ "hasSession" ] = ( threadId ) =>
16451661 Effect . succeed ( Boolean ( sessions . get ( threadId ) && ! sessions . get ( threadId ) ?. stopped ) ) ;
16461662
1663+ const readCodexUsageWithoutSession = Effect . fn ( "readCodexUsageWithoutSession" ) ( function * ( ) {
1664+ const usageThreadId = ThreadId . make ( "codex-usage" ) ;
1665+ const createRuntime = options ?. makeRuntime ?? makeCodexSessionRuntime ;
1666+ return yield * Effect . acquireUseRelease (
1667+ Scope . make ( "sequential" ) ,
1668+ ( usageScope ) =>
1669+ Effect . gen ( function * ( ) {
1670+ const runtime = yield * createRuntime ( {
1671+ threadId : usageThreadId ,
1672+ providerInstanceId : boundInstanceId ,
1673+ cwd : process . cwd ( ) ,
1674+ binaryPath : codexConfig . binaryPath ,
1675+ ...( options ?. environment ? { environment : options . environment } : { } ) ,
1676+ ...( codexConfig . homePath ? { homePath : codexConfig . homePath } : { } ) ,
1677+ runtimeMode : "full-access" ,
1678+ } ) . pipe (
1679+ Effect . provideService ( Scope . Scope , usageScope ) ,
1680+ Effect . provideService ( ChildProcessSpawner . ChildProcessSpawner , childProcessSpawner ) ,
1681+ Effect . mapError (
1682+ ( cause ) =>
1683+ new ProviderAdapterProcessError ( {
1684+ provider : PROVIDER ,
1685+ threadId : usageThreadId ,
1686+ detail : cause . message ,
1687+ cause,
1688+ } ) ,
1689+ ) ,
1690+ ) ;
1691+ const payload = yield * runtime . readAccountRateLimits . pipe (
1692+ Effect . mapError ( ( cause ) =>
1693+ mapCodexRuntimeError ( usageThreadId , "account/rateLimits/read" , cause ) ,
1694+ ) ,
1695+ Effect . ensuring ( runtime . close ) ,
1696+ ) ;
1697+ return normalizeCodexUsageSnapshot ( {
1698+ providerInstanceId : boundInstanceId ,
1699+ payload,
1700+ source : "read" ,
1701+ } ) ;
1702+ } ) ,
1703+ ( usageScope ) => Scope . close ( usageScope , Exit . void ) ,
1704+ ) ;
1705+ } ) ;
1706+
1707+ const readCodexUsage : CodexAdapterShape [ "readCodexUsage" ] = Effect . fn ( "readCodexUsage" ) (
1708+ function * ( ) {
1709+ const session = Array . from ( sessions . values ( ) ) . findLast ( ( candidate ) => ! candidate . stopped ) ;
1710+ if ( ! session ) {
1711+ const snapshot = yield * readCodexUsageWithoutSession ( ) ;
1712+ cachedCodexUsage = snapshot ?? cachedCodexUsage ;
1713+ return (
1714+ snapshot ?? ( cachedCodexUsage ? { ...cachedCodexUsage , source : "cache" as const } : null )
1715+ ) ;
1716+ }
1717+ const payload = yield * session . runtime . readAccountRateLimits . pipe (
1718+ Effect . mapError ( ( cause ) =>
1719+ mapCodexRuntimeError ( session . threadId , "account/rateLimits/read" , cause ) ,
1720+ ) ,
1721+ ) ;
1722+ const snapshot = normalizeCodexUsageSnapshot ( {
1723+ providerInstanceId : boundInstanceId ,
1724+ payload,
1725+ source : "read" ,
1726+ } ) ;
1727+ cachedCodexUsage = snapshot ;
1728+ return snapshot ;
1729+ } ,
1730+ ) ;
1731+
16471732 const stopAll : CodexAdapterShape [ "stopAll" ] = ( ) =>
16481733 Effect . forEach ( Array . from ( sessions . values ( ) ) , stopSessionInternal , {
16491734 concurrency : 1 ,
@@ -1673,6 +1758,7 @@ export const makeCodexAdapter = Effect.fn("makeCodexAdapter")(function* (
16731758 stopSession,
16741759 listSessions,
16751760 hasSession,
1761+ readCodexUsage,
16761762 stopAll,
16771763 get streamEvents ( ) {
16781764 return Stream . fromQueue ( runtimeEventQueue ) ;
0 commit comments