Skip to content

Commit fc612e3

Browse files
PoC: Use jsontext package to stream components from checkin requests
1 parent 4490fee commit fc612e3

6 files changed

Lines changed: 504 additions & 9 deletions

File tree

go.mod

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@ require (
6161
github.com/elastic/go-windows v1.0.2 // indirect
6262
github.com/elastic/gosigar v0.14.3 // indirect
6363
github.com/fatih/color v1.15.0 // indirect
64+
github.com/go-json-experiment/json v0.0.0-20260520185125-572e7c383686 // indirect
6465
github.com/go-logr/logr v1.4.3 // indirect
6566
github.com/go-logr/stdr v1.2.2 // indirect
6667
github.com/go-ole/go-ole v1.3.0 // indirect

go.sum

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,8 @@ github.com/fxamacker/cbor/v2 v2.9.2 h1:X4Ksno9+x3cz0TZv69ec1hxP/+tymuR8PXQJyDwfh
6161
github.com/fxamacker/cbor/v2 v2.9.2/go.mod h1:vM4b+DJCtHn+zz7h3FFp/hDAI9WNWCsZj23V5ytsSxQ=
6262
github.com/go-chi/chi/v5 v5.2.5 h1:Eg4myHZBjyvJmAFjFvWgrqDTXFyOzjj7YIm3L3mu6Ug=
6363
github.com/go-chi/chi/v5 v5.2.5/go.mod h1:X7Gx4mteadT3eDOMTsXzmI4/rwUpOwBHLpAfupzFJP0=
64+
github.com/go-json-experiment/json v0.0.0-20260520185125-572e7c383686 h1:NZBJxCpbHS1gzS6xAmyxbJznosZIIPk9IB42v62UvKA=
65+
github.com/go-json-experiment/json v0.0.0-20260520185125-572e7c383686/go.mod h1:tphK2c80bpPhMOI4v6bIc2xWywPfbqi1Z06+RcrMkDg=
6466
github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A=
6567
github.com/go-logr/logr v1.4.3 h1:CjnDlHq8ikf6E492q6eKboGOC0T8CDaOvkHCIg8idEI=
6668
github.com/go-logr/logr v1.4.3/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY=

internal/pkg/api/handleCheckin.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -181,12 +181,12 @@ func (ct *CheckinT) validateRequest(zlog zerolog.Logger, w http.ResponseWriter,
181181
span, ctx := apm.StartSpan(r.Context(), "validateRequest", "validate")
182182
defer span.End()
183183

184-
body := r.Body
185-
// Limit the size of the body to prevent malicious agent from exhausting RAM in server
186-
if ct.cfg.Limits.CheckinLimit.MaxBody > 0 {
187-
body = http.MaxBytesReader(w, body, ct.cfg.Limits.CheckinLimit.MaxBody)
184+
// Checkin requests that have unknown size or that are too large will use the streaming validation approach
185+
if r.ContentLength == -1 || r.ContentLength > ct.cfg.Limits.CheckinLimit.MaxBody {
186+
return ct.validateRequestStream(zlog, w, r, start, agent)
188187
}
189-
readCounter := datacounter.NewReaderCounter(body)
188+
189+
readCounter := datacounter.NewReaderCounter(r.Body)
190190

191191
// Decompress the body when the client signals Content-Encoding: gzip.
192192
var bodyReader io.Reader = readCounter
Lines changed: 281 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,281 @@
1+
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
2+
// or more contributor license agreements. Licensed under the Elastic License 2.0;
3+
// you may not use this file except in compliance with the Elastic License 2.0.
4+
5+
package api
6+
7+
import (
8+
"compress/gzip"
9+
"encoding/json"
10+
"fmt"
11+
"io"
12+
"net/http"
13+
"time"
14+
15+
"github.com/go-json-experiment/json/jsontext"
16+
17+
"github.com/miolini/datacounter"
18+
"github.com/rs/zerolog"
19+
"go.elastic.co/apm/v2"
20+
21+
"github.com/elastic/fleet-server/v7/internal/pkg/checkin"
22+
"github.com/elastic/fleet-server/v7/internal/pkg/model"
23+
)
24+
25+
// componentFlushThreshold is the number of component bytes accumulated before
26+
// issuing a partial ES update while still reading the request body.
27+
const componentFlushThreshold int64 = 4 * 1024 * 1024
28+
29+
// streamParseCheckinRequest decodes a CheckinRequest from r using jsontext streaming.
30+
// It is specifically meant to stream the components array into a sequence of intermediate ES updates
31+
// so the the fleet-server does not have to decode a large request into memory at once (in order to avoid OOM issues).
32+
// it return a CheckinRequest without Components, and the unhealthyReason parsed from the components (if any)
33+
func (ct *CheckinT) streamParseCheckinRequest(r io.Reader, threshold int64, agent *model.Agent) (CheckinRequest, *[]string, error) {
34+
dec := jsontext.NewDecoder(r)
35+
var req CheckinRequest
36+
var reason *[]string
37+
38+
tok, err := dec.ReadToken()
39+
if err != nil {
40+
return req, reason, fmt.Errorf("opening token: %w", err)
41+
}
42+
if tok.Kind() != jsontext.KindBeginObject {
43+
return req, reason, fmt.Errorf("expected JSON object, got %s", tok.Kind().String())
44+
}
45+
46+
for dec.PeekKind() != jsontext.KindEndObject {
47+
keyTok, err := dec.ReadToken()
48+
if err != nil {
49+
return req, reason, fmt.Errorf("json read key: %w", err)
50+
}
51+
key := keyTok.String()
52+
53+
switch key {
54+
case "components":
55+
unhealthyReason, err := ct.streamComponentsArray(dec, threshold, agent)
56+
if err != nil {
57+
return req, reason, fmt.Errorf("json error streaming components: %w", err)
58+
}
59+
reason = unhealthyReason
60+
default:
61+
val, err := dec.ReadValue()
62+
if err != nil {
63+
return req, reason, fmt.Errorf("json read error %q: %w", key, err)
64+
}
65+
if err := assignCheckinRequestField(&req, key, val); err != nil {
66+
return req, reason, fmt.Errorf("struct assign error %q: %w", key, err)
67+
}
68+
}
69+
}
70+
71+
if _, err := dec.ReadToken(); err != nil {
72+
return req, reason, fmt.Errorf("failed to read closing token: %w", err)
73+
}
74+
return req, reason, nil
75+
}
76+
77+
// streamComponentsArray reads the components JSON array from dec.
78+
// It flushes partial arrays as intermediate updates to ES.
79+
// Returns the unhealthy reason collected from the components
80+
func (ct *CheckinT) streamComponentsArray(dec *jsontext.Decoder, threshold int64, agent *model.Agent) (*[]string, error) {
81+
if dec.PeekKind() == jsontext.KindNull {
82+
// FIXME need to clean component list in ES if we ever get here
83+
// If this ever happens it means that the content length was either set to -1
84+
// or some other part of the request is greater than the checkin body limit
85+
// there is no guard against the latter from occuring
86+
if _, err := dec.ReadToken(); err != nil { // consume null
87+
return nil, err
88+
}
89+
return nil, nil
90+
}
91+
92+
arrTok, err := dec.ReadToken()
93+
if err != nil {
94+
return nil, err
95+
}
96+
if arrTok.Kind() != jsontext.KindBeginArray {
97+
return nil, fmt.Errorf("expected array, got %c", arrTok.Kind())
98+
}
99+
100+
var (
101+
components = make([]model.ComponentsItems, 0, 100) // 100 is arbitrary to pre allocate some space
102+
reason []string
103+
flushNum int
104+
accumulated int64
105+
)
106+
107+
for dec.PeekKind() != jsontext.KindEndArray {
108+
val, err := dec.ReadValue()
109+
if err != nil {
110+
return nil, fmt.Errorf("read component element: %w", err)
111+
}
112+
accumulated += int64(len(val))
113+
114+
var component model.ComponentsItems
115+
if err := json.Unmarshal(val, &component); err != nil {
116+
return nil, fmt.Errorf("parse component element: %w", err)
117+
}
118+
119+
components = append(components, component)
120+
121+
if accumulated >= threshold {
122+
reason = append(reason, calcUnhealthyReason(components)...)
123+
if err := ct.bc.CheckIn(agent.Id, checkin.WithComponentsStream(components, flushNum)); err != nil {
124+
return nil, fmt.Errorf("partial component update failed: %w", err)
125+
}
126+
flushNum++
127+
clear(components) // zero elements
128+
components = components[:0] // set len to 0 without removing capacity
129+
accumulated = 0
130+
}
131+
}
132+
133+
// Consume closing ]
134+
if _, err := dec.ReadToken(); err != nil {
135+
return nil, err
136+
}
137+
138+
if flushNum == 0 && len(components) == 0 {
139+
// FIXME update ES with no components here too
140+
return nil, nil
141+
}
142+
if len(components) > 0 {
143+
reason = append(reason, calcUnhealthyReason(components)...)
144+
if err := ct.bc.CheckIn(agent.Id, checkin.WithComponentsStream(components, flushNum)); err != nil {
145+
return nil, fmt.Errorf("partial component update failed: %w", err)
146+
}
147+
}
148+
return &reason, nil
149+
}
150+
151+
// assignCheckinRequestField unmarshals val into the named field of req.
152+
// Unknown fields are silently ignored, matching standard JSON decoder behaviour.
153+
func assignCheckinRequestField(req *CheckinRequest, key string, val jsontext.Value) error {
154+
switch key {
155+
case "ack_token":
156+
var v string
157+
if err := json.Unmarshal(val, &v); err != nil {
158+
return err
159+
}
160+
req.AckToken = &v
161+
case "agent_policy_id":
162+
var v string
163+
if err := json.Unmarshal(val, &v); err != nil {
164+
return err
165+
}
166+
req.AgentPolicyId = &v
167+
case "local_metadata":
168+
req.LocalMetadata = json.RawMessage(val.Clone())
169+
case "message":
170+
return json.Unmarshal(val, &req.Message)
171+
case "policy_revision_idx":
172+
return json.Unmarshal(val, &req.PolicyRevisionIdx)
173+
case "poll_timeout":
174+
var v string
175+
if err := json.Unmarshal(val, &v); err != nil {
176+
return err
177+
}
178+
req.PollTimeout = &v
179+
case "status":
180+
return json.Unmarshal(val, &req.Status)
181+
case "upgrade":
182+
req.Upgrade = json.RawMessage(val.Clone())
183+
case "upgrade_details":
184+
req.UpgradeDetails = new(UpgradeDetails)
185+
return json.Unmarshal(val, req.UpgradeDetails)
186+
}
187+
return nil
188+
}
189+
190+
// validateRequestStream is the streaming request validator.
191+
// It uses jsontext to parse the request body token-by-token, specifically to handle
192+
// the components array seperatly from other attributes.
193+
// When validating the request stream, ES will recieve intermediate updates to the components
194+
// array before things like status are set.
195+
// The return validatdCheckin object will have the Components attribute set to nil.
196+
//
197+
// FIXME: go v1.27+ this is a PoC to demonstrate stream-read behaviour only
198+
func (ct *CheckinT) validateRequestStream(zlog zerolog.Logger, w http.ResponseWriter, r *http.Request, start time.Time, agent *model.Agent) (validatedCheckin, error) {
199+
span, ctx := apm.StartSpan(r.Context(), "validateRequestStream", "validate")
200+
defer span.End()
201+
202+
readCounter := datacounter.NewReaderCounter(r.Body)
203+
204+
var bodyReader io.Reader = readCounter
205+
if r.Header.Get("Content-Encoding") == kEncodingGzip {
206+
gr, err := gzip.NewReader(readCounter)
207+
if err != nil {
208+
return validatedCheckin{}, &BadRequestErr{msg: "unable to create gzip reader for request body", nextErr: err}
209+
}
210+
defer gr.Close()
211+
bodyReader = gr
212+
}
213+
214+
req, unhealthyReason, err := ct.streamParseCheckinRequest(bodyReader, componentFlushThreshold, agent)
215+
if err != nil {
216+
return validatedCheckin{}, &BadRequestErr{msg: "unable to decode checkin request", nextErr: err}
217+
}
218+
cntCheckin.bodyIn.Add(readCounter.Count())
219+
220+
if unhealthyReason == nil {
221+
// fallback to agent doc if no reason found
222+
unhealthyReason = &agent.UnhealthyReason
223+
if agent.UnhealthyReason == nil && (agent.LastCheckinStatus == FailedStatus || agent.LastCheckinStatus == DegradedStatus) {
224+
// set to other if no reason found, and agent is not healthy
225+
unhealthyReason = &([]string{"other"})
226+
}
227+
}
228+
229+
if req.Status == CheckinRequestStatus("") {
230+
return validatedCheckin{}, &BadRequestErr{msg: "checkin status missing"}
231+
}
232+
if len(req.Message) == 0 {
233+
zlog.Warn().Msg("checkin request method is empty.")
234+
}
235+
236+
var pDur time.Duration
237+
if req.PollTimeout != nil {
238+
pDur, err = time.ParseDuration(*req.PollTimeout)
239+
if err != nil {
240+
return validatedCheckin{}, &BadRequestErr{msg: "poll_timeout cannot be parsed as duration", nextErr: err}
241+
}
242+
}
243+
244+
pollDuration := ct.cfg.Timeouts.CheckinLongPoll
245+
if pDur != 0 {
246+
pollDuration = max(min(pDur-(2*time.Minute), ct.cfg.Timeouts.CheckinMaxPoll), time.Minute)
247+
wTime := pollDuration + time.Minute
248+
rc := http.NewResponseController(w)
249+
if err := rc.SetWriteDeadline(start.Add(wTime)); err != nil {
250+
zlog.Warn().Err(err).Time("write_deadline", start.Add(wTime)).Msg("Unable to set checkin write deadline.")
251+
} else {
252+
zlog.Trace().Time("write_deadline", start.Add(wTime)).Msg("Request write deadline set.")
253+
}
254+
}
255+
zlog.Trace().Dur("pollDuration", pollDuration).Msg("Request poll duration set.")
256+
257+
rawMeta, err := parseMeta(zlog, agent, &req)
258+
if err != nil {
259+
return validatedCheckin{}, &BadRequestErr{msg: "unable to parse meta", nextErr: err}
260+
}
261+
262+
seqno, err := ct.resolveSeqNo(ctx, zlog, req, agent)
263+
if err != nil {
264+
return validatedCheckin{}, err
265+
}
266+
267+
rawRollbacks, err := parseAvailableRollbacks(zlog, agent.Upgrade, &req)
268+
if err != nil {
269+
zlog.Warn().Err(err).Msg("unable to parse available rollbacks")
270+
rawRollbacks = nil
271+
}
272+
273+
return validatedCheckin{
274+
req: &req,
275+
dur: pollDuration,
276+
rawMeta: rawMeta,
277+
seqno: seqno,
278+
unhealthyReason: unhealthyReason,
279+
rawAvailableRollbacks: rawRollbacks,
280+
}, nil
281+
}

0 commit comments

Comments
 (0)