Skip to content

Commit 50b5f7b

Browse files
committed
wip
1 parent 6b5203f commit 50b5f7b

5 files changed

Lines changed: 112 additions & 112 deletions

File tree

cmd/operator-opamp-bridge/internal/config/flags.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ const (
2626
nameFlagName = "name"
2727
modeFlagName = "mode"
2828
defaultHeartbeatInterval = 30 * time.Second
29+
defaultMode = "operator"
2930
)
3031

3132
var defaultKubeConfigPath = filepath.Join(homedir.HomeDir(), ".kube", "config")
@@ -40,7 +41,7 @@ func GetFlagSet(errorHandling pflag.ErrorHandling) *pflag.FlagSet {
4041
flagSet.String(kubeConfigPathFlagName, defaultKubeConfigPath, "absolute path to the KubeconfigPath file.")
4142
flagSet.Duration(heartbeatIntervalFlagName, defaultHeartbeatInterval, "The interval to use for sending a heartbeat. Setting it to 0 disables the heartbeat.")
4243
flagSet.String(nameFlagName, opampBridgeName, "The name of the bridge to use for querying managed collectors.")
43-
flagSet.String(modeFlagName, "", `Operating mode: "operator" (default, uses CRDs) or "standalone" (manages ConfigMaps for Deployments/DaemonSets).`)
44+
flagSet.String(modeFlagName, defaultMode, `Operating mode: "operator" (default, uses CRDs) or "standalone" (manages ConfigMaps for Deployments/DaemonSets).`)
4445
zapFlagSet := flag.NewFlagSet("", flag.ErrorHandling(errorHandling))
4546
zapCmdLineOpts.BindFlags(zapFlagSet)
4647
flagSet.AddGoFlagSet(zapFlagSet)

cmd/operator-opamp-bridge/internal/workload/client.go

Lines changed: 57 additions & 65 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,10 @@ import (
1414
v1 "k8s.io/api/core/v1"
1515
"k8s.io/apimachinery/pkg/api/errors"
1616
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
17+
"k8s.io/apimachinery/pkg/labels"
18+
"k8s.io/apimachinery/pkg/selection"
1719
"sigs.k8s.io/controller-runtime/pkg/client"
20+
"sigs.k8s.io/yaml"
1821

1922
"github.com/open-telemetry/opentelemetry-operator/cmd/operator-opamp-bridge/internal/operator"
2023
)
@@ -24,23 +27,16 @@ const (
2427
managedByValue = "opamp-bridge"
2528
restartAnnotation = "kubectl.kubernetes.io/restartedAt"
2629

27-
// ManagedLabelKey is the label that opts a Deployment or DaemonSet into standalone OpAMP ConfigMap management.
28-
// Workloads must carry this label with value "true" to be discovered by the bridge.
30+
// ManagedLabelKey opts a Deployment or DaemonSet into standalone OpAMP ConfigMap management.
31+
// The label value must be the name of the ConfigMap to manage (e.g. "otel-agent-config").
2932
ManagedLabelKey = "opentelemetry.io/opamp-standalone-configmap"
30-
31-
// AnnotationConfigMapName is a required annotation specifying the ConfigMap the bridge writes collector config into.
32-
AnnotationConfigMapName = "opentelemetry.io/opamp-standalone-configmap-name"
33-
34-
// AnnotationConfigMapKey is an optional annotation specifying the key within the ConfigMap.
35-
// Defaults to the workload name if not set.
36-
AnnotationConfigMapKey = "opentelemetry.io/opamp-standalone-configmap-key"
3733
)
3834

3935
var _ operator.ConfigApplier = &Client{}
4036

4137
// Client implements operator.ConfigApplier for standalone Deployment/DaemonSet
42-
// workloads. It discovers targets dynamically via the ManagedLabelKey label and
43-
// reads ConfigMap details from annotations on the workload.
38+
// workloads. It discovers targets via the ManagedLabelKey label whose value is
39+
// the name of the ConfigMap to manage.
4440
type Client struct {
4541
log logr.Logger
4642
k8sClient client.Client
@@ -55,39 +51,28 @@ func NewClient(name string, log logr.Logger, c client.Client) *Client {
5551
}
5652
}
5753

58-
// resolveTarget looks up a Deployment or DaemonSet by name/namespace and returns
59-
// its ConfigMap name and key from annotations.
60-
func (c *Client) resolveTarget(ctx context.Context, name, namespace string) (configMapName, configMapKey string, workloadKind string, workloadName string, err error) {
61-
// Try Deployment first
54+
// resolveTarget looks up a Deployment or DaemonSet by name/namespace and reads
55+
// the target ConfigMap name from the ManagedLabelKey label value.
56+
func (c *Client) resolveTarget(ctx context.Context, name, namespace string) (configMapName, workloadKind, workloadName string, err error) {
6257
deploy := &appsv1.Deployment{}
6358
if getErr := c.k8sClient.Get(ctx, client.ObjectKey{Name: name, Namespace: namespace}, deploy); getErr == nil {
64-
if deploy.Labels[ManagedLabelKey] != "true" {
65-
return "", "", "", "", fmt.Errorf("workload %s/%s does not have label %s=true", namespace, name, ManagedLabelKey)
59+
cmName := deploy.Labels[ManagedLabelKey]
60+
if cmName == "" {
61+
return "", "", "", fmt.Errorf("workload %s/%s has label %s but its value (ConfigMap name) is empty", namespace, name, ManagedLabelKey)
6662
}
67-
cmName, cmKey := configMapDetailsFromAnnotations(deploy.Annotations, name)
68-
return cmName, cmKey, "Deployment", name, nil
63+
return cmName, "Deployment", name, nil
6964
}
7065

71-
// Try DaemonSet
7266
ds := &appsv1.DaemonSet{}
7367
if getErr := c.k8sClient.Get(ctx, client.ObjectKey{Name: name, Namespace: namespace}, ds); getErr == nil {
74-
if ds.Labels[ManagedLabelKey] != "true" {
75-
return "", "", "", "", fmt.Errorf("workload %s/%s does not have label %s=true", namespace, name, ManagedLabelKey)
68+
cmName := ds.Labels[ManagedLabelKey]
69+
if cmName == "" {
70+
return "", "", "", fmt.Errorf("workload %s/%s has label %s but its value (ConfigMap name) is empty", namespace, name, ManagedLabelKey)
7671
}
77-
cmName, cmKey := configMapDetailsFromAnnotations(ds.Annotations, name)
78-
return cmName, cmKey, "DaemonSet", name, nil
72+
return cmName, "DaemonSet", name, nil
7973
}
8074

81-
return "", "", "", "", fmt.Errorf("no managed Deployment or DaemonSet named %s found in namespace %s", name, namespace)
82-
}
83-
84-
func configMapDetailsFromAnnotations(annotations map[string]string, defaultKey string) (cmName, cmKey string) {
85-
cmName = annotations[AnnotationConfigMapName]
86-
cmKey = annotations[AnnotationConfigMapKey]
87-
if cmKey == "" {
88-
cmKey = defaultKey
89-
}
90-
return
75+
return "", "", "", fmt.Errorf("no managed Deployment or DaemonSet named %s found in namespace %s", name, namespace)
9176
}
9277

9378
func (c *Client) Apply(name, namespace string, configFile *protobufs.AgentConfigFile) error {
@@ -97,12 +82,14 @@ func (c *Client) Apply(name, namespace string, configFile *protobufs.AgentConfig
9782

9883
ctx := context.Background()
9984

100-
configMapName, configMapKey, workloadKind, workloadName, err := c.resolveTarget(ctx, name, namespace)
85+
configMapName, workloadKind, workloadName, err := c.resolveTarget(ctx, name, namespace)
10186
if err != nil {
10287
return err
10388
}
104-
if configMapName == "" {
105-
return fmt.Errorf("workload %s/%s is missing annotation %s", namespace, name, AnnotationConfigMapName)
89+
90+
var newData map[string]string
91+
if err := yaml.Unmarshal(configFile.Body, &newData); err != nil {
92+
return fmt.Errorf("failed to unmarshal config body into ConfigMap data: %w", err)
10693
}
10794

10895
c.log.Info("Applying config", "name", name, "namespace", namespace, "configMap", configMapName)
@@ -122,20 +109,15 @@ func (c *Client) Apply(name, namespace string, configFile *protobufs.AgentConfig
122109
managedByLabel: managedByValue,
123110
},
124111
},
125-
Data: map[string]string{
126-
configMapKey: string(configFile.Body),
127-
},
112+
Data: newData,
128113
}
129114
if createErr := c.k8sClient.Create(ctx, cm); createErr != nil {
130115
return fmt.Errorf("failed to create ConfigMap %s/%s: %w", namespace, configMapName, createErr)
131116
}
132117
} else if err != nil {
133118
return fmt.Errorf("failed to get ConfigMap %s/%s: %w", namespace, configMapName, err)
134119
} else {
135-
if cm.Data == nil {
136-
cm.Data = map[string]string{}
137-
}
138-
cm.Data[configMapKey] = string(configFile.Body)
120+
cm.Data = newData
139121
if updateErr := c.k8sClient.Update(ctx, cm); updateErr != nil {
140122
return fmt.Errorf("failed to update ConfigMap %s/%s: %w", namespace, configMapName, updateErr)
141123
}
@@ -147,14 +129,11 @@ func (c *Client) Apply(name, namespace string, configFile *protobufs.AgentConfig
147129
func (c *Client) Delete(name, namespace string) error {
148130
ctx := context.Background()
149131

150-
configMapName, _, _, _, err := c.resolveTarget(ctx, name, namespace)
132+
configMapName, _, _, err := c.resolveTarget(ctx, name, namespace)
151133
if err != nil {
152134
// Workload no longer exists or isn't managed; nothing to delete.
153135
return nil
154136
}
155-
if configMapName == "" {
156-
return nil
157-
}
158137

159138
cm := &v1.ConfigMap{}
160139
err = c.k8sClient.Get(ctx, client.ObjectKey{
@@ -174,10 +153,15 @@ func (c *Client) ListInstances() ([]operator.CollectorInstance, error) {
174153
ctx := context.Background()
175154
var result []operator.CollectorInstance
176155

177-
managedLabel := client.MatchingLabels{ManagedLabelKey: "true"}
156+
// Match any workload that has the label set to a non-empty value.
157+
labelReq, err := labels.NewRequirement(ManagedLabelKey, selection.Exists, nil)
158+
if err != nil {
159+
return nil, fmt.Errorf("failed to build label requirement: %w", err)
160+
}
161+
managedSelector := client.MatchingLabelsSelector{Selector: labels.NewSelector().Add(*labelReq)}
178162

179163
deployList := &appsv1.DeploymentList{}
180-
if err := c.k8sClient.List(ctx, deployList, managedLabel); err != nil {
164+
if err := c.k8sClient.List(ctx, deployList, managedSelector); err != nil {
181165
return nil, fmt.Errorf("failed to list managed Deployments: %w", err)
182166
}
183167
for i := range deployList.Items {
@@ -190,7 +174,7 @@ func (c *Client) ListInstances() ([]operator.CollectorInstance, error) {
190174
}
191175

192176
dsList := &appsv1.DaemonSetList{}
193-
if err := c.k8sClient.List(ctx, dsList, managedLabel); err != nil {
177+
if err := c.k8sClient.List(ctx, dsList, managedSelector); err != nil {
194178
return nil, fmt.Errorf("failed to list managed DaemonSets: %w", err)
195179
}
196180
for i := range dsList.Items {
@@ -213,21 +197,17 @@ func (c *Client) GetCollectorPods(selectorLabels map[string]string, namespace st
213197
}
214198

215199
func (c *Client) getDeploymentInstance(ctx context.Context, deploy *appsv1.Deployment) (*standaloneCollectorInstance, error) {
216-
cmName, cmKey := configMapDetailsFromAnnotations(deploy.Annotations, deploy.Name)
200+
cmName := deploy.Labels[ManagedLabelKey]
217201
if cmName == "" {
218-
return nil, fmt.Errorf("missing annotation %s on Deployment %s/%s", AnnotationConfigMapName, deploy.Namespace, deploy.Name)
202+
return nil, fmt.Errorf("Deployment %s/%s has label %s but its value (ConfigMap name) is empty", deploy.Namespace, deploy.Name, ManagedLabelKey)
219203
}
220204

221205
var desired int32
222206
if deploy.Spec.Replicas != nil {
223207
desired = *deploy.Spec.Replicas
224208
}
225209

226-
var configBody []byte
227-
cm := &v1.ConfigMap{}
228-
if err := c.k8sClient.Get(ctx, client.ObjectKey{Name: cmName, Namespace: deploy.Namespace}, cm); err == nil && cm.Data != nil {
229-
configBody = []byte(cm.Data[cmKey])
230-
}
210+
configBody := c.readConfigMapData(ctx, cmName, deploy.Namespace)
231211

232212
return &standaloneCollectorInstance{
233213
name: deploy.Name,
@@ -240,16 +220,12 @@ func (c *Client) getDeploymentInstance(ctx context.Context, deploy *appsv1.Deplo
240220
}
241221

242222
func (c *Client) getDaemonSetInstance(ctx context.Context, ds *appsv1.DaemonSet) (*standaloneCollectorInstance, error) {
243-
cmName, cmKey := configMapDetailsFromAnnotations(ds.Annotations, ds.Name)
223+
cmName := ds.Labels[ManagedLabelKey]
244224
if cmName == "" {
245-
return nil, fmt.Errorf("missing annotation %s on DaemonSet %s/%s", AnnotationConfigMapName, ds.Namespace, ds.Name)
225+
return nil, fmt.Errorf("DaemonSet %s/%s has label %s but its value (ConfigMap name) is empty", ds.Namespace, ds.Name, ManagedLabelKey)
246226
}
247227

248-
var configBody []byte
249-
cm := &v1.ConfigMap{}
250-
if err := c.k8sClient.Get(ctx, client.ObjectKey{Name: cmName, Namespace: ds.Namespace}, cm); err == nil && cm.Data != nil {
251-
configBody = []byte(cm.Data[cmKey])
252-
}
228+
configBody := c.readConfigMapData(ctx, cmName, ds.Namespace)
253229

254230
return &standaloneCollectorInstance{
255231
name: ds.Name,
@@ -261,6 +237,22 @@ func (c *Client) getDaemonSetInstance(ctx context.Context, ds *appsv1.DaemonSet)
261237
}, nil
262238
}
263239

240+
// readConfigMapData fetches a ConfigMap and serializes all its data keys into
241+
// a single YAML document (map[string]string). Returns nil if the ConfigMap
242+
// does not exist or has no data.
243+
func (c *Client) readConfigMapData(ctx context.Context, cmName, namespace string) []byte {
244+
cm := &v1.ConfigMap{}
245+
if err := c.k8sClient.Get(ctx, client.ObjectKey{Name: cmName, Namespace: namespace}, cm); err != nil || len(cm.Data) == 0 {
246+
return nil
247+
}
248+
b, err := yaml.Marshal(cm.Data)
249+
if err != nil {
250+
c.log.Error(err, "Failed to marshal ConfigMap data", "configMap", cmName, "namespace", namespace)
251+
return nil
252+
}
253+
return b
254+
}
255+
264256
func (c *Client) triggerRollout(ctx context.Context, kind, name, namespace string) error {
265257
restartVal := time.Now().Format(time.RFC3339)
266258

0 commit comments

Comments
 (0)