Skip to content
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions agent/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
type Client struct {
serverURL string
token string
uuid string
httpClient *http.Client
}

Expand All @@ -25,6 +26,10 @@ func NewClient(serverURL, token string) *Client {
}
}

func (c *Client) SetUUID(uuid string) {
c.uuid = uuid
}

func (c *Client) doRequest(method, path string, body interface{}) (*http.Response, error) {
var bodyReader io.Reader
if body != nil {
Expand All @@ -40,6 +45,9 @@ func (c *Client) doRequest(method, path string, body interface{}) (*http.Respons
return nil, err
}
req.Header.Set("Authorization", "Bearer "+c.token)
if c.uuid != "" {
req.Header.Set("X-Agent-UUID", c.uuid)
}
if body != nil {
req.Header.Set("Content-Type", "application/json")
}
Expand Down
16 changes: 16 additions & 0 deletions agent/flock_unix.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
//go:build !windows

package agent

import (
"os"
"syscall"
)

func lockFileExclusive(f *os.File) error {
return syscall.Flock(int(f.Fd()), syscall.LOCK_EX|syscall.LOCK_NB)
}

func unlockFile(f *os.File) {
_ = syscall.Flock(int(f.Fd()), syscall.LOCK_UN)
}
45 changes: 45 additions & 0 deletions agent/flock_windows.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
//go:build windows

package agent

import (
"os"
"syscall"
"unsafe"
)

var (
modkernel32 = syscall.NewLazyDLL("kernel32.dll")
procLockFileEx = modkernel32.NewProc("LockFileEx")
procUnlockFileEx = modkernel32.NewProc("UnlockFileEx")
)

const (
lockfileExclusiveLock = 0x00000002
lockfileFailImmediately = 0x00000001
)

func lockFileExclusive(f *os.File) error {
var overlapped syscall.Overlapped
r1, _, err := procLockFileEx.Call(
uintptr(f.Fd()),
uintptr(lockfileExclusiveLock|lockfileFailImmediately),
0,
1, 0,
uintptr(unsafe.Pointer(&overlapped)),
)
if r1 == 0 {
return err
}
return nil
}

func unlockFile(f *os.File) {
var overlapped syscall.Overlapped
procUnlockFileEx.Call(
uintptr(f.Fd()),
0,
1, 0,
uintptr(unsafe.Pointer(&overlapped)),
)
}
1 change: 1 addition & 0 deletions agent/models.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ type StatusReport struct {
// HeartbeatRequest is sent periodically with agent metrics.
type HeartbeatRequest struct {
UUID string `json:"uuid,omitempty"`
Labels string `json:"labels,omitempty"`
CPUPercent float64 `json:"cpu_percent"`
MemPercent float64 `json:"mem_percent"`
MemUsedBytes int64 `json:"mem_used_bytes"`
Expand Down
4 changes: 2 additions & 2 deletions agent/vcs/p4.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ func (p *P4Provider) Checkout(ctx context.Context, url, ref, pipeline, destDir s
return CheckoutResult{}, fmt.Errorf("p4 sync pipeline file failed: %w", err)
}

return CheckoutResult{Dir: root, Persistent: true}, nil
return CheckoutResult{Dir: root, Persistent: true, P4Client: p.clientName}, nil
}

// Create temporary workspace
Expand Down Expand Up @@ -132,7 +132,7 @@ func (p *P4Provider) Checkout(ctx context.Context, url, ref, pipeline, destDir s
return CheckoutResult{}, fmt.Errorf("p4 sync pipeline file failed: %w", err)
}

return CheckoutResult{Dir: absDir}, nil
return CheckoutResult{Dir: absDir, Persistent: true, P4Client: p.clientName}, nil
}

func (p *P4Provider) Cleanup(ctx context.Context) error {
Expand Down
4 changes: 4 additions & 0 deletions agent/vcs/vcs.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,10 @@ type CheckoutResult struct {
Persistent bool
// SHA is the resolved commit SHA (or changelist number for P4) after checkout.
SHA string
// P4Client is the Perforce workspace name created or reused during checkout.
// The worker should set P4CLIENT in the subprocess environment so that
// p4 commands within the graph can operate on the same workspace.
P4Client string
}

// Provider handles VCS checkout operations.
Expand Down
92 changes: 70 additions & 22 deletions agent/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,61 +24,105 @@ type Worker struct {
client *Client
docker DockerConfig
vcsOpts vcs.Options
labels string
pollInterval time.Duration
uuid string
slotCleanup func()
log *logrus.Entry

metricsMu sync.Mutex
lastCounters *RawCounters
}

func NewWorker(client *Client, docker DockerConfig, vcsOpts vcs.Options) *Worker {
func NewWorker(client *Client, docker DockerConfig, vcsOpts vcs.Options, labels string) *Worker {
uuid, cleanup := acquireAgentSlot()
client.SetUUID(uuid)
return &Worker{
client: client,
docker: docker,
vcsOpts: vcsOpts,
labels: labels,
pollInterval: 1 * time.Second,
uuid: loadOrGenerateUUID(),
uuid: uuid,
slotCleanup: cleanup,
log: logrus.WithField("component", "agent"),
}
}

// uuidFilePath returns the path to the persistent UUID file in the user's config directory.
func uuidFilePath() string {
// agentSlotDir returns the directory for agent UUID slot files.
func agentSlotDir() string {
dir, err := os.UserConfigDir()
if err != nil {
dir = os.TempDir()
}
return filepath.Join(dir, "actionforge", "agent-uuid")
return filepath.Join(dir, "actionforge")
}

// loadOrGenerateUUID loads a persistent UUID from disk, or generates and saves a new one.
func loadOrGenerateUUID() string {
path := uuidFilePath()
if data, err := os.ReadFile(path); err == nil {
if id := strings.TrimSpace(string(data)); len(id) == 36 {
return id
// acquireAgentSlot finds and locks the lowest available agent slot.
// Each slot has a persistent UUID file and a lock file. When the process
// exits, the lock is released so the next process can reuse that slot
// (and its UUID/metrics history).
// Returns the UUID and a cleanup function that releases the lock.
func acquireAgentSlot() (string, func()) {
dir := agentSlotDir()
_ = os.MkdirAll(dir, 0700)

const maxSlots = 256
for i := 0; i < maxSlots; i++ {
lockPath := filepath.Join(dir, fmt.Sprintf("agent-%d.lock", i))
uuidPath := filepath.Join(dir, fmt.Sprintf("agent-%d.uuid", i))

lockFile, err := os.OpenFile(lockPath, os.O_CREATE|os.O_RDWR, 0600)
if err != nil {
continue
}

if err := lockFileExclusive(lockFile); err != nil {
lockFile.Close()
Comment thread
github-code-quality[bot] marked this conversation as resolved.
Fixed
continue
}

// Slot acquired — read or generate UUID
uuid := ""
if data, err := os.ReadFile(uuidPath); err == nil {
if id := strings.TrimSpace(string(data)); len(id) == 36 {
uuid = id
}
}
if uuid == "" {
var buf [16]byte
_, _ = rand.Read(buf[:])
buf[6] = (buf[6] & 0x0f) | 0x40 // version 4
buf[8] = (buf[8] & 0x3f) | 0x80 // variant 1
uuid = fmt.Sprintf("%08x-%04x-%04x-%04x-%012x",
buf[0:4], buf[4:6], buf[6:8], buf[8:10], buf[10:16])
_ = os.WriteFile(uuidPath, []byte(uuid+"\n"), 0600)
}

cleanup := func() {
unlockFile(lockFile)
lockFile.Close()
Comment thread
github-code-quality[bot] marked this conversation as resolved.
Fixed
}
return uuid, cleanup
}

// Generate UUID v4
// Fallback: all slots taken, generate ephemeral UUID with no lock
var buf [16]byte
_, _ = rand.Read(buf[:])
buf[6] = (buf[6] & 0x0f) | 0x40 // version 4
buf[8] = (buf[8] & 0x3f) | 0x80 // variant 1
id := fmt.Sprintf("%08x-%04x-%04x-%04x-%012x",
buf[0:4], buf[4:6], buf[6:8], buf[8:10], buf[10:16])

_ = os.MkdirAll(filepath.Dir(path), 0700)
_ = os.WriteFile(path, []byte(id+"\n"), 0600)
return id
buf[6] = (buf[6] & 0x0f) | 0x40
buf[8] = (buf[8] & 0x3f) | 0x80
return fmt.Sprintf("%08x-%04x-%04x-%04x-%012x",
buf[0:4], buf[4:6], buf[6:8], buf[8:10], buf[10:16]), func() {}
}

// maxConsecutiveErrors is the number of consecutive connection errors before
// Run returns ErrConnectionLost so the caller can decide to restart.
const maxConsecutiveErrors = 10

func (w *Worker) Run(ctx context.Context) error {
if w.slotCleanup != nil {
defer w.slotCleanup()
}
w.log.Info("starting")

// Take initial snapshot for delta computation
Expand Down Expand Up @@ -358,12 +402,16 @@ func (w *Worker) execute(ctx context.Context, job *ClaimResponse) {
env = append(env, "BUILD_TMPDIR="+tmpDir)
env = append(env, "BUILD_VCS_TYPE="+job.VCSType)
env = append(env, "BUILD_VCS_URL="+job.VCSURL)
env = append(env, "BUILD_REF="+ref)
if job.RepoID != "" {
env = append(env, "BUILD_REPO_ID="+job.RepoID)
}
if checkout.SHA != "" {
env = append(env, "BUILD_COMMIT_SHA="+checkout.SHA)
}
if checkout.P4Client != "" {
env = append(env, "P4CLIENT="+checkout.P4Client)
}

// Resolve env mappings from trigger config (if present)
if len(job.EnvMappings) > 0 && job.MatrixValues != nil {
Expand Down Expand Up @@ -582,13 +630,13 @@ func (w *Worker) buildHeartbeatRequest() HeartbeatRequest {
snap, err := Snapshot()
if err != nil {
w.log.WithError(err).Warn("metrics snapshot error")
return HeartbeatRequest{UUID: w.uuid}
return HeartbeatRequest{UUID: w.uuid, Labels: w.labels}
}

w.metricsMu.Lock()
defer w.metricsMu.Unlock()

req := HeartbeatRequest{UUID: w.uuid}
req := HeartbeatRequest{UUID: w.uuid, Labels: w.labels}
if w.lastCounters != nil {
// CPU percent
if snap.CPUInstant {
Expand Down
4 changes: 3 additions & 1 deletion cmd/cmd_agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
var (
flagAgentServer string
flagAgentToken string
flagAgentLabels string
flagAgentDockerDisabled bool
flagAgentDockerDefaultImage string
flagAgentP4Client string
Expand All @@ -35,6 +36,7 @@ func init() {
if os.Getenv("ACT_AGENT_TOKEN") == "" {
cmdAgent.MarkFlagRequired("token")
}
cmdAgent.Flags().StringVar(&flagAgentLabels, "labels", envOr("ACT_AGENT_LABELS", ""), "Comma-separated labels for job matching (env: ACT_AGENT_LABELS)")
cmdAgent.Flags().BoolVar(&flagAgentDockerDisabled, "docker-disabled", envOrBool("ACT_AGENT_DOCKER_DISABLED", false), "Disable Docker execution, always run natively")
cmdAgent.Flags().StringVar(&flagAgentDockerDefaultImage, "docker-default-image", envOr("ACT_AGENT_DOCKER_DEFAULT_IMAGE", ""), "Force this Docker image for all scripts")
cmdAgent.Flags().StringVar(&flagAgentP4Client, "p4-client", envOr("ACT_AGENT_P4CLIENT", ""), "Reuse an existing Perforce workspace instead of creating a temporary one (env: ACT_AGENT_P4CLIENT)")
Expand Down Expand Up @@ -68,7 +70,7 @@ func cmdAgentRun(cmd *cobra.Command, args []string) {
DefaultImage: flagAgentDockerDefaultImage,
}, vcs.Options{
P4Client: flagAgentP4Client,
})
}, flagAgentLabels)

log.WithField("server", serverURL).Info("connecting")
err := w.Run(ctx)
Expand Down
1 change: 1 addition & 0 deletions core/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ const (
CredentialTypeSSH CredentialType = iota
CredentialTypeUsernamePassword
CredentialTypeAccessKey
CredentialTypeP4
)

type Credentials interface {
Expand Down
23 changes: 23 additions & 0 deletions node_interfaces/interface_core_p4-credentials_v1.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

22 changes: 22 additions & 0 deletions node_interfaces/interface_core_p4-print_v1.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

26 changes: 26 additions & 0 deletions node_interfaces/interface_core_p4-run_v1.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading
Loading