Skip to content

Commit efc065a

Browse files
committed
Fix a crash if we cannot resolve a runner job request
* implement broker job ack
1 parent b17bfd9 commit efc065a

3 files changed

Lines changed: 69 additions & 23 deletions

File tree

actionsrunner/runner.go

Lines changed: 13 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -471,14 +471,15 @@ func runJob(runnerenv RunnerEnvironment, joblock *sync.Mutex, vssConnection *pro
471471
}
472472
jobreq := &protocol.AgentJobRequestMessage{}
473473
var runServiceURL string
474+
ok := false
474475
if strings.EqualFold(message.MessageType, "RunnerJobRequest") {
475476
plogger.Printf("Warning: TaskAgentMessage.MessageType is %v, which has not been properly tested "+
476477
"due to missing access to test servers of the new protocol before rollout. "+
477478
"Please report any failures to https://github.com/ChristopherHX/github-act-runner/issues.\n",
478479
message.MessageType)
479480
rjrr := &RunnerJobRequestRef{}
480481
if unmarshalErr := json.Unmarshal(src, rjrr); unmarshalErr != nil {
481-
fmt.Printf("fail to unmashal job request: %v", unmarshalErr)
482+
fmt.Printf("fail to unmarshal job request: %v", unmarshalErr)
482483
}
483484
for retries := 0; retries < 5; retries++ {
484485
var requestErr error
@@ -502,17 +503,26 @@ func runJob(runnerenv RunnerEnvironment, joblock *sync.Mutex, vssConnection *pro
502503
}
503504
if requestErr == nil {
504505
if unmarshalErr := json.Unmarshal(src, jobreq); unmarshalErr != nil {
505-
fmt.Printf("fail to unmarshall job request: %v", unmarshalErr)
506+
fmt.Printf("fail to unmarshal job request: %v", unmarshalErr)
507+
} else {
508+
ok = true
506509
}
507510
break
511+
} else if strings.Contains(requestErr.Error(), "job assignment is invalid") {
512+
return
508513
}
509514
<-time.After(time.Second * 5 * time.Duration(retries+1))
510515
}
511516
} else {
512517
if unmarshalErr := json.Unmarshal(src, jobreq); unmarshalErr != nil {
513-
fmt.Printf("fail to unmarshall job request: %v", unmarshalErr)
518+
fmt.Printf("fail to unmarshal job request: %v", unmarshalErr)
519+
} else {
520+
ok = true
514521
}
515522
}
523+
if !ok {
524+
return
525+
}
516526
jobrun := &JobRun{
517527
RequestID: jobreq.RequestID,
518528
JobID: jobreq.JobID,

actionsrunner/worker_context.go

Lines changed: 33 additions & 18 deletions
Original file line number | Diff line number | Diff line change
@@ -38,13 +38,17 @@ type DefaultWorkerContext struct {
3838
//
3939
//nolint:gocritic // ptrToRefParam: API compatibility requirement - changing pointer to value would be breaking change
4040
func (wc *DefaultWorkerContext) FinishJob(result string, outputs *map[string]protocol.VariableValue) {
41-
if strings.EqualFold(wc.Message().MessageType, "RunnerJobRequest") {
41+
jobreq := wc.Message()
42+
if jobreq == nil {
43+
return
44+
}
45+
if strings.EqualFold(jobreq.MessageType, "RunnerJobRequest") {
4246
payload := &run.CompleteJobRequest{
43-
PlanID: wc.Message().Plan.PlanID,
44-
JobID: wc.Message().JobID,
47+
PlanID: jobreq.Plan.PlanID,
48+
JobID: jobreq.JobID,
4549
Conclusion: strings.ToLower(result),
4650
Outputs: nil,
47-
BillingOwnerID: wc.Message().BillingOwnerID,
51+
BillingOwnerID: jobreq.BillingOwnerID,
4852
}
4953
if outputs != nil {
5054
payload.Outputs = *outputs
@@ -70,38 +74,42 @@ func (wc *DefaultWorkerContext) FinishJob(result string, outputs *map[string]pro
7074
completejobURL.Path = path.Join(completejobURL.Path, "completejob")
7175
for i := 0; ; i++ {
7276
if err := wc.VssConnection.RequestWithContext2(context.Background(), "POST", completejobURL.String(), "", payload, nil); err != nil {
73-
wc.RunnerLogger.Printf("Failed to finish Job '%v' with Status %v: %v\n", wc.Message().JobDisplayName, result, err.Error())
77+
wc.RunnerLogger.Printf("Failed to finish Job '%v' with Status %v: %v\n", jobreq.JobDisplayName, result, err.Error())
7478
} else {
75-
wc.RunnerLogger.Printf("Finished Job '%v' with Status %v\n", wc.Message().JobDisplayName, result)
79+
wc.RunnerLogger.Printf("Finished Job '%v' with Status %v\n", jobreq.JobDisplayName, result)
7680
break
7781
}
7882
if i < workerMaxRetryAttempts {
7983
wc.RunnerLogger.Printf("Retry finishing '%v' in %d seconds attempt %v of %d\n",
80-
wc.Message().JobDisplayName, workerRetry/time.Second, i+1, workerMaxRetryAttempts)
84+
jobreq.JobDisplayName, workerRetry/time.Second, i+1, workerMaxRetryAttempts)
8185
<-time.After(workerRetry)
8286
} else {
8387
break
8488
}
8589
}
8690
return
8791
}
92+
if jobreq.Plan == nil {
93+
wc.RunnerLogger.Printf("Invalid Job %s\n", jobreq.JobID)
94+
return
95+
}
8896
finish := &protocol.JobEvent{
8997
Name: "JobCompleted",
90-
JobID: wc.Message().JobID,
91-
RequestID: wc.Message().RequestID,
98+
JobID: jobreq.JobID,
99+
RequestID: jobreq.RequestID,
92100
Result: result,
93101
Outputs: outputs,
94102
}
95103
for i := 0; ; i++ {
96-
if err := wc.VssConnection.FinishJob(finish, wc.Message().Plan); err != nil {
97-
wc.RunnerLogger.Printf("Failed to finish Job '%v' with Status %v: %v\n", wc.Message().JobDisplayName, result, err.Error())
104+
if err := wc.VssConnection.FinishJob(finish, jobreq.Plan); err != nil {
105+
wc.RunnerLogger.Printf("Failed to finish Job '%v' with Status %v: %v\n", jobreq.JobDisplayName, result, err.Error())
98106
} else {
99-
wc.RunnerLogger.Printf("Finished Job '%v' with Status %v\n", wc.Message().JobDisplayName, result)
107+
wc.RunnerLogger.Printf("Finished Job '%v' with Status %v\n", jobreq.JobDisplayName, result)
100108
break
101109
}
102110
if i < workerMaxRetryAttempts {
103111
wc.RunnerLogger.Printf("Retry finishing '%v' in %d seconds attempt %v of %d\n",
104-
wc.Message().JobDisplayName, workerRetry/time.Second, i+1, workerMaxRetryAttempts)
112+
jobreq.JobDisplayName, workerRetry/time.Second, i+1, workerMaxRetryAttempts)
105113
<-time.After(workerRetry)
106114
} else {
107115
break
@@ -110,10 +118,14 @@ func (wc *DefaultWorkerContext) FinishJob(result string, outputs *map[string]pro
110118
}
111119

112120
func (wc *DefaultWorkerContext) FailInitJob(title, message string) {
121+
jobreq := wc.Message()
122+
if jobreq == nil {
123+
return
124+
}
113125
if wc.Logger().Current() != nil {
114126
wc.Logger().Current().Complete("Failed")
115127
}
116-
timelineEntry := protocol.CreateTimelineEntry(wc.Message().JobID, "__fatal", title)
128+
timelineEntry := protocol.CreateTimelineEntry(jobreq.JobID, "__fatal", title)
117129
e := wc.Logger().Append(&timelineEntry)
118130
e.Start()
119131
if wc.Logger().Current() != e {
@@ -147,7 +159,11 @@ func (wc *DefaultWorkerContext) JobExecCtx() context.Context {
147159
}
148160

149161
func (wc *DefaultWorkerContext) Init() {
150-
jobVssConnection, vssConnectionData, err := wc.Message().GetConnection("SystemVssConnection")
162+
jobreq := wc.Message()
163+
if jobreq == nil {
164+
return
165+
}
166+
jobVssConnection, vssConnectionData, err := jobreq.GetConnection("SystemVssConnection")
151167
if err != nil {
152168
wc.RunnerLogger.Printf("Failed to find the SystemVssConnection Endpoint, try to finish job as failed")
153169
wc.FinishJob("Failed", &map[string]protocol.VariableValue{})
@@ -159,7 +175,6 @@ func (wc *DefaultWorkerContext) Init() {
159175
}
160176
wc.VssConnection = jobVssConnection
161177

162-
jobreq := wc.Message()
163178
resultsEndpoint, hasResultsEndpoint := jobreq.Variables["system.github.results_endpoint"]
164179
wc.JobLogger = &logger.JobLogger{
165180
JobRequest: jobreq,
@@ -194,9 +209,9 @@ func (wc *DefaultWorkerContext) Init() {
194209
},
195210
}
196211
}
197-
timelineEntry := protocol.CreateTimelineEntry("", wc.Message().JobName, wc.Message().JobDisplayName)
212+
timelineEntry := protocol.CreateTimelineEntry("", jobreq.JobName, jobreq.JobDisplayName)
198213
jobEntry := wc.Logger().Append(&timelineEntry)
199-
jobEntry.ID = wc.Message().JobID
214+
jobEntry.ID = jobreq.JobID
200215
jobEntry.Type = "Job"
201216
jobEntry.Order = 0
202217
jobEntry.Start()

protocol/session.go

Lines changed: 23 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -226,8 +226,29 @@ func (session *AgentMessageConnection) GetNextMessage(ctx context.Context) (*Tas
226226

227227
func (session *AgentMessageConnection) DeleteMessage(ctx context.Context, message *TaskAgentMessage) error {
228228
if session.ServerV2URL != "" {
229-
// V2 no support for deleting messages
230-
return nil
229+
if !strings.EqualFold(message.MessageType, "RunnerJobRequest") {
230+
return nil
231+
}
232+
type RunnerJobRequestRef struct {
233+
ID string `json:"id"`
234+
RunnerRequestID string `json:"runner_request_id"`
235+
RunServiceURL string `json:"run_service_url"`
236+
BillingOwnerID string `json:"billing_owner_id,omitempty"`
237+
}
238+
ref := &RunnerJobRequestRef{}
239+
err := json.Unmarshal([]byte(message.Body), ref)
240+
if err != nil {
241+
return err
242+
}
243+
query := url.Values{}
244+
query.Set("sessionId", session.TaskAgentSession.SessionID)
245+
query.Set("runnerVersion", "3.0.0")
246+
query.Set("status", session.Status)
247+
query.Set("disableUpdate", fmt.Sprint(session.TaskAgentSession.Agent.DisableUpdate))
248+
249+
return session.VssConnection.RequestWithContext2(ctx, "POST", session.ServerV2URL+"/acknowledge?"+query.Encode(), "", &map[string]string{
250+
"runnerRequestId": ref.RunnerRequestID,
251+
}, nil)
231252
}
232253
return session.VssConnection.RequestWithContext(ctx, "c3a054f6-7a8a-49c0-944e-3a8e5d7adfd7", "5.1-preview", "DELETE", map[string]string{
233254
"poolId": fmt.Sprint(session.VssConnection.PoolID),

0 commit comments

Comments
 (0)