Skip to content

Commit 0662766

Browse files
committed
Fix respect Server Decision to use v2 broker flow
1 parent b17bfd9 commit 0662766

7 files changed

Lines changed: 132 additions & 13 deletions

File tree

actionsrunner/runner.go

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -471,14 +471,15 @@ func runJob(runnerenv RunnerEnvironment, joblock *sync.Mutex, vssConnection *pro
471471
}
472472
jobreq := &protocol.AgentJobRequestMessage{}
473473
var runServiceURL string
474+
ok := false
474475
if strings.EqualFold(message.MessageType, "RunnerJobRequest") {
475476
plogger.Printf("Warning: TaskAgentMessage.MessageType is %v, which has not been properly tested "+
476477
"due to missing access to test servers of the new protocol before rollout. "+
477478
"Please report any failures to https://github.com/ChristopherHX/github-act-runner/issues.\n",
478479
message.MessageType)
479480
rjrr := &RunnerJobRequestRef{}
480481
if unmarshalErr := json.Unmarshal(src, rjrr); unmarshalErr != nil {
481-
fmt.Printf("fail to unmashal job request: %v", unmarshalErr)
482+
fmt.Printf("fail to unmarshal job request: %v", unmarshalErr)
482483
}
483484
for retries := 0; retries < 5; retries++ {
484485
var requestErr error
@@ -502,17 +503,26 @@ func runJob(runnerenv RunnerEnvironment, joblock *sync.Mutex, vssConnection *pro
502503
}
503504
if requestErr == nil {
504505
if unmarshalErr := json.Unmarshal(src, jobreq); unmarshalErr != nil {
505-
fmt.Printf("fail to unmarshall job request: %v", unmarshalErr)
506+
fmt.Printf("fail to unmarshal job request: %v", unmarshalErr)
507+
} else {
508+
ok = true
506509
}
507510
break
511+
} else if strings.Contains(requestErr.Error(), "job assignment is invalid") {
512+
return
508513
}
509514
<-time.After(time.Second * 5 * time.Duration(retries+1))
510515
}
511516
} else {
512517
if unmarshalErr := json.Unmarshal(src, jobreq); unmarshalErr != nil {
513-
fmt.Printf("fail to unmarshall job request: %v", unmarshalErr)
518+
fmt.Printf("fail to unmarshal job request: %v", unmarshalErr)
519+
} else {
520+
ok = true
514521
}
515522
}
523+
if !ok {
524+
return
525+
}
516526
jobrun := &JobRun{
517527
RequestID: jobreq.RequestID,
518528
JobID: jobreq.JobID,

actionsrunner/worker_context.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,10 @@ func (wc *DefaultWorkerContext) FinishJob(result string, outputs *map[string]pro
8585
}
8686
return
8787
}
88+
if wc.Message().Plan == nil {
89+
wc.RunnerLogger.Printf("Invalid Job %s\n", wc.Message().JobID)
90+
return
91+
}
8892
finish := &protocol.JobEvent{
8993
Name: "JobCompleted",
9094
JobID: wc.Message().JobID,

protocol/connection.go

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -357,16 +357,32 @@ func (vssConnection *VssConnection) CreateSessionExt(ctx context.Context, server
357357

358358
con := &AgentMessageConnection{VssConnection: vssConnection, TaskAgentSession: session}
359359
con.Status = statusOnline
360+
con.ServerV2URL = serverV2URL
360361
return con, nil
361362
}
362363

363364
func (vssConnection *VssConnection) CreateSession(ctx context.Context) (*AgentMessageConnection, error) {
364-
return vssConnection.CreateSessionExt(ctx, vssConnection.TaskAgent.ServerV2URL)
365+
useV2Flow, hasUseV2Flow := vssConnection.TaskAgent.Properties.LookupBool("UseV2Flow")
366+
serverV2URL, hasServerV2URL := vssConnection.TaskAgent.Properties.LookupString("ServerUrlV2")
367+
if !useV2Flow || !hasUseV2Flow || !hasServerV2URL {
368+
serverV2URL = ""
369+
} else {
370+
serverV2URL = strings.TrimSuffix(serverV2URL, "/")
371+
}
372+
return vssConnection.CreateSessionExt(ctx, serverV2URL)
365373
}
366374

367375
func (vssConnection *VssConnection) LoadSession(ctx context.Context, session *TaskAgentSession) (*AgentMessageConnection, error) {
368376
con := &AgentMessageConnection{VssConnection: vssConnection, TaskAgentSession: session}
369377
con.Status = statusOnline
378+
useV2Flow, hasUseV2Flow := vssConnection.TaskAgent.Properties.LookupBool("UseV2Flow")
379+
serverV2URL, hasServerV2URL := vssConnection.TaskAgent.Properties.LookupString("ServerUrlV2")
380+
if !useV2Flow || !hasUseV2Flow || !hasServerV2URL {
381+
serverV2URL = ""
382+
} else {
383+
serverV2URL = strings.TrimSuffix(serverV2URL, "/")
384+
}
385+
con.ServerV2URL = serverV2URL
370386
return con, nil
371387
}
372388

protocol/session.go

Lines changed: 23 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -226,8 +226,29 @@ func (session *AgentMessageConnection) GetNextMessage(ctx context.Context) (*Tas
226226

227227
func (session *AgentMessageConnection) DeleteMessage(ctx context.Context, message *TaskAgentMessage) error {
228228
if session.ServerV2URL != "" {
229-
// V2 no support for deleting messages
230-
return nil
229+
if !strings.EqualFold(message.MessageType, "RunnerJobRequest") {
230+
return nil
231+
}
232+
type RunnerJobRequestRef struct {
233+
ID string `json:"id"`
234+
RunnerRequestID string `json:"runner_request_id"`
235+
RunServiceURL string `json:"run_service_url"`
236+
BillingOwnerID string `json:"billing_owner_id,omitempty"`
237+
}
238+
ref := &RunnerJobRequestRef{}
239+
err := json.Unmarshal([]byte(message.Body), ref)
240+
if err != nil {
241+
return err
242+
}
243+
query := url.Values{}
244+
query.Set("sessionId", session.TaskAgentSession.SessionID)
245+
query.Set("runnerVersion", "3.0.0")
246+
query.Set("status", session.Status)
247+
query.Set("disableUpdate", fmt.Sprint(session.TaskAgentSession.Agent.DisableUpdate))
248+
249+
return session.VssConnection.RequestWithContext2(ctx, "POST", session.ServerV2URL+"/acknowledge?"+query.Encode(), "", &map[string]string{
250+
"runnerRequestId": ref.RunnerRequestID,
251+
}, nil)
231252
}
232253
return session.VssConnection.RequestWithContext(ctx, "c3a054f6-7a8a-49c0-944e-3a8e5d7adfd7", "5.1-preview", "DELETE", map[string]string{
233254
"poolId": fmt.Sprint(session.VssConnection.PoolID),

protocol/task_agent.go

Lines changed: 53 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88
"io"
99
"net/http"
1010
"net/url"
11+
"strings"
1112
"time"
1213

1314
"github.com/golang-jwt/jwt"
@@ -39,6 +40,57 @@ type AgentLabel struct {
3940
Type string
4041
}
4142

43+
type PropertyValue struct {
44+
Type string `json:"$type"`
45+
Value interface{} `json:"$value"`
46+
}
47+
48+
func (v *PropertyValue) UnmarshalJSON(data []byte) error {
49+
var b bool
50+
if json.Unmarshal(data, &b) == nil {
51+
v.Type = "System.Boolean"
52+
v.Value = b
53+
return nil
54+
}
55+
var raw string
56+
if json.Unmarshal(data, &raw) == nil {
57+
v.Type = "System.String"
58+
v.Value = raw
59+
return nil
60+
}
61+
type PropertyValueRaw PropertyValue
62+
// Best Effort, drop errors
63+
_ = json.Unmarshal(data, (*PropertyValueRaw)(v))
64+
return nil
65+
}
66+
67+
type PropertiesCollection map[string]PropertyValue
68+
69+
func (c *PropertiesCollection) Lookup(name string, ty string) (interface{}, bool) {
70+
for k, v := range *c {
71+
if strings.EqualFold(k, name) && strings.EqualFold(v.Type, ty) {
72+
return v.Value, true
73+
}
74+
}
75+
return nil, false
76+
}
77+
78+
func (c *PropertiesCollection) LookupBool(name string) (bool, bool) {
79+
if v, ok := c.Lookup(name, "System.Boolean"); ok && v != nil {
80+
b, isBool := v.(bool)
81+
return b, isBool
82+
}
83+
return false, false
84+
}
85+
86+
func (c *PropertiesCollection) LookupString(name string) (string, bool) {
87+
if v, ok := c.Lookup(name, "System.String"); ok && v != nil {
88+
b, isString := v.(string)
89+
return b, isString
90+
}
91+
return "", false
92+
}
93+
4294
type TaskAgent struct {
4395
Authorization TaskAgentAuthorization
4496
Labels []AgentLabel
@@ -53,8 +105,7 @@ type TaskAgent struct {
53105
CreatedOn string
54106
Ephemeral bool `json:",omitempty"`
55107
DisableUpdate bool `json:",omitempty"`
56-
// Just a convenient way to store the URL, not part of the spec
57-
ServerV2URL string `json:",omitempty"`
108+
Properties PropertiesCollection
58109
}
59110

60111
type TaskAgents struct {

runnerconfiguration/add.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,7 @@ func (config *ConfigureRunner) Configure(
8181
}
8282
if res.UseV2FLow {
8383
vssConnection = &protocol.VssConnection{
84-
AuthHeader: "RemoteAuth " + config.Token,
84+
AuthHeader: "Bearer " + res.Token,
8585
Trace: config.Trace,
8686
Client: c,
8787
}
@@ -309,7 +309,6 @@ func registerOrReplaceRunnerV2(taskAgent *protocol.TaskAgent, config *ConfigureR
309309
taskAgent.Name = runnerResp.Name
310310
taskAgent.Authorization.AuthorizationURL = runnerResp.Authorization.AuthorizationURL
311311
taskAgent.Authorization.ClientID = runnerResp.Authorization.ClientId
312-
taskAgent.ServerV2URL = runnerResp.Authorization.ServerURL
313312
return nil
314313
}
315314

runnerconfiguration/compat/actions_runner_compat.go

Lines changed: 21 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -138,6 +138,21 @@ func ToRunnerInstance(fileAccess ConfigFileAccess) (*runnerconfiguration.RunnerI
138138
}
139139
ephemeral, _ := strconv.ParseBool(agent.Ephemeral)
140140
disableUpdate, _ := strconv.ParseBool(agent.DisableUpdate)
141+
142+
props := protocol.PropertiesCollection{}
143+
if agent.ServerURLV2 != "" {
144+
props["ServerUrlV2"] = protocol.PropertyValue{
145+
Type: "System.String",
146+
Value: agent.ServerURLV2,
147+
}
148+
}
149+
if agent.UseV2Flow {
150+
props["UseV2Flow"] = protocol.PropertyValue{
151+
Type: "System.Boolean",
152+
Value: agent.UseV2Flow,
153+
}
154+
}
155+
141156
return &runnerconfiguration.RunnerInstance{
142157
PoolID: poolID,
143158
Auth: &protocol.GitHubAuthResult{
@@ -155,14 +170,17 @@ func ToRunnerInstance(fileAccess ConfigFileAccess) (*runnerconfiguration.RunnerI
155170
},
156171
DisableUpdate: disableUpdate,
157172
Version: "3.0.0",
158-
ServerV2URL: agent.ServerURLV2,
173+
Properties: props,
159174
},
160175
WorkFolder: agent.WorkFolder,
161176
RegistrationURL: agent.GitHubURL,
162177
}, nil
163178
}
164179

165180
func FromRunnerInstance(instance *runnerconfiguration.RunnerInstance, fileAccess ConfigFileAccess) error {
181+
182+
useV2Flow, _ := instance.Agent.Properties.LookupBool("UseV2Flow")
183+
serverV2URL, _ := instance.Agent.Properties.LookupString("ServerUrlV2")
166184
agent := &DotnetAgent{
167185
AgentID: fmt.Sprint(instance.Agent.ID),
168186
AgentName: instance.Agent.Name,
@@ -172,8 +190,8 @@ func FromRunnerInstance(instance *runnerconfiguration.RunnerInstance, fileAccess
172190
ServerURL: instance.Auth.TenantURL,
173191
WorkFolder: instance.WorkFolder,
174192
GitHubURL: instance.RegistrationURL,
175-
UseV2Flow: instance.Auth.UseV2FLow,
176-
ServerURLV2: instance.Agent.ServerV2URL,
193+
UseV2Flow: useV2Flow,
194+
ServerURLV2: serverV2URL,
177195
}
178196
if agent.WorkFolder == "" {
179197
agent.WorkFolder = "_work"

0 commit comments

Comments
 (0)