Skip to content

Commit 91e8f6c

Browse files
committed
fix(dataplane): fix batch replay oom
1 parent d5f9403 commit 91e8f6c

4 files changed

Lines changed: 100 additions & 23 deletions

File tree

api/api.go

Lines changed: 6 additions & 7 deletions
Original file line number | Diff line number | Diff line change
@@ -343,8 +343,7 @@ func (a *ApplicationHandler) mountControlPlaneRoutes(router chi.Router, handler
343343
r.Post("/dynamic", handler.CreateDynamicEvent)
344344
})
345345

346-
// Batch replay route (different middleware - no rate limiting)
347-
eventRouter.With(handler.RequireEnabledProject(), handler.RequireEnabledOrganisation()).
346+
eventRouter.With(handler.RequireEnabledProject(), handler.RequireEnabledOrganisation(), middleware.Pagination).
348347
Post("/batchreplay", handler.BatchReplayEvents)
349348

350349
// Event ID subroutes
@@ -572,7 +571,7 @@ func (a *ApplicationHandler) mountControlPlaneRoutes(router chi.Router, handler
572571
eventRouter.With(handler.RequireEnabledProject(), handler.RequireEnabledOrganisation()).Post("/fanout", handler.CreateEndpointFanoutEvent)
573572
eventRouter.With(handler.RequireEnabledProject(), handler.RequireEnabledOrganisation()).Post("/broadcast", handler.CreateBroadcastEvent)
574573
eventRouter.With(handler.RequireEnabledProject(), handler.RequireEnabledOrganisation()).Post("/dynamic", handler.CreateDynamicEvent)
575-
eventRouter.With(handler.RequireEnabledProject(), handler.RequireEnabledOrganisation()).Post("/batchreplay", handler.BatchReplayEvents)
574+
eventRouter.With(handler.RequireEnabledProject(), handler.RequireEnabledOrganisation(), middleware.Pagination).Post("/batchreplay", handler.BatchReplayEvents)
576575

577576
eventRouter.Route("/{eventID}", func(eventSubRouter chi.Router) {
578577
eventSubRouter.With(handler.RequireEnabledProject(), handler.RequireEnabledOrganisation()).Put("/replay", handler.ReplayEndpointEvent)
@@ -754,7 +753,7 @@ func (a *ApplicationHandler) mountControlPlaneRoutes(router chi.Router, handler
754753
portalLinkRouter.Route("/events", func(eventRouter chi.Router) {
755754
eventRouter.With(handler.RequireEnabledProject(), handler.RequireEnabledOrganisation()).Post("/", handler.CreateEndpointEvent)
756755
eventRouter.With(middleware.Pagination).Get("/", handler.GetEventsPaged)
757-
eventRouter.With(handler.RequireEnabledProject(), handler.RequireEnabledOrganisation()).Post("/batchreplay", handler.BatchReplayEvents)
756+
eventRouter.With(handler.RequireEnabledProject(), handler.RequireEnabledOrganisation(), middleware.Pagination).Post("/batchreplay", handler.BatchReplayEvents)
758757
eventRouter.Get("/countbatchreplayevents", handler.CountAffectedEvents)
759758

760759
eventRouter.Route("/{eventID}", func(eventSubRouter chi.Router) {
@@ -908,7 +907,7 @@ func (a *ApplicationHandler) mountDataPlaneRoutes(router chi.Router, handler *ha
908907
})
909908

910909
eventRouter.With(middleware.Pagination).Get("/", handler.GetEventsPaged)
911-
eventRouter.With(handler.RequireEnabledProject(), handler.RequireEnabledOrganisation()).Post("/batchreplay", handler.BatchReplayEvents)
910+
eventRouter.With(handler.RequireEnabledProject(), handler.RequireEnabledOrganisation(), middleware.Pagination).Post("/batchreplay", handler.BatchReplayEvents)
912911

913912
eventRouter.Route("/{eventID}", func(eventSubRouter chi.Router) {
914913
eventSubRouter.Get("/", handler.GetEndpointEvent)
@@ -959,7 +958,7 @@ func (a *ApplicationHandler) mountDataPlaneRoutes(router chi.Router, handler *ha
959958
eventRouter.Post("/", handler.CreateEndpointEvent)
960959
eventRouter.Post("/fanout", handler.CreateEndpointFanoutEvent)
961960
eventRouter.With(middleware.Pagination).Get("/", handler.GetEventsPaged)
962-
eventRouter.Post("/batchreplay", handler.BatchReplayEvents)
961+
eventRouter.With(middleware.Pagination).Post("/batchreplay", handler.BatchReplayEvents)
963962
eventRouter.Get("/countbatchreplayevents", handler.CountAffectedEvents)
964963

965964
eventRouter.Route("/{eventID}", func(eventSubRouter chi.Router) {
@@ -1002,7 +1001,7 @@ func (a *ApplicationHandler) mountDataPlaneRoutes(router chi.Router, handler *ha
10021001
portalLinkRouter.Route("/events", func(eventRouter chi.Router) {
10031002
eventRouter.Post("/", handler.CreateEndpointEvent)
10041003
eventRouter.With(middleware.Pagination).Get("/", handler.GetEventsPaged)
1005-
eventRouter.Post("/batchreplay", handler.BatchReplayEvents)
1004+
eventRouter.With(middleware.Pagination).Post("/batchreplay", handler.BatchReplayEvents)
10061005
eventRouter.Get("/countbatchreplayevents", handler.CountAffectedEvents)
10071006

10081007
eventRouter.Route("/{eventID}", func(eventSubRouter chi.Router) {

api/handlers/event.go

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -358,10 +358,7 @@ func (h *Handler) BatchReplayEvents(w http.ResponseWriter, r *http.Request) {
358358
}
359359

360360
data.Filter.Project = p
361-
ep := datastore.Pageable{}
362-
if data.Filter.Pageable == ep {
363-
data.Filter.Pageable.PerPage = 2000000000
364-
}
361+
data.Filter.Pageable = services.NormalizeBatchReplayPageable(data.Filter.Pageable)
365362

366363
bs := services.BatchReplayEventService{
367364
EndpointRepo: endpoints.New(h.A.Logger, h.A.DB),

services/batch_replay_event.go

Lines changed: 50 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,27 @@ import (
66
"github.com/frain-dev/convoy/datastore"
77
log "github.com/frain-dev/convoy/pkg/logger"
88
"github.com/frain-dev/convoy/queue"
9+
"github.com/frain-dev/convoy/util"
910
)
1011

12+
// BatchReplayPageSize is the page size NormalizeBatchReplayPageable applies
// when PerPage is unset or non-positive, and the upper bound it clamps larger
// PerPage values to.
const BatchReplayPageSize = 1000
13+
14+
// NormalizeBatchReplayPageable returns a copy of pageable adjusted for batch
// replay:
//   - PerPage is forced into the range 1..BatchReplayPageSize (non-positive
//     values become BatchReplayPageSize, larger values are clamped to it);
//   - an empty Direction becomes datastore.Next;
//   - when both NextCursor and PrevCursor are empty, SetCursors is called to
//     initialise them.
//
// pageable is received by value, so the caller's struct is not modified.
func NormalizeBatchReplayPageable(pageable datastore.Pageable) datastore.Pageable {
15+
// Missing or invalid page size: fall back to the batch replay default.
if pageable.PerPage <= 0 {
16+
pageable.PerPage = BatchReplayPageSize
17+
}
18+
// Cap caller-supplied page sizes at BatchReplayPageSize.
if pageable.PerPage > BatchReplayPageSize {
19+
pageable.PerPage = BatchReplayPageSize
20+
}
21+
if pageable.Direction == "" {
22+
pageable.Direction = datastore.Next
23+
}
24+
// Only initialise cursors when the caller supplied none.
// NOTE(review): SetCursors is defined outside this view; presumably it fills
// in starting cursor values (the package test expects a non-empty NextCursor
// for an empty input) — confirm against datastore.Pageable.
if util.IsStringEmpty(pageable.NextCursor) && util.IsStringEmpty(pageable.PrevCursor) {
25+
pageable.SetCursors()
26+
}
27+
return pageable
28+
}
29+
1130
type BatchReplayEventService struct {
1231
EndpointRepo datastore.EndpointRepository
1332
Queue queue.Queuer
@@ -18,28 +37,47 @@ type BatchReplayEventService struct {
1837
}
1938

2039
// Run replays the events selected by e.Filter one page at a time and returns
// (successes, failures, error).
//
// Each page is fetched with EventRepo.LoadEventsPaged and every event in it is
// handed to ReplayEventService.Run. A replay error for a single event is
// logged and counted as a failure; it does not stop the batch. A page-load
// error stops the batch and is returned as a *ServiceError together with the
// counts accumulated so far.
func (e *BatchReplayEventService) Run(ctx context.Context) (int, int, error) {
21-
events, _, err := e.EventRepo.LoadEventsPaged(ctx, e.Filter.Project.UID, e.Filter)
22-
if err != nil {
23-
e.Logger.ErrorContext(ctx, "failed to fetch events", "error", err)
24-
return 0, 0, &ServiceError{ErrMsg: "failed to fetch event deliveries", Err: err}
25-
}
40+
// Work on a copy of the filter struct so the cursor updates below are written
// to the copy rather than through e.Filter.
filter := *e.Filter
41+
filter.Pageable = NormalizeBatchReplayPageable(filter.Pageable)
2642

2743
rs := ReplayEventService{
2844
EndpointRepo: e.EndpointRepo,
2945
Queue: e.Queue,
3046
Logger: e.Logger,
3147
}
3248

33-
failures := 0
34-
for _, ev := range events {
35-
rs.Event = &ev
36-
err = rs.Run(ctx)
49+
successes, failures := 0, 0
50+
51+
// Fetch and replay one page per iteration.
// NOTE(review): the loop ends only on an empty page or HasNextPage == false,
// so termination relies on the repository reporting the last page — confirm.
for {
52+
events, pagination, err := e.EventRepo.LoadEventsPaged(ctx, e.Filter.Project.UID, &filter)
3753
if err != nil {
38-
failures++
39-
e.Logger.ErrorContext(ctx, "an item in the batch replay failed", "error", err)
54+
e.Logger.ErrorContext(ctx, "failed to fetch events", "error", err)
55+
// NOTE(review): the returned message says "event deliveries" while the log
// line above says "events".
return successes, failures, &ServiceError{ErrMsg: "failed to fetch event deliveries", Err: err}
4056
}
57+
58+
if len(events) == 0 {
59+
break
60+
}
61+
62+
pageFailures := 0
63+
// Index into the slice so rs.Event points at the slice element itself.
for i := range events {
64+
rs.Event = &events[i]
65+
if err = rs.Run(ctx); err != nil {
66+
pageFailures++
67+
e.Logger.ErrorContext(ctx, "an item in the batch replay failed", "error", err)
68+
}
69+
}
70+
71+
successes += len(events) - pageFailures
72+
failures += pageFailures
73+
74+
if !pagination.HasNextPage {
75+
break
76+
}
77+
78+
// Advance the copied filter to the cursors reported for this page.
filter.Pageable.NextCursor = pagination.NextPageCursor
79+
filter.Pageable.PrevCursor = pagination.PrevPageCursor
4180
}
4281

43-
successes := len(events) - failures
4482
return successes, failures, nil
4583
}

services/batch_replay_event_test.go

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,20 @@ func provideBatchReplayEventService(ctrl *gomock.Controller, f *datastore.Filter
2323
}
2424
}
2525

26+
// TestNormalizeBatchReplayPageable checks the defaults and the page-size cap
// applied by NormalizeBatchReplayPageable.
func TestNormalizeBatchReplayPageable(t *testing.T) {
27+
// A zero-value Pageable must come back with the default page size, the Next
// direction and a non-empty NextCursor.
t.Run("defaults empty pageable", func(t *testing.T) {
28+
got := NormalizeBatchReplayPageable(datastore.Pageable{})
29+
require.Equal(t, BatchReplayPageSize, got.PerPage)
30+
require.Equal(t, datastore.Next, got.Direction)
31+
require.NotEmpty(t, got.NextCursor)
32+
})
33+
34+
// A PerPage above BatchReplayPageSize must be reduced to BatchReplayPageSize.
t.Run("caps oversized pageable", func(t *testing.T) {
35+
got := NormalizeBatchReplayPageable(datastore.Pageable{PerPage: 2000000000})
36+
require.Equal(t, BatchReplayPageSize, got.PerPage)
37+
})
38+
}
39+
2640
func TestBatchReplayEventService_Run(t *testing.T) {
2741
ctx := context.Background()
2842

@@ -99,6 +113,35 @@ func TestBatchReplayEventService_Run(t *testing.T) {
99113
wantErr: false,
100114
wantErrMsg: "",
101115
},
116+
{
117+
name: "should_paginate_through_all_events",
118+
dbFn: func(br *BatchReplayEventService) {
119+
e, _ := br.EventRepo.(*mocks.MockEventRepository)
120+
gomock.InOrder(
121+
e.EXPECT().LoadEventsPaged(gomock.Any(), "1234", gomock.Any()).Times(1).Return(
122+
[]datastore.Event{{UID: "event1", ProjectID: "proj0"}},
123+
datastore.PaginationData{HasNextPage: true, NextPageCursor: "cursor-2"},
124+
nil,
125+
),
126+
e.EXPECT().LoadEventsPaged(gomock.Any(), "1234", gomock.Any()).Times(1).Return(
127+
[]datastore.Event{{UID: "event2", ProjectID: "proj0"}},
128+
datastore.PaginationData{},
129+
nil,
130+
),
131+
)
132+
133+
q, _ := br.Queue.(*mocks.MockQueuer)
134+
q.EXPECT().Write(gomock.Any(), convoy.CreateEventProcessor, convoy.CreateEventQueue, gomock.Any()).Times(2).Return(nil)
135+
},
136+
args: args{
137+
ctx: ctx,
138+
f: &datastore.Filter{
139+
Project: &datastore.Project{UID: "1234"},
140+
},
141+
},
142+
wantSuccesses: 2,
143+
wantFailures: 0,
144+
},
102145
{
103146
name: "should_fail_to_load_events",
104147
dbFn: func(br *BatchReplayEventService) {

0 commit comments

Comments
 (0)