Skip to content

Commit 44f5433

Browse files
committed
feat: enable restarting precreated qemu cluster
Allows the same Talos clusters created by `talosctl cluster create dev` to be started again after a system restart or similar event. This is useful when using talosctl-created clusters as development clusters where rapid iteration is required. Signed-off-by: Jaakko Sirén <jaakko@craci.com>
1 parent d43a01c commit 44f5433

21 files changed

Lines changed: 623 additions & 82 deletions

File tree

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
// This Source Code Form is subject to the terms of the Mozilla Public
2+
// License, v. 2.0. If a copy of the MPL was not distributed with this
3+
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
4+
5+
package cluster
6+
7+
import (
8+
"context"
9+
"fmt"
10+
"os"
11+
12+
"github.com/spf13/cobra"
13+
14+
"github.com/siderolabs/talos/pkg/cli"
15+
"github.com/siderolabs/talos/pkg/provision"
16+
"github.com/siderolabs/talos/pkg/provision/providers"
17+
)
18+
19+
// startCmd represents the cluster start command.
20+
var startCmd = &cobra.Command{
21+
Use: "start",
22+
Short: "Starts a stopped local Talos Kubernetes cluster",
23+
Long: `Starts a local Talos Kubernetes cluster that was previously created but is now stopped.
24+
This is useful when the development container is restarted and the VM processes are no longer running.
25+
The cluster state (disks, configs) must still exist from a previous 'cluster create' command.`,
26+
Args: cobra.NoArgs,
27+
RunE: func(cmd *cobra.Command, args []string) error {
28+
return cli.WithContext(context.Background(), start)
29+
},
30+
}
31+
32+
func start(ctx context.Context) error {
33+
state, err := provision.ReadState(ctx, PersistentFlags.ClusterName, PersistentFlags.StateDir)
34+
if err != nil {
35+
return fmt.Errorf("failed to read cluster state: %w", err)
36+
}
37+
38+
provisioner, err := providers.Factory(ctx, state.ProvisionerName)
39+
if err != nil {
40+
return err
41+
}
42+
43+
defer provisioner.Close() //nolint:errcheck
44+
45+
cluster, err := provisioner.Reflect(ctx, PersistentFlags.ClusterName, PersistentFlags.StateDir)
46+
if err != nil {
47+
return err
48+
}
49+
50+
return provisioner.Start(
51+
ctx,
52+
cluster,
53+
provision.WithLogWriter(os.Stdout),
54+
)
55+
}
56+
57+
func init() {
58+
AddProvisionerFlag(startCmd)
59+
cli.Should(startCmd.Flags().MarkDeprecated(ProvisionerFlagName, "the provisioner is inferred automatically"))
60+
61+
Cmd.AddCommand(startCmd)
62+
}

pkg/provision/providers/docker/docker.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -130,6 +130,11 @@ func (p *provisioner) UserDiskName(index int) string {
130130
return ""
131131
}
132132

133+
// Start is not supported for docker provisioner.
134+
func (p *provisioner) Start(ctx context.Context, cluster provision.Cluster, opts ...provision.Option) error {
135+
return fmt.Errorf("start is not supported for docker provisioner")
136+
}
137+
133138
// GetFirstInterface returns first network interface name.
134139
func (p *provisioner) GetFirstInterface() v1alpha1.IfaceSelector {
135140
return v1alpha1.IfaceByName(p.GetFirstInterfaceName())

pkg/provision/providers/qemu/create.go

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,11 @@ import (
99
"fmt"
1010
"path/filepath"
1111

12+
"github.com/siderolabs/gen/xslices"
13+
1214
"github.com/siderolabs/talos/pkg/machinery/constants"
1315
"github.com/siderolabs/talos/pkg/provision"
16+
"github.com/siderolabs/talos/pkg/provision/providers/vm"
1417
)
1518

1619
// Create Talos cluster as a set of qemu VMs.
@@ -166,12 +169,40 @@ func (p *provisioner) Create(ctx context.Context, request provision.ClusterReque
166169
NoMasqueradeCIDRs: request.Network.NoMasqueradeCIDRs,
167170
GatewayAddrs: request.Network.GatewayAddrs,
168171
MTU: request.Network.MTU,
172+
IPMasquerade: !request.Network.Airgapped,
169173
},
170174
Nodes: nodeInfo,
171175
ExtraNodes: pxeNodeInfo,
172176
KubernetesEndpoint: p.GetExternalKubernetesControlPlaneEndpoint(request.Network, lbPort),
173177
}
174178

179+
// Save helper service configurations for restart support.
180+
state.SelfExecutable = request.SelfExecutable
181+
182+
controlPlaneIPs := xslices.Map(request.Nodes.ControlPlaneNodes(), func(req provision.NodeRequest) string {
183+
return req.IPs[0].String()
184+
})
185+
186+
state.LoadBalancerConfig = &provision.LoadBalancerConfig{
187+
BindAddress: vm.GetLbBindIP(request.Network.GatewayAddrs[0]),
188+
Upstreams: controlPlaneIPs,
189+
Ports: request.Network.LoadBalancerPorts,
190+
}
191+
192+
state.DHCPdConfig = &provision.DHCPdConfig{
193+
GatewayAddrs: request.Network.GatewayAddrs,
194+
IPXEBootScript: request.IPXEBootScript,
195+
}
196+
197+
state.DNSdConfig = &provision.DNSdConfig{
198+
GatewayAddrs: request.Network.GatewayAddrs,
199+
}
200+
201+
state.CNIConfig = &provision.CNIConfig{
202+
BinPath: request.Network.CNI.BinPath,
203+
CacheDir: request.Network.CNI.CacheDir,
204+
}
205+
175206
if err := state.Save(); err != nil {
176207
return nil, err
177208
}
Lines changed: 150 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,150 @@
1+
// This Source Code Form is subject to the terms of the Mozilla Public
2+
// License, v. 2.0. If a copy of the MPL was not distributed with this
3+
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
4+
5+
package qemu
6+
7+
import (
8+
"context"
9+
"encoding/json"
10+
"fmt"
11+
"io"
12+
"os"
13+
"os/exec"
14+
"strconv"
15+
"syscall"
16+
17+
"github.com/hashicorp/go-multierror"
18+
19+
"github.com/siderolabs/talos/pkg/provision"
20+
"github.com/siderolabs/talos/pkg/provision/providers/vm"
21+
)
22+
23+
// Start restarts an existing cluster that was previously created but is now stopped.
24+
// This recreates the network bridge and restarts all helper services and VM nodes.
25+
func (p *provisioner) Start(ctx context.Context, cluster provision.Cluster, opts ...provision.Option) error {
26+
options := provision.DefaultOptions()
27+
28+
for _, opt := range opts {
29+
if err := opt(&options); err != nil {
30+
return err
31+
}
32+
}
33+
34+
state, ok := cluster.(*provision.State)
35+
if !ok {
36+
return fmt.Errorf("cluster is not a *provision.State")
37+
}
38+
39+
fmt.Fprintln(options.LogWriter, "recreating network bridge", state.BridgeName)
40+
41+
if err := p.RecreateNetwork(ctx, state, options); err != nil {
42+
return fmt.Errorf("error recreating network: %w", err)
43+
}
44+
45+
fmt.Fprintln(options.LogWriter, "starting load balancer")
46+
47+
if err := p.StartLoadBalancer(state); err != nil {
48+
return fmt.Errorf("error starting loadbalancer: %w", err)
49+
}
50+
51+
fmt.Fprintln(options.LogWriter, "starting dnsd")
52+
53+
if err := p.StartDNSd(state); err != nil {
54+
return fmt.Errorf("error starting dnsd: %w", err)
55+
}
56+
57+
fmt.Fprintln(options.LogWriter, "starting nodes")
58+
59+
if err := p.startNodes(ctx, state, &options); err != nil {
60+
return err
61+
}
62+
63+
fmt.Fprintln(options.LogWriter, "starting dhcpd")
64+
65+
if err := p.StartDHCPd(state); err != nil {
66+
return fmt.Errorf("error starting dhcpd: %w", err)
67+
}
68+
69+
return nil
70+
}
71+
72+
// startNodes starts all nodes from saved state.
73+
func (p *provisioner) startNodes(ctx context.Context, state *provision.State, options *provision.Options) error {
74+
errCh := make(chan error)
75+
nodes := state.ClusterInfo.Nodes
76+
77+
for _, node := range nodes {
78+
go func(node provision.NodeInfo) {
79+
errCh <- p.startNode(ctx, state, node, options)
80+
}(node)
81+
}
82+
83+
var multiErr *multierror.Error
84+
85+
for range nodes {
86+
multiErr = multierror.Append(multiErr, <-errCh)
87+
}
88+
89+
return multiErr.ErrorOrNil()
90+
}
91+
92+
// startNode starts a single node from saved state.
93+
func (p *provisioner) startNode(_ context.Context, state *provision.State, node provision.NodeInfo, options *provision.Options) error {
94+
pidPath := state.GetRelativePath(fmt.Sprintf("%s.pid", node.Name))
95+
96+
// Check if already running
97+
if vm.IsProcessRunning(pidPath) {
98+
fmt.Fprintf(options.LogWriter, "node %s already running\n", node.Name)
99+
100+
return nil
101+
}
102+
103+
// Read the saved launch config
104+
configPath := state.GetRelativePath(fmt.Sprintf("%s.config", node.Name))
105+
106+
configFile, err := os.Open(configPath)
107+
if err != nil {
108+
return fmt.Errorf("error opening config file for %s: %w", node.Name, err)
109+
}
110+
111+
defer configFile.Close() //nolint:errcheck
112+
113+
// Verify the config is valid JSON
114+
var launchConfig LaunchConfig
115+
if err := json.NewDecoder(configFile).Decode(&launchConfig); err != nil {
116+
return fmt.Errorf("error decoding config file for %s: %w", node.Name, err)
117+
}
118+
119+
// Seek back to beginning for stdin
120+
if _, err := configFile.Seek(0, io.SeekStart); err != nil {
121+
return fmt.Errorf("error seeking config file for %s: %w", node.Name, err)
122+
}
123+
124+
logFile, err := os.OpenFile(state.GetRelativePath(fmt.Sprintf("%s.log", node.Name)), os.O_APPEND|os.O_CREATE|os.O_RDWR, 0o666)
125+
if err != nil {
126+
return fmt.Errorf("error opening log file for %s: %w", node.Name, err)
127+
}
128+
129+
defer logFile.Close() //nolint:errcheck
130+
131+
fmt.Fprintf(options.LogWriter, "starting node %s\n", node.Name)
132+
133+
cmd := exec.Command(state.SelfExecutable, "qemu-launch") //nolint:noctx // runs in background
134+
cmd.Stdout = logFile
135+
cmd.Stderr = logFile
136+
cmd.Stdin = configFile
137+
cmd.SysProcAttr = &syscall.SysProcAttr{
138+
Setsid: true, // daemonize
139+
}
140+
141+
if err = cmd.Start(); err != nil {
142+
return fmt.Errorf("error starting node %s: %w", node.Name, err)
143+
}
144+
145+
if err = os.WriteFile(pidPath, []byte(strconv.Itoa(cmd.Process.Pid)), os.ModePerm); err != nil {
146+
return fmt.Errorf("error writing PID file for %s: %w", node.Name, err)
147+
}
148+
149+
return nil
150+
}

pkg/provision/providers/vm/dhcpd.go

Lines changed: 24 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -275,10 +275,29 @@ const (
275275
dhcpLog = "dhcpd.log"
276276
)
277277

278-
// startDHCPd starts the DHCPd server.
279-
func (p *Provisioner) startDHCPd(state *provision.State, clusterReq provision.ClusterRequest) error {
278+
// DestroyDHCPd destoys load balancer.
279+
func (p *Provisioner) DestroyDHCPd(state *provision.State) error {
280+
pidPath := state.GetRelativePath(dhcpPid)
281+
282+
return StopProcessByPidfile(pidPath)
283+
}
284+
285+
// StartDHCPd starts the DHCP server if not already running, using saved state config.
286+
func (p *Provisioner) StartDHCPd(state *provision.State) error {
280287
pidPath := state.GetRelativePath(dhcpPid)
281288

289+
if IsProcessRunning(pidPath) {
290+
return nil
291+
}
292+
293+
if state.DHCPdConfig == nil {
294+
return fmt.Errorf("no DHCPd config in state; cluster was created with older talosctl, please destroy and recreate")
295+
}
296+
297+
if state.SelfExecutable == "" {
298+
return fmt.Errorf("no self executable path in state; cluster was created with older talosctl, please destroy and recreate")
299+
}
300+
282301
logFile, err := os.OpenFile(state.GetRelativePath(dhcpLog), os.O_APPEND|os.O_CREATE|os.O_RDWR, 0o666)
283302
if err != nil {
284303
return err
@@ -291,17 +310,17 @@ func (p *Provisioner) startDHCPd(state *provision.State, clusterReq provision.Cl
291310
return err
292311
}
293312

294-
gatewayAddrs := xslices.Map(clusterReq.Network.GatewayAddrs, netip.Addr.String)
313+
gatewayAddrs := xslices.Map(state.DHCPdConfig.GatewayAddrs, netip.Addr.String)
295314

296315
args := []string{
297316
"dhcpd-launch",
298317
"--state-path", statePath,
299318
"--addr", strings.Join(gatewayAddrs, ","),
300319
"--interface", state.BridgeName,
301-
"--ipxe-next-handler", clusterReq.IPXEBootScript,
320+
"--ipxe-next-handler", state.DHCPdConfig.IPXEBootScript,
302321
}
303322

304-
cmd := exec.Command(clusterReq.SelfExecutable, args...) //nolint:noctx // runs in background
323+
cmd := exec.Command(state.SelfExecutable, args...) //nolint:noctx // runs in background
305324
cmd.Stdout = logFile
306325
cmd.Stderr = logFile
307326
cmd.SysProcAttr = &syscall.SysProcAttr{
@@ -318,10 +337,3 @@ func (p *Provisioner) startDHCPd(state *provision.State, clusterReq provision.Cl
318337

319338
return nil
320339
}
321-
322-
// DestroyDHCPd destoys load balancer.
323-
func (p *Provisioner) DestroyDHCPd(state *provision.State) error {
324-
pidPath := state.GetRelativePath(dhcpPid)
325-
326-
return StopProcessByPidfile(pidPath)
327-
}

pkg/provision/providers/vm/dhcpd_darwin.go

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,12 @@ import (
2323
// starts the talos DHCP server and then starts the apple bootp server again, which is configured such
2424
// that it detects existing dhcp servers on interfaces and doesn't interfere with them.
2525
func (p *Provisioner) CreateDHCPd(ctx context.Context, state *provision.State, clusterReq provision.ClusterRequest) error {
26+
state.DHCPdConfig = &provision.DHCPdConfig{
27+
GatewayAddrs: clusterReq.Network.GatewayAddrs,
28+
IPXEBootScript: clusterReq.IPXEBootScript,
29+
}
30+
state.SelfExecutable = clusterReq.SelfExecutable
31+
2632
err := waitForInterface(ctx, state.BridgeName)
2733
if err != nil {
2834
return err
@@ -35,7 +41,7 @@ func (p *Provisioner) CreateDHCPd(ctx context.Context, state *provision.State, c
3541
return fmt.Errorf("failed to stop native dhcp server: %w", err)
3642
}
3743

38-
err = p.startDHCPd(state, clusterReq)
44+
err = p.StartDHCPd(state)
3945
if err != nil {
4046
return err
4147
}

pkg/provision/providers/vm/dhcpd_linux.go

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,5 +12,11 @@ import (
1212

1313
// CreateDHCPd creates a DHCP server.
1414
func (p *Provisioner) CreateDHCPd(ctx context.Context, state *provision.State, clusterReq provision.ClusterRequest) error {
15-
return p.startDHCPd(state, clusterReq)
15+
state.DHCPdConfig = &provision.DHCPdConfig{
16+
GatewayAddrs: clusterReq.Network.GatewayAddrs,
17+
IPXEBootScript: clusterReq.IPXEBootScript,
18+
}
19+
state.SelfExecutable = clusterReq.SelfExecutable
20+
21+
return p.StartDHCPd(state)
1622
}

pkg/provision/providers/vm/dnsd.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,12 @@ const (
6868

6969
// CreateDNSd creates the DNSd server.
7070
func (p *Provisioner) CreateDNSd(state *State, clusterReq provision.ClusterRequest) error {
71-
return p.startDNSd(state, clusterReq)
71+
state.DNSdConfig = &provision.DNSdConfig{
72+
GatewayAddrs: clusterReq.Network.GatewayAddrs,
73+
}
74+
state.SelfExecutable = clusterReq.SelfExecutable
75+
76+
return p.StartDNSd(state)
7277
}
7378

7479
// DestroyDNSd destroys the DNSd server.

0 commit comments

Comments
 (0)