Skip to content

Commit 1848569

Browse files
committed
Add labels functionality for service discovery
Signed-off-by: Moritz Reinhardt <moreinhardt@gmail.com>
1 parent 71f3cad commit 1848569

5 files changed

Lines changed: 113 additions & 28 deletions

File tree

cmd/client/main.go

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import (
1919
"context"
2020
"crypto/tls"
2121
"crypto/x509"
22+
"encoding/json"
2223
"errors"
2324
"fmt"
2425
"io"
@@ -47,6 +48,7 @@ var (
4748
tlsCert = kingpin.Flag("tls.cert", "<cert> Client certificate file").String()
4849
tlsKey = kingpin.Flag("tls.key", "<key> Private key file").String()
4950
metricsAddr = kingpin.Flag("metrics-addr", "Serve Prometheus metrics at this address").Default(":9369").String()
51+
labels = kingpin.Flag("label", "Labels to register with client FQDN for service discovery in key=value format. Can be specified multiple times.").StringMap()
5052

5153
retryInitialWait = kingpin.Flag("proxy.retry.initial-wait", "Amount of time to wait after proxy failure").Default("1s").Duration()
5254
retryMaxWait = kingpin.Flag("proxy.retry.max-wait", "Maximum amount of time to wait between proxy poll retries").Default("5s").Duration()
@@ -190,7 +192,19 @@ func (c *Coordinator) doPoll(client *http.Client) error {
190192
return fmt.Errorf("error parsing url poll: %w", err)
191193
}
192194
url := base.ResolveReference(u)
193-
resp, err := client.Post(url.String(), "", strings.NewReader(*myFqdn))
195+
196+
pollReq := util.PollRequest{
197+
FQDN: *myFqdn,
198+
Labels: *labels,
199+
}
200+
201+
body, err := json.Marshal(pollReq)
202+
if err != nil {
203+
c.logger.Error("Error marshaling poll request:", "err", err)
204+
return fmt.Errorf("error marshaling poll request: %w", err)
205+
}
206+
207+
resp, err := client.Post(url.String(), "application/json", bytes.NewReader(body))
194208
if err != nil {
195209
c.logger.Error("Error polling:", "err", err)
196210
return fmt.Errorf("error polling: %w", err)

cmd/proxy/coordinator.go

Lines changed: 29 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,11 @@ var (
3232
registrationTimeout = kingpin.Flag("registration.timeout", "After how long a registration expires.").Default("5m").Duration()
3333
)
3434

35+
type ClientInfo struct {
36+
LastSeen time.Time
37+
Labels map[string]string
38+
}
39+
3540
// Coordinator metrics.
3641
var (
3742
knownClients = promauto.NewGauge(
@@ -51,8 +56,8 @@ type Coordinator struct {
5156
waiting map[string]chan *http.Request
5257
// Responses from clients.
5358
responses map[string]chan *http.Response
54-
// Clients we know about and when they last contacted us.
55-
known map[string]time.Time
59+
// Clients we know about with their labels and last contact time.
60+
known map[string]ClientInfo
5661

5762
logger *slog.Logger
5863
}
@@ -62,7 +67,7 @@ func NewCoordinator(logger *slog.Logger) (*Coordinator, error) {
6267
c := &Coordinator{
6368
waiting: map[string]chan *http.Request{},
6469
responses: map[string]chan *http.Response{},
65-
known: map[string]time.Time{},
70+
known: map[string]ClientInfo{},
6671
logger: logger,
6772
}
6873

@@ -131,10 +136,10 @@ func (c *Coordinator) DoScrape(ctx context.Context, r *http.Request) (*http.Resp
131136
}
132137

133138
// WaitForScrapeInstruction registers a client waiting for a scrape result
134-
func (c *Coordinator) WaitForScrapeInstruction(fqdn string) (*http.Request, error) {
135-
c.logger.Info("WaitForScrapeInstruction", "fqdn", fqdn)
139+
func (c *Coordinator) WaitForScrapeInstruction(fqdn string, labels map[string]string) (*http.Request, error) {
140+
c.logger.Info("WaitForScrapeInstruction", "fqdn", fqdn, "labels", labels)
136141

137-
c.addKnownClient(fqdn)
142+
c.addKnownClient(fqdn, labels)
138143
// TODO: What if the client times out?
139144
ch := c.getRequestChannel(fqdn)
140145

@@ -179,24 +184,31 @@ func (c *Coordinator) ScrapeResult(r *http.Response) error {
179184
}
180185
}
181186

182-
func (c *Coordinator) addKnownClient(fqdn string) {
187+
func (c *Coordinator) addKnownClient(fqdn string, labels map[string]string) {
183188
c.mu.Lock()
184189
defer c.mu.Unlock()
185190

186-
c.known[fqdn] = time.Now()
191+
if labels == nil {
192+
labels = make(map[string]string)
193+
}
194+
195+
c.known[fqdn] = ClientInfo{
196+
LastSeen: time.Now(),
197+
Labels: labels,
198+
}
187199
knownClients.Set(float64(len(c.known)))
188200
}
189201

190-
// KnownClients returns a list of alive clients
191-
func (c *Coordinator) KnownClients() []string {
202+
// KnownClients returns a map of alive clients with their info
203+
func (c *Coordinator) KnownClients() map[string]ClientInfo {
192204
c.mu.Lock()
193205
defer c.mu.Unlock()
194206

195207
limit := time.Now().Add(-*registrationTimeout)
196-
known := make([]string, 0, len(c.known))
197-
for k, t := range c.known {
198-
if limit.Before(t) {
199-
known = append(known, k)
208+
known := make(map[string]ClientInfo)
209+
for fqdn, info := range c.known {
210+
if limit.Before(info.LastSeen) {
211+
known[fqdn] = info
200212
}
201213
}
202214
return known
@@ -210,9 +222,9 @@ func (c *Coordinator) gc() {
210222
defer c.mu.Unlock()
211223
limit := time.Now().Add(-*registrationTimeout)
212224
deleted := 0
213-
for k, ts := range c.known {
214-
if ts.Before(limit) {
215-
delete(c.known, k)
225+
for fqdn, info := range c.known {
226+
if info.LastSeen.Before(limit) {
227+
delete(c.known, fqdn)
216228
deleted++
217229
}
218230
}

cmd/proxy/main.go

Lines changed: 32 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -141,24 +141,51 @@ func (h *httpHandler) handlePush(w http.ResponseWriter, r *http.Request) {
141141

142142
// handlePoll handles clients registering and asking for scrapes.
143143
func (h *httpHandler) handlePoll(w http.ResponseWriter, r *http.Request) {
144-
fqdn, _ := io.ReadAll(r.Body)
145-
request, err := h.coordinator.WaitForScrapeInstruction(strings.TrimSpace(string(fqdn)))
144+
var fqdn string
145+
var labels map[string]string
146+
147+
contentType := r.Header.Get("Content-Type")
148+
if contentType == "application/json" {
149+
var pollReq util.PollRequest
150+
body, err := io.ReadAll(r.Body)
151+
if err != nil {
152+
h.logger.Error("Error reading request body:", "err", err)
153+
http.Error(w, fmt.Sprintf("Error reading request body: %s", err.Error()), http.StatusBadRequest)
154+
return
155+
}
156+
if err := json.Unmarshal(body, &pollReq); err != nil {
157+
h.logger.Error("Error unmarshaling JSON:", "err", err)
158+
http.Error(w, fmt.Sprintf("Error unmarshaling JSON: %s", err.Error()), http.StatusBadRequest)
159+
return
160+
}
161+
fqdn = pollReq.FQDN
162+
labels = pollReq.Labels
163+
} else {
164+
body, _ := io.ReadAll(r.Body)
165+
fqdn = strings.TrimSpace(string(body))
166+
labels = make(map[string]string)
167+
}
168+
169+
request, err := h.coordinator.WaitForScrapeInstruction(fqdn, labels)
146170
if err != nil {
147171
h.logger.Info("Error WaitForScrapeInstruction:", "err", err)
148172
http.Error(w, fmt.Sprintf("Error WaitForScrapeInstruction: %s", err.Error()), http.StatusRequestTimeout)
149173
return
150174
}
151175
//nolint:errcheck // https://github.com/prometheus-community/PushProx/issues/111
152176
request.WriteProxy(w) // Send full request as the body of the response.
153-
h.logger.Info("Responded to /poll", "url", request.URL.String(), "scrape_id", request.Header.Get("Id"))
177+
h.logger.Info("Responded to /poll", "url", request.URL.String(), "scrape_id", request.Header.Get("Id"), "labels", labels)
154178
}
155179

156180
// handleListClients handles requests to list available clients as a JSON array.
157181
func (h *httpHandler) handleListClients(w http.ResponseWriter, r *http.Request) {
158182
known := h.coordinator.KnownClients()
159183
targets := make([]*targetGroup, 0, len(known))
160-
for _, k := range known {
161-
targets = append(targets, &targetGroup{Targets: []string{k}})
184+
for fqdn, info := range known {
185+
targets = append(targets, &targetGroup{
186+
Targets: []string{fqdn},
187+
Labels: info.Labels,
188+
})
162189
}
163190
w.Header().Set("Content-Type", "application/json")
164191
//nolint:errcheck // https://github.com/prometheus-community/PushProx/issues/111

end-to-end-test.sh

Lines changed: 32 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -29,13 +29,40 @@ while ! curl -s -f -L http://localhost:8080/clients; do
2929
sleep 2
3030
done
3131

32-
./pushprox-client --log.level=debug --proxy-url=http://localhost:8080 &
33-
echo $! > "${tmpdir}/client.pid"
34-
while [ "$(curl -s -L 'http://localhost:8080/clients' | jq 'length')" != '1' ] ; do
35-
echo 'Waiting for client'
32+
./pushprox-client --log.level=debug --proxy-url=http://localhost:8080 &
33+
echo $! >"${tmpdir}/client.pid"
34+
./pushprox-client --log.level=debug --proxy-url=http://localhost:8080 --fqdn=client2 --label foo=bar --label exporter=node &
35+
echo $! >"${tmpdir}/client2.pid"
36+
while [ "$(curl -s -L 'http://localhost:8080/clients' | jq 'length')" != '2' ]; do
37+
echo 'Waiting for clients'
3638
sleep 2
3739
done
3840

41+
echo "Testing client labels..."
42+
clients_response=$(curl -s -L 'http://localhost:8080/clients')
43+
44+
# Check that the first client has empty labels
45+
client1_labels=$(echo "$clients_response" | jq -r '.[] | select(.targets[] != "client2") | .labels')
46+
if [ "$client1_labels" != "{}" ]; then
47+
echo "ERROR: Expected client1 to have empty labels {}, got $client1_labels"
48+
exit 1
49+
fi
50+
51+
# Check that client2 has the expected labels
52+
client2_labels=$(echo "$clients_response" | jq -r '.[] | select(.targets[] == "client2") | .labels')
53+
label_value_foo=$(echo "$client2_labels" | jq -r '.foo // "missing"')
54+
label_value_exporter=$(echo "$client2_labels" | jq -r '.exporter // "missing"')
55+
if [ "$label_value_foo" != "bar" ]; then
56+
echo "ERROR: Expected label foo=bar, got foo=$label_value_foo"
57+
exit 1
58+
fi
59+
if [ "$label_value_exporter" != "node" ]; then
60+
echo "ERROR: Expected label exporter=node, got exporter=$label_value_exporter"
61+
exit 1
62+
fi
63+
64+
echo "✅ Labels test passed: client2 has correct labels (foo=bar, exporter=node), client1 has empty labels"
65+
3966
prometheus --config.file=prometheus.yml --log.level=debug &
4067
echo $! > "${tmpdir}/prometheus.pid"
4168
while ! curl -s -f -L http://localhost:9090/-/ready; do
@@ -45,7 +72,7 @@ done
4572
sleep 15
4673

4774
query='http://localhost:9090/api/v1/query?query=node_exporter_build_info'
48-
while [ $(curl -s -L "${query}" | jq '.data.result | length') != '1' ]; do
75+
while [ $(curl -s -L "${query}" | jq '.data.result | length') != '2' ]; do
4976
echo 'Waiting for results'
5077
sleep 2
5178
done

util/proxy.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,11 @@ import (
1919
"time"
2020
)
2121

22+
type PollRequest struct {
23+
FQDN string `json:"fqdn"`
24+
Labels map[string]string `json:"labels"`
25+
}
26+
2227
func GetScrapeTimeout(maxScrapeTimeout, defaultScrapeTimeout *time.Duration, h http.Header) time.Duration {
2328
timeout := *defaultScrapeTimeout
2429
headerTimeout, err := GetHeaderTimeout(h)

0 commit comments

Comments
 (0)