Skip to content

Commit 9a23290

Browse files
committed
fix(stateful): bootstrap subtask state from collector checkpoints
1 parent 03274c4 commit 9a23290

2 files changed

Lines changed: 130 additions & 0 deletions

File tree

backend/helpers/pluginhelper/api/subtask_state_manager.go

Lines changed: 39 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -88,6 +88,10 @@ func NewSubtaskStateManager(args *SubtaskCommonArgs) (stateManager *SubtaskState
8888
if err != nil {
8989
return
9090
}
91+
preState, err = bootstrapStateFromCollectorStateIfNeeded(db, preState, args)
92+
if err != nil {
93+
return
94+
}
9195

9296
isIncremental, since := calculateStateManagerIncrementalMode(syncPolicy, preState, utils.ToJsonString(args.SubtaskConfig))
9397

@@ -127,6 +131,41 @@ func loadPreviousState(db dal.Dal, plugin, subtask, params string) (*models.Subt
127131
return preState, nil
128132
}
129133

134+
func bootstrapStateFromCollectorStateIfNeeded(db dal.Dal, preState *models.SubtaskState, args *SubtaskCommonArgs) (*models.SubtaskState, errors.Error) {
135+
if preState == nil || preState.PrevStartedAt != nil {
136+
return preState, nil
137+
}
138+
if args == nil || args.Table == "" {
139+
return preState, nil
140+
}
141+
142+
rawTable := args.GetRawDataTable()
143+
if rawTable == "" {
144+
return preState, nil
145+
}
146+
147+
collectorState := &models.CollectorLatestState{}
148+
err := db.First(
149+
collectorState,
150+
dal.Where("raw_data_table = ? AND raw_data_params = ?", rawTable, preState.Params),
151+
)
152+
if err != nil {
153+
if db.IsErrorNotFound(err) {
154+
return preState, nil
155+
}
156+
return nil, errors.Default.Wrap(err, "failed to load collector state for subtask bootstrap")
157+
}
158+
159+
if collectorState.LatestSuccessStart != nil {
160+
preState.PrevStartedAt = collectorState.LatestSuccessStart
161+
}
162+
if preState.TimeAfter == nil && collectorState.TimeAfter != nil {
163+
preState.TimeAfter = collectorState.TimeAfter
164+
}
165+
166+
return preState, nil
167+
}
168+
130169
// calculateStateManagerIncrementalMode tries to calculate whether state manager should run in incremental mode and returns the state manager's 'since' time.
131170
func calculateStateManagerIncrementalMode(syncPolicy *models.SyncPolicy, preState *models.SubtaskState, newSubtaskConfig string) (bool, *time.Time) {
132171
if preState == nil || syncPolicy == nil {

backend/helpers/pluginhelper/api/subtask_state_manager_test.go

Lines changed: 91 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -187,3 +187,94 @@ func TestSubtaskStateManager(t *testing.T) {
187187
})
188188
}
189189
}
190+
191+
func TestBootstrapStateFromCollectorStateIfNeeded(t *testing.T) {
192+
latest := errors.Must1(time.Parse(time.RFC3339, "2026-04-27T18:05:31Z"))
193+
timeAfter := errors.Must1(time.Parse(time.RFC3339, "2024-12-20T00:00:00Z"))
194+
195+
t.Run("bootstrap prev_started_at and time_after from collector state", func(t *testing.T) {
196+
mockDal := new(mockdal.Dal)
197+
mockDal.On("First", mock.Anything, mock.Anything).Run(func(args mock.Arguments) {
198+
dst := args.Get(0).(*models.CollectorLatestState)
199+
dst.LatestSuccessStart = &latest
200+
dst.TimeAfter = &timeAfter
201+
}).Return(nil).Once()
202+
203+
state := &models.SubtaskState{
204+
Plugin: "github",
205+
Subtask: "Convert Jobs",
206+
Params: `{"ConnectionId":1,"Name":"AkerBP/autogration"}`,
207+
}
208+
args := &SubtaskCommonArgs{Table: "github_api_jobs"}
209+
210+
bootstrapped, err := bootstrapStateFromCollectorStateIfNeeded(mockDal, state, args)
211+
assert.Nil(t, err)
212+
if assert.NotNil(t, bootstrapped.PrevStartedAt) {
213+
assert.True(t, bootstrapped.PrevStartedAt.Equal(latest))
214+
}
215+
if assert.NotNil(t, bootstrapped.TimeAfter) {
216+
assert.True(t, bootstrapped.TimeAfter.Equal(timeAfter))
217+
}
218+
mockDal.AssertExpectations(t)
219+
})
220+
221+
t.Run("do not overwrite existing prev_started_at", func(t *testing.T) {
222+
mockDal := new(mockdal.Dal)
223+
224+
existing := errors.Must1(time.Parse(time.RFC3339, "2026-04-27T18:06:37Z"))
225+
state := &models.SubtaskState{
226+
Plugin: "github",
227+
Subtask: "Convert Jobs",
228+
Params: `{"ConnectionId":1,"Name":"AkerBP/autogration"}`,
229+
PrevStartedAt: &existing,
230+
}
231+
args := &SubtaskCommonArgs{Table: "github_api_jobs"}
232+
233+
bootstrapped, err := bootstrapStateFromCollectorStateIfNeeded(mockDal, state, args)
234+
assert.Nil(t, err)
235+
if assert.NotNil(t, bootstrapped.PrevStartedAt) {
236+
assert.True(t, bootstrapped.PrevStartedAt.Equal(existing))
237+
}
238+
mockDal.AssertNotCalled(t, "First", mock.Anything, mock.Anything)
239+
})
240+
241+
t.Run("ignore not found collector state", func(t *testing.T) {
242+
mockDal := new(mockdal.Dal)
243+
notFoundErr := errors.Default.New("record not found")
244+
mockDal.On("First", mock.Anything, mock.Anything).Return(notFoundErr).Once()
245+
mockDal.On("IsErrorNotFound", notFoundErr).Return(true).Once()
246+
247+
state := &models.SubtaskState{
248+
Plugin: "github",
249+
Subtask: "Convert Jobs",
250+
Params: `{"ConnectionId":1,"Name":"AkerBP/autogration"}`,
251+
}
252+
args := &SubtaskCommonArgs{Table: "github_api_jobs"}
253+
254+
bootstrapped, err := bootstrapStateFromCollectorStateIfNeeded(mockDal, state, args)
255+
assert.Nil(t, err)
256+
assert.Nil(t, bootstrapped.PrevStartedAt)
257+
assert.Nil(t, bootstrapped.TimeAfter)
258+
mockDal.AssertExpectations(t)
259+
})
260+
261+
t.Run("return error when collector state query fails", func(t *testing.T) {
262+
mockDal := new(mockdal.Dal)
263+
dbErr := errors.Default.New("db unavailable")
264+
mockDal.On("First", mock.Anything, mock.Anything).Return(dbErr).Once()
265+
mockDal.On("IsErrorNotFound", dbErr).Return(false).Once()
266+
267+
state := &models.SubtaskState{
268+
Plugin: "github",
269+
Subtask: "Convert Jobs",
270+
Params: `{"ConnectionId":1,"Name":"AkerBP/autogration"}`,
271+
}
272+
args := &SubtaskCommonArgs{Table: "github_api_jobs"}
273+
274+
bootstrapped, err := bootstrapStateFromCollectorStateIfNeeded(mockDal, state, args)
275+
assert.Nil(t, bootstrapped)
276+
assert.NotNil(t, err)
277+
assert.Contains(t, err.Error(), "failed to load collector state for subtask bootstrap")
278+
mockDal.AssertExpectations(t)
279+
})
280+
}

0 commit comments

Comments
 (0)