Skip to content

Commit 2be6759

Browse files
committed
Added token clamping, TodoWrite plan events, and ProviderStatusCache service
Adapted from upstream t3code pingdotgg#1943 and pingdotgg#1541. normalizeClaudeTokenUsage now clamps usedTokens to contextWindow when it exceeds it, and includes totalProcessedTokens in the snapshot when it differs. Added isTodoTool and extractPlanStepsFromTodoInput to emit turn.plan.updated events during TodoWrite input streaming and content_block_stop. Created ProviderStatusCache Effect service (in-memory Ref-backed cache) for future use.
1 parent aaccef1 commit 2be6759

File tree

4 files changed

+120
-5
lines changed

4 files changed

+120
-5
lines changed

apps/server/src/provider/Layers/ClaudeAdapter.stream.handlers.ts

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@ import {
88
asRuntimeItemId,
99
classifyToolItemType,
1010
extractExitPlanModePlan,
11+
extractPlanStepsFromTodoInput,
12+
isTodoTool,
1113
nativeProviderRefs,
1214
streamKindFromDeltaType,
1315
summarizeToolRequest,
@@ -169,6 +171,27 @@ export const makeMessageHandlers = (deps: MessageHandlerDeps) => {
169171
payload: message,
170172
},
171173
});
174+
175+
if (isTodoTool(nextTool.toolName) && parsedInput) {
176+
const planSteps = extractPlanStepsFromTodoInput(parsedInput);
177+
if (planSteps.length > 0 && context.turnState) {
178+
const planStamp = yield* makeEventStamp();
179+
yield* offerRuntimeEvent({
180+
type: "turn.plan.updated",
181+
eventId: planStamp.eventId,
182+
provider: PROVIDER,
183+
createdAt: planStamp.createdAt,
184+
threadId: context.session.threadId,
185+
turnId: context.turnState.turnId,
186+
payload: {
187+
plan: planSteps,
188+
},
189+
providerRefs: nativeProviderRefs(context, {
190+
providerItemId: nextTool.itemId,
191+
}),
192+
});
193+
}
194+
}
172195
}
173196
return;
174197
}
@@ -256,6 +279,25 @@ export const makeMessageHandlers = (deps: MessageHandlerDeps) => {
256279
if (!tool) {
257280
return;
258281
}
282+
283+
if (isTodoTool(tool.toolName) && tool.input && typeof tool.input === "object") {
284+
const planSteps = extractPlanStepsFromTodoInput(tool.input as Record<string, unknown>);
285+
if (planSteps.length > 0 && context.turnState) {
286+
const planStamp = yield* makeEventStamp();
287+
yield* offerRuntimeEvent({
288+
type: "turn.plan.updated",
289+
eventId: planStamp.eventId,
290+
provider: PROVIDER,
291+
createdAt: planStamp.createdAt,
292+
threadId: context.session.threadId,
293+
turnId: context.turnState.turnId,
294+
payload: {
295+
plan: planSteps,
296+
},
297+
providerRefs: nativeProviderRefs(context, { providerItemId: tool.itemId }),
298+
});
299+
}
300+
}
259301
}
260302
});
261303

apps/server/src/provider/Layers/ClaudeAdapter.utils.ts

Lines changed: 35 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,8 @@ import {
99
type CanonicalRequestType,
1010
ClaudeCodeEffort,
1111
RuntimeRequestId,
12+
type RuntimePlanStepStatus,
13+
RUNTIME_PLAN_STEP_STATUSES,
1214
type ThreadTokenUsageSnapshot,
1315
ThreadId,
1416
type TurnId,
@@ -168,19 +170,28 @@ export function normalizeClaudeTokenUsage(
168170
? record.output_tokens
169171
: 0;
170172
const derivedUsedTokens = inputTokens + outputTokens;
173+
const totalProcessedTokens = directUsedTokens;
171174
const usedTokens = directUsedTokens ?? (derivedUsedTokens > 0 ? derivedUsedTokens : undefined);
172175
if (usedTokens === undefined || usedTokens <= 0) {
173176
return undefined;
174177
}
175178

179+
const hasContextWindow =
180+
typeof contextWindow === "number" && Number.isFinite(contextWindow) && contextWindow > 0;
181+
const clampedUsedTokens =
182+
hasContextWindow && usedTokens > contextWindow ? contextWindow : usedTokens;
183+
const shouldIncludeTotalProcessed =
184+
totalProcessedTokens !== undefined &&
185+
totalProcessedTokens > 0 &&
186+
totalProcessedTokens !== clampedUsedTokens;
187+
176188
return {
177-
usedTokens,
178-
lastUsedTokens: usedTokens,
189+
usedTokens: clampedUsedTokens,
190+
lastUsedTokens: clampedUsedTokens,
179191
...(inputTokens > 0 ? { inputTokens } : {}),
180192
...(outputTokens > 0 ? { outputTokens } : {}),
181-
...(typeof contextWindow === "number" && Number.isFinite(contextWindow) && contextWindow > 0
182-
? { maxTokens: contextWindow }
183-
: {}),
193+
...(hasContextWindow ? { maxTokens: contextWindow } : {}),
194+
...(shouldIncludeTotalProcessed ? { totalProcessedTokens } : {}),
184195
...(typeof record.tool_uses === "number" && Number.isFinite(record.tool_uses)
185196
? { toolUses: record.tool_uses }
186197
: {}),
@@ -292,6 +303,25 @@ export function isReadOnlyToolName(toolName: string): boolean {
292303
);
293304
}
294305

306+
export function isTodoTool(toolName: string): boolean {
307+
return toolName === "TodoWrite";
308+
}
309+
310+
export function extractPlanStepsFromTodoInput(
311+
input: Record<string, unknown>,
312+
): ReadonlyArray<{ step: string; status: RuntimePlanStepStatus }> {
313+
const todos = Array.isArray(input.todos) ? input.todos : [];
314+
return todos
315+
.filter((t): t is Record<string, unknown> => !!t && typeof t === "object")
316+
.map((t) => ({
317+
step:
318+
typeof t.content === "string" && t.content.trim().length > 0 ? t.content.trim() : "Task",
319+
status: (RUNTIME_PLAN_STEP_STATUSES as readonly string[]).includes(t.status as string)
320+
? (t.status as RuntimePlanStepStatus)
321+
: ("pending" as RuntimePlanStepStatus),
322+
}));
323+
}
324+
295325
export function classifyRequestType(toolName: string): CanonicalRequestType {
296326
if (isReadOnlyToolName(toolName)) {
297327
return "file_read_approval";
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
import type { ThreadId } from "@bigcode/contracts";
2+
import { Effect, Layer, Ref } from "effect";
3+
4+
import { ProviderStatusCache, type ProviderStatusEntry } from "../Services/ProviderStatusCache.ts";
5+
6+
export const ProviderStatusCacheLive = Layer.effect(
7+
ProviderStatusCache,
8+
Effect.gen(function* () {
9+
const cache = yield* Ref.make(new Map<ThreadId, ProviderStatusEntry>());
10+
11+
return {
12+
get: (threadId: ThreadId) => Ref.get(cache).pipe(Effect.map((m) => m.get(threadId))),
13+
set: (threadId: ThreadId, entry: ProviderStatusEntry) =>
14+
Ref.update(cache, (m) => new Map(m).set(threadId, entry)),
15+
delete: (threadId: ThreadId) =>
16+
Ref.update(cache, (m) => {
17+
const next = new Map(m);
18+
next.delete(threadId);
19+
return next;
20+
}),
21+
};
22+
}),
23+
);
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
import type { ProviderKind, ThreadId } from "@bigcode/contracts";
2+
import { ServiceMap } from "effect";
3+
import type { Effect } from "effect";
4+
5+
export interface ProviderStatusEntry {
6+
readonly providerName: ProviderKind;
7+
readonly status: "initializing" | "ready" | "running" | "closed" | "stopped" | "error";
8+
readonly updatedAt: string;
9+
}
10+
11+
export interface ProviderStatusCacheShape {
12+
readonly get: (threadId: ThreadId) => Effect.Effect<ProviderStatusEntry | undefined>;
13+
readonly set: (threadId: ThreadId, entry: ProviderStatusEntry) => Effect.Effect<void>;
14+
readonly delete: (threadId: ThreadId) => Effect.Effect<void>;
15+
}
16+
17+
export class ProviderStatusCache extends ServiceMap.Service<
18+
ProviderStatusCache,
19+
ProviderStatusCacheShape
20+
>()("t3/provider/Services/ProviderStatusCache") {}

0 commit comments

Comments
 (0)