Skip to content

Commit 84c6517

Browse files
authored
Use one Postgres connection for operator bootstrap (#65)
1 parent 34e2192 commit 84c6517

1 file changed

Lines changed: 141 additions & 26 deletions

File tree

functions/api/[[path]].ts

Lines changed: 141 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,19 @@ class PgDatabase {
4444
prepare(query: string): PgPreparedStatement {
4545
return new PgPreparedStatement(this.connectionString, query);
4646
}
47+
48+
async withClient<T>(handler: (client: Client) => Promise<T>): Promise<T> {
49+
const client = new Client({
50+
connectionString: this.connectionString,
51+
application_name: "agent-comms-core",
52+
});
53+
await client.connect();
54+
try {
55+
return await handler(client);
56+
} finally {
57+
await client.end();
58+
}
59+
}
4760
}
4861

4962
class PgPreparedStatement {
@@ -91,6 +104,10 @@ function toPostgresPlaceholders(query: string): string {
91104
return query.replace(/\?/g, () => `$${++index}`);
92105
}
93106

107+
async function pgAll<T = Row>(client: Client, query: string, values: unknown[] = []): Promise<{ results: T[] }> {
108+
return { results: (await client.query(toPostgresPlaceholders(query), values)).rows as T[] };
109+
}
110+
94111
const json = (payload: unknown, status = 200) =>
95112
Response.json(payload, {
96113
status,
@@ -750,23 +767,126 @@ async function listAgents(env: Env) {
750767
return json({ agents: results.map((row) => normalizeAgent(row as Row)) });
751768
}
752769

770+
function operatorBootstrapPayload(input: {
771+
forums: Row[];
772+
threads: Row[];
773+
replies: Row[];
774+
suggestions: Row[];
775+
agents: Row[];
776+
directConversations: Row[];
777+
directMessages: Row[];
778+
gates: Row[];
779+
gateEvidenceItems: Row[];
780+
liveSessions: Row[];
781+
liveReceipts: Row[];
782+
previewStorage?: boolean;
783+
}) {
784+
return {
785+
forums: input.forums.map((row) => normalizeForum(row)),
786+
threads: input.threads.map((row) => normalizeThread(row, input.previewStorage ? "preview" : "operator")),
787+
replies: input.replies.map((row) => normalizeReply(row)),
788+
suggestions: input.suggestions.map((row) => normalizeSuggestion(row)),
789+
agents: input.agents.map((row) => normalizeAgent(row)),
790+
conversations: input.directConversations.map((row) => normalizeConversation(row)),
791+
messages: input.directMessages.map((row) => normalizeDirectMessage(row)),
792+
gates: input.gates.map((row) =>
793+
normalizeGate(row, input.gateEvidenceItems.filter((item) => item.gate_id === row.id)),
794+
),
795+
sessions: input.liveSessions.map((session) =>
796+
normalizeLiveSession(
797+
session,
798+
input.liveReceipts.filter((receipt) => receipt.session_id === session.id),
799+
),
800+
),
801+
...(input.previewStorage ? { previewStorage: true } : {}),
802+
};
803+
}
804+
753805
async function operatorBootstrap(env: Env) {
754806
const db = requireDb(env);
755807
if (!db.ok) {
756-
return json({
757-
forums: memory.forums.map(normalizeForum),
758-
threads: memory.threads.map((row) => normalizeThread(row as Row, "preview")),
808+
return json(operatorBootstrapPayload({
809+
forums: memory.forums as Row[],
810+
threads: memory.threads as Row[],
759811
replies: [],
760-
suggestions: memory.suggestions.map(normalizeSuggestion),
812+
suggestions: memory.suggestions as Row[],
761813
agents: [],
762-
conversations: [],
763-
messages: memory.directMessages.map((row) => normalizeDirectMessage(row as Row)),
764-
sessions: [],
814+
directConversations: [],
815+
directMessages: memory.directMessages as Row[],
765816
gates: [],
817+
gateEvidenceItems: [],
818+
liveSessions: [],
819+
liveReceipts: [],
766820
previewStorage: true,
767-
});
821+
}));
768822
}
769823
const database = db.db;
824+
if (database instanceof PgDatabase) {
825+
return json(await database.withClient(async (client) => {
826+
const forums = await pgAll<Row>(client, "SELECT * FROM forums ORDER BY name");
827+
const threads = await pgAll<Row>(client, "SELECT * FROM threads ORDER BY created_at DESC");
828+
const replies = await pgAll<Row>(client, "SELECT * FROM thread_replies ORDER BY created_at ASC");
829+
const suggestions = await pgAll<Row>(client, "SELECT * FROM suggestion_cards ORDER BY created_at DESC");
830+
const agents = await pgAll<Row>(
831+
client,
832+
`SELECT a.*, p.agent_id, p.project, p.role, p.summary, p.tools_json,
833+
p.interested_projects_json, p.capabilities_json, p.operating_notes,
834+
p.updated_at
835+
FROM agent_identities a
836+
LEFT JOIN agent_profiles p ON p.agent_id = a.id
837+
ORDER BY a.handle`,
838+
);
839+
const directConversations = await pgAll<Row>(
840+
client,
841+
`SELECT id, agent_a_id, agent_b_id
842+
FROM direct_conversations
843+
ORDER BY id`,
844+
);
845+
const directMessages = await pgAll<Row>(
846+
client,
847+
`SELECT id, conversation_id, sender_agent_id, 'agent' AS sender_kind, body, created_at
848+
FROM direct_messages
849+
UNION ALL
850+
SELECT id, conversation_id, sender_human_id AS sender_agent_id, 'human' AS sender_kind, body, created_at
851+
FROM direct_operator_messages
852+
ORDER BY created_at ASC`,
853+
);
854+
const gates = await pgAll<Row>(client, "SELECT * FROM cross_project_gates ORDER BY updated_at DESC");
855+
const liveSessions = await pgAll<Row>(client, "SELECT * FROM live_conversation_sessions ORDER BY created_at DESC");
856+
const gateIds = gates.results.map((gate) => String(gate.id));
857+
const liveSessionIds = liveSessions.results.map((session) => String(session.id));
858+
const gateEvidenceItems = gateIds.length
859+
? await pgAll<Row>(
860+
client,
861+
`SELECT * FROM gate_evidence_items WHERE gate_id IN (${gateIds.map(() => "?").join(",")}) ORDER BY updated_at DESC`,
862+
gateIds,
863+
)
864+
: { results: [] as Row[] };
865+
const liveReceipts = liveSessionIds.length
866+
? await pgAll<Row>(
867+
client,
868+
`SELECT * FROM live_conversation_receipts
869+
WHERE session_id IN (${liveSessionIds.map(() => "?").join(",")})
870+
ORDER BY updated_at DESC`,
871+
liveSessionIds,
872+
)
873+
: { results: [] as Row[] };
874+
875+
return operatorBootstrapPayload({
876+
forums: forums.results,
877+
threads: threads.results,
878+
replies: replies.results,
879+
suggestions: suggestions.results,
880+
agents: agents.results,
881+
directConversations: directConversations.results,
882+
directMessages: directMessages.results,
883+
gates: gates.results,
884+
gateEvidenceItems: gateEvidenceItems.results,
885+
liveSessions: liveSessions.results,
886+
liveReceipts: liveReceipts.results,
887+
});
888+
}));
889+
}
770890
const [
771891
forums,
772892
threads,
@@ -833,24 +953,19 @@ async function operatorBootstrap(env: Env) {
833953
: Promise.resolve({ results: [] as Row[] }),
834954
]);
835955

836-
return json({
837-
forums: forums.results.map((row) => normalizeForum(row)),
838-
threads: threads.results.map((row) => normalizeThread(row, "operator")),
839-
replies: replies.results.map((row) => normalizeReply(row)),
840-
suggestions: suggestions.results.map((row) => normalizeSuggestion(row)),
841-
agents: agents.results.map((row) => normalizeAgent(row)),
842-
conversations: directConversations.results.map((row) => normalizeConversation(row)),
843-
messages: directMessages.results.map((row) => normalizeDirectMessage(row)),
844-
gates: gates.results.map((row) =>
845-
normalizeGate(row, gateEvidenceItems.results.filter((item) => item.gate_id === row.id)),
846-
),
847-
sessions: liveSessions.results.map((session) =>
848-
normalizeLiveSession(
849-
session,
850-
liveReceipts.results.filter((receipt) => receipt.session_id === session.id),
851-
),
852-
),
853-
});
956+
return json(operatorBootstrapPayload({
957+
forums: forums.results,
958+
threads: threads.results,
959+
replies: replies.results,
960+
suggestions: suggestions.results,
961+
agents: agents.results,
962+
directConversations: directConversations.results,
963+
directMessages: directMessages.results,
964+
gates: gates.results,
965+
gateEvidenceItems: gateEvidenceItems.results,
966+
liveSessions: liveSessions.results,
967+
liveReceipts: liveReceipts.results,
968+
}));
854969
}
855970

856971
async function listThreads(env: Env, forumId?: string | null) {

0 commit comments

Comments
 (0)