Skip to content
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 5 additions & 16 deletions actionsrunner/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ type RunnerEnvironment interface {
func (run *RunRunner) Run(runnerenv RunnerEnvironment, listenerctx context.Context, corectx context.Context) error {
settings := run.Settings
for i := 0; i < len(settings.Instances); i++ {
if err := settings.Instances[i].EnshurePKey(); err != nil {
if err := settings.Instances[i].EnsurePKey(); err != nil {
return err
}
}
Expand Down Expand Up @@ -281,14 +281,8 @@ func (run *RunRunner) Run(runnerenv RunnerEnvironment, listenerctx context.Conte
}
}
session.Status = "Online"
err := vssConnection.RequestWithContext(xctx, "c3a054f6-7a8a-49c0-944e-3a8e5d7adfd7", "5.1-preview", "GET", map[string]string{
"poolId": fmt.Sprint(instance.PoolID),
}, map[string]string{
"sessionId": session.TaskAgentSession.SessionID,
"runnerVersion": "3.0.0",
"status": session.Status,
}, nil, message)
//TODO lastMessageId=
var err error
message, err = session.GetSingleMessage(xctx)
if err != nil {
if errors.Is(err, context.Canceled) {
return 0
Expand Down Expand Up @@ -321,12 +315,7 @@ func (run *RunRunner) Run(runnerenv RunnerEnvironment, listenerctx context.Conte
return 1
}
success = true
err := vssConnection.Request("c3a054f6-7a8a-49c0-944e-3a8e5d7adfd7", "5.1-preview", "DELETE", map[string]string{
"poolId": fmt.Sprint(instance.PoolID),
"messageId": fmt.Sprint(message.MessageID),
}, map[string]string{
"sessionId": session.TaskAgentSession.SessionID,
}, nil, nil)
err := session.DeleteMessage(context.Background(), message)
if err != nil {
runnerenv.Printf("Failed to delete Message\n")
success = false
Expand Down Expand Up @@ -425,7 +414,7 @@ func runJob(runnerenv RunnerEnvironment, joblock *sync.Mutex, vssConnection *pro
cancelJob()
finishJob()
}()
src, err := message.Decrypt(session.Block)
src, err := message.Decrypt(session)
if err != nil {
plogger.Printf("Failed to decode TaskAgentMessage: %v\n", err)
return
Expand Down
37 changes: 20 additions & 17 deletions protocol/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -294,36 +294,39 @@ func (vssConnection *VssConnection) GetAgentPools() (*TaskAgentPools, error) {
}
return _taskAgentPools, nil
}
func (vssConnection *VssConnection) CreateSession(ctx context.Context) (*AgentMessageConnection, error) {

func (vssConnection *VssConnection) CreateSessionExt(ctx context.Context, serverV2URL string) (*AgentMessageConnection, error) {
session := &TaskAgentSession{}
session.Agent = *vssConnection.TaskAgent
session.UseFipsEncryption = false // Have to be set to false for "GitHub Enterprise Server 3.0.11", github.com reset it to false 24-07-2021
session.OwnerName = "RUNNER"
if err := vssConnection.RequestWithContext(ctx, "134e239e-2df3-4794-a6f6-24f1f19ec8dc", "5.1-preview", "POST", map[string]string{
"poolId": fmt.Sprint(vssConnection.PoolID),
}, map[string]string{}, session, session); err != nil {
return nil, err
if serverV2URL != "" {
err := vssConnection.RequestWithContext2(ctx, "POST", serverV2URL+"/session", "", session, session)
if err != nil {
return nil, err
}
} else {
if err := vssConnection.RequestWithContext(ctx, "134e239e-2df3-4794-a6f6-24f1f19ec8dc", "5.1-preview", "POST", map[string]string{
"poolId": fmt.Sprint(vssConnection.PoolID),
}, map[string]string{}, session, session); err != nil {
return nil, err
}
if session.BrokerMigrationMessage != nil {
return vssConnection.CreateSessionExt(ctx, session.BrokerMigrationMessage.BrokerBaseUrl)
}
Comment on lines +314 to +354
Copy link
Copy Markdown
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remove this

}

con := &AgentMessageConnection{VssConnection: vssConnection, TaskAgentSession: session}
var err error
con.Block, err = con.TaskAgentSession.GetSessionKey(vssConnection.Key)
if err != nil {
_ = con.Delete(ctx)
return nil, err
}
con.Status = "Online"
return con, nil
}

func (vssConnection *VssConnection) CreateSession(ctx context.Context) (*AgentMessageConnection, error) {
return vssConnection.CreateSessionExt(ctx, vssConnection.TaskAgent.ServerV2URL)
}

func (vssConnection *VssConnection) LoadSession(ctx context.Context, session *TaskAgentSession) (*AgentMessageConnection, error) {
con := &AgentMessageConnection{VssConnection: vssConnection, TaskAgentSession: session}
var err error
con.Block, err = con.TaskAgentSession.GetSessionKey(vssConnection.Key)
if err != nil {
_ = con.Delete(ctx)
return nil, err
}
con.Status = "Online"
return con, nil
}
Expand Down
1 change: 1 addition & 0 deletions protocol/misc.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ type GitHubAuthResult struct {
TenantURL string `json:"url"`
TokenSchema string `json:"token_schema"`
Token string `json:"token"`
UseV2FLow bool `json:"use_v2_flow"`
}

type TaskOrchestrationPlanReference struct {
Expand Down
28 changes: 28 additions & 0 deletions protocol/runner_admin.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package protocol

type Runner struct {
Name string `json:"name"`
Id int64 `json:"id"`
Authorization struct {
AuthorizationURL string `json:"authorization_url"`
ServerURL string `json:"server_url"`
ClientId string `json:"client_id"`
} `json:"authorization"`
}

type RunnerGroup struct {
Id int32 `json:"id,omitempty"`
Name string `json:"name,omitempty"`
IsDefault bool `json:"default,omitempty"`
IsHosted bool `json:"is_hosted,omitempty"`
}

type RunnerGroupList struct {
RunnerGroups []RunnerGroup `json:"runner_groups"`
Count int `json:"total_count"`
}

type ListRunnersResponse struct {
TotalCount int `json:"total_count"`
Runners []Runner `json:"runners"`
}
68 changes: 51 additions & 17 deletions protocol/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"crypto/cipher"
"crypto/rand"
"crypto/rsa"
"net/url"
"strings"

// nolint:gosec
Expand All @@ -27,7 +28,7 @@ type TaskAgentMessage struct {
Body string
}

func (message *TaskAgentMessage) Decrypt(block cipher.Block) ([]byte, error) {
func (message *TaskAgentMessage) Decrypt(session *AgentMessageConnection) ([]byte, error) {
if message.IV == "" {
return []byte(message.Body), nil
}
Expand All @@ -39,9 +40,17 @@ func (message *TaskAgentMessage) Decrypt(block cipher.Block) ([]byte, error) {
if err != nil {
return nil, err
}
cbcdec := cipher.NewCBCDecrypter(block, iv)
if session.Block == nil {
// Parse Key
var err error
session.Block, err = session.TaskAgentSession.GetSessionKey(session.VssConnection.Key)
if err != nil {
return nil, err
}
}
cbcdec := cipher.NewCBCDecrypter(session.Block, iv)
cbcdec.CryptBlocks(src, src)
maxlen := block.BlockSize()
maxlen := session.Block.BlockSize()
validlen := len(src)
if int(src[len(src)-1]) <= maxlen { // <= is needed if the message ends within a block boundary and maxlen=16 then we get 16 times char 16 appended, one whole extra block
ok := true
Expand Down Expand Up @@ -71,7 +80,7 @@ func (message *TaskAgentMessage) FetchBrokerIfNeeded(xctx context.Context, sessi
if strings.EqualFold(message.MessageType, "BrokerMigration") {
vssConnection := session.VssConnection
rjrr := &BrokerMigration{}
raw, err := message.Decrypt(session.Block)
raw, err := message.Decrypt(session)
if err != nil {
return err
}
Expand Down Expand Up @@ -112,11 +121,12 @@ type TaskAgentSessionKey struct {
}

type TaskAgentSession struct {
SessionID string `json:",omitempty"`
EncryptionKey TaskAgentSessionKey
OwnerName string
Agent TaskAgent
UseFipsEncryption bool
SessionID string `json:",omitempty"`
EncryptionKey TaskAgentSessionKey
OwnerName string
Agent TaskAgent
UseFipsEncryption bool
BrokerMigrationMessage *BrokerMigration `json:",omitempty"`
Copy link
Copy Markdown
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed today again as unused code, planning revert

}

func (session *TaskAgentSession) GetSessionKey(key *rsa.PrivateKey) (cipher.Block, error) {
Expand Down Expand Up @@ -145,30 +155,50 @@ type AgentMessageConnection struct {
TaskAgentSession *TaskAgentSession
Block cipher.Block
Status string
ServerV2URL string
}

func (session *AgentMessageConnection) Delete(ctx context.Context) error {
if session.ServerV2URL != "" {
return session.VssConnection.RequestWithContext2(ctx, "DELETE", session.ServerV2URL+"/session", "", nil, nil)
}
return session.VssConnection.RequestWithContext(ctx, "134e239e-2df3-4794-a6f6-24f1f19ec8dc", "5.1-preview", "DELETE", map[string]string{
"poolId": fmt.Sprint(session.VssConnection.PoolID),
"sessionId": session.TaskAgentSession.SessionID,
}, map[string]string{}, session.TaskAgentSession, nil)
}

func (session *AgentMessageConnection) GetNextMessage(ctx context.Context) (*TaskAgentMessage, error) {
func (session *AgentMessageConnection) GetSingleMessage(ctx context.Context) (*TaskAgentMessage, error) {
message := &TaskAgentMessage{}
for {
select {
case <-ctx.Done():
return nil, context.Canceled
default:
}
err := session.VssConnection.RequestWithContext(ctx, "c3a054f6-7a8a-49c0-944e-3a8e5d7adfd7", "5.1-preview", "GET", map[string]string{
var err error
if session.ServerV2URL != "" {
query := url.Values{}
query.Set("sessionId", session.TaskAgentSession.SessionID)
query.Set("runnerVersion", "3.0.0")
query.Set("status", session.Status)
query.Set("disableUpdate", fmt.Sprint(session.TaskAgentSession.Agent.DisableUpdate))
err = session.VssConnection.RequestWithContext2(ctx, "GET", session.ServerV2URL+"/message?"+query.Encode(), "", nil, message)
} else {
err = session.VssConnection.RequestWithContext(ctx, "c3a054f6-7a8a-49c0-944e-3a8e5d7adfd7", "5.1-preview", "GET", map[string]string{
"poolId": fmt.Sprint(session.VssConnection.PoolID),
}, map[string]string{
"sessionId": session.TaskAgentSession.SessionID,
"runnerVersion": "3.0.0",
"status": session.Status,
}, nil, message)
// TODO lastMessageId=
}
return message, err
}

func (session *AgentMessageConnection) GetNextMessage(ctx context.Context) (*TaskAgentMessage, error) {
for {
select {
case <-ctx.Done():
return nil, context.Canceled
default:
}
message, err := session.GetSingleMessage(ctx)
if err == nil {
err = session.DeleteMessage(ctx, message)
err = errors.Join(err, message.FetchBrokerIfNeeded(ctx, session))
Expand All @@ -191,6 +221,10 @@ func (session *AgentMessageConnection) GetNextMessage(ctx context.Context) (*Tas
}

func (session *AgentMessageConnection) DeleteMessage(ctx context.Context, message *TaskAgentMessage) error {
if session.ServerV2URL != "" {
// V2 no support for deleting messages
return nil
}
return session.VssConnection.RequestWithContext(ctx, "c3a054f6-7a8a-49c0-944e-3a8e5d7adfd7", "5.1-preview", "DELETE", map[string]string{
"poolId": fmt.Sprint(session.VssConnection.PoolID),
"messageId": fmt.Sprint(message.MessageID),
Expand Down
4 changes: 3 additions & 1 deletion protocol/task_agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ type TaskAgent struct {
Authorization TaskAgentAuthorization
Labels []AgentLabel
MaxParallelism int
ID int
ID int64
Name string
Version string
OSDescription string
Expand All @@ -45,6 +45,8 @@ type TaskAgent struct {
CreatedOn string
Ephemeral bool `json:",omitempty"`
DisableUpdate bool `json:",omitempty"`
// Just a convenient way to store the URL, not part of the spec
ServerV2URL string `json:",omitempty"`
}

type TaskAgents struct {
Expand Down
Loading