From 71fcace5e4acda0bd0fbe057984383e07c22e7c4 Mon Sep 17 00:00:00 2001 From: Shay Palachy Date: Wed, 27 May 2026 01:09:09 +0300 Subject: [PATCH] Use one Postgres connection for operator bootstrap --- functions/api/[[path]].ts | 167 ++++++++++++++++++++++++++++++++------ 1 file changed, 141 insertions(+), 26 deletions(-) diff --git a/functions/api/[[path]].ts b/functions/api/[[path]].ts index 83e711b..1d8c4ff 100644 --- a/functions/api/[[path]].ts +++ b/functions/api/[[path]].ts @@ -44,6 +44,19 @@ class PgDatabase { prepare(query: string): PgPreparedStatement { return new PgPreparedStatement(this.connectionString, query); } + + async withClient(handler: (client: Client) => Promise): Promise { + const client = new Client({ + connectionString: this.connectionString, + application_name: "agent-comms-core", + }); + await client.connect(); + try { + return await handler(client); + } finally { + await client.end(); + } + } } class PgPreparedStatement { @@ -91,6 +104,10 @@ function toPostgresPlaceholders(query: string): string { return query.replace(/\?/g, () => `$${++index}`); } +async function pgAll(client: Client, query: string, values: unknown[] = []): Promise<{ results: T[] }> { + return { results: (await client.query(toPostgresPlaceholders(query), values)).rows as T[] }; +} + const json = (payload: unknown, status = 200) => Response.json(payload, { status, @@ -750,23 +767,126 @@ async function listAgents(env: Env) { return json({ agents: results.map((row) => normalizeAgent(row as Row)) }); } +function operatorBootstrapPayload(input: { + forums: Row[]; + threads: Row[]; + replies: Row[]; + suggestions: Row[]; + agents: Row[]; + directConversations: Row[]; + directMessages: Row[]; + gates: Row[]; + gateEvidenceItems: Row[]; + liveSessions: Row[]; + liveReceipts: Row[]; + previewStorage?: boolean; +}) { + return { + forums: input.forums.map((row) => normalizeForum(row)), + threads: input.threads.map((row) => normalizeThread(row, input.previewStorage ? "preview" : "operator")), + replies: input.replies.map((row) => normalizeReply(row)), + suggestions: input.suggestions.map((row) => normalizeSuggestion(row)), + agents: input.agents.map((row) => normalizeAgent(row)), + conversations: input.directConversations.map((row) => normalizeConversation(row)), + messages: input.directMessages.map((row) => normalizeDirectMessage(row)), + gates: input.gates.map((row) => + normalizeGate(row, input.gateEvidenceItems.filter((item) => item.gate_id === row.id)), + ), + sessions: input.liveSessions.map((session) => + normalizeLiveSession( + session, + input.liveReceipts.filter((receipt) => receipt.session_id === session.id), + ), + ), + ...(input.previewStorage ? { previewStorage: true } : {}), + }; +} + async function operatorBootstrap(env: Env) { const db = requireDb(env); if (!db.ok) { - return json({ - forums: memory.forums.map(normalizeForum), - threads: memory.threads.map((row) => normalizeThread(row as Row, "preview")), + return json(operatorBootstrapPayload({ + forums: memory.forums as Row[], + threads: memory.threads as Row[], replies: [], - suggestions: memory.suggestions.map(normalizeSuggestion), + suggestions: memory.suggestions as Row[], agents: [], - conversations: [], - messages: memory.directMessages.map((row) => normalizeDirectMessage(row as Row)), - sessions: [], + directConversations: [], + directMessages: memory.directMessages as Row[], gates: [], + gateEvidenceItems: [], + liveSessions: [], + liveReceipts: [], previewStorage: true, - }); + })); } const database = db.db; + if (database instanceof PgDatabase) { + return json(await database.withClient(async (client) => { + const forums = await pgAll(client, "SELECT * FROM forums ORDER BY name"); + const threads = await pgAll(client, "SELECT * FROM threads ORDER BY created_at DESC"); + const replies = await pgAll(client, "SELECT * FROM thread_replies ORDER BY created_at ASC"); + const suggestions = await pgAll(client, "SELECT * FROM suggestion_cards ORDER BY created_at DESC"); + const agents = await pgAll( + client, + `SELECT a.*, p.agent_id, p.project, p.role, p.summary, p.tools_json, + p.interested_projects_json, p.capabilities_json, p.operating_notes, + p.updated_at + FROM agent_identities a + LEFT JOIN agent_profiles p ON p.agent_id = a.id + ORDER BY a.handle`, + ); + const directConversations = await pgAll( + client, + `SELECT id, agent_a_id, agent_b_id + FROM direct_conversations + ORDER BY id`, + ); + const directMessages = await pgAll( + client, + `SELECT id, conversation_id, sender_agent_id, 'agent' AS sender_kind, body, created_at + FROM direct_messages + UNION ALL + SELECT id, conversation_id, sender_human_id AS sender_agent_id, 'human' AS sender_kind, body, created_at + FROM direct_operator_messages + ORDER BY created_at ASC`, + ); + const gates = await pgAll(client, "SELECT * FROM cross_project_gates ORDER BY updated_at DESC"); + const liveSessions = await pgAll(client, "SELECT * FROM live_conversation_sessions ORDER BY created_at DESC"); + const gateIds = gates.results.map((gate) => String(gate.id)); + const liveSessionIds = liveSessions.results.map((session) => String(session.id)); + const gateEvidenceItems = gateIds.length + ? await pgAll( + client, + `SELECT * FROM gate_evidence_items WHERE gate_id IN (${gateIds.map(() => "?").join(",")}) ORDER BY updated_at DESC`, + gateIds, + ) + : { results: [] as Row[] }; + const liveReceipts = liveSessionIds.length + ? await pgAll( + client, + `SELECT * FROM live_conversation_receipts + WHERE session_id IN (${liveSessionIds.map(() => "?").join(",")}) + ORDER BY updated_at DESC`, + liveSessionIds, + ) + : { results: [] as Row[] }; + + return operatorBootstrapPayload({ + forums: forums.results, + threads: threads.results, + replies: replies.results, + suggestions: suggestions.results, + agents: agents.results, + directConversations: directConversations.results, + directMessages: directMessages.results, + gates: gates.results, + gateEvidenceItems: gateEvidenceItems.results, + liveSessions: liveSessions.results, + liveReceipts: liveReceipts.results, + }); + })); + } const [ forums, threads, @@ -833,24 +953,19 @@ async function operatorBootstrap(env: Env) { : Promise.resolve({ results: [] as Row[] }), ]); - return json({ - forums: forums.results.map((row) => normalizeForum(row)), - threads: threads.results.map((row) => normalizeThread(row, "operator")), - replies: replies.results.map((row) => normalizeReply(row)), - suggestions: suggestions.results.map((row) => normalizeSuggestion(row)), - agents: agents.results.map((row) => normalizeAgent(row)), - conversations: directConversations.results.map((row) => normalizeConversation(row)), - messages: directMessages.results.map((row) => normalizeDirectMessage(row)), - gates: gates.results.map((row) => - normalizeGate(row, gateEvidenceItems.results.filter((item) => item.gate_id === row.id)), - ), - sessions: liveSessions.results.map((session) => - normalizeLiveSession( - session, - liveReceipts.results.filter((receipt) => receipt.session_id === session.id), - ), - ), - }); + return json(operatorBootstrapPayload({ + forums: forums.results, + threads: threads.results, + replies: replies.results, + suggestions: suggestions.results, + agents: agents.results, + directConversations: directConversations.results, + directMessages: directMessages.results, + gates: gates.results, + gateEvidenceItems: gateEvidenceItems.results, + liveSessions: liveSessions.results, + liveReceipts: liveReceipts.results, + })); } async function listThreads(env: Env, forumId?: string | null) {