diff --git a/.gavel.yaml b/.gavel.yaml new file mode 100644 index 000000000..e9b9db8c7 --- /dev/null +++ b/.gavel.yaml @@ -0,0 +1,3 @@ +pre: + - name: build + run: make build diff --git a/.github/workflows/gavel.yml b/.github/workflows/gavel.yml new file mode 100644 index 000000000..8b62dc36e --- /dev/null +++ b/.github/workflows/gavel.yml @@ -0,0 +1,32 @@ +name: Gavel +on: + pull_request: +permissions: + contents: read + checks: write + issues: write + pull-requests: write +jobs: + gavel: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@08c6903cd8c0fde910a37f88322edcfb5dd907a8 # v5.0.0 + + - uses: actions/setup-go@44694675825211faa026b3c33043df3e48a5fa00 # v6 + with: + go-version: 1.26.x + cache: false + + - uses: actions/cache@1bd1e32a3bdc45362d1e726936510720a7c30a57 # v4 + with: + path: | + ~/go/pkg/mod + ~/.cache/go-build + .bin + key: cache-go-1.26.x-${{ hashFiles('**/go.sum') }}-${{ hashFiles('.bin/*') }} + restore-keys: | + cache-go-1.26.x- + + - uses: flanksource/gavel@main + with: + fail-on-error: "true" diff --git a/Makefile b/Makefile index e5f809fa6..ab20e349c 100644 --- a/Makefile +++ b/Makefile @@ -145,7 +145,7 @@ generate: controller-gen ## Generate code containing DeepCopy, DeepCopyInto, and $(CONTROLLER_GEN) object paths="./api/..." paths="./logs/..." .PHONY: build -build: static +build: install-deps static go build -o ./.bin/$(NAME) -ldflags "-X \"main.version=$(VERSION_TAG) built at $(DATE)\"" main.go .PHONY: dev @@ -175,9 +175,11 @@ test-e2e: bin mkdir -p .bin +DEPS = $(shell which deps 2>/dev/null || echo $(LOCALBIN)/deps) + .PHONY: install-deps -install-deps: $(LOCALBIN) ## Install the deps CLI if not present - which deps 2>/dev/null || test -x $(LOCALBIN)/deps || curl -sSL https://github.com/flanksource/deps/releases/latest/download/deps-$(OS)-$(ARCH).tar.gz | tar -xz -C $(LOCALBIN) +install-deps: $(LOCALBIN) + @test -x $(DEPS) || curl -sSL https://github.com/flanksource/deps/releases/latest/download/deps-$(OS)-$(ARCH).tar.gz | tar -xz -C $(LOCALBIN) .PHONY: deps deps: install-deps ginkgo controller-gen golangci-lint kustomize $(TAILWIND_JS) ## Install all tool dependencies @@ -187,16 +189,16 @@ ginkgo: go install github.com/onsi/ginkgo/v2/ginkgo .PHONY: controller-gen -controller-gen: install-deps $(LOCALBIN) - $(LOCALBIN)/deps install controller-gen@$(CONTROLLER_TOOLS_VERSION) --bin-dir $(LOCALBIN) +controller-gen: install-deps + $(DEPS) install controller-gen@$(CONTROLLER_TOOLS_VERSION) --bin-dir $(LOCALBIN) .PHONY: golangci-lint -golangci-lint: install-deps $(LOCALBIN) - $(LOCALBIN)/deps install golangci/golangci-lint@v$(GOLANGCI_LINT_VERSION) --bin-dir $(LOCALBIN) +golangci-lint: install-deps + $(DEPS) install golangci/golangci-lint@v$(GOLANGCI_LINT_VERSION) --bin-dir $(LOCALBIN) .PHONY: kustomize -kustomize: install-deps $(LOCALBIN) - $(LOCALBIN)/deps install kubernetes-sigs/kustomize@$(KUSTOMIZE_VERSION) --bin-dir $(LOCALBIN) +kustomize: install-deps + $(DEPS) install kubernetes-sigs/kustomize@$(KUSTOMIZE_VERSION) --bin-dir $(LOCALBIN) .PHONY: docs\:mcp docs\:mcp: ## Generate MCP tools reference documentation diff --git a/api/v1/connection_types.go b/api/v1/connection_types.go index 1920bb315..d83d34913 100644 --- a/api/v1/connection_types.go +++ b/api/v1/connection_types.go @@ -63,6 +63,25 @@ type ConnectionPushover struct { User string `json:"user"` } +type ConnectionTeams struct { + // Incoming webhook URL from Microsoft Teams + WebhookURL types.EnvVar `json:"webhookURL"` +} + +type ConnectionMattermost struct { + // Incoming webhook URL + WebhookURL types.EnvVar `json:"webhookURL"` + + // Channel override (optional) + Channel string `json:"channel,omitempty"` + + // Username override (optional) + Username string `json:"username,omitempty"` + + // Icon URL override (optional) + IconURL string `json:"iconURL,omitempty"` +} + type ConnectionPostgres struct { // URL is the connection url. URL types.EnvVar `json:"url,omitempty"` diff --git a/api/v1/zz_generated.deepcopy.go b/api/v1/zz_generated.deepcopy.go index 7dd081748..9c71dc5b9 100644 --- a/api/v1/zz_generated.deepcopy.go +++ b/api/v1/zz_generated.deepcopy.go @@ -998,6 +998,22 @@ func (in *ConnectionMSSQL) DeepCopy() *ConnectionMSSQL { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *ConnectionMattermost) DeepCopyInto(out *ConnectionMattermost) { + *out = *in + in.WebhookURL.DeepCopyInto(&out.WebhookURL) +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ConnectionMattermost. +func (in *ConnectionMattermost) DeepCopy() *ConnectionMattermost { + if in == nil { + return nil + } + out := new(ConnectionMattermost) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *ConnectionMongo) DeepCopyInto(out *ConnectionMongo) { *out = *in @@ -1518,6 +1534,22 @@ func (in *ConnectionStatus) DeepCopy() *ConnectionStatus { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *ConnectionTeams) DeepCopyInto(out *ConnectionTeams) { + *out = *in + in.WebhookURL.DeepCopyInto(&out.WebhookURL) +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ConnectionTeams. +func (in *ConnectionTeams) DeepCopy() *ConnectionTeams { + if in == nil { + return nil + } + out := new(ConnectionTeams) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *ConnectionTelegram) DeepCopyInto(out *ConnectionTelegram) { *out = *in diff --git a/events/event_queue.go b/events/event_queue.go index 30135cf67..a961add0d 100644 --- a/events/event_queue.go +++ b/events/event_queue.go @@ -59,7 +59,57 @@ func ConsumeAll(ctx context.Context) { for _, consumer := range consumers { consumer.ConsumeUntilEmpty(ctx) } +} + +// InitConsumers registers event handlers and creates consumers without starting +// background listeners. Use ConsumeAll to process events explicitly. +// This is useful in tests to avoid races between background consumers and test data setup. +func InitConsumers(ctx context.Context) { + log := ctx.Logger.Named("events") + for _, fn := range registers { + fn(ctx) + } + + SyncHandlers.Each(func(event string, handlers []postq.SyncEventHandlerFunc) { + consumer := postq.SyncEventConsumer{ + WatchEvents: []string{event}, + Consumers: handlers, + ConsumerOption: &postq.ConsumerOption{ + ErrorHandler: defaultLoggerErrorHandler, + }, + } + if ec, err := consumer.EventConsumer(); err != nil { + log.Fatalf("failed to create event consumer: %s", err) + } else { + consumers = append(consumers, ec) + } + }) + AsyncHandlers.Each(func(event string, handlers []asyncHandlerData) { + for _, handler := range handlers { + h := handler.fn + batchSize := ctx.Properties().Int(event+".batchSize", handler.batchSize) + consumer := postq.AsyncEventConsumer{ + WatchEvents: []string{event}, + BatchSize: batchSize, + Consumer: func(_ctx context.Context, e models.Events) models.Events { + return h(ctx, e) + }, + ConsumerOption: &postq.ConsumerOption{ + NumConsumers: handler.numConsumers, + ErrorHandler: func(ctx context.Context, err error) bool { + log.Errorf("error consuming event(%s): %v", event, err) + return false + }, + }, + } + if ec, err := consumer.EventConsumer(); err != nil { + log.Fatalf("failed to create event consumer: %s", err) + } else { + consumers = append(consumers, ec) + } + } + }) } func StartConsumers(ctx context.Context) { diff --git a/go.mod b/go.mod index ac18af039..7cd9ca747 100644 --- a/go.mod +++ b/go.mod @@ -7,7 +7,6 @@ require ( github.com/MicahParks/keyfunc v1.9.0 github.com/casbin/casbin/v2 v2.135.0 github.com/casbin/gorm-adapter/v3 v3.41.0 - github.com/containrrr/shoutrrr v0.8.0 github.com/fergusstrange/embedded-postgres v1.34.0 // indirect github.com/flanksource/commons v1.50.2 github.com/flanksource/duty v1.0.1271 @@ -37,7 +36,6 @@ require ( github.com/MicahParks/keyfunc/v2 v2.1.0 github.com/RussellLuo/slidingwindow v0.0.0-20200528002341-535bb99d338b github.com/WinterYukky/gorm-extra-clause-plugin v0.4.0 - github.com/adityathebe/go-strip-markdown/v2 v2.0.1 github.com/aws/aws-sdk-go-v2 v1.41.5 github.com/aws/aws-sdk-go-v2/config v1.32.14 github.com/aws/aws-sdk-go-v2/credentials v1.19.14 diff --git a/go.sum b/go.sum index 53b286013..bfafc2bad 100644 --- a/go.sum +++ b/go.sum @@ -145,8 +145,6 @@ github.com/TomOnTime/utfutil v1.0.0 h1:/0Ivgo2OjXJxo8i7zgvs7ewSFZMLwCRGm3P5Umowb github.com/TomOnTime/utfutil v1.0.0/go.mod h1:l9lZmOniizVSuIliSkEf87qivMRlSNzbdBFKjuLRg1c= github.com/WinterYukky/gorm-extra-clause-plugin v0.4.0 h1:e4gYsN9tNzoBMYKYBaGwwZpSljJhW231+1cBlYwv8YQ= github.com/WinterYukky/gorm-extra-clause-plugin v0.4.0/go.mod h1:jNWq8AymgsVev9Kq6mke0b3o3yzY6bTSwjMDfTvZPPM= -github.com/adityathebe/go-strip-markdown/v2 v2.0.1 h1:/Dxr9Rnn6h8VIwh2rqpYTUyoN4Hx4SXeEOjrz+JUO6I= -github.com/adityathebe/go-strip-markdown/v2 v2.0.1/go.mod h1:Ze3XxKLEV5u8VWBaiAALVKOIA7uLZghVIUvQrICHFV0= github.com/agext/levenshtein v1.2.3 h1:YB2fHEn0UJagG8T1rrWknE3ZQzWM06O8AMAatNn7lmo= github.com/agext/levenshtein v1.2.3/go.mod h1:JEDfjyjHDjOF/1e4FlBE/PkbqA9OfWu2ki2W0IB5558= github.com/agnivade/levenshtein v1.2.1 h1:EHBY3UOn1gwdy/VbFwgo4cxecRznFk7fKWN1KOX7eoM= @@ -340,8 +338,6 @@ github.com/cncf/xds/go v0.0.0-20260121142036-a486691bba94 h1:kkHPnzHm5Ln7WA0XYjr github.com/cncf/xds/go v0.0.0-20260121142036-a486691bba94/go.mod h1:KdCmV+x/BuvyMxRnYBlmVaq4OLiKW6iRQfvC62cvdkI= github.com/cockroachdb/apd v1.1.0 h1:3LFP3629v+1aKXU5Q37mxmRxX/pIu1nijXydLShEq5I= github.com/cockroachdb/apd v1.1.0/go.mod h1:8Sl8LxpKi29FqWXR16WEFZRNSz3SoPzUzeMeY4+DwBQ= -github.com/containrrr/shoutrrr v0.8.0 h1:mfG2ATzIS7NR2Ec6XL+xyoHzN97H8WPjir8aYzJUSec= -github.com/containrrr/shoutrrr v0.8.0/go.mod h1:ioyQAyu1LJY6sILuNyKaQaw+9Ttik5QePU8atnAdO2o= github.com/coreos/go-systemd v0.0.0-20190321100706-95778dfbb74e/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4= github.com/coreos/go-systemd v0.0.0-20190719114852-fd7a80b32e1f/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4= github.com/cpuguy83/go-md2man/v2 v2.0.6/go.mod h1:oOW0eioCTA6cOiMLiUPZOpcVxMig6NIQQ7OS05n1F4g= @@ -786,8 +782,6 @@ github.com/jackc/puddle v1.3.0 h1:eHK/5clGOatcjX3oWGBO/MpxpbHzSwud5EWTSCI+MX0= github.com/jackc/puddle v1.3.0/go.mod h1:m4B5Dj62Y0fbyuIc15OsIqK0+JU8nkqQjsgx7dvjSWk= github.com/jackc/puddle/v2 v2.2.2 h1:PR8nw+E/1w0GLuRFSmiioY6UooMp6KJv0/61nB7icHo= github.com/jackc/puddle/v2 v2.2.2/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4= -github.com/jarcoal/httpmock v1.3.0 h1:2RJ8GP0IIaWwcC9Fp2BmVi8Kog3v2Hn7VXM3fTd+nuc= -github.com/jarcoal/httpmock v1.3.0/go.mod h1:3yb8rc4BI7TCBhFY8ng0gjuLKJNquuDNiPaZjnENuYg= github.com/jbenet/go-context v0.0.0-20150711004518-d14ea06fba99 h1:BQSFePA1RWJOlocH6Fxy8MmwDt+yVQYULKfN0RoTN8A= github.com/jbenet/go-context v0.0.0-20150711004518-d14ea06fba99/go.mod h1:1lJo3i6rXxKeerYnT8Nvf0QmHCRC1n8sfWVwXF2Frvo= github.com/jcmturner/aescts/v2 v2.0.0/go.mod h1:AiaICIRyfYg35RUkr8yESTqvSy7csK90qZ5xfvvsoNs= diff --git a/notification/notification_email_e2e_test.go b/notification/notification_email_e2e_test.go index c9583d23c..601f4bd6f 100644 --- a/notification/notification_email_e2e_test.go +++ b/notification/notification_email_e2e_test.go @@ -3,9 +3,12 @@ package notification_test import ( "encoding/json" "fmt" + "io" "net/url" + "strings" "time" + gomessage "github.com/emersion/go-message" "github.com/flanksource/commons/collections" "github.com/flanksource/duty/models" "github.com/flanksource/duty/types" @@ -26,7 +29,45 @@ func lastSMTPMessage() string { if len(messages) == 0 { return "" } - return string(messages[len(messages)-1].Data) + return decodeMIME(string(messages[len(messages)-1].Data)) +} + +func decodeMIME(raw string) string { + entity, err := gomessage.Read(strings.NewReader(raw)) + if err != nil { + return raw + } + + // Start with raw headers (contains Subject, From, To, etc.) + headerEnd := strings.Index(raw, "\r\n\r\n") + if headerEnd < 0 { + headerEnd = strings.Index(raw, "\n\n") + } + + var decoded strings.Builder + if headerEnd > 0 { + decoded.WriteString(raw[:headerEnd]) + decoded.WriteString("\n\n") + } + + if mr := entity.MultipartReader(); mr != nil { + for { + part, err := mr.NextPart() + if err != nil { + break + } + body, _ := io.ReadAll(part.Body) + decoded.Write(body) + } + return decoded.String() + } + + body, err := io.ReadAll(entity.Body) + if err != nil { + return raw + } + decoded.Write(body) + return decoded.String() } var _ = ginkgo.Describe("Notification email end-to-end", ginkgo.Ordered, func() { @@ -86,9 +127,8 @@ var _ = ginkgo.Describe("Notification email end-to-end", ginkgo.Ordered, func() } Expect(DefaultContext.DB().Create(&event).Error).To(BeNil()) - events.ConsumeAll(DefaultContext) - Eventually(func() int { + events.ConsumeAll(DefaultContext) return len(getSMTPMessages()) }, "10s", "200ms").Should(Equal(1)) @@ -100,13 +140,13 @@ var _ = ginkgo.Describe("Notification email end-to-end", ginkgo.Ordered, func() ginkgo.Describe("check passed team email", ginkgo.Ordered, func() { var ( - n models.Notification - team dbModels.Team - creator models.Person - agent models.Agent - canary models.Canary - check models.Check - checkRun models.CheckStatus + n models.Notification + team dbModels.Team + creator models.Person + agent models.Agent + canary models.Canary + check models.Check + lastRuntime string ) @@ -190,27 +230,25 @@ var _ = ginkgo.Describe("Notification email end-to-end", ginkgo.Ordered, func() }) ginkgo.It("sends custom team email", func() { - lastRuntime = time.Now().UTC().Format(time.DateTime) - checkRun = models.CheckStatus{ + lastRuntime = time.Now().UTC().Format(time.RFC3339) + + Expect(DefaultContext.DB().Create(&models.CheckStatus{ CheckID: check.ID, Status: true, Time: lastRuntime, Message: "All good", - } - Expect(DefaultContext.DB().Create(&checkRun).Error).To(BeNil()) + }).Error).To(BeNil()) - event := models.Event{ + Expect(DefaultContext.DB().Create(&models.Event{ Name: api.EventCheckPassed, EventID: check.ID, Properties: types.JSONStringMap{ "last_runtime": lastRuntime, }, - } - Expect(DefaultContext.DB().Create(&event).Error).To(BeNil()) - - events.ConsumeAll(DefaultContext) + }).Error).To(BeNil()) Eventually(func() int { + events.ConsumeAll(DefaultContext) return len(getSMTPMessages()) }, "10s", "200ms").Should(Equal(1)) @@ -222,12 +260,12 @@ var _ = ginkgo.Describe("Notification email end-to-end", ginkgo.Ordered, func() ginkgo.Describe("check failed person email", ginkgo.Ordered, func() { var ( - n models.Notification - person models.Person - agent models.Agent - canary models.Canary - check models.Check - checkRun models.CheckStatus + n models.Notification + person models.Person + agent models.Agent + canary models.Canary + check models.Check + lastRuntime string ) @@ -286,27 +324,25 @@ var _ = ginkgo.Describe("Notification email end-to-end", ginkgo.Ordered, func() }) ginkgo.It("sends default person email", func() { - lastRuntime = time.Now().UTC().Format(time.DateTime) - checkRun = models.CheckStatus{ + lastRuntime = time.Now().UTC().Format(time.RFC3339) + + Expect(DefaultContext.DB().Create(&models.CheckStatus{ CheckID: check.ID, Status: false, Time: lastRuntime, Error: "Timeout", - } - Expect(DefaultContext.DB().Create(&checkRun).Error).To(BeNil()) + }).Error).To(BeNil()) - event := models.Event{ + Expect(DefaultContext.DB().Create(&models.Event{ Name: api.EventCheckFailed, EventID: check.ID, Properties: types.JSONStringMap{ "last_runtime": lastRuntime, }, - } - Expect(DefaultContext.DB().Create(&event).Error).To(BeNil()) - - events.ConsumeAll(DefaultContext) + }).Error).To(BeNil()) Eventually(func() int { + events.ConsumeAll(DefaultContext) return len(getSMTPMessages()) }, "10s", "200ms").Should(Equal(1)) diff --git a/notification/send.go b/notification/send.go index ddfdf98cb..64466eeba 100644 --- a/notification/send.go +++ b/notification/send.go @@ -23,6 +23,7 @@ import ( "github.com/flanksource/incident-commander/events" "github.com/flanksource/incident-commander/logs" "github.com/flanksource/incident-commander/mail" + "github.com/flanksource/incident-commander/notification/senders" "github.com/flanksource/incident-commander/teams" ) @@ -373,45 +374,90 @@ func SendRawNotification(ctx *Context, connectionName, shoutrrrURL string, celEn } ctx.WithRecipient(RecipientTypeConnection, &connection.ID) - shoutrrrURL = connection.URL - // connection.Properties provides the base settings (host, credentials, default recipients, etc.). - // data.Properties (which carry spec.to.properties from the CRD) are merged on top so that - // anything the user explicitly specified overrides the connection's defaults. - data.Properties = collections.MergeMap(connection.Properties, data.Properties) } - if connection != nil && connection.Type == models.ConnectionTypeSlack { - celEnv["channel"] = "slack" - templater := ctx.NewStructTemplater(celEnv, "", TemplateFuncs) - if err := templater.Walk(&data); err != nil { - return "", fmt.Errorf("error templating notification: %w", err) - } + // Template the notification data (title, message) using the CRD/playbook environment. + // Connection properties are NOT merged — each sender reads what it needs from the connection. + celEnv["channel"] = resolveServiceName(connection, shoutrrrURL) + templater := ctx.NewStructTemplater(celEnv, "template", TemplateFuncs) + if err := templater.Walk(&data); err != nil { + return "", fmt.Errorf("error templating notification: %w", err) + } + ctx.WithMessage(data.Message) - ctx.WithMessage(data.Message) - resourceID := "" - if ctx.log != nil && ctx.log.ResourceID != uuid.Nil { - resourceID = ctx.log.ResourceID.String() - } - traceLog("NotificationID=%s Resource=[%s] Sent via slack ...", ctx.notificationID, resourceID) + // Slack: existing native path + if connection != nil && connection.Type == models.ConnectionTypeSlack { if err := SlackSend(ctx, connection.Password, connection.Username, data); err != nil { return "", err } - + traceLogSend(ctx, "slack") return "slack", nil } + // Native senders for known connection types + if connection != nil { + if sender, err := senders.ForConnection(connection); err == nil { + senderData := senders.Data{ + Title: data.Title, + Message: data.Message, + Properties: data.Properties, + Attachments: data.Attachments, + } + if err := sender.Send(ctx.Context, connection, senderData); err != nil { + return "", fmt.Errorf("failed to send via %s: %w", connection.Type, err) + } + traceLogSend(ctx, connection.Type) + return connection.Type, nil + } + } + + // Fallback: SMTP and unknown connection types + data.Properties = collections.MergeMap(connectionProperties(connection), data.Properties) service, err := shoutrrrSendRaw(ctx, celEnv, shoutrrrURL, data) if err != nil { - return "", fmt.Errorf("failed to send message with Shoutrrr: %w", err) + return "", fmt.Errorf("failed to send notification: %w", err) } + traceLogSend(ctx, service) + return service, nil +} + +func traceLogSend(ctx *Context, service string) { resourceID := "" if ctx.log != nil && ctx.log.ResourceID != uuid.Nil { resourceID = ctx.log.ResourceID.String() } - traceLog("NotificationID=%s Resource=[%s] Sent via Shoutrrr ...", ctx.notificationID, resourceID) + traceLog("NotificationID=%s Resource=[%s] Sent via %s", ctx.notificationID, resourceID, service) +} - return service, nil +func resolveServiceName(conn *models.Connection, shoutrrrURL string) string { + if conn != nil { + switch conn.Type { + case models.ConnectionTypeSlack: + return "slack" + case models.ConnectionTypeEmail: + return "smtp" + default: + return conn.Type + } + } + if strings.HasPrefix(shoutrrrURL, "smtp://") { + return "smtp" + } + if strings.HasPrefix(shoutrrrURL, "slack://") { + return "slack" + } + if idx := strings.Index(shoutrrrURL, "://"); idx > 0 { + return shoutrrrURL[:idx] + } + return "" +} + +func connectionProperties(conn *models.Connection) map[string]string { + if conn == nil { + return nil + } + return conn.Properties } func SendNotification(ctx *Context, connectionName, shoutrrrURL string, payload NotificationMessagePayload, properties map[string]string, celEnv *celVariables) (string, error) { @@ -424,19 +470,15 @@ func SendNotification(ctx *Context, connectionName, shoutrrrURL string, payload } ctx.WithRecipient(RecipientTypeConnection, &connection.ID) - shoutrrrURL = connection.URL - // connection.Properties provides the base settings (host, credentials, default recipients, etc.). - // properties (which carry spec.to.properties from the CRD) are merged on top so that - // anything the user explicitly specified overrides the connection's defaults. - properties = collections.MergeMap(connection.Properties, properties) } - // Template render properties if celEnv is available + // Template render user-specified properties (not connection properties) if celEnv != nil { properties = renderTemplateProperties(ctx, properties, celEnv) } + // Slack: existing native path if connection != nil && connection.Type == models.ConnectionTypeSlack { slackMsg, err := FormatNotificationMessage(payload, "slack") if err != nil { @@ -450,29 +492,44 @@ func SendNotification(ctx *Context, connectionName, shoutrrrURL string, payload } ctx.WithMessage(data.Message) - - resourceID := "" - if ctx.log != nil && ctx.log.ResourceID != uuid.Nil { - resourceID = ctx.log.ResourceID.String() - } - traceLog("NotificationID=%s Resource=[%s] Sent via slack ...", ctx.notificationID, resourceID) if err := SlackSend(ctx, connection.Password, connection.Username, data); err != nil { return "", err } - + traceLogSend(ctx, "slack") return "slack", nil } + // Native senders for known connection types + if connection != nil { + if sender, senderErr := senders.ForConnection(connection); senderErr == nil { + format := "markdown" + if connection.Type == models.ConnectionTypeEmail { + format = "email" + } + message, err := FormatNotificationMessage(payload, format) + if err != nil { + return "", fmt.Errorf("failed to format message: %w", err) + } + senderData := senders.Data{ + Title: payload.Title, + Message: message, + Properties: properties, + } + if err := sender.Send(ctx.Context, connection, senderData); err != nil { + return "", fmt.Errorf("failed to send via %s: %w", connection.Type, err) + } + traceLogSend(ctx, connection.Type) + return connection.Type, nil + } + } + + // Fallback: SMTP and unknown types + properties = collections.MergeMap(connectionProperties(connection), properties) service, err := shoutrrrSend(ctx, shoutrrrURL, payload, properties) if err != nil { - return "", fmt.Errorf("failed to send message with Shoutrrr: %w", err) + return "", fmt.Errorf("failed to send notification: %w", err) } - resourceID := "" - if ctx.log != nil && ctx.log.ResourceID != uuid.Nil { - resourceID = ctx.log.ResourceID.String() - } - traceLog("NotificationID=%s Resource=[%s] Sent via Shoutrrr ...", ctx.notificationID, resourceID) - + traceLogSend(ctx, service) return service, nil } diff --git a/notification/senders/discord.go b/notification/senders/discord.go new file mode 100644 index 000000000..bd01ddc32 --- /dev/null +++ b/notification/senders/discord.go @@ -0,0 +1,65 @@ +package senders + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "io" + "net/http" + + "github.com/flanksource/duty/models" +) + +type Discord struct{} + +func (d *Discord) Send(ctx context.Context, conn *models.Connection, data Data) error { + webhookURL := conn.URL + if webhookURL == "" { + if conn.Username == "" || conn.Password == "" { + return fmt.Errorf("discord connection requires a webhook URL or webhookID (username) and token (password)") + } + webhookURL = fmt.Sprintf("https://discord.com/api/webhooks/%s/%s", conn.Username, conn.Password) + } + + payload := discordWebhookPayload{ + Embeds: []discordEmbed{{ + Title: data.Title, + Description: data.Message, + }}, + } + + body, err := json.Marshal(payload) + if err != nil { + return err + } + + req, err := http.NewRequestWithContext(ctx, http.MethodPost, webhookURL, bytes.NewReader(body)) + if err != nil { + return err + } + req.Header.Set("Content-Type", "application/json") + + resp, err := httpClient.Do(req) + if err != nil { + return err + } + defer resp.Body.Close() + + if resp.StatusCode < 200 || resp.StatusCode >= 300 { + respBody, _ := io.ReadAll(resp.Body) + return fmt.Errorf("discord API returned %d: %s", resp.StatusCode, string(respBody)) + } + return nil +} + +type discordWebhookPayload struct { + Content string `json:"content,omitempty"` + Embeds []discordEmbed `json:"embeds,omitempty"` +} + +type discordEmbed struct { + Title string `json:"title,omitempty"` + Description string `json:"description,omitempty"` + Color int `json:"color,omitempty"` +} diff --git a/notification/senders/mattermost.go b/notification/senders/mattermost.go new file mode 100644 index 000000000..3c39b92ac --- /dev/null +++ b/notification/senders/mattermost.go @@ -0,0 +1,72 @@ +package senders + +import ( + "bytes" + "encoding/json" + "fmt" + "io" + "net/http" + + "context" + "github.com/flanksource/duty/models" +) + +type Mattermost struct{} + +func (m *Mattermost) Send(ctx context.Context, conn *models.Connection, data Data) error { + webhookURL := conn.URL + if webhookURL == "" { + if props := conn.Properties; props != nil { + webhookURL = props["webhookURL"] + } + } + if webhookURL == "" { + return fmt.Errorf("mattermost connection requires a webhook URL") + } + + payload := mattermostPayload{ + Text: data.Message, + Username: conn.Username, + } + if data.Title != "" { + payload.Text = fmt.Sprintf("### %s\n\n%s", data.Title, data.Message) + } + if props := conn.Properties; props != nil { + if ch := props["channel"]; ch != "" { + payload.Channel = ch + } + if icon := props["iconURL"]; icon != "" { + payload.IconURL = icon + } + } + + body, err := json.Marshal(payload) + if err != nil { + return err + } + + req, err := http.NewRequestWithContext(ctx, http.MethodPost, webhookURL, bytes.NewReader(body)) + if err != nil { + return err + } + req.Header.Set("Content-Type", "application/json") + + resp, err := httpClient.Do(req) + if err != nil { + return err + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + respBody, _ := io.ReadAll(resp.Body) + return fmt.Errorf("mattermost webhook returned %d: %s", resp.StatusCode, string(respBody)) + } + return nil +} + +type mattermostPayload struct { + Channel string `json:"channel,omitempty"` + Username string `json:"username,omitempty"` + IconURL string `json:"icon_url,omitempty"` + Text string `json:"text"` +} diff --git a/notification/senders/ntfy.go b/notification/senders/ntfy.go new file mode 100644 index 000000000..2ffe3e9ea --- /dev/null +++ b/notification/senders/ntfy.go @@ -0,0 +1,56 @@ +package senders + +import ( + "fmt" + "io" + "net/http" + "strings" + + "context" + "github.com/flanksource/duty/models" +) + +type Ntfy struct{} + +func (n *Ntfy) Send(ctx context.Context, conn *models.Connection, data Data) error { + host := conn.URL + if host == "" { + host = "https://ntfy.sh" + } + host = strings.TrimRight(host, "/") + + topic := conn.Username + if topic == "" { + if props := conn.Properties; props != nil { + topic = props["topic"] + } + } + if topic == "" { + return fmt.Errorf("ntfy connection requires a topic") + } + + req, err := http.NewRequestWithContext(ctx, http.MethodPost, host+"/"+topic, strings.NewReader(data.Message)) + if err != nil { + return err + } + if data.Title != "" { + req.Header.Set("Title", data.Title) + } + if conn.Username != "" && conn.Password != "" { + req.SetBasicAuth(conn.Username, conn.Password) + } else if conn.Password != "" { + req.Header.Set("Authorization", "Bearer "+conn.Password) + } + + resp, err := httpClient.Do(req) + if err != nil { + return err + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + respBody, _ := io.ReadAll(resp.Body) + return fmt.Errorf("ntfy returned %d: %s", resp.StatusCode, string(respBody)) + } + return nil +} diff --git a/notification/senders/pushbullet.go b/notification/senders/pushbullet.go new file mode 100644 index 000000000..cc6ba2f37 --- /dev/null +++ b/notification/senders/pushbullet.go @@ -0,0 +1,51 @@ +package senders + +import ( + "bytes" + "encoding/json" + "fmt" + "io" + "net/http" + + "context" + "github.com/flanksource/duty/models" +) + +type Pushbullet struct{} + +func (p *Pushbullet) Send(ctx context.Context, conn *models.Connection, data Data) error { + token := conn.Password + if token == "" { + return fmt.Errorf("pushbullet connection requires a token (password)") + } + + payload := map[string]string{ + "type": "note", + "title": data.Title, + "body": data.Message, + } + + body, err := json.Marshal(payload) + if err != nil { + return err + } + + req, err := http.NewRequestWithContext(ctx, http.MethodPost, "https://api.pushbullet.com/v2/pushes", bytes.NewReader(body)) + if err != nil { + return err + } + req.Header.Set("Content-Type", "application/json") + req.Header.Set("Access-Token", token) + + resp, err := httpClient.Do(req) + if err != nil { + return err + } + defer resp.Body.Close() + + if resp.StatusCode < 200 || resp.StatusCode >= 300 { + respBody, _ := io.ReadAll(resp.Body) + return fmt.Errorf("pushbullet API returned %d: %s", resp.StatusCode, string(respBody)) + } + return nil +} diff --git a/notification/senders/pushover.go b/notification/senders/pushover.go new file mode 100644 index 000000000..edf3923ba --- /dev/null +++ b/notification/senders/pushover.go @@ -0,0 +1,48 @@ +package senders + +import ( + "context" + "fmt" + "io" + "net/http" + "net/url" + "strings" + + "github.com/flanksource/duty/models" +) + +type Pushover struct{} + +func (p *Pushover) Send(ctx context.Context, conn *models.Connection, data Data) error { + token := conn.Password + user := conn.Username + if token == "" || user == "" { + return fmt.Errorf("pushover connection requires token (password) and user (username)") + } + + form := url.Values{ + "token": {token}, + "user": {user}, + "title": {data.Title}, + "message": {data.Message}, + "html": {"1"}, + } + + req, err := http.NewRequestWithContext(ctx, http.MethodPost, "https://api.pushover.net/1/messages.json", strings.NewReader(form.Encode())) + if err != nil { + return err + } + req.Header.Set("Content-Type", "application/x-www-form-urlencoded") + + resp, err := httpClient.Do(req) + if err != nil { + return err + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + respBody, _ := io.ReadAll(resp.Body) + return fmt.Errorf("pushover API returned %d: %s", resp.StatusCode, string(respBody)) + } + return nil +} diff --git a/notification/senders/sender.go b/notification/senders/sender.go new file mode 100644 index 000000000..ed5e9b802 --- /dev/null +++ b/notification/senders/sender.go @@ -0,0 +1,46 @@ +package senders + +import ( + "context" + "fmt" + "net/http" + "time" + + "github.com/flanksource/duty/models" + + "github.com/flanksource/incident-commander/mail" +) + +type Sender interface { + Send(ctx context.Context, conn *models.Connection, data Data) error +} + +type Data struct { + Title string + Message string + Properties map[string]string + Attachments []mail.Attachment +} + +var httpClient = &http.Client{Timeout: 30 * time.Second} + +func ForConnection(conn *models.Connection) (Sender, error) { + switch conn.Type { + case models.ConnectionTypeTelegram: + return &Telegram{}, nil + case models.ConnectionTypeDiscord: + return &Discord{}, nil + case models.ConnectionTypeTeams: + return &Teams{}, nil + case models.ConnectionTypeMattermost: + return &Mattermost{}, nil + case models.ConnectionTypeNtfy: + return &Ntfy{}, nil + case models.ConnectionTypePushbullet: + return &Pushbullet{}, nil + case models.ConnectionTypePushover: + return &Pushover{}, nil + default: + return nil, fmt.Errorf("unsupported notification service: %s", conn.Type) + } +} diff --git a/notification/senders/senders_test.go b/notification/senders/senders_test.go new file mode 100644 index 000000000..dc1c57297 --- /dev/null +++ b/notification/senders/senders_test.go @@ -0,0 +1,241 @@ +package senders + +import ( + "context" + "encoding/json" + "io" + "net/http" + "net/http/httptest" + + "github.com/flanksource/duty/models" + ginkgo "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" +) + +var _ = ginkgo.Describe("ForConnection", func() { + for _, tt := range []struct { + connType string + }{ + {models.ConnectionTypeTelegram}, + {models.ConnectionTypeDiscord}, + {models.ConnectionTypeTeams}, + {models.ConnectionTypeMattermost}, + {models.ConnectionTypeNtfy}, + {models.ConnectionTypePushbullet}, + {models.ConnectionTypePushover}, + } { + ginkgo.It("returns sender for "+tt.connType, func() { + sender, err := ForConnection(&models.Connection{Type: tt.connType}) + Expect(err).ToNot(HaveOccurred()) + Expect(sender).ToNot(BeNil()) + }) + } + + ginkgo.It("returns error for unsupported type", func() { + _, err := ForConnection(&models.Connection{Type: "unknown"}) + Expect(err).To(HaveOccurred()) + }) +}) + +var _ = ginkgo.Describe("Teams sender", func() { + ginkgo.It("sends MessageCard to webhook", func() { + var received map[string]any + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + Expect(r.Method).To(Equal(http.MethodPost)) + Expect(r.Header.Get("Content-Type")).To(Equal("application/json")) + body, _ := io.ReadAll(r.Body) + Expect(json.Unmarshal(body, &received)).To(Succeed()) + w.WriteHeader(http.StatusOK) + })) + defer server.Close() + + conn := &models.Connection{Type: models.ConnectionTypeTeams, URL: server.URL} + err := (&Teams{}).Send(context.TODO(), conn, Data{Title: "Alert", Message: "Server is down"}) + Expect(err).ToNot(HaveOccurred()) + Expect(received["@type"]).To(Equal("MessageCard")) + sections := received["sections"].([]any) + Expect(sections).To(HaveLen(1)) + section := sections[0].(map[string]any) + Expect(section["activityTitle"]).To(Equal("Alert")) + Expect(section["text"]).To(Equal("Server is down")) + }) + + ginkgo.It("returns error on non-200 response", func() { + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusBadRequest) + w.Write([]byte("bad request")) + })) + defer server.Close() + + conn := &models.Connection{Type: models.ConnectionTypeTeams, URL: server.URL} + err := (&Teams{}).Send(context.TODO(), conn, Data{Title: "Test", Message: "msg"}) + Expect(err).To(HaveOccurred()) + Expect(err.Error()).To(ContainSubstring("400")) + }) +}) + +var _ = ginkgo.Describe("Mattermost sender", func() { + ginkgo.It("sends payload to webhook", func() { + var received mattermostPayload + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + Expect(r.Method).To(Equal(http.MethodPost)) + body, _ := io.ReadAll(r.Body) + Expect(json.Unmarshal(body, &received)).To(Succeed()) + w.WriteHeader(http.StatusOK) + })) + defer server.Close() + + conn := &models.Connection{Type: models.ConnectionTypeMattermost, URL: server.URL, Username: "bot"} + err := (&Mattermost{}).Send(context.TODO(), conn, Data{Title: "Deploy", Message: "v1.2.3 deployed"}) + Expect(err).ToNot(HaveOccurred()) + Expect(received.Username).To(Equal("bot")) + Expect(received.Text).To(ContainSubstring("Deploy")) + Expect(received.Text).To(ContainSubstring("v1.2.3 deployed")) + }) + + ginkgo.It("returns error on non-200 response", func() { + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusInternalServerError) + })) + defer server.Close() + + conn := &models.Connection{Type: models.ConnectionTypeMattermost, URL: server.URL} + err := (&Mattermost{}).Send(context.TODO(), conn, Data{Message: "test"}) + Expect(err).To(HaveOccurred()) + Expect(err.Error()).To(ContainSubstring("500")) + }) +}) + +var _ = ginkgo.Describe("Discord sender", func() { + ginkgo.It("sends embed to webhook", func() { + var received discordWebhookPayload + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + Expect(r.Method).To(Equal(http.MethodPost)) + Expect(r.Header.Get("Content-Type")).To(Equal("application/json")) + body, _ := io.ReadAll(r.Body) + Expect(json.Unmarshal(body, &received)).To(Succeed()) + w.WriteHeader(http.StatusNoContent) + })) + defer server.Close() + + conn := &models.Connection{Type: models.ConnectionTypeDiscord, URL: server.URL} + err := (&Discord{}).Send(context.TODO(), conn, Data{Title: "Alert", Message: "CPU high"}) + Expect(err).ToNot(HaveOccurred()) + Expect(received.Embeds).To(HaveLen(1)) + Expect(received.Embeds[0].Title).To(Equal("Alert")) + Expect(received.Embeds[0].Description).To(Equal("CPU high")) + }) +}) + +var _ = ginkgo.Describe("Telegram sender", func() { + ginkgo.It("escapes MarkdownV2 special characters in message body", func() { + var received map[string]any + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + body, _ := io.ReadAll(r.Body) + Expect(json.Unmarshal(body, &received)).To(Succeed()) + w.WriteHeader(http.StatusOK) + })) + defer server.Close() + + origClient := httpClient + httpClient = &http.Client{Transport: &rewriteTransport{server.URL, http.DefaultTransport}} + defer func() { httpClient = origClient }() + + conn := &models.Connection{Type: models.ConnectionTypeTelegram, Password: "fake-token", Username: "12345"} + err := (&Telegram{}).Send(context.TODO(), conn, Data{ + Title: "Alert!", + Message: "server-01.prod is down!", + }) + Expect(err).ToNot(HaveOccurred()) + text := received["text"].(string) + Expect(text).To(ContainSubstring(`\!`)) + Expect(text).To(ContainSubstring(`\.`)) + Expect(text).To(ContainSubstring(`\-`)) + Expect(received["parse_mode"]).To(Equal("MarkdownV2")) + }) +}) + +var _ = ginkgo.Describe("Ntfy sender", func() { + ginkgo.It("sends message with title header", func() { + var receivedTitle string + var receivedBody string + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + receivedTitle = r.Header.Get("Title") + body, _ := io.ReadAll(r.Body) + receivedBody = string(body) + w.WriteHeader(http.StatusOK) + })) + defer server.Close() + + conn := &models.Connection{Type: models.ConnectionTypeNtfy, URL: server.URL, Username: "test-topic"} + err := (&Ntfy{}).Send(context.TODO(), conn, Data{Title: "Alert", Message: "disk full"}) + Expect(err).ToNot(HaveOccurred()) + Expect(receivedTitle).To(Equal("Alert")) + Expect(receivedBody).To(Equal("disk full")) + }) +}) + +var _ = ginkgo.Describe("Pushbullet sender", func() { + ginkgo.It("sends note to API", func() { + var received map[string]string + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + Expect(r.Method).To(Equal(http.MethodPost)) + Expect(r.Header.Get("Access-Token")).To(Equal("test-token")) + body, _ := io.ReadAll(r.Body) + Expect(json.Unmarshal(body, &received)).To(Succeed()) + w.WriteHeader(http.StatusOK) + })) + defer server.Close() + + origClient := httpClient + httpClient = &http.Client{Transport: &rewriteTransport{server.URL, http.DefaultTransport}} + defer func() { httpClient = origClient }() + + conn := &models.Connection{Type: models.ConnectionTypePushbullet, Password: "test-token"} + err := (&Pushbullet{}).Send(context.TODO(), conn, Data{Title: "Alert", Message: "CPU high"}) + Expect(err).ToNot(HaveOccurred()) + Expect(received["type"]).To(Equal("note")) + Expect(received["title"]).To(Equal("Alert")) + Expect(received["body"]).To(Equal("CPU high")) + }) +}) + +var _ = ginkgo.Describe("Pushover sender", func() { + ginkgo.It("sends message to API", func() { + var receivedToken, receivedUser, receivedTitle, receivedMessage string + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + Expect(r.Method).To(Equal(http.MethodPost)) + r.ParseForm() + receivedToken = r.FormValue("token") + receivedUser = r.FormValue("user") + receivedTitle = r.FormValue("title") + receivedMessage = r.FormValue("message") + w.WriteHeader(http.StatusOK) + })) + defer server.Close() + + origClient := httpClient + httpClient = &http.Client{Transport: &rewriteTransport{server.URL, http.DefaultTransport}} + defer func() { httpClient = origClient }() + + conn := &models.Connection{Type: models.ConnectionTypePushover, Password: "app-token", Username: "user-key"} + err := (&Pushover{}).Send(context.TODO(), conn, Data{Title: "Alert", Message: "Server down"}) + Expect(err).ToNot(HaveOccurred()) + Expect(receivedToken).To(Equal("app-token")) + Expect(receivedUser).To(Equal("user-key")) + Expect(receivedTitle).To(Equal("Alert")) + Expect(receivedMessage).To(Equal("Server down")) + }) +}) + +// rewriteTransport redirects all requests to a local test server. +type rewriteTransport struct { + targetURL string + base http.RoundTripper +} + +func (t *rewriteTransport) RoundTrip(req *http.Request) (*http.Response, error) { + req.URL.Scheme = "http" + req.URL.Host = t.targetURL[len("http://"):] + return t.base.RoundTrip(req) +} diff --git a/notification/senders/suite_test.go b/notification/senders/suite_test.go new file mode 100644 index 000000000..1fd3d80d8 --- /dev/null +++ b/notification/senders/suite_test.go @@ -0,0 +1,13 @@ +package senders + +import ( + "testing" + + ginkgo "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" +) + +func TestSenders(t *testing.T) { + RegisterFailHandler(ginkgo.Fail) + ginkgo.RunSpecs(t, "Senders") +} diff --git a/notification/senders/teams.go b/notification/senders/teams.go new file mode 100644 index 000000000..b0ca2d864 --- /dev/null +++ b/notification/senders/teams.go @@ -0,0 +1,75 @@ +package senders + +import ( + "bytes" + "encoding/json" + "fmt" + "io" + "net/http" + + "context" + "github.com/flanksource/duty/models" +) + +type Teams struct{} + +func (t *Teams) Send(ctx context.Context, conn *models.Connection, data Data) error { + webhookURL := conn.URL + if webhookURL == "" { + if props := conn.Properties; props != nil { + webhookURL = props["webhookURL"] + } + } + if webhookURL == "" { + return fmt.Errorf("teams connection requires a webhook URL") + } + + card := teamsMessageCard{ + Type: "MessageCard", + Context: "http://schema.org/extensions", + ThemeColor: "0076D7", + Summary: data.Title, + Sections: []teamsSection{{ + ActivityTitle: data.Title, + Text: data.Message, + Markdown: true, + }}, + } + + body, err := json.Marshal(card) + if err != nil { + return err + } + + req, err := http.NewRequestWithContext(ctx, http.MethodPost, webhookURL, bytes.NewReader(body)) + if err != nil { + return err + } + req.Header.Set("Content-Type", "application/json") + + resp, err := httpClient.Do(req) + if err != nil { + return err + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + respBody, _ := io.ReadAll(resp.Body) + return fmt.Errorf("teams webhook returned %d: %s", resp.StatusCode, string(respBody)) + } + return nil +} + +type teamsMessageCard struct { + Type string `json:"@type"` + Context string `json:"@context"` + ThemeColor string `json:"themeColor,omitempty"` + Summary string `json:"summary"` + Sections []teamsSection `json:"sections"` +} + +type teamsSection struct { + ActivityTitle string `json:"activityTitle,omitempty"` + Text string `json:"text"` + Markdown bool `json:"markdown"` +} diff --git a/notification/senders/telegram.go b/notification/senders/telegram.go new file mode 100644 index 000000000..cab1158fc --- /dev/null +++ b/notification/senders/telegram.go @@ -0,0 +1,79 @@ +package senders + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "io" + "net/http" + "strings" + + "github.com/flanksource/duty/models" +) + +type Telegram struct{} + +func (t *Telegram) Send(ctx context.Context, conn *models.Connection, data Data) error { + token := conn.Password + chats := conn.Username + if token == "" || chats == "" { + return fmt.Errorf("telegram connection requires token (password) and chats (username)") + } + + for _, chatID := range strings.Split(chats, ",") { + chatID = strings.TrimSpace(chatID) + if chatID == "" { + continue + } + if err := telegramSendMessage(ctx, token, chatID, data); err != nil { + return fmt.Errorf("telegram chat %s: %w", chatID, err) + } + } + return nil +} + +func telegramSendMessage(ctx context.Context, token, chatID string, data Data) error { + escapedMessage := escapeMarkdownV2(data.Message) + text := escapedMessage + if data.Title != "" { + text = fmt.Sprintf("*%s*\n\n%s", escapeMarkdownV2(data.Title), escapedMessage) + } + + payload := map[string]any{ + "chat_id": chatID, + "text": text, + "parse_mode": "MarkdownV2", + } + + body, err := json.Marshal(payload) + if err != nil { + return err + } + + apiURL := fmt.Sprintf("https://api.telegram.org/bot%s/sendMessage", token) + req, err := http.NewRequestWithContext(ctx, http.MethodPost, apiURL, bytes.NewReader(body)) + if err != nil { + return err + } + req.Header.Set("Content-Type", "application/json") + + resp, err := httpClient.Do(req) + if err != nil { + return err + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + respBody, _ := io.ReadAll(resp.Body) + return fmt.Errorf("telegram API returned %d: %s", resp.StatusCode, string(respBody)) + } + return nil +} + +func escapeMarkdownV2(s string) string { + for _, c := range []string{"_", "*", "[", "]", "(", ")", "~", "`", ">", "#", "+", "-", "=", "|", "{", "}", ".", "!"} { + s = strings.ReplaceAll(s, c, "\\"+c) + } + return s +} diff --git a/notification/shoutrrr.go b/notification/shoutrrr.go index 954dafe9b..9d450e85e 100644 --- a/notification/shoutrrr.go +++ b/notification/shoutrrr.go @@ -6,10 +6,7 @@ import ( "strconv" "strings" - stripmd "github.com/adityathebe/go-strip-markdown/v2" - "github.com/containrrr/shoutrrr" - "github.com/containrrr/shoutrrr/pkg/router" - "github.com/containrrr/shoutrrr/pkg/types" + commonshttp "github.com/flanksource/commons/http" "github.com/flanksource/duty/context" "github.com/flanksource/incident-commander/api" @@ -18,8 +15,7 @@ import ( mcUtils "github.com/flanksource/incident-commander/utils" ) -// setSystemSMTPCredential modifies the shoutrrrURL to use the system's SMTP credentials. -func setSystemSMTPCredential(ctx context.Context, shoutrrrURL string) (string, error) { +func setSystemSMTPCredential(ctx context.Context, smtpURL string) (string, error) { smtp, err := mail.GetDefaultSMTP(ctx) if err != nil { return "", fmt.Errorf("failed to get default SMTP config: %w", err) @@ -31,11 +27,11 @@ func setSystemSMTPCredential(ctx context.Context, shoutrrrURL string) (string, e smtp.Host, smtp.Port, ) - shoutrrrURL = strings.ReplaceAll(shoutrrrURL, api.SystemSMTP, prefix) + smtpURL = strings.ReplaceAll(smtpURL, api.SystemSMTP, prefix) - parsedURL, err := url.Parse(shoutrrrURL) + parsedURL, err := url.Parse(smtpURL) if err != nil { - return "", fmt.Errorf("failed to parse shoutrrr URL: %w", err) + return "", fmt.Errorf("failed to parse SMTP URL: %w", err) } query := parsedURL.Query() @@ -46,240 +42,168 @@ func setSystemSMTPCredential(ctx context.Context, shoutrrrURL string) (string, e return parsedURL.String(), nil } -func PrepareShoutrrrRaw(ctx *Context, celEnv map[string]any, shoutrrrURL string, data *NotificationTemplate) (string, string, *router.ServiceRouter, error) { - if celEnv == nil { - celEnv = make(map[string]any) - } - - if data.Properties == nil { - data.Properties = make(map[string]string) +func firstNonEmpty(props map[string]string, q url.Values, keys ...string) string { + for _, k := range keys { + for p, v := range props { + if strings.EqualFold(k, p) { + return v + } + } + if v := q.Get(k); v != "" { + return v + } } + return "" +} - if strings.HasPrefix(shoutrrrURL, api.SystemSMTP) { +func sendSMTP(ctx *Context, smtpURL string, data NotificationTemplate) error { + if strings.HasPrefix(smtpURL, api.SystemSMTP) { var err error - shoutrrrURL, err = setSystemSMTPCredential(ctx.Context, shoutrrrURL) + smtpURL, err = setSystemSMTPCredential(ctx.Context, smtpURL) if err != nil { - return "", "", nil, err + return err } } - sender, err := shoutrrr.CreateSender(shoutrrrURL) - if err != nil { - return "", "", nil, fmt.Errorf("failed to create a shoutrrr sender client: %w", err) - } + data.Message = mcUtils.MarkdownToHTML(data.Message) + props := GetPropsForService("smtp", data.Properties) + injectTitleIntoProperties("smtp", data.Title, props) - service, _, err := sender.ExtractServiceName(shoutrrrURL) + parsedURL, err := url.Parse(smtpURL) if err != nil { - return "", "", nil, fmt.Errorf("failed to extract service name: %w", err) + return fmt.Errorf("failed to parse SMTP URL: %w", err) } - celEnv["channel"] = service - templater := ctx.NewStructTemplater(celEnv, "", TemplateFuncs) - if err := templater.Walk(data); err != nil { - return "", "", nil, fmt.Errorf("error templating notification: %w", err) - } - - switch service { - case "smtp": - data.Message = mcUtils.MarkdownToHTML(data.Message) - data.Properties["UseHTML"] = "true" // enforce HTML for smtp - - case "telegram": - data.Properties["ParseMode"] = "MarkdownV2" + query := parsedURL.Query() + to := firstNonEmpty(props, query, "to", "ToAddresses", "ToAddress") + from := firstNonEmpty(props, query, "from", "FromAddress") + fromName := firstNonEmpty(props, query, "fromname", "FromName") + password, _ := parsedURL.User.Password() + port, _ := strconv.Atoi(parsedURL.Port()) + headerString := props["headers"] - default: - data.Message = stripmd.StripOptions(data.Message, stripmd.Options{KeepURL: true}) + var conn v1.ConnectionSMTP + if err := conn.FromURL(smtpURL); err != nil { + return ctx.Oops().Wrapf(err, "error parsing SMTP URL") } - - return service, shoutrrrURL, sender, nil -} - -// firstNonEmpty looks up keys (case-insensitive) first in params then in query values. -func firstNonEmpty(params *types.Params, q url.Values, keys ...string) string { - for _, k := range keys { - for p := range *params { - if strings.EqualFold(k, p) { - return (*params)[p] - } - } - if v := q.Get(k); v != "" { - return v - } + if from != "" { + conn.FromAddress = from + } + if fromName != "" { + conn.FromName = fromName } - return "" -} -// dispatchNotification dispatches a prepared notification via SMTP (using the mail package) -// or via the shoutrrr router for all other services. -func dispatchNotification(ctx *Context, service, shoutrrrURL string, sender *router.ServiceRouter, data NotificationTemplate) error { - data.Properties = GetPropsForService(service, data.Properties) - injectTitleIntoProperties(service, data.Title, data.Properties) + m := mail.New(strings.Split(to, ","), data.Title, data.Message, `text/html; charset="UTF-8"`). + SetFrom(conn.FromName, conn.FromAddress). + SetCredentials(parsedURL.Hostname(), port, parsedURL.User.Username(), password) - params := &types.Params{} - if data.Properties != nil { - params = (*types.Params)(&data.Properties) + for _, a := range data.Attachments { + m.AddAttachment(a) } - // NOTE: Until shoutrrr fixes the "UseHTML" props, we'll use the mailer package - if service == "smtp" { - parsedURL, err := url.Parse(shoutrrrURL) + if headerString != "" { + headers, err := mcUtils.StringToStringMap(headerString) if err != nil { - return fmt.Errorf("failed to parse shoutrrr URL: %w", err) - } - - query := parsedURL.Query() - var ( - to = firstNonEmpty(params, query, "to", "ToAddresses", "ToAddress") - from = firstNonEmpty(params, query, "from", "FromAddress") - fromName = firstNonEmpty(params, query, "fromname", "FromName") - password, _ = parsedURL.User.Password() - port, _ = strconv.Atoi(parsedURL.Port()) - headerString = (*params)["headers"] - ) - - var conn v1.ConnectionSMTP - if err := conn.FromURL(shoutrrrURL); err != nil { - return ctx.Oops().Wrapf(err, "error parsing SMTP URL") - } - if from != "" { - conn.FromAddress = from + return ctx.Oops().Wrapf(err, "error converting headerString[%s] to map", headerString) } - if fromName != "" { - conn.FromName = fromName + for k, v := range headers { + m.SetHeader(k, v) } - - m := mail.New(strings.Split(to, ","), data.Title, data.Message, `text/html; charset="UTF-8"`). - SetFrom(conn.FromName, conn.FromAddress). - SetCredentials(parsedURL.Hostname(), port, parsedURL.User.Username(), password) - - for _, a := range data.Attachments { - m.AddAttachment(a) - } - - if headerString != "" { - headers, err := mcUtils.StringToStringMap(headerString) - if err != nil { - return ctx.Oops().Wrapf(err, "error converting headerString[%s] to map", headerString) - } - for k, v := range headers { - m.SetHeader(k, v) - } - } - if err := m.Send(conn); err != nil { - return ctx.Oops(). - With("to", to, "from", from, "host", parsedURL.Hostname()). - Wrap(err) - } - return nil } + return m.Send(conn) +} - sendErrors := sender.Send(data.Message, params) - for _, err := range sendErrors { - if err != nil { - return ctx.Oops().Hint(data.Message).Wrapf(err, "error publishing notification (service=%s)", service) - } +func sendGenericWebhook(ctx *Context, rawURL string, data NotificationTemplate) error { + targetURL := strings.TrimPrefix(rawURL, "generic+") + payload := map[string]string{ + "title": data.Title, + "message": data.Message, + } + for k, v := range data.Properties { + payload[k] = v } - return nil -} - -// shoutrrrSendRaw sends a notification and returns the service it sent the notification to -func shoutrrrSendRaw(ctx *Context, celEnv map[string]any, shoutrrrURL string, data NotificationTemplate) (string, error) { - service, shoutrrrURL, sender, err := PrepareShoutrrrRaw(ctx, celEnv, shoutrrrURL, &data) + resp, err := commonshttp.NewClient().R(ctx.Context).Post(targetURL, payload) if err != nil { - return "", err + return fmt.Errorf("generic webhook failed: %w", err) } - - ctx.WithMessage(data.Message) - - return service, dispatchNotification(ctx, service, shoutrrrURL, sender, data) + if !resp.IsOK() { + body, _ := resp.AsString() + return fmt.Errorf("generic webhook returned %d: %s", resp.StatusCode, body) + } + return nil } -func PrepareShoutrrr(ctx *Context, shoutrrrURL string, payload NotificationMessagePayload, properties map[string]string) (string, string, *router.ServiceRouter, NotificationTemplate, error) { - if properties == nil { - properties = make(map[string]string) +func serviceFromURL(rawURL string) string { + if strings.HasPrefix(rawURL, "smtp://") || strings.HasPrefix(rawURL, api.SystemSMTP) { + return "smtp" } - - if strings.HasPrefix(shoutrrrURL, api.SystemSMTP) { - var err error - shoutrrrURL, err = setSystemSMTPCredential(ctx.Context, shoutrrrURL) - if err != nil { - return "", "", nil, NotificationTemplate{}, err - } + if strings.HasPrefix(rawURL, "generic+") { + return "generic" } - - sender, err := shoutrrr.CreateSender(shoutrrrURL) - if err != nil { - return "", "", nil, NotificationTemplate{}, fmt.Errorf("failed to create a shoutrrr sender client: %w", err) + if idx := strings.Index(rawURL, "://"); idx > 0 { + return rawURL[:idx] } + return "" +} - service, _, err := sender.ExtractServiceName(shoutrrrURL) - if err != nil { - return "", "", nil, NotificationTemplate{}, fmt.Errorf("failed to extract service name: %w", err) +func shoutrrrSendRaw(ctx *Context, celEnv map[string]any, notificationURL string, data NotificationTemplate) (string, error) { + service := serviceFromURL(notificationURL) + + celEnv["channel"] = service + templater := ctx.NewStructTemplater(celEnv, "", TemplateFuncs) + if err := templater.Walk(&data); err != nil { + return "", fmt.Errorf("error templating notification: %w", err) } + ctx.WithMessage(data.Message) - var message string switch service { case "smtp": - message, err = FormatNotificationMessage(payload, "email") - if err != nil { - return "", "", nil, NotificationTemplate{}, fmt.Errorf("failed to format html message: %w", err) - } - properties["UseHTML"] = "true" - case "telegram": - message, err = FormatNotificationMessage(payload, "markdown") - if err != nil { - return "", "", nil, NotificationTemplate{}, fmt.Errorf("failed to format markdown message: %w", err) - } - properties["ParseMode"] = "MarkdownV2" + return "smtp", sendSMTP(ctx, notificationURL, data) + case "generic": + return "generic", sendGenericWebhook(ctx, notificationURL, data) default: - message, err = FormatNotificationMessage(payload, "markdown") - if err != nil { - return "", "", nil, NotificationTemplate{}, fmt.Errorf("failed to format markdown message: %w", err) - } - message = stripmd.StripOptions(message, stripmd.Options{KeepURL: true}) + return "", fmt.Errorf("unsupported notification URL scheme: %s", service) } +} - data := NotificationTemplate{ - Title: payload.Title, - Message: message, - Properties: properties, - } +func shoutrrrSend(ctx *Context, notificationURL string, payload NotificationMessagePayload, properties map[string]string) (string, error) { + service := serviceFromURL(notificationURL) - return service, shoutrrrURL, sender, data, nil -} + format := "markdown" + if service == "smtp" { + format = "email" + } -// shoutrrrSend sends a notification and returns the service it sent the notification to -func shoutrrrSend(ctx *Context, shoutrrrURL string, payload NotificationMessagePayload, properties map[string]string) (string, error) { - service, shoutrrrURL, sender, data, err := PrepareShoutrrr(ctx, shoutrrrURL, payload, properties) + message, err := FormatNotificationMessage(payload, format) if err != nil { - return "", err + return "", fmt.Errorf("failed to format message: %w", err) } + data := NotificationTemplate{ + Title: payload.Title, + Message: message, + Properties: properties, + } ctx.WithMessage(data.Message) - return service, dispatchNotification(ctx, service, shoutrrrURL, sender, data) + switch service { + case "smtp": + return "smtp", sendSMTP(ctx, notificationURL, data) + case "generic": + return "generic", sendGenericWebhook(ctx, notificationURL, data) + default: + return "", fmt.Errorf("unsupported notification URL scheme: %s", service) + } } -// injectTitleIntoProperties adds the given title to the shoutrrr properties if it's not already set. func injectTitleIntoProperties(service, title string, properties map[string]string) map[string]string { - if title == "" { + if title == "" || properties == nil { return properties } - switch strings.ToLower(service) { - case "smtp": - if properties["subject"] == "" { - properties["subject"] = title - } - - case "googlechat", "rocketchat": - // Do nothing - - default: - if properties["title"] == "" { - properties["title"] = title - } + if service == "smtp" && properties["subject"] == "" { + properties["subject"] = title } return properties diff --git a/notification/shoutrrr_test.go b/notification/shoutrrr_test.go index 6518f9e10..239c13e71 100644 --- a/notification/shoutrrr_test.go +++ b/notification/shoutrrr_test.go @@ -5,9 +5,7 @@ import ( "reflect" "github.com/flanksource/incident-commander/notification" - "github.com/google/uuid" - "github.com/onsi/ginkgo/v2" - . "github.com/onsi/gomega" + ginkgo "github.com/onsi/ginkgo/v2" ) var _ = ginkgo.Describe("Notification properties", func() { @@ -60,18 +58,3 @@ var _ = ginkgo.Describe("Notification properties", func() { }) } }) - -var _ = ginkgo.Describe("Shoutrrr", func() { - ginkgo.It("should render smtp html", func() { - ctx := notification.NewContext(DefaultContext, uuid.Nil) - payload := notification.NotificationMessagePayload{ - Title: "Test Notification", - Description: "My Test Config", - } - url := "smtp://username:password@host:25/?from=test@flanksource.com&to=receiver@flanksource.com" - - _, _, _, data, err := notification.PrepareShoutrrr(ctx, url, payload, nil) - Expect(err).To(BeNil()) - Expect(data.Message).To(ContainSubstring("My Test Config")) - }) -}) diff --git a/notification/suite_test.go b/notification/suite_test.go index 0ce21e650..11b8fb612 100644 --- a/notification/suite_test.go +++ b/notification/suite_test.go @@ -46,7 +46,7 @@ var _ = ginkgo.BeforeSuite(func() { _ = context.UpdateProperty(DefaultContext, api.PropertyIncidentsDisabled, "true") _ = context.UpdateProperty(DefaultContext, "notification.send.trace", "true") - events.StartConsumers(DefaultContext) + events.InitConsumers(DefaultContext) setupWebhookServer() setupSMTPServer() setupSystemSMTPConnection() @@ -71,7 +71,7 @@ var _ = ginkgo.AfterSuite(func() { var ( webhookServer *http.Server webhookEndpoint string // the autogenerated endpoint for our webhook - webhookPostdata map[string]string // JSON message sent by shoutrrr to our webhook + webhookPostdata map[string]string // JSON payload received by the test webhook server ) var (