Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion hcloud/cloud.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
corev1 "k8s.io/api/core/v1"
"k8s.io/client-go/kubernetes/scheme"
v1 "k8s.io/client-go/kubernetes/typed/core/v1"
corelisters "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/tools/record"
cloudprovider "k8s.io/cloud-provider"
"k8s.io/klog/v2"
Expand Down Expand Up @@ -55,9 +56,10 @@ type cloud struct {
recorder record.EventRecorder
networkID int64
cidr string
nodeLister corelisters.NodeLister
}

func NewCloud(cidr string) (cloudprovider.Interface, error) {
func NewCloud(cidr string, nodeLister corelisters.NodeLister) (cloudprovider.Interface, error) {
const op = "hcloud/newCloud"
metrics.OperationCalled.WithLabelValues(op).Inc()

Expand Down Expand Up @@ -147,6 +149,7 @@ func NewCloud(cidr string) (cloudprovider.Interface, error) {
cfg: cfg,
networkID: networkID,
cidr: cidr,
nodeLister: nodeLister,
}, nil
}

Expand Down Expand Up @@ -213,6 +216,7 @@ func (c *cloud) Routes() (cloudprovider.Routes, bool) {
c.networkID,
c.cidr,
c.recorder,
c.nodeLister,
)
if err != nil {
klog.ErrorS(err, "create routes provider", "networkID", c.networkID)
Expand Down
10 changes: 5 additions & 5 deletions hcloud/cloud_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ func TestNewCloud(t *testing.T) {
json.NewEncoder(w).Encode(schema.LocationListResponse{Locations: []schema.Location{}})
})

_, err := NewCloud(DefaultClusterCIDR)
_, err := NewCloud(DefaultClusterCIDR, nil)
assert.NoError(t, err)
}

Expand All @@ -107,7 +107,7 @@ func TestNewCloudConnectionNotPossible(t *testing.T) {
)
defer resetEnv()

_, err := NewCloud(DefaultClusterCIDR)
_, err := NewCloud(DefaultClusterCIDR, nil)
assert.EqualError(t, err,
`hcloud/newCloud: Get "http://127.0.0.1:4711/v1/locations?": dial tcp 127.0.0.1:4711: connect: connection refused`)
}
Expand Down Expand Up @@ -135,7 +135,7 @@ func TestNewCloudInvalidToken(t *testing.T) {
)
})

_, err := NewCloud(DefaultClusterCIDR)
_, err := NewCloud(DefaultClusterCIDR, nil)
assert.EqualError(t, err, "hcloud/newCloud: unable to authenticate (unauthorized)")
}

Expand Down Expand Up @@ -172,7 +172,7 @@ func TestCloud(t *testing.T) {
)
})

cloud, err := NewCloud(DefaultClusterCIDR)
cloud, err := NewCloud(DefaultClusterCIDR, nil)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
Expand Down Expand Up @@ -229,7 +229,7 @@ func TestCloud(t *testing.T) {
)
defer resetEnv()

c, err := NewCloud(DefaultClusterCIDR)
c, err := NewCloud(DefaultClusterCIDR, nil)
if err != nil {
t.Errorf("%s", err)
}
Expand Down
213 changes: 114 additions & 99 deletions hcloud/routes.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,19 +5,21 @@ import (
"errors"
"fmt"
"net"
"slices"
"time"

"golang.org/x/time/rate"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
corelisters "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/tools/record"
cloudprovider "k8s.io/cloud-provider"
"k8s.io/klog/v2"

"github.com/hetznercloud/hcloud-cloud-controller-manager/internal/hcops"
"github.com/hetznercloud/hcloud-cloud-controller-manager/internal/metrics"
"github.com/hetznercloud/hcloud-cloud-controller-manager/internal/providerid"
"github.com/hetznercloud/hcloud-go/v2/hcloud"
)

Expand All @@ -29,9 +31,10 @@ type routes struct {
serverCache *hcops.AllServersCache
clusterCIDR *net.IPNet
recorder record.EventRecorder
nodeLister corelisters.NodeLister
}

func newRoutes(client *hcloud.Client, networkID int64, clusterCIDR string, recorder record.EventRecorder) (*routes, error) {
func newRoutes(client *hcloud.Client, networkID int64, clusterCIDR string, recorder record.EventRecorder, nodeLister corelisters.NodeLister) (*routes, error) {
const op = "hcloud/newRoutes"
metrics.OperationCalled.WithLabelValues(op).Inc()

Expand Down Expand Up @@ -60,6 +63,7 @@ func newRoutes(client *hcloud.Client, networkID int64, clusterCIDR string, recor
},
clusterCIDR: cidr,
recorder: recorder,
nodeLister: nodeLister,
}, nil
}

Expand Down Expand Up @@ -105,93 +109,139 @@ func (r *routes) CreateRoute(ctx context.Context, _ string, _ string, route *clo
const op = "hcloud/CreateRoute"
metrics.OperationCalled.WithLabelValues(op).Inc()

srv, err := r.serverCache.ByName(ctx, string(route.TargetNode))
node, gateway, err := r.resolveRouteTarget(ctx, string(route.TargetNode))
if err != nil {
return fmt.Errorf("%s: %w", op, err)
return fmt.Errorf("%s: error resolving route target: %w", op, err)
}

privNet, ok := findServerPrivateNetByID(srv, r.network.ID)
if !ok {
r.serverCache.InvalidateCache()
srv, err = r.serverCache.ByName(ctx, string(route.TargetNode))
if err != nil {
return fmt.Errorf("%s: %w", op, err)
}

privNet, ok = findServerPrivateNetByID(srv, r.network.ID)
if !ok {
return fmt.Errorf("%s: server %v: network with id %d not attached to this server", op, route.TargetNode, r.network.ID)
}
if !slices.ContainsFunc(route.TargetNodeAddresses, func(target corev1.NodeAddress) bool {
return target.Type == corev1.NodeInternalIP && target.Address == gateway.String()
}) {
return fmt.Errorf("%s: IP %s not part of routes target addresses", op, gateway.String())
}
ip := privNet.IP

_, cidr, err := net.ParseCIDR(route.DestinationCIDR)
if err != nil {
return fmt.Errorf("%s: %w", op, err)
}

clusterPrefixLen, _ := r.clusterCIDR.Mask.Size()
destPrefixLen, _ := cidr.Mask.Size()
r.warnCIDRMismatch(cidr, node)

if !r.clusterCIDR.Contains(cidr.IP) || destPrefixLen < clusterPrefixLen {
node := &corev1.Node{
ObjectMeta: metav1.ObjectMeta{
Name: string(route.TargetNode),
Namespace: "",
},
}
// Event is only visible via `kubectl get events` and not `kubectl describe node`,
// as we do not have the UID here and `kubectl describe node` filters by UID.
// Because of this behavior we are also dispatching a log message.
r.recorder.Eventf(
node,
corev1.EventTypeWarning,
"ClusterCIDRMisconfigured",
"route CIDR %s is not contained within cluster CIDR %s",
route.DestinationCIDR,
r.clusterCIDR.String(),
)
klog.Warningf(
"route CIDR %s is not contained within cluster CIDR %s",
route.DestinationCIDR,
r.clusterCIDR.String(),
)
if err := r.upsertRoute(ctx, gateway, cidr, string(route.TargetNode)); err != nil {
return fmt.Errorf("error upserting route %q via %q: %w", cidr.String(), gateway.String(), err)
}

routeExists, err := r.checkIfRouteAlreadyExists(ctx, route)
return nil
}

// resolveRouteTarget returns the k8s node and the hcloud server's private IP on the routes
// network — everything needed to create a route for this node (gateway IP) and record events
// against it (node).
// Looks up the server by ProviderID to survive k8s node-name drift, with a ByName fallback for
// ID changes (e.g. server recreated). Refreshes the cache once if the private-net attachment
// isn't yet reflected.
//
// Parameters:
//   - ctx is passed through to the server cache lookups.
//   - nodeName is the name used to fetch the node from r.nodeLister.
//
// On success it returns the node, the IP of the private net that findServerPrivateNetByID
// reports for r.network.ID, and a nil error. On every failure path both results are nil.
//
// An error is returned when the node cannot be fetched from the lister, when its
// Spec.ProviderID is empty or cannot be parsed, when the ProviderID does not refer to a cloud
// server, when the server lookup fails, or when no private net for r.network.ID is found even
// after the cache has been invalidated and the server re-fetched.
func (r *routes) resolveRouteTarget(ctx context.Context, nodeName string) (*corev1.Node, net.IP, error) {
// NOTE(review): the NewCloud calls in cloud_test.go on this page pass a nil lister, and that
// value is stored unchanged in routes.nodeLister. Calling Get on a nil lister would panic;
// confirm no caller reaches this path without a real lister.
node, err := r.nodeLister.Get(nodeName)
if err != nil {
// NOTE(review): the next line looks like a removed-side diff line from the previous
// CreateRoute body: it returns a single value and uses `op`, which is not declared in this
// function. The line after it is the current return.
return fmt.Errorf("%s: %w", op, err)
return nil, nil, fmt.Errorf("error fetching node %s by name: %w", nodeName, err)
}

// An empty ProviderID is reported as the node not being initialized yet.
if node.Spec.ProviderID == "" {
return nil, nil, fmt.Errorf("node %s not yet initialized", node.Name)
}

id, isCloudServer, err := providerid.ToServerID(node.Spec.ProviderID)
if err != nil {
return nil, nil, fmt.Errorf("error parsing provider id %q for node %s: %w", node.Spec.ProviderID, nodeName, err)
}
if !isCloudServer {
return nil, nil, fmt.Errorf("node %s is not a cloud server, routes are only supported for cloud servers", node.Name)
}

// Primary lookup is by server ID; only an hcops.ErrNotFound result triggers the fallback
// lookup by node name. Any other error is returned as-is below.
server, err := r.serverCache.ByID(ctx, id)
if errors.Is(err, hcops.ErrNotFound) {
server, err = r.serverCache.ByName(ctx, node.Name)
}
// NOTE(review): this message says "by id" even when the failing call was the ByName fallback.
if err != nil {
return nil, nil, fmt.Errorf("error fetching node %s by id: %w", nodeName, err)
}

// NOTE(review): the next line looks like a removed-side diff line from the previous
// CreateRoute body: `routeExists` is not declared in this function and the brace it opens is
// never closed here.
if routeExists {
privNet, ok := findServerPrivateNetByID(server, r.network.ID)
if !ok {
// The cached server has no private net for this network: drop the cache and re-fetch once,
// using the ID of the server that was actually found (which may have come from the ByName
// fallback rather than from the parsed ProviderID).
r.serverCache.InvalidateCache()
server, err = r.serverCache.ByID(ctx, server.ID)
if err != nil {
return nil, nil, fmt.Errorf("error fetching node %s by id: %w", nodeName, err)
}
privNet, ok = findServerPrivateNetByID(server, r.network.ID)
if !ok {
return nil, nil, fmt.Errorf("server %s (%d): network with id %d not attached to this server", server.Name, server.ID, r.network.ID)
}
}

return node, privNet.IP, nil
}

// upsertRoute ensures the hcloud network has a route for cidr pointing at gateway. A matching
// route is a no-op; a stale route with a different gateway is replaced in place. nodeName is
// used only for logging and for surfacing API conflicts against the right k8s object.
func (r *routes) upsertRoute(ctx context.Context, gateway net.IP, cidr *net.IPNet, nodeName string) error {
if err := r.reloadNetwork(ctx); err != nil {
return fmt.Errorf("error reloading network: %w", err)
}

destination := cidr.String()
existingIdx := slices.IndexFunc(r.network.Routes, func(nr hcloud.NetworkRoute) bool {
return nr.Destination.String() == destination
})
if existingIdx >= 0 {
existing := r.network.Routes[existingIdx]
if existing.Gateway.Equal(gateway) {
klog.InfoS(
"route already exists: skipping creation",
"target-node", nodeName,
"destination-cidr", destination,
)
return nil
}

action, _, err := r.client.Network.DeleteRoute(ctx, r.network, hcloud.NetworkDeleteRouteOpts{
Route: existing,
})
if err != nil {
return fmt.Errorf("error deleting route for %q via %q: %w", cidr.String(), gateway.String(), err)
}
if err := r.client.Action.WaitFor(ctx, action); err != nil {
return fmt.Errorf("error deleting route for %q via %q: %w", cidr.String(), gateway.String(), err)
}
klog.InfoS(
"route already exists: skipping creation",
"target-node", route.TargetNode,
"destination-cidr", route.DestinationCIDR,
"deleted stale route with wrong gateway; recreating",
"node", nodeName,
"gateway", gateway,
"cidr", destination,
)
return nil
}

opts := hcloud.NetworkAddRouteOpts{
Route: hcloud.NetworkRoute{
Destination: cidr,
Gateway: ip,
Gateway: gateway,
},
}
action, _, err := r.client.Network.AddRoute(ctx, r.network, opts)
if err != nil {
if hcloud.IsError(err, hcloud.ErrorCodeLocked, hcloud.ErrorCodeConflict) {
return apierrors.NewConflict(
corev1.Resource("nodes"),
string(route.TargetNode),
nodeName,
err,
)
}
return fmt.Errorf("%s: %w", op, err)
return fmt.Errorf("error adding route for %q via %q: %w", cidr.String(), gateway.String(), err)
}

if err := r.client.Action.WaitFor(ctx, action); err != nil {
return fmt.Errorf("%s: %w", op, err)
return fmt.Errorf("error adding route for %q via %q: %w", cidr.String(), gateway.String(), err)
}

return nil
Expand Down Expand Up @@ -220,17 +270,6 @@ func (r *routes) DeleteRoute(ctx context.Context, _ string, route *cloudprovider
return fmt.Errorf("%s: %w", op, err)
}

err = r.deleteRouteFromHcloud(ctx, cidr, ip)
if err != nil {
return fmt.Errorf("%s: %w", op, err)
}
return nil
}

func (r *routes) deleteRouteFromHcloud(ctx context.Context, cidr *net.IPNet, ip net.IP) error {
const op = "hcloud/deleteRouteFromHcloud"
metrics.OperationCalled.WithLabelValues(op).Inc()

opts := hcloud.NetworkDeleteRouteOpts{
Route: hcloud.NetworkRoute{
Destination: cidr,
Expand Down Expand Up @@ -272,43 +311,19 @@ func (r *routes) hcloudRouteToRoute(ctx context.Context, route hcloud.NetworkRou
return cpRoute, nil
}

func (r *routes) checkIfRouteAlreadyExists(ctx context.Context, route *cloudprovider.Route) (bool, error) {
const op = "hcloud/checkIfRouteAlreadyExists"
metrics.OperationCalled.WithLabelValues(op).Inc()
// warnCIDRMismatch reports a route CIDR that does not fit inside the cluster CIDR.
//
// A mismatch is detected when r.clusterCIDR does not contain cidr.IP, or when cidr's prefix
// length is smaller than the cluster CIDR's prefix length. In that case the same message is
// written via klog.Warning and recorded as a Warning event with reason
// "ClusterCIDRMisconfigured" against node.
//
// The function has no return value, so a mismatch is only reported, never signalled to the
// caller.
func (r *routes) warnCIDRMismatch(cidr *net.IPNet, node *corev1.Node) {
// Only the prefix length (first result of Mask.Size) is needed; the total bit count is
// discarded.
clusterPrefixLen, _ := r.clusterCIDR.Mask.Size()
destPrefixLen, _ := cidr.Mask.Size()

// NOTE(review): everything from here down to the `return true, nil` block looks like the
// removed-side body of the previous checkIfRouteAlreadyExists, interleaved by the diff view.
// These lines use `ctx`, `route` and `op`, none of which are declared in this function, and
// they return two values although this function declares no results.
if err := r.reloadNetwork(ctx); err != nil {
return false, fmt.Errorf("%s: %w", op, err)
}

for _, _route := range r.network.Routes {
if _route.Destination.String() == route.DestinationCIDR {
srv, err := r.serverCache.ByName(ctx, string(route.TargetNode))
if err != nil {
return false, fmt.Errorf("%s: %w", op, err)
}
privNet, ok := findServerPrivateNetByID(srv, r.network.ID)
if !ok {
return false, fmt.Errorf("%s: server %v: no network with id: %d", op, route.TargetNode, r.network.ID)
}
ip := privNet.IP

if !_route.Gateway.Equal(ip) {
action, _, err := r.client.Network.DeleteRoute(ctx, r.network, hcloud.NetworkDeleteRouteOpts{
Route: _route,
})
if err != nil {
return false, fmt.Errorf("%s: %w", op, err)
}

if err := r.client.Action.WaitFor(ctx, action); err != nil {
return false, fmt.Errorf("%s: %w", op, err)
}
}

return true, nil
}
// Current body: build the message once so the log line and the event carry identical text.
if !r.clusterCIDR.Contains(cidr.IP) || destPrefixLen < clusterPrefixLen {
warnMsg := fmt.Sprintf(
"route CIDR %s is not contained within cluster CIDR %s",
cidr.String(),
r.clusterCIDR.String(),
)
klog.Warning(warnMsg)
r.recorder.Event(node, corev1.EventTypeWarning, "ClusterCIDRMisconfigured", warnMsg)
}
// NOTE(review): the next line also looks like a removed-side line of the old function; this
// function declares no return values.
return false, nil
}

func findServerPrivateNetByID(srv *hcloud.Server, id int64) (hcloud.ServerPrivateNet, bool) {
Expand Down
Loading
Loading