Skip to content

Commit 66a0b38

Browse files
committed
MAJOR: add a feedback to the listeners Programmed condition
The feedback status is received from Hug when the haproxy.cfg is applied with success or failure
1 parent 9c75004 commit 66a0b38

26 files changed

Lines changed: 899 additions & 63 deletions

hug/haproxy/main.go

Lines changed: 21 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -102,9 +102,6 @@ func (h *AppManagerImpl) Run() {
102102
logging.LogAttrError(err),
103103
)
104104
}
105-
if haproxyCfg.Done != nil {
106-
close(haproxyCfg.Done)
107-
}
108105
}
109106
}
110107
})
@@ -396,6 +393,27 @@ func (h *AppManagerImpl) confUpdateProcessed(haproxyCfgDiffs diffs.HaproxyConfDi
396393

397394
h.logger.LogAttrs(context.Background(), slog.LevelInfo, "Haproxy configuration update result",
398395
slog.Any("result", result))
396+
397+
if haproxyCfgDiffs.Done != nil {
398+
h.logger.LogAttrs(context.Background(), slog.LevelInfo, "[sending] HUG => CONTROLLER: DIFFS (Done)")
399+
close(haproxyCfgDiffs.Done)
400+
}
401+
402+
if haproxyCfgDiffs.ResultCh != nil {
403+
confResult := diffs.HaproxyConfResult{
404+
Err: err,
405+
GatewayObservedGenerations: haproxyCfgDiffs.GatewayObservedGenerations(),
406+
}
407+
h.logger.LogAttrs(context.Background(), slog.LevelInfo, "[sending] HUG => CONTROLLER: haproxy conf update result", slog.Any("result", confResult))
408+
select {
409+
case haproxyCfgDiffs.ResultCh <- confResult:
410+
default:
411+
// TODO: check with the team
412+
h.logger.LogAttrs(context.Background(), slog.LevelError,
413+
"dropping haproxy conf result: ResultCh is full",
414+
)
415+
}
416+
}
399417
}
400418

401419
func addFrontendMetadataToResult(meta map[string]any, fe *models.Frontend) {

k8s/gate/conditions/gateway.go

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,14 +62,25 @@ func NewGatewayAcceptedInvalidConditions(msg string) generic.Conditions {
6262
func NewGatewayAcceptedInvalidParameters(err *field.Error) generic.Conditions {
6363
return generic.Conditions{
6464
generic.ConditionType(gatewayv1.GatewayConditionAccepted): {
65-
Type: generic.ConditionType(gatewayv1.GatewayReasonInvalidParameters),
65+
Type: generic.ConditionType(gatewayv1.GatewayConditionAccepted),
6666
Status: metav1.ConditionFalse,
6767
Reason: string(gatewayv1.GatewayReasonInvalidParameters),
6868
Message: fmt.Sprintf("invalid parametersRef: %s", err),
6969
},
7070
}
7171
}
7272

73+
func NewGatewayAcceptedListenersNotValid(msg string) generic.Conditions {
74+
return generic.Conditions{
75+
generic.ConditionType(gatewayv1.GatewayConditionAccepted): {
76+
Type: generic.ConditionType(gatewayv1.GatewayConditionAccepted),
77+
Status: metav1.ConditionFalse,
78+
Reason: string(gatewayv1.GatewayReasonListenersNotValid),
79+
Message: msg,
80+
},
81+
}
82+
}
83+
7384
func NewGatewayAcceptedListenerNotValidAtLeast1Valid() generic.Conditions {
7485
return generic.Conditions{
7586
generic.ConditionType(gatewayv1.GatewayConditionAccepted): {
@@ -116,3 +127,14 @@ func NewGatewayProgrammedInvalidParameters(msg string) generic.Conditions {
116127
},
117128
}
118129
}
130+
131+
func NewGatewayProgrammedListenersNotValid(msg string) generic.Conditions {
132+
return generic.Conditions{
133+
generic.ConditionType(gatewayv1.GatewayConditionProgrammed): {
134+
Type: generic.ConditionType(gatewayv1.GatewayConditionProgrammed),
135+
Status: metav1.ConditionFalse,
136+
Reason: string(gatewayv1.GatewayReasonListenersNotValid),
137+
Message: msg,
138+
},
139+
}
140+
}

k8s/gate/conditions/gateway_listener.go

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -61,13 +61,10 @@ func NewListenerResolvedRefOK() generic.Conditions {
6161
func NewListenerProgrammedPending() generic.Conditions {
6262
return generic.Conditions{
6363
generic.ConditionType(gatewayv1.ListenerConditionProgrammed): {
64-
Type: generic.ConditionType(gatewayv1.ListenerConditionProgrammed),
65-
// Status: metav1.ConditionUnknown,
66-
Status: metav1.ConditionTrue,
67-
// Reason: string(gatewayv1.ListenerReasonPending),
68-
// Message: "Listener is pending Haproxy programmation",
69-
Reason: string(gatewayv1.ListenerReasonProgrammed),
70-
Message: "Listener is programmed in Haproxy",
64+
Type: generic.ConditionType(gatewayv1.ListenerConditionProgrammed),
65+
Status: metav1.ConditionUnknown,
66+
Reason: string(gatewayv1.ListenerReasonPending),
67+
Message: "Listener is pending Haproxy programmation",
7168
},
7269
}
7370
}

k8s/gate/handler/batch.go

Lines changed: 60 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -194,17 +194,6 @@ func (h *eventHandlerImpl) HandleEventBatch(ctx context.Context, batch events.Ev
194194
}
195195
haproxyConfDiffs := h.haproxyConfBuilder.GetDiffs()
196196
h.recordDiffMetrics(haproxyConfDiffs)
197-
if !haproxyConfDiffs.IsEmpty() || haproxyConfDiffs.ReloadNeed {
198-
if h.config.TransferHaproxyConfChannel != nil {
199-
h.logger.LogAttrs(context.Background(), slog.LevelInfo, "DIFFS CONTROLLER => HUG")
200-
haproxyConfDiffs.Done = make(chan struct{})
201-
transferStart := time.Now()
202-
h.config.TransferHaproxyConfChannel <- haproxyConfDiffs
203-
<-haproxyConfDiffs.Done
204-
metrics.ConfigTransferDuration.Observe(time.Since(transferStart).Seconds())
205-
h.logger.LogAttrs(context.Background(), slog.LevelInfo, "DIFFS HUG => CONTROLLER")
206-
}
207-
}
208197

209198
// -------------
210199
// Status updates
@@ -218,6 +207,22 @@ func (h *eventHandlerImpl) HandleEventBatch(ctx context.Context, batch events.Ev
218207
// Now process them
219208
h.statusUpdater.UpdateStatus(ctx, statusUpdates)
220209

210+
if !haproxyConfDiffs.IsEmpty() || haproxyConfDiffs.ReloadNeed {
211+
if h.config.TransferHaproxyConfChannel != nil {
212+
h.logger.LogAttrs(context.Background(), slog.LevelInfo, "[sending] CONTROLLER => HUG: DIFFS")
213+
haproxyConfDiffs.Done = make(chan struct{})
214+
haproxyConfDiffs.ResultCh = make(chan diffs.HaproxyConfResult, 1)
215+
transferStart := time.Now()
216+
h.config.TransferHaproxyConfChannel <- haproxyConfDiffs
217+
h.waitDone(haproxyConfDiffs.Done)
218+
metrics.ConfigTransferDuration.Observe(time.Since(transferStart).Seconds())
219+
h.logger.LogAttrs(context.Background(), slog.LevelInfo, "[received] HUG => CONTROLLER: DIFFS (Done)")
220+
221+
// Forward the HUG result back as status feedback to the relevant Gateway/Route objects.
222+
h.forwardFeedback(ctx, haproxyConfDiffs.ResultCh)
223+
}
224+
}
225+
221226
// Reconcile Hug service Ports
222227
h.hugServiceReconciler.ReconcilePorts(ctx, gatetree.VirtualListeners)
223228
}
@@ -256,6 +261,50 @@ func (h *eventHandlerImpl) updateClusterStore(event any) {
256261
}
257262
}
258263

264+
// waitDoneTimeout is the maximum time to wait for Done to be closed after
265+
// sending diffs to HUG.
266+
const waitDoneTimeout = 30 * time.Second
267+
268+
// waitDone blocks until done is closed or waitDoneTimeout elapses, logging a
269+
// warning in the latter case.
270+
func (h *eventHandlerImpl) waitDone(done <-chan struct{}) {
271+
select {
272+
case <-done:
273+
case <-time.After(waitDoneTimeout):
274+
h.logger.LogAttrs(context.Background(), slog.LevelWarn,
275+
"timed out waiting for Done signal from HUG",
276+
)
277+
}
278+
}
279+
280+
// forwardFeedbackTimeout is the maximum time to wait for a result on ResultCh
281+
// after Done has been closed. The result may arrive slightly after Done in some
282+
// implementations, so we give a short grace period before giving up.
283+
const forwardFeedbackTimeout = 5 * time.Second
284+
285+
// forwardFeedback waits up to forwardFeedbackTimeout for a result on resultCh,
286+
// prepares the feedback status updates, and dispatches them through the shared
287+
// statusUpdater channel.
288+
func (h *eventHandlerImpl) forwardFeedback(ctx context.Context, resultCh chan diffs.HaproxyConfResult) {
289+
select {
290+
case result := <-resultCh:
291+
h.logger.LogAttrs(ctx, slog.LevelInfo, "[received] HUG => CONTROLLER: haproxy conf update result ", slog.Any("result", result))
292+
gatetree := h.treeBuilder.GetTree()
293+
for gwKey, gen := range result.GatewayObservedGenerations {
294+
gatetree.UpdateListenerProgrammedCondition(gwKey, gen, result.Err)
295+
}
296+
gateways := gatetree.RLockGateways()
297+
updates := h.statusUpdater.PrepareFeedbackStatusUpdate(result, gateways)
298+
gatetree.RUnlockGateways()
299+
h.statusUpdater.UpdateStatus(ctx, updates)
300+
case <-time.After(forwardFeedbackTimeout):
301+
// TODO: check with the team
302+
h.logger.LogAttrs(context.Background(), slog.LevelWarn,
303+
"timed out waiting for result from HUG on ResultCh",
304+
)
305+
}
306+
}
307+
259308
func (*eventHandlerImpl) recordDiffMetrics(d diffs.HaproxyConfDiffs) {
260309
metrics.ConfigDiffs.WithLabelValues("created", "frontend").Add(float64(len(d.Created.Frontends)))
261310
metrics.ConfigDiffs.WithLabelValues("updated", "frontend").Add(float64(len(d.Updated.Frontends)))

k8s/gate/handler/monitor.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ package handler
1515

1616
import (
1717
"context"
18+
"fmt"
1819
"log/slog"
1920
"sync"
2021
"time"
@@ -191,14 +192,17 @@ func (el *EventLoop) stopTimer() {
191192

192193
func (el *EventLoop) logEvent(e any) {
193194
var o client.Object
195+
eventType := ""
194196
switch obj := e.(type) {
195197
case *events.UpsertEvent:
196198
o = obj.Resource
199+
eventType = "UpsertEvent"
197200
case *events.DeleteEvent:
198201
o = obj.Type
202+
eventType = "DeleteEvent"
199203
}
200204
el.logger.LogAttrs(context.Background(), slog.LevelDebug,
201-
"added an event to the batch",
205+
fmt.Sprintf("added an event to the batch %s", eventType),
202206
logging.LogAttrBatch(el.nextBatch.BatchID, len(el.nextBatch.Events)),
203207
logging.LogAttrResource(o, el.extractGVK(o)),
204208
)
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
// Copyright 2025 HAProxy Technologies LLC
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
package handler

k8s/gate/haproxy/configuration.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -48,23 +48,24 @@ func (c *Configuration) resetDiffs() {
4848
}
4949
}
5050

51-
func (c *Configuration) upsertFrontend(logger *slog.Logger, fe *models.Frontend) error {
51+
func (c *Configuration) upsertFrontend(logger *slog.Logger, fe *models.Frontend, reprogramm bool) error {
5252
if fe == nil {
5353
logger.LogAttrs(context.Background(), slog.LevelError, "nil frontend")
5454
return errors.New("nil frontend")
5555
}
5656

5757
if previousFe, ok := c.structured.Frontends[fe.Name]; ok {
5858
// Check if they are the same
59-
if previousFe.Equal(*fe) {
59+
if previousFe.Equal(*fe) && !reprogramm {
6060
logger.LogAttrs(context.Background(), slog.LevelDebug, "Frontend [same]",
6161
logging.LogAttrFrontendName(fe.Name),
6262
)
6363
return nil
6464
}
6565

6666
// Update existing frontend
67-
logger.LogAttrs(context.Background(), slog.LevelInfo, "Frontend [UPDATE]",
67+
msg := fmt.Sprintf("Frontend [UPDATE] (%t)", reprogramm)
68+
logger.LogAttrs(context.Background(), slog.LevelInfo, msg,
6869
logging.LogAttrFrontendName(fe.Name),
6970
)
7071

k8s/gate/haproxy/diffs/diffs.go

Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,16 +14,32 @@
1414
package diffs
1515

1616
import (
17+
"encoding/json"
1718
"fmt"
19+
"strings"
1820

21+
"github.com/haproxytech/client-native/v6/models"
22+
"github.com/haproxytech/haproxy-unified-gateway/k8s/gate/haproxy/metadata"
1923
"github.com/haproxytech/haproxy-unified-gateway/k8s/gate/haproxy/structured"
24+
"k8s.io/apimachinery/pkg/types"
2025
)
2126

2227
type MergeStrategies struct {
2328
Global string // override or append
2429
Defaults string // override or append
2530
}
2631

32+
// GatewayObservedGenerations maps each gateway (namespace/name) to the highest
33+
// observed generation seen in the processed frontends' metadata.
34+
type GatewayObservedGenerations map[types.NamespacedName]int64
35+
36+
// HaproxyConfResult is the outcome of a single HaproxyConfDiffs processing
37+
// cycle, sent back to the controller via HaproxyConfDiffs.ResultCh.
38+
type HaproxyConfResult struct {
39+
Err error
40+
GatewayObservedGenerations GatewayObservedGenerations
41+
}
42+
2743
type HaproxyConfDiffs struct {
2844
Created structured.Structured
2945
Updated structured.Structured
@@ -36,6 +52,9 @@ type HaproxyConfDiffs struct {
3652
// close(haproxyCfg.Done)
3753
// }
3854
Done chan struct{}
55+
// ResultCh, when non-nil, receives exactly one HaproxyConfResult after
56+
// the diffs have been applied. The channel must be buffered (size >= 1).
57+
ResultCh chan HaproxyConfResult
3958

4059
MergeStrategies MergeStrategies
4160
ReloadNeed bool
@@ -45,6 +64,77 @@ func (c HaproxyConfDiffs) IsEmpty() bool {
4564
return c.Created.IsEmpty() && c.Updated.IsEmpty() && c.Deleted.IsEmpty()
4665
}
4766

67+
// GatewayObservedGenerations builds a map of gateway → max generation from
68+
// the metadata of all Created and Updated frontends. Deleted frontends are
69+
// excluded because their config is no longer active.
70+
func (c HaproxyConfDiffs) GatewayObservedGenerations() GatewayObservedGenerations {
71+
result := make(GatewayObservedGenerations)
72+
for _, fe := range c.Created.Frontends {
73+
for key, gen := range extractGatewayGenerationsFromFrontend(fe) {
74+
if existing, ok := result[key]; !ok || gen > existing {
75+
result[key] = gen
76+
}
77+
}
78+
}
79+
for _, fe := range c.Updated.Frontends {
80+
for key, gen := range extractGatewayGenerationsFromFrontend(fe) {
81+
if existing, ok := result[key]; !ok || gen > existing {
82+
result[key] = gen
83+
}
84+
}
85+
}
86+
return result
87+
}
88+
89+
func extractGatewayGenerationsFromFrontend(fe *models.Frontend) map[types.NamespacedName]int64 {
90+
result := make(map[types.NamespacedName]int64)
91+
if fe == nil || fe.Metadata == nil {
92+
return result
93+
}
94+
hugI, ok := fe.Metadata[metadata.UnifiedGatewayMetaDataKey]
95+
if !ok {
96+
return result
97+
}
98+
by, err := json.Marshal(hugI)
99+
if err != nil {
100+
return result
101+
}
102+
// After the JSON round-trip in FrontendMetadata(), the structure is:
103+
// { "Gateway": { "namespace/name": { "Generation": float64, "LinkID": string } } }
104+
var hugMeta map[string]map[string]map[string]any
105+
if err := json.Unmarshal(by, &hugMeta); err != nil {
106+
return result
107+
}
108+
gatewayMeta, ok := hugMeta["Gateway"]
109+
if !ok {
110+
return result
111+
}
112+
for nsName, info := range gatewayMeta {
113+
genAny, ok := info["Generation"]
114+
if !ok {
115+
continue
116+
}
117+
genFloat, ok := genAny.(float64)
118+
if !ok {
119+
continue
120+
}
121+
nn := parseNamespacedName(nsName)
122+
if nn == (types.NamespacedName{}) {
123+
continue
124+
}
125+
result[nn] = int64(genFloat)
126+
}
127+
return result
128+
}
129+
130+
func parseNamespacedName(s string) types.NamespacedName {
131+
parts := strings.SplitN(s, "/", 2)
132+
if len(parts) != 2 {
133+
return types.NamespacedName{}
134+
}
135+
return types.NamespacedName{Namespace: parts[0], Name: parts[1]}
136+
}
137+
48138
func (c HaproxyConfDiffs) Stats() string {
49139
return fmt.Sprintf("Created/Updated/Deleted FE:[%d/%d/%d] BE[%d/%d/%d] Global[%d/%d/%d] Defaults[%d/%d/%d] Reload[%t]",
50140
len(c.Created.Frontends), len(c.Updated.Frontends), len(c.Deleted.Frontends),

0 commit comments

Comments
 (0)