Skip to content

Commit 3259be0

Browse files
committed
opamp-bridge: add standalone mode for ConfigMaps
Introduces a new standalone mode that allows managing collector config from a remote server without requiring Operator CRDs. Adds the standalone client, a plain collector instance type, Kubernetes RBAC and deployment manifests, and a --mode flag to select between operator (default) and standalone at startup.
1 parent 2b0c71f commit 3259be0

20 files changed

Lines changed: 1359 additions & 57 deletions

File tree

.chloggen/standalone-mode.yaml

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
2+
change_type: enhancement
3+
4+
# The name of the component, or a single word describing the area of concern, (e.g. collector, target allocator, auto-instrumentation, opamp, github action)
5+
component: opamp-bridge
6+
7+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
8+
note: OpAMP Bridge standalone mode
9+
10+
# One or more tracking issues related to the change
11+
issues: [4913]
12+
13+
# (Optional) One or more lines of additional information to render under the primary note.
14+
# These lines will be padded with 2 spaces and then inserted directly into the document.
15+
# Use pipe (|) for multiline entries.
16+
subtext: |
17+
Standalone mode for OpAMP Bridge allows users to manage collector configuration from a remote
18+
OpAMP server without the need to deploy full Otel Operator.

Makefile

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -333,6 +333,26 @@ deploy: install-gateway-api-crds set-image-controller
333333
undeploy: set-image-controller
334334
$(KUSTOMIZE) build config/default | kubectl delete --ignore-not-found=$(ignore-not-found) -f -
335335

336+
##@ Standalone OpAMP Bridge (no operator / CRDs required)
337+
338+
# Deploy the standalone OpAMP bridge into the current Kubernetes context.
339+
# Does not require the operator, CRDs, or cert-manager.
340+
.PHONY: deploy-standalone-bridge
341+
deploy-standalone-bridge: kustomize
342+
cd config/standalone-bridge && $(KUSTOMIZE) edit set image operator-opamp-bridge=${OPERATOROPAMPBRIDGE_IMG}
343+
$(KUSTOMIZE) build config/standalone-bridge | kubectl apply -f -
344+
kubectl rollout status deployment/otel-opamp-bridge-standalone -n opentelemetry-opamp-bridge --timeout=120s
345+
346+
# Undeploy the standalone OpAMP bridge from the current Kubernetes context.
347+
.PHONY: undeploy-standalone-bridge
348+
undeploy-standalone-bridge: kustomize
349+
$(KUSTOMIZE) build config/standalone-bridge | kubectl delete --ignore-not-found=true -f -
350+
351+
# Build, load, and deploy the standalone bridge to a kind cluster.
352+
# Assumes a kind cluster is already running (use start-kind first).
353+
.PHONY: deploy-standalone-bridge-kind
354+
deploy-standalone-bridge-kind: load-image-operator-opamp-bridge deploy-standalone-bridge
355+
336356
# Generates the released manifests
337357
.PHONY: release-artifacts
338358
release-artifacts: set-image-controller

cmd/operator-opamp-bridge/internal/agent/agent.go

Lines changed: 13 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,7 @@ import (
1717
"github.com/open-telemetry/opamp-go/client/types"
1818
"github.com/open-telemetry/opamp-go/protobufs"
1919
"k8s.io/utils/clock"
20-
"sigs.k8s.io/yaml"
2120

22-
"github.com/open-telemetry/opentelemetry-operator/apis/v1beta1"
2321
"github.com/open-telemetry/opentelemetry-operator/cmd/operator-opamp-bridge/internal/config"
2422
"github.com/open-telemetry/opentelemetry-operator/cmd/operator-opamp-bridge/internal/metrics"
2523
"github.com/open-telemetry/opentelemetry-operator/cmd/operator-opamp-bridge/internal/operator"
@@ -118,21 +116,20 @@ func (agent *Agent) generateCollectorPoolHealth() (map[string]*protobufs.Compone
118116
proxiesUsed := make(map[uuid.UUID]struct{}, len(agentsByHostName))
119117
for _, col := range cols {
120118
key := newKubeResourceKey(col.GetNamespace(), col.GetName())
121-
podMap, err := agent.generateCollectorHealth(agent.getCollectorSelector(col), col.GetNamespace())
119+
podMap, err := agent.generateCollectorHealth(col.GetSelectorLabels(), col.GetNamespace())
122120
if err != nil {
123121
return nil, err
124122
}
125123
isPoolHealthy := true
126124
for podName, pod := range podMap {
127-
// we need to remove the prefix here as we don't have the namespace from the hostname.
128125
podNameWithoutNamespace := strings.TrimPrefix(podName, col.GetNamespace()+"/")
129126
isPoolHealthy = isPoolHealthy && pod.Healthy
130127
if uid, ok := agentsByHostName[podNameWithoutNamespace]; ok {
131128
podMap[podName].ComponentHealthMap[uid.String()] = proxyHealth[uid]
132129
proxiesUsed[uid] = struct{}{}
133130
}
134131
}
135-
podStartTime, err := timeToUnixNanoUnsigned(col.ObjectMeta.GetCreationTimestamp().Time)
132+
podStartTime, err := timeToUnixNanoUnsigned(col.GetCreationTimestamp())
136133
if err != nil {
137134
return nil, err
138135
}
@@ -143,7 +140,7 @@ func (agent *Agent) generateCollectorPoolHealth() (map[string]*protobufs.Compone
143140
healthMap[key.String()] = &protobufs.ComponentHealth{
144141
StartTimeUnixNano: podStartTime,
145142
StatusTimeUnixNano: statusTime,
146-
Status: col.Status.Scale.StatusReplicas,
143+
Status: col.GetStatusReplicas(),
147144
ComponentHealthMap: podMap,
148145
Healthy: isPoolHealthy,
149146
}
@@ -157,28 +154,6 @@ func (agent *Agent) generateCollectorPoolHealth() (map[string]*protobufs.Compone
157154
return healthMap, nil
158155
}
159156

160-
// getCollectorSelector destructures the collectors scale selector if present, it uses the labelmap from the operator.
161-
func (*Agent) getCollectorSelector(col v1beta1.OpenTelemetryCollector) map[string]string {
162-
if col.Status.Scale.Selector != "" {
163-
selMap := map[string]string{}
164-
for kvPair := range strings.SplitSeq(col.Status.Scale.Selector, ",") {
165-
kv := strings.Split(kvPair, "=")
166-
// skip malformed pairs
167-
if len(kv) != 2 {
168-
continue
169-
}
170-
selMap[kv[0]] = kv[1]
171-
}
172-
return selMap
173-
}
174-
return map[string]string{
175-
"app.kubernetes.io/managed-by": "opentelemetry-operator",
176-
"app.kubernetes.io/instance": fmt.Sprintf("%s.%s", col.GetNamespace(), col.GetName()),
177-
"app.kubernetes.io/part-of": "opentelemetry",
178-
"app.kubernetes.io/component": "opentelemetry-collector",
179-
}
180-
}
181-
182157
func (agent *Agent) generateCollectorHealth(selectorLabels map[string]string, namespace string) (map[string]*protobufs.ComponentHealth, error) {
183158
statusTime, err := agent.getCurrentTimeUnixNano()
184159
if err != nil {
@@ -348,15 +323,15 @@ func (agent *Agent) getEffectiveConfig(context.Context) (*protobufs.EffectiveCon
348323
}
349324
instanceMap := map[string]*protobufs.AgentConfigFile{}
350325
for _, instance := range instances {
351-
col := instance
352-
marshaled, err := yaml.Marshal(&col)
353-
if err != nil {
354-
agent.logger.Error(err, "failed to marhsal config")
355-
return nil, err
326+
body := instance.GetEffectiveConfig()
327+
if body == nil {
328+
agent.logger.Error(fmt.Errorf("nil effective config"), "failed to get effective config",
329+
"name", instance.GetName(), "namespace", instance.GetNamespace())
330+
continue
356331
}
357332
mapKey := newKubeResourceKey(instance.GetNamespace(), instance.GetName())
358333
instanceMap[mapKey.String()] = &protobufs.AgentConfigFile{
359-
Body: marshaled,
334+
Body: body,
360335
ContentType: "yaml",
361336
}
362337
}
@@ -390,11 +365,11 @@ func (agent *Agent) initMeter(settings *protobufs.TelemetryConnectionSettings) {
390365

391366
// applyRemoteConfig receives a remote configuration from a remote server of the following form:
392367
//
393-
// map[name/namespace] -> collector CRD spec
368+
// map[name/namespace] -> AgentConfigFile body
394369
//
395-
// For every key in the received remote configuration, the agent attempts to apply it to the connected
396-
// Kubernetes cluster. If an agent fails to apply a collector CRD, it will continue to the next entry. The agent will
397-
// store the received configuration hash regardless of application status as per the OpAMP spec.
370+
// For every key in the received remote configuration, the agent attempts to apply it via the configured
371+
// applier. If an entry fails to apply, the agent continues to the next entry. The agent stores the
372+
// received configuration hash regardless of application status, as per the OpAMP spec.
398373
//
399374
// INVARIANT: The caller must verify that config isn't nil _and_ the configuration has changed between calls.
400375
func (agent *Agent) applyRemoteConfig(config *protobufs.AgentRemoteConfig) (*protobufs.RemoteConfigStatus, error) {

cmd/operator-opamp-bridge/internal/config/config.go

Lines changed: 22 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,10 @@ type Config struct {
9191
HeartbeatInterval time.Duration `yaml:"heartbeatInterval,omitempty"`
9292
Name string `yaml:"name,omitempty"`
9393
AgentDescription AgentDescription `yaml:"description,omitempty"`
94+
95+
// Mode selects the operating mode: "operator" (default) uses OpenTelemetryCollector CRDs,
96+
// "standalone" discovers Deployments/DaemonSets by label and manages their ConfigMaps.
97+
Mode string `yaml:"mode,omitempty"`
9498
}
9599

96100
// AgentDescription is copied from the OpAMP Extension in the collector.
@@ -225,15 +229,25 @@ func (c *Config) RemoteConfigEnabled() bool {
225229
}
226230

227231
func (c *Config) GetKubernetesClient() (client.Client, error) {
228-
err := schemeBuilder.AddToScheme(scheme.Scheme)
229-
if err != nil {
230-
return nil, err
232+
if c.Mode != "standalone" {
233+
err := schemeBuilder.AddToScheme(scheme.Scheme)
234+
if err != nil {
235+
return nil, err
236+
}
231237
}
232238
return client.New(c.ClusterConfig, client.Options{
233239
Scheme: scheme.Scheme,
234240
})
235241
}
236242

243+
func (c *Config) IsStandaloneMode() bool {
244+
return c.Mode == "standalone"
245+
}
246+
247+
func (c *Config) GetRestConfig() *rest.Config {
248+
return c.ClusterConfig
249+
}
250+
237251
func Load(logger logr.Logger, args []string) (*Config, error) {
238252
flagSet := GetFlagSet(pflag.ExitOnError)
239253
err := flagSet.Parse(args)
@@ -300,6 +314,11 @@ func LoadFromCLI(target *Config, flagSet *pflag.FlagSet) error {
300314
} else if changed {
301315
target.Name = name
302316
}
317+
if mode, changed, err := getMode(flagSet); err != nil {
318+
return err
319+
} else if changed {
320+
target.Mode = mode
321+
}
303322
return nil
304323
}
305324

cmd/operator-opamp-bridge/internal/config/flags.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,9 @@ const (
2424
kubeConfigPathFlagName = "kubeconfig-path"
2525
heartbeatIntervalFlagName = "heartbeat-interval"
2626
nameFlagName = "name"
27+
modeFlagName = "mode"
2728
defaultHeartbeatInterval = 30 * time.Second
29+
defaultMode = "operator"
2830
)
2931

3032
var defaultKubeConfigPath = filepath.Join(homedir.HomeDir(), ".kube", "config")
@@ -39,6 +41,7 @@ func GetFlagSet(errorHandling pflag.ErrorHandling) *pflag.FlagSet {
3941
flagSet.String(kubeConfigPathFlagName, defaultKubeConfigPath, "absolute path to the KubeconfigPath file.")
4042
flagSet.Duration(heartbeatIntervalFlagName, defaultHeartbeatInterval, "The interval to use for sending a heartbeat. Setting it to 0 disables the heartbeat.")
4143
flagSet.String(nameFlagName, opampBridgeName, "The name of the bridge to use for querying managed collectors.")
44+
flagSet.String(modeFlagName, defaultMode, `Operating mode: "operator" (default, uses CRDs) or "standalone" (manages ConfigMaps for Deployments/DaemonSets).`)
4245
zapFlagSet := flag.NewFlagSet("", flag.ErrorHandling(errorHandling))
4346
zapCmdLineOpts.BindFlags(zapFlagSet)
4447
flagSet.AddGoFlagSet(zapFlagSet)
@@ -65,6 +68,10 @@ func getListenAddr(flagSet *pflag.FlagSet) (value string, changed bool, err erro
6568
return getFlagValueAndChanged[string](flagSet, listenAddrFlagName)
6669
}
6770

71+
func getMode(flagSet *pflag.FlagSet) (value string, changed bool, err error) {
72+
return getFlagValueAndChanged[string](flagSet, modeFlagName)
73+
}
74+
6875
func getFlagValueAndChanged[T any](flagSet *pflag.FlagSet, flagName string) (value T, changed bool, err error) {
6976
var zero T
7077
if changed = flagSet.Changed(flagName); !changed {

cmd/operator-opamp-bridge/internal/operator/client.go

Lines changed: 17 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -29,17 +29,14 @@ const (
2929
)
3030

3131
type ConfigApplier interface {
32-
// Apply receives a name and namespace to apply an OpenTelemetryCollector CRD that is contained in the configmap.
32+
// Apply receives a name and namespace to apply a collector configuration.
3333
Apply(name, namespace string, configmap *protobufs.AgentConfigFile) error
3434

35-
// Delete attempts to delete an OpenTelemetryCollector object given a name and namespace.
35+
// Delete attempts to delete a collector resource given a name and namespace.
3636
Delete(name, namespace string) error
3737

38-
// ListInstances retrieves all OpenTelemetryCollector CRDs created by the operator-opamp-bridge agent.
39-
ListInstances() ([]v1beta1.OpenTelemetryCollector, error)
40-
41-
// GetInstance retrieves an OpenTelemetryCollector CRD given a name and namespace.
42-
GetInstance(name, namespace string) (*v1beta1.OpenTelemetryCollector, error)
38+
// ListInstances retrieves all collector instances managed by the bridge.
39+
ListInstances() ([]CollectorInstance, error)
4340

4441
// GetCollectorPods retrieves all pods that match the given collector's selector labels and namespace.
4542
GetCollectorPods(selectorLabels map[string]string, namespace string) (*v1.PodList, error)
@@ -207,7 +204,19 @@ func (c Client) Delete(name, namespace string) error {
207204
return c.k8sClient.Delete(ctx, &result)
208205
}
209206

210-
func (c Client) ListInstances() ([]v1beta1.OpenTelemetryCollector, error) {
207+
func (c Client) ListInstances() ([]CollectorInstance, error) {
208+
rawInstances, err := c.listRawInstances()
209+
if err != nil {
210+
return nil, err
211+
}
212+
result := make([]CollectorInstance, len(rawInstances))
213+
for i := range rawInstances {
214+
result[i] = CRDInstance{Col: rawInstances[i]}
215+
}
216+
return result, nil
217+
}
218+
219+
func (c Client) listRawInstances() ([]v1beta1.OpenTelemetryCollector, error) {
211220
ctx := context.Background()
212221

213222
var instances []v1beta1.OpenTelemetryCollector

cmd/operator-opamp-bridge/internal/operator/client_test.go

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -246,8 +246,12 @@ func TestClient_ApplyUpdate(t *testing.T) {
246246
allInstances, err = c.ListInstances()
247247
require.NoError(t, err, "Should be able to list all collectors")
248248
assert.Len(t, allInstances, 2)
249-
assert.Contains(t, allInstances, reportingCol)
250-
assert.Contains(t, allInstances, *updatedInstance)
249+
instanceNames := make([]string, len(allInstances))
250+
for i, inst := range allInstances {
251+
instanceNames[i] = inst.GetNamespace() + "/" + inst.GetName()
252+
}
253+
assert.Contains(t, instanceNames, reportingCol.GetNamespace()+"/"+reportingCol.GetName())
254+
assert.Contains(t, instanceNames, updatedInstance.GetNamespace()+"/"+updatedInstance.GetName())
251255
}
252256

253257
func TestClient_Delete(t *testing.T) {
Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
// Copyright The OpenTelemetry Authors
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
package operator
5+
6+
import (
7+
"fmt"
8+
"strings"
9+
"time"
10+
11+
"sigs.k8s.io/yaml"
12+
13+
"github.com/open-telemetry/opentelemetry-operator/apis/v1beta1"
14+
)
15+
16+
var _ CollectorInstance = CRDInstance{}
17+
18+
// CRDInstance wraps an OpenTelemetryCollector CRD to implement CollectorInstance.
19+
type CRDInstance struct {
20+
Col v1beta1.OpenTelemetryCollector
21+
}
22+
23+
func (c CRDInstance) GetName() string {
24+
return c.Col.GetName()
25+
}
26+
27+
func (c CRDInstance) GetNamespace() string {
28+
return c.Col.GetNamespace()
29+
}
30+
31+
func (c CRDInstance) GetCreationTimestamp() time.Time {
32+
return c.Col.GetCreationTimestamp().Time
33+
}
34+
35+
func (c CRDInstance) GetSelectorLabels() map[string]string {
36+
if c.Col.Status.Scale.Selector != "" {
37+
selMap := map[string]string{}
38+
for kvPair := range strings.SplitSeq(c.Col.Status.Scale.Selector, ",") {
39+
kv := strings.Split(kvPair, "=")
40+
if len(kv) != 2 {
41+
continue
42+
}
43+
selMap[kv[0]] = kv[1]
44+
}
45+
return selMap
46+
}
47+
return map[string]string{
48+
"app.kubernetes.io/managed-by": "opentelemetry-operator",
49+
"app.kubernetes.io/instance": fmt.Sprintf("%s.%s", c.Col.GetNamespace(), c.Col.GetName()),
50+
"app.kubernetes.io/part-of": "opentelemetry",
51+
"app.kubernetes.io/component": "opentelemetry-collector",
52+
}
53+
}
54+
55+
func (c CRDInstance) GetStatusReplicas() string {
56+
return c.Col.Status.Scale.StatusReplicas
57+
}
58+
59+
func (c CRDInstance) GetEffectiveConfig() []byte {
60+
marshaled, err := yaml.Marshal(&c.Col)
61+
if err != nil {
62+
return nil
63+
}
64+
return marshaled
65+
}
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
// Copyright The OpenTelemetry Authors
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
package operator
5+
6+
import "time"
7+
8+
// CollectorInstance represents a collector managed by the bridge, abstracting
9+
// the underlying Kubernetes resource (CRD or Deployment/DaemonSet).
10+
type CollectorInstance interface {
11+
GetName() string
12+
GetNamespace() string
13+
GetCreationTimestamp() time.Time
14+
GetSelectorLabels() map[string]string
15+
GetStatusReplicas() string
16+
GetEffectiveConfig() []byte
17+
}

0 commit comments

Comments
 (0)