Skip to content

Commit 8fc6c8f

Browse files
committed
fix(framework/event): add panic recovery and HandlePublish to MQTT broker
A panicking handler in route() propagated through paho's internal goroutine and crashed the entire MQTT connection. Add defer/recover via deliverToHandler. NewMQTTBrokerFrom never wired OnPublishReceived, making handlers unreachable. Add public HandlePublish method and SetClient for two-step wiring pattern. Adds 4 MQTT-specific tests for panic recovery and HandlePublish.
1 parent afb1960 commit 8fc6c8f

2 files changed

Lines changed: 162 additions & 8 deletions

File tree

framework/event/mqtt.go

Lines changed: 62 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"context"
55
"encoding/json"
66
"fmt"
7+
"log/slog"
78
"net/url"
89
"strconv"
910
"strings"
@@ -196,11 +197,7 @@ func NewMQTTBrokerWith(options *MQTTBrokerOptions) (*MQTTBroker, error) {
196197
ClientConfig: paho.ClientConfig{
197198
ClientID: "",
198199
OnPublishReceived: []func(paho.PublishReceived) (bool, error){
199-
func(pr paho.PublishReceived) (bool, error) {
200-
broker.route(pr.Packet)
201-
202-
return true, nil
203-
},
200+
broker.HandlePublish,
204201
},
205202
},
206203
}
@@ -225,6 +222,19 @@ func NewMQTTBrokerWith(options *MQTTBrokerOptions) (*MQTTBroker, error) {
225222
// autopaho ConnectionManager and QoS level. This constructor is
226223
// useful for advanced scenarios where the user needs full control
227224
// over the MQTT connection configuration.
225+
//
226+
// Because autopaho.ConnectionManager does not allow post-creation
227+
// configuration changes, the caller must wire up message routing
228+
// when building the paho config. Use [MQTTBroker.HandlePublish] in
229+
// the ClientConfig.OnPublishReceived slice:
230+
//
231+
// broker := event.NewMQTTBrokerFrom(nil, 1)
232+
// cfg.ClientConfig.OnPublishReceived = []func(paho.PublishReceived) (bool, error){
233+
// broker.HandlePublish,
234+
// }
235+
// cm, err := autopaho.NewConnection(ctx, cfg)
236+
// // then set the client so Publish/Subscribe/Close work:
237+
// broker.SetClient(cm)
228238
func NewMQTTBrokerFrom(
229239
client *autopaho.ConnectionManager,
230240
qos byte,
@@ -237,25 +247,69 @@ func NewMQTTBrokerFrom(
237247
}
238248
}
239249

250+
// SetClient sets the underlying autopaho ConnectionManager. This is
251+
// intended for use with [NewMQTTBrokerFrom] when the broker must be
252+
// created before the ConnectionManager so that [MQTTBroker.HandlePublish]
253+
// can be wired into the paho config.
254+
func (broker *MQTTBroker) SetClient(client *autopaho.ConnectionManager) {
255+
broker.client = client
256+
}
257+
258+
// HandlePublish is a paho OnPublishReceived callback that routes
259+
// incoming MQTT messages to registered handlers. Callers using
260+
// [NewMQTTBrokerFrom] must include this method in the paho
261+
// ClientConfig.OnPublishReceived slice so that subscribed handlers
262+
// receive messages.
263+
func (broker *MQTTBroker) HandlePublish(pr paho.PublishReceived) (bool, error) {
264+
broker.route(pr.Packet)
265+
266+
return true, nil
267+
}
268+
240269
// route delivers an incoming MQTT message to all matching
241270
// handlers based on topic pattern matching. This implements
242271
// fan-out behavior where multiple handlers can receive the
243272
// same message if they subscribed to matching patterns.
273+
//
274+
// Each handler is called with panic recovery so that a
275+
// panicking handler does not crash the paho client goroutine
276+
// and tear down the entire MQTT connection.
244277
func (broker *MQTTBroker) route(pb *paho.Publish) {
245278
broker.mu.RLock()
246279
defer broker.mu.RUnlock()
247280

248281
for pattern, handlers := range broker.handlers {
249282
if matchTopic(pattern, pb.Topic) {
250283
for _, handler := range handlers {
251-
handler(func(dest any) error {
252-
return json.Unmarshal(pb.Payload, dest)
253-
})
284+
broker.deliverToHandler(handler, pb.Topic, pb.Payload)
254285
}
255286
}
256287
}
257288
}
258289

290+
// deliverToHandler invokes a single handler with panic recovery.
291+
// Recovered panics are logged via slog so they remain visible for
292+
// debugging without propagating to the caller.
293+
func (broker *MQTTBroker) deliverToHandler(
294+
handler contract.EventHandler,
295+
topic string,
296+
payload []byte,
297+
) {
298+
defer func() {
299+
if recovered := recover(); recovered != nil {
300+
slog.Error(
301+
"mqtt event handler panicked",
302+
"topic", topic,
303+
"error", recovered,
304+
)
305+
}
306+
}()
307+
308+
handler(func(dest any) error {
309+
return json.Unmarshal(payload, dest)
310+
})
311+
}
312+
259313
// Publish sends an event with the given name and payload to all
260314
// subscribers listening for that event. The payload is serialized
261315
// to JSON and the event name is converted to MQTT topic format.

framework/event/mqtt_internal_test.go

Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -296,3 +296,103 @@ func TestMQTTBrokerRouteUnmarshalsJSONPayload(t *testing.T) {
296296

297297
require.Equal(t, "hello world", received)
298298
}
299+
300+
func TestMQTTBrokerRoutePanicDoesNotPropagate(t *testing.T) {
301+
t.Parallel()
302+
303+
broker := &MQTTBroker{
304+
handlers: map[string]map[string]contract.EventHandler{
305+
"user/created": {
306+
"1": func(payload contract.EventPayload) {
307+
panic("handler panic")
308+
},
309+
},
310+
},
311+
}
312+
313+
require.NotPanics(t, func() {
314+
broker.route(&paho.Publish{
315+
Topic: "user/created",
316+
Payload: []byte(`"data"`),
317+
})
318+
})
319+
}
320+
321+
func TestMQTTBrokerRoutePanicDoesNotAffectOtherHandlers(t *testing.T) {
322+
t.Parallel()
323+
324+
var called atomic.Bool
325+
326+
broker := &MQTTBroker{
327+
handlers: map[string]map[string]contract.EventHandler{
328+
"user/created": {
329+
"1": func(payload contract.EventPayload) {
330+
panic("handler panic")
331+
},
332+
"2": func(payload contract.EventPayload) {
333+
called.Store(true)
334+
},
335+
},
336+
},
337+
}
338+
339+
broker.route(&paho.Publish{
340+
Topic: "user/created",
341+
Payload: []byte(`"data"`),
342+
})
343+
344+
require.True(t, called.Load())
345+
}
346+
347+
func TestMQTTBrokerHandlePublishRoutesMessage(t *testing.T) {
348+
t.Parallel()
349+
350+
var called atomic.Bool
351+
352+
broker := &MQTTBroker{
353+
handlers: map[string]map[string]contract.EventHandler{
354+
"user/created": {
355+
"1": func(payload contract.EventPayload) {
356+
called.Store(true)
357+
},
358+
},
359+
},
360+
}
361+
362+
handled, err := broker.HandlePublish(paho.PublishReceived{
363+
Packet: &paho.Publish{
364+
Topic: "user/created",
365+
Payload: []byte(`"hello"`),
366+
},
367+
})
368+
369+
require.NoError(t, err)
370+
require.True(t, handled)
371+
require.True(t, called.Load())
372+
}
373+
374+
func TestMQTTBrokerHandlePublishRecoversPanic(t *testing.T) {
375+
t.Parallel()
376+
377+
broker := &MQTTBroker{
378+
handlers: map[string]map[string]contract.EventHandler{
379+
"user/created": {
380+
"1": func(payload contract.EventPayload) {
381+
panic("handler panic")
382+
},
383+
},
384+
},
385+
}
386+
387+
require.NotPanics(t, func() {
388+
handled, err := broker.HandlePublish(paho.PublishReceived{
389+
Packet: &paho.Publish{
390+
Topic: "user/created",
391+
Payload: []byte(`"data"`),
392+
},
393+
})
394+
395+
require.NoError(t, err)
396+
require.True(t, handled)
397+
})
398+
}

0 commit comments

Comments
 (0)