-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathprovider.go
More file actions
346 lines (288 loc) · 10.4 KB
/
provider.go
File metadata and controls
346 lines (288 loc) · 10.4 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
package scaleway
import (
"context"
"errors"
"fmt"
"path"
"strings"
"sync/atomic"
"github.com/hashicorp/go-hclog"
"github.com/scaleway/scaleway-sdk-go/api/applesilicon/v1alpha1"
"github.com/scaleway/scaleway-sdk-go/scw"
"gitlab.com/gitlab-org/fleeting/fleeting/provider"
)
// namePrefix is the leading part of every server name this plugin creates.
// serverNamePrefix combines it with the group name so that listGroupServers
// can recognise the group's servers among everything else in the project.
const (
	namePrefix = "fleeting-"
)

// Compile-time check that *InstanceGroup satisfies provider.InstanceGroup.
var _ provider.InstanceGroup = (*InstanceGroup)(nil)
// InstanceGroup implements the fleeting provider.InstanceGroup interface for
// Scaleway Apple Silicon servers.
type InstanceGroup struct {
	// AccessKey is the Scaleway API access key (SCWXXXXXXXXXXXXXXXXX format).
	// Required together with SecretKey when using API key authentication.
	// Falls back to the SCW_ACCESS_KEY environment variable when empty.
	AccessKey string `json:"access_key,omitempty"`

	// SecretKey is the Scaleway API secret key (UUID format).
	// Falls back to the SCW_SECRET_KEY environment variable when empty.
	// When provided without AccessKey it is used as a JWT/session token.
	SecretKey string `json:"secret_key,omitempty"`

	// ProjectID is the Scaleway project in which servers are created and
	// listed. When empty, Init fills it from the SCW_DEFAULT_PROJECT_ID
	// environment variable (if set).
	ProjectID string `json:"project_id,omitempty"`

	// Zone is the Scaleway availability zone, e.g. "fr-par-3".
	// When empty, Init uses the SCW_DEFAULT_ZONE environment variable, and
	// "fr-par-3" if that is not set either. Currently only "fr-par-3"
	// supports Apple Silicon.
	Zone string `json:"zone,omitempty"`

	// ServerType is the Mac mini type to provision, e.g. "M2-M", "M2-L", "M1-M".
	ServerType string `json:"server_type"`

	// OsID is the optional OS UUID to install. When empty the default OS for
	// the chosen server type is used.
	OsID string `json:"os_id,omitempty"`

	// Name is a logical name for this instance group. It is used as a
	// prefix/tag to identify servers managed by this group.
	Name string `json:"name"`

	// MaxSize is the maximum number of servers reported to fleeting as the
	// group's capacity. Zero (the default) means 10; negative values are
	// rejected by Init.
	MaxSize int `json:"max_size,omitempty"`

	log             hclog.Logger
	api             *applesilicon.API
	zone            scw.Zone
	instanceCounter atomic.Int32
	settings        provider.Settings
}

// defaultMaxSize is used when MaxSize is not configured; Apple Silicon
// quotas are typically small.
const defaultMaxSize = 10

// Init validates configuration, creates the Scaleway API client, resolves
// the effective zone and project, verifies that ServerType exists in that
// zone, and returns provider metadata.
//
// Resolution order:
//   - zone: Zone field, then SCW_DEFAULT_ZONE, then fr-par-3.
//   - project: ProjectID field, then SCW_DEFAULT_PROJECT_ID.
//
// ctx bounds the validation API call.
func (g *InstanceGroup) Init(ctx context.Context, log hclog.Logger, settings provider.Settings) (provider.ProviderInfo, error) {
	if g.ServerType == "" {
		return provider.ProviderInfo{}, fmt.Errorf("server_type is required")
	}
	if g.Name == "" {
		return provider.ProviderInfo{}, fmt.Errorf("name is required")
	}
	if g.MaxSize < 0 {
		return provider.ProviderInfo{}, fmt.Errorf("max_size must not be negative (got %d)", g.MaxSize)
	}

	opts := []scw.ClientOption{
		scw.WithEnv(), // honour SCW_* env vars
		scw.WithUserAgent("fleeting-plugin-scaleway/" + Version.Version),
	}

	switch {
	case g.AccessKey != "" && g.SecretKey != "":
		// Full API key authentication (access key + secret key).
		opts = append(opts, scw.WithAuth(g.AccessKey, g.SecretKey))
	case g.SecretKey != "":
		// Secret key only — use as a session/JWT token (same as X-Auth-Token header).
		opts = append(opts, scw.WithJWT(g.SecretKey))
	}

	if g.ProjectID != "" {
		opts = append(opts, scw.WithDefaultProjectID(g.ProjectID))
	}

	// Only override the client's default zone when one is explicitly
	// configured. Forcing it unconditionally would discard the
	// SCW_DEFAULT_ZONE value read by WithEnv above.
	if g.Zone != "" {
		opts = append(opts, scw.WithDefaultZone(scw.Zone(g.Zone)))
	}

	client, err := scw.NewClient(opts...)
	if err != nil {
		return provider.ProviderInfo{}, fmt.Errorf("failed to create Scaleway client: %w", err)
	}

	zone, ok := client.GetDefaultZone()
	if !ok {
		zone = scw.ZoneFrPar3 // only zone currently available for Apple Silicon
	}

	// Resolve an environment-provided project into the struct so that every
	// request, including ListServers (which does not apply client defaults),
	// is scoped to the same project.
	if g.ProjectID == "" {
		if projectID, found := client.GetDefaultProjectID(); found {
			g.ProjectID = projectID
		}
	}

	g.api = applesilicon.NewAPI(client)
	g.zone = zone
	g.log = log.With("name", g.Name, "zone", zone)
	g.settings = settings

	// Validate that the zone is reachable and server type exists.
	if _, err := g.api.GetServerType(&applesilicon.GetServerTypeRequest{
		Zone:       g.zone,
		ServerType: g.ServerType,
	}, scw.WithContext(ctx)); err != nil {
		return provider.ProviderInfo{}, fmt.Errorf("failed to validate server type %q in zone %s: %w", g.ServerType, g.zone, err)
	}

	g.log.Info("Scaleway Apple Silicon plugin initialised", "server_type", g.ServerType)

	maxSize := g.MaxSize
	if maxSize == 0 {
		maxSize = defaultMaxSize
	}

	return provider.ProviderInfo{
		ID:        path.Join("scaleway", string(g.zone), g.Name),
		MaxSize:   maxSize,
		Version:   Version.Version,
		BuildInfo: Version.BuildInfo(),
	}, nil
}
// Update reports the current fleeting state of every server that belongs to
// this instance group by invoking fn once per server, keyed by server ID.
// An error from listing the servers is returned unchanged and fn is not
// called at all in that case.
func (g *InstanceGroup) Update(ctx context.Context, fn func(instance string, state provider.State)) error {
	group, listErr := g.listGroupServers(ctx)
	if listErr != nil {
		return listErr
	}

	for i := range group {
		server := group[i]
		st := serverState(server)
		g.log.Debug("Server state", "id", server.ID, "status", server.Status, "state", st)
		fn(server.ID, st)
	}

	return nil
}
// Increase requests delta new Apple Silicon servers, one API call each, and
// returns how many creation requests were accepted.
//
// A failed creation does not stop the loop; each failure is logged and
// joined into the returned error. If ctx is cancelled, the remaining
// creations are abandoned and the cancellation cause is joined into the
// error as well, instead of issuing further doomed API calls.
func (g *InstanceGroup) Increase(ctx context.Context, delta int) (int, error) {
	succeeded := 0
	var errs error

	for i := range delta {
		if ctxErr := ctx.Err(); ctxErr != nil {
			errs = errors.Join(errs, fmt.Errorf("increase stopped after %d of %d attempts: %w", i, delta, ctxErr))
			break
		}

		id, err := g.createServer(ctx)
		if err != nil {
			g.log.Error("Failed to create server", "err", err)
			errs = errors.Join(errs, err)
			continue
		}

		g.log.Info("Server creation requested", "id", id)
		succeeded++
	}

	g.log.Info("Increase", "delta", delta, "succeeded", succeeded)
	return succeeded, errs
}
// Decrease requests deletion of the given servers and returns the IDs whose
// deletion request was accepted.
//
// Scaleway Apple Silicon servers have a 24h minimum commitment period. A
// deletion rejected with a precondition failure is treated as non-fatal: it
// is logged as a warning and the instance is skipped (neither returned as
// succeeded nor added to the error). This avoids returning an error that
// causes taskscaler to enter a retry loop. All other failures are logged
// and joined into the returned error.
func (g *InstanceGroup) Decrease(ctx context.Context, instances []string) ([]string, error) {
	if len(instances) == 0 {
		return nil, nil
	}

	succeeded := make([]string, 0, len(instances))
	var errs error

	for _, id := range instances {
		err := g.api.DeleteServer(&applesilicon.DeleteServerRequest{
			Zone:     g.zone,
			ServerID: id,
		}, scw.WithContext(ctx))
		if err == nil {
			g.log.Info("Server deletion requested", "id", id)
			succeeded = append(succeeded, id)
			continue
		}

		// The SDK returns *scw.PreconditionFailedError, whose Error method
		// has a pointer receiver. The errors.As target must therefore be a
		// pointer-typed variable. A value-typed target does not implement
		// error, and errors.As would panic on it.
		var precondition *scw.PreconditionFailedError
		if errors.As(err, &precondition) {
			g.log.Warn("Server cannot be deleted yet (24h commitment)", "id", id, "err", err)
			continue
		}

		g.log.Error("Failed to delete server", "id", id, "err", err)
		errs = errors.Join(errs, err)
	}

	g.log.Info("Decrease", "requested", instances, "succeeded", succeeded)
	return succeeded, errs
}
// ConnectInfo returns the SSH connection details for a provisioned server.
//
// It fails unless the server is in the ready state and has an IP address.
// The user's connector_config is used as the base. OS, architecture and
// protocol default to darwin/arm64/SSH. Username and password are taken
// from the API response only when the config does not already set them.
// The same IP is reported as both the internal and external address.
func (g *InstanceGroup) ConnectInfo(ctx context.Context, instanceID string) (provider.ConnectInfo, error) {
	srv, err := g.api.GetServer(&applesilicon.GetServerRequest{
		Zone:     g.zone,
		ServerID: instanceID,
	}, scw.WithContext(ctx))
	if err != nil {
		return provider.ConnectInfo{}, fmt.Errorf("failed to get server %s: %w", instanceID, err)
	}

	if srv.Status != applesilicon.ServerStatusReady {
		return provider.ConnectInfo{}, fmt.Errorf("server %s is not ready (status: %s)", instanceID, srv.Status)
	}

	// A non-nil but empty net.IP would stringify to "<nil>", so test the
	// length rather than only nil-ness.
	if len(srv.IP) == 0 {
		return provider.ConnectInfo{}, fmt.Errorf("server %s has no IP address yet", instanceID)
	}
	ipAddr := srv.IP.String()

	// ConnectorConfig is copied by value, so the defaults below do not
	// modify the shared settings.
	connCfg := g.settings.ConnectorConfig

	// Apply sensible defaults for Apple Silicon if not already set by the user.
	if connCfg.OS == "" {
		connCfg.OS = "darwin"
	}
	if connCfg.Arch == "" {
		connCfg.Arch = "arm64"
	}
	if connCfg.Protocol == "" {
		connCfg.Protocol = provider.ProtocolSSH
	}

	// Scaleway provides the SSH username in the API response; use it if the
	// connector_config did not already specify one.
	if connCfg.Username == "" && srv.SSHUsername != "" {
		connCfg.Username = srv.SSHUsername
	}

	// Expose the sudo password as the SSH password so the runner can use it
	// for password-based authentication and for instance_ready_command.
	if connCfg.Password == "" && srv.SudoPassword != "" {
		connCfg.Password = srv.SudoPassword
	}

	return provider.ConnectInfo{
		ConnectorConfig: connCfg,
		ID:              instanceID,
		InternalAddr:    ipAddr,
		ExternalAddr:    ipAddr,
	}, nil
}
// Heartbeat checks whether a server still exists and is not in the error or
// locked state.
//
// A failed lookup is returned wrapped. An error or locked server yields an
// error wrapping provider.ErrInstanceUnhealthy. ctx bounds the API call so
// that a stalled request can be cancelled by the caller.
func (g *InstanceGroup) Heartbeat(ctx context.Context, instanceID string) error {
	srv, err := g.api.GetServer(&applesilicon.GetServerRequest{
		Zone:     g.zone,
		ServerID: instanceID,
	}, scw.WithContext(ctx))
	if err != nil {
		return fmt.Errorf("heartbeat: failed to get server %s: %w", instanceID, err)
	}

	if srv.Status == applesilicon.ServerStatusError || srv.Status == applesilicon.ServerStatusLocked {
		return fmt.Errorf("%w: server %s is in status %s", provider.ErrInstanceUnhealthy, instanceID, srv.Status)
	}

	return nil
}
// Shutdown satisfies provider.InstanceGroup. The plugin holds nothing that
// requires explicit cleanup, so it ignores the context and always succeeds.
func (g *InstanceGroup) Shutdown(_ context.Context) error {
	return nil
}
// --- helpers -----------------------------------------------------------------
// serverNamePrefix returns "fleeting-<Name>-", the name prefix shared by
// every server of this group; createServer appends a counter to it.
func (g *InstanceGroup) serverNamePrefix() string {
	return fmt.Sprintf("%s%s-", namePrefix, g.Name)
}
// createServer requests one new server named serverNamePrefix() followed by
// a per-process counter, and returns the new server's ID.
//
// OsID is only sent when configured, so that the API picks the default OS
// otherwise. ctx bounds the API call.
func (g *InstanceGroup) createServer(ctx context.Context) (string, error) {
	index := int(g.instanceCounter.Add(1))
	name := fmt.Sprintf("%s%d", g.serverNamePrefix(), index)

	req := &applesilicon.CreateServerRequest{
		Zone:      g.zone,
		Name:      name,
		ProjectID: g.ProjectID,
		Type:      g.ServerType,
	}
	if g.OsID != "" {
		req.OsID = &g.OsID
	}

	srv, err := g.api.CreateServer(req, scw.WithContext(ctx))
	if err != nil {
		return "", fmt.Errorf("create server %q: %w", name, err)
	}
	return srv.ID, nil
}
// listGroupServers returns the servers in the zone (and project, when known)
// that belong to this instance group.
//
// All result pages are fetched, so a busy project is not truncated at the
// API's default page size.
//
// Membership is decided by name. A server belongs to the group only when
// its name is exactly serverNamePrefix() followed by a decimal counter, the
// format createServer generates. A plain prefix test would also match the
// servers of any group whose name merely starts with this one: group "ci"
// would claim "fleeting-ci-large-1" from group "ci-large", and Decrease
// could then delete it.
func (g *InstanceGroup) listGroupServers(ctx context.Context) ([]*applesilicon.Server, error) {
	req := &applesilicon.ListServersRequest{Zone: g.zone}
	// Only filter by project when one is known. Passing a pointer to "" would
	// send an empty project_id query parameter.
	// NOTE(review): the API presumably rejects or mis-filters that; confirm.
	if g.ProjectID != "" {
		req.ProjectID = &g.ProjectID
	}

	resp, err := g.api.ListServers(req, scw.WithAllPages(), scw.WithContext(ctx))
	if err != nil {
		return nil, fmt.Errorf("list servers: %w", err)
	}

	prefix := g.serverNamePrefix()
	filtered := make([]*applesilicon.Server, 0, len(resp.Servers))
	for _, srv := range resp.Servers {
		suffix, ok := strings.CutPrefix(srv.Name, prefix)
		// The remainder must be a non-empty run of digits (the counter).
		if ok && suffix != "" && strings.TrimLeft(suffix, "0123456789") == "" {
			filtered = append(filtered, srv)
		}
	}
	return filtered, nil
}
// serverState translates the Scaleway status of srv into the fleeting state
// reported by Update:
//
//   - ready -> running
//   - error, locked -> deleting
//   - every other status (starting, rebooting, reinstalling, updating,
//     locking, unlocking, unknown, and any status not listed here) -> creating
func serverState(srv *applesilicon.Server) provider.State {
	status := srv.Status

	if status == applesilicon.ServerStatusReady {
		return provider.StateRunning
	}
	if status == applesilicon.ServerStatusError || status == applesilicon.ServerStatusLocked {
		return provider.StateDeleting
	}

	// Transitional and unrecognised statuses are reported as still creating.
	return provider.StateCreating
}