Skip to content

Commit 107ca26

Browse files
committed
feat[agent-manager](recovery): port the YAML-driven agent recovery subsystem
1 parent da2d644 commit 107ca26

22 files changed

Lines changed: 2662 additions & 16 deletions

agent-manager/Dockerfile

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@ COPY agent-manager /app/
44
COPY ./dependencies/agent/ /dependencies/agent/
55
COPY ./dependencies/collector/ /dependencies/collector/
66

7+
COPY ./recoveries/ /app/recoveries/
8+
79
# Install jq
810
RUN apt-get update && \
911
apt-get install -y ca-certificates jq wget && \

agent-manager/agent/agent_imp.go

Lines changed: 33 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,11 @@ func (s *AgentService) RegisterAgent(ctx context.Context, req *AgentRequest) (*A
117117
}
118118

119119
catcher.Info("Agent registered correctly", map[string]any{"hostname": agent.Hostname, "id": agent.ID, "process": "agent-manager"})
120+
121+
if OnAgentRegisterHook != nil {
122+
OnAgentRegisterHook(agent)
123+
}
124+
120125
return &AuthResponse{
121126
Id: uint32(agent.ID),
122127
Key: key,
@@ -171,6 +176,10 @@ func (s *AgentService) UpdateAgent(ctx context.Context, req *AgentRequest) (*Aut
171176
return nil, status.Errorf(codes.Internal, "failed to update agent: %v", err)
172177
}
173178

179+
if OnAgentUpdateHook != nil {
180+
OnAgentUpdateHook(agent)
181+
}
182+
174183
res := &AuthResponse{
175184
Id: uint32(agent.ID),
176185
Key: key,
@@ -255,6 +264,10 @@ func (s *AgentService) AgentStream(stream AgentService_AgentStreamServer) error
255264
s.AgentStreamMap[idUint] = stream
256265
s.AgentStreamMutex.Unlock()
257266

267+
if OnAgentConnectHook != nil {
268+
OnAgentConnectHook(stream.Context(), idUint)
269+
}
270+
258271
for {
259272
in, err := stream.Recv()
260273
if err == io.EOF {
@@ -288,7 +301,7 @@ func (s *AgentService) AgentStream(stream AgentService_AgentStreamServer) error
288301
CmdId: cmdID,
289302
ExecutedAt: msg.Result.ExecutedAt,
290303
}
291-
} else {
304+
} else if OnCommandResultHook == nil || !OnCommandResultHook(msg.Result) {
292305
catcher.Error("failed to find result channel for CmdID", nil, map[string]any{"cmdID": cmdID, "process": "agent-manager"})
293306
}
294307
s.CommandResultChannelM.Unlock()
@@ -338,16 +351,26 @@ func (s *AgentService) ProcessCommand(stream PanelService_ProcessCommandServer)
338351
catcher.Error("unable to create a new command history", err, map[string]any{"process": "agent-manager"})
339352
}
340353

341-
err = agentStream.Send(&BidirectionalStream{
342-
StreamMessage: &BidirectionalStream_Command{
343-
Command: &UtmCommand{
344-
AgentId: cmd.AgentId,
345-
Command: replaceSecretValues(cmd.Command),
346-
CmdId: cmdID,
347-
Shell: cmd.Shell,
354+
var lock sync.Locker
355+
if LockStreamHook != nil {
356+
lock = LockStreamHook(uint(streamId))
357+
}
358+
func() {
359+
if lock != nil {
360+
lock.Lock()
361+
defer lock.Unlock()
362+
}
363+
err = agentStream.Send(&BidirectionalStream{
364+
StreamMessage: &BidirectionalStream_Command{
365+
Command: &UtmCommand{
366+
AgentId: cmd.AgentId,
367+
Command: replaceSecretValues(cmd.Command),
368+
CmdId: cmdID,
369+
Shell: cmd.Shell,
370+
},
348371
},
349-
},
350-
})
372+
})
373+
}()
351374
if err != nil {
352375
return status.Errorf(codes.Internal, "failed to send command to agent: %v", err)
353376
}
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
package agent
2+
3+
import (
4+
"context"
5+
"sync"
6+
7+
"github.com/utmstack/UTMStack/agent-manager/models"
8+
)
9+
10+
var (
11+
OnAgentConnectHook func(ctx context.Context, agentID uint)
12+
OnAgentRegisterHook func(agent *models.Agent)
13+
OnAgentUpdateHook func(agent *models.Agent)
14+
OnCommandResultHook func(result *CommandResult) bool
15+
LockStreamHook func(agentID uint) sync.Locker
16+
)
17+
18+
func RegisterRecoveryHooks(
19+
onConnect func(ctx context.Context, agentID uint),
20+
onRegister func(agent *models.Agent),
21+
onUpdate func(agent *models.Agent),
22+
onResult func(result *CommandResult) bool,
23+
lockStream func(agentID uint) sync.Locker,
24+
) {
25+
if OnAgentConnectHook == nil {
26+
OnAgentConnectHook = onConnect
27+
}
28+
if OnAgentRegisterHook == nil {
29+
OnAgentRegisterHook = onRegister
30+
}
31+
if OnAgentUpdateHook == nil {
32+
OnAgentUpdateHook = onUpdate
33+
}
34+
if OnCommandResultHook == nil {
35+
OnCommandResultHook = onResult
36+
}
37+
if LockStreamHook == nil {
38+
LockStreamHook = lockStream
39+
}
40+
}

agent-manager/config/recovery.go

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
package config
2+
3+
import (
4+
"os"
5+
"strconv"
6+
)
7+
8+
func envOrDefault(key, fallback string) string {
9+
if v := os.Getenv(key); v != "" {
10+
return v
11+
}
12+
return fallback
13+
}
14+
15+
func envBoolOrDefault(key string, fallback bool) bool {
16+
v := os.Getenv(key)
17+
if v == "" {
18+
return fallback
19+
}
20+
b, err := strconv.ParseBool(v)
21+
if err != nil {
22+
return fallback
23+
}
24+
return b
25+
}
26+
27+
func envIntOrDefault(key string, fallback int) int {
28+
v := os.Getenv(key)
29+
if v == "" {
30+
return fallback
31+
}
32+
n, err := strconv.Atoi(v)
33+
if err != nil {
34+
return fallback
35+
}
36+
return n
37+
}
38+
39+
var (
40+
RecoveryDir = envOrDefault("RECOVERY_DIR", "/app/recoveries/")
41+
RecoveryDispatchEnabled = envBoolOrDefault("RECOVERY_DISPATCH_ENABLED", true)
42+
RecoveryGlobalConcurrency = envIntOrDefault("RECOVERY_GLOBAL_CONCURRENCY", 50)
43+
)

agent-manager/database/db.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,10 @@ type DB struct {
2222
locker sync.RWMutex
2323
}
2424

25+
func (d *DB) GormDB() *gorm.DB {
26+
return d.conn
27+
}
28+
2529
func (d *DB) Migrate(data ...interface{}) error {
2630
d.locker.Lock()
2731
defer d.locker.Unlock()

agent-manager/database/migration.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ import "github.com/utmstack/UTMStack/agent-manager/models"
44

55
func MigrateDatabase() error {
66
db := GetDB()
7-
err := db.Migrate(&models.Agent{}, &models.AgentCommand{}, &models.LastSeen{}, &models.Collector{}, &models.ConnectionKey{})
7+
err := db.Migrate(&models.Agent{}, &models.AgentCommand{}, &models.LastSeen{}, &models.Collector{}, &models.ConnectionKey{}, &models.Recovery{}, &models.RecoveryTarget{})
88
if err != nil {
99
return err
1010
}

agent-manager/go.mod

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ require (
2525
github.com/go-playground/universal-translator v0.18.1 // indirect
2626
github.com/go-playground/validator/v10 v10.30.1 // indirect
2727
github.com/goccy/go-json v0.10.5 // indirect
28-
github.com/goccy/go-yaml v1.19.2 // indirect
28+
github.com/goccy/go-yaml v1.19.2
2929
github.com/jackc/pgpassfile v1.0.0 // indirect
3030
github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 // indirect
3131
github.com/jackc/pgx/v5 v5.8.0 // indirect
@@ -47,7 +47,7 @@ require (
4747
golang.org/x/arch v0.24.0 // indirect
4848
golang.org/x/crypto v0.49.0 // indirect
4949
golang.org/x/net v0.52.0 // indirect
50-
golang.org/x/sync v0.20.0 // indirect
50+
golang.org/x/sync v0.20.0
5151
golang.org/x/sys v0.42.0 // indirect
5252
golang.org/x/text v0.35.0 // indirect
5353
google.golang.org/genproto/googleapis/rpc v0.0.0-20260226221140-a57be14db171 // indirect

agent-manager/main.go

Lines changed: 67 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,25 +1,89 @@
11
package main
22

33
import (
4+
"context"
45
"os"
6+
"os/signal"
7+
"sync"
8+
"syscall"
59
"time"
610

711
"github.com/threatwinds/go-sdk/catcher"
812
"github.com/utmstack/UTMStack/agent-manager/agent"
913
"github.com/utmstack/UTMStack/agent-manager/database"
14+
"github.com/utmstack/UTMStack/agent-manager/recovery"
1015
"github.com/utmstack/UTMStack/agent-manager/updates"
1116
)
1217

18+
type recoveryProvider struct {
19+
server *agent.AgentService
20+
lastSeenServ *agent.LastSeenService
21+
}
22+
23+
func (p *recoveryProvider) GetStream(agentID uint) (recovery.AgentStream, bool) {
24+
p.server.AgentStreamMutex.Lock()
25+
defer p.server.AgentStreamMutex.Unlock()
26+
s, ok := p.server.AgentStreamMap[agentID]
27+
if !ok {
28+
return nil, false
29+
}
30+
return s, true
31+
}
32+
33+
func (p *recoveryProvider) IsOnline(agentID uint) bool {
34+
if p.lastSeenServ == nil {
35+
return false
36+
}
37+
st, _, err := p.lastSeenServ.GetLastSeenStatus(agentID, "agent")
38+
if err != nil {
39+
return false
40+
}
41+
return st == agent.Status_ONLINE
42+
}
43+
1344
func main() {
1445
catcher.Info("Starting Agent Manager", map[string]any{"process": "agent-manager"})
1546

16-
err := database.MigrateDatabase()
17-
if err != nil {
47+
ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGTERM, syscall.SIGINT)
48+
defer stop()
49+
50+
if err := database.MigrateDatabase(); err != nil {
1851
_ = catcher.Error("failed to migrate database", err, map[string]any{"process": "agent-manager"})
1952
time.Sleep(5 * time.Second)
2053
os.Exit(1)
2154
}
2255

56+
if err := agent.InitAgentService(); err != nil {
57+
_ = catcher.Error("failed to init agent service", err, map[string]any{"process": "agent-manager"})
58+
time.Sleep(5 * time.Second)
59+
os.Exit(1)
60+
}
61+
62+
go agent.InitCollectorService()
63+
agent.InitLastSeenService()
64+
65+
recovery.SetStreamProvider(&recoveryProvider{
66+
server: agent.AgentServ,
67+
lastSeenServ: agent.LastSeenServ,
68+
})
69+
agent.RegisterRecoveryHooks(
70+
recovery.OnAgentConnect,
71+
recovery.OnAgentRegister,
72+
recovery.OnAgentUpdate,
73+
recovery.OnCommandResult,
74+
func(agentID uint) sync.Locker { return recovery.StreamMutex.For(agentID) },
75+
)
76+
if err := recovery.Init(ctx, database.GetDB().GormDB()); err != nil {
77+
_ = catcher.Error("failed to init recovery", err, map[string]any{"process": "agent-manager"})
78+
}
79+
2380
go updates.InitUpdatesManager()
24-
agent.InitGrpcServer()
81+
go agent.StartGrpcServer()
82+
83+
<-ctx.Done()
84+
catcher.Info("Shutdown signal received; draining recovery dispatches", map[string]any{"process": "agent-manager"})
85+
if err := recovery.Shutdown(15 * time.Second); err != nil {
86+
_ = catcher.Error("recovery shutdown error", err, map[string]any{"process": "agent-manager"})
87+
}
88+
catcher.Info("Agent Manager shut down cleanly", map[string]any{"process": "agent-manager"})
2589
}

agent-manager/models/recovery.go

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
package models
2+
3+
import (
4+
"time"
5+
6+
"gorm.io/gorm"
7+
)
8+
9+
// Recovery represents an imported YAML recovery directive.
10+
// One row per yaml_id; source of truth is the YAML filesystem.
11+
type Recovery struct {
12+
gorm.Model
13+
YamlID string `gorm:"type:varchar(128);not null;uniqueIndex"`
14+
YamlHash string `gorm:"type:char(64);not null"`
15+
Name string `gorm:"type:varchar(255);not null"`
16+
Description string `gorm:"type:text"`
17+
Shell string `gorm:"type:varchar(16);not null"`
18+
TargetOS string `gorm:"type:varchar(32);not null"`
19+
TargetVersionLte string `gorm:"type:varchar(32)"`
20+
SuccessPattern string `gorm:"type:varchar(255);not null;default:'RECOVERY_OK'"`
21+
MaxConcurrency int `gorm:"not null;default:100"`
22+
MaxAttempts int `gorm:"not null;default:3"`
23+
RetryAfterSecs int `gorm:"type:int;not null;default:1800"`
24+
AckTimeoutSecs int `gorm:"type:int;not null;default:300"`
25+
ExpiresAt time.Time `gorm:"not null"`
26+
DryRun bool `gorm:"not null;default:false"`
27+
Script string `gorm:"type:text;not null"`
28+
DependsOnYamlID string `gorm:"type:varchar(128)"`
29+
Status string `gorm:"type:varchar(16);not null;default:'ACTIVE';index:idx_recoveries_status"`
30+
BlockedReason string `gorm:"type:varchar(512)"`
31+
ImportedAt time.Time `gorm:"not null;autoCreateTime"`
32+
}
33+
34+
// RecoveryTarget represents one agent's execution slot for a given recovery.
35+
// One row per (recovery_id, agent_id) pair.
36+
type RecoveryTarget struct {
37+
gorm.Model
38+
RecoveryID uint `gorm:"not null;uniqueIndex:idx_recovery_target_unique;index:idx_target_recovery_status,priority:1"`
39+
Recovery Recovery `gorm:"foreignKey:RecoveryID"`
40+
AgentID uint `gorm:"not null;uniqueIndex:idx_recovery_target_unique;index:idx_target_status_agent,priority:2"`
41+
Status string `gorm:"type:varchar(16);not null;default:'PENDING';index:idx_target_status_agent,priority:1;index:idx_target_recovery_status,priority:2"`
42+
Attempts int `gorm:"not null;default:0"`
43+
LastAttemptAt *time.Time
44+
LastCmdID string `gorm:"type:varchar(64)"`
45+
LastResult string `gorm:"type:text"`
46+
LastError string `gorm:"type:varchar(512)"`
47+
CompletedAt *time.Time
48+
}

agent-manager/recoveries/.gitkeep

Whitespace-only changes.

0 commit comments

Comments
 (0)