@@ -24,10 +24,12 @@ import (
2424 "net/url"
2525 "time"
2626
27+ "github.com/apache/incubator-devlake/core/dal"
2728 "github.com/apache/incubator-devlake/core/errors"
2829 "github.com/apache/incubator-devlake/core/log"
2930 "github.com/apache/incubator-devlake/core/plugin"
3031 helper "github.com/apache/incubator-devlake/helpers/pluginhelper/api"
32+ "github.com/apache/incubator-devlake/plugins/github/models"
3133)
3234
3335func init () {
@@ -89,6 +91,24 @@ func CollectRuns(taskCtx plugin.SubTaskContext) errors.Error {
8991 // `windowStart` past the previously collected second (inclusive-both-ends), while
9092 // fullsync + TimeAfter keeps the user-specified bound inclusive.
9193 createdAfter := manager .GetSince ()
94+ sinceSource := "state_since"
95+ syncPolicy := taskCtx .TaskContext ().SyncPolicy ()
96+ if createdAfter == nil {
97+ sinceSource = "none"
98+ }
99+ if createdAfter == nil && (syncPolicy == nil || ! syncPolicy .FullSync ) {
100+ fallbackSince , err := loadLatestRunUpdatedAt (taskCtx , data .Options .ConnectionId , data .Options .GithubId )
101+ if err != nil {
102+ return err
103+ }
104+ if fallbackSince != nil {
105+ createdAfter = fallbackSince
106+ sinceSource = "tool_runs_fallback"
107+ logger .Info ("cicd_run_collector: collector state missing; bootstrapping since from existing _tool_github_runs at %s" , fallbackSince .UTC ().Format (time .RFC3339 ))
108+ } else {
109+ logger .Debug ("cicd_run_collector: collector state missing and no _tool_github_runs timestamp found" )
110+ }
111+ }
92112 untilPtr := manager .GetUntil ()
93113 * untilPtr = untilPtr .Truncate (time .Second )
94114 until := * untilPtr
@@ -102,12 +122,14 @@ func CollectRuns(taskCtx plugin.SubTaskContext) errors.Error {
102122 } else {
103123 // 2018-01-01 conservatively predates GitHub Actions' late-2019 GA.
104124 windowStart = time .Date (2018 , 1 , 1 , 0 , 0 , 0 , 0 , time .UTC )
125+ sinceSource = "epoch_fullsync"
105126 }
106127
107- logger .Info ("cicd_run_collector: collecting workflow runs in [%s, %s] (incremental=%v)" ,
128+ logger .Info ("cicd_run_collector: collecting workflow runs in [%s, %s] (incremental=%v, since_source=%s )" ,
108129 windowStart .Format (githubTimeLayout ),
109130 until .Format (githubTimeLayout ),
110- manager .IsIncremental ())
131+ manager .IsIncremental (),
132+ sinceSource )
111133
112134 leafWindows , err := newLeafWindowBuilder (taskCtx , data ).build (windowStart , until )
113135 if err != nil {
@@ -122,6 +144,28 @@ func CollectRuns(taskCtx plugin.SubTaskContext) errors.Error {
122144 return manager .Execute ()
123145}
124146
147+ func loadLatestRunUpdatedAt (taskCtx plugin.SubTaskContext , connectionId uint64 , repoId int ) (* time.Time , errors.Error ) {
148+ db := taskCtx .GetDal ()
149+ latest := & models.GithubRun {}
150+ err := db .First (
151+ latest ,
152+ dal .Where ("connection_id = ? AND repo_id = ? AND github_updated_at IS NOT NULL" , connectionId , repoId ),
153+ dal .Orderby ("github_updated_at DESC" ),
154+ dal .Limit (1 ),
155+ )
156+ if err != nil {
157+ if db .IsErrorNotFound (err ) {
158+ return nil , nil
159+ }
160+ return nil , err
161+ }
162+ if latest .GithubUpdatedAt == nil {
163+ return nil , nil
164+ }
165+ fallback := latest .GithubUpdatedAt .UTC ()
166+ return & fallback , nil
167+ }
168+
125169// buildRunsQuery assembles the filtered-mode query for a single leaf TimeWindow.
126170// Shared between registerCollectorForLeafWindows and tests.
127171func buildRunsQuery (reqData * helper.RequestData ) (url.Values , errors.Error ) {
0 commit comments