Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
167 changes: 141 additions & 26 deletions functions/api/[[path]].ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,19 @@ class PgDatabase {
prepare(query: string): PgPreparedStatement {
return new PgPreparedStatement(this.connectionString, query);
}

async withClient<T>(handler: (client: Client) => Promise<T>): Promise<T> {
const client = new Client({
connectionString: this.connectionString,
application_name: "agent-comms-core",
});
await client.connect();
try {
return await handler(client);
} finally {
await client.end();
}
}
}

class PgPreparedStatement {
Expand Down Expand Up @@ -91,6 +104,10 @@ function toPostgresPlaceholders(query: string): string {
return query.replace(/\?/g, () => `$${++index}`);
}

async function pgAll<T = Row>(client: Client, query: string, values: unknown[] = []): Promise<{ results: T[] }> {
return { results: (await client.query(toPostgresPlaceholders(query), values)).rows as T[] };
}

const json = (payload: unknown, status = 200) =>
Response.json(payload, {
status,
Expand Down Expand Up @@ -750,23 +767,126 @@ async function listAgents(env: Env) {
return json({ agents: results.map((row) => normalizeAgent(row as Row)) });
}

function operatorBootstrapPayload(input: {
forums: Row[];
threads: Row[];
replies: Row[];
suggestions: Row[];
agents: Row[];
directConversations: Row[];
directMessages: Row[];
gates: Row[];
gateEvidenceItems: Row[];
liveSessions: Row[];
liveReceipts: Row[];
previewStorage?: boolean;
}) {
return {
forums: input.forums.map((row) => normalizeForum(row)),
threads: input.threads.map((row) => normalizeThread(row, input.previewStorage ? "preview" : "operator")),
replies: input.replies.map((row) => normalizeReply(row)),
suggestions: input.suggestions.map((row) => normalizeSuggestion(row)),
agents: input.agents.map((row) => normalizeAgent(row)),
conversations: input.directConversations.map((row) => normalizeConversation(row)),
messages: input.directMessages.map((row) => normalizeDirectMessage(row)),
gates: input.gates.map((row) =>
normalizeGate(row, input.gateEvidenceItems.filter((item) => item.gate_id === row.id)),
),
sessions: input.liveSessions.map((session) =>
normalizeLiveSession(
session,
input.liveReceipts.filter((receipt) => receipt.session_id === session.id),
),
),
...(input.previewStorage ? { previewStorage: true } : {}),
};
}

async function operatorBootstrap(env: Env) {
const db = requireDb(env);
if (!db.ok) {
return json({
forums: memory.forums.map(normalizeForum),
threads: memory.threads.map((row) => normalizeThread(row as Row, "preview")),
return json(operatorBootstrapPayload({
forums: memory.forums as Row[],
threads: memory.threads as Row[],
replies: [],
suggestions: memory.suggestions.map(normalizeSuggestion),
suggestions: memory.suggestions as Row[],
agents: [],
conversations: [],
messages: memory.directMessages.map((row) => normalizeDirectMessage(row as Row)),
sessions: [],
directConversations: [],
directMessages: memory.directMessages as Row[],
gates: [],
gateEvidenceItems: [],
liveSessions: [],
liveReceipts: [],
previewStorage: true,
});
}));
}
const database = db.db;
if (database instanceof PgDatabase) {
return json(await database.withClient(async (client) => {
const forums = await pgAll<Row>(client, "SELECT * FROM forums ORDER BY name");
const threads = await pgAll<Row>(client, "SELECT * FROM threads ORDER BY created_at DESC");
const replies = await pgAll<Row>(client, "SELECT * FROM thread_replies ORDER BY created_at ASC");
const suggestions = await pgAll<Row>(client, "SELECT * FROM suggestion_cards ORDER BY created_at DESC");
const agents = await pgAll<Row>(
client,
`SELECT a.*, p.agent_id, p.project, p.role, p.summary, p.tools_json,
p.interested_projects_json, p.capabilities_json, p.operating_notes,
p.updated_at
FROM agent_identities a
LEFT JOIN agent_profiles p ON p.agent_id = a.id
ORDER BY a.handle`,
);
const directConversations = await pgAll<Row>(
client,
`SELECT id, agent_a_id, agent_b_id
FROM direct_conversations
ORDER BY id`,
);
const directMessages = await pgAll<Row>(
client,
`SELECT id, conversation_id, sender_agent_id, 'agent' AS sender_kind, body, created_at
FROM direct_messages
UNION ALL
SELECT id, conversation_id, sender_human_id AS sender_agent_id, 'human' AS sender_kind, body, created_at
FROM direct_operator_messages
ORDER BY created_at ASC`,
);
const gates = await pgAll<Row>(client, "SELECT * FROM cross_project_gates ORDER BY updated_at DESC");
const liveSessions = await pgAll<Row>(client, "SELECT * FROM live_conversation_sessions ORDER BY created_at DESC");
const gateIds = gates.results.map((gate) => String(gate.id));
const liveSessionIds = liveSessions.results.map((session) => String(session.id));
const gateEvidenceItems = gateIds.length
? await pgAll<Row>(
client,
`SELECT * FROM gate_evidence_items WHERE gate_id IN (${gateIds.map(() => "?").join(",")}) ORDER BY updated_at DESC`,
gateIds,
)
: { results: [] as Row[] };
const liveReceipts = liveSessionIds.length
? await pgAll<Row>(
client,
`SELECT * FROM live_conversation_receipts
WHERE session_id IN (${liveSessionIds.map(() => "?").join(",")})
ORDER BY updated_at DESC`,
liveSessionIds,
)
: { results: [] as Row[] };

return operatorBootstrapPayload({
forums: forums.results,
threads: threads.results,
replies: replies.results,
suggestions: suggestions.results,
agents: agents.results,
directConversations: directConversations.results,
directMessages: directMessages.results,
gates: gates.results,
gateEvidenceItems: gateEvidenceItems.results,
liveSessions: liveSessions.results,
liveReceipts: liveReceipts.results,
});
}));
}
const [
forums,
threads,
Expand Down Expand Up @@ -833,24 +953,19 @@ async function operatorBootstrap(env: Env) {
: Promise.resolve({ results: [] as Row[] }),
]);

return json({
forums: forums.results.map((row) => normalizeForum(row)),
threads: threads.results.map((row) => normalizeThread(row, "operator")),
replies: replies.results.map((row) => normalizeReply(row)),
suggestions: suggestions.results.map((row) => normalizeSuggestion(row)),
agents: agents.results.map((row) => normalizeAgent(row)),
conversations: directConversations.results.map((row) => normalizeConversation(row)),
messages: directMessages.results.map((row) => normalizeDirectMessage(row)),
gates: gates.results.map((row) =>
normalizeGate(row, gateEvidenceItems.results.filter((item) => item.gate_id === row.id)),
),
sessions: liveSessions.results.map((session) =>
normalizeLiveSession(
session,
liveReceipts.results.filter((receipt) => receipt.session_id === session.id),
),
),
});
return json(operatorBootstrapPayload({
forums: forums.results,
threads: threads.results,
replies: replies.results,
suggestions: suggestions.results,
agents: agents.results,
directConversations: directConversations.results,
directMessages: directMessages.results,
gates: gates.results,
gateEvidenceItems: gateEvidenceItems.results,
liveSessions: liveSessions.results,
liveReceipts: liveReceipts.results,
}));
}

async function listThreads(env: Env, forumId?: string | null) {
Expand Down
Loading