-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathtransport_event_handler.go
More file actions
92 lines (74 loc) · 2.28 KB
/
transport_event_handler.go
File metadata and controls
92 lines (74 loc) · 2.28 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
package mongo_watcher
import (
"context"
"encoding/json"
"github.com/techpro-studio/mongo_watcher/transport"
"go.mongodb.org/mongo-driver/v2/mongo"
"log"
)
type RoomFormatter[T any] interface {
Format(event *Event[T]) string
}
type RoomFormatterFunc[T any] func(event *Event[T]) string
func (r RoomFormatterFunc[T]) Format(event *Event[T]) string {
return r(event)
}
type StaticRoomFormatter[T any] struct {
roomName string
}
func NewStaticRoomFormatter[T any](roomName string) *StaticRoomFormatter[T] {
return &StaticRoomFormatter[T]{roomName: roomName}
}
func (r StaticRoomFormatter[T]) Format(event *Event[T]) string {
return r.roomName
}
type TransportEventHandler[T any] struct {
trans transport.RoomTransport
objectFormatter ObjectFormatter[T]
roomFormatter RoomFormatter[T]
}
type ObjectFormatter[T any] interface {
Format(event *Event[T]) (map[string]string, error)
}
type JsonPayloadObjectFormatter[T any] struct {
}
func NewJsonPayloadObjectFormatter[T any]() *JsonPayloadObjectFormatter[T] {
return &JsonPayloadObjectFormatter[T]{}
}
func (j JsonPayloadObjectFormatter[T]) Format(event *Event[T]) (map[string]string, error) {
jsonData, err := json.Marshal(*event.FullDocument)
if err != nil {
return nil, err
}
result := map[string]string{"payload": string(jsonData)}
return result, nil
}
func NewTransportEventHandler[T any](trans transport.RoomTransport, roomFormatter RoomFormatter[T], formatter ObjectFormatter[T]) *TransportEventHandler[T] {
return &TransportEventHandler[T]{trans: trans, roomFormatter: roomFormatter, objectFormatter: formatter}
}
func (t *TransportEventHandler[T]) Setup(ctx context.Context, collection *mongo.Collection) {
log.Printf("Setting up transport event handler for %s", collection.Name())
}
func (t *TransportEventHandler[T]) HandleEvent(ctx context.Context, event *Event[T]) error {
var sendData map[string]string
switch event.Type {
case MongoEventDelete:
sendData = map[string]string{"id": event.Key}
default:
var err error
sendData, err = t.objectFormatter.Format(event)
if err != nil {
return err
}
}
room := t.roomFormatter.Format(event)
err := t.trans.SendMessage(ctx, &transport.RoomMessage{
Data: sendData,
Room: room,
Event: string(event.Type),
})
if err != nil {
return err
}
return nil
}