Skip to content

Commit 7184e82

Browse files
committed
feat: prioritize control-plane orchestration commands
Cherry-picked from upstream pingdotgg#1689
1 parent 2c1f460 commit 7184e82

4 files changed

Lines changed: 281 additions & 6 deletions

File tree

apps/server/src/orchestration/Layers/OrchestrationEngine.test.ts

Lines changed: 151 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -370,6 +370,157 @@ describe("OrchestrationEngine", () => {
370370
await system.dispose();
371371
});
372372

373+
it("prioritizes client commands ahead of queued internal stream commands", async () => {
374+
const system = await createOrchestrationSystem();
375+
const { engine } = system;
376+
const createdAt = now();
377+
const threadId = ThreadId.makeUnsafe("thread-priority-lane");
378+
379+
await system.run(
380+
engine.dispatch({
381+
type: "project.create",
382+
commandId: CommandId.makeUnsafe("cmd-project-priority-lane-create"),
383+
projectId: asProjectId("project-priority-lane"),
384+
title: "Priority Lane Project",
385+
workspaceRoot: "/tmp/project-priority-lane",
386+
defaultModelSelection: {
387+
provider: "codex",
388+
model: "gpt-5-codex",
389+
},
390+
createdAt,
391+
}),
392+
);
393+
await system.run(
394+
engine.dispatch({
395+
type: "thread.create",
396+
commandId: CommandId.makeUnsafe("cmd-thread-priority-lane-create"),
397+
threadId,
398+
projectId: asProjectId("project-priority-lane"),
399+
title: "Priority Lane Thread",
400+
modelSelection: {
401+
provider: "codex",
402+
model: "gpt-5-codex",
403+
},
404+
interactionMode: DEFAULT_PROVIDER_INTERACTION_MODE,
405+
runtimeMode: "full-access",
406+
branch: null,
407+
worktreePath: null,
408+
createdAt,
409+
}),
410+
);
411+
412+
const lowPriorityCommandCount = 250;
413+
const lowPriorityDispatches = Array.from({ length: lowPriorityCommandCount }, (_, index) =>
414+
system.run(
415+
engine.dispatch({
416+
type: "thread.message.assistant.delta",
417+
commandId: CommandId.makeUnsafe(`cmd-thread-priority-lane-delta-${index}`),
418+
threadId,
419+
messageId: asMessageId("assistant-priority-lane"),
420+
delta: `chunk-${index}`,
421+
turnId: asTurnId("turn-priority-lane"),
422+
createdAt,
423+
}),
424+
),
425+
);
426+
427+
const archiveResult = await system.run(
428+
engine.dispatch({
429+
type: "thread.archive",
430+
commandId: CommandId.makeUnsafe("cmd-thread-priority-lane-archive"),
431+
threadId,
432+
}),
433+
);
434+
const lowPriorityResults = await Promise.all(lowPriorityDispatches);
435+
const lowPriorityCommandsAheadOfArchive = lowPriorityResults.filter(
436+
(result) => result.sequence < archiveResult.sequence,
437+
).length;
438+
439+
expect(lowPriorityCommandsAheadOfArchive).toBeLessThan(lowPriorityCommandCount / 3);
440+
expect(archiveResult.sequence).toBeLessThan(lowPriorityResults.at(-1)?.sequence ?? Infinity);
441+
await system.dispose();
442+
});
443+
444+
it("treats normalized thread.turn.start as a prioritized client command", async () => {
445+
const system = await createOrchestrationSystem();
446+
const { engine } = system;
447+
const createdAt = now();
448+
const threadId = ThreadId.makeUnsafe("thread-priority-turn-start");
449+
450+
await system.run(
451+
engine.dispatch({
452+
type: "project.create",
453+
commandId: CommandId.makeUnsafe("cmd-project-priority-turn-start-create"),
454+
projectId: asProjectId("project-priority-turn-start"),
455+
title: "Priority Turn Start Project",
456+
workspaceRoot: "/tmp/project-priority-turn-start",
457+
defaultModelSelection: {
458+
provider: "codex",
459+
model: "gpt-5-codex",
460+
},
461+
createdAt,
462+
}),
463+
);
464+
await system.run(
465+
engine.dispatch({
466+
type: "thread.create",
467+
commandId: CommandId.makeUnsafe("cmd-thread-priority-turn-start-create"),
468+
threadId,
469+
projectId: asProjectId("project-priority-turn-start"),
470+
title: "Priority Turn Start Thread",
471+
modelSelection: {
472+
provider: "codex",
473+
model: "gpt-5-codex",
474+
},
475+
interactionMode: DEFAULT_PROVIDER_INTERACTION_MODE,
476+
runtimeMode: "full-access",
477+
branch: null,
478+
worktreePath: null,
479+
createdAt,
480+
}),
481+
);
482+
483+
const lowPriorityCommandCount = 150;
484+
const lowPriorityDispatches = Array.from({ length: lowPriorityCommandCount }, (_, index) =>
485+
system.run(
486+
engine.dispatch({
487+
type: "thread.message.assistant.delta",
488+
commandId: CommandId.makeUnsafe(`cmd-thread-priority-turn-start-delta-${index}`),
489+
threadId,
490+
messageId: asMessageId("assistant-priority-turn-start"),
491+
delta: `chunk-${index}`,
492+
turnId: asTurnId("turn-priority-turn-start-internal"),
493+
createdAt,
494+
}),
495+
),
496+
);
497+
498+
const turnStartResult = await system.run(
499+
engine.dispatch({
500+
type: "thread.turn.start",
501+
commandId: CommandId.makeUnsafe("cmd-thread-priority-turn-start"),
502+
threadId,
503+
message: {
504+
messageId: asMessageId("user-priority-turn-start"),
505+
role: "user",
506+
text: "hello",
507+
attachments: [],
508+
},
509+
interactionMode: DEFAULT_PROVIDER_INTERACTION_MODE,
510+
runtimeMode: "full-access",
511+
createdAt,
512+
}),
513+
);
514+
const lowPriorityResults = await Promise.all(lowPriorityDispatches);
515+
const lowPriorityCommandsAheadOfTurnStart = lowPriorityResults.filter(
516+
(result) => result.sequence < turnStartResult.sequence,
517+
).length;
518+
519+
expect(lowPriorityCommandsAheadOfTurnStart).toBeLessThan(lowPriorityCommandCount / 3);
520+
expect(turnStartResult.sequence).toBeLessThan(lowPriorityResults.at(-1)?.sequence ?? Infinity);
521+
await system.dispose();
522+
});
523+
373524
it("streams persisted domain events in order", async () => {
374525
const system = await createOrchestrationSystem();
375526
const { engine } = system;

apps/server/src/orchestration/Layers/OrchestrationEngine.ts

Lines changed: 44 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ import type {
44
ProjectId,
55
ThreadId,
66
} from "@t3tools/contracts";
7-
import { OrchestrationCommand } from "@t3tools/contracts";
7+
import { DispatchableClientOrchestrationCommand, OrchestrationCommand } from "@t3tools/contracts";
88
import {
99
Cause,
1010
Deferred,
@@ -14,10 +14,11 @@ import {
1414
Layer,
1515
Metric,
1616
Option,
17+
Order,
1718
PubSub,
18-
Queue,
1919
Schema,
2020
Stream,
21+
TxPriorityQueue,
2122
} from "effect";
2223
import * as SqlClient from "effect/unstable/sql/SqlClient";
2324

@@ -50,6 +51,24 @@ interface CommandEnvelope {
5051
startedAtMs: number;
5152
}
5253

54+
type CommandPriority = 0 | 1;
55+
56+
interface PrioritizedCommandEnvelope {
57+
readonly priority: CommandPriority;
58+
readonly insertionSequence: number;
59+
readonly envelope: CommandEnvelope;
60+
}
61+
62+
const COMMAND_PRIORITY = {
63+
control: 0,
64+
stream: 1,
65+
} as const satisfies Record<string, CommandPriority>;
66+
67+
const prioritizedCommandEnvelopeOrder = Order.combine(
68+
Order.mapInput(Order.Number, (item: PrioritizedCommandEnvelope) => item.priority),
69+
Order.mapInput(Order.Number, (item: PrioritizedCommandEnvelope) => item.insertionSequence),
70+
);
71+
5372
function commandToAggregateRef(command: OrchestrationCommand): {
5473
readonly aggregateKind: "project" | "thread";
5574
readonly aggregateId: ProjectId | ThreadId;
@@ -70,6 +89,13 @@ function commandToAggregateRef(command: OrchestrationCommand): {
7089
}
7190
}
7291

92+
function commandPriority(command: OrchestrationCommand): CommandPriority {
93+
if (Schema.is(DispatchableClientOrchestrationCommand)(command)) {
94+
return COMMAND_PRIORITY.control;
95+
}
96+
return COMMAND_PRIORITY.stream;
97+
}
98+
7399
const makeOrchestrationEngine = Effect.gen(function* () {
74100
const sql = yield* SqlClient.SqlClient;
75101
const eventStore = yield* OrchestrationEventStore;
@@ -78,8 +104,11 @@ const makeOrchestrationEngine = Effect.gen(function* () {
78104
const projectionSnapshotQuery = yield* ProjectionSnapshotQuery;
79105

80106
let readModel = createEmptyReadModel(new Date().toISOString());
107+
let nextCommandInsertionSequence = 0;
81108

82-
const commandQueue = yield* Queue.unbounded<CommandEnvelope>();
109+
const commandQueue = yield* TxPriorityQueue.empty<PrioritizedCommandEnvelope>(
110+
prioritizedCommandEnvelopeOrder,
111+
);
83112
const eventPubSub = yield* PubSub.unbounded<OrchestrationEvent>();
84113

85114
const processEnvelope = (envelope: CommandEnvelope): Effect.Effect<void> => {
@@ -272,7 +301,13 @@ const makeOrchestrationEngine = Effect.gen(function* () {
272301
yield* projectionPipeline.bootstrap;
273302
readModel = yield* projectionSnapshotQuery.getSnapshot();
274303

275-
const worker = Effect.forever(Queue.take(commandQueue).pipe(Effect.flatMap(processEnvelope)));
304+
const worker = Effect.forever(
305+
TxPriorityQueue.take(commandQueue).pipe(
306+
Effect.tx,
307+
Effect.map((item) => item.envelope),
308+
Effect.flatMap(processEnvelope),
309+
),
310+
);
276311
yield* Effect.forkScoped(worker);
277312
yield* Effect.logDebug("orchestration engine started").pipe(
278313
Effect.annotateLogs({ sequence: readModel.snapshotSequence }),
@@ -287,7 +322,11 @@ const makeOrchestrationEngine = Effect.gen(function* () {
287322
const dispatch: OrchestrationEngineShape["dispatch"] = (command) =>
288323
Effect.gen(function* () {
289324
const result = yield* Deferred.make<{ sequence: number }, OrchestrationDispatchError>();
290-
yield* Queue.offer(commandQueue, { command, result, startedAtMs: Date.now() });
325+
yield* TxPriorityQueue.offer(commandQueue, {
326+
priority: commandPriority(command),
327+
insertionSequence: nextCommandInsertionSequence++,
328+
envelope: { command, result, startedAtMs: Date.now() },
329+
}).pipe(Effect.tx);
291330
return yield* Deferred.await(result);
292331
});
293332

packages/contracts/src/orchestration.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -589,7 +589,7 @@ const ThreadSessionStopCommand = Schema.Struct({
589589
createdAt: IsoDateTime,
590590
});
591591

592-
const DispatchableClientOrchestrationCommand = Schema.Union([
592+
export const DispatchableClientOrchestrationCommand = Schema.Union([
593593
ProjectCreateCommand,
594594
ProjectMetaUpdateCommand,
595595
ProjectDeleteCommand,

0 commit comments

Comments
 (0)