Skip to content

Commit 525940f

Browse files
committed
beholder - batch emit events
1 parent 16c179a commit 525940f

2 files changed

Lines changed: 22 additions & 14 deletions

File tree

pkg/beholder/chip_ingress_emitter.go

Lines changed: 21 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -26,23 +26,34 @@ func (c *ChipIngressEmitter) Close() error {
2626
}
2727

2828
func (c *ChipIngressEmitter) Emit(ctx context.Context, body []byte, attrKVs ...any) error {
29+
return c.BatchEmit(ctx, Message{
30+
Body: body,
31+
Attrs: ExtractAttributes(attrKVs...),
32+
})
33+
}
2934

30-
sourceDomain, entityType, err := ExtractSourceAndType(attrKVs...)
31-
if err != nil {
32-
return err
33-
}
35+
func (c *ChipIngressEmitter) BatchEmit(ctx context.Context, messages ...Message) error {
36+
events := make([]chipingress.CloudEvent, len(messages))
37+
for i, msg := range messages {
38+
sourceDomain, entityType, err := ExtractSourceAndType(msg.Attrs)
39+
if err != nil {
40+
return err
41+
}
3442

35-
event, err := chipingress.NewEvent(sourceDomain, entityType, body, newAttributes(attrKVs...))
36-
if err != nil {
37-
return err
43+
event, err := chipingress.NewEvent(sourceDomain, entityType, msg.Body, msg.Attrs)
44+
if err != nil {
45+
return err
46+
}
47+
48+
events[i] = event
3849
}
3950

40-
eventPb, err := chipingress.EventToProto(event)
51+
eventPb, err := chipingress.EventsToBatch(events)
4152
if err != nil {
4253
return fmt.Errorf("failed to convert event to proto: %w", err)
4354
}
4455

45-
_, err = c.client.Publish(ctx, eventPb)
56+
_, err = c.client.PublishBatch(ctx, eventPb)
4657
if err != nil {
4758
return err
4859
}
@@ -51,10 +62,7 @@ func (c *ChipIngressEmitter) Emit(ctx context.Context, body []byte, attrKVs ...a
5162
}
5263

5364
// ExtractSourceAndType extracts source domain and entity from the attributes
54-
func ExtractSourceAndType(attrKVs ...any) (string, string, error) {
55-
56-
attributes := newAttributes(attrKVs...)
57-
65+
func ExtractSourceAndType(attributes Attributes) (string, string, error) {
5866
var sourceDomain string
5967
var entityType string
6068

pkg/beholder/chip_ingress_emitter_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -190,7 +190,7 @@ func TestExtractSourceAndType(t *testing.T) {
190190

191191
for _, tt := range tests {
192192
t.Run(tt.name, func(t *testing.T) {
193-
domain, entity, err := beholder.ExtractSourceAndType(tt.attrs...)
193+
domain, entity, err := beholder.ExtractSourceAndType(beholder.ExtractAttributes(tt.attrs))
194194

195195
if tt.wantErr {
196196
if err == nil {

0 commit comments

Comments
 (0)