@@ -25,15 +25,30 @@ import {
2525import {
2626 canonicalizeEnvelope ,
2727 canonicalizeOutbound ,
28+ canonicalizeOutboundV2 ,
2829 canonicalizeSendRequest ,
2930} from "./crypto.mjs" ;
3031
3132const SOCKET_DIR = path . join ( homedir ( ) , ".pi" , "session-control" ) ;
3233const AGENT_TIMEOUT_MS = 120_000 ;
33- const API_PORT = parseInt ( process . env . BRIDGE_API_PORT || "7890" , 10 ) ;
34- const POLL_INTERVAL_MS = parseInt ( process . env . SLACK_BROKER_POLL_INTERVAL_MS || "3000" , 10 ) ;
35- const MAX_MESSAGES = parseInt ( process . env . SLACK_BROKER_MAX_MESSAGES || "10" , 10 ) ;
36- const DEDUPE_TTL_MS = parseInt ( process . env . SLACK_BROKER_DEDUPE_TTL_MS || String ( 20 * 60 * 1000 ) , 10 ) ;
34+
35+ function clampInt ( value , min , max , fallback ) {
36+ const parsed = Number . parseInt ( String ( value ?? "" ) , 10 ) ;
37+ if ( ! Number . isFinite ( parsed ) ) return fallback ;
38+ return Math . min ( max , Math . max ( min , parsed ) ) ;
39+ }
40+
41+ const API_PORT = clampInt ( process . env . BRIDGE_API_PORT || "7890" , 1 , 65535 , 7890 ) ;
42+ const POLL_INTERVAL_MS = clampInt ( process . env . SLACK_BROKER_POLL_INTERVAL_MS || "3000" , 0 , 60_000 , 3000 ) ;
43+ const MAX_MESSAGES = clampInt ( process . env . SLACK_BROKER_MAX_MESSAGES || "10" , 1 , 100 , 10 ) ;
44+ const MAX_WAIT_SECONDS = 25 ;
45+ const BROKER_WAIT_SECONDS = clampInt ( process . env . SLACK_BROKER_WAIT_SECONDS || "20" , 0 , MAX_WAIT_SECONDS , 20 ) ;
46+ const DEDUPE_TTL_MS = clampInt (
47+ process . env . SLACK_BROKER_DEDUPE_TTL_MS || String ( 20 * 60 * 1000 ) ,
48+ 1_000 ,
49+ 7 * 24 * 60 * 60 * 1000 ,
50+ 20 * 60 * 1000 ,
51+ ) ;
3752const MAX_BACKOFF_MS = 30_000 ;
3853const BROKER_HEALTH_PATH = path . join ( homedir ( ) , ".pi" , "agent" , "broker-health.json" ) ;
3954
@@ -357,6 +372,19 @@ function signRequest(action, timestamp, payloadField) {
357372 return toBase64 ( sig ) ;
358373}
359374
375+ function signPullRequest ( timestamp , maxMessages , waitSeconds ) {
376+ if ( waitSeconds <= 0 ) {
377+ return signRequest ( "inbox.pull" , timestamp , String ( maxMessages ) ) ;
378+ }
379+
380+ const canonical = canonicalizeOutboundV2 ( workspaceId , "inbox.pull.v2" , timestamp , {
381+ max_messages : maxMessages ,
382+ wait_seconds : waitSeconds ,
383+ } ) ;
384+ const sig = sodium . crypto_sign_detached ( canonical , cryptoState . serverSignSecretKey ) ;
385+ return toBase64 ( sig ) ;
386+ }
387+
360388async function brokerFetch ( pathname , body ) {
361389 const url = `${ brokerBaseUrl } ${ pathname } ` ;
362390 const response = await fetch ( url , {
@@ -389,14 +417,17 @@ async function brokerFetch(pathname, body) {
389417
390418async function pullInbox ( ) {
391419 const timestamp = Math . floor ( Date . now ( ) / 1000 ) ;
392- const signature = signRequest ( "inbox.pull" , timestamp , String ( MAX_MESSAGES ) ) ;
420+ const signature = signPullRequest ( timestamp , MAX_MESSAGES , BROKER_WAIT_SECONDS ) ;
393421
394- const payload = await brokerFetch ( "/api/inbox/pull" , {
422+ const body = {
395423 workspace_id : workspaceId ,
396424 max_messages : MAX_MESSAGES ,
425+ ...( BROKER_WAIT_SECONDS > 0 ? { wait_seconds : BROKER_WAIT_SECONDS } : { } ) ,
397426 timestamp,
398427 signature,
399- } ) ;
428+ } ;
429+
430+ const payload = await brokerFetch ( "/api/inbox/pull" , body ) ;
400431
401432 return Array . isArray ( payload . messages ) ? payload . messages : [ ] ;
402433}
@@ -918,7 +949,9 @@ async function startPollLoop() {
918949 }
919950
920951 backoffMs = POLL_INTERVAL_MS ;
921- await sleep ( POLL_INTERVAL_MS ) ;
952+ if ( BROKER_WAIT_SECONDS <= 0 ) {
953+ await sleep ( POLL_INTERVAL_MS ) ;
954+ }
922955 } catch ( err ) {
923956 if ( ! pollSucceeded ) {
924957 markHealth ( "poll" , false , err ) ;
@@ -960,7 +993,10 @@ async function startPollLoop() {
960993 logInfo ( ` outbound mode: ${ outboundMode } ${ outboundMode === "direct" ? "(using SLACK_BOT_TOKEN)" : "(via broker)" } ` ) ;
961994 logInfo ( ` broker: ${ brokerBaseUrl } ` ) ;
962995 logInfo ( ` workspace: ${ workspaceId } ` ) ;
963- logInfo ( ` poll interval: ${ POLL_INTERVAL_MS } ms, max messages: ${ MAX_MESSAGES } ` ) ;
996+ logInfo (
997+ ` poll mode: ${ BROKER_WAIT_SECONDS > 0 ? `long-poll (${ BROKER_WAIT_SECONDS } s)` : "short-poll" } , ` +
998+ `interval: ${ POLL_INTERVAL_MS } ms, max messages: ${ MAX_MESSAGES } ` ,
999+ ) ;
9641000 logInfo ( ` allowed users: ${ ALLOWED_USERS . length || "all" } ` ) ;
9651001 logInfo ( ` pi socket: ${ socketPath || "(not found — will retry on message)" } ` ) ;
9661002 await startPollLoop ( ) ;
0 commit comments