diff --git a/actionsrunner/runner.go b/actionsrunner/runner.go index 1a21f43..a52ee34 100644 --- a/actionsrunner/runner.go +++ b/actionsrunner/runner.go @@ -471,6 +471,7 @@ func runJob(runnerenv RunnerEnvironment, joblock *sync.Mutex, vssConnection *pro } jobreq := &protocol.AgentJobRequestMessage{} var runServiceURL string + ok := false if strings.EqualFold(message.MessageType, "RunnerJobRequest") { plogger.Printf("Warning: TaskAgentMessage.MessageType is %v, which has not been properly tested "+ "due to missing access to test servers of the new protocol before rollout. "+ @@ -478,7 +479,7 @@ func runJob(runnerenv RunnerEnvironment, joblock *sync.Mutex, vssConnection *pro message.MessageType) rjrr := &RunnerJobRequestRef{} if unmarshalErr := json.Unmarshal(src, rjrr); unmarshalErr != nil { - fmt.Printf("fail to unmashal job request: %v", unmarshalErr) + fmt.Printf("fail to unmarshal job request: %v", unmarshalErr) } for retries := 0; retries < 5; retries++ { var requestErr error @@ -502,17 +503,26 @@ func runJob(runnerenv RunnerEnvironment, joblock *sync.Mutex, vssConnection *pro } if requestErr == nil { if unmarshalErr := json.Unmarshal(src, jobreq); unmarshalErr != nil { - fmt.Printf("fail to unmarshall job request: %v", unmarshalErr) + fmt.Printf("fail to unmarshal job request: %v", unmarshalErr) + } else { + ok = true } break + } else if strings.Contains(requestErr.Error(), "job assignment is invalid") { + return } <-time.After(time.Second * 5 * time.Duration(retries+1)) } } else { if unmarshalErr := json.Unmarshal(src, jobreq); unmarshalErr != nil { - fmt.Printf("fail to unmarshall job request: %v", unmarshalErr) + fmt.Printf("fail to unmarshal job request: %v", unmarshalErr) + } else { + ok = true } } + if !ok { + return + } jobrun := &JobRun{ RequestID: jobreq.RequestID, JobID: jobreq.JobID, diff --git a/actionsrunner/worker_context.go b/actionsrunner/worker_context.go index 9cba70a..f9de949 100644 --- a/actionsrunner/worker_context.go +++ b/actionsrunner/worker_context.go @@ -38,13 +38,17 @@ type DefaultWorkerContext struct { // //nolint:gocritic // ptrToRefParam: API compatibility requirement - changing pointer to value would be breaking change func (wc *DefaultWorkerContext) FinishJob(result string, outputs *map[string]protocol.VariableValue) { - if strings.EqualFold(wc.Message().MessageType, "RunnerJobRequest") { + jobreq := wc.Message() + if jobreq == nil { + return + } + if strings.EqualFold(jobreq.MessageType, "RunnerJobRequest") { payload := &run.CompleteJobRequest{ - PlanID: wc.Message().Plan.PlanID, - JobID: wc.Message().JobID, + PlanID: jobreq.Plan.PlanID, + JobID: jobreq.JobID, Conclusion: strings.ToLower(result), Outputs: nil, - BillingOwnerID: wc.Message().BillingOwnerID, + BillingOwnerID: jobreq.BillingOwnerID, } if outputs != nil { payload.Outputs = *outputs @@ -70,14 +74,14 @@ func (wc *DefaultWorkerContext) FinishJob(result string, outputs *map[string]pro completejobURL.Path = path.Join(completejobURL.Path, "completejob") for i := 0; ; i++ { if err := wc.VssConnection.RequestWithContext2(context.Background(), "POST", completejobURL.String(), "", payload, nil); err != nil { - wc.RunnerLogger.Printf("Failed to finish Job '%v' with Status %v: %v\n", wc.Message().JobDisplayName, result, err.Error()) + wc.RunnerLogger.Printf("Failed to finish Job '%v' with Status %v: %v\n", jobreq.JobDisplayName, result, err.Error()) } else { - wc.RunnerLogger.Printf("Finished Job '%v' with Status %v\n", wc.Message().JobDisplayName, result) + wc.RunnerLogger.Printf("Finished Job '%v' with Status %v\n", jobreq.JobDisplayName, result) break } if i < workerMaxRetryAttempts { wc.RunnerLogger.Printf("Retry finishing '%v' in %d seconds attempt %v of %d\n", - wc.Message().JobDisplayName, workerRetry/time.Second, i+1, workerMaxRetryAttempts) + jobreq.JobDisplayName, workerRetry/time.Second, i+1, workerMaxRetryAttempts) <-time.After(workerRetry) } else { break @@ -85,23 +89,27 @@ func (wc *DefaultWorkerContext) FinishJob(result string, outputs *map[string]pro } return } + if jobreq.Plan == nil { + wc.RunnerLogger.Printf("Invalid Job %s\n", jobreq.JobID) + return + } finish := &protocol.JobEvent{ Name: "JobCompleted", - JobID: wc.Message().JobID, - RequestID: wc.Message().RequestID, + JobID: jobreq.JobID, + RequestID: jobreq.RequestID, Result: result, Outputs: outputs, } for i := 0; ; i++ { - if err := wc.VssConnection.FinishJob(finish, wc.Message().Plan); err != nil { - wc.RunnerLogger.Printf("Failed to finish Job '%v' with Status %v: %v\n", wc.Message().JobDisplayName, result, err.Error()) + if err := wc.VssConnection.FinishJob(finish, jobreq.Plan); err != nil { + wc.RunnerLogger.Printf("Failed to finish Job '%v' with Status %v: %v\n", jobreq.JobDisplayName, result, err.Error()) } else { - wc.RunnerLogger.Printf("Finished Job '%v' with Status %v\n", wc.Message().JobDisplayName, result) + wc.RunnerLogger.Printf("Finished Job '%v' with Status %v\n", jobreq.JobDisplayName, result) break } if i < workerMaxRetryAttempts { wc.RunnerLogger.Printf("Retry finishing '%v' in %d seconds attempt %v of %d\n", - wc.Message().JobDisplayName, workerRetry/time.Second, i+1, workerMaxRetryAttempts) + jobreq.JobDisplayName, workerRetry/time.Second, i+1, workerMaxRetryAttempts) <-time.After(workerRetry) } else { break @@ -110,10 +118,14 @@ func (wc *DefaultWorkerContext) FinishJob(result string, outputs *map[string]pro } func (wc *DefaultWorkerContext) FailInitJob(title, message string) { + jobreq := wc.Message() + if jobreq == nil { + return + } if wc.Logger().Current() != nil { wc.Logger().Current().Complete("Failed") } - timelineEntry := protocol.CreateTimelineEntry(wc.Message().JobID, "__fatal", title) + timelineEntry := protocol.CreateTimelineEntry(jobreq.JobID, "__fatal", title) e := wc.Logger().Append(&timelineEntry) e.Start() if wc.Logger().Current() != e { @@ -147,7 +159,11 @@ func (wc *DefaultWorkerContext) JobExecCtx() context.Context { } func (wc *DefaultWorkerContext) Init() { - jobVssConnection, vssConnectionData, err := wc.Message().GetConnection("SystemVssConnection") + jobreq := wc.Message() + if jobreq == nil { + return + } + jobVssConnection, vssConnectionData, err := jobreq.GetConnection("SystemVssConnection") if err != nil { wc.RunnerLogger.Printf("Failed to find the SystemVssConnection Endpoint, try to finish job as failed") wc.FinishJob("Failed", &map[string]protocol.VariableValue{}) @@ -159,7 +175,6 @@ func (wc *DefaultWorkerContext) Init() { } wc.VssConnection = jobVssConnection - jobreq := wc.Message() resultsEndpoint, hasResultsEndpoint := jobreq.Variables["system.github.results_endpoint"] wc.JobLogger = &logger.JobLogger{ JobRequest: jobreq, @@ -194,9 +209,9 @@ func (wc *DefaultWorkerContext) Init() { }, } } - timelineEntry := protocol.CreateTimelineEntry("", wc.Message().JobName, wc.Message().JobDisplayName) + timelineEntry := protocol.CreateTimelineEntry("", jobreq.JobName, jobreq.JobDisplayName) jobEntry := wc.Logger().Append(&timelineEntry) - jobEntry.ID = wc.Message().JobID + jobEntry.ID = jobreq.JobID jobEntry.Type = "Job" jobEntry.Order = 0 jobEntry.Start() diff --git a/protocol/session.go b/protocol/session.go index 1c538fb..f987592 100644 --- a/protocol/session.go +++ b/protocol/session.go @@ -226,8 +226,29 @@ func (session *AgentMessageConnection) GetNextMessage(ctx context.Context) (*Tas func (session *AgentMessageConnection) DeleteMessage(ctx context.Context, message *TaskAgentMessage) error { if session.ServerV2URL != "" { - // V2 no support for deleting messages - return nil + if !strings.EqualFold(message.MessageType, "RunnerJobRequest") { + return nil + } + type RunnerJobRequestRef struct { + ID string `json:"id"` + RunnerRequestID string `json:"runner_request_id"` + RunServiceURL string `json:"run_service_url"` + BillingOwnerID string `json:"billing_owner_id,omitempty"` + } + ref := &RunnerJobRequestRef{} + err := json.Unmarshal([]byte(message.Body), ref) + if err != nil { + return err + } + query := url.Values{} + query.Set("sessionId", session.TaskAgentSession.SessionID) + query.Set("runnerVersion", "3.0.0") + query.Set("status", session.Status) + query.Set("disableUpdate", fmt.Sprint(session.TaskAgentSession.Agent.DisableUpdate)) + + return session.VssConnection.RequestWithContext2(ctx, "POST", session.ServerV2URL+"/acknowledge?"+query.Encode(), "", &map[string]string{ + "runnerRequestId": ref.RunnerRequestID, + }, nil) } return session.VssConnection.RequestWithContext(ctx, "c3a054f6-7a8a-49c0-944e-3a8e5d7adfd7", "5.1-preview", "DELETE", map[string]string{ "poolId": fmt.Sprint(session.VssConnection.PoolID),