
Commit e26c61b

feat(github): Add incremental data collection (#8858)
* fix(github): use RFC 3339 format for since= incremental API parameters

  The since= query parameter passed to the GitHub API in four collectors was formatted using Go's time.Time.String(), which produces a human-readable string (e.g. "2024-01-15 10:30:00 +0000 UTC") rather than the ISO 8601 / RFC 3339 format the GitHub API requires (e.g. "2024-01-15T10:30:00Z"). The GitHub API silently ignores malformed date strings, causing these collectors to perform full re-scans on every incremental run despite appearing to filter correctly.

  Fix by using .UTC().Format(time.RFC3339) in all four affected collectors:
  - comment_collector.go
  - issue_collector.go
  - commit_collector.go
  - pr_review_comment_collector.go

  (cherry picked from commit cf3cb621462d8ae661df5fcb8e1b47c70564cd60)

* feat(github): migrate event extractor to StatefulApiExtractor

  Switch Extract Events from the legacy full-scan NewApiExtractor to NewStatefulApiExtractor, which filters _raw_github_api_events by created_at >= last_run_start on incremental syncs. This table had 13,828 rows in a representative production run and took ~117s to process on every run, regardless of how few new events were collected. After this change, incremental runs process only newly collected rows.

  (cherry picked from commit 84f324f65e8576cd2ff0ca8b74a2588ee2600e12)

* feat(github): migrate PR extractor to StatefulApiExtractor

  Switch Extract Pull Requests from the legacy full-scan NewApiExtractor to NewStatefulApiExtractor. The _raw_github_api_pull_requests table had 12,448 rows in a representative production run, taking ~108s on every incremental sync. After this change, only newly collected PR rows are processed.

  SubtaskConfig captures the prType and prComponent regex strings so that a scope config change automatically triggers a full re-extract. BeforeExtract deletes GithubPrLabel rows for the current PR before re-inserting them in incremental mode, preventing stale labels from persisting when labels are removed upstream.

  (cherry picked from commit 80dc76d4e5f1ad6c65d48d42f60f50b87c9dad2d)

* feat(github): migrate workflow run and PR commit extractors to StatefulApiExtractor

  Switch Extract Workflow Runs (~7,364 raw rows, ~65s) and Extract PR Commits (~6,982 raw rows, ~60s) from the legacy full-scan NewApiExtractor to NewStatefulApiExtractor. Both extractors are simple mappings with no scope-config dependency, so no SubtaskConfig or BeforeExtract is needed.

  (cherry picked from commit ea4f158a65ad0f7a2c23ba7bbc9932059e2ca408)

* feat(github): migrate remaining high-volume extractors to StatefulApiExtractor

  Migrate Extract Jobs (~5,369 rows, ~48s), Extract PR Reviews (~3,073 rows, ~27s), and Extract PR Review Comments (~1,820 rows, ~16s) from the legacy full-scan NewApiExtractor to NewStatefulApiExtractor. Also move prUrlRegex compilation in pr_review_comment_extractor.go from inside the Extract closure (recompiled on every raw row) to before the extractor is created, eliminating redundant regexp compilation.

  (cherry picked from commit 54d601587bc74ebd0f1103345c545571146739b5)

* feat(github): migrate remaining low-volume extractors to StatefulApiExtractor

  Migrate the final seven GitHub extractors to NewStatefulApiExtractor: issue, comment, account, account_org, milestone, commit, commit_stats. issue_extractor gains SubtaskConfig (issue classification regex strings) so scope config changes trigger automatic full re-extraction, plus BeforeExtract cleanup of GithubIssueLabel and GithubIssueAssignee rows in incremental mode to prevent stale labels/assignees from persisting after upstream removal. All other extractors in this commit are simple migrations with no config sensitivity or child-record cleanup needed.

  With this commit, all 14 GitHub plugin extractors are now incremental. Combined with the collector fixes in earlier commits, incremental collection runs that previously took 9+ minutes in the extract phase will now complete in seconds when few or no new records were collected.

  (cherry picked from commit 606acea88caef63662733162faa47a6c6d3155cc)

* feat(github): migrate CICD converters to StatefulDataConverter

  Convert Workflow Runs and Convert Jobs now use NewStatefulDataConverter, skipping records unchanged since the last run on incremental pipelines. Jobs are filtered via a JOIN on _tool_github_runs.github_updated_at.

  (cherry picked from commit 75a909efccab1d04bfae4058f4674663b00762a6)

* feat(github): migrate PR supporting data converters to StatefulDataConverter

  Convert PR Commits, Convert PR Comments, and Convert PR Reviews now use NewStatefulDataConverter. Child-of-PR records are filtered incrementally via a JOIN on _tool_github_pull_requests.github_updated_at; PR comments additionally filter on their own github_updated_at.

  (cherry picked from commit 17cbc2e4caff7349fa2d3389d7cafcc5d0edee71)

* feat(github): migrate PR main and cross converters to StatefulDataConverter

  Convert Pull Requests filters by GithubPullRequest.github_updated_at. Convert Reviews and Convert PR Issues filter via a JOIN on pull_requests' github_updated_at, since reviewers and pr_issues have no timestamp of their own.

  (cherry picked from commit 47437ae49d30b967b443c964a4b1baca34344acc)

* feat(github): migrate remaining data converters to StatefulDataConverter

  Migrate the last 9 converters from DataConverter to StatefulDataConverter so they skip already-processed records on incremental runs:
  - issue_convertor: filter on github_updated_at
  - issue_comment_convertor: filter on github_updated_at
  - issue_label_convertor: JOIN to issues, filter on issues.github_updated_at
  - issue_assignee_convertor: JOIN to issues, filter on issues.github_updated_at
  - pr_label_convertor: JOIN to pull_requests, filter on pr.github_updated_at
  - account_convertor: filter on updated_at
  - release_convertor: filter on updated_at
  - repo_convertor: filter on updated_at; retain GithubApiRepo struct (used by pr_extractor)
  - commit_convertor: filter on authored_date

  (cherry picked from commit 7bcc9da4676d34d9cd24e41a68765cf85723ac81)

* fix(github): keep incremental commit and PR-issue processing accurate
* fix(github): bootstrap workflow runs incremental window from tool state
* feat(runner): include processed record count in subtask finish logs
* fix(stateful): bootstrap subtask state from collector checkpoints
* fix(github): optimize incremental Convert Jobs query
* fix(github): resolve lint and staticcheck issues in converters
* fix(stateful): tolerate missing collector state table in bootstrap
* fix(github): address CI failures in state bootstrap and run tests
* fix(server): use interface{} for store handler swag annotations (#8859)
* fix(github): stabilize e2e state bootstrap and issue assignee join
* fix(stateful): skip collector bootstrap lookup when state table is absent
* test(stateful): use generic scope params in bootstrap tests
* fix(github): keep issue assignee conversion full-sync safe

---------

Signed-off-by: yamoyamoto <yamo7yamoto@gmail.com>
Co-authored-by: Tomoya Kawaguchi <68677002+yamoyamoto@users.noreply.github.com>
1 parent ac73e27 commit e26c61b

44 files changed

Lines changed: 955 additions & 763 deletions


backend/core/plugin/plugin_task.go

Lines changed: 1 addition & 0 deletions
@@ -53,6 +53,7 @@ type ExecContext interface {
 	GetData() interface{}
 	SetProgress(current int, total int)
 	IncProgress(quantity int)
+	GetProgress() int
 }

 // SubTaskContext This interface define all resources that needed for subtask execution

backend/core/runner/run_task.go

Lines changed: 1 addition & 1 deletion
@@ -336,7 +336,7 @@ func RunPluginSubTasks(
 	logger.Info("executing subtask %s", subtaskMeta.Name)
 	start := time.Now()
 	err = runSubtask(basicRes, subtaskCtx, task.ID, subtaskNumber, subtaskMeta.EntryPoint)
-	logger.Info("subtask %s finished in %d ms", subtaskMeta.Name, time.Since(start).Milliseconds())
+	logger.Info("subtask %s finished in %d ms, records processed: %d", subtaskMeta.Name, time.Since(start).Milliseconds(), subtaskCtx.GetProgress())
 	if err != nil {
 		err = errors.SubtaskErr.Wrap(err, fmt.Sprintf("subtask %s ended unexpectedly", subtaskMeta.Name), errors.WithData(&subtaskMeta))
 		logger.Error(err, "")
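The new log field only surfaces data the task context already tracks; the formatting itself can be sketched with a hypothetical helper (formatSubtaskFinish is illustrative, not the actual runner code):

```go
package main

import (
	"fmt"
	"time"
)

// formatSubtaskFinish mirrors the updated finish log line: elapsed
// milliseconds plus the processed-record count from GetProgress().
func formatSubtaskFinish(name string, elapsedMs int64, progress int) string {
	return fmt.Sprintf("subtask %s finished in %d ms, records processed: %d",
		name, elapsedMs, progress)
}

func main() {
	start := time.Now()
	// ... subtask work would happen here ...
	fmt.Println(formatSubtaskFinish("Extract Events", time.Since(start).Milliseconds(), 13828))
}
```

With the record count in the log, an incremental run that processed 0 rows is immediately distinguishable from a full re-scan that merely finished quickly.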

backend/helpers/pluginhelper/api/subtask_state_manager.go

Lines changed: 55 additions & 0 deletions
@@ -20,6 +20,7 @@ package api
 import (
 	"fmt"
 	"reflect"
+	"strings"
 	"time"

 	"github.com/apache/incubator-devlake/core/dal"
@@ -88,6 +89,10 @@ func NewSubtaskStateManager(args *SubtaskCommonArgs) (stateManager *SubtaskState
 	if err != nil {
 		return
 	}
+	preState, err = bootstrapStateFromCollectorStateIfNeeded(db, preState, args)
+	if err != nil {
+		return
+	}

 	isIncremental, since := calculateStateManagerIncrementalMode(syncPolicy, preState, utils.ToJsonString(args.SubtaskConfig))

@@ -127,6 +132,56 @@ func loadPreviousState(db dal.Dal, plugin, subtask, params string) (*models.Subt
 	return preState, nil
 }

+func bootstrapStateFromCollectorStateIfNeeded(db dal.Dal, preState *models.SubtaskState, args *SubtaskCommonArgs) (*models.SubtaskState, errors.Error) {
+	if preState == nil || preState.PrevStartedAt != nil {
+		return preState, nil
+	}
+	if args == nil || args.Table == "" {
+		return preState, nil
+	}
+	if !db.HasTable(&models.CollectorLatestState{}) {
+		return preState, nil
+	}
+
+	rawTable := args.GetRawDataTable()
+	if rawTable == "" {
+		return preState, nil
+	}
+
+	collectorState := &models.CollectorLatestState{}
+	err := db.First(
+		collectorState,
+		dal.Where("raw_data_table = ? AND raw_data_params = ?", rawTable, preState.Params),
+	)
+	if err != nil {
+		if db.IsErrorNotFound(err) || isStateTableNotReadyError(err) {
+			return preState, nil
+		}
+		return nil, errors.Default.Wrap(err, "failed to load collector state for subtask bootstrap")
+	}
+
+	if collectorState.LatestSuccessStart != nil {
+		preState.PrevStartedAt = collectorState.LatestSuccessStart
+	}
+	if preState.TimeAfter == nil && collectorState.TimeAfter != nil {
+		preState.TimeAfter = collectorState.TimeAfter
+	}
+
+	return preState, nil
+}
+
+func isStateTableNotReadyError(err error) bool {
+	if err == nil {
+		return false
+	}
+	msg := strings.ToLower(err.Error())
+	return strings.Contains(msg, "_devlake_collector_latest_state") &&
+		(strings.Contains(msg, "doesn't exist") ||
+			strings.Contains(msg, "does not exist") ||
+			strings.Contains(msg, "unknown table") ||
+			strings.Contains(msg, "no such table"))
+}
+
 // calculateStateManagerIncrementalMode tries to calculate whether state manager should run in incremental mode and returns the state manager's 'since' time.
 func calculateStateManagerIncrementalMode(syncPolicy *models.SyncPolicy, preState *models.SubtaskState, newSubtaskConfig string) (bool, *time.Time) {
 	if preState == nil || syncPolicy == nil {

backend/helpers/pluginhelper/api/subtask_state_manager_test.go

Lines changed: 169 additions & 0 deletions
@@ -30,6 +30,8 @@ import (
 	"github.com/stretchr/testify/mock"
 )

+const testGithubScopeParams = `{"ConnectionId":1,"Name":"test/repo"}`
+
 func TestSubtaskStateManager(t *testing.T) {
 	time0 := errors.Must1(time.Parse(time.RFC3339, "2020-01-01T00:00:00Z"))
 	time1 := errors.Must1(time.Parse(time.RFC3339, "2021-01-01T00:00:00Z"))
@@ -187,3 +189,170 @@ func TestSubtaskStateManager(t *testing.T) {
 		})
 	}
 }
+
+func TestBootstrapStateFromCollectorStateIfNeeded(t *testing.T) {
+	latest := errors.Must1(time.Parse(time.RFC3339, "2026-04-27T18:05:31Z"))
+	timeAfter := errors.Must1(time.Parse(time.RFC3339, "2024-12-20T00:00:00Z"))
+
+	t.Run("bootstrap prev_started_at and time_after from collector state", func(t *testing.T) {
+		mockDal := new(mockdal.Dal)
+		mockDal.On("HasTable", mock.Anything).Return(true).Once()
+		mockDal.On("First", mock.Anything, mock.Anything).Run(func(args mock.Arguments) {
+			dst := args.Get(0).(*models.CollectorLatestState)
+			dst.LatestSuccessStart = &latest
+			dst.TimeAfter = &timeAfter
+		}).Return(nil).Once()
+
+		state := &models.SubtaskState{
+			Plugin:  "github",
+			Subtask: "Convert Jobs",
+			Params:  testGithubScopeParams,
+		}
+		args := &SubtaskCommonArgs{Table: "github_api_jobs"}
+
+		bootstrapped, err := bootstrapStateFromCollectorStateIfNeeded(mockDal, state, args)
+		assert.Nil(t, err)
+		if assert.NotNil(t, bootstrapped.PrevStartedAt) {
+			assert.True(t, bootstrapped.PrevStartedAt.Equal(latest))
+		}
+		if assert.NotNil(t, bootstrapped.TimeAfter) {
+			assert.True(t, bootstrapped.TimeAfter.Equal(timeAfter))
+		}
+		mockDal.AssertExpectations(t)
+	})
+
+	t.Run("do not overwrite existing prev_started_at", func(t *testing.T) {
+		mockDal := new(mockdal.Dal)
+
+		existing := errors.Must1(time.Parse(time.RFC3339, "2026-04-27T18:06:37Z"))
+		state := &models.SubtaskState{
+			Plugin:        "github",
+			Subtask:       "Convert Jobs",
+			Params:        testGithubScopeParams,
+			PrevStartedAt: &existing,
+		}
+		args := &SubtaskCommonArgs{Table: "github_api_jobs"}
+
+		bootstrapped, err := bootstrapStateFromCollectorStateIfNeeded(mockDal, state, args)
+		assert.Nil(t, err)
+		if assert.NotNil(t, bootstrapped.PrevStartedAt) {
+			assert.True(t, bootstrapped.PrevStartedAt.Equal(existing))
+		}
+		mockDal.AssertNotCalled(t, "HasTable", mock.Anything)
+		mockDal.AssertNotCalled(t, "First", mock.Anything, mock.Anything)
+	})
+
+	t.Run("ignore not found collector state", func(t *testing.T) {
+		mockDal := new(mockdal.Dal)
+		notFoundErr := errors.Default.New("record not found")
+		mockDal.On("HasTable", mock.Anything).Return(true).Once()
+		mockDal.On("First", mock.Anything, mock.Anything).Return(notFoundErr).Once()
+		mockDal.On("IsErrorNotFound", notFoundErr).Return(true).Once()
+
+		state := &models.SubtaskState{
+			Plugin:  "github",
+			Subtask: "Convert Jobs",
+			Params:  testGithubScopeParams,
+		}
+		args := &SubtaskCommonArgs{Table: "github_api_jobs"}
+
+		bootstrapped, err := bootstrapStateFromCollectorStateIfNeeded(mockDal, state, args)
+		assert.Nil(t, err)
+		assert.Nil(t, bootstrapped.PrevStartedAt)
+		assert.Nil(t, bootstrapped.TimeAfter)
+		mockDal.AssertExpectations(t)
+	})
+
+	t.Run("return error when collector state query fails", func(t *testing.T) {
+		mockDal := new(mockdal.Dal)
+		dbErr := errors.Default.New("db unavailable")
+		mockDal.On("HasTable", mock.Anything).Return(true).Once()
+		mockDal.On("First", mock.Anything, mock.Anything).Return(dbErr).Once()
+		mockDal.On("IsErrorNotFound", dbErr).Return(false).Once()
+
+		state := &models.SubtaskState{
+			Plugin:  "github",
+			Subtask: "Convert Jobs",
+			Params:  testGithubScopeParams,
+		}
+		args := &SubtaskCommonArgs{Table: "github_api_jobs"}
+
+		bootstrapped, err := bootstrapStateFromCollectorStateIfNeeded(mockDal, state, args)
+		assert.Nil(t, bootstrapped)
+		assert.NotNil(t, err)
+		assert.Contains(t, err.Error(), "failed to load collector state for subtask bootstrap")
+		mockDal.AssertExpectations(t)
+	})
+
+	t.Run("ignore missing collector table errors", func(t *testing.T) {
+		mockDal := new(mockdal.Dal)
+		mockDal.On("HasTable", mock.Anything).Return(true).Once()
+		tableErr := errors.Default.New("Error 1146 (42S02): Table 'lake._devlake_collector_latest_state' doesn't exist")
+		mockDal.On("First", mock.Anything, mock.Anything).Return(tableErr).Once()
+		mockDal.On("IsErrorNotFound", tableErr).Return(false).Once()
+
+		state := &models.SubtaskState{
+			Plugin:  "github",
+			Subtask: "Convert Jobs",
+			Params:  testGithubScopeParams,
+		}
+		args := &SubtaskCommonArgs{Table: "github_api_jobs"}
+
+		bootstrapped, err := bootstrapStateFromCollectorStateIfNeeded(mockDal, state, args)
+		assert.Nil(t, err)
+		assert.NotNil(t, bootstrapped)
+		assert.Nil(t, bootstrapped.PrevStartedAt)
+		assert.Nil(t, bootstrapped.TimeAfter)
+		mockDal.AssertExpectations(t)
+	})
+
+	t.Run("do not ignore missing unrelated table errors", func(t *testing.T) {
+		mockDal := new(mockdal.Dal)
+		mockDal.On("HasTable", mock.Anything).Return(true).Once()
+		tableErr := errors.Default.New("Error 1146 (42S02): Table 'lake._tool_github_issues' doesn't exist")
+		mockDal.On("First", mock.Anything, mock.Anything).Return(tableErr).Once()
+		mockDal.On("IsErrorNotFound", tableErr).Return(false).Once()
+
+		state := &models.SubtaskState{
+			Plugin:  "github",
+			Subtask: "Convert Jobs",
+			Params:  testGithubScopeParams,
+		}
+		args := &SubtaskCommonArgs{Table: "github_api_jobs"}
+
+		bootstrapped, err := bootstrapStateFromCollectorStateIfNeeded(mockDal, state, args)
+		assert.Nil(t, bootstrapped)
+		assert.NotNil(t, err)
+		assert.Contains(t, err.Error(), "failed to load collector state for subtask bootstrap")
+		mockDal.AssertExpectations(t)
+	})
+
+	t.Run("skip collector lookup when collector state table does not exist", func(t *testing.T) {
+		mockDal := new(mockdal.Dal)
+		mockDal.On("HasTable", mock.Anything).Return(false).Once()
+
+		state := &models.SubtaskState{
+			Plugin:  "github",
+			Subtask: "Convert Jobs",
+			Params:  testGithubScopeParams,
+		}
+		args := &SubtaskCommonArgs{Table: "github_api_jobs"}
+
+		bootstrapped, err := bootstrapStateFromCollectorStateIfNeeded(mockDal, state, args)
+		assert.Nil(t, err)
+		assert.NotNil(t, bootstrapped)
+		assert.Nil(t, bootstrapped.PrevStartedAt)
+		assert.Nil(t, bootstrapped.TimeAfter)
+		mockDal.AssertNotCalled(t, "First", mock.Anything, mock.Anything)
+		mockDal.AssertExpectations(t)
+	})
+}
+
+func TestIsStateTableNotReadyError(t *testing.T) {
+	assert.False(t, isStateTableNotReadyError(nil))
+	assert.True(t, isStateTableNotReadyError(errors.Default.New("Error 1146 (42S02): Table 'lake._devlake_collector_latest_state' doesn't exist")))
+	assert.True(t, isStateTableNotReadyError(errors.Default.New("pq: relation \"_devlake_collector_latest_state\" does not exist")))
+	assert.True(t, isStateTableNotReadyError(errors.Default.New("no such table: _devlake_collector_latest_state")))
+	assert.False(t, isStateTableNotReadyError(errors.Default.New("Error 1146 (42S02): Table 'lake._tool_github_issues' doesn't exist")))
+	assert.False(t, isStateTableNotReadyError(errors.Default.New("db unavailable")))
+}

backend/impls/context/default_exec_context.go

Lines changed: 4 additions & 0 deletions
@@ -96,6 +96,10 @@ func (c *defaultExecContext) IncProgress(progressType plugin.ProgressType, quant
 	}
 }

+func (c *defaultExecContext) GetProgress() int {
+	return int(atomic.LoadInt64(&c.current))
+}
+
 func (c *defaultExecContext) fork(name string) *defaultExecContext {
 	return newDefaultExecContext(
 		c.ctx,
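GetProgress reads the same int64 counter that IncProgress mutates, using sync/atomic so concurrent subtask goroutines can bump it while the runner reads it. A simplified standalone sketch of that pattern (execContext here is illustrative, not the DevLake type):

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// execContext sketches the progress-counter pattern: IncProgress adds
// atomically, GetProgress loads atomically, so no mutex is needed and
// the runner's finish log can read a consistent count at any time.
type execContext struct {
	current int64
}

func (c *execContext) IncProgress(quantity int) {
	atomic.AddInt64(&c.current, int64(quantity))
}

func (c *execContext) GetProgress() int {
	return int(atomic.LoadInt64(&c.current))
}

func main() {
	ctx := &execContext{}
	var wg sync.WaitGroup
	// 10 goroutines each record 100 processed rows.
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for j := 0; j < 100; j++ {
				ctx.IncProgress(1)
			}
		}()
	}
	wg.Wait()
	fmt.Println(ctx.GetProgress()) // 1000
}
```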

backend/plugins/github/tasks/account_convertor.go

Lines changed: 27 additions & 35 deletions
@@ -18,12 +18,10 @@ limitations under the License.
 package tasks

 import (
-	"reflect"
 	"strings"

 	"github.com/apache/incubator-devlake/core/dal"
 	"github.com/apache/incubator-devlake/core/errors"
-	"github.com/apache/incubator-devlake/core/models/common"
 	"github.com/apache/incubator-devlake/core/models/domainlayer"
 	"github.com/apache/incubator-devlake/core/models/domainlayer/crossdomain"
 	"github.com/apache/incubator-devlake/core/models/domainlayer/didgen"
@@ -49,50 +47,44 @@ var ConvertAccountsMeta = plugin.SubTaskMeta{
 	ProductTables: []string{crossdomain.Account{}.TableName()},
 }

-type GithubAccountWithOrg struct {
-	models.GithubAccount
-	Login string `json:"login" gorm:"type:varchar(255)"`
-	common.NoPKModel
-}
-
 func ConvertAccounts(taskCtx plugin.SubTaskContext) errors.Error {
 	db := taskCtx.GetDal()
 	data := taskCtx.GetData().(*GithubTaskData)

-	cursor, err := db.Cursor(
-		dal.Select("_tool_github_accounts.*"),
-		dal.From(&models.GithubAccount{}),
-		dal.Where(
-			"repo_github_id = ? and _tool_github_accounts.connection_id=?",
-			data.Options.GithubId,
-			data.Options.ConnectionId,
-		),
-		dal.Join(`left join _tool_github_repo_accounts gra on (
-			_tool_github_accounts.connection_id = gra.connection_id
-			AND _tool_github_accounts.id = gra.account_id
-		)`),
-	)
-	if err != nil {
-		return err
-	}
-	defer cursor.Close()
-
 	accountIdGen := didgen.NewDomainIdGenerator(&models.GithubAccount{})

-	converter, err := api.NewDataConverter(api.DataConverterArgs{
-		InputRowType: reflect.TypeOf(models.GithubAccount{}),
-		Input:        cursor,
-		RawDataSubTaskArgs: api.RawDataSubTaskArgs{
-			Ctx: taskCtx,
+	converter, err := api.NewStatefulDataConverter(&api.StatefulDataConverterArgs[models.GithubAccount]{
+		SubtaskCommonArgs: &api.SubtaskCommonArgs{
+			SubTaskContext: taskCtx,
+			Table:          RAW_ACCOUNT_TABLE,
 			Params: GithubApiParams{
 				ConnectionId: data.Options.ConnectionId,
 				Name:         data.Options.Name,
 			},
-			Table: RAW_ACCOUNT_TABLE,
 		},
-		Convert: func(inputRow interface{}) ([]interface{}, errors.Error) {
-			githubUser := inputRow.(*models.GithubAccount)
-
+		Input: func(stateManager *api.SubtaskStateManager) (dal.Rows, errors.Error) {
+			clauses := []dal.Clause{
+				dal.Select("_tool_github_accounts.*"),
+				dal.From(&models.GithubAccount{}),
+				dal.Where(
+					"repo_github_id = ? and _tool_github_accounts.connection_id=?",
+					data.Options.GithubId,
+					data.Options.ConnectionId,
+				),
+				dal.Join(`left join _tool_github_repo_accounts gra on (
+					_tool_github_accounts.connection_id = gra.connection_id
+					AND _tool_github_accounts.id = gra.account_id
+				)`),
+			}
+			if stateManager.IsIncremental() {
+				since := stateManager.GetSince()
+				if since != nil {
+					clauses = append(clauses, dal.Where("_tool_github_accounts.updated_at >= ?", since))
+				}
+			}
+			return db.Cursor(clauses...)
+		},
+		Convert: func(githubUser *models.GithubAccount) ([]interface{}, errors.Error) {
 			// query related orgs
 			var orgs []string
 			err := db.Pluck(`org_login`, &orgs,
