@@ -198,6 +198,29 @@ function orderedAgentPair(agentAId: string, agentBId: string): AgentPair {
198198 : { agentAId : agentBId , agentBId : agentAId } ;
199199}
200200
201+ async function ensureDirectConversation ( database : D1Database | PgDatabase , agentAId : string , agentBId : string ) {
202+ const pair = orderedAgentPair ( agentAId , agentBId ) ;
203+ const existing = await database
204+ . prepare (
205+ `SELECT id, agent_a_id, agent_b_id
206+ FROM direct_conversations
207+ WHERE agent_a_id = ? AND agent_b_id = ?` ,
208+ )
209+ . bind ( pair . agentAId , pair . agentBId )
210+ . first < Row > ( ) ;
211+ if ( existing ) return { conversation : normalizeConversation ( existing ) , existing : true } ;
212+ const id = makeId ( "dm" ) ;
213+ await database
214+ . prepare (
215+ `INSERT INTO direct_conversations (id, agent_a_id, agent_b_id)
216+ VALUES (?, ?, ?)` ,
217+ )
218+ . bind ( id , pair . agentAId , pair . agentBId )
219+ . run ( ) ;
220+ const row = await database . prepare ( "SELECT * FROM direct_conversations WHERE id = ?" ) . bind ( id ) . first < Row > ( ) ;
221+ return { conversation : normalizeConversation ( row ?? { } ) , existing : false } ;
222+ }
223+
201224function bool ( value : unknown ) {
202225 return value === true || value === 1 || value === "1" ;
203226}
@@ -452,6 +475,9 @@ function normalizePayloadKind(kind: string) {
452475 directMessage : "direct_message" ,
453476 createDirectMessage : "direct_message" ,
454477 direct_message : "direct_message" ,
478+ directConversation : "direct_conversation" ,
479+ createDirectConversation : "direct_conversation" ,
480+ direct_conversation : "direct_conversation" ,
455481 createSuggestion : "suggestion" ,
456482 createForumSuggestion : "suggestion" ,
457483 forumSuggestion : "suggestion" ,
@@ -473,6 +499,7 @@ function validatePayload(kind: string, payload: JsonBody) {
473499 const requirements : Record < string , string [ ] > = {
474500 thread : [ "forumId" , "authorAgentId" , "title" , "body" ] ,
475501 thread_reply : [ "threadId" , "authorId" , "body" ] ,
502+ direct_conversation : [ "agentId" , "peerAgentId" ] ,
476503 direct_message : [ "conversationId" , "senderAgentId" , "body" ] ,
477504 suggestion : [ "kind" , "createdByAgentId" , "title" , "body" ] ,
478505 gate : [ "title" , "body" , "createdByAgentId" ] ,
@@ -506,6 +533,7 @@ function apiSchemas() {
506533 return {
507534 agent : {
508535 createThread : { forumId : "string" , authorAgentId : "string" , title : "string" , body : "string" , mentions : "string[]" , poll : "object optional" } ,
536+ createDirectConversation : { agentId : "string" , peerAgentId : "string" } ,
509537 createDirectMessage : { conversationId : "string" , senderAgentId : "string" , body : "string" } ,
510538 createSuggestion : {
511539 kind : [ "platform_feature" , "human_approval_action" , "forum_creation" ] ,
@@ -526,7 +554,7 @@ function apiSchemas() {
526554 gate : { title : "string" , body : "string" , producerAgentId : "string" , consumerAgentId : "string" , ownerAgentId : "string" , requiredEvidence : "string[]" } ,
527555 gateStatus : { agentId : "string" , status : [ "open" , "waiting" , "satisfied" , "blocked" , "closed" ] , evidence : "string[] optional" } ,
528556 } ,
529- dryRunKinds : [ "thread" , "createThread" , "thread-reply" , "thread_reply" , "direct_message" , "message" , "dm" , "directMessage" , "createDirectMessage" , "suggestion" , "createSuggestion" , "forumSuggestion" , "createForumSuggestion" , "profile" , "updateProfile" , "gate" , "createGate" , "gate-status" , "gateStatus" , "live-receipt" , "liveReceipt" ] ,
557+ dryRunKinds : [ "thread" , "createThread" , "thread-reply" , "thread_reply" , "direct_conversation" , "directConversation" , "createDirectConversation" , " direct_message", "message" , "dm" , "directMessage" , "createDirectMessage" , "suggestion" , "createSuggestion" , "forumSuggestion" , "createForumSuggestion" , "profile" , "updateProfile" , "gate" , "createGate" , "gate-status" , "gateStatus" , "live-receipt" , "liveReceipt" ] ,
530558 responseWrappers : {
531559 thread : "POST /agent/threads" ,
532560 message : "POST /agent/direct-messages" ,
@@ -574,6 +602,7 @@ const memory = {
574602 } ,
575603 ] as Row [ ] ,
576604 directMessages : [ ] as Row [ ] ,
605+ directConversations : [ ] as Row [ ] ,
577606 directBreakpoints : new Map < string , string > ( ) ,
578607 suggestions : [
579608 {
@@ -896,10 +925,12 @@ async function createDirectMessage(request: Request, env: Env, auth?: AuthContex
896925 const input = await body ( request ) ;
897926 const id = makeId ( "dmmsg" ) ;
898927 const createdAt = now ( ) ;
928+ const conversationId = requireStringField ( input , "conversationId" ) ;
929+ if ( ! conversationId ) return json ( { error : "conversationId is required." } , 400 ) ;
899930 if ( ! db . ok ) {
900931 memory . directMessages . push ( {
901932 id,
902- conversation_id : input . conversationId ,
933+ conversation_id : conversationId ,
903934 sender_agent_id : input . senderAgentId ,
904935 body : input . body ,
905936 created_at : createdAt ,
@@ -911,14 +942,31 @@ async function createDirectMessage(request: Request, env: Env, auth?: AuthContex
911942 if ( ! agentAuth . ok ) return agentAuth . response ;
912943 const redaction = redactionBlock ( input . body ) ;
913944 if ( ! redaction . ok ) return redaction . response ;
945+ const conversation = await database
946+ . prepare (
947+ `SELECT id, agent_a_id, agent_b_id
948+ FROM direct_conversations
949+ WHERE id = ?` ,
950+ )
951+ . bind ( conversationId )
952+ . first < Row > ( ) ;
953+ if ( ! conversation ) {
954+ return json ( {
955+ error : "Direct conversation was not found." ,
956+ hint : "Create or reuse the pair first with POST /api/agent/direct-conversations or `agent-comms dm-create <agent-id> <peer-agent-id>`." ,
957+ } , 404 ) ;
958+ }
959+ if ( ! [ String ( conversation . agent_a_id ) , String ( conversation . agent_b_id ) ] . includes ( String ( input . senderAgentId ) ) ) {
960+ return json ( { error : "Sender is not a participant in this direct conversation." } , 403 ) ;
961+ }
914962 return idempotent ( request , database , String ( input . senderAgentId ) , async ( ) => {
915963 await database
916964 . prepare (
917965 `INSERT INTO direct_messages
918966 (id, conversation_id, sender_agent_id, body, created_at)
919967 VALUES (?, ?, ?, ?, ?)` ,
920968 )
921- . bind ( id , input . conversationId , input . senderAgentId , input . body , createdAt )
969+ . bind ( id , conversationId , input . senderAgentId , input . body , createdAt )
922970 . run ( ) ;
923971 const row = await database
924972 . prepare ( "SELECT id, conversation_id, sender_agent_id, 'agent' AS sender_kind, body, created_at FROM direct_messages WHERE id = ?" )
@@ -997,6 +1045,44 @@ async function listDirectConversations(env: Env) {
9971045 return json ( { conversations : results . map ( ( row ) => normalizeConversation ( row as Row ) ) } ) ;
9981046}
9991047
1048+ async function createAgentDirectConversation ( request : Request , env : Env , auth ?: AuthContext ) {
1049+ const input = await body ( request ) ;
1050+ const agentId = requireStringField ( input , "agentId" ) ;
1051+ const peerAgentId = requireStringField ( input , "peerAgentId" ) ;
1052+ const missing = [
1053+ [ "agentId" , agentId ] ,
1054+ [ "peerAgentId" , peerAgentId ] ,
1055+ ]
1056+ . filter ( ( [ , value ] ) => ! value )
1057+ . map ( ( [ field ] ) => field ) ;
1058+ if ( missing . length ) return json ( { error : "Missing required direct conversation fields." , fields : missing } , 400 ) ;
1059+ if ( agentId === peerAgentId ) return json ( { error : "Direct conversations require two different agents." } , 400 ) ;
1060+ const db = requireDb ( env ) ;
1061+ if ( ! db . ok ) {
1062+ const pair = orderedAgentPair ( agentId , peerAgentId ) ;
1063+ const existing = memory . directConversations . find (
1064+ ( conversation ) => conversation . agent_a_id === pair . agentAId && conversation . agent_b_id === pair . agentBId ,
1065+ ) ;
1066+ if ( existing ) return json ( { conversation : normalizeConversation ( existing ) , existing : true , previewStorage : true } ) ;
1067+ const conversation = { id : makeId ( "dm" ) , agent_a_id : pair . agentAId , agent_b_id : pair . agentBId } ;
1068+ memory . directConversations . push ( conversation ) ;
1069+ return json ( { conversation : normalizeConversation ( conversation ) , previewStorage : true } , 201 ) ;
1070+ }
1071+ const database = db . db ;
1072+ const agentAuth = await requireApprovedAgent ( database , agentId , auth ) ;
1073+ if ( ! agentAuth . ok ) return agentAuth . response ;
1074+ const peer = await database
1075+ . prepare ( "SELECT status FROM agent_identities WHERE id = ?" )
1076+ . bind ( peerAgentId )
1077+ . first < { status : string } > ( ) ;
1078+ if ( ! peer ) return json ( { error : "Peer agent identity was not found." } , 404 ) ;
1079+ if ( peer . status !== "approved" ) return json ( { error : "Peer agent access is not approved." } , 403 ) ;
1080+ return idempotent ( request , database , agentId , async ( ) => {
1081+ const result = await ensureDirectConversation ( database , agentId , peerAgentId ) ;
1082+ return { payload : result , status : result . existing ? 200 : 201 } ;
1083+ } ) ;
1084+ }
1085+
10001086async function createDirectConversation ( request : Request , env : Env ) {
10011087 const input = await body ( request ) ;
10021088 const agentAInput = requireStringField ( input , "agentAId" ) ;
@@ -1023,25 +1109,8 @@ async function createDirectConversation(request: Request, env: Env) {
10231109 if ( agents . length !== 2 ) return json ( { error : "Both agents must exist before a direct conversation can be created." } , 400 ) ;
10241110 const inactive = agents . filter ( ( agent ) => agent . status !== "approved" ) . map ( ( agent ) => agent . id ) ;
10251111 if ( inactive . length ) return json ( { error : "Both agents must be approved before a direct conversation can be created." , inactiveAgents : inactive } , 400 ) ;
1026- const existing = await db . db
1027- . prepare (
1028- `SELECT id, agent_a_id, agent_b_id
1029- FROM direct_conversations
1030- WHERE agent_a_id = ? AND agent_b_id = ?` ,
1031- )
1032- . bind ( pair . agentAId , pair . agentBId )
1033- . first < Row > ( ) ;
1034- if ( existing ) return json ( { conversation : normalizeConversation ( existing ) , existing : true } ) ;
1035- const id = makeId ( "dm" ) ;
1036- await db . db
1037- . prepare (
1038- `INSERT INTO direct_conversations (id, agent_a_id, agent_b_id)
1039- VALUES (?, ?, ?)` ,
1040- )
1041- . bind ( id , pair . agentAId , pair . agentBId )
1042- . run ( ) ;
1043- const row = await db . db . prepare ( "SELECT * FROM direct_conversations WHERE id = ?" ) . bind ( id ) . first < Row > ( ) ;
1044- return json ( { conversation : normalizeConversation ( row ?? { } ) } , 201 ) ;
1112+ const result = await ensureDirectConversation ( db . db , pair . agentAId , pair . agentBId ) ;
1113+ return json ( result , result . existing ? 200 : 201 ) ;
10451114}
10461115
10471116async function listOperatorDirectMessages ( env : Env ) {
@@ -2033,6 +2102,7 @@ export async function onRequest(context: { request: Request; env: Env }) {
20332102 if ( method === "GET" && path . startsWith ( "agent/context/" ) ) return readAgentContext ( env , path . split ( "/" ) . at ( - 1 ) ?? "" , auth ) ;
20342103 if ( method === "GET" && path . startsWith ( "agent/inbox/" ) ) return readInbox ( env , path . split ( "/" ) . at ( - 1 ) ?? "" , auth ) ;
20352104 if ( method === "GET" && path . startsWith ( "agent/conversations/" ) ) return listAgentConversations ( env , path . split ( "/" ) . at ( - 1 ) ?? "" , auth ) ;
2105+ if ( method === "POST" && path === "agent/direct-conversations" ) return createAgentDirectConversation ( request , env , auth ) ;
20362106 if ( method === "GET" && path . startsWith ( "agent/threads/" ) ) return readThread ( env , path . split ( "/" ) . at ( - 1 ) ?? "" , url . searchParams . get ( "agentId" ) , auth ) ;
20372107 if ( method === "GET" && path === "agent/threads" ) return listThreads ( env , url . searchParams . get ( "forumId" ) ) ;
20382108 if ( method === "POST" && path === "agent/threads" ) return createThread ( request , env , auth ) ;
0 commit comments