diff --git a/backend/core/plugin/plugin_task.go b/backend/core/plugin/plugin_task.go index e1404ff0848..e3eea470a81 100644 --- a/backend/core/plugin/plugin_task.go +++ b/backend/core/plugin/plugin_task.go @@ -53,6 +53,7 @@ type ExecContext interface { GetData() interface{} SetProgress(current int, total int) IncProgress(quantity int) + GetProgress() int } // SubTaskContext This interface define all resources that needed for subtask execution diff --git a/backend/core/runner/run_task.go b/backend/core/runner/run_task.go index 2b848ea9bbd..961725bce79 100644 --- a/backend/core/runner/run_task.go +++ b/backend/core/runner/run_task.go @@ -336,7 +336,7 @@ func RunPluginSubTasks( logger.Info("executing subtask %s", subtaskMeta.Name) start := time.Now() err = runSubtask(basicRes, subtaskCtx, task.ID, subtaskNumber, subtaskMeta.EntryPoint) - logger.Info("subtask %s finished in %d ms", subtaskMeta.Name, time.Since(start).Milliseconds()) + logger.Info("subtask %s finished in %d ms, records processed: %d", subtaskMeta.Name, time.Since(start).Milliseconds(), subtaskCtx.GetProgress()) if err != nil { err = errors.SubtaskErr.Wrap(err, fmt.Sprintf("subtask %s ended unexpectedly", subtaskMeta.Name), errors.WithData(&subtaskMeta)) logger.Error(err, "") diff --git a/backend/helpers/pluginhelper/api/subtask_state_manager.go b/backend/helpers/pluginhelper/api/subtask_state_manager.go index 7dd7d2370b0..ff1c8d7f4d5 100644 --- a/backend/helpers/pluginhelper/api/subtask_state_manager.go +++ b/backend/helpers/pluginhelper/api/subtask_state_manager.go @@ -20,6 +20,7 @@ package api import ( "fmt" "reflect" + "strings" "time" "github.com/apache/incubator-devlake/core/dal" @@ -88,6 +89,10 @@ func NewSubtaskStateManager(args *SubtaskCommonArgs) (stateManager *SubtaskState if err != nil { return } + preState, err = bootstrapStateFromCollectorStateIfNeeded(db, preState, args) + if err != nil { + return + } isIncremental, since := calculateStateManagerIncrementalMode(syncPolicy, preState, utils.ToJsonString(args.SubtaskConfig)) @@ -127,6 +132,56 @@ func loadPreviousState(db dal.Dal, plugin, subtask, params string) (*models.Subt return preState, nil } +func bootstrapStateFromCollectorStateIfNeeded(db dal.Dal, preState *models.SubtaskState, args *SubtaskCommonArgs) (*models.SubtaskState, errors.Error) { + if preState == nil || preState.PrevStartedAt != nil { + return preState, nil + } + if args == nil || args.Table == "" { + return preState, nil + } + if !db.HasTable(&models.CollectorLatestState{}) { + return preState, nil + } + + rawTable := args.GetRawDataTable() + if rawTable == "" { + return preState, nil + } + + collectorState := &models.CollectorLatestState{} + err := db.First( + collectorState, + dal.Where("raw_data_table = ? AND raw_data_params = ?", rawTable, preState.Params), + ) + if err != nil { + if db.IsErrorNotFound(err) || isStateTableNotReadyError(err) { + return preState, nil + } + return nil, errors.Default.Wrap(err, "failed to load collector state for subtask bootstrap") + } + + if collectorState.LatestSuccessStart != nil { + preState.PrevStartedAt = collectorState.LatestSuccessStart + } + if preState.TimeAfter == nil && collectorState.TimeAfter != nil { + preState.TimeAfter = collectorState.TimeAfter + } + + return preState, nil +} + +func isStateTableNotReadyError(err error) bool { + if err == nil { + return false + } + msg := strings.ToLower(err.Error()) + return strings.Contains(msg, "_devlake_collector_latest_state") && + (strings.Contains(msg, "doesn't exist") || + strings.Contains(msg, "does not exist") || + strings.Contains(msg, "unknown table") || + strings.Contains(msg, "no such table")) +} + // calculateStateManagerIncrementalMode tries to calculate whether state manager should run in incremental mode and returns the state manager's 'since' time. func calculateStateManagerIncrementalMode(syncPolicy *models.SyncPolicy, preState *models.SubtaskState, newSubtaskConfig string) (bool, *time.Time) { if preState == nil || syncPolicy == nil { diff --git a/backend/helpers/pluginhelper/api/subtask_state_manager_test.go b/backend/helpers/pluginhelper/api/subtask_state_manager_test.go index 06eeaa43374..f4ba733ea34 100644 --- a/backend/helpers/pluginhelper/api/subtask_state_manager_test.go +++ b/backend/helpers/pluginhelper/api/subtask_state_manager_test.go @@ -30,6 +30,8 @@ import ( "github.com/stretchr/testify/mock" ) +const testGithubScopeParams = `{"ConnectionId":1,"Name":"test/repo"}` + func TestSubtaskStateManager(t *testing.T) { time0 := errors.Must1(time.Parse(time.RFC3339, "2020-01-01T00:00:00Z")) time1 := errors.Must1(time.Parse(time.RFC3339, "2021-01-01T00:00:00Z")) @@ -187,3 +189,170 @@ func TestSubtaskStateManager(t *testing.T) { }) } } + +func TestBootstrapStateFromCollectorStateIfNeeded(t *testing.T) { + latest := errors.Must1(time.Parse(time.RFC3339, "2026-04-27T18:05:31Z")) + timeAfter := errors.Must1(time.Parse(time.RFC3339, "2024-12-20T00:00:00Z")) + + t.Run("bootstrap prev_started_at and time_after from collector state", func(t *testing.T) { + mockDal := new(mockdal.Dal) + mockDal.On("HasTable", mock.Anything).Return(true).Once() + mockDal.On("First", mock.Anything, mock.Anything).Run(func(args mock.Arguments) { + dst := args.Get(0).(*models.CollectorLatestState) + dst.LatestSuccessStart = &latest + dst.TimeAfter = &timeAfter + }).Return(nil).Once() + + state := &models.SubtaskState{ + Plugin: "github", + Subtask: "Convert Jobs", + Params: testGithubScopeParams, + } + args := &SubtaskCommonArgs{Table: "github_api_jobs"} + + bootstrapped, err := bootstrapStateFromCollectorStateIfNeeded(mockDal, state, args) + assert.Nil(t, err) + if assert.NotNil(t, bootstrapped.PrevStartedAt) { + assert.True(t, bootstrapped.PrevStartedAt.Equal(latest)) + } + if assert.NotNil(t, bootstrapped.TimeAfter) { + assert.True(t, bootstrapped.TimeAfter.Equal(timeAfter)) + } + mockDal.AssertExpectations(t) + }) + + t.Run("do not overwrite existing prev_started_at", func(t *testing.T) { + mockDal := new(mockdal.Dal) + + existing := errors.Must1(time.Parse(time.RFC3339, "2026-04-27T18:06:37Z")) + state := &models.SubtaskState{ + Plugin: "github", + Subtask: "Convert Jobs", + Params: testGithubScopeParams, + PrevStartedAt: &existing, + } + args := &SubtaskCommonArgs{Table: "github_api_jobs"} + + bootstrapped, err := bootstrapStateFromCollectorStateIfNeeded(mockDal, state, args) + assert.Nil(t, err) + if assert.NotNil(t, bootstrapped.PrevStartedAt) { + assert.True(t, bootstrapped.PrevStartedAt.Equal(existing)) + } + mockDal.AssertNotCalled(t, "HasTable", mock.Anything) + mockDal.AssertNotCalled(t, "First", mock.Anything, mock.Anything) + }) + + t.Run("ignore not found collector state", func(t *testing.T) { + mockDal := new(mockdal.Dal) + notFoundErr := errors.Default.New("record not found") + mockDal.On("HasTable", mock.Anything).Return(true).Once() + mockDal.On("First", mock.Anything, mock.Anything).Return(notFoundErr).Once() + mockDal.On("IsErrorNotFound", notFoundErr).Return(true).Once() + + state := &models.SubtaskState{ + Plugin: "github", + Subtask: "Convert Jobs", + Params: testGithubScopeParams, + } + args := &SubtaskCommonArgs{Table: "github_api_jobs"} + + bootstrapped, err := bootstrapStateFromCollectorStateIfNeeded(mockDal, state, args) + assert.Nil(t, err) + assert.Nil(t, bootstrapped.PrevStartedAt) + assert.Nil(t, bootstrapped.TimeAfter) + mockDal.AssertExpectations(t) + }) + + t.Run("return error when collector state query fails", func(t *testing.T) { + mockDal := new(mockdal.Dal) + dbErr := errors.Default.New("db unavailable") + mockDal.On("HasTable", mock.Anything).Return(true).Once() + mockDal.On("First", mock.Anything, mock.Anything).Return(dbErr).Once() + mockDal.On("IsErrorNotFound", dbErr).Return(false).Once() + + state := &models.SubtaskState{ + Plugin: "github", + Subtask: "Convert Jobs", + Params: testGithubScopeParams, + } + args := &SubtaskCommonArgs{Table: "github_api_jobs"} + + bootstrapped, err := bootstrapStateFromCollectorStateIfNeeded(mockDal, state, args) + assert.Nil(t, bootstrapped) + assert.NotNil(t, err) + assert.Contains(t, err.Error(), "failed to load collector state for subtask bootstrap") + mockDal.AssertExpectations(t) + }) + + t.Run("ignore missing collector table errors", func(t *testing.T) { + mockDal := new(mockdal.Dal) + mockDal.On("HasTable", mock.Anything).Return(true).Once() + tableErr := errors.Default.New("Error 1146 (42S02): Table 'lake._devlake_collector_latest_state' doesn't exist") + mockDal.On("First", mock.Anything, mock.Anything).Return(tableErr).Once() + mockDal.On("IsErrorNotFound", tableErr).Return(false).Once() + + state := &models.SubtaskState{ + Plugin: "github", + Subtask: "Convert Jobs", + Params: testGithubScopeParams, + } + args := &SubtaskCommonArgs{Table: "github_api_jobs"} + + bootstrapped, err := bootstrapStateFromCollectorStateIfNeeded(mockDal, state, args) + assert.Nil(t, err) + assert.NotNil(t, bootstrapped) + assert.Nil(t, bootstrapped.PrevStartedAt) + assert.Nil(t, bootstrapped.TimeAfter) + mockDal.AssertExpectations(t) + }) + + t.Run("do not ignore missing unrelated table errors", func(t *testing.T) { + mockDal := new(mockdal.Dal) + mockDal.On("HasTable", mock.Anything).Return(true).Once() + tableErr := errors.Default.New("Error 1146 (42S02): Table 'lake._tool_github_issues' doesn't exist") + mockDal.On("First", mock.Anything, mock.Anything).Return(tableErr).Once() + mockDal.On("IsErrorNotFound", tableErr).Return(false).Once() + + state := &models.SubtaskState{ + Plugin: "github", + Subtask: "Convert Jobs", + Params: testGithubScopeParams, + } + args := &SubtaskCommonArgs{Table: "github_api_jobs"} + + bootstrapped, err := bootstrapStateFromCollectorStateIfNeeded(mockDal, state, args) + assert.Nil(t, bootstrapped) + assert.NotNil(t, err) + assert.Contains(t, err.Error(), "failed to load collector state for subtask bootstrap") + mockDal.AssertExpectations(t) + }) + + t.Run("skip collector lookup when collector state table does not exist", func(t *testing.T) { + mockDal := new(mockdal.Dal) + mockDal.On("HasTable", mock.Anything).Return(false).Once() + + state := &models.SubtaskState{ + Plugin: "github", + Subtask: "Convert Jobs", + Params: testGithubScopeParams, + } + args := &SubtaskCommonArgs{Table: "github_api_jobs"} + + bootstrapped, err := bootstrapStateFromCollectorStateIfNeeded(mockDal, state, args) + assert.Nil(t, err) + assert.NotNil(t, bootstrapped) + assert.Nil(t, bootstrapped.PrevStartedAt) + assert.Nil(t, bootstrapped.TimeAfter) + mockDal.AssertNotCalled(t, "First", mock.Anything, mock.Anything) + mockDal.AssertExpectations(t) + }) +} + +func TestIsStateTableNotReadyError(t *testing.T) { + assert.False(t, isStateTableNotReadyError(nil)) + assert.True(t, isStateTableNotReadyError(errors.Default.New("Error 1146 (42S02): Table 'lake._devlake_collector_latest_state' doesn't exist"))) + assert.True(t, isStateTableNotReadyError(errors.Default.New("pq: relation \"_devlake_collector_latest_state\" does not exist"))) + assert.True(t, isStateTableNotReadyError(errors.Default.New("no such table: _devlake_collector_latest_state"))) + assert.False(t, isStateTableNotReadyError(errors.Default.New("Error 1146 (42S02): Table 'lake._tool_github_issues' doesn't exist"))) + assert.False(t, isStateTableNotReadyError(errors.Default.New("db unavailable"))) +} diff --git a/backend/impls/context/default_exec_context.go b/backend/impls/context/default_exec_context.go index d1a2ceda00e..2d372f5b95c 100644 --- a/backend/impls/context/default_exec_context.go +++ b/backend/impls/context/default_exec_context.go @@ -96,6 +96,10 @@ func (c *defaultExecContext) IncProgress(progressType plugin.ProgressType, quant } } +func (c *defaultExecContext) GetProgress() int { + return int(atomic.LoadInt64(&c.current)) +} + func (c *defaultExecContext) fork(name string) *defaultExecContext { return newDefaultExecContext( c.ctx, diff --git a/backend/plugins/github/tasks/account_convertor.go b/backend/plugins/github/tasks/account_convertor.go index dfd9a929ee9..e86780d2746 100644 --- a/backend/plugins/github/tasks/account_convertor.go +++ b/backend/plugins/github/tasks/account_convertor.go @@ -18,12 +18,10 @@ limitations under the License. package tasks import ( - "reflect" "strings" "github.com/apache/incubator-devlake/core/dal" "github.com/apache/incubator-devlake/core/errors" - "github.com/apache/incubator-devlake/core/models/common" "github.com/apache/incubator-devlake/core/models/domainlayer" "github.com/apache/incubator-devlake/core/models/domainlayer/crossdomain" "github.com/apache/incubator-devlake/core/models/domainlayer/didgen" @@ -49,50 +47,44 @@ var ConvertAccountsMeta = plugin.SubTaskMeta{ ProductTables: []string{crossdomain.Account{}.TableName()}, } -type GithubAccountWithOrg struct { - models.GithubAccount - Login string `json:"login" gorm:"type:varchar(255)"` - common.NoPKModel -} - func ConvertAccounts(taskCtx plugin.SubTaskContext) errors.Error { db := taskCtx.GetDal() data := taskCtx.GetData().(*GithubTaskData) - cursor, err := db.Cursor( - dal.Select("_tool_github_accounts.*"), - dal.From(&models.GithubAccount{}), - dal.Where( - "repo_github_id = ? and _tool_github_accounts.connection_id=?", - data.Options.GithubId, - data.Options.ConnectionId, - ), - dal.Join(`left join _tool_github_repo_accounts gra on ( - _tool_github_accounts.connection_id = gra.connection_id - AND _tool_github_accounts.id = gra.account_id - )`), - ) - if err != nil { - return err - } - defer cursor.Close() - accountIdGen := didgen.NewDomainIdGenerator(&models.GithubAccount{}) - converter, err := api.NewDataConverter(api.DataConverterArgs{ - InputRowType: reflect.TypeOf(models.GithubAccount{}), - Input: cursor, - RawDataSubTaskArgs: api.RawDataSubTaskArgs{ - Ctx: taskCtx, + converter, err := api.NewStatefulDataConverter(&api.StatefulDataConverterArgs[models.GithubAccount]{ + SubtaskCommonArgs: &api.SubtaskCommonArgs{ + SubTaskContext: taskCtx, + Table: RAW_ACCOUNT_TABLE, Params: GithubApiParams{ ConnectionId: data.Options.ConnectionId, Name: data.Options.Name, }, - Table: RAW_ACCOUNT_TABLE, }, - Convert: func(inputRow interface{}) ([]interface{}, errors.Error) { - githubUser := inputRow.(*models.GithubAccount) - + Input: func(stateManager *api.SubtaskStateManager) (dal.Rows, errors.Error) { + clauses := []dal.Clause{ + dal.Select("_tool_github_accounts.*"), + dal.From(&models.GithubAccount{}), + dal.Where( + "repo_github_id = ? and _tool_github_accounts.connection_id=?", + data.Options.GithubId, + data.Options.ConnectionId, + ), + dal.Join(`left join _tool_github_repo_accounts gra on ( + _tool_github_accounts.connection_id = gra.connection_id + AND _tool_github_accounts.id = gra.account_id + )`), + } + if stateManager.IsIncremental() { + since := stateManager.GetSince() + if since != nil { + clauses = append(clauses, dal.Where("_tool_github_accounts.updated_at >= ?", since)) + } + } + return db.Cursor(clauses...) + }, + Convert: func(githubUser *models.GithubAccount) ([]interface{}, errors.Error) { // query related orgs var orgs []string err := db.Pluck(`org_login`, &orgs, diff --git a/backend/plugins/github/tasks/account_extractor.go b/backend/plugins/github/tasks/account_extractor.go index 88e4e8efb48..27500402b8a 100644 --- a/backend/plugins/github/tasks/account_extractor.go +++ b/backend/plugins/github/tasks/account_extractor.go @@ -18,7 +18,6 @@ limitations under the License. package tasks import ( - "encoding/json" "time" "github.com/apache/incubator-devlake/core/errors" @@ -61,39 +60,32 @@ type DetailGithubAccountResponse struct { func ExtractAccounts(taskCtx plugin.SubTaskContext) errors.Error { data := taskCtx.GetData().(*GithubTaskData) - extractor, err := api.NewApiExtractor(api.ApiExtractorArgs{ - RawDataSubTaskArgs: api.RawDataSubTaskArgs{ - Ctx: taskCtx, + extractor, err := api.NewStatefulApiExtractor(&api.StatefulApiExtractorArgs[DetailGithubAccountResponse]{ + SubtaskCommonArgs: &api.SubtaskCommonArgs{ + SubTaskContext: taskCtx, Params: GithubApiParams{ ConnectionId: data.Options.ConnectionId, Name: data.Options.Name, }, Table: RAW_ACCOUNT_TABLE, }, - Extract: func(row *api.RawData) ([]interface{}, errors.Error) { - apiAccount := &DetailGithubAccountResponse{} - err := errors.Convert(json.Unmarshal(row.Data, apiAccount)) - if err != nil { - return nil, err - } - results := make([]interface{}, 0, 1) - if apiAccount.Id == 0 { + Extract: func(body *DetailGithubAccountResponse, row *api.RawData) ([]any, errors.Error) { + if body.Id == 0 { return nil, nil } githubAccount := &models.GithubAccount{ ConnectionId: data.Options.ConnectionId, - Id: apiAccount.Id, - Login: apiAccount.Login, - Name: apiAccount.Name, - Company: apiAccount.Company, - Email: apiAccount.Email, - AvatarUrl: apiAccount.AvatarUrl, - Url: apiAccount.Url, - HtmlUrl: apiAccount.HtmlUrl, - Type: apiAccount.Type, + Id: body.Id, + Login: body.Login, + Name: body.Name, + Company: body.Company, + Email: body.Email, + AvatarUrl: body.AvatarUrl, + Url: body.Url, + HtmlUrl: body.HtmlUrl, + Type: body.Type, } - results = append(results, githubAccount) - return results, nil + return []any{githubAccount}, nil }, }) diff --git a/backend/plugins/github/tasks/account_org_extractor.go b/backend/plugins/github/tasks/account_org_extractor.go index 8aa69b664c7..91f9a7b8cfe 100644 --- a/backend/plugins/github/tasks/account_org_extractor.go +++ b/backend/plugins/github/tasks/account_org_extractor.go @@ -50,28 +50,23 @@ type GithubAccountOrgsResponse struct { func ExtractAccountOrg(taskCtx plugin.SubTaskContext) errors.Error { data := taskCtx.GetData().(*GithubTaskData) - extractor, err := api.NewApiExtractor(api.ApiExtractorArgs{ - RawDataSubTaskArgs: api.RawDataSubTaskArgs{ - Ctx: taskCtx, + extractor, err := api.NewStatefulApiExtractor(&api.StatefulApiExtractorArgs[[]GithubAccountOrgsResponse]{ + SubtaskCommonArgs: &api.SubtaskCommonArgs{ + SubTaskContext: taskCtx, Params: GithubApiParams{ ConnectionId: data.Options.ConnectionId, Name: data.Options.Name, }, Table: RAW_ACCOUNT_ORG_TABLE, }, - Extract: func(row *api.RawData) ([]interface{}, errors.Error) { - apiAccountOrgs := &[]GithubAccountOrgsResponse{} - err := json.Unmarshal(row.Data, apiAccountOrgs) - if err != nil { - return nil, errors.Convert(err) - } + Extract: func(body *[]GithubAccountOrgsResponse, row *api.RawData) ([]any, errors.Error) { simpleAccount := &SimpleAccountWithId{} - err = json.Unmarshal(row.Input, simpleAccount) + err := errors.Convert(json.Unmarshal(row.Input, simpleAccount)) if err != nil { - return nil, errors.Convert(err) + return nil, err } - results := make([]interface{}, 0, len(*apiAccountOrgs)) - for _, apiAccountOrg := range *apiAccountOrgs { + results := make([]interface{}, 0, len(*body)) + for _, apiAccountOrg := range *body { githubAccount := &models.GithubAccountOrg{ ConnectionId: data.Options.ConnectionId, AccountId: simpleAccount.AccountId, diff --git a/backend/plugins/github/tasks/cicd_job_convertor.go b/backend/plugins/github/tasks/cicd_job_convertor.go index 81ef9cd4df5..2dcfbce37fc 100644 --- a/backend/plugins/github/tasks/cicd_job_convertor.go +++ b/backend/plugins/github/tasks/cicd_job_convertor.go @@ -18,8 +18,6 @@ limitations under the License. package tasks import ( - "reflect" - "github.com/apache/incubator-devlake/core/dal" "github.com/apache/incubator-devlake/core/errors" "github.com/apache/incubator-devlake/core/models/domainlayer" @@ -55,37 +53,38 @@ type SimpleBranch struct { func ConvertJobs(taskCtx plugin.SubTaskContext) (err errors.Error) { db := taskCtx.GetDal() + logger := taskCtx.GetLogger() data := taskCtx.GetData().(*GithubTaskData) repoId := data.Options.GithubId - if err != nil { - return err - } - job := &models.GithubJob{} - cursor, err := db.Cursor( - dal.From(job), - dal.Where("repo_id = ? and connection_id=?", repoId, data.Options.ConnectionId), - ) - if err != nil { - return err - } - defer cursor.Close() + jobIdGen := didgen.NewDomainIdGenerator(&models.GithubJob{}) runIdGen := didgen.NewDomainIdGenerator(&models.GithubRun{}) repoIdGen := didgen.NewDomainIdGenerator(&models.GithubRepo{}) - converter, err := api.NewDataConverter(api.DataConverterArgs{ - RawDataSubTaskArgs: api.RawDataSubTaskArgs{ - Ctx: taskCtx, + + converter, err := api.NewStatefulDataConverter(&api.StatefulDataConverterArgs[models.GithubJob]{ + SubtaskCommonArgs: &api.SubtaskCommonArgs{ + SubTaskContext: taskCtx, + Table: RAW_JOB_TABLE, Params: GithubApiParams{ ConnectionId: data.Options.ConnectionId, Name: data.Options.Name, }, - Table: RAW_JOB_TABLE, }, - InputRowType: reflect.TypeOf(models.GithubJob{}), - Input: cursor, - Convert: func(inputRow interface{}) ([]interface{}, errors.Error) { - line := inputRow.(*models.GithubJob) - + Input: func(stateManager *api.SubtaskStateManager) (dal.Rows, errors.Error) { + clauses := []dal.Clause{ + dal.From(&models.GithubJob{}), + dal.Where("_tool_github_jobs.repo_id = ? and _tool_github_jobs.connection_id = ?", repoId, data.Options.ConnectionId), + } + if stateManager.IsIncremental() { + since := stateManager.GetSince() + if since != nil { + logger.Info("[Convert Jobs] incremental mode using _tool_github_jobs.updated_at >= %v", since) + clauses = append(clauses, dal.Where("_tool_github_jobs.updated_at >= ?", since)) + } + } + return db.Cursor(clauses...) + }, + Convert: func(line *models.GithubJob) ([]interface{}, errors.Error) { // Skip jobs with no started_at value (workaround for https://github.com/apache/incubator-devlake/issues/8442) if line.StartedAt == nil { return nil, nil diff --git a/backend/plugins/github/tasks/cicd_job_extractor.go b/backend/plugins/github/tasks/cicd_job_extractor.go index 5701476bcce..6cfab02f3cf 100644 --- a/backend/plugins/github/tasks/cicd_job_extractor.go +++ b/backend/plugins/github/tasks/cicd_job_extractor.go @@ -18,7 +18,6 @@ limitations under the License. package tasks import ( - "encoding/json" "strings" "github.com/apache/incubator-devlake/core/errors" @@ -46,23 +45,16 @@ func ExtractJobs(taskCtx plugin.SubTaskContext) errors.Error { data := taskCtx.GetData().(*GithubTaskData) repoId := data.Options.GithubId - extractor, err := api.NewApiExtractor(api.ApiExtractorArgs{ - RawDataSubTaskArgs: api.RawDataSubTaskArgs{ - Ctx: taskCtx, + extractor, err := api.NewStatefulApiExtractor(&api.StatefulApiExtractorArgs[models.GithubJob]{ + SubtaskCommonArgs: &api.SubtaskCommonArgs{ + SubTaskContext: taskCtx, Params: GithubApiParams{ ConnectionId: data.Options.ConnectionId, Name: data.Options.Name, }, Table: RAW_JOB_TABLE, }, - Extract: func(row *api.RawData) ([]interface{}, errors.Error) { - githubJob := &models.GithubJob{} - err := errors.Convert(json.Unmarshal(row.Data, githubJob)) - if err != nil { - return nil, err - } - - results := make([]interface{}, 0, 1) + Extract: func(githubJob *models.GithubJob, row *api.RawData) ([]any, errors.Error) { githubJobResult := &models.GithubJob{ ConnectionId: data.Options.ConnectionId, RepoId: repoId, @@ -87,8 +79,7 @@ func ExtractJobs(taskCtx plugin.SubTaskContext) errors.Error { Type: data.RegexEnricher.ReturnNameIfMatched(devops.DEPLOYMENT, githubJob.Name), Environment: data.RegexEnricher.ReturnNameIfOmittedOrMatched(devops.PRODUCTION, githubJob.Name), } - results = append(results, githubJobResult) - return results, nil + return []any{githubJobResult}, nil }, }) diff --git a/backend/plugins/github/tasks/cicd_run_collector.go b/backend/plugins/github/tasks/cicd_run_collector.go index 44c816ca236..9359b98de61 100644 --- a/backend/plugins/github/tasks/cicd_run_collector.go +++ b/backend/plugins/github/tasks/cicd_run_collector.go @@ -24,10 +24,12 @@ import ( "net/url" "time" + "github.com/apache/incubator-devlake/core/dal" "github.com/apache/incubator-devlake/core/errors" "github.com/apache/incubator-devlake/core/log" "github.com/apache/incubator-devlake/core/plugin" helper "github.com/apache/incubator-devlake/helpers/pluginhelper/api" + "github.com/apache/incubator-devlake/plugins/github/models" ) func init() { @@ -89,6 +91,24 @@ func CollectRuns(taskCtx plugin.SubTaskContext) errors.Error { // `windowStart` past the previously collected second (inclusive-both-ends), while // fullsync + TimeAfter keeps the user-specified bound inclusive. createdAfter := manager.GetSince() + sinceSource := "state_since" + syncPolicy := taskCtx.TaskContext().SyncPolicy() + if createdAfter == nil { + sinceSource = "none" + } + if createdAfter == nil && (syncPolicy == nil || !syncPolicy.FullSync) { + fallbackSince, err := loadLatestRunUpdatedAt(taskCtx, data.Options.ConnectionId, data.Options.GithubId) + if err != nil { + return err + } + if fallbackSince != nil { + createdAfter = fallbackSince + sinceSource = "tool_runs_fallback" + logger.Info("cicd_run_collector: collector state missing; bootstrapping since from existing _tool_github_runs at %s", fallbackSince.UTC().Format(time.RFC3339)) + } else { + logger.Debug("cicd_run_collector: collector state missing and no _tool_github_runs timestamp found") + } + } untilPtr := manager.GetUntil() *untilPtr = untilPtr.Truncate(time.Second) until := *untilPtr @@ -102,12 +122,14 @@ func CollectRuns(taskCtx plugin.SubTaskContext) errors.Error { } else { // 2018-01-01 conservatively predates GitHub Actions' late-2019 GA. windowStart = time.Date(2018, 1, 1, 0, 0, 0, 0, time.UTC) + sinceSource = "epoch_fullsync" } - logger.Info("cicd_run_collector: collecting workflow runs in [%s, %s] (incremental=%v)", + logger.Info("cicd_run_collector: collecting workflow runs in [%s, %s] (incremental=%v, since_source=%s)", windowStart.Format(githubTimeLayout), until.Format(githubTimeLayout), - manager.IsIncremental()) + manager.IsIncremental(), + sinceSource) leafWindows, err := newLeafWindowBuilder(taskCtx, data).build(windowStart, until) if err != nil { @@ -122,6 +144,28 @@ func CollectRuns(taskCtx plugin.SubTaskContext) errors.Error { return manager.Execute() } +func loadLatestRunUpdatedAt(taskCtx plugin.SubTaskContext, connectionId uint64, repoId int) (*time.Time, errors.Error) { + db := taskCtx.GetDal() + latest := &models.GithubRun{} + err := db.First( + latest, + dal.Where("connection_id = ? AND repo_id = ? AND github_updated_at IS NOT NULL", connectionId, repoId), + dal.Orderby("github_updated_at DESC"), + dal.Limit(1), + ) + if err != nil { + if db.IsErrorNotFound(err) { + return nil, nil + } + return nil, err + } + if latest.GithubUpdatedAt == nil { + return nil, nil + } + fallback := latest.GithubUpdatedAt.UTC() + return &fallback, nil +} + // buildRunsQuery assembles the filtered-mode query for a single leaf TimeWindow. // Shared between registerCollectorForLeafWindows and tests. func buildRunsQuery(reqData *helper.RequestData) (url.Values, errors.Error) { diff --git a/backend/plugins/github/tasks/cicd_run_collector_test.go b/backend/plugins/github/tasks/cicd_run_collector_test.go index d69b5c73b59..a7a12abe6b9 100644 --- a/backend/plugins/github/tasks/cicd_run_collector_test.go +++ b/backend/plugins/github/tasks/cicd_run_collector_test.go @@ -34,12 +34,64 @@ import ( "github.com/apache/incubator-devlake/helpers/unithelper" mockdal "github.com/apache/incubator-devlake/mocks/core/dal" mockapi "github.com/apache/incubator-devlake/mocks/helpers/pluginhelper/api" + "github.com/apache/incubator-devlake/plugins/github/models" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" ) +func TestCicdRunLoadLatestRunUpdatedAt_ReturnsLatestTimestamp(t *testing.T) { + connectionId := uint64(1) + repoId := 101 + latestTs := time.Date(2025, 4, 1, 10, 11, 12, 0, time.UTC) + + mockDal := new(mockdal.Dal) + mockDal.On("First", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Run(func(args mock.Arguments) { + dst := args.Get(0).(*models.GithubRun) + dst.ConnectionId = connectionId + dst.RepoId = repoId + dst.GithubUpdatedAt = &latestTs + }).Return(nil).Once() + + ctx := unithelper.DummySubTaskContext(mockDal) + since, err := loadLatestRunUpdatedAt(ctx, connectionId, repoId) + + require.Nil(t, err) + require.NotNil(t, since) + assert.True(t, since.Equal(latestTs)) + mockDal.AssertExpectations(t) +} + +func TestCicdRunLoadLatestRunUpdatedAt_NotFoundReturnsNil(t *testing.T) { + mockDal := new(mockdal.Dal) + notFoundErr := errors.Default.New("record not found") + mockDal.On("First", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(notFoundErr).Once() + mockDal.On("IsErrorNotFound", notFoundErr).Return(true).Once() + + ctx := unithelper.DummySubTaskContext(mockDal) + since, err := loadLatestRunUpdatedAt(ctx, 1, 101) + + require.Nil(t, err) + assert.Nil(t, since) + mockDal.AssertExpectations(t) +} + +func TestCicdRunLoadLatestRunUpdatedAt_PropagatesError(t *testing.T) { + mockDal := new(mockdal.Dal) + dbErr := errors.Default.New("db unavailable") + mockDal.On("First", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(dbErr).Once() + mockDal.On("IsErrorNotFound", dbErr).Return(false).Once() + + ctx := unithelper.DummySubTaskContext(mockDal) + since, err := loadLatestRunUpdatedAt(ctx, 1, 101) + + assert.Nil(t, since) + require.NotNil(t, err) + assert.Contains(t, err.Error(), "db unavailable") + mockDal.AssertExpectations(t) +} + // newTestBuilder constructs a leafWindowBuilder with a stubbed probe for unit testing. func newTestBuilder(probe probeFunc) *leafWindowBuilder { mockDal := new(mockdal.Dal) diff --git a/backend/plugins/github/tasks/cicd_run_convertor.go b/backend/plugins/github/tasks/cicd_run_convertor.go index 1cdf9c6daa5..69911482e47 100644 --- a/backend/plugins/github/tasks/cicd_run_convertor.go +++ b/backend/plugins/github/tasks/cicd_run_convertor.go @@ -18,7 +18,6 @@ limitations under the License. package tasks import ( - "reflect" "time" "github.com/apache/incubator-devlake/core/dal" @@ -63,31 +62,33 @@ func ConvertRuns(taskCtx plugin.SubTaskContext) errors.Error { return err } - pipeline := &models.GithubRun{} - cursor, err := db.Cursor( - dal.Select("*"), - dal.From(pipeline), - dal.Where("repo_id = ? and connection_id=?", repoId, data.Options.ConnectionId), - ) - if err != nil { - return err - } - defer cursor.Close() repoIdGen := didgen.NewDomainIdGenerator(&models.GithubRepo{}) runIdGen := didgen.NewDomainIdGenerator(&models.GithubRun{}) - converter, err := api.NewDataConverter(api.DataConverterArgs{ - RawDataSubTaskArgs: api.RawDataSubTaskArgs{ - Ctx: taskCtx, + + converter, err := api.NewStatefulDataConverter(&api.StatefulDataConverterArgs[models.GithubRun]{ + SubtaskCommonArgs: &api.SubtaskCommonArgs{ + SubTaskContext: taskCtx, + Table: RAW_RUN_TABLE, Params: GithubApiParams{ ConnectionId: data.Options.ConnectionId, Name: data.Options.Name, }, - Table: RAW_RUN_TABLE, }, - InputRowType: reflect.TypeOf(models.GithubRun{}), - Input: cursor, - Convert: func(inputRow interface{}) ([]interface{}, errors.Error) { - line := inputRow.(*models.GithubRun) + Input: func(stateManager *api.SubtaskStateManager) (dal.Rows, errors.Error) { + clauses := []dal.Clause{ + dal.Select("*"), + dal.From(&models.GithubRun{}), + dal.Where("repo_id = ? and connection_id = ?", repoId, data.Options.ConnectionId), + } + if stateManager.IsIncremental() { + since := stateManager.GetSince() + if since != nil { + clauses = append(clauses, dal.Where("github_updated_at >= ?", since)) + } + } + return db.Cursor(clauses...) + }, + Convert: func(line *models.GithubRun) ([]interface{}, errors.Error) { createdAt := time.Now() if line.GithubCreatedAt != nil { createdAt = *line.GithubCreatedAt diff --git a/backend/plugins/github/tasks/cicd_run_extractor.go b/backend/plugins/github/tasks/cicd_run_extractor.go index 1e6a3f41a9e..3a474c64c02 100644 --- a/backend/plugins/github/tasks/cicd_run_extractor.go +++ b/backend/plugins/github/tasks/cicd_run_extractor.go @@ -18,8 +18,6 @@ limitations under the License. package tasks import ( - "encoding/json" - "github.com/apache/incubator-devlake/core/errors" "github.com/apache/incubator-devlake/core/models/domainlayer/devops" "github.com/apache/incubator-devlake/core/plugin" @@ -45,26 +43,21 @@ func ExtractRuns(taskCtx plugin.SubTaskContext) errors.Error { data := taskCtx.GetData().(*GithubTaskData) repoId := data.Options.GithubId - extractor, err := api.NewApiExtractor(api.ApiExtractorArgs{ - RawDataSubTaskArgs: api.RawDataSubTaskArgs{ - Ctx: taskCtx, + extractor, err := api.NewStatefulApiExtractor(&api.StatefulApiExtractorArgs[models.GithubRun]{ + SubtaskCommonArgs: &api.SubtaskCommonArgs{ + SubTaskContext: taskCtx, Params: GithubApiParams{ ConnectionId: data.Options.ConnectionId, Name: data.Options.Name, }, Table: RAW_RUN_TABLE, }, - Extract: func(row *api.RawData) ([]interface{}, errors.Error) { - githubRun := &models.GithubRun{} - err := errors.Convert(json.Unmarshal(row.Data, githubRun)) - if err != nil { - return nil, err - } - githubRun.RepoId = repoId - githubRun.ConnectionId = data.Options.ConnectionId - githubRun.Type = data.RegexEnricher.ReturnNameIfMatched(devops.DEPLOYMENT, githubRun.Name) - githubRun.Environment = data.RegexEnricher.ReturnNameIfOmittedOrMatched(devops.PRODUCTION, githubRun.Name, githubRun.HeadBranch) - return []interface{}{githubRun}, nil + Extract: func(body *models.GithubRun, row *api.RawData) ([]any, errors.Error) { + body.RepoId = repoId + body.ConnectionId = data.Options.ConnectionId + body.Type = data.RegexEnricher.ReturnNameIfMatched(devops.DEPLOYMENT, body.Name) + body.Environment = data.RegexEnricher.ReturnNameIfOmittedOrMatched(devops.PRODUCTION, body.Name, body.HeadBranch) + return []any{body}, nil }, }) diff --git a/backend/plugins/github/tasks/comment_collector.go b/backend/plugins/github/tasks/comment_collector.go index 9f40e7736a7..303ecf7b1f7 100644 --- a/backend/plugins/github/tasks/comment_collector.go +++ b/backend/plugins/github/tasks/comment_collector.go @@ -22,6 +22,7 @@ import ( "fmt" "net/http" "net/url" + "time" "github.com/apache/incubator-devlake/core/errors" "github.com/apache/incubator-devlake/core/plugin" @@ -66,7 +67,7 @@ func CollectApiComments(taskCtx plugin.SubTaskContext) errors.Error { query := url.Values{} query.Set("state", "all") if apiCollector.GetSince() != nil { - query.Set("since", apiCollector.GetSince().String()) + query.Set("since", apiCollector.GetSince().UTC().Format(time.RFC3339)) } query.Set("page", fmt.Sprintf("%v", reqData.Pager.Page)) query.Set("direction", "asc") diff --git a/backend/plugins/github/tasks/comment_extractor.go b/backend/plugins/github/tasks/comment_extractor.go index 8fc52a18b7e..c383e532e9c 100644 --- a/backend/plugins/github/tasks/comment_extractor.go +++ b/backend/plugins/github/tasks/comment_extractor.go @@ -59,28 +59,21 @@ type IssueComment struct { func ExtractApiComments(taskCtx plugin.SubTaskContext) errors.Error { data := taskCtx.GetData().(*GithubTaskData) - extractor, err := helper.NewApiExtractor(helper.ApiExtractorArgs{ - RawDataSubTaskArgs: helper.RawDataSubTaskArgs{ - Ctx: taskCtx, + extractor, err := helper.NewStatefulApiExtractor(&helper.StatefulApiExtractorArgs[IssueComment]{ + SubtaskCommonArgs: &helper.SubtaskCommonArgs{ + SubTaskContext: taskCtx, Params: GithubApiParams{ ConnectionId: data.Options.ConnectionId, Name: data.Options.Name, }, Table: RAW_COMMENTS_TABLE, }, - Extract: func(row *helper.RawData) ([]interface{}, errors.Error) { - apiComment := &IssueComment{} - err := errors.Convert(json.Unmarshal(row.Data, apiComment)) - if err != nil { - return nil, err - } - // need to extract 2 kinds of entities here + Extract: func(body *IssueComment, row *helper.RawData) ([]any, errors.Error) { results := make([]interface{}, 0, 2) - if apiComment.GithubId == 0 { + if body.GithubId == 0 { return nil, nil } - //If this is a pr, ignore - issueINumber, err := errors.Convert01(githubUtils.GetIssueIdByIssueUrl(apiComment.IssueUrl)) + issueINumber, err := errors.Convert01(githubUtils.GetIssueIdByIssueUrl(body.IssueUrl)) if err != nil { return nil, err } @@ -90,7 +83,7 @@ func ExtractApiComments(taskCtx plugin.SubTaskContext) errors.Error { if err != nil && !db.IsErrorNotFound(err) { return nil, err } - //if we can not find issues with issue number above, move the comments to github_pull_request_comments + // if we can not find issues with issue number above, move the comments to github_pull_request_comments if issue.GithubId == 0 { pr := &models.GithubPullRequest{} err = db.First(pr, dal.Where("connection_id = ? and number = ? and repo_id = ?", data.Options.ConnectionId, issueINumber, data.Options.GithubId)) @@ -99,23 +92,21 @@ func ExtractApiComments(taskCtx plugin.SubTaskContext) errors.Error { } githubPrComment := &models.GithubPrComment{ ConnectionId: data.Options.ConnectionId, - GithubId: apiComment.GithubId, + GithubId: body.GithubId, PullRequestId: pr.GithubId, - Body: string(apiComment.Body), - GithubCreatedAt: apiComment.GithubCreatedAt.ToTime(), - GithubUpdatedAt: apiComment.GithubUpdatedAt.ToTime(), + Body: string(body.Body), + GithubCreatedAt: body.GithubCreatedAt.ToTime(), + GithubUpdatedAt: body.GithubUpdatedAt.ToTime(), Type: "NORMAL", } - if apiComment.User != nil { - // Filter bot comments by username - if shouldSkipByUsername(apiComment.User.Login) { - taskCtx.GetLogger().Debug("Skipping PR comment #%d from bot user: %s", apiComment.GithubId, apiComment.User.Login) + if body.User != nil { + if shouldSkipByUsername(body.User.Login) { + taskCtx.GetLogger().Debug("Skipping PR comment #%d from bot user: %s", body.GithubId, body.User.Login) return nil, nil } - githubPrComment.AuthorUsername = apiComment.User.Login - githubPrComment.AuthorUserId = apiComment.User.Id - - githubAccount, err := convertAccount(apiComment.User, data.Options.GithubId, data.Options.ConnectionId) + githubPrComment.AuthorUsername = body.User.Login + githubPrComment.AuthorUserId = body.User.Id + githubAccount, err := convertAccount(body.User, data.Options.GithubId, data.Options.ConnectionId) if err != nil { return nil, err } @@ -125,22 +116,20 @@ func ExtractApiComments(taskCtx plugin.SubTaskContext) errors.Error { } else { githubIssueComment := &models.GithubIssueComment{ ConnectionId: data.Options.ConnectionId, - GithubId: apiComment.GithubId, + GithubId: body.GithubId, IssueId: issue.GithubId, - Body: string(apiComment.Body), - GithubCreatedAt: apiComment.GithubCreatedAt.ToTime(), - GithubUpdatedAt: apiComment.GithubUpdatedAt.ToTime(), + Body: string(body.Body), + GithubCreatedAt: body.GithubCreatedAt.ToTime(), + GithubUpdatedAt: body.GithubUpdatedAt.ToTime(), } - if apiComment.User != nil { - // Filter bot comments by username - if shouldSkipByUsername(apiComment.User.Login) { - taskCtx.GetLogger().Debug("Skipping issue comment #%d from bot user: %s", apiComment.GithubId, apiComment.User.Login) + if body.User != nil { + if shouldSkipByUsername(body.User.Login) { + taskCtx.GetLogger().Debug("Skipping issue comment #%d from bot user: %s", body.GithubId, body.User.Login) return nil, nil } - githubIssueComment.AuthorUsername = apiComment.User.Login - githubIssueComment.AuthorUserId = apiComment.User.Id - - githubAccount, err := convertAccount(apiComment.User, data.Options.GithubId, data.Options.ConnectionId) + githubIssueComment.AuthorUsername = body.User.Login + githubIssueComment.AuthorUserId = body.User.Id + githubAccount, err := convertAccount(body.User, data.Options.GithubId, data.Options.ConnectionId) if err != nil { return nil, err } diff --git a/backend/plugins/github/tasks/commit_collector.go b/backend/plugins/github/tasks/commit_collector.go index 019060c17d8..3376f248665 100644 --- a/backend/plugins/github/tasks/commit_collector.go +++ b/backend/plugins/github/tasks/commit_collector.go @@ -22,6 +22,7 @@ import ( "fmt" "net/http" "net/url" + "time" "github.com/apache/incubator-devlake/core/errors" "github.com/apache/incubator-devlake/core/plugin" @@ -78,7 +79,7 @@ func CollectApiCommits(taskCtx plugin.SubTaskContext) errors.Error { query := url.Values{} query.Set("state", "all") if apiCollector.GetSince() != nil { - query.Set("since", apiCollector.GetSince().String()) + query.Set("since", apiCollector.GetSince().UTC().Format(time.RFC3339)) } query.Set("direction", "asc") query.Set("page", fmt.Sprintf("%v", reqData.Pager.Page)) diff --git a/backend/plugins/github/tasks/commit_convertor.go b/backend/plugins/github/tasks/commit_convertor.go index 4dc3fa43cc0..14448d48b1c 100644 --- a/backend/plugins/github/tasks/commit_convertor.go +++ b/backend/plugins/github/tasks/commit_convertor.go @@ -18,8 +18,6 @@ limitations under the License. package tasks import ( - "reflect" - "github.com/apache/incubator-devlake/core/dal" "github.com/apache/incubator-devlake/core/errors" "github.com/apache/incubator-devlake/core/models/domainlayer/code" @@ -54,36 +52,36 @@ func ConvertCommits(taskCtx plugin.SubTaskContext) errors.Error { data := taskCtx.GetData().(*GithubTaskData) repoId := data.Options.GithubId - cursor, err := db.Cursor( - dal.From("_tool_github_commits gc"), - dal.Join(`left join _tool_github_repo_commits grc on ( - grc.commit_sha = gc.sha - )`), - dal.Select("gc.*"), - dal.Where("grc.repo_id = ? AND grc.connection_id = ?", repoId, data.Options.ConnectionId), - ) - if err != nil { - return err - } - defer cursor.Close() - repoDidGen := didgen.NewDomainIdGenerator(&models.GithubRepo{}) domainRepoId := repoDidGen.Generate(data.Options.ConnectionId, repoId) - converter, err := api.NewDataConverter(api.DataConverterArgs{ - RawDataSubTaskArgs: api.RawDataSubTaskArgs{ - Ctx: taskCtx, + converter, err := api.NewStatefulDataConverter(&api.StatefulDataConverterArgs[models.GithubCommit]{ + SubtaskCommonArgs: &api.SubtaskCommonArgs{ + SubTaskContext: taskCtx, + Table: RAW_COMMIT_TABLE, Params: GithubApiParams{ ConnectionId: data.Options.ConnectionId, Name: data.Options.Name, }, - Table: RAW_COMMIT_TABLE, }, - InputRowType: reflect.TypeOf(models.GithubCommit{}), - Input: cursor, - - Convert: func(inputRow interface{}) ([]interface{}, errors.Error) { - githubCommit := inputRow.(*models.GithubCommit) + Input: func(stateManager *api.SubtaskStateManager) (dal.Rows, errors.Error) { + clauses := []dal.Clause{ + dal.From("_tool_github_commits gc"), + dal.Join(`left join _tool_github_repo_commits grc on ( + grc.commit_sha = gc.sha + )`), + dal.Select("gc.*"), + dal.Where("grc.repo_id = ? AND grc.connection_id = ?", repoId, data.Options.ConnectionId), + } + if stateManager.IsIncremental() { + since := stateManager.GetSince() + if since != nil { + clauses = append(clauses, dal.Where("(gc.updated_at >= ? OR grc.updated_at >= ?)", since, since)) + } + } + return db.Cursor(clauses...) + }, + Convert: func(githubCommit *models.GithubCommit) ([]interface{}, errors.Error) { domainCommit := &code.Commit{ Sha: githubCommit.Sha, Message: githubCommit.Message, diff --git a/backend/plugins/github/tasks/commit_extractor.go b/backend/plugins/github/tasks/commit_extractor.go index a0ed8d11d3f..72a29454dac 100644 --- a/backend/plugins/github/tasks/commit_extractor.go +++ b/backend/plugins/github/tasks/commit_extractor.go @@ -18,8 +18,6 @@ limitations under the License. package tasks import ( - "encoding/json" - "github.com/apache/incubator-devlake/core/errors" "github.com/apache/incubator-devlake/core/models/common" "github.com/apache/incubator-devlake/core/plugin" @@ -68,60 +66,44 @@ type Commit struct { func ExtractApiCommits(taskCtx plugin.SubTaskContext) errors.Error { data := taskCtx.GetData().(*GithubTaskData) - extractor, err := api.NewApiExtractor(api.ApiExtractorArgs{ - RawDataSubTaskArgs: api.RawDataSubTaskArgs{ - Ctx: taskCtx, - /* - This struct will be JSONEncoded and stored into database along with raw data itself, to identity minimal - set of data to be process, for example, we process JiraCommits by Board - */ + extractor, err := api.NewStatefulApiExtractor(&api.StatefulApiExtractorArgs[CommitsResponse]{ + SubtaskCommonArgs: &api.SubtaskCommonArgs{ + SubTaskContext: taskCtx, Params: GithubApiParams{ ConnectionId: data.Options.ConnectionId, Name: data.Options.Name, }, - /* - Table store raw data - */ Table: RAW_COMMIT_TABLE, }, - Extract: func(row *api.RawData) ([]interface{}, errors.Error) { - commit := &CommitsResponse{} - err := errors.Convert(json.Unmarshal(row.Data, commit)) - if err != nil { - return nil, err - } - if commit.Sha == "" { + Extract: func(body *CommitsResponse, row *api.RawData) ([]any, errors.Error) { + if body.Sha == "" { return nil, nil } - results := make([]interface{}, 0, 4) - githubCommit := &models.GithubCommit{ - Sha: commit.Sha, - Message: commit.Commit.Message, - AuthorName: commit.Commit.Author.Name, - AuthorEmail: commit.Commit.Author.Email, - AuthoredDate: commit.Commit.Author.Date.ToTime(), - CommitterName: commit.Commit.Committer.Name, - CommitterEmail: commit.Commit.Committer.Email, - CommittedDate: commit.Commit.Committer.Date.ToTime(), - Url: commit.Url, + Sha: body.Sha, + Message: body.Commit.Message, + AuthorName: body.Commit.Author.Name, + AuthorEmail: body.Commit.Author.Email, + AuthoredDate: body.Commit.Author.Date.ToTime(), + CommitterName: body.Commit.Committer.Name, + CommitterEmail: body.Commit.Committer.Email, + CommittedDate: body.Commit.Committer.Date.ToTime(), + Url: body.Url, } - if commit.Author != nil { - githubCommit.AuthorId = commit.Author.Id - results = append(results, commit.Author) + if body.Author != nil { + githubCommit.AuthorId = body.Author.Id + results = append(results, body.Author) } - if commit.Committer != nil { - githubCommit.CommitterId = commit.Committer.Id - results = append(results, commit.Committer) + if body.Committer != nil { + githubCommit.CommitterId = body.Committer.Id + results = append(results, body.Committer) } - githubRepoCommit := &models.GithubRepoCommit{ ConnectionId: data.Options.ConnectionId, RepoId: data.Options.GithubId, - CommitSha: commit.Sha, + CommitSha: body.Sha, } - results = append(results, githubCommit) results = append(results, githubRepoCommit) return results, nil diff --git a/backend/plugins/github/tasks/commit_stats_extractor.go b/backend/plugins/github/tasks/commit_stats_extractor.go index 5f99b1ed5bc..431f2d28ea1 100644 --- a/backend/plugins/github/tasks/commit_stats_extractor.go +++ b/backend/plugins/github/tasks/commit_stats_extractor.go @@ -18,8 +18,6 @@ limitations under the License. package tasks import ( - "encoding/json" - "github.com/apache/incubator-devlake/core/dal" "github.com/apache/incubator-devlake/core/errors" "github.com/apache/incubator-devlake/core/models/common" @@ -62,42 +60,27 @@ type ApiSingleCommitResponse struct { func ExtractApiCommitStats(taskCtx plugin.SubTaskContext) errors.Error { data := taskCtx.GetData().(*GithubTaskData) - extractor, err := api.NewApiExtractor(api.ApiExtractorArgs{ - RawDataSubTaskArgs: api.RawDataSubTaskArgs{ - Ctx: taskCtx, - /* - This struct will be JSONEncoded and stored into database along with raw data itself, to identity minimal - set of data to be process, for example, we process JiraCommits by Board - */ + extractor, err := api.NewStatefulApiExtractor(&api.StatefulApiExtractorArgs[ApiSingleCommitResponse]{ + SubtaskCommonArgs: &api.SubtaskCommonArgs{ + SubTaskContext: taskCtx, Params: GithubApiParams{ ConnectionId: data.Options.ConnectionId, Name: data.Options.Name, }, - /* - Table store raw data - */ Table: RAW_COMMIT_STATS_TABLE, }, - Extract: func(row *api.RawData) ([]interface{}, errors.Error) { - body := &ApiSingleCommitResponse{} - err := errors.Convert(json.Unmarshal(row.Data, body)) - if err != nil { - return nil, err - } + Extract: func(body *ApiSingleCommitResponse, row *api.RawData) ([]any, errors.Error) { if body.Sha == "" { return nil, nil } - db := taskCtx.GetDal() commit := &models.GithubCommit{} - err = db.First(commit, dal.Where("sha = ?", body.Sha), dal.Limit(1)) + err := db.First(commit, dal.Where("sha = ?", body.Sha), dal.Limit(1)) if err != nil && !db.IsErrorNotFound(err) { return nil, err } - commit.Additions = body.Stats.Additions commit.Deletions = body.Stats.Deletions - commitStat := &models.GithubCommitStat{ ConnectionId: data.Options.ConnectionId, Additions: body.Stats.Additions, @@ -105,13 +88,7 @@ func ExtractApiCommitStats(taskCtx plugin.SubTaskContext) errors.Error { CommittedDate: body.Commit.Committer.Date.ToTime(), Sha: body.Sha, } - - results := make([]interface{}, 0, 2) - - results = append(results, commit) - results = append(results, commitStat) - - return results, nil + return []any{commit, commitStat}, nil }, }) diff --git a/backend/plugins/github/tasks/event_extractor.go b/backend/plugins/github/tasks/event_extractor.go index 133527d168d..01b5495be3e 100644 --- a/backend/plugins/github/tasks/event_extractor.go +++ b/backend/plugins/github/tasks/event_extractor.go @@ -18,8 +18,6 @@ limitations under the License. package tasks import ( - "encoding/json" - "github.com/apache/incubator-devlake/core/errors" "github.com/apache/incubator-devlake/core/models/common" "github.com/apache/incubator-devlake/core/plugin" @@ -54,21 +52,16 @@ type IssueEvent struct { func ExtractApiEvents(taskCtx plugin.SubTaskContext) errors.Error { data := taskCtx.GetData().(*GithubTaskData) - extractor, err := api.NewApiExtractor(api.ApiExtractorArgs{ - RawDataSubTaskArgs: api.RawDataSubTaskArgs{ - Ctx: taskCtx, + extractor, err := api.NewStatefulApiExtractor(&api.StatefulApiExtractorArgs[IssueEvent]{ + SubtaskCommonArgs: &api.SubtaskCommonArgs{ + SubTaskContext: taskCtx, Params: GithubApiParams{ ConnectionId: data.Options.ConnectionId, Name: data.Options.Name, }, Table: RAW_EVENTS_TABLE, }, - Extract: func(row *api.RawData) ([]interface{}, errors.Error) { - body := &IssueEvent{} - err := errors.Convert(json.Unmarshal(row.Data, body)) - if err != nil { - return nil, err - } + Extract: func(body *IssueEvent, row *api.RawData) ([]any, errors.Error) { results := make([]interface{}, 0, 1) if body.GithubId == 0 || body.Actor == nil { return nil, nil @@ -80,22 +73,15 @@ func ExtractApiEvents(taskCtx plugin.SubTaskContext) errors.Error { Type: body.Event, GithubCreatedAt: body.GithubCreatedAt.ToTime(), } - if body.Actor != nil { githubIssueEvent.AuthorUsername = body.Actor.Login - githubAccount, err := convertAccount(body.Actor, data.Options.GithubId, data.Options.ConnectionId) if err != nil { return nil, err } results = append(results, githubAccount) } - - if err != nil { - return nil, err - } results = append(results, githubIssueEvent) - return results, nil }, }) diff --git a/backend/plugins/github/tasks/issue_assignee_convertor.go b/backend/plugins/github/tasks/issue_assignee_convertor.go index 5f8c883b8f9..f632db8a6df 100644 --- a/backend/plugins/github/tasks/issue_assignee_convertor.go +++ b/backend/plugins/github/tasks/issue_assignee_convertor.go @@ -18,8 +18,6 @@ limitations under the License. package tasks import ( - "reflect" - "github.com/apache/incubator-devlake/core/dal" "github.com/apache/incubator-devlake/core/errors" "github.com/apache/incubator-devlake/core/models/domainlayer/didgen" @@ -53,31 +51,35 @@ func ConvertIssueAssignee(taskCtx plugin.SubTaskContext) errors.Error { data := taskCtx.GetData().(*GithubTaskData) repoId := data.Options.GithubId - cursor, err := db.Cursor( - dal.From(&models.GithubIssueAssignee{}), - dal.Where("repo_id = ? and connection_id=?", repoId, data.Options.ConnectionId), - ) - if err != nil { - return err - } - defer cursor.Close() - issueIdGen := didgen.NewDomainIdGenerator(&models.GithubIssue{}) accountIdGen := didgen.NewDomainIdGenerator(&models.GithubAccount{}) - converter, err := api.NewDataConverter(api.DataConverterArgs{ - RawDataSubTaskArgs: api.RawDataSubTaskArgs{ - Ctx: taskCtx, + converter, err := api.NewStatefulDataConverter(&api.StatefulDataConverterArgs[models.GithubIssueAssignee]{ + SubtaskCommonArgs: &api.SubtaskCommonArgs{ + SubTaskContext: taskCtx, + Table: RAW_ISSUE_TABLE, Params: GithubApiParams{ ConnectionId: data.Options.ConnectionId, Name: data.Options.Name, }, - Table: RAW_ISSUE_TABLE, }, - InputRowType: reflect.TypeOf(models.GithubIssueAssignee{}), - Input: cursor, - Convert: func(inputRow interface{}) ([]interface{}, errors.Error) { - githubIssueAssignee := inputRow.(*models.GithubIssueAssignee) + Input: func(stateManager *api.SubtaskStateManager) (dal.Rows, errors.Error) { + clauses := []dal.Clause{ + dal.From(&models.GithubIssueAssignee{}), + dal.Where("_tool_github_issue_assignees.repo_id = ? and _tool_github_issue_assignees.connection_id = ?", repoId, data.Options.ConnectionId), + } + if stateManager.IsIncremental() { + since := stateManager.GetSince() + if since != nil { + clauses = append(clauses, + dal.Join(`left join _tool_github_issues on _tool_github_issues.github_id = _tool_github_issue_assignees.issue_id and _tool_github_issues.connection_id = _tool_github_issue_assignees.connection_id`), + ) + clauses = append(clauses, dal.Where("_tool_github_issues.github_updated_at >= ?", since)) + } + } + return db.Cursor(clauses...) + }, + Convert: func(githubIssueAssignee *models.GithubIssueAssignee) ([]interface{}, errors.Error) { issueAssignee := &ticket.IssueAssignee{ IssueId: issueIdGen.Generate(data.Options.ConnectionId, githubIssueAssignee.IssueId), AssigneeId: accountIdGen.Generate(data.Options.ConnectionId, githubIssueAssignee.AssigneeId), diff --git a/backend/plugins/github/tasks/issue_collector.go b/backend/plugins/github/tasks/issue_collector.go index 5558da69780..4b38d4be684 100644 --- a/backend/plugins/github/tasks/issue_collector.go +++ b/backend/plugins/github/tasks/issue_collector.go @@ -22,6 +22,7 @@ import ( "fmt" "net/http" "net/url" + "time" "github.com/apache/incubator-devlake/core/errors" "github.com/apache/incubator-devlake/core/plugin" @@ -78,7 +79,7 @@ func CollectApiIssues(taskCtx plugin.SubTaskContext) errors.Error { query := url.Values{} query.Set("state", "all") if apiCollector.GetSince() != nil { - query.Set("since", apiCollector.GetSince().String()) + query.Set("since", apiCollector.GetSince().UTC().Format(time.RFC3339)) } query.Set("direction", "asc") query.Set("page", fmt.Sprintf("%v", reqData.Pager.Page)) diff --git a/backend/plugins/github/tasks/issue_comment_convertor.go b/backend/plugins/github/tasks/issue_comment_convertor.go index 07ddeec65c3..2510e32d2d6 100644 --- a/backend/plugins/github/tasks/issue_comment_convertor.go +++ b/backend/plugins/github/tasks/issue_comment_convertor.go @@ -18,8 +18,6 @@ limitations under the License. package tasks import ( - "reflect" - "github.com/apache/incubator-devlake/core/dal" "github.com/apache/incubator-devlake/core/errors" "github.com/apache/incubator-devlake/core/models/domainlayer" @@ -53,33 +51,34 @@ func ConvertIssueComments(taskCtx plugin.SubTaskContext) errors.Error { data := taskCtx.GetData().(*GithubTaskData) repoId := data.Options.GithubId - cursor, err := db.Cursor( - dal.From(&models.GithubIssueComment{}), - dal.Join("left join _tool_github_issues "+ - "on _tool_github_issues.github_id = _tool_github_issue_comments.issue_id"), - dal.Where("repo_id = ? and _tool_github_issues.connection_id = ?", repoId, data.Options.ConnectionId), - ) - if err != nil { - return err - } - defer cursor.Close() - issueIdGen := didgen.NewDomainIdGenerator(&models.GithubIssue{}) accountIdGen := didgen.NewDomainIdGenerator(&models.GithubAccount{}) - converter, err := api.NewDataConverter(api.DataConverterArgs{ - InputRowType: reflect.TypeOf(models.GithubIssueComment{}), - Input: cursor, - RawDataSubTaskArgs: api.RawDataSubTaskArgs{ - Ctx: taskCtx, + converter, err := api.NewStatefulDataConverter(&api.StatefulDataConverterArgs[models.GithubIssueComment]{ + SubtaskCommonArgs: &api.SubtaskCommonArgs{ + SubTaskContext: taskCtx, + Table: RAW_COMMENTS_TABLE, Params: GithubApiParams{ ConnectionId: data.Options.ConnectionId, Name: data.Options.Name, }, - Table: RAW_COMMENTS_TABLE, }, - Convert: func(inputRow interface{}) ([]interface{}, errors.Error) { - githubIssueComment := inputRow.(*models.GithubIssueComment) + Input: func(stateManager *api.SubtaskStateManager) (dal.Rows, errors.Error) { + clauses := []dal.Clause{ + dal.From(&models.GithubIssueComment{}), + dal.Join("left join _tool_github_issues " + + "on _tool_github_issues.github_id = _tool_github_issue_comments.issue_id"), + dal.Where("repo_id = ? and _tool_github_issues.connection_id = ?", repoId, data.Options.ConnectionId), + } + if stateManager.IsIncremental() { + since := stateManager.GetSince() + if since != nil { + clauses = append(clauses, dal.Where("_tool_github_issue_comments.github_updated_at >= ?", since)) + } + } + return db.Cursor(clauses...) + }, + Convert: func(githubIssueComment *models.GithubIssueComment) ([]interface{}, errors.Error) { domainIssueComment := &ticket.IssueComment{ DomainEntity: domainlayer.DomainEntity{ Id: issueIdGen.Generate(data.Options.ConnectionId, githubIssueComment.GithubId), diff --git a/backend/plugins/github/tasks/issue_convertor.go b/backend/plugins/github/tasks/issue_convertor.go index e50f6f22ac0..13e79529aec 100644 --- a/backend/plugins/github/tasks/issue_convertor.go +++ b/backend/plugins/github/tasks/issue_convertor.go @@ -18,7 +18,6 @@ limitations under the License. package tasks import ( - "reflect" "strconv" "strings" @@ -57,33 +56,33 @@ func ConvertIssues(taskCtx plugin.SubTaskContext) errors.Error { data := taskCtx.GetData().(*GithubTaskData) repoId := data.Options.GithubId - issue := &models.GithubIssue{} - cursor, err := db.Cursor( - dal.From(issue), - dal.Where("repo_id = ? and connection_id=?", repoId, data.Options.ConnectionId), - ) - if err != nil { - return err - } - defer cursor.Close() - issueIdGen := didgen.NewDomainIdGenerator(&models.GithubIssue{}) accountIdGen := didgen.NewDomainIdGenerator(&models.GithubAccount{}) boardIdGen := didgen.NewDomainIdGenerator(&models.GithubRepo{}) - converter, err := api.NewDataConverter(api.DataConverterArgs{ - RawDataSubTaskArgs: api.RawDataSubTaskArgs{ - Ctx: taskCtx, + converter, err := api.NewStatefulDataConverter(&api.StatefulDataConverterArgs[models.GithubIssue]{ + SubtaskCommonArgs: &api.SubtaskCommonArgs{ + SubTaskContext: taskCtx, + Table: RAW_ISSUE_TABLE, Params: GithubApiParams{ ConnectionId: data.Options.ConnectionId, Name: data.Options.Name, }, - Table: RAW_ISSUE_TABLE, }, - InputRowType: reflect.TypeOf(models.GithubIssue{}), - Input: cursor, - Convert: func(inputRow interface{}) ([]interface{}, errors.Error) { - issue := inputRow.(*models.GithubIssue) + Input: func(stateManager *api.SubtaskStateManager) (dal.Rows, errors.Error) { + clauses := []dal.Clause{ + dal.From(&models.GithubIssue{}), + dal.Where("repo_id = ? and connection_id=?", repoId, data.Options.ConnectionId), + } + if stateManager.IsIncremental() { + since := stateManager.GetSince() + if since != nil { + clauses = append(clauses, dal.Where("github_updated_at >= ?", since)) + } + } + return db.Cursor(clauses...) + }, + Convert: func(issue *models.GithubIssue) ([]interface{}, errors.Error) { domainIssue := &ticket.Issue{ DomainEntity: domainlayer.DomainEntity{Id: issueIdGen.Generate(data.Options.ConnectionId, issue.GithubId)}, IssueKey: strconv.Itoa(issue.Number), diff --git a/backend/plugins/github/tasks/issue_extractor.go b/backend/plugins/github/tasks/issue_extractor.go index 101b1e5b9a3..43a276f3e58 100644 --- a/backend/plugins/github/tasks/issue_extractor.go +++ b/backend/plugins/github/tasks/issue_extractor.go @@ -22,6 +22,7 @@ import ( "regexp" "strings" + "github.com/apache/incubator-devlake/core/dal" "github.com/apache/incubator-devlake/core/errors" "github.com/apache/incubator-devlake/core/models/common" "github.com/apache/incubator-devlake/core/models/domainlayer/ticket" @@ -84,44 +85,53 @@ type IssueRegexes struct { func ExtractApiIssues(taskCtx plugin.SubTaskContext) errors.Error { data := taskCtx.GetData().(*GithubTaskData) + db := taskCtx.GetDal() config := data.Options.ScopeConfig issueRegexes, err := NewIssueRegexes(config) if err != nil { return nil } - extractor, err := api.NewApiExtractor(api.ApiExtractorArgs{ - RawDataSubTaskArgs: api.RawDataSubTaskArgs{ - Ctx: taskCtx, - /* - This struct will be JSONEncoded and stored into database along with raw data itself, to identity minimal - set of data to be process, for example, we process JiraIssues by Board - */ + + extractor, extErr := api.NewStatefulApiExtractor(&api.StatefulApiExtractorArgs[IssuesResponse]{ + SubtaskCommonArgs: &api.SubtaskCommonArgs{ + SubTaskContext: taskCtx, Params: GithubApiParams{ ConnectionId: data.Options.ConnectionId, Name: data.Options.Name, }, - /* - Table store raw data - */ Table: RAW_ISSUE_TABLE, + SubtaskConfig: map[string]string{ + "issueSeverity": config.IssueSeverity, + "issueComponent": config.IssueComponent, + "issuePriority": config.IssuePriority, + "issueTypeBug": config.IssueTypeBug, + "issueTypeRequirement": config.IssueTypeRequirement, + "issueTypeIncident": config.IssueTypeIncident, + }, }, - Extract: func(row *api.RawData) ([]interface{}, errors.Error) { - body := &IssuesResponse{} - err := errors.Convert(json.Unmarshal(row.Data, body)) - if err != nil { - return nil, err + BeforeExtract: func(body *IssuesResponse, stateManager *api.SubtaskStateManager) errors.Error { + if stateManager.IsIncremental() { + if err := db.Delete(&models.GithubIssueLabel{}, + dal.Where("connection_id = ? AND issue_id = ?", data.Options.ConnectionId, body.GithubId), + ); err != nil { + return errors.Convert(err) + } + return errors.Convert(db.Delete(&models.GithubIssueAssignee{}, + dal.Where("connection_id = ? AND issue_id = ?", data.Options.ConnectionId, body.GithubId), + )) } - // need to extract 2 kinds of entities here + return nil + }, + Extract: func(body *IssuesResponse, row *api.RawData) ([]any, errors.Error) { if body.GithubId == 0 { return nil, nil } - //If this is a pr, ignore + // If this is a pr, ignore if body.PullRequest.Url != "" { return nil, nil } results := make([]interface{}, 0, 2) - githubIssue, err := convertGithubIssue(body, data.Options.ConnectionId, data.Options.GithubId) if err != nil { return nil, err @@ -163,8 +173,8 @@ func ExtractApiIssues(taskCtx plugin.SubTaskContext) errors.Error { return results, nil }, }) - if err != nil { - return err + if extErr != nil { + return extErr } return extractor.Execute() diff --git a/backend/plugins/github/tasks/issue_label_convertor.go b/backend/plugins/github/tasks/issue_label_convertor.go index bff0c0143db..6ef940b0601 100644 --- a/backend/plugins/github/tasks/issue_label_convertor.go +++ b/backend/plugins/github/tasks/issue_label_convertor.go @@ -18,8 +18,6 @@ limitations under the License. package tasks import ( - "reflect" - "github.com/apache/incubator-devlake/core/dal" "github.com/apache/incubator-devlake/core/errors" "github.com/apache/incubator-devlake/core/models/domainlayer/didgen" @@ -51,31 +49,33 @@ func ConvertIssueLabels(taskCtx plugin.SubTaskContext) errors.Error { data := taskCtx.GetData().(*GithubTaskData) repoId := data.Options.GithubId - cursor, err := db.Cursor( - dal.From(&models.GithubIssueLabel{}), - dal.Join(`left join _tool_github_issues on _tool_github_issues.github_id = _tool_github_issue_labels.issue_id`), - dal.Where("_tool_github_issues.repo_id = ? and _tool_github_issues.connection_id = ?", repoId, data.Options.ConnectionId), - dal.Orderby("issue_id ASC"), - ) - if err != nil { - return err - } - defer cursor.Close() issueIdGen := didgen.NewDomainIdGenerator(&models.GithubIssue{}) - converter, err := api.NewDataConverter(api.DataConverterArgs{ - RawDataSubTaskArgs: api.RawDataSubTaskArgs{ - Ctx: taskCtx, + converter, err := api.NewStatefulDataConverter(&api.StatefulDataConverterArgs[models.GithubIssueLabel]{ + SubtaskCommonArgs: &api.SubtaskCommonArgs{ + SubTaskContext: taskCtx, + Table: RAW_ISSUE_TABLE, Params: GithubApiParams{ ConnectionId: data.Options.ConnectionId, Name: data.Options.Name, }, - Table: RAW_ISSUE_TABLE, }, - InputRowType: reflect.TypeOf(models.GithubIssueLabel{}), - Input: cursor, - Convert: func(inputRow interface{}) ([]interface{}, errors.Error) { - issueLabel := inputRow.(*models.GithubIssueLabel) + Input: func(stateManager *api.SubtaskStateManager) (dal.Rows, errors.Error) { + clauses := []dal.Clause{ + dal.From(&models.GithubIssueLabel{}), + dal.Join(`left join _tool_github_issues on _tool_github_issues.github_id = _tool_github_issue_labels.issue_id`), + dal.Where("_tool_github_issues.repo_id = ? and _tool_github_issues.connection_id = ?", repoId, data.Options.ConnectionId), + dal.Orderby("issue_id ASC"), + } + if stateManager.IsIncremental() { + since := stateManager.GetSince() + if since != nil { + clauses = append(clauses, dal.Where("_tool_github_issues.github_updated_at >= ?", since)) + } + } + return db.Cursor(clauses...) + }, + Convert: func(issueLabel *models.GithubIssueLabel) ([]interface{}, errors.Error) { domainIssueLabel := &ticket.IssueLabel{ IssueId: issueIdGen.Generate(data.Options.ConnectionId, issueLabel.IssueId), LabelName: issueLabel.LabelName, diff --git a/backend/plugins/github/tasks/milestone_extractor.go b/backend/plugins/github/tasks/milestone_extractor.go index 7828b2c26f9..4737bd47e82 100644 --- a/backend/plugins/github/tasks/milestone_extractor.go +++ b/backend/plugins/github/tasks/milestone_extractor.go @@ -18,8 +18,6 @@ limitations under the License. package tasks import ( - "encoding/json" - "github.com/apache/incubator-devlake/core/errors" "github.com/apache/incubator-devlake/core/models/common" "github.com/apache/incubator-devlake/core/plugin" @@ -81,24 +79,17 @@ type MilestonesResponse struct { func ExtractMilestones(taskCtx plugin.SubTaskContext) errors.Error { data := taskCtx.GetData().(*GithubTaskData) - extractor, err := api.NewApiExtractor(api.ApiExtractorArgs{ - RawDataSubTaskArgs: api.RawDataSubTaskArgs{ - Ctx: taskCtx, + extractor, err := api.NewStatefulApiExtractor(&api.StatefulApiExtractorArgs[MilestonesResponse]{ + SubtaskCommonArgs: &api.SubtaskCommonArgs{ + SubTaskContext: taskCtx, Params: GithubApiParams{ ConnectionId: data.Options.ConnectionId, Name: data.Options.Name, }, Table: RAW_MILESTONE_TABLE, }, - Extract: func(row *api.RawData) ([]interface{}, errors.Error) { - response := &MilestonesResponse{} - err := errors.Convert(json.Unmarshal(row.Data, response)) - if err != nil { - return nil, err - } - results := make([]interface{}, 0, 1) - results = append(results, convertGithubMilestone(response, data.Options.ConnectionId, data.Options.GithubId)) - return results, nil + Extract: func(body *MilestonesResponse, row *api.RawData) ([]any, errors.Error) { + return []any{convertGithubMilestone(body, data.Options.ConnectionId, data.Options.GithubId)}, nil }, }) if err != nil { diff --git a/backend/plugins/github/tasks/pr_comment_convertor.go b/backend/plugins/github/tasks/pr_comment_convertor.go index 43e0e73ba2b..b02342e2b8f 100644 --- a/backend/plugins/github/tasks/pr_comment_convertor.go +++ b/backend/plugins/github/tasks/pr_comment_convertor.go @@ -18,8 +18,6 @@ limitations under the License. package tasks import ( - "reflect" - "github.com/apache/incubator-devlake/core/dal" "github.com/apache/incubator-devlake/core/errors" "github.com/apache/incubator-devlake/core/models/domainlayer" @@ -54,34 +52,36 @@ func ConvertPullRequestComments(taskCtx plugin.SubTaskContext) errors.Error { data := taskCtx.GetData().(*GithubTaskData) repoId := data.Options.GithubId - cursor, err := db.Cursor( - dal.From(&models.GithubPrComment{}), - dal.Join("left join _tool_github_pull_requests "+ - "on _tool_github_pull_requests.github_id = _tool_github_pull_request_comments.pull_request_id"), - dal.Where("repo_id = ? and _tool_github_pull_requests.connection_id = ?", repoId, data.Options.ConnectionId), - ) - if err != nil { - return err - } - defer cursor.Close() - prCommentIdGen := didgen.NewDomainIdGenerator(&models.GithubPrComment{}) prIdGen := didgen.NewDomainIdGenerator(&models.GithubPullRequest{}) accountIdGen := didgen.NewDomainIdGenerator(&models.GithubAccount{}) prReviewIdGen := didgen.NewDomainIdGenerator(&models.GithubPrReview{}) - converter, err := api.NewDataConverter(api.DataConverterArgs{ - InputRowType: reflect.TypeOf(models.GithubPrComment{}), - Input: cursor, - RawDataSubTaskArgs: api.RawDataSubTaskArgs{ - Ctx: taskCtx, + + converter, err := api.NewStatefulDataConverter(&api.StatefulDataConverterArgs[models.GithubPrComment]{ + SubtaskCommonArgs: &api.SubtaskCommonArgs{ + SubTaskContext: taskCtx, + Table: RAW_COMMENTS_TABLE, Params: GithubApiParams{ ConnectionId: data.Options.ConnectionId, Name: data.Options.Name, }, - Table: RAW_COMMENTS_TABLE, }, - Convert: func(inputRow interface{}) ([]interface{}, errors.Error) { - githubPullRequestComment := inputRow.(*models.GithubPrComment) + Input: func(stateManager *api.SubtaskStateManager) (dal.Rows, errors.Error) { + clauses := []dal.Clause{ + dal.From(&models.GithubPrComment{}), + dal.Join("left join _tool_github_pull_requests " + + "on _tool_github_pull_requests.github_id = _tool_github_pull_request_comments.pull_request_id"), + dal.Where("repo_id = ? and _tool_github_pull_requests.connection_id = ?", repoId, data.Options.ConnectionId), + } + if stateManager.IsIncremental() { + since := stateManager.GetSince() + if since != nil { + clauses = append(clauses, dal.Where("_tool_github_pull_request_comments.github_updated_at >= ?", since)) + } + } + return db.Cursor(clauses...) + }, + Convert: func(githubPullRequestComment *models.GithubPrComment) ([]interface{}, errors.Error) { domainPrComment := &code.PullRequestComment{ DomainEntity: domainlayer.DomainEntity{ Id: prCommentIdGen.Generate(data.Options.ConnectionId, githubPullRequestComment.GithubId), diff --git a/backend/plugins/github/tasks/pr_commit_convertor.go b/backend/plugins/github/tasks/pr_commit_convertor.go index b9cbbbbf52b..d21c343dcad 100644 --- a/backend/plugins/github/tasks/pr_commit_convertor.go +++ b/backend/plugins/github/tasks/pr_commit_convertor.go @@ -18,8 +18,6 @@ limitations under the License. package tasks import ( - "reflect" - "github.com/apache/incubator-devlake/core/dal" "github.com/apache/incubator-devlake/core/errors" "github.com/apache/incubator-devlake/core/models/domainlayer/code" @@ -54,30 +52,31 @@ func ConvertPullRequestCommits(taskCtx plugin.SubTaskContext) (err errors.Error) pullIdGen := didgen.NewDomainIdGenerator(&models.GithubPullRequest{}) - cursor, err := db.Cursor( - dal.From(&models.GithubPrCommit{}), - dal.Join(`left join _tool_github_pull_requests on _tool_github_pull_requests.github_id = _tool_github_pull_request_commits.pull_request_id`), - dal.Where("_tool_github_pull_requests.repo_id = ? and _tool_github_pull_requests.connection_id = ?", repoId, data.Options.ConnectionId), - dal.Orderby("pull_request_id ASC"), - ) - if err != nil { - return err - } - defer cursor.Close() - - converter, err := api.NewDataConverter(api.DataConverterArgs{ - InputRowType: reflect.TypeOf(models.GithubPrCommit{}), - Input: cursor, - RawDataSubTaskArgs: api.RawDataSubTaskArgs{ - Ctx: taskCtx, + converter, err := api.NewStatefulDataConverter(&api.StatefulDataConverterArgs[models.GithubPrCommit]{ + SubtaskCommonArgs: &api.SubtaskCommonArgs{ + SubTaskContext: taskCtx, + Table: RAW_PR_COMMIT_TABLE, Params: GithubApiParams{ ConnectionId: data.Options.ConnectionId, Name: data.Options.Name, }, - Table: RAW_PR_COMMIT_TABLE, }, - Convert: func(inputRow interface{}) ([]interface{}, errors.Error) { - githubPullRequestCommit := inputRow.(*models.GithubPrCommit) + Input: func(stateManager *api.SubtaskStateManager) (dal.Rows, errors.Error) { + clauses := []dal.Clause{ + dal.From(&models.GithubPrCommit{}), + dal.Join(`left join _tool_github_pull_requests on _tool_github_pull_requests.github_id = _tool_github_pull_request_commits.pull_request_id`), + dal.Where("_tool_github_pull_requests.repo_id = ? and _tool_github_pull_requests.connection_id = ?", repoId, data.Options.ConnectionId), + dal.Orderby("pull_request_id ASC"), + } + if stateManager.IsIncremental() { + since := stateManager.GetSince() + if since != nil { + clauses = append(clauses, dal.Where("_tool_github_pull_requests.github_updated_at >= ?", since)) + } + } + return db.Cursor(clauses...) + }, + Convert: func(githubPullRequestCommit *models.GithubPrCommit) ([]interface{}, errors.Error) { domainPrCommit := &code.PullRequestCommit{ CommitSha: githubPullRequestCommit.CommitSha, PullRequestId: pullIdGen.Generate(data.Options.ConnectionId, githubPullRequestCommit.PullRequestId), diff --git a/backend/plugins/github/tasks/pr_commit_extractor.go b/backend/plugins/github/tasks/pr_commit_extractor.go index 569e39abac5..536af4d26e7 100644 --- a/backend/plugins/github/tasks/pr_commit_extractor.go +++ b/backend/plugins/github/tasks/pr_commit_extractor.go @@ -70,62 +70,44 @@ type PullRequestCommit struct { func ExtractApiPullRequestCommits(taskCtx plugin.SubTaskContext) errors.Error { data := taskCtx.GetData().(*GithubTaskData) repoId := data.Options.GithubId - extractor, err := helper.NewApiExtractor(helper.ApiExtractorArgs{ - RawDataSubTaskArgs: helper.RawDataSubTaskArgs{ - Ctx: taskCtx, - /* - This struct will be JSONEncoded and stored into database along with raw data itself, to identity minimal - set of data to be process, for example, we process JiraIssues by Board - */ + extractor, err := helper.NewStatefulApiExtractor(&helper.StatefulApiExtractorArgs[PrCommitsResponse]{ + SubtaskCommonArgs: &helper.SubtaskCommonArgs{ + SubTaskContext: taskCtx, Params: GithubApiParams{ ConnectionId: data.Options.ConnectionId, Name: data.Options.Name, }, - /* - Table store raw data - */ Table: RAW_PR_COMMIT_TABLE, }, - Extract: func(row *helper.RawData) ([]interface{}, errors.Error) { - apiPullRequestCommit := &PrCommitsResponse{} + Extract: func(body *PrCommitsResponse, row *helper.RawData) ([]any, errors.Error) { if strings.HasPrefix(string(row.Data), "{\"message\": \"Not Found\"") { return nil, nil } - err := errors.Convert(json.Unmarshal(row.Data, apiPullRequestCommit)) - if err != nil { - return nil, err - } pull := &SimplePr{} - err = errors.Convert(json.Unmarshal(row.Input, pull)) + err := errors.Convert(json.Unmarshal(row.Input, pull)) if err != nil { return nil, err } - // need to extract 2 kinds of entities here results := make([]interface{}, 0, 3) githubRepoCommit := &models.GithubRepoCommit{ ConnectionId: data.Options.ConnectionId, RepoId: repoId, - CommitSha: apiPullRequestCommit.Sha, + CommitSha: body.Sha, } results = append(results, githubRepoCommit) - - githubCommit, err := convertPullRequestCommit(apiPullRequestCommit, data.Options.ConnectionId) + githubCommit, err := convertPullRequestCommit(body, data.Options.ConnectionId) if err != nil { return nil, err } results = append(results, githubCommit) - githubPullRequestCommit := &models.GithubPrCommit{ ConnectionId: data.Options.ConnectionId, - CommitSha: apiPullRequestCommit.Sha, + CommitSha: body.Sha, PullRequestId: pull.GithubId, CommitAuthorName: githubCommit.AuthorName, CommitAuthorEmail: githubCommit.AuthorEmail, CommitAuthoredDate: githubCommit.AuthoredDate, } - if err != nil { - return nil, err - } results = append(results, githubPullRequestCommit) return results, nil }, diff --git a/backend/plugins/github/tasks/pr_convertor.go b/backend/plugins/github/tasks/pr_convertor.go index 4a94d17dfe2..48a40cc4e67 100644 --- a/backend/plugins/github/tasks/pr_convertor.go +++ b/backend/plugins/github/tasks/pr_convertor.go @@ -18,8 +18,6 @@ limitations under the License. package tasks import ( - "reflect" - "github.com/apache/incubator-devlake/core/dal" "github.com/apache/incubator-devlake/core/errors" "github.com/apache/incubator-devlake/core/models/domainlayer" @@ -53,32 +51,33 @@ func ConvertPullRequests(taskCtx plugin.SubTaskContext) errors.Error { data := taskCtx.GetData().(*GithubTaskData) repoId := data.Options.GithubId - cursor, err := db.Cursor( - dal.From(&models.GithubPullRequest{}), - dal.Where("repo_id = ? and connection_id = ?", repoId, data.Options.ConnectionId), - ) - if err != nil { - return err - } - defer cursor.Close() - prIdGen := didgen.NewDomainIdGenerator(&models.GithubPullRequest{}) repoIdGen := didgen.NewDomainIdGenerator(&models.GithubRepo{}) accountIdGen := didgen.NewDomainIdGenerator(&models.GithubAccount{}) - converter, err := api.NewDataConverter(api.DataConverterArgs{ - InputRowType: reflect.TypeOf(models.GithubPullRequest{}), - Input: cursor, - RawDataSubTaskArgs: api.RawDataSubTaskArgs{ - Ctx: taskCtx, + converter, err := api.NewStatefulDataConverter(&api.StatefulDataConverterArgs[models.GithubPullRequest]{ + SubtaskCommonArgs: &api.SubtaskCommonArgs{ + SubTaskContext: taskCtx, + Table: RAW_PULL_REQUEST_TABLE, Params: GithubApiParams{ ConnectionId: data.Options.ConnectionId, Name: data.Options.Name, }, - Table: RAW_PULL_REQUEST_TABLE, }, - Convert: func(inputRow interface{}) ([]interface{}, errors.Error) { - pr := inputRow.(*models.GithubPullRequest) + Input: func(stateManager *api.SubtaskStateManager) (dal.Rows, errors.Error) { + clauses := []dal.Clause{ + dal.From(&models.GithubPullRequest{}), + dal.Where("repo_id = ? and connection_id = ?", repoId, data.Options.ConnectionId), + } + if stateManager.IsIncremental() { + since := stateManager.GetSince() + if since != nil { + clauses = append(clauses, dal.Where("github_updated_at >= ?", since)) + } + } + return db.Cursor(clauses...) + }, + Convert: func(pr *models.GithubPullRequest) ([]interface{}, errors.Error) { domainPr := &code.PullRequest{ DomainEntity: domainlayer.DomainEntity{ Id: prIdGen.Generate(data.Options.ConnectionId, pr.GithubId), diff --git a/backend/plugins/github/tasks/pr_extractor.go b/backend/plugins/github/tasks/pr_extractor.go index ea78583858b..802846f301c 100644 --- a/backend/plugins/github/tasks/pr_extractor.go +++ b/backend/plugins/github/tasks/pr_extractor.go @@ -21,6 +21,7 @@ import ( "encoding/json" "regexp" + "github.com/apache/incubator-devlake/core/dal" "github.com/apache/incubator-devlake/core/errors" "github.com/apache/incubator-devlake/core/models/common" "github.com/apache/incubator-devlake/core/plugin" @@ -85,6 +86,7 @@ type GithubApiPullRequest struct { func ExtractApiPullRequests(taskCtx plugin.SubTaskContext) errors.Error { data := taskCtx.GetData().(*GithubTaskData) + db := taskCtx.GetDal() config := data.Options.ScopeConfig var labelTypeRegex *regexp.Regexp var labelComponentRegex *regexp.Regexp @@ -104,45 +106,44 @@ func ExtractApiPullRequests(taskCtx plugin.SubTaskContext) errors.Error { } } - extractor, err := api.NewApiExtractor(api.ApiExtractorArgs{ - RawDataSubTaskArgs: api.RawDataSubTaskArgs{ - Ctx: taskCtx, - /* - This struct will be JSONEncoded and stored into database along with raw data itself, to identity minimal - set of data to be process, for example, we process JiraIssues by Board - */ + extractor, extErr := api.NewStatefulApiExtractor(&api.StatefulApiExtractorArgs[GithubApiPullRequest]{ + SubtaskCommonArgs: &api.SubtaskCommonArgs{ + SubTaskContext: taskCtx, Params: GithubApiParams{ ConnectionId: data.Options.ConnectionId, Name: data.Options.Name, }, - /* - Table store raw data - */ Table: RAW_PULL_REQUEST_TABLE, + SubtaskConfig: map[string]string{ + "prType": prType, + "prComponent": prComponent, + }, }, - Extract: func(row *api.RawData) ([]interface{}, errors.Error) { - rawL := &GithubApiPullRequest{} - err := errors.Convert(json.Unmarshal(row.Data, rawL)) - if err != nil { - return nil, err + BeforeExtract: func(body *GithubApiPullRequest, stateManager *api.SubtaskStateManager) errors.Error { + if stateManager.IsIncremental() { + return errors.Convert(db.Delete( + &models.GithubPrLabel{}, + dal.Where("connection_id = ? AND pull_id = ?", data.Options.ConnectionId, body.GithubId), + )) } - // need to extract 2 kinds of entities here + return nil + }, + Extract: func(body *GithubApiPullRequest, row *api.RawData) ([]any, errors.Error) { results := make([]interface{}, 0, 1) - if rawL.GithubId == 0 { + if body.GithubId == 0 { return nil, nil } // Filter bot PRs by username - if rawL.User != nil && shouldSkipByUsername(rawL.User.Login) { - taskCtx.GetLogger().Debug("Skipping PR #%d from bot user: %s", rawL.Number, rawL.User.Login) + if body.User != nil && shouldSkipByUsername(body.User.Login) { + taskCtx.GetLogger().Debug("Skipping PR #%d from bot user: %s", body.Number, body.User.Login) return nil, nil } - //If this is a pr, ignore - githubPr, err := convertGithubPullRequest(rawL, data.Options.ConnectionId, data.Options.GithubId) + githubPr, err := convertGithubPullRequest(body, data.Options.ConnectionId, data.Options.GithubId) if err != nil { return nil, err } - if rawL.User != nil { - githubUser, err := convertAccount(rawL.User, data.Options.GithubId, data.Options.ConnectionId) + if body.User != nil { + githubUser, err := convertAccount(body.User, data.Options.GithubId, data.Options.ConnectionId) if err != nil { return nil, err } @@ -150,33 +151,31 @@ func ExtractApiPullRequests(taskCtx plugin.SubTaskContext) errors.Error { githubPr.AuthorName = githubUser.Login githubPr.AuthorId = githubUser.AccountId } - for _, label := range rawL.Labels { + for _, label := range body.Labels { results = append(results, &models.GithubPrLabel{ ConnectionId: data.Options.ConnectionId, PullId: githubPr.GithubId, LabelName: label.Name, }) - // if pr.Type has not been set and prType is set in .env, process the below if labelTypeRegex != nil && labelTypeRegex.MatchString(label.Name) { githubPr.Type = label.Name } - // if pr.Component has not been set and prComponent is set in .env, process if labelComponentRegex != nil && labelComponentRegex.MatchString(label.Name) { githubPr.Component = label.Name } } results = append(results, githubPr) - return results, nil }, }) - if err != nil { - return errors.Default.Wrap(err, "error initializing Github PR extractor") + if extErr != nil { + return errors.Default.Wrap(extErr, "error initializing Github PR extractor") } return extractor.Execute() } + func convertGithubPullRequest(pull *GithubApiPullRequest, connId uint64, repoId int) (*models.GithubPullRequest, errors.Error) { githubPull := &models.GithubPullRequest{ ConnectionId: connId, diff --git a/backend/plugins/github/tasks/pr_issue_convertor.go b/backend/plugins/github/tasks/pr_issue_convertor.go index 979a8b128f3..0d724224f77 100644 --- a/backend/plugins/github/tasks/pr_issue_convertor.go +++ b/backend/plugins/github/tasks/pr_issue_convertor.go @@ -19,7 +19,6 @@ package tasks import ( "github.com/spf13/cast" - "reflect" "github.com/apache/incubator-devlake/core/dal" "github.com/apache/incubator-devlake/core/errors" @@ -52,32 +51,34 @@ func ConvertPullRequestIssues(taskCtx plugin.SubTaskContext) errors.Error { data := taskCtx.GetData().(*GithubTaskData) repoId := data.Options.GithubId - cursor, err := db.Cursor( - dal.From(&models.GithubPrIssue{}), - dal.Join(`left join _tool_github_pull_requests on _tool_github_pull_requests.github_id = _tool_github_pull_request_issues.pull_request_id`), - dal.Where("_tool_github_pull_requests.repo_id = ? and _tool_github_pull_requests.connection_id = ?", repoId, data.Options.ConnectionId), - dal.Orderby("pull_request_id ASC"), - ) - if err != nil { - return err - } - defer cursor.Close() prIdGen := didgen.NewDomainIdGenerator(&models.GithubPullRequest{}) issueIdGen := didgen.NewDomainIdGenerator(&models.GithubIssue{}) - converter, err := api.NewDataConverter(api.DataConverterArgs{ - InputRowType: reflect.TypeOf(models.GithubPrIssue{}), - Input: cursor, - RawDataSubTaskArgs: api.RawDataSubTaskArgs{ - Ctx: taskCtx, + converter, err := api.NewStatefulDataConverter(&api.StatefulDataConverterArgs[models.GithubPrIssue]{ + SubtaskCommonArgs: &api.SubtaskCommonArgs{ + SubTaskContext: taskCtx, + Table: RAW_PULL_REQUEST_TABLE, Params: GithubApiParams{ ConnectionId: data.Options.ConnectionId, Name: data.Options.Name, }, - Table: RAW_PULL_REQUEST_TABLE, }, - Convert: func(inputRow interface{}) ([]interface{}, errors.Error) { - githubPrIssue := inputRow.(*models.GithubPrIssue) + Input: func(stateManager *api.SubtaskStateManager) (dal.Rows, errors.Error) { + clauses := []dal.Clause{ + dal.From(&models.GithubPrIssue{}), + dal.Join(`left join _tool_github_pull_requests on _tool_github_pull_requests.github_id = _tool_github_pull_request_issues.pull_request_id`), + dal.Where("_tool_github_pull_requests.repo_id = ? and _tool_github_pull_requests.connection_id = ?", repoId, data.Options.ConnectionId), + dal.Orderby("pull_request_id ASC"), + } + if stateManager.IsIncremental() { + since := stateManager.GetSince() + if since != nil { + clauses = append(clauses, dal.Where("_tool_github_pull_requests.github_updated_at >= ?", since)) + } + } + return db.Cursor(clauses...) + }, + Convert: func(githubPrIssue *models.GithubPrIssue) ([]interface{}, errors.Error) { pullRequestIssue := &crossdomain.PullRequestIssue{ PullRequestId: prIdGen.Generate(data.Options.ConnectionId, githubPrIssue.PullRequestId), IssueId: issueIdGen.Generate(data.Options.ConnectionId, githubPrIssue.IssueId), diff --git a/backend/plugins/github/tasks/pr_issue_enricher.go b/backend/plugins/github/tasks/pr_issue_enricher.go index bf4d2837d17..cf8228b79a3 100644 --- a/backend/plugins/github/tasks/pr_issue_enricher.go +++ b/backend/plugins/github/tasks/pr_issue_enricher.go @@ -18,7 +18,6 @@ limitations under the License. package tasks import ( - "reflect" "regexp" "strconv" "strings" @@ -62,27 +61,42 @@ func EnrichPullRequestIssues(taskCtx plugin.SubTaskContext) (err errors.Error) { } } charPattern := regexp.MustCompile(`[\/a-zA-Z\s,]+`) - cursor, err := db.Cursor(dal.From(&models.GithubPullRequest{}), - dal.Where("repo_id = ? and connection_id = ?", repoId, data.Options.ConnectionId)) - if err != nil { - return err - } - defer cursor.Close() - // iterate all rows - - converter, err := api.NewDataConverter(api.DataConverterArgs{ - InputRowType: reflect.TypeOf(models.GithubPullRequest{}), - Input: cursor, - RawDataSubTaskArgs: api.RawDataSubTaskArgs{ - Ctx: taskCtx, + + converter, err := api.NewStatefulDataConverter(&api.StatefulDataConverterArgs[models.GithubPullRequest]{ + SubtaskCommonArgs: &api.SubtaskCommonArgs{ + SubTaskContext: taskCtx, + Table: RAW_PULL_REQUEST_TABLE, Params: GithubApiParams{ ConnectionId: data.Options.ConnectionId, Name: data.Options.Name, }, - Table: RAW_PULL_REQUEST_TABLE, + SubtaskConfig: map[string]string{ + "prBodyClosePattern": prBodyClosePattern, + }, + }, + Input: func(stateManager *api.SubtaskStateManager) (dal.Rows, errors.Error) { + clauses := []dal.Clause{ + dal.From(&models.GithubPullRequest{}), + dal.Where("repo_id = ? and connection_id = ?", repoId, data.Options.ConnectionId), + } + if stateManager.IsIncremental() { + since := stateManager.GetSince() + if since != nil { + clauses = append(clauses, dal.Where("github_updated_at >= ?", since)) + } + } + return db.Cursor(clauses...) + }, + BeforeConvert: func(githubPullRequest *models.GithubPullRequest, stateManager *api.SubtaskStateManager) errors.Error { + if !stateManager.IsIncremental() { + return nil + } + return db.Delete( + &models.GithubPrIssue{}, + dal.Where("connection_id = ? AND pull_request_id = ?", data.Options.ConnectionId, githubPullRequest.GithubId), + ) }, - Convert: func(inputRow interface{}) ([]interface{}, errors.Error) { - githubPullRequst := inputRow.(*models.GithubPullRequest) + Convert: func(githubPullRequst *models.GithubPullRequest) ([]interface{}, errors.Error) { results := make([]interface{}, 0, 1) //find the matched string in body diff --git a/backend/plugins/github/tasks/pr_label_convertor.go b/backend/plugins/github/tasks/pr_label_convertor.go index 3351218a71f..432a7308f27 100644 --- a/backend/plugins/github/tasks/pr_label_convertor.go +++ b/backend/plugins/github/tasks/pr_label_convertor.go @@ -18,8 +18,6 @@ limitations under the License. package tasks import ( - "reflect" - "github.com/apache/incubator-devlake/core/dal" "github.com/apache/incubator-devlake/core/errors" "github.com/apache/incubator-devlake/core/models/domainlayer/code" @@ -51,31 +49,33 @@ func ConvertPullRequestLabels(taskCtx plugin.SubTaskContext) errors.Error { data := taskCtx.GetData().(*GithubTaskData) repoId := data.Options.GithubId - cursor, err := db.Cursor( - dal.From(&models.GithubPrLabel{}), - dal.Join(`left join _tool_github_pull_requests on _tool_github_pull_requests.github_id = _tool_github_pull_request_labels.pull_id`), - dal.Where("_tool_github_pull_requests.repo_id = ? and _tool_github_pull_requests.connection_id = ?", repoId, data.Options.ConnectionId), - dal.Orderby("pull_id ASC"), - ) - if err != nil { - return err - } - defer cursor.Close() prIdGen := didgen.NewDomainIdGenerator(&models.GithubPullRequest{}) - converter, err := api.NewDataConverter(api.DataConverterArgs{ - InputRowType: reflect.TypeOf(models.GithubPrLabel{}), - Input: cursor, - RawDataSubTaskArgs: api.RawDataSubTaskArgs{ - Ctx: taskCtx, + converter, err := api.NewStatefulDataConverter(&api.StatefulDataConverterArgs[models.GithubPrLabel]{ + SubtaskCommonArgs: &api.SubtaskCommonArgs{ + SubTaskContext: taskCtx, + Table: RAW_PULL_REQUEST_TABLE, Params: GithubApiParams{ ConnectionId: data.Options.ConnectionId, Name: data.Options.Name, }, - Table: RAW_PULL_REQUEST_TABLE, }, - Convert: func(inputRow interface{}) ([]interface{}, errors.Error) { - prLabel := inputRow.(*models.GithubPrLabel) + Input: func(stateManager *api.SubtaskStateManager) (dal.Rows, errors.Error) { + clauses := []dal.Clause{ + dal.From(&models.GithubPrLabel{}), + dal.Join(`left join _tool_github_pull_requests on _tool_github_pull_requests.github_id = _tool_github_pull_request_labels.pull_id`), + dal.Where("_tool_github_pull_requests.repo_id = ? and _tool_github_pull_requests.connection_id = ?", repoId, data.Options.ConnectionId), + dal.Orderby("pull_id ASC"), + } + if stateManager.IsIncremental() { + since := stateManager.GetSince() + if since != nil { + clauses = append(clauses, dal.Where("_tool_github_pull_requests.github_updated_at >= ?", since)) + } + } + return db.Cursor(clauses...) + }, + Convert: func(prLabel *models.GithubPrLabel) ([]interface{}, errors.Error) { domainPrLabel := &code.PullRequestLabel{ PullRequestId: prIdGen.Generate(data.Options.ConnectionId, prLabel.PullId), LabelName: prLabel.LabelName, diff --git a/backend/plugins/github/tasks/pr_review_comment_collector.go b/backend/plugins/github/tasks/pr_review_comment_collector.go index d6dfb296f42..ac29938cf1e 100644 --- a/backend/plugins/github/tasks/pr_review_comment_collector.go +++ b/backend/plugins/github/tasks/pr_review_comment_collector.go @@ -22,6 +22,7 @@ import ( "fmt" "net/http" "net/url" + "time" "github.com/apache/incubator-devlake/core/errors" "github.com/apache/incubator-devlake/core/plugin" @@ -75,7 +76,7 @@ func CollectPrReviewComments(taskCtx plugin.SubTaskContext) errors.Error { Query: func(reqData *helper.RequestData) (url.Values, errors.Error) { query := url.Values{} if apiCollector.GetSince() != nil { - query.Set("since", apiCollector.GetSince().String()) + query.Set("since", apiCollector.GetSince().UTC().Format(time.RFC3339)) } query.Set("page", fmt.Sprintf("%v", reqData.Pager.Page)) query.Set("direction", "asc") diff --git a/backend/plugins/github/tasks/pr_review_comment_extractor.go b/backend/plugins/github/tasks/pr_review_comment_extractor.go index ed8aa182584..cc39806cdba 100644 --- a/backend/plugins/github/tasks/pr_review_comment_extractor.go +++ b/backend/plugins/github/tasks/pr_review_comment_extractor.go @@ -48,86 +48,75 @@ var ExtractApiPrReviewCommentsMeta = plugin.SubTaskMeta{ models.GithubPrCommit{}.TableName()}, } +type GithubPrReviewCommentBody struct { + GithubId int `json:"id"` + Body json.RawMessage + User *GithubAccountResponse + PrUrl string `json:"pull_request_url"` + GithubCreatedAt common.Iso8601Time `json:"created_at"` + GithubUpdatedAt common.Iso8601Time `json:"updated_at"` + CommitId string `json:"commit_id"` + PrReviewId int `json:"pull_request_review_id"` +} + func ExtractApiPrReviewComments(taskCtx plugin.SubTaskContext) errors.Error { data := taskCtx.GetData().(*GithubTaskData) db := taskCtx.GetDal() + prUrlPattern := fmt.Sprintf(`https\:\/\/api\.github\.com\/repos\/%s\/pulls\/(\d+)`, data.Options.Name) + prUrlRegex, err := regexp.Compile(prUrlPattern) + if err != nil { + return errors.Default.Wrap(err, "regexp Compile prUrlPattern failed") + } - extractor, err := helper.NewApiExtractor(helper.ApiExtractorArgs{ - RawDataSubTaskArgs: helper.RawDataSubTaskArgs{ - Ctx: taskCtx, + extractor, extErr := helper.NewStatefulApiExtractor(&helper.StatefulApiExtractorArgs[GithubPrReviewCommentBody]{ + SubtaskCommonArgs: &helper.SubtaskCommonArgs{ + SubTaskContext: taskCtx, Params: GithubApiParams{ ConnectionId: data.Options.ConnectionId, Name: data.Options.Name, }, Table: RAW_PR_REVIEW_COMMENTS_TABLE, }, - Extract: func(row *helper.RawData) ([]interface{}, errors.Error) { - var prReviewComment struct { - GithubId int `json:"id"` - Body json.RawMessage - User *GithubAccountResponse - PrUrl string `json:"pull_request_url"` - GithubCreatedAt common.Iso8601Time `json:"created_at"` - GithubUpdatedAt common.Iso8601Time `json:"updated_at"` - CommitId string `json:"commit_id"` - PrReviewId int `json:"pull_request_review_id"` - } - err := errors.Convert(json.Unmarshal(row.Data, &prReviewComment)) - if err != nil { - return nil, err - } - + Extract: func(body *GithubPrReviewCommentBody, row *helper.RawData) ([]any, errors.Error) { results := make([]interface{}, 0, 1) githubPrComment := &models.GithubPrComment{ ConnectionId: data.Options.ConnectionId, - GithubId: prReviewComment.GithubId, - Body: string(prReviewComment.Body), - CommitSha: prReviewComment.CommitId, - ReviewId: prReviewComment.PrReviewId, - GithubCreatedAt: prReviewComment.GithubCreatedAt.ToTime(), - GithubUpdatedAt: prReviewComment.GithubUpdatedAt.ToTime(), + GithubId: body.GithubId, + Body: string(body.Body), + CommitSha: body.CommitId, + ReviewId: body.PrReviewId, + GithubCreatedAt: body.GithubCreatedAt.ToTime(), + GithubUpdatedAt: body.GithubUpdatedAt.ToTime(), Type: "DIFF", } - - prUrlRegex, err := errors.Convert01(regexp.Compile(prUrlPattern)) - if err != nil { - return nil, errors.Default.Wrap(err, "regexp Compile prUrlPattern failed") - } - prId, err := enrichGithubPrComment(data, db, prUrlRegex, prReviewComment.PrUrl) + prId, err := enrichGithubPrComment(data, db, prUrlRegex, body.PrUrl) if err != nil { return nil, errors.Default.Wrap(err, "parse prId failed") } if prId != 0 { githubPrComment.PullRequestId = prId } - - if prReviewComment.User != nil { - // Filter bot comments by username - if shouldSkipByUsername(prReviewComment.User.Login) { - taskCtx.GetLogger().Debug("Skipping PR review comment #%d from bot user: %s", prReviewComment.GithubId, prReviewComment.User.Login) + if body.User != nil { + if shouldSkipByUsername(body.User.Login) { + taskCtx.GetLogger().Debug("Skipping PR review comment #%d from bot user: %s", body.GithubId, body.User.Login) return nil, nil } - - githubPrComment.AuthorUserId = prReviewComment.User.Id - githubPrComment.AuthorUsername = prReviewComment.User.Login - - githubAccount, err := convertAccount(prReviewComment.User, data.Options.GithubId, data.Options.ConnectionId) + githubPrComment.AuthorUserId = body.User.Id + githubPrComment.AuthorUsername = body.User.Login + githubAccount, err := convertAccount(body.User, data.Options.GithubId, data.Options.ConnectionId) if err != nil { return nil, err } results = append(results, githubAccount) } - results = append(results, githubPrComment) return results, nil }, }) - - if err != nil { - return err + if extErr != nil { + return extErr } - return extractor.Execute() } diff --git a/backend/plugins/github/tasks/pr_review_convertor.go b/backend/plugins/github/tasks/pr_review_convertor.go index 4ff2f353126..f266b130a37 100644 --- a/backend/plugins/github/tasks/pr_review_convertor.go +++ b/backend/plugins/github/tasks/pr_review_convertor.go @@ -18,8 +18,6 @@ limitations under the License. package tasks import ( - "reflect" - "github.com/apache/incubator-devlake/core/dal" "github.com/apache/incubator-devlake/core/errors" "github.com/apache/incubator-devlake/core/models/domainlayer" @@ -53,34 +51,35 @@ func ConvertPullRequestReviews(taskCtx plugin.SubTaskContext) errors.Error { data := taskCtx.GetData().(*GithubTaskData) repoId := data.Options.GithubId - cursor, err := db.Cursor( - dal.From(&models.GithubPrReview{}), - dal.Join("left join _tool_github_pull_requests "+ - "on _tool_github_pull_requests.github_id = _tool_github_pull_request_reviews.pull_request_id"), - dal.Where("repo_id = ? and _tool_github_pull_requests.connection_id = ?", repoId, data.Options.ConnectionId), - ) - if err != nil { - return err - } - defer cursor.Close() - prReviewUIdGen := didgen.NewDomainIdGenerator(&models.GithubPrReview{}) prIdGen := didgen.NewDomainIdGenerator(&models.GithubPullRequest{}) accountIdGen := didgen.NewDomainIdGenerator(&models.GithubAccount{}) - converter, err := api.NewDataConverter(api.DataConverterArgs{ - InputRowType: reflect.TypeOf(models.GithubPrReview{}), - Input: cursor, - RawDataSubTaskArgs: api.RawDataSubTaskArgs{ - Ctx: taskCtx, + converter, err := api.NewStatefulDataConverter(&api.StatefulDataConverterArgs[models.GithubPrReview]{ + SubtaskCommonArgs: &api.SubtaskCommonArgs{ + SubTaskContext: taskCtx, + Table: RAW_PR_REVIEW_TABLE, Params: GithubApiParams{ ConnectionId: data.Options.ConnectionId, Name: data.Options.Name, }, - Table: RAW_PR_REVIEW_TABLE, }, - Convert: func(inputRow interface{}) ([]interface{}, errors.Error) { - githubPullRequestReview := inputRow.(*models.GithubPrReview) + Input: func(stateManager *api.SubtaskStateManager) (dal.Rows, errors.Error) { + clauses := []dal.Clause{ + dal.From(&models.GithubPrReview{}), + dal.Join("left join _tool_github_pull_requests " + + "on _tool_github_pull_requests.github_id = _tool_github_pull_request_reviews.pull_request_id"), + dal.Where("repo_id = ? and _tool_github_pull_requests.connection_id = ?", repoId, data.Options.ConnectionId), + } + if stateManager.IsIncremental() { + since := stateManager.GetSince() + if since != nil { + clauses = append(clauses, dal.Where("_tool_github_pull_requests.github_updated_at >= ?", since)) + } + } + return db.Cursor(clauses...) + }, + Convert: func(githubPullRequestReview *models.GithubPrReview) ([]interface{}, errors.Error) { domainPrReview := &code.PullRequestComment{ DomainEntity: domainlayer.DomainEntity{ Id: prReviewUIdGen.Generate(data.Options.ConnectionId, githubPullRequestReview.GithubId), diff --git a/backend/plugins/github/tasks/pr_review_extractor.go b/backend/plugins/github/tasks/pr_review_extractor.go index 2af41b60a7e..ff8015411ee 100644 --- a/backend/plugins/github/tasks/pr_review_extractor.go +++ b/backend/plugins/github/tasks/pr_review_extractor.go @@ -56,79 +56,59 @@ type PullRequestReview struct { func ExtractApiPullRequestReviews(taskCtx plugin.SubTaskContext) errors.Error { data := taskCtx.GetData().(*GithubTaskData) - extractor, err := api.NewApiExtractor(api.ApiExtractorArgs{ - RawDataSubTaskArgs: api.RawDataSubTaskArgs{ - Ctx: taskCtx, - /* - This struct will be JSONEncoded and stored into database along with raw data itself, to identity minimal - set of data to be process, for example, we process JiraIssues by Board - */ + extractor, err := api.NewStatefulApiExtractor(&api.StatefulApiExtractorArgs[PullRequestReview]{ + SubtaskCommonArgs: &api.SubtaskCommonArgs{ + SubTaskContext: taskCtx, Params: GithubApiParams{ ConnectionId: data.Options.ConnectionId, Name: data.Options.Name, }, - /* - Table store raw data - */ Table: RAW_PR_REVIEW_TABLE, }, - Extract: func(row *api.RawData) ([]interface{}, errors.Error) { - apiPullRequestReview := &PullRequestReview{} + Extract: func(body *PullRequestReview, row *api.RawData) ([]any, errors.Error) { if strings.HasPrefix(string(row.Data), "{\"message\": \"Not Found\"") { return nil, nil } - err := errors.Convert(json.Unmarshal(row.Data, apiPullRequestReview)) - if err != nil { - return nil, err - } - if apiPullRequestReview.State == "PENDING" || apiPullRequestReview.User == nil { + if body.State == "PENDING" || body.User == nil { return nil, nil } // Filter bot reviews by username - if shouldSkipByUsername(apiPullRequestReview.User.Login) { - taskCtx.GetLogger().Debug("Skipping review #%d from bot user: %s", apiPullRequestReview.GithubId, apiPullRequestReview.User.Login) + if shouldSkipByUsername(body.User.Login) { + taskCtx.GetLogger().Debug("Skipping review #%d from bot user: %s", body.GithubId, body.User.Login) return nil, nil } pull := &SimplePr{} - err = errors.Convert(json.Unmarshal(row.Input, pull)) + err := errors.Convert(json.Unmarshal(row.Input, pull)) if err != nil { return nil, err } - // need to extract 2 kinds of entities here results := make([]interface{}, 0, 1) - githubReviewer := &models.GithubReviewer{ ConnectionId: data.Options.ConnectionId, PullRequestId: pull.GithubId, } - githubPrReview := &models.GithubPrReview{ ConnectionId: data.Options.ConnectionId, - GithubId: apiPullRequestReview.GithubId, - Body: apiPullRequestReview.Body, - State: apiPullRequestReview.State, - CommitSha: apiPullRequestReview.CommitId, - GithubSubmitAt: apiPullRequestReview.SubmittedAt.ToNullableTime(), + GithubId: body.GithubId, + Body: body.Body, + State: body.State, + CommitSha: body.CommitId, + GithubSubmitAt: body.SubmittedAt.ToNullableTime(), PullRequestId: pull.GithubId, } - - if apiPullRequestReview.User != nil { - githubReviewer.ReviewerId = apiPullRequestReview.User.Id - githubReviewer.Username = apiPullRequestReview.User.Login - - githubPrReview.AuthorUserId = apiPullRequestReview.User.Id - githubPrReview.AuthorUsername = apiPullRequestReview.User.Login - - githubUser, err := convertAccount(apiPullRequestReview.User, data.Options.GithubId, data.Options.ConnectionId) + if body.User != nil { + githubReviewer.ReviewerId = body.User.Id + githubReviewer.Username = body.User.Login + githubPrReview.AuthorUserId = body.User.Id + githubPrReview.AuthorUsername = body.User.Login + githubUser, err := convertAccount(body.User, data.Options.GithubId, data.Options.ConnectionId) if err != nil { return nil, err } results = append(results, githubUser) } - results = append(results, githubReviewer) results = append(results, githubPrReview) - return results, nil }, }) diff --git a/backend/plugins/github/tasks/release_convertor.go b/backend/plugins/github/tasks/release_convertor.go index be49141eb5d..a5c2f941d9f 100644 --- a/backend/plugins/github/tasks/release_convertor.go +++ b/backend/plugins/github/tasks/release_convertor.go @@ -18,8 +18,6 @@ limitations under the License. package tasks import ( - "reflect" - "github.com/apache/incubator-devlake/core/dal" "github.com/apache/incubator-devlake/core/errors" "github.com/apache/incubator-devlake/core/models/domainlayer" @@ -50,28 +48,38 @@ var ConvertReleasesMeta = plugin.SubTaskMeta{ func ConvertRelease(taskCtx plugin.SubTaskContext) errors.Error { db := taskCtx.GetDal() - rawDataSubTaskArgs, data := CreateRawDataSubTaskArgs(taskCtx, RAW_RELEASE_TABLE) - cursor, err := db.Cursor( - dal.From(&models.GithubRelease{}), - dal.Where( - "published_at IS NOT NULL AND connection_id = ? AND github_id = ?", - data.Options.ConnectionId, data.Options.GithubId, - ), - ) - if err != nil { - return err + data := taskCtx.GetData().(*GithubTaskData) + params := GithubApiParams{ + ConnectionId: data.Options.ConnectionId, + Name: data.Options.Name, } - defer cursor.Close() releaseIdGen := didgen.NewDomainIdGenerator(&models.GithubRelease{}) releaseScopeIdGen := didgen.NewDomainIdGenerator(&models.GithubRepo{}) - converter, err := api.NewDataConverter(api.DataConverterArgs{ - InputRowType: reflect.TypeOf(models.GithubRelease{}), - Input: cursor, - RawDataSubTaskArgs: *rawDataSubTaskArgs, - Convert: func(inputRow interface{}) ([]interface{}, errors.Error) { - githubRelease := inputRow.(*models.GithubRelease) + converter, err := api.NewStatefulDataConverter(&api.StatefulDataConverterArgs[models.GithubRelease]{ + SubtaskCommonArgs: &api.SubtaskCommonArgs{ + SubTaskContext: taskCtx, + Table: RAW_RELEASE_TABLE, + Params: params, + }, + Input: func(stateManager *api.SubtaskStateManager) (dal.Rows, errors.Error) { + clauses := []dal.Clause{ + dal.From(&models.GithubRelease{}), + dal.Where( + "published_at IS NOT NULL AND connection_id = ? AND github_id = ?", + data.Options.ConnectionId, data.Options.GithubId, + ), + } + if stateManager.IsIncremental() { + since := stateManager.GetSince() + if since != nil { + clauses = append(clauses, dal.Where("updated_at >= ?", since)) + } + } + return db.Cursor(clauses...) + }, + Convert: func(githubRelease *models.GithubRelease) ([]interface{}, errors.Error) { release := &devops.CicdRelease{ DomainEntity: domainlayer.DomainEntity{ Id: releaseIdGen.Generate(githubRelease.ConnectionId, githubRelease.Id), @@ -87,18 +95,12 @@ func ConvertRelease(taskCtx plugin.SubTaskContext) errors.Error { IsPrerelease: githubRelease.IsPrerelease, TagName: githubRelease.TagName, CommitSha: githubRelease.CommitSha, - - AuthorID: githubRelease.AuthorID, - - RepoId: releaseScopeIdGen.Generate(githubRelease.ConnectionId, githubRelease.GithubId), + AuthorID: githubRelease.AuthorID, + RepoId: releaseScopeIdGen.Generate(githubRelease.ConnectionId, githubRelease.GithubId), } - - return []interface{}{ - release, - }, nil + return []interface{}{release}, nil }, }) - if err != nil { return err } diff --git a/backend/plugins/github/tasks/repo_convertor.go b/backend/plugins/github/tasks/repo_convertor.go index b9c711ad9f8..c6bff20b320 100644 --- a/backend/plugins/github/tasks/repo_convertor.go +++ b/backend/plugins/github/tasks/repo_convertor.go @@ -19,7 +19,6 @@ package tasks import ( "fmt" - "reflect" "github.com/apache/incubator-devlake/core/dal" "github.com/apache/incubator-devlake/core/errors" @@ -35,10 +34,6 @@ import ( "github.com/apache/incubator-devlake/plugins/github/models" ) -func init() { - RegisterSubtaskMeta(&ConvertRepoMeta) -} - type GithubApiRepo struct { Name string `json:"name"` FullName string `json:"full_name"` @@ -53,6 +48,10 @@ type GithubApiRepo struct { CloneUrl string `json:"clone_url"` } +func init() { + RegisterSubtaskMeta(&ConvertRepoMeta) +} + var ConvertRepoMeta = plugin.SubTaskMeta{ Name: "Convert Repos", EntryPoint: ConvertRepo, @@ -80,30 +79,31 @@ func ConvertRepo(taskCtx plugin.SubTaskContext) errors.Error { data := taskCtx.GetData().(*GithubTaskData) repoId := data.Options.GithubId - cursor, err := db.Cursor( - dal.From(&models.GithubRepo{}), - dal.Where("github_id = ? and connection_id = ?", repoId, data.Options.ConnectionId), - ) - if err != nil { - return err - } - defer cursor.Close() - repoIdGen := didgen.NewDomainIdGenerator(&models.GithubRepo{}) - converter, err := api.NewDataConverter(api.DataConverterArgs{ - InputRowType: reflect.TypeOf(models.GithubRepo{}), - Input: cursor, - RawDataSubTaskArgs: api.RawDataSubTaskArgs{ - Ctx: taskCtx, + converter, err := api.NewStatefulDataConverter(&api.StatefulDataConverterArgs[models.GithubRepo]{ + SubtaskCommonArgs: &api.SubtaskCommonArgs{ + SubTaskContext: taskCtx, + Table: models.GithubRepo{}.TableName(), Params: GithubApiParams{ ConnectionId: data.Options.ConnectionId, Name: data.Options.Name, }, - Table: models.GithubRepo{}.TableName(), }, - Convert: func(inputRow interface{}) ([]interface{}, errors.Error) { - repository := inputRow.(*models.GithubRepo) + Input: func(stateManager *api.SubtaskStateManager) (dal.Rows, errors.Error) { + clauses := []dal.Clause{ + dal.From(&models.GithubRepo{}), + dal.Where("github_id = ? and connection_id = ?", repoId, data.Options.ConnectionId), + } + if stateManager.IsIncremental() { + since := stateManager.GetSince() + if since != nil { + clauses = append(clauses, dal.Where("updated_at >= ?", since)) + } + } + return db.Cursor(clauses...) + }, + Convert: func(repository *models.GithubRepo) ([]interface{}, errors.Error) { domainRepository := &code.Repo{ DomainEntity: domainlayer.DomainEntity{ Id: repoIdGen.Generate(data.Options.ConnectionId, repository.GithubId), @@ -125,12 +125,10 @@ func ConvertRepo(taskCtx plugin.SubTaskContext) errors.Error { Description: repository.Description, CreatedDate: repository.CreatedDate, } - domainBoardRepo := &crossdomain.BoardRepo{ BoardId: repoIdGen.Generate(data.Options.ConnectionId, repository.GithubId), RepoId: repoIdGen.Generate(data.Options.ConnectionId, repository.GithubId), } - domainCicdScope := &devops.CicdScope{ DomainEntity: domainlayer.DomainEntity{ Id: repoIdGen.Generate(data.Options.ConnectionId, repository.GithubId), @@ -141,7 +139,6 @@ func ConvertRepo(taskCtx plugin.SubTaskContext) errors.Error { CreatedDate: repository.CreatedDate, UpdatedDate: repository.UpdatedDate, } - return []interface{}{ domainRepository, domainBoard, diff --git a/backend/plugins/github/tasks/review_convertor.go b/backend/plugins/github/tasks/review_convertor.go index d4d10104256..18a0de211e0 100644 --- a/backend/plugins/github/tasks/review_convertor.go +++ b/backend/plugins/github/tasks/review_convertor.go @@ -18,8 +18,6 @@ limitations under the License. package tasks import ( - "reflect" - "github.com/apache/incubator-devlake/core/dal" "github.com/apache/incubator-devlake/core/errors" "github.com/apache/incubator-devlake/core/models/domainlayer/code" @@ -52,33 +50,34 @@ func ConvertReviews(taskCtx plugin.SubTaskContext) errors.Error { data := taskCtx.GetData().(*GithubTaskData) repoId := data.Options.GithubId - cursor, err := db.Cursor( - dal.From(&models.GithubReviewer{}), - dal.Join("left join _tool_github_pull_requests "+ - "on _tool_github_pull_requests.github_id = _tool_github_reviewers.pull_request_id"), - dal.Where("repo_id = ? and _tool_github_pull_requests.connection_id = ?", repoId, data.Options.ConnectionId), - ) - if err != nil { - return err - } - defer cursor.Close() - prIdGen := didgen.NewDomainIdGenerator(&models.GithubPullRequest{}) accountIdGen := didgen.NewDomainIdGenerator(&models.GithubAccount{}) - converter, err := api.NewDataConverter(api.DataConverterArgs{ - InputRowType: reflect.TypeOf(models.GithubReviewer{}), - Input: cursor, - RawDataSubTaskArgs: api.RawDataSubTaskArgs{ - Ctx: taskCtx, + converter, err := api.NewStatefulDataConverter(&api.StatefulDataConverterArgs[models.GithubReviewer]{ + SubtaskCommonArgs: &api.SubtaskCommonArgs{ + SubTaskContext: taskCtx, + Table: RAW_PR_REVIEW_TABLE, Params: GithubApiParams{ ConnectionId: data.Options.ConnectionId, Name: data.Options.Name, }, - Table: RAW_PR_REVIEW_TABLE, }, - Convert: func(inputRow interface{}) ([]interface{}, errors.Error) { - githubReview := inputRow.(*models.GithubReviewer) + Input: func(stateManager *api.SubtaskStateManager) (dal.Rows, errors.Error) { + clauses := []dal.Clause{ + dal.From(&models.GithubReviewer{}), + dal.Join("left join _tool_github_pull_requests " + + "on _tool_github_pull_requests.github_id = _tool_github_reviewers.pull_request_id"), + dal.Where("repo_id = ? and _tool_github_pull_requests.connection_id = ?", repoId, data.Options.ConnectionId), + } + if stateManager.IsIncremental() { + since := stateManager.GetSince() + if since != nil { + clauses = append(clauses, dal.Where("_tool_github_pull_requests.github_updated_at >= ?", since)) + } + } + return db.Cursor(clauses...) + }, + Convert: func(githubReview *models.GithubReviewer) ([]interface{}, errors.Error) { domainReview := &code.PullRequestReviewer{ PullRequestId: prIdGen.Generate(data.Options.ConnectionId, githubReview.PullRequestId), ReviewerId: accountIdGen.Generate(data.Options.ConnectionId, githubReview.ReviewerId), diff --git a/backend/server/services/remote/bridge/context.go b/backend/server/services/remote/bridge/context.go index e66d1e66e25..08393dd465d 100644 --- a/backend/server/services/remote/bridge/context.go +++ b/backend/server/services/remote/bridge/context.go @@ -138,4 +138,11 @@ func (r remoteContextImpl) IncProgress(quantity int) { } } +func (r remoteContextImpl) GetProgress() int { + if r.parent != nil { + return r.parent.GetProgress() + } + return 0 +} + var _ RemoteContext = (*remoteContextImpl)(nil)