|
| 1 | +// Command parity-check is the command-line wrapper around the parity package. It |
| 2 | +// reads the configuration the extension server intended to program, reads the |
| 3 | +// proxy's live configuration, compares them, prints the result as JSON, and |
| 4 | +// exits non-zero on any mismatch. |
| 5 | +// |
| 6 | +// The proxy and extension server can be reached either by a direct URL or, when |
| 7 | +// their containers ship without any shell or HTTP client, by forwarding a port |
| 8 | +// to the pod and fetching through it from this process. The flags below select |
| 9 | +// which. |
| 10 | +package main |
| 11 | + |
| 12 | +import ( |
| 13 | + "bufio" |
| 14 | + "context" |
| 15 | + "encoding/json" |
| 16 | + "flag" |
| 17 | + "fmt" |
| 18 | + "io" |
| 19 | + "net" |
| 20 | + "net/http" |
| 21 | + "os" |
| 22 | + "os/exec" |
| 23 | + "strings" |
| 24 | + "time" |
| 25 | + |
| 26 | + "go.datum.net/network-services-operator/test/parity" |
| 27 | +) |
| 28 | + |
| 29 | +func main() { |
| 30 | + cfg := parseFlags() |
| 31 | + if err := run(cfg); err != nil { |
| 32 | + fmt.Fprintf(os.Stderr, "parity-check: %v\n", err) |
| 33 | + os.Exit(2) // 2 = could not fetch or parse; 1 = mismatch found. |
| 34 | + } |
| 35 | +} |
| 36 | + |
| 37 | +type config struct { |
| 38 | + corazaFilter string |
| 39 | + timeout time.Duration |
| 40 | + |
| 41 | + adminURL string |
| 42 | + admin execTarget |
| 43 | + |
| 44 | + extURL string |
| 45 | + ext execTarget |
| 46 | + // extSelector matches all extension server replicas. Only one replica |
| 47 | + // actually builds configuration at a time, so the check must query every |
| 48 | + // replica and use the authoritative one. Mutually exclusive with |
| 49 | + // --ext-exec-pod / --ext-url. |
| 50 | + extSelector string |
| 51 | + |
| 52 | + // expectMinBuildID, when set, requires the build count to be at least this |
| 53 | + // value, proving a fresh build happened. Capture the count before a change, |
| 54 | + // then require a higher one after. |
| 55 | + expectMinBuildID uint64 |
| 56 | + jsonOut bool |
| 57 | + |
| 58 | + // printBuildID, when true, resolves the authoritative replica and prints only |
| 59 | + // its build count, then exits without comparing anything. Used to capture the |
| 60 | + // count before a change and to wait for configuration to settle. |
| 61 | + printBuildID bool |
| 62 | +} |
| 63 | + |
// execTarget names a pod to reach through kubectl, together with the optional
// kubectl scoping arguments that go with it.
type execTarget struct {
	pod       string // pod name; empty means no target was configured
	namespace string // passed to kubectl as -n when non-empty
	container string // container name taken from the *-exec-container flags
	kubeCtx   string // passed to kubectl as --context when non-empty
}

// set reports whether a pod name has been supplied.
func (e execTarget) set() bool {
	return len(e.pod) > 0
}
| 72 | + |
| 73 | +func parseFlags() config { |
| 74 | + var c config |
| 75 | + flag.StringVar(&c.corazaFilter, "coraza-filter", "coraza-waf", "Coraza HTTP filter name (identifies WAF HCM injection)") |
| 76 | + flag.DurationVar(&c.timeout, "timeout", 30*time.Second, "overall fetch timeout") |
| 77 | + |
| 78 | + flag.StringVar(&c.adminURL, "admin-url", "", "Envoy admin base URL (e.g. http://127.0.0.1:19000); use this OR --admin-exec-pod") |
| 79 | + flag.StringVar(&c.admin.pod, "admin-exec-pod", "", "proxy pod to kubectl-exec into for admin access") |
| 80 | + flag.StringVar(&c.admin.namespace, "admin-exec-namespace", "envoy-gateway-system", "namespace of the proxy pod") |
| 81 | + flag.StringVar(&c.admin.container, "admin-exec-container", "envoy", "container in the proxy pod") |
| 82 | + flag.StringVar(&c.admin.kubeCtx, "admin-exec-context", "", "kubeconfig context for admin exec (optional)") |
| 83 | + |
| 84 | + flag.StringVar(&c.extURL, "ext-url", "", "ext-server health base URL (e.g. http://127.0.0.1:8080); use this OR --ext-exec-pod / --ext-exec-selector") |
| 85 | + flag.StringVar(&c.ext.pod, "ext-exec-pod", "", "single ext-server pod to kubectl-exec into") |
| 86 | + flag.StringVar(&c.extSelector, "ext-exec-selector", "", "label selector resolving ALL ext-server replicas (multi-replica; picks the authoritative one). Preferred over --ext-exec-pod") |
| 87 | + flag.StringVar(&c.ext.namespace, "ext-exec-namespace", "", "namespace of the ext-server pod(s)") |
| 88 | + flag.StringVar(&c.ext.container, "ext-exec-container", "", "container in the ext-server pod") |
| 89 | + flag.StringVar(&c.ext.kubeCtx, "ext-exec-context", "", "kubeconfig context for ext exec (optional)") |
| 90 | + |
| 91 | + flag.Uint64Var(&c.expectMinBuildID, "expect-min-build-id", 0, "require programmed-set BuildID >= this (STALE oracle)") |
| 92 | + flag.BoolVar(&c.jsonOut, "json", true, "emit the ParityReport as JSON") |
| 93 | + flag.BoolVar(&c.printBuildID, "print-build-id", false, "resolve the authoritative ext-server replica and print ONLY its BuildID, then exit (capture half of the STALE oracle; polled by _steps/wait-config-settle.yaml)") |
| 94 | + flag.Parse() |
| 95 | + return c |
| 96 | +} |
| 97 | + |
| 98 | +func run(c config) error { |
| 99 | + ctx, cancel := context.WithTimeout(context.Background(), c.timeout) |
| 100 | + defer cancel() |
| 101 | + |
| 102 | + // Resolve the authoritative replica and emit only its build count, without |
| 103 | + // fetching or comparing the proxy's configuration. |
| 104 | + if c.printBuildID { |
| 105 | + expected, _, err := resolveExpected(ctx, c) |
| 106 | + if err != nil { |
| 107 | + return err |
| 108 | + } |
| 109 | + fmt.Printf("%d\n", expected.BuildID) |
| 110 | + return nil |
| 111 | + } |
| 112 | + |
| 113 | + adminFetcher, err := buildFetcher(c.adminURL, c.admin, true) |
| 114 | + if err != nil { |
| 115 | + return fmt.Errorf("admin fetcher: %w", err) |
| 116 | + } |
| 117 | + |
| 118 | + // Resolve the intended configuration and the fetcher for the authoritative |
| 119 | + // replica. With a selector this queries every replica and picks the active |
| 120 | + // one; otherwise it uses the single configured endpoint. |
| 121 | + expected, extFetcher, err := resolveExpected(ctx, c) |
| 122 | + if err != nil { |
| 123 | + return err |
| 124 | + } |
| 125 | + if c.expectMinBuildID > 0 && expected.BuildID < c.expectMinBuildID { |
| 126 | + return fmt.Errorf("STALE: programmed-set BuildID %d < required %d (ext-server did not re-translate)", |
| 127 | + expected.BuildID, c.expectMinBuildID) |
| 128 | + } |
| 129 | + |
| 130 | + droppedSecrets := expected.Keys[parity.FamilyTLSPrune] |
| 131 | + actual, _, err := parity.FetchActual(ctx, adminFetcher, extFetcher, c.corazaFilter, droppedSecrets) |
| 132 | + if err != nil { |
| 133 | + return err |
| 134 | + } |
| 135 | + |
| 136 | + report := parity.Compare(expected, actual) |
| 137 | + |
| 138 | + if c.jsonOut { |
| 139 | + enc := json.NewEncoder(os.Stdout) |
| 140 | + enc.SetIndent("", " ") |
| 141 | + _ = enc.Encode(report) |
| 142 | + } |
| 143 | + fmt.Fprintln(os.Stderr, report.String()) |
| 144 | + |
| 145 | + if !report.OK() { |
| 146 | + os.Exit(1) // mismatch found — distinct from a fetch or parse failure (exit 2). |
| 147 | + } |
| 148 | + return nil |
| 149 | +} |
| 150 | + |
| 151 | +// resolveExpected returns the intended configuration and the fetcher for the |
| 152 | +// authoritative replica. With a selector it queries every replica and picks the |
| 153 | +// active one; otherwise it uses the single configured endpoint. The chosen |
| 154 | +// fetcher is reused later so other signals also come from the active replica. |
| 155 | +func resolveExpected(ctx context.Context, c config) (parity.Expected, parity.Fetcher, error) { |
| 156 | + if c.extSelector != "" { |
| 157 | + pods, err := resolvePods(ctx, c.ext.kubeCtx, c.ext.namespace, c.extSelector) |
| 158 | + if err != nil { |
| 159 | + return parity.Expected{}, nil, fmt.Errorf("resolve ext-server pods: %w", err) |
| 160 | + } |
| 161 | + if len(pods) == 0 { |
| 162 | + return parity.Expected{}, nil, fmt.Errorf("no ext-server pods matched selector %q in namespace %q", c.extSelector, c.ext.namespace) |
| 163 | + } |
| 164 | + fetchers := make(map[string]parity.Fetcher, len(pods)) |
| 165 | + for _, pod := range pods { |
| 166 | + et := c.ext |
| 167 | + et.pod = pod |
| 168 | + fetchers[pod] = &pfFetcher{target: et, remotePort: "8080"} |
| 169 | + } |
| 170 | + src, perReplicaErrs, err := parity.FetchExpectedFromMany(ctx, fetchers) |
| 171 | + if err != nil { |
| 172 | + return parity.Expected{}, nil, err |
| 173 | + } |
| 174 | + // Report per-replica fetch errors and the chosen replica for diagnosis; |
| 175 | + // they are not fatal as long as one replica answered. |
| 176 | + for pod, e := range perReplicaErrs { |
| 177 | + fmt.Fprintf(os.Stderr, "parity-check: ext replica %s unreachable: %v\n", pod, e) |
| 178 | + } |
| 179 | + fmt.Fprintf(os.Stderr, "parity-check: authoritative ext replica %s (BuildID %d) of %d\n", |
| 180 | + src.Replica, src.Expected.BuildID, len(pods)) |
| 181 | + return src.Expected, fetchers[src.Replica], nil |
| 182 | + } |
| 183 | + |
| 184 | + // Single-endpoint fallback. |
| 185 | + extFetcher, err := buildFetcher(c.extURL, c.ext, false) |
| 186 | + if err != nil { |
| 187 | + return parity.Expected{}, nil, fmt.Errorf("ext fetcher: %w", err) |
| 188 | + } |
| 189 | + exp, err := parity.FetchExpected(ctx, extFetcher) |
| 190 | + if err != nil { |
| 191 | + return parity.Expected{}, nil, err |
| 192 | + } |
| 193 | + return exp, extFetcher, nil |
| 194 | +} |
| 195 | + |
// resolvePods lists the pods matching selector whose phase is Running by
// shelling out to kubectl, and returns their names. kubeCtx and namespace are
// forwarded to kubectl only when non-empty. On failure the error carries
// kubectl's trimmed stderr.
func resolvePods(ctx context.Context, kubeCtx, namespace, selector string) ([]string, error) {
	var argv []string
	if kubeCtx != "" {
		argv = append(argv, "--context", kubeCtx)
	}
	if namespace != "" {
		argv = append(argv, "-n", namespace)
	}
	argv = append(argv,
		"get", "pods",
		"-l", selector,
		"--field-selector=status.phase=Running",
		"-o", "jsonpath={.items[*].metadata.name}",
	)

	var errOut strings.Builder
	kubectl := exec.CommandContext(ctx, "kubectl", argv...)
	kubectl.Stderr = &errOut

	raw, err := kubectl.Output()
	if err == nil {
		// The jsonpath output is a whitespace-separated list of names.
		return strings.Fields(string(raw)), nil
	}
	return nil, fmt.Errorf("kubectl get pods -l %s: %w: %s", selector, err, strings.TrimSpace(errOut.String()))
}
| 219 | + |
| 220 | +// buildFetcher returns a direct HTTP fetcher when a URL is given, otherwise a |
| 221 | +// port-forward fetcher to the given pod. admin selects which remote port to |
| 222 | +// reach. Port-forwarding is used rather than running a command inside the pod |
| 223 | +// because the containers ship without a shell or HTTP client. |
| 224 | +func buildFetcher(url string, et execTarget, admin bool) (parity.Fetcher, error) { |
| 225 | + switch { |
| 226 | + case url != "": |
| 227 | + return parity.HTTPFetcher{BaseURL: url}, nil |
| 228 | + case et.set(): |
| 229 | + port := "8080" |
| 230 | + if admin { |
| 231 | + port = "19000" |
| 232 | + } |
| 233 | + return &pfFetcher{target: et, remotePort: port}, nil |
| 234 | + default: |
| 235 | + return nil, fmt.Errorf("either a URL or an exec pod must be provided") |
| 236 | + } |
| 237 | +} |
| 238 | + |
| 239 | +// pfFetcher fetches a path by forwarding a local port to the pod and making the |
| 240 | +// request from this process. The forward is set up and torn down per call, on a |
| 241 | +// fresh local port each time so concurrent fetches don't collide. |
| 242 | +type pfFetcher struct { |
| 243 | + target execTarget |
| 244 | + remotePort string |
| 245 | +} |
| 246 | + |
| 247 | +func (p *pfFetcher) Fetch(ctx context.Context, path string) ([]byte, error) { |
| 248 | + localPort, err := freeLocalPort() |
| 249 | + if err != nil { |
| 250 | + return nil, fmt.Errorf("allocate local port: %w", err) |
| 251 | + } |
| 252 | + |
| 253 | + args := []string{} |
| 254 | + if p.target.kubeCtx != "" { |
| 255 | + args = append(args, "--context", p.target.kubeCtx) |
| 256 | + } |
| 257 | + if p.target.namespace != "" { |
| 258 | + args = append(args, "-n", p.target.namespace) |
| 259 | + } |
| 260 | + args = append(args, "port-forward", "pod/"+p.target.pod, |
| 261 | + fmt.Sprintf("%d:%s", localPort, p.remotePort)) |
| 262 | + |
| 263 | + // The forward must outlive the single request but be torn down right after. |
| 264 | + pfCtx, cancel := context.WithCancel(ctx) |
| 265 | + defer cancel() |
| 266 | + cmd := exec.CommandContext(pfCtx, "kubectl", args...) |
| 267 | + stdout, err := cmd.StdoutPipe() |
| 268 | + if err != nil { |
| 269 | + return nil, err |
| 270 | + } |
| 271 | + var stderr strings.Builder |
| 272 | + cmd.Stderr = &stderr |
| 273 | + if err := cmd.Start(); err != nil { |
| 274 | + return nil, fmt.Errorf("start port-forward: %w", err) |
| 275 | + } |
| 276 | + defer func() { cancel(); _ = cmd.Wait() }() |
| 277 | + |
| 278 | + // Wait until the forward is listening before fetching. |
| 279 | + if err := waitForwardReady(pfCtx, stdout); err != nil { |
| 280 | + return nil, fmt.Errorf("port-forward not ready: %w (%s)", err, strings.TrimSpace(stderr.String())) |
| 281 | + } |
| 282 | + |
| 283 | + url := fmt.Sprintf("http://127.0.0.1:%d%s", localPort, path) |
| 284 | + req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil) |
| 285 | + if err != nil { |
| 286 | + return nil, err |
| 287 | + } |
| 288 | + resp, err := http.DefaultClient.Do(req) |
| 289 | + if err != nil { |
| 290 | + return nil, fmt.Errorf("GET %s via port-forward: %w", url, err) |
| 291 | + } |
| 292 | + defer func() { _ = resp.Body.Close() }() |
| 293 | + body, err := io.ReadAll(resp.Body) |
| 294 | + if err != nil { |
| 295 | + return nil, fmt.Errorf("read %s: %w", url, err) |
| 296 | + } |
| 297 | + if resp.StatusCode/100 != 2 { |
| 298 | + return nil, fmt.Errorf("GET %s: status %d", url, resp.StatusCode) |
| 299 | + } |
| 300 | + return body, nil |
| 301 | +} |
| 302 | + |
// freeLocalPort obtains a currently unused loopback TCP port by binding to
// port 0, noting the port the operating system assigned, and releasing the
// listener again. Another process may take the port between the release and
// the caller's own bind; this function does nothing to prevent that, and no
// retry is visible at the call site in this file.
func freeLocalPort() (int, error) {
	listener, err := net.Listen("tcp", "127.0.0.1:0")
	if err != nil {
		return 0, err
	}
	port := listener.Addr().(*net.TCPAddr).Port
	_ = listener.Close() // the port number is all we need; a close error changes nothing
	return port, nil
}
| 314 | + |
// waitForwardReady blocks until the forward reports that its local socket is
// listening, i.e. until a line containing "Forwarding from" appears on stdout.
//
// It returns nil once that line is seen. It returns an error when stdout ends
// or fails before the line appears (the forward exited early), when ctx is
// done (ctx.Err()), or after 15 seconds without a result.
func waitForwardReady(ctx context.Context, stdout io.Reader) error {
	// Buffered so the scanning goroutine can always deliver its verdict and
	// exit, even if this function has already returned on another case.
	result := make(chan error, 1)
	go func() {
		sc := bufio.NewScanner(stdout)
		for sc.Scan() {
			if strings.Contains(sc.Text(), "Forwarding from") {
				result <- nil
				return
			}
		}
		// The stream ended without the readiness line. Report it right away
		// rather than leaving the caller to wait out the timeout.
		if err := sc.Err(); err != nil {
			result <- fmt.Errorf("read port-forward output: %w", err)
			return
		}
		result <- fmt.Errorf("port-forward exited before it started listening")
	}()

	timeout := time.NewTimer(15 * time.Second)
	defer timeout.Stop()

	select {
	case err := <-result:
		return err
	case <-ctx.Done():
		return ctx.Err()
	case <-timeout.C:
		return fmt.Errorf("timed out waiting for port-forward")
	}
}
0 commit comments