Commit 76ece64

feat: better and more reliable ai agent pipeline
1 parent 279cab3 commit 76ece64

68 files changed

Lines changed: 3381 additions & 1364 deletions

apps/api/package.json

Lines changed: 0 additions & 2 deletions
@@ -29,8 +29,6 @@
     "@hono/zod-openapi": "^1.2.0",
     "@hono/zod-validator": "^0.7.6",
     "@openrouter/ai-sdk-provider": "^2.0.0",
-    "@polar-sh/better-auth": "^1.6.4",
-    "@polar-sh/hono": "^0.5.3",
     "@tinybirdco/sdk": "^0.0.33",
     "@trpc/server": "^11.8.1",
     "@types/node": "^24.10.9",

apps/api/src/ai-agent/AI-README.md

Lines changed: 17 additions & 12 deletions
@@ -658,31 +658,36 @@ When adding/updating a tool:
 ### Response Timing

 Queue delay is disabled (0ms) so the AI responds as fast as possible.
-For visitor-trigger bursts, worker-side debounce (`AI_AGENT_VISITOR_DEBOUNCE_MS`, default 800ms) is applied before selecting an effective trigger.
+No visitor burst coalescing or debounce is applied.
 Natural typing delays between multi-part messages are still applied to keep the experience human.

 ### Queueing Model

 - Each conversation has a Redis sorted set queue ordered by `createdAt` (with `messageId` tiebreaker).
-- A BullMQ drain job processes messages sequentially and advances a DB cursor for recovery.
-- Visitor bursts are coalesced at queue head: contiguous visitor triggers are handled as one effective trigger (latest message in the burst).
+- Wake jobs are conversation-scoped (`ai-agent-{conversationId}`), with single-active semantics:
+  - `waiting`/`delayed`/`completed`/`failed` wake jobs are replaced
+  - `active` wake jobs are never replaced
+- A BullMQ drain job processes queued messages sequentially and advances a DB cursor for recovery.
 - BullMQ wake jobs remain signals only; Redis queue + DB cursor are authoritative state.
+- Conversations with queued items are tracked in Redis (`ai-agent:active-conversations`), and producer/worker recovery markers are tracked via `ai-agent:wake-needed:{conversationId}`.
+- A worker-side wake sweeper periodically repairs missing wakes for non-empty queues.

 ### Trigger-Level Reliability Rules

 1. **FIFO Trigger Processing**: Conversation triggers are processed in queue order using the Redis ZSET cursor model.
-2. **Burst Coalescing**: Contiguous visitor messages at queue head are coalesced and processed once using the latest coalesced trigger.
-3. **Continuation Gate**: If a queued visitor trigger already has a newer public AI reply, the pipeline runs `skip vs supplement` before generation.
-4. **Bias to Supplement on Uncertainty**: If continuation classification is uncertain (timeout/model error), fallback favors `supplement` (never silent miss).
-5. **No Full-Turn Retry After Visible Reply**: If a trigger already sent any public message, that trigger is marked `retryable=false` and dropped on subsequent pipeline error.
-6. **Retry Only Pre-Reply Failures**: If a trigger fails before any public send, it stays queued and is retried (with per-message failure threshold).
-7. **Typing Always Ends**: Typing is stopped before each visible send and force-stopped in final pipeline cleanup.
+2. **Strict Per-Conversation Serial Execution**: Redis lock (`ai-agent:lock:{conversationId}`) ensures only one worker processes a conversation at a time.
+3. **No Burst Coalescing**: Every queued message is processed in order; no contiguous visitor batching.
+4. **Reliable Producer Path**: Producer enqueues message (`ZADD NX`) then ensures wake with bounded retries; on exhaustion it marks `wake-needed` recovery.
+5. **Lock Miss/Loss Recovery**: Worker attempts continuation wake with jitter when lock cannot be acquired or is lost during processing.
+6. **End-of-Job Invariant**: If queue remains non-empty, worker must ensure a runnable wake exists or mark recovery.
+7. **Sweeper Reconciliation**: Periodic sweeper scans active + wake-needed conversations and recreates missing wakes.
+8. **Typing Always Ends**: Typing is stopped before each visible send and force-stopped in final pipeline cleanup.

 ### Failure Handling

-1. **`retryable=true` and below threshold**: Keep trigger message at queue head, schedule continuation drain.
-2. **`retryable=false`**: Advance cursor to effective trigger and remove processed/coalesced queue items immediately.
-3. **Threshold reached**: Drop trigger/coalesced batch, advance cursor, continue draining.
+1. **`retryable=true` and below threshold**: Keep trigger message at queue head for retry.
+2. **`retryable=false`**: Advance cursor and remove the failed message immediately.
+3. **Threshold reached**: Drop failed message, advance cursor, continue draining.
 4. **Stalled jobs**: BullMQ stalled-job recovery still applies at worker level.
 5. **Error events**: `aiAgentProcessingCompleted` with `status: "error"` is still emitted for dashboard observability.

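The single-active wake rule and the failure-handling rules in the README diff above can be read as two small decision functions. The sketch below is illustrative only: `wakeJobId`, `shouldEnqueueWake`, `decideFailureAction`, and the `FailureInput` shape are hypothetical names that do not appear in the commit, and the `>=` threshold comparison is an assumption about how "threshold reached" is evaluated. It models the rules as pure logic and does not call Redis or BullMQ.

```typescript
// Hypothetical sketch: none of these names come from the commit itself.

type WakeJobState = "waiting" | "delayed" | "active" | "completed" | "failed";

/** Conversation-scoped wake job id, following the `ai-agent-{conversationId}` pattern. */
function wakeJobId(conversationId: string): string {
  return `ai-agent-${conversationId}`;
}

/**
 * Single-active semantics: an existing wake job is replaced unless it is
 * currently active. `undefined` means no wake job exists, so one is created.
 */
function shouldEnqueueWake(existing: WakeJobState | undefined): boolean {
  return existing !== "active";
}

interface FailureInput {
  retryable: boolean;
  failureCount: number; // failures recorded for this message so far (assumed counter)
  threshold: number; // per-message failure threshold (assumed to be configurable)
}

interface FailureAction {
  keepAtQueueHead: boolean; // true: message stays queued and is retried
  advanceCursor: boolean; // true: message is removed and the cursor moves past it
}

/** Maps the three queue-level failure rules onto one decision. */
function decideFailureAction(input: FailureInput): FailureAction {
  const drop = !input.retryable || input.failureCount >= input.threshold;
  return { keepAtQueueHead: !drop, advanceCursor: drop };
}

// Example: a retryable failure below the threshold stays at the queue head.
console.log(wakeJobId("conv-1"), shouldEnqueueWake("active"));
console.log(decideFailureAction({ retryable: true, failureCount: 1, threshold: 3 }));
```

Keeping these decisions free of I/O makes them easy to unit test separately from the worker; the actual worker would still need to perform the queue removal and cursor update around them.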
apps/api/src/ai-agent/actions/send-message.ts

Lines changed: 0 additions & 1 deletion
@@ -105,7 +105,6 @@ export async function sendMessage(
     userId: null,
     visitorId: null,
     createdAt,
-    triggerNotificationWorkflow: false,
   });

   try {

apps/api/src/ai-agent/capabilities-studio.test.ts

Lines changed: 4 additions & 0 deletions
@@ -68,6 +68,10 @@ function createDocument(
 }

 describe("buildCapabilitiesStudioResponse", () => {
+  it("keeps wait.md in dropped skill template names", () => {
+    expect(AI_AGENT_DROPPED_SKILL_TEMPLATE_NAMES).toContain("wait.md");
+  });
+
   it("maps behavior settings to runtime tool enabled state", () => {
     const response = buildCapabilitiesStudioResponse({
       aiAgent: createAgent({

Lines changed: 82 additions & 0 deletions
@@ -0,0 +1,82 @@
+import { describe, expect, it } from "bun:test";
+import { getConversationState } from "./state";
+
+describe("getConversationState", () => {
+  it("starts assignee and participant queries in parallel", async () => {
+    let selectCallCount = 0;
+    let resolveAssignees!: (value: Array<{ userId: string }>) => void;
+    let resolveParticipants!: (value: Array<{ userId: string }>) => void;
+
+    const assigneesPromise = new Promise<Array<{ userId: string }>>(
+      (resolve) => {
+        resolveAssignees = (value) => resolve(value);
+      }
+    );
+    const participantsPromise = new Promise<Array<{ userId: string }>>(
+      (resolve) => {
+        resolveParticipants = (value) => resolve(value);
+      }
+    );
+
+    const db = {
+      select: () => {
+        selectCallCount++;
+        const pending =
+          selectCallCount === 1 ? assigneesPromise : participantsPromise;
+        return {
+          from: () => ({
+            where: () => pending,
+          }),
+        };
+      },
+    };
+
+    const statePromise = getConversationState(
+      db as never,
+      { conversationId: "conv-1", organizationId: "org-1" },
+      {
+        escalatedAt: "2025-01-01T00:00:00.000Z",
+        escalationHandledAt: null,
+        escalationReason: "needs specialist",
+      } as never
+    );
+
+    await Promise.resolve();
+    expect(selectCallCount).toBe(2);
+
+    resolveAssignees([{ userId: "user-1" }]);
+    resolveParticipants([{ userId: "user-2" }]);
+
+    const result = await statePromise;
+    expect(result).toEqual({
+      hasHumanAssignee: true,
+      assigneeIds: ["user-1"],
+      participantIds: ["user-2"],
+      isEscalated: true,
+      escalationReason: "needs specialist",
+    });
+  });
+
+  it("keeps escalation false when escalation is handled", async () => {
+    const db = {
+      select: () => ({
+        from: () => ({
+          where: async () => [],
+        }),
+      }),
+    };
+
+    const result = await getConversationState(
+      db as never,
+      { conversationId: "conv-1", organizationId: "org-1" },
+      {
+        escalatedAt: "2025-01-01T00:00:00.000Z",
+        escalationHandledAt: "2025-01-01T00:01:00.000Z",
+        escalationReason: null,
+      } as never
+    );
+
+    expect(result.isEscalated).toBe(false);
+    expect(result.escalationReason).toBeNull();
+  });
+});

apps/api/src/ai-agent/context/state.ts

Lines changed: 22 additions & 23 deletions
@@ -37,29 +37,28 @@ export async function getConversationState(
   params: GetStateParams,
   conversation: ConversationSelect
 ): Promise<ConversationState> {
-  // Get active assignees
-  const assignees = await db
-    .select({ userId: conversationAssignee.userId })
-    .from(conversationAssignee)
-    .where(
-      and(
-        eq(conversationAssignee.conversationId, params.conversationId),
-        eq(conversationAssignee.organizationId, params.organizationId),
-        isNull(conversationAssignee.unassignedAt)
-      )
-    );
-
-  // Get active participants
-  const participants = await db
-    .select({ userId: conversationParticipant.userId })
-    .from(conversationParticipant)
-    .where(
-      and(
-        eq(conversationParticipant.conversationId, params.conversationId),
-        eq(conversationParticipant.organizationId, params.organizationId),
-        isNull(conversationParticipant.leftAt)
-      )
-    );
+  const [assignees, participants] = await Promise.all([
+    db
+      .select({ userId: conversationAssignee.userId })
+      .from(conversationAssignee)
+      .where(
+        and(
+          eq(conversationAssignee.conversationId, params.conversationId),
+          eq(conversationAssignee.organizationId, params.organizationId),
+          isNull(conversationAssignee.unassignedAt)
+        )
+      ),
+    db
+      .select({ userId: conversationParticipant.userId })
+      .from(conversationParticipant)
+      .where(
+        and(
+          eq(conversationParticipant.conversationId, params.conversationId),
+          eq(conversationParticipant.organizationId, params.organizationId),
+          isNull(conversationParticipant.leftAt)
+        )
+      ),
+  ]);

   const assigneeIds = assignees.map((a) => a.userId);
   const participantIds = participants.map((p) => p.userId);

apps/api/src/ai-agent/output/parser.ts

Lines changed: 0 additions & 1 deletion
@@ -49,7 +49,6 @@ export function validateDecisionForExecution(decision: AiDecision): {
     case "resolve":
     case "mark_spam":
     case "skip":
-    case "wait":
       // No special validation needed - messages are sent via tools
       break;

apps/api/src/ai-agent/output/schemas.ts

Lines changed: 0 additions & 1 deletion
@@ -41,7 +41,6 @@ export const aiDecisionSchema = z.object({
       "resolve", // Mark conversation as resolved
       "mark_spam", // Mark as spam
       "skip", // No action needed
-      "wait", // Defer briefly, then re-evaluate from decision stage
     ])
     .describe("The action to take after sending messages"),
