Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 13 additions & 3 deletions actionsrunner/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -471,14 +471,15 @@ func runJob(runnerenv RunnerEnvironment, joblock *sync.Mutex, vssConnection *pro
}
jobreq := &protocol.AgentJobRequestMessage{}
var runServiceURL string
ok := false
if strings.EqualFold(message.MessageType, "RunnerJobRequest") {
plogger.Printf("Warning: TaskAgentMessage.MessageType is %v, which has not been properly tested "+
"due to missing access to test servers of the new protocol before rollout. "+
"Please report any failures to https://github.com/ChristopherHX/github-act-runner/issues.\n",
message.MessageType)
rjrr := &RunnerJobRequestRef{}
if unmarshalErr := json.Unmarshal(src, rjrr); unmarshalErr != nil {
fmt.Printf("fail to unmashal job request: %v", unmarshalErr)
fmt.Printf("fail to unmarshal job request: %v", unmarshalErr)
}
for retries := 0; retries < 5; retries++ {
var requestErr error
Expand All @@ -502,17 +503,26 @@ func runJob(runnerenv RunnerEnvironment, joblock *sync.Mutex, vssConnection *pro
}
if requestErr == nil {
if unmarshalErr := json.Unmarshal(src, jobreq); unmarshalErr != nil {
fmt.Printf("fail to unmarshall job request: %v", unmarshalErr)
fmt.Printf("fail to unmarshal job request: %v", unmarshalErr)
} else {
ok = true
}
break
} else if strings.Contains(requestErr.Error(), "job assignment is invalid") {
return
}
<-time.After(time.Second * 5 * time.Duration(retries+1))
}
} else {
if unmarshalErr := json.Unmarshal(src, jobreq); unmarshalErr != nil {
fmt.Printf("fail to unmarshall job request: %v", unmarshalErr)
fmt.Printf("fail to unmarshal job request: %v", unmarshalErr)
} else {
ok = true
}
}
if !ok {
return
}
jobrun := &JobRun{
RequestID: jobreq.RequestID,
JobID: jobreq.JobID,
Expand Down
51 changes: 33 additions & 18 deletions actionsrunner/worker_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,13 +38,17 @@ type DefaultWorkerContext struct {
//
//nolint:gocritic // ptrToRefParam: API compatibility requirement - changing pointer to value would be breaking change
func (wc *DefaultWorkerContext) FinishJob(result string, outputs *map[string]protocol.VariableValue) {
if strings.EqualFold(wc.Message().MessageType, "RunnerJobRequest") {
jobreq := wc.Message()
if jobreq == nil {
return
}
if strings.EqualFold(jobreq.MessageType, "RunnerJobRequest") {
payload := &run.CompleteJobRequest{
PlanID: wc.Message().Plan.PlanID,
JobID: wc.Message().JobID,
PlanID: jobreq.Plan.PlanID,
JobID: jobreq.JobID,
Conclusion: strings.ToLower(result),
Outputs: nil,
BillingOwnerID: wc.Message().BillingOwnerID,
BillingOwnerID: jobreq.BillingOwnerID,
}
if outputs != nil {
payload.Outputs = *outputs
Expand All @@ -70,38 +74,42 @@ func (wc *DefaultWorkerContext) FinishJob(result string, outputs *map[string]pro
completejobURL.Path = path.Join(completejobURL.Path, "completejob")
for i := 0; ; i++ {
if err := wc.VssConnection.RequestWithContext2(context.Background(), "POST", completejobURL.String(), "", payload, nil); err != nil {
wc.RunnerLogger.Printf("Failed to finish Job '%v' with Status %v: %v\n", wc.Message().JobDisplayName, result, err.Error())
wc.RunnerLogger.Printf("Failed to finish Job '%v' with Status %v: %v\n", jobreq.JobDisplayName, result, err.Error())
} else {
wc.RunnerLogger.Printf("Finished Job '%v' with Status %v\n", wc.Message().JobDisplayName, result)
wc.RunnerLogger.Printf("Finished Job '%v' with Status %v\n", jobreq.JobDisplayName, result)
break
}
if i < workerMaxRetryAttempts {
wc.RunnerLogger.Printf("Retry finishing '%v' in %d seconds attempt %v of %d\n",
wc.Message().JobDisplayName, workerRetry/time.Second, i+1, workerMaxRetryAttempts)
jobreq.JobDisplayName, workerRetry/time.Second, i+1, workerMaxRetryAttempts)
<-time.After(workerRetry)
} else {
break
}
}
return
}
if jobreq.Plan == nil {
wc.RunnerLogger.Printf("Invalid Job %s\n", jobreq.JobID)
return
}
finish := &protocol.JobEvent{
Name: "JobCompleted",
JobID: wc.Message().JobID,
RequestID: wc.Message().RequestID,
JobID: jobreq.JobID,
RequestID: jobreq.RequestID,
Result: result,
Outputs: outputs,
}
for i := 0; ; i++ {
if err := wc.VssConnection.FinishJob(finish, wc.Message().Plan); err != nil {
wc.RunnerLogger.Printf("Failed to finish Job '%v' with Status %v: %v\n", wc.Message().JobDisplayName, result, err.Error())
if err := wc.VssConnection.FinishJob(finish, jobreq.Plan); err != nil {
wc.RunnerLogger.Printf("Failed to finish Job '%v' with Status %v: %v\n", jobreq.JobDisplayName, result, err.Error())
} else {
wc.RunnerLogger.Printf("Finished Job '%v' with Status %v\n", wc.Message().JobDisplayName, result)
wc.RunnerLogger.Printf("Finished Job '%v' with Status %v\n", jobreq.JobDisplayName, result)
break
}
if i < workerMaxRetryAttempts {
wc.RunnerLogger.Printf("Retry finishing '%v' in %d seconds attempt %v of %d\n",
wc.Message().JobDisplayName, workerRetry/time.Second, i+1, workerMaxRetryAttempts)
jobreq.JobDisplayName, workerRetry/time.Second, i+1, workerMaxRetryAttempts)
<-time.After(workerRetry)
} else {
break
Expand All @@ -110,10 +118,14 @@ func (wc *DefaultWorkerContext) FinishJob(result string, outputs *map[string]pro
}

func (wc *DefaultWorkerContext) FailInitJob(title, message string) {
jobreq := wc.Message()
if jobreq == nil {
return
}
if wc.Logger().Current() != nil {
wc.Logger().Current().Complete("Failed")
}
timelineEntry := protocol.CreateTimelineEntry(wc.Message().JobID, "__fatal", title)
timelineEntry := protocol.CreateTimelineEntry(jobreq.JobID, "__fatal", title)
e := wc.Logger().Append(&timelineEntry)
e.Start()
if wc.Logger().Current() != e {
Expand Down Expand Up @@ -147,7 +159,11 @@ func (wc *DefaultWorkerContext) JobExecCtx() context.Context {
}

func (wc *DefaultWorkerContext) Init() {
jobVssConnection, vssConnectionData, err := wc.Message().GetConnection("SystemVssConnection")
jobreq := wc.Message()
if jobreq == nil {
return
}
jobVssConnection, vssConnectionData, err := jobreq.GetConnection("SystemVssConnection")
if err != nil {
wc.RunnerLogger.Printf("Failed to find the SystemVssConnection Endpoint, try to finish job as failed")
wc.FinishJob("Failed", &map[string]protocol.VariableValue{})
Expand All @@ -159,7 +175,6 @@ func (wc *DefaultWorkerContext) Init() {
}
wc.VssConnection = jobVssConnection

jobreq := wc.Message()
resultsEndpoint, hasResultsEndpoint := jobreq.Variables["system.github.results_endpoint"]
wc.JobLogger = &logger.JobLogger{
JobRequest: jobreq,
Expand Down Expand Up @@ -194,9 +209,9 @@ func (wc *DefaultWorkerContext) Init() {
},
}
}
timelineEntry := protocol.CreateTimelineEntry("", wc.Message().JobName, wc.Message().JobDisplayName)
timelineEntry := protocol.CreateTimelineEntry("", jobreq.JobName, jobreq.JobDisplayName)
jobEntry := wc.Logger().Append(&timelineEntry)
jobEntry.ID = wc.Message().JobID
jobEntry.ID = jobreq.JobID
jobEntry.Type = "Job"
jobEntry.Order = 0
jobEntry.Start()
Expand Down
25 changes: 23 additions & 2 deletions protocol/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -226,8 +226,29 @@ func (session *AgentMessageConnection) GetNextMessage(ctx context.Context) (*Tas

func (session *AgentMessageConnection) DeleteMessage(ctx context.Context, message *TaskAgentMessage) error {
if session.ServerV2URL != "" {
// V2 no support for deleting messages
return nil
if !strings.EqualFold(message.MessageType, "RunnerJobRequest") {
return nil
}
type RunnerJobRequestRef struct {
ID string `json:"id"`
RunnerRequestID string `json:"runner_request_id"`
RunServiceURL string `json:"run_service_url"`
BillingOwnerID string `json:"billing_owner_id,omitempty"`
}
ref := &RunnerJobRequestRef{}
err := json.Unmarshal([]byte(message.Body), ref)
if err != nil {
return err
}
query := url.Values{}
query.Set("sessionId", session.TaskAgentSession.SessionID)
query.Set("runnerVersion", "3.0.0")
query.Set("status", session.Status)
query.Set("disableUpdate", fmt.Sprint(session.TaskAgentSession.Agent.DisableUpdate))

return session.VssConnection.RequestWithContext2(ctx, "POST", session.ServerV2URL+"/acknowledge?"+query.Encode(), "", &map[string]string{
"runnerRequestId": ref.RunnerRequestID,
}, nil)
}
return session.VssConnection.RequestWithContext(ctx, "c3a054f6-7a8a-49c0-944e-3a8e5d7adfd7", "5.1-preview", "DELETE", map[string]string{
"poolId": fmt.Sprint(session.VssConnection.PoolID),
Expand Down
Loading