Skip to content

Commit b541f9a

Browse files
baudbot-agentBaudbot
andauthored
feat(bridge): generic event envelope dispatch (#138)
Co-authored-by: Baudbot <hornet@agentmail.to>
1 parent 699fc7c commit b541f9a

2 files changed

Lines changed: 536 additions & 21 deletions

File tree

slack-bridge/broker-bridge.mjs

Lines changed: 59 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -655,21 +655,19 @@ function isPoisonMessageError(err) {
655655
return message.includes("invalid broker envelope signature") || message.includes("failed to decrypt broker envelope");
656656
}
657657

658-
async function processPulledMessage(message) {
659-
if (!verifyBrokerEnvelope(message)) {
660-
throw new Error("invalid broker envelope signature");
661-
}
662-
663-
let slackEventEnvelopePayload;
664-
try {
665-
slackEventEnvelopePayload = decryptEnvelope(message);
666-
markHealth("inbound_decrypt", true);
667-
} catch (err) {
668-
markHealth("inbound_decrypt", false, err);
669-
throw err;
670-
}
658+
function isGenericEnvelope(payload) {
659+
return (
660+
payload != null &&
661+
typeof payload === "object" &&
662+
typeof payload.source === "string" &&
663+
typeof payload.type === "string" &&
664+
"payload" in payload &&
665+
typeof payload.broker_timestamp === "number"
666+
);
667+
}
671668

672-
logInfo(`📦 decrypted envelope — type: ${slackEventEnvelopePayload?.type || "unknown"}`);
669+
async function handleSlackPayload(slackEventEnvelopePayload) {
670+
logInfo(`📦 slack payload — type: ${slackEventEnvelopePayload?.type || "unknown"}`);
673671

674672
if (slackEventEnvelopePayload?.type !== "event_callback") {
675673
logInfo(` ↳ ignoring non-event_callback type: ${slackEventEnvelopePayload?.type}`);
@@ -712,6 +710,53 @@ async function processPulledMessage(message) {
712710
return true;
713711
}
714712

713+
async function handleDashboardEvent(type, payload) {
714+
logInfo(`📊 dashboard event: ${type}`, JSON.stringify(payload).slice(0, 200));
715+
// TODO: implement dashboard event handling (env updates, config changes)
716+
return true;
717+
}
718+
719+
async function handleSystemEvent(type, payload) {
720+
logInfo(`⚙️ system event: ${type}`, JSON.stringify(payload).slice(0, 200));
721+
// TODO: implement system event handling
722+
return true;
723+
}
724+
725+
async function processPulledMessage(message) {
726+
if (!verifyBrokerEnvelope(message)) {
727+
throw new Error("invalid broker envelope signature");
728+
}
729+
730+
let payload;
731+
try {
732+
payload = decryptEnvelope(message);
733+
markHealth("inbound_decrypt", true);
734+
} catch (err) {
735+
markHealth("inbound_decrypt", false, err);
736+
throw err;
737+
}
738+
739+
// Generic envelope dispatch
740+
if (isGenericEnvelope(payload)) {
741+
logInfo(`📦 generic envelope — source: ${payload.source}, type: ${payload.type}`);
742+
switch (payload.source) {
743+
case "slack":
744+
return handleSlackPayload(payload.payload);
745+
case "dashboard":
746+
return handleDashboardEvent(payload.type, payload.payload);
747+
case "system":
748+
return handleSystemEvent(payload.type, payload.payload);
749+
default:
750+
logWarn(`⚠️ unknown event source: ${payload.source} — acking to avoid blocking queue`);
751+
return true;
752+
}
753+
}
754+
755+
// Legacy: raw Slack event_callback (backwards compat during rollout)
756+
logInfo(`📦 legacy envelope — type: ${payload?.type || "unknown"}`);
757+
return handleSlackPayload(payload);
758+
}
759+
715760
function getLogLinesForResponse(url) {
716761
const nParam = url.searchParams.get("n");
717762
const filterParam = url.searchParams.get("filter");

0 commit comments

Comments
 (0)