Skip to content

Commit f108189

Browse files
committed
feat: implement threaded message strategy for Slack notifier
Signed-off-by: Santiago Fernández Núñez <santiago.nunez@nubank.com.br>
1 parent 51ea7d7 commit f108189

5 files changed

Lines changed: 1332 additions & 70 deletions

File tree

notify/slack/internal/apiurl/apiurl.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,9 @@ func NewResolver(apiURL *amcommoncfg.SecretURL, apiURLFile string) *Resolver {
3939
return &Resolver{apiURL: apiURL, apiURLFile: apiURLFile}
4040
}
4141

42+
// URLForMethod returns the Slack Web API URL for the given method name
43+
// (e.g. "chat.update", "reactions.add"). An empty method returns the base
44+
// api_url as-is, which is suitable for the default chat.postMessage call.
4245
func (r *Resolver) URLForMethod(method string) (string, error) {
4346
if method == "" {
4447
if r.apiURL != nil {
@@ -74,6 +77,7 @@ func (r *Resolver) URLForMethod(method string) (string, error) {
7477
return webAPIMethodURL(baseURL, method)
7578
}
7679

80+
// getURLFromFile reads and parses the URL from the api_url_file path on disk.
7781
func (r *Resolver) getURLFromFile() (*amcommoncfg.URL, error) {
7882
content, err := os.ReadFile(r.apiURLFile)
7983
if err != nil {

notify/slack/slack.go

Lines changed: 124 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,14 @@ import (
3636
// https://api.slack.com/reference/messaging/attachments#legacy_fields - 1024, no units given, assuming runes or characters.
3737
const maxTitleLenRunes = 1024
3838

39-
// New returns a new Slack notification handler.
39+
// nflog store keys for persisting Slack-specific state across notifications.
40+
const (
41+
storeKeyThreadTs = "threadTs"
42+
storeKeyChannelId = "channelId"
43+
storeKeyTransitions = "transitions"
44+
)
45+
46+
// New builds a Slack Notifier with tracing enabled on the configured HTTP client.
4047
func New(c *config.SlackConfig, t *template.Template, l *slog.Logger, httpOpts ...commoncfg.HTTPClientOption) (*Notifier, error) {
4148
client, err := notify.NewClientWithTracing(*c.HTTPConfig, "slack", httpOpts...)
4249
if err != nil {
@@ -54,7 +61,10 @@ func New(c *config.SlackConfig, t *template.Template, l *slog.Logger, httpOpts .
5461
}, nil
5562
}
5663

57-
// Notify implements the Notifier interface.
64+
// Notify implements the Notifier interface. It expands templates, builds the Slack
65+
// payload, and sends it (or updates an existing message / thread) based on
66+
// message_strategy and nflog state. The returned bool is true when the delivery
67+
// should be retried (e.g. transport or retryable HTTP errors).
5868
func (n *Notifier) Notify(ctx context.Context, as ...*types.Alert) (bool, error) {
5969
var err error
6070
key, err := notify.ExtractGroupKey(ctx)
@@ -143,6 +153,21 @@ func (n *Notifier) Notify(ctx context.Context, as ...*types.Alert) (bool, error)
143153
att.Actions = actions
144154
}
145155

156+
req := &request{
157+
Channel: tmplText(n.conf.Channel),
158+
Username: tmplText(n.conf.Username),
159+
IconEmoji: tmplText(n.conf.IconEmoji),
160+
IconURL: tmplText(n.conf.IconURL),
161+
LinkNames: n.conf.LinkNames,
162+
Text: tmplText(n.conf.MessageText),
163+
Attachments: []attachment{*att},
164+
}
165+
// tmplText is notify.TmplText(..., &err): every field execution appends template errors
166+
// into the same err. Check here so we never call Slack after a failed template render.
167+
if err != nil {
168+
return false, err
169+
}
170+
146171
u, err := n.urlResolver.URLForMethod("")
147172
if err != nil {
148173
return false, err
@@ -154,44 +179,57 @@ func (n *Notifier) Notify(ctx context.Context, as ...*types.Alert) (bool, error)
154179
ctx = postCtx
155180
}
156181

157-
req := &request{
158-
Channel: tmplText(n.conf.Channel),
159-
Username: tmplText(n.conf.Username),
160-
IconEmoji: tmplText(n.conf.IconEmoji),
161-
IconURL: tmplText(n.conf.IconURL),
162-
LinkNames: n.conf.LinkNames,
163-
Text: tmplText(n.conf.MessageText),
164-
Attachments: []attachment{*att},
165-
}
166-
167-
// If a notification for this alert group has already been sent and `update_message` config is set
168-
// edit API endpoint and payload to update notification instead of sending a new one.
169182
var store *nflog.Store
170183

171-
if n.conf.UpdateMessage {
184+
if n.conf.HasStrategyThatUpdatesParent() {
172185
var ok bool
173186
store, ok = notify.NflogStore(ctx)
174187
if !ok {
175-
logger.Warn("cannot create NflogStore, updatable messages will be disabled.")
188+
logger.Warn("cannot create NflogStore, updatable/threaded messages will be disabled.")
189+
} else if store == nil {
190+
logger.Warn("NflogStore is nil, updatable/threaded messages will be disabled.")
176191
} else {
177-
threadTs, _ := store.GetStr("threadTs")
178-
channelId, _ := store.GetStr("channelId")
179-
logger.Debug("attempt recovering threadTs and channelId to update an existing message", "threadTs", threadTs, "channelId", channelId)
180-
if threadTs != "" && channelId != "" {
181-
updateURL, err := n.urlResolver.URLForMethod("chat.update")
182-
if err != nil {
183-
return false, err
192+
193+
// If message_strategy is "update", edit the API endpoint and payload to update
194+
// the existing notification instead of sending a new one.
195+
if n.conf.HasUpdateStrategy() {
196+
threadTs, _ := store.GetStr(storeKeyThreadTs)
197+
channelId, _ := store.GetStr(storeKeyChannelId)
198+
logger.Debug("attempt recovering threadTs and channelId to update an existing message", storeKeyThreadTs, threadTs, storeKeyChannelId, channelId)
199+
if threadTs != "" && channelId != "" {
200+
updateURL, err := n.urlResolver.URLForMethod("chat.update")
201+
if err != nil {
202+
return false, err
203+
}
204+
u = updateURL
205+
req.Timestamp = threadTs
206+
req.Channel = channelId
207+
logger.Debug("updating previously sent message", storeKeyThreadTs, threadTs, storeKeyChannelId, channelId)
208+
}
209+
} else if n.conf.HasThreadStrategy() {
210+
// If message_strategy is "thread", there are two modes controlled by the flag use_summary_header.
211+
if n.conf.UseSummaryHeaderInThread() {
212+
return n.handleThreadedSummaryHeaderMode(ctx, data, tmplText, &err, store, u, req, logger)
184213
}
185-
u = updateURL
186-
req.Timestamp = threadTs
187-
req.Channel = channelId
188-
logger.Debug("updating previously sent message", "threadTs", threadTs, "channelId", channelId)
214+
return n.handleThreadedDirectMode(ctx, store, req, u, logger)
189215
}
190216
}
191217
}
218+
219+
// Default path: post the message directly (for "new" and "update" strategies, or when thread strategy falls
220+
// through due to missing nflog store).
221+
return n.postAndHandle(ctx, u, req.Channel, req, store, slackResponseOpts{})
222+
}
223+
224+
// postAndHandle JSON-encodes payload, POSTs it to u, applies HTTP retry classification,
225+
// then parses the Slack body. channel is only used in error messages. When store is
226+
// non-nil and the response is successful JSON with ts/channel, persistResponseState may
227+
// persist nflog keys for update/thread strategies. opts.IgnoreAPIErrors lists Slack
228+
// JSON error codes treated as success (e.g. already_reacted for reactions.add).
229+
func (n *Notifier) postAndHandle(ctx context.Context, u, channel string, payload any, store *nflog.Store, opts slackResponseOpts) (bool, error) {
192230
var buf bytes.Buffer
193-
if err := json.NewEncoder(&buf).Encode(req); err != nil {
194-
return false, err
231+
if err := json.NewEncoder(&buf).Encode(payload); err != nil {
232+
return false, fmt.Errorf("encode slack request: %w", err)
195233
}
196234

197235
resp, err := n.postJSONFunc(ctx, n.client, u, &buf)
@@ -207,51 +245,77 @@ func (n *Notifier) Notify(ctx context.Context, as ...*types.Alert) (bool, error)
207245
// classify them as retriable or not.
208246
retry, err := n.retrier.Check(resp.StatusCode, resp.Body)
209247
if err != nil {
210-
err = fmt.Errorf("channel %q: %w", req.Channel, err)
248+
err = fmt.Errorf("channel %q: %w", channel, err)
211249
return retry, notify.NewErrorWithReason(notify.GetFailureReasonFromStatusCode(resp.StatusCode), err)
212250
}
213251

214-
retry, err = n.slackResponseHandler(resp, store)
252+
data, retry, err := readAndParseSlackResponse(resp, opts)
215253
if err != nil {
216-
err = fmt.Errorf("channel %q: %w", req.Channel, err)
254+
err = fmt.Errorf("channel %q: %w", channel, err)
217255
return retry, notify.NewErrorWithReason(notify.ClientErrorReason, err)
218256
}
219-
return retry, nil
257+
258+
n.persistResponseState(store, data)
259+
260+
return false, nil
220261
}
221262

222-
// slackResponseHandler parses the response body of the request, handles retryable errors
223-
// and saves the response timestamp and channelId to nflog.
224-
func (n *Notifier) slackResponseHandler(resp *http.Response, store *nflog.Store) (bool, error) {
225-
body, err := io.ReadAll(resp.Body)
226-
if err != nil {
227-
return true, fmt.Errorf("could not read response body: %w", err)
228-
}
229-
if !strings.HasPrefix(resp.Header.Get("Content-Type"), "application/json") {
230-
return checkTextResponseError(body)
231-
}
232-
var data slackResponse
233-
if err := json.Unmarshal(body, &data); err != nil {
234-
return true, fmt.Errorf("could not unmarshal JSON response %q: %w", string(body), err)
263+
// persistResponseState persists the threadTs and channelId of a message in nflog. For message_strategy "thread", only
264+
// the first message is saved, so later replies do not replace the thread root.
265+
func (n *Notifier) persistResponseState(store *nflog.Store, data slackResponse) {
266+
if store == nil || data.Timestamp == "" || data.Channel == "" {
267+
return
235268
}
236-
if !data.OK {
237-
return false, fmt.Errorf("error response from Slack: %s", data.Error)
238-
}
239-
// If store, TS and Channel are set, store the threadTS and channelId
240-
if store != nil && data.Timestamp != "" && data.Channel != "" {
241-
store.SetStr("threadTs", data.Timestamp)
242-
store.SetStr("channelId", data.Channel)
243-
n.logger.Debug("stored threadTs and channelId", "threadTs", data.Timestamp, "channelId", data.Channel)
269+
if n.conf.HasThreadStrategy() {
270+
parentThreadTs, parentChannelId, parentFound := getStoredParent(store)
271+
if !parentFound {
272+
store.SetStr(storeKeyThreadTs, data.Timestamp)
273+
store.SetStr(storeKeyChannelId, data.Channel)
274+
n.logger.Debug("stored threadTs and channelId for thread parent", storeKeyThreadTs, data.Timestamp, storeKeyChannelId, data.Channel)
275+
} else {
276+
n.logger.Debug("skipping storing reply as thread parent is already stored", storeKeyThreadTs, parentThreadTs, storeKeyChannelId, parentChannelId)
277+
}
278+
} else {
279+
store.SetStr(storeKeyThreadTs, data.Timestamp)
280+
store.SetStr(storeKeyChannelId, data.Channel)
281+
n.logger.Debug("stored threadTs and channelId", storeKeyThreadTs, data.Timestamp, storeKeyChannelId, data.Channel)
244282
}
245-
return false, nil
246283
}
247284

248-
// checkTextResponseError classifies plaintext responses from Slack.
249-
// A plaintext (non-JSON) response is successful if it's a string "ok".
250-
// This is typically a response for an Incoming Webhook
251-
// (https://api.slack.com/messaging/webhooks#handling_errors)
285+
// checkTextResponseError classifies incoming-webhook plaintext responses.
286+
// Success requires the body, after trimming surrounding whitespace, to be exactly "ok". The bool is the retry hint (always false here).
287+
// See https://api.slack.com/messaging/webhooks#handling_errors
252288
func checkTextResponseError(body []byte) (bool, error) {
253-
if !bytes.Equal(body, []byte("ok")) {
289+
if !bytes.Equal(bytes.TrimSpace(body), []byte("ok")) {
254290
return false, fmt.Errorf("received an error response from Slack: %s", string(body))
255291
}
256292
return false, nil
257293
}
294+
295+
// readAndParseSlackResponse reads the response body. For Content-Type application/json
296+
// it unmarshals slackResponse; ok=false is an error unless data.Error is listed in
297+
// opts.IgnoreAPIErrors. Non-JSON bodies use incoming-webhook plaintext rules (body "ok").
298+
// retry is true for read/unmarshal failures that may be transient; false for definitive
299+
// Slack API errors (ok=false without ignore) and for every plaintext response, whether or not it is "ok".
300+
func readAndParseSlackResponse(resp *http.Response, opts slackResponseOpts) (slackResponse, bool, error) {
301+
body, err := io.ReadAll(resp.Body)
302+
if err != nil {
303+
return slackResponse{}, true, fmt.Errorf("could not read response body: %w", err)
304+
}
305+
contentType := strings.TrimSpace(strings.ToLower(resp.Header.Get("Content-Type")))
306+
if !strings.HasPrefix(contentType, "application/json") {
307+
retry, err := checkTextResponseError(body)
308+
return slackResponse{}, retry, err
309+
}
310+
var data slackResponse
311+
if err = json.Unmarshal(body, &data); err != nil {
312+
return slackResponse{}, true, fmt.Errorf("could not unmarshal JSON response %q: %w", string(body), err)
313+
}
314+
if !data.OK {
315+
if opts.treatsSlackErrorAsSuccess(data.Error) {
316+
return data, false, nil
317+
}
318+
return slackResponse{}, false, fmt.Errorf("error response from Slack: %s", data.Error)
319+
}
320+
return data, false, nil
321+
}

0 commit comments

Comments
 (0)