Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
d5f5a02
fix(github): use RFC 3339 format for since= incremental API parameters
lrf-nitro Apr 24, 2026
544d724
feat(github): migrate event extractor to StatefulApiExtractor
lrf-nitro Apr 24, 2026
f15baea
feat(github): migrate PR extractor to StatefulApiExtractor
lrf-nitro Apr 24, 2026
d8cea9b
feat(github): migrate workflow run and PR commit extractors to Statef…
lrf-nitro Apr 24, 2026
6842dfc
feat(github): migrate remaining high-volume extractors to StatefulApi…
lrf-nitro Apr 24, 2026
fe10970
feat(github): migrate remaining low-volume extractors to StatefulApiE…
lrf-nitro Apr 24, 2026
bab2fb0
feat(github): migrate CICD converters to StatefulDataConverter
lrf-nitro Apr 24, 2026
2fdadba
feat(github): migrate PR supporting data converters to StatefulDataCo…
lrf-nitro Apr 24, 2026
8e94537
feat(github): migrate PR main and cross converters to StatefulDataCon…
lrf-nitro Apr 24, 2026
09b6de7
feat(github): migrate remaining data converters to StatefulDataConverter
lrf-nitro Apr 27, 2026
d991671
fix(github): keep incremental commit and PR-issue processing accurate
lrf-nitro Apr 27, 2026
3538c53
fix(github): bootstrap workflow runs incremental window from tool state
lrf-nitro Apr 27, 2026
03274c4
feat(runner): include processed record count in subtask finish logs
lrf-nitro Apr 27, 2026
9a23290
fix(stateful): bootstrap subtask state from collector checkpoints
lrf-nitro Apr 27, 2026
7b1e8f1
fix(github): optimize incremental Convert Jobs query
lrf-nitro Apr 27, 2026
f543864
fix(github): resolve lint and staticcheck issues in converters
lrf-nitro Apr 28, 2026
0c3c6e1
fix(stateful): tolerate missing collector state table in bootstrap
lrf-nitro Apr 28, 2026
4924878
fix(github): address CI failures in state bootstrap and run tests
lrf-nitro Apr 28, 2026
2202017
fix(server): use interface{} for store handler swag annotations (#8859)
yamoyamoto Apr 28, 2026
95bb07a
Merge branch 'main' into feat/githup-incremental-data-collect
lrf-nitro Apr 28, 2026
ece87a4
fix(github): stabilize e2e state bootstrap and issue assignee join
lrf-nitro Apr 29, 2026
dcd0ca5
fix(stateful): skip collector bootstrap lookup when state table is ab…
lrf-nitro Apr 29, 2026
27e1020
test(stateful): use generic scope params in bootstrap tests
lrf-nitro Apr 29, 2026
e31b5c2
fix(github): keep issue assignee conversion full-sync safe
lrf-nitro Apr 29, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions backend/core/plugin/plugin_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ type ExecContext interface {
GetData() interface{}
SetProgress(current int, total int)
IncProgress(quantity int)
GetProgress() int
}

// SubTaskContext This interface define all resources that needed for subtask execution
Expand Down
2 changes: 1 addition & 1 deletion backend/core/runner/run_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -336,7 +336,7 @@ func RunPluginSubTasks(
logger.Info("executing subtask %s", subtaskMeta.Name)
start := time.Now()
err = runSubtask(basicRes, subtaskCtx, task.ID, subtaskNumber, subtaskMeta.EntryPoint)
logger.Info("subtask %s finished in %d ms", subtaskMeta.Name, time.Since(start).Milliseconds())
logger.Info("subtask %s finished in %d ms, records processed: %d", subtaskMeta.Name, time.Since(start).Milliseconds(), subtaskCtx.GetProgress())
if err != nil {
err = errors.SubtaskErr.Wrap(err, fmt.Sprintf("subtask %s ended unexpectedly", subtaskMeta.Name), errors.WithData(&subtaskMeta))
logger.Error(err, "")
Expand Down
55 changes: 55 additions & 0 deletions backend/helpers/pluginhelper/api/subtask_state_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package api
import (
"fmt"
"reflect"
"strings"
"time"

"github.com/apache/incubator-devlake/core/dal"
Expand Down Expand Up @@ -88,6 +89,10 @@ func NewSubtaskStateManager(args *SubtaskCommonArgs) (stateManager *SubtaskState
if err != nil {
return
}
preState, err = bootstrapStateFromCollectorStateIfNeeded(db, preState, args)
if err != nil {
return
}

isIncremental, since := calculateStateManagerIncrementalMode(syncPolicy, preState, utils.ToJsonString(args.SubtaskConfig))

Expand Down Expand Up @@ -127,6 +132,56 @@ func loadPreviousState(db dal.Dal, plugin, subtask, params string) (*models.Subt
return preState, nil
}

func bootstrapStateFromCollectorStateIfNeeded(db dal.Dal, preState *models.SubtaskState, args *SubtaskCommonArgs) (*models.SubtaskState, errors.Error) {
if preState == nil || preState.PrevStartedAt != nil {
return preState, nil
}
if args == nil || args.Table == "" {
return preState, nil
}
if !db.HasTable(&models.CollectorLatestState{}) {
return preState, nil
}

rawTable := args.GetRawDataTable()
if rawTable == "" {
return preState, nil
}

collectorState := &models.CollectorLatestState{}
err := db.First(
collectorState,
dal.Where("raw_data_table = ? AND raw_data_params = ?", rawTable, preState.Params),
)
if err != nil {
if db.IsErrorNotFound(err) || isStateTableNotReadyError(err) {
return preState, nil
}
return nil, errors.Default.Wrap(err, "failed to load collector state for subtask bootstrap")
}

if collectorState.LatestSuccessStart != nil {
preState.PrevStartedAt = collectorState.LatestSuccessStart
}
if preState.TimeAfter == nil && collectorState.TimeAfter != nil {
preState.TimeAfter = collectorState.TimeAfter
}

return preState, nil
}

func isStateTableNotReadyError(err error) bool {
if err == nil {
return false
}
msg := strings.ToLower(err.Error())
return strings.Contains(msg, "_devlake_collector_latest_state") &&
(strings.Contains(msg, "doesn't exist") ||
strings.Contains(msg, "does not exist") ||
strings.Contains(msg, "unknown table") ||
strings.Contains(msg, "no such table"))
}

// calculateStateManagerIncrementalMode tries to calculate whether state manager should run in incremental mode and returns the state manager's 'since' time.
func calculateStateManagerIncrementalMode(syncPolicy *models.SyncPolicy, preState *models.SubtaskState, newSubtaskConfig string) (bool, *time.Time) {
if preState == nil || syncPolicy == nil {
Expand Down
169 changes: 169 additions & 0 deletions backend/helpers/pluginhelper/api/subtask_state_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ import (
"github.com/stretchr/testify/mock"
)

const testGithubScopeParams = `{"ConnectionId":1,"Name":"test/repo"}`

func TestSubtaskStateManager(t *testing.T) {
time0 := errors.Must1(time.Parse(time.RFC3339, "2020-01-01T00:00:00Z"))
time1 := errors.Must1(time.Parse(time.RFC3339, "2021-01-01T00:00:00Z"))
Expand Down Expand Up @@ -187,3 +189,170 @@ func TestSubtaskStateManager(t *testing.T) {
})
}
}

func TestBootstrapStateFromCollectorStateIfNeeded(t *testing.T) {
latest := errors.Must1(time.Parse(time.RFC3339, "2026-04-27T18:05:31Z"))
timeAfter := errors.Must1(time.Parse(time.RFC3339, "2024-12-20T00:00:00Z"))

t.Run("bootstrap prev_started_at and time_after from collector state", func(t *testing.T) {
mockDal := new(mockdal.Dal)
mockDal.On("HasTable", mock.Anything).Return(true).Once()
mockDal.On("First", mock.Anything, mock.Anything).Run(func(args mock.Arguments) {
dst := args.Get(0).(*models.CollectorLatestState)
dst.LatestSuccessStart = &latest
dst.TimeAfter = &timeAfter
}).Return(nil).Once()

state := &models.SubtaskState{
Plugin: "github",
Subtask: "Convert Jobs",
Params: testGithubScopeParams,
}
args := &SubtaskCommonArgs{Table: "github_api_jobs"}

bootstrapped, err := bootstrapStateFromCollectorStateIfNeeded(mockDal, state, args)
assert.Nil(t, err)
if assert.NotNil(t, bootstrapped.PrevStartedAt) {
assert.True(t, bootstrapped.PrevStartedAt.Equal(latest))
}
if assert.NotNil(t, bootstrapped.TimeAfter) {
assert.True(t, bootstrapped.TimeAfter.Equal(timeAfter))
}
mockDal.AssertExpectations(t)
})

t.Run("do not overwrite existing prev_started_at", func(t *testing.T) {
mockDal := new(mockdal.Dal)

existing := errors.Must1(time.Parse(time.RFC3339, "2026-04-27T18:06:37Z"))
state := &models.SubtaskState{
Plugin: "github",
Subtask: "Convert Jobs",
Params: testGithubScopeParams,
PrevStartedAt: &existing,
}
args := &SubtaskCommonArgs{Table: "github_api_jobs"}

bootstrapped, err := bootstrapStateFromCollectorStateIfNeeded(mockDal, state, args)
assert.Nil(t, err)
if assert.NotNil(t, bootstrapped.PrevStartedAt) {
assert.True(t, bootstrapped.PrevStartedAt.Equal(existing))
}
mockDal.AssertNotCalled(t, "HasTable", mock.Anything)
mockDal.AssertNotCalled(t, "First", mock.Anything, mock.Anything)
})

t.Run("ignore not found collector state", func(t *testing.T) {
mockDal := new(mockdal.Dal)
notFoundErr := errors.Default.New("record not found")
mockDal.On("HasTable", mock.Anything).Return(true).Once()
mockDal.On("First", mock.Anything, mock.Anything).Return(notFoundErr).Once()
mockDal.On("IsErrorNotFound", notFoundErr).Return(true).Once()

state := &models.SubtaskState{
Plugin: "github",
Subtask: "Convert Jobs",
Params: testGithubScopeParams,
}
args := &SubtaskCommonArgs{Table: "github_api_jobs"}

bootstrapped, err := bootstrapStateFromCollectorStateIfNeeded(mockDal, state, args)
assert.Nil(t, err)
assert.Nil(t, bootstrapped.PrevStartedAt)
assert.Nil(t, bootstrapped.TimeAfter)
mockDal.AssertExpectations(t)
})

t.Run("return error when collector state query fails", func(t *testing.T) {
mockDal := new(mockdal.Dal)
dbErr := errors.Default.New("db unavailable")
mockDal.On("HasTable", mock.Anything).Return(true).Once()
mockDal.On("First", mock.Anything, mock.Anything).Return(dbErr).Once()
mockDal.On("IsErrorNotFound", dbErr).Return(false).Once()

state := &models.SubtaskState{
Plugin: "github",
Subtask: "Convert Jobs",
Params: testGithubScopeParams,
}
args := &SubtaskCommonArgs{Table: "github_api_jobs"}

bootstrapped, err := bootstrapStateFromCollectorStateIfNeeded(mockDal, state, args)
assert.Nil(t, bootstrapped)
assert.NotNil(t, err)
assert.Contains(t, err.Error(), "failed to load collector state for subtask bootstrap")
mockDal.AssertExpectations(t)
})

t.Run("ignore missing collector table errors", func(t *testing.T) {
mockDal := new(mockdal.Dal)
mockDal.On("HasTable", mock.Anything).Return(true).Once()
tableErr := errors.Default.New("Error 1146 (42S02): Table 'lake._devlake_collector_latest_state' doesn't exist")
mockDal.On("First", mock.Anything, mock.Anything).Return(tableErr).Once()
mockDal.On("IsErrorNotFound", tableErr).Return(false).Once()

state := &models.SubtaskState{
Plugin: "github",
Subtask: "Convert Jobs",
Params: testGithubScopeParams,
}
args := &SubtaskCommonArgs{Table: "github_api_jobs"}

bootstrapped, err := bootstrapStateFromCollectorStateIfNeeded(mockDal, state, args)
assert.Nil(t, err)
assert.NotNil(t, bootstrapped)
assert.Nil(t, bootstrapped.PrevStartedAt)
assert.Nil(t, bootstrapped.TimeAfter)
mockDal.AssertExpectations(t)
})

t.Run("do not ignore missing unrelated table errors", func(t *testing.T) {
mockDal := new(mockdal.Dal)
mockDal.On("HasTable", mock.Anything).Return(true).Once()
tableErr := errors.Default.New("Error 1146 (42S02): Table 'lake._tool_github_issues' doesn't exist")
mockDal.On("First", mock.Anything, mock.Anything).Return(tableErr).Once()
mockDal.On("IsErrorNotFound", tableErr).Return(false).Once()

state := &models.SubtaskState{
Plugin: "github",
Subtask: "Convert Jobs",
Params: testGithubScopeParams,
}
args := &SubtaskCommonArgs{Table: "github_api_jobs"}

bootstrapped, err := bootstrapStateFromCollectorStateIfNeeded(mockDal, state, args)
assert.Nil(t, bootstrapped)
assert.NotNil(t, err)
assert.Contains(t, err.Error(), "failed to load collector state for subtask bootstrap")
mockDal.AssertExpectations(t)
})

t.Run("skip collector lookup when collector state table does not exist", func(t *testing.T) {
mockDal := new(mockdal.Dal)
mockDal.On("HasTable", mock.Anything).Return(false).Once()

state := &models.SubtaskState{
Plugin: "github",
Subtask: "Convert Jobs",
Params: testGithubScopeParams,
}
args := &SubtaskCommonArgs{Table: "github_api_jobs"}

bootstrapped, err := bootstrapStateFromCollectorStateIfNeeded(mockDal, state, args)
assert.Nil(t, err)
assert.NotNil(t, bootstrapped)
assert.Nil(t, bootstrapped.PrevStartedAt)
assert.Nil(t, bootstrapped.TimeAfter)
mockDal.AssertNotCalled(t, "First", mock.Anything, mock.Anything)
mockDal.AssertExpectations(t)
})
}

func TestIsStateTableNotReadyError(t *testing.T) {
assert.False(t, isStateTableNotReadyError(nil))
assert.True(t, isStateTableNotReadyError(errors.Default.New("Error 1146 (42S02): Table 'lake._devlake_collector_latest_state' doesn't exist")))
assert.True(t, isStateTableNotReadyError(errors.Default.New("pq: relation \"_devlake_collector_latest_state\" does not exist")))
assert.True(t, isStateTableNotReadyError(errors.Default.New("no such table: _devlake_collector_latest_state")))
assert.False(t, isStateTableNotReadyError(errors.Default.New("Error 1146 (42S02): Table 'lake._tool_github_issues' doesn't exist")))
assert.False(t, isStateTableNotReadyError(errors.Default.New("db unavailable")))
}
4 changes: 4 additions & 0 deletions backend/impls/context/default_exec_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,10 @@ func (c *defaultExecContext) IncProgress(progressType plugin.ProgressType, quant
}
}

func (c *defaultExecContext) GetProgress() int {
return int(atomic.LoadInt64(&c.current))
}

func (c *defaultExecContext) fork(name string) *defaultExecContext {
return newDefaultExecContext(
c.ctx,
Expand Down
62 changes: 27 additions & 35 deletions backend/plugins/github/tasks/account_convertor.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,10 @@ limitations under the License.
package tasks

import (
"reflect"
"strings"

"github.com/apache/incubator-devlake/core/dal"
"github.com/apache/incubator-devlake/core/errors"
"github.com/apache/incubator-devlake/core/models/common"
"github.com/apache/incubator-devlake/core/models/domainlayer"
"github.com/apache/incubator-devlake/core/models/domainlayer/crossdomain"
"github.com/apache/incubator-devlake/core/models/domainlayer/didgen"
Expand All @@ -49,50 +47,44 @@ var ConvertAccountsMeta = plugin.SubTaskMeta{
ProductTables: []string{crossdomain.Account{}.TableName()},
}

type GithubAccountWithOrg struct {
models.GithubAccount
Login string `json:"login" gorm:"type:varchar(255)"`
common.NoPKModel
}

func ConvertAccounts(taskCtx plugin.SubTaskContext) errors.Error {
db := taskCtx.GetDal()
data := taskCtx.GetData().(*GithubTaskData)

cursor, err := db.Cursor(
dal.Select("_tool_github_accounts.*"),
dal.From(&models.GithubAccount{}),
dal.Where(
"repo_github_id = ? and _tool_github_accounts.connection_id=?",
data.Options.GithubId,
data.Options.ConnectionId,
),
dal.Join(`left join _tool_github_repo_accounts gra on (
_tool_github_accounts.connection_id = gra.connection_id
AND _tool_github_accounts.id = gra.account_id
)`),
)
if err != nil {
return err
}
defer cursor.Close()

accountIdGen := didgen.NewDomainIdGenerator(&models.GithubAccount{})

converter, err := api.NewDataConverter(api.DataConverterArgs{
InputRowType: reflect.TypeOf(models.GithubAccount{}),
Input: cursor,
RawDataSubTaskArgs: api.RawDataSubTaskArgs{
Ctx: taskCtx,
converter, err := api.NewStatefulDataConverter(&api.StatefulDataConverterArgs[models.GithubAccount]{
SubtaskCommonArgs: &api.SubtaskCommonArgs{
SubTaskContext: taskCtx,
Table: RAW_ACCOUNT_TABLE,
Params: GithubApiParams{
ConnectionId: data.Options.ConnectionId,
Name: data.Options.Name,
},
Table: RAW_ACCOUNT_TABLE,
},
Convert: func(inputRow interface{}) ([]interface{}, errors.Error) {
githubUser := inputRow.(*models.GithubAccount)

Input: func(stateManager *api.SubtaskStateManager) (dal.Rows, errors.Error) {
clauses := []dal.Clause{
dal.Select("_tool_github_accounts.*"),
dal.From(&models.GithubAccount{}),
dal.Where(
"repo_github_id = ? and _tool_github_accounts.connection_id=?",
data.Options.GithubId,
data.Options.ConnectionId,
),
dal.Join(`left join _tool_github_repo_accounts gra on (
_tool_github_accounts.connection_id = gra.connection_id
AND _tool_github_accounts.id = gra.account_id
)`),
}
if stateManager.IsIncremental() {
since := stateManager.GetSince()
if since != nil {
clauses = append(clauses, dal.Where("_tool_github_accounts.updated_at >= ?", since))
}
}
return db.Cursor(clauses...)
},
Convert: func(githubUser *models.GithubAccount) ([]interface{}, errors.Error) {
// query related orgs
var orgs []string
err := db.Pluck(`org_login`, &orgs,
Expand Down
Loading
Loading