Skip to content

Commit 054483e

Browse files
committed
opamp-bridge: add standalone mode for ConfigMaps
Introduces a new standalone mode that allows managing collector config from a remote server without requiring Operator CRDs. Adds the standalone client, a plain collector instance type, Kubernetes RBAC and deployment manifests, and a --mode flag to select between operator (default) and standalone at startup.
1 parent 2af2231 commit 054483e

22 files changed

Lines changed: 1439 additions & 66 deletions

.chloggen/standalone-mode.yaml

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
2+
change_type: enhancement
3+
4+
# The name of the component, or a single word describing the area of concern, (e.g. collector, target allocator, auto-instrumentation, opamp, github action)
5+
component: opamp-bridge
6+
7+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
8+
note: OpAMP Bridge standalone mode
9+
10+
# One or more tracking issues related to the change
11+
issues: [4913]
12+
13+
# (Optional) One or more lines of additional information to render under the primary note.
14+
# These lines will be padded with 2 spaces and then inserted directly into the document.
15+
# Use pipe (|) for multiline entries.
16+
subtext: |
17+
Standalone mode for the OpAMP Bridge allows users to manage collector configuration from a remote
18+
OpAMP server without needing to deploy the full OpenTelemetry Operator.

Makefile

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -333,6 +333,26 @@ deploy: install-gateway-api-crds set-image-controller
333333
undeploy: set-image-controller
334334
$(KUSTOMIZE) build config/default | kubectl delete --ignore-not-found=$(ignore-not-found) -f -
335335

336+
##@ Standalone OpAMP Bridge (no operator / CRDs required)
337+
338+
# Deploy the standalone OpAMP bridge into the current Kubernetes context.
339+
# Does not require the operator, CRDs, or cert-manager.
340+
.PHONY: deploy-standalone-bridge
341+
deploy-standalone-bridge: kustomize
342+
cd config/standalone-bridge && $(KUSTOMIZE) edit set image operator-opamp-bridge=${OPERATOROPAMPBRIDGE_IMG}
343+
$(KUSTOMIZE) build config/standalone-bridge | kubectl apply -f -
344+
kubectl rollout status deployment/otel-opamp-bridge-standalone -n opentelemetry-opamp-bridge --timeout=120s
345+
346+
# Undeploy the standalone OpAMP bridge from the current Kubernetes context.
347+
.PHONY: undeploy-standalone-bridge
348+
undeploy-standalone-bridge: kustomize
349+
$(KUSTOMIZE) build config/standalone-bridge | kubectl delete --ignore-not-found=true -f -
350+
351+
# Build, load, and deploy the standalone bridge to a kind cluster.
352+
# Assumes a kind cluster is already running (use start-kind first).
353+
.PHONY: deploy-standalone-bridge-kind
354+
deploy-standalone-bridge-kind: load-image-operator-opamp-bridge deploy-standalone-bridge
355+
336356
# Generates the released manifests
337357
.PHONY: release-artifacts
338358
release-artifacts: set-image-controller

cmd/operator-opamp-bridge/internal/agent/agent.go

Lines changed: 34 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,7 @@ import (
1717
"github.com/open-telemetry/opamp-go/client/types"
1818
"github.com/open-telemetry/opamp-go/protobufs"
1919
"k8s.io/utils/clock"
20-
"sigs.k8s.io/yaml"
2120

22-
"github.com/open-telemetry/opentelemetry-operator/apis/v1beta1"
2321
"github.com/open-telemetry/opentelemetry-operator/cmd/operator-opamp-bridge/internal/config"
2422
"github.com/open-telemetry/opentelemetry-operator/cmd/operator-opamp-bridge/internal/metrics"
2523
"github.com/open-telemetry/opentelemetry-operator/cmd/operator-opamp-bridge/internal/operator"
@@ -118,21 +116,20 @@ func (agent *Agent) generateCollectorPoolHealth() (map[string]*protobufs.Compone
118116
proxiesUsed := make(map[uuid.UUID]struct{}, len(agentsByHostName))
119117
for _, col := range cols {
120118
key := newKubeResourceKey(col.GetNamespace(), col.GetName())
121-
podMap, err := agent.generateCollectorHealth(agent.getCollectorSelector(col), col.GetNamespace())
119+
podMap, err := agent.generateCollectorHealth(col.GetSelectorLabels(), col.GetNamespace())
122120
if err != nil {
123121
return nil, err
124122
}
125123
isPoolHealthy := true
126124
for podName, pod := range podMap {
127-
// we need to remove the prefix here as we don't have the namespace from the hostname.
128125
podNameWithoutNamespace := strings.TrimPrefix(podName, col.GetNamespace()+"/")
129126
isPoolHealthy = isPoolHealthy && pod.Healthy
130127
if uid, ok := agentsByHostName[podNameWithoutNamespace]; ok {
131128
podMap[podName].ComponentHealthMap[uid.String()] = proxyHealth[uid]
132129
proxiesUsed[uid] = struct{}{}
133130
}
134131
}
135-
podStartTime, err := timeToUnixNanoUnsigned(col.ObjectMeta.GetCreationTimestamp().Time)
132+
podStartTime, err := timeToUnixNanoUnsigned(col.GetCreationTimestamp())
136133
if err != nil {
137134
return nil, err
138135
}
@@ -143,7 +140,7 @@ func (agent *Agent) generateCollectorPoolHealth() (map[string]*protobufs.Compone
143140
healthMap[key.String()] = &protobufs.ComponentHealth{
144141
StartTimeUnixNano: podStartTime,
145142
StatusTimeUnixNano: statusTime,
146-
Status: col.Status.Scale.StatusReplicas,
143+
Status: col.GetStatusReplicas(),
147144
ComponentHealthMap: podMap,
148145
Healthy: isPoolHealthy,
149146
}
@@ -157,28 +154,6 @@ func (agent *Agent) generateCollectorPoolHealth() (map[string]*protobufs.Compone
157154
return healthMap, nil
158155
}
159156

160-
// getCollectorSelector destructures the collectors scale selector if present, it uses the labelmap from the operator.
161-
func (*Agent) getCollectorSelector(col v1beta1.OpenTelemetryCollector) map[string]string {
162-
if col.Status.Scale.Selector != "" {
163-
selMap := map[string]string{}
164-
for kvPair := range strings.SplitSeq(col.Status.Scale.Selector, ",") {
165-
kv := strings.Split(kvPair, "=")
166-
// skip malformed pairs
167-
if len(kv) != 2 {
168-
continue
169-
}
170-
selMap[kv[0]] = kv[1]
171-
}
172-
return selMap
173-
}
174-
return map[string]string{
175-
"app.kubernetes.io/managed-by": "opentelemetry-operator",
176-
"app.kubernetes.io/instance": fmt.Sprintf("%s.%s", col.GetNamespace(), col.GetName()),
177-
"app.kubernetes.io/part-of": "opentelemetry",
178-
"app.kubernetes.io/component": "opentelemetry-collector",
179-
}
180-
}
181-
182157
func (agent *Agent) generateCollectorHealth(selectorLabels map[string]string, namespace string) (map[string]*protobufs.ComponentHealth, error) {
183158
statusTime, err := agent.getCurrentTimeUnixNano()
184159
if err != nil {
@@ -348,15 +323,14 @@ func (agent *Agent) getEffectiveConfig(context.Context) (*protobufs.EffectiveCon
348323
}
349324
instanceMap := map[string]*protobufs.AgentConfigFile{}
350325
for _, instance := range instances {
351-
col := instance
352-
marshaled, err := yaml.Marshal(&col)
353-
if err != nil {
354-
agent.logger.Error(err, "failed to marhsal config")
355-
return nil, err
326+
body := instance.GetEffectiveConfig()
327+
if body == nil {
328+
agent.logger.Error(errors.New("nil effective config"), "failed to get effective config",
329+
"name", instance.GetName(), "namespace", instance.GetNamespace())
330+
continue
356331
}
357-
mapKey := newKubeResourceKey(instance.GetNamespace(), instance.GetName())
358-
instanceMap[mapKey.String()] = &protobufs.AgentConfigFile{
359-
Body: marshaled,
332+
instanceMap[instance.GetConfigMapKey()] = &protobufs.AgentConfigFile{
333+
Body: body,
360334
ContentType: "yaml",
361335
}
362336
}
@@ -390,11 +364,11 @@ func (agent *Agent) initMeter(settings *protobufs.TelemetryConnectionSettings) {
390364

391365
// applyRemoteConfig receives a remote configuration from a remote server of the following form:
392366
//
393-
// map[name/namespace] -> collector CRD spec
367+
// map[resource key] -> AgentConfigFile body
394368
//
395-
// For every key in the received remote configuration, the agent attempts to apply it to the connected
396-
// Kubernetes cluster. If an agent fails to apply a collector CRD, it will continue to the next entry. The agent will
397-
// store the received configuration hash regardless of application status as per the OpAMP spec.
369+
// For every key in the received remote configuration, the agent attempts to apply it via the configured
370+
// applier. If an entry fails to apply, the agent continues to the next entry. The agent stores the
371+
// received configuration hash regardless of application status, as per the OpAMP spec.
398372
//
399373
// INVARIANT: The caller must verify that config isn't nil _and_ the configuration has changed between calls.
400374
func (agent *Agent) applyRemoteConfig(config *protobufs.AgentRemoteConfig) (*protobufs.RemoteConfigStatus, error) {
@@ -409,6 +383,10 @@ func (agent *Agent) applyRemoteConfig(config *protobufs.AgentRemoteConfig) (*pro
409383
errs = append(errs, err)
410384
continue
411385
}
386+
if err = agent.validateConfigMapKey(colKey); err != nil {
387+
errs = append(errs, err)
388+
continue
389+
}
412390
err = agent.applier.Apply(colKey.name, colKey.namespace, file)
413391
if err != nil {
414392
errs = append(errs, err)
@@ -417,9 +395,9 @@ func (agent *Agent) applyRemoteConfig(config *protobufs.AgentRemoteConfig) (*pro
417395
agent.appliedKeys[colKey] = true
418396
}
419397
// Check if anything was deleted
420-
for collectorKey := range agent.appliedKeys {
421-
if _, ok := config.Config.GetConfigMap()[collectorKey.String()]; !ok {
422-
err := agent.applier.Delete(collectorKey.name, collectorKey.namespace)
398+
for key := range agent.appliedKeys {
399+
if _, ok := config.Config.GetConfigMap()[key.String()]; !ok {
400+
err := agent.applier.Delete(key.name, key.namespace)
423401
if err != nil {
424402
errs = append(errs, err)
425403
}
@@ -440,6 +418,19 @@ func (agent *Agent) applyRemoteConfig(config *protobufs.AgentRemoteConfig) (*pro
440418
}, nil
441419
}
442420

421+
func (agent *Agent) validateConfigMapKey(key kubeResourceKey) error {
422+
if agent.config.IsStandaloneMode() {
423+
if key.kind != "configmap" {
424+
return errors.New("standalone config key must use configmap kind")
425+
}
426+
return nil
427+
}
428+
if key.kind != "" {
429+
return errors.New("operator config key must not include kind")
430+
}
431+
return nil
432+
}
433+
443434
// Shutdown will stop the OpAMP client gracefully.
444435
func (agent *Agent) Shutdown() {
445436
agent.logger.V(3).Info("Agent shutting down...")

cmd/operator-opamp-bridge/internal/agent/kube_resource_key.go

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,21 +12,33 @@ import (
1212
// kubeResourceKey identifies a Kubernetes resource by namespace and name, optionally
// qualified by a kind. It is the parsed form of a remote-config map key.
type kubeResourceKey struct {
1313
name string
1414
namespace string
15+
// kind is optional; when it is empty the key renders as "namespace/name",
// otherwise as "kind/namespace/name".
kind string
1516
}
1617

1718
// newKubeResourceKey builds a key with no kind. Note the argument order: namespace first, then name.
func newKubeResourceKey(namespace, name string) kubeResourceKey {
1819
return kubeResourceKey{name: name, namespace: namespace}
1920
}
2021

22+
// newKubeResourceKeyWithKind builds a key qualified by kind. The argument order is
// namespace, name, kind, which differs from the "kind/namespace/name" order of the string form.
func newKubeResourceKeyWithKind(namespace, name, kind string) kubeResourceKey {
23+
return kubeResourceKey{name: name, namespace: namespace, kind: kind}
24+
}
25+
2126
func kubeResourceFromKey(key string) (kubeResourceKey, error) {
2227
s := strings.Split(key, "/")
23-
// We expect map keys to be of the form name/namespace
24-
if len(s) != 2 {
28+
// We expect map keys to be of the form namespace/name or kind/namespace/name.
29+
switch len(s) {
30+
case 2:
31+
return newKubeResourceKey(s[0], s[1]), nil
32+
case 3:
33+
return newKubeResourceKeyWithKind(s[1], s[2], s[0]), nil
34+
default:
2535
return kubeResourceKey{}, errors.New("invalid key")
2636
}
27-
return newKubeResourceKey(s[0], s[1]), nil
2837
}
2938

3039
func (k kubeResourceKey) String() string {
40+
if k.kind != "" {
41+
return fmt.Sprintf("%s/%s/%s", k.kind, k.namespace, k.name)
42+
}
3143
return fmt.Sprintf("%s/%s", k.namespace, k.name)
3244
}

cmd/operator-opamp-bridge/internal/agent/kube_resource_key_test.go

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,18 @@ func Test_collectorKeyFromKey(t *testing.T) {
3131
},
3232
wantErr: assert.NoError,
3333
},
34+
{
35+
name: "with kind",
36+
args: args{
37+
key: "configmap/namespace/good",
38+
},
39+
want: kubeResourceKey{
40+
name: "good",
41+
namespace: "namespace",
42+
kind: "configmap",
43+
},
44+
wantErr: assert.NoError,
45+
},
3446
{
3547
name: "unable to get key",
3648
args: args{
@@ -42,7 +54,7 @@ func Test_collectorKeyFromKey(t *testing.T) {
4254
{
4355
name: "too many slashes",
4456
args: args{
45-
key: "too/many/slashes",
57+
key: "too/many/slashes/here",
4658
},
4759
want: kubeResourceKey{},
4860
wantErr: assert.Error,
@@ -63,6 +75,7 @@ func Test_collectorKey_String(t *testing.T) {
6375
type fields struct {
6476
name string
6577
namespace string
78+
kind string
6679
}
6780
tests := []struct {
6881
name string
@@ -77,10 +90,22 @@ func Test_collectorKey_String(t *testing.T) {
7790
},
7891
want: "namespace/good",
7992
},
93+
{
94+
name: "can make a key with kind",
95+
fields: fields{
96+
name: "good",
97+
namespace: "namespace",
98+
kind: "configmap",
99+
},
100+
want: "configmap/namespace/good",
101+
},
80102
}
81103
for _, tt := range tests {
82104
t.Run(tt.name, func(t *testing.T) {
83105
k := newKubeResourceKey(tt.fields.namespace, tt.fields.name)
106+
if tt.fields.kind != "" {
107+
k = newKubeResourceKeyWithKind(tt.fields.namespace, tt.fields.name, tt.fields.kind)
108+
}
84109
assert.Equalf(t, tt.want, k.String(), "String()")
85110
})
86111
}

cmd/operator-opamp-bridge/internal/config/config.go

Lines changed: 22 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,10 @@ type Config struct {
9191
HeartbeatInterval time.Duration `yaml:"heartbeatInterval,omitempty"`
9292
Name string `yaml:"name,omitempty"`
9393
AgentDescription AgentDescription `yaml:"description,omitempty"`
94+
95+
// Mode selects the operating mode: "operator" (default) uses OpenTelemetryCollector CRDs,
96+
// "standalone" discovers Deployments/DaemonSets by label and manages their ConfigMaps.
97+
Mode string `yaml:"mode,omitempty"`
9498
}
9599

96100
// AgentDescription is copied from the OpAMP Extension in the collector.
@@ -225,15 +229,25 @@ func (c *Config) RemoteConfigEnabled() bool {
225229
}
226230

227231
func (c *Config) GetKubernetesClient() (client.Client, error) {
228-
err := schemeBuilder.AddToScheme(scheme.Scheme)
229-
if err != nil {
230-
return nil, err
232+
if c.Mode != "standalone" {
233+
err := schemeBuilder.AddToScheme(scheme.Scheme)
234+
if err != nil {
235+
return nil, err
236+
}
231237
}
232238
return client.New(c.ClusterConfig, client.Options{
233239
Scheme: scheme.Scheme,
234240
})
235241
}
236242

243+
// IsStandaloneMode reports whether Mode is exactly "standalone". Every other value,
// including the empty string, is reported as false.
func (c *Config) IsStandaloneMode() bool {
244+
return c.Mode == "standalone"
245+
}
246+
247+
// GetRestConfig returns the ClusterConfig held by this Config without copying it.
func (c *Config) GetRestConfig() *rest.Config {
248+
return c.ClusterConfig
249+
}
250+
237251
func Load(logger logr.Logger, args []string) (*Config, error) {
238252
flagSet := GetFlagSet(pflag.ExitOnError)
239253
err := flagSet.Parse(args)
@@ -300,6 +314,11 @@ func LoadFromCLI(target *Config, flagSet *pflag.FlagSet) error {
300314
} else if changed {
301315
target.Name = name
302316
}
317+
if mode, changed, err := getMode(flagSet); err != nil {
318+
return err
319+
} else if changed {
320+
target.Mode = mode
321+
}
303322
return nil
304323
}
305324

cmd/operator-opamp-bridge/internal/config/flags.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,9 @@ const (
2424
kubeConfigPathFlagName = "kubeconfig-path"
2525
heartbeatIntervalFlagName = "heartbeat-interval"
2626
nameFlagName = "name"
27+
modeFlagName = "mode"
2728
defaultHeartbeatInterval = 30 * time.Second
29+
defaultMode = "operator"
2830
)
2931

3032
var defaultKubeConfigPath = filepath.Join(homedir.HomeDir(), ".kube", "config")
@@ -39,6 +41,7 @@ func GetFlagSet(errorHandling pflag.ErrorHandling) *pflag.FlagSet {
3941
flagSet.String(kubeConfigPathFlagName, defaultKubeConfigPath, "absolute path to the KubeconfigPath file.")
4042
flagSet.Duration(heartbeatIntervalFlagName, defaultHeartbeatInterval, "The interval to use for sending a heartbeat. Setting it to 0 disables the heartbeat.")
4143
flagSet.String(nameFlagName, opampBridgeName, "The name of the bridge to use for querying managed collectors.")
44+
flagSet.String(modeFlagName, defaultMode, `Operating mode: "operator" (default, uses CRDs) or "standalone" (manages ConfigMaps for Deployments/DaemonSets).`)
4245
zapFlagSet := flag.NewFlagSet("", flag.ErrorHandling(errorHandling))
4346
zapCmdLineOpts.BindFlags(zapFlagSet)
4447
flagSet.AddGoFlagSet(zapFlagSet)
@@ -65,6 +68,10 @@ func getListenAddr(flagSet *pflag.FlagSet) (value string, changed bool, err erro
6568
return getFlagValueAndChanged[string](flagSet, listenAddrFlagName)
6669
}
6770

71+
// getMode looks up the modeFlagName flag as a string by delegating to getFlagValueAndChanged,
// and passes that helper's value, changed and err results through unchanged.
func getMode(flagSet *pflag.FlagSet) (value string, changed bool, err error) {
72+
return getFlagValueAndChanged[string](flagSet, modeFlagName)
73+
}
74+
6875
func getFlagValueAndChanged[T any](flagSet *pflag.FlagSet, flagName string) (value T, changed bool, err error) {
6976
var zero T
7077
if changed = flagSet.Changed(flagName); !changed {

0 commit comments

Comments
 (0)