Skip to content

Commit 8d5b304

Browse files
authored
Add metadata package (#42)
* create metadata service * rename to aggregator, add testing * address comments
1 parent 86eb953 commit 8d5b304

File tree

4 files changed

+612
-195
lines changed

4 files changed

+612
-195
lines changed

cmd/deployment-tracker/main.go

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,8 @@ import (
1313
"time"
1414

1515
"github.com/github/deployment-tracker/internal/controller"
16-
"k8s.io/client-go/metadata"
16+
"github.com/github/deployment-tracker/internal/metadata"
17+
k8smetadata "k8s.io/client-go/metadata"
1718

1819
"github.com/prometheus/client_golang/prometheus/promhttp"
1920
"k8s.io/client-go/kubernetes"
@@ -114,13 +115,16 @@ func main() {
114115
}
115116

116117
// Create metadata client
117-
metadataClient, err := metadata.NewForConfig(k8sCfg)
118+
metadataClient, err := k8smetadata.NewForConfig(k8sCfg)
118119
if err != nil {
119120
slog.Error("Error creating Kubernetes metadata client",
120121
"error", err)
121122
os.Exit(1)
122123
}
123124

125+
// Create metadata aggregator
126+
metadataAggregator := metadata.NewAggregator(metadataClient)
127+
124128
// Start the metrics server
125129
var promSrv = &http.Server{
126130
Addr: ":" + metricsPort,
@@ -160,7 +164,7 @@ func main() {
160164
cancel()
161165
}()
162166

163-
cntrl, err := controller.New(clientset, metadataClient, namespace, excludeNamespaces, &cntrlCfg)
167+
cntrl, err := controller.New(clientset, metadataAggregator, namespace, excludeNamespaces, &cntrlCfg)
164168
if err != nil {
165169
slog.Error("Failed to create controller",
166170
"error", err)

internal/controller/controller.go

Lines changed: 18 additions & 192 deletions
Original file line numberDiff line numberDiff line change
@@ -8,15 +8,12 @@ import (
88
"slices"
99
"strings"
1010
"time"
11-
"unicode/utf8"
1211

12+
"github.com/github/deployment-tracker/internal/metadata"
1313
"github.com/github/deployment-tracker/pkg/deploymentrecord"
1414
"github.com/github/deployment-tracker/pkg/dtmetrics"
1515
"github.com/github/deployment-tracker/pkg/ociutil"
16-
"k8s.io/apimachinery/pkg/runtime/schema"
17-
"k8s.io/apimachinery/pkg/types"
1816
amcache "k8s.io/apimachinery/pkg/util/cache"
19-
"k8s.io/client-go/metadata"
2017

2118
corev1 "k8s.io/api/core/v1"
2219
k8serrors "k8s.io/apimachinery/pkg/api/errors"
@@ -34,14 +31,6 @@ const (
3431
EventCreated = "CREATED"
3532
// EventDeleted indicates that a pod has been deleted.
3633
EventDeleted = "DELETED"
37-
// MetadataAnnotationPrefix is the annotation key prefix for deployment record metadata like runtime risk and tags.
38-
MetadataAnnotationPrefix = "metadata.github.com/"
39-
// RuntimeRisksAnnotationKey is the tag key for runtime risks. Comes after MetadataAnnotationPrefix.
40-
RuntimeRisksAnnotationKey = "runtime-risks"
41-
// MaxCustomTags is the maximum number of custom tags per deployment record.
42-
MaxCustomTags = 5
43-
// MaxCustomTagLength is the maximum length for a custom tag key or value.
44-
MaxCustomTagLength = 100
4534
)
4635

4736
type ttlCache interface {
@@ -50,35 +39,33 @@ type ttlCache interface {
5039
Delete(k any)
5140
}
5241

42+
type podMetadataAggregator interface {
43+
BuildAggregatePodMetadata(ctx context.Context, obj *metav1.PartialObjectMetadata) *metadata.AggregatePodMetadata
44+
}
45+
5346
// PodEvent represents a pod event to be processed.
5447
type PodEvent struct {
5548
Key string
5649
EventType string
5750
DeletedPod *corev1.Pod // Only populated for delete events
5851
}
5952

60-
// AggregatePodMetadata represents combined metadata for a pod and its ownership hierarchy.
61-
type AggregatePodMetadata struct {
62-
RuntimeRisks map[deploymentrecord.RuntimeRisk]bool
63-
Tags map[string]string
64-
}
65-
6653
// Controller is the Kubernetes controller for tracking deployments.
6754
type Controller struct {
68-
clientset kubernetes.Interface
69-
metadataClient metadata.Interface
70-
podInformer cache.SharedIndexInformer
71-
workqueue workqueue.TypedRateLimitingInterface[PodEvent]
72-
apiClient *deploymentrecord.Client
73-
cfg *Config
55+
clientset kubernetes.Interface
56+
metadataAggregator podMetadataAggregator
57+
podInformer cache.SharedIndexInformer
58+
workqueue workqueue.TypedRateLimitingInterface[PodEvent]
59+
apiClient *deploymentrecord.Client
60+
cfg *Config
7461
// best effort cache to avoid redundant posts
7562
// post requests are idempotent, so if this cache fails due to
7663
// restarts or other events, nothing will break.
7764
observedDeployments ttlCache
7865
}
7966

8067
// New creates a new deployment tracker controller.
81-
func New(clientset kubernetes.Interface, metadataClient metadata.Interface, namespace string, excludeNamespaces string, cfg *Config) (*Controller, error) {
68+
func New(clientset kubernetes.Interface, metadataAggregator podMetadataAggregator, namespace string, excludeNamespaces string, cfg *Config) (*Controller, error) {
8269
// Create informer factory
8370
factory := createInformerFactory(clientset, namespace, excludeNamespaces)
8471

@@ -111,7 +98,7 @@ func New(clientset kubernetes.Interface, metadataClient metadata.Interface, name
11198

11299
cntrl := &Controller{
113100
clientset: clientset,
114-
metadataClient: metadataClient,
101+
metadataAggregator: metadataAggregator,
115102
podInformer: podInformer,
116103
workqueue: queue,
117104
apiClient: apiClient,
@@ -363,9 +350,9 @@ func (c *Controller) processEvent(ctx context.Context, event PodEvent) error {
363350
var lastErr error
364351

365352
// Gather aggregate metadata for adds/updates
366-
var aggPodMetadata *AggregatePodMetadata
353+
var aggPodMetadata *metadata.AggregatePodMetadata
367354
if status != deploymentrecord.StatusDecommissioned {
368-
aggPodMetadata = c.aggregateMetadata(ctx, podToPartialMetadata(pod))
355+
aggPodMetadata = c.metadataAggregator.BuildAggregatePodMetadata(ctx, podToPartialMetadata(pod))
369356
}
370357

371358
// Record info for each container in the pod
@@ -405,7 +392,7 @@ func (c *Controller) deploymentExists(ctx context.Context, namespace, name strin
405392
}
406393

407394
// recordContainer records a single container's deployment info.
408-
func (c *Controller) recordContainer(ctx context.Context, pod *corev1.Pod, container corev1.Container, status, eventType string, aggPodMetadata *AggregatePodMetadata) error {
395+
func (c *Controller) recordContainer(ctx context.Context, pod *corev1.Pod, container corev1.Container, status, eventType string, aggPodMetadata *metadata.AggregatePodMetadata) error {
409396
var cacheKey string
410397

411398
dn := getARDeploymentName(pod, container, c.cfg.Template)
@@ -521,7 +508,7 @@ func (c *Controller) recordContainer(ctx context.Context, pod *corev1.Pod, conta
521508
case deploymentrecord.StatusDecommissioned:
522509
cacheKey = getCacheKey(EventDeleted, dn, digest)
523510
c.observedDeployments.Set(cacheKey, true, 2*time.Minute)
524-
// If there was a previous created event, remove that
511+
// If there was a previous create event, remove that
525512
cacheKey = getCacheKey(EventCreated, dn, digest)
526513
c.observedDeployments.Delete(cacheKey)
527514
default:
@@ -531,95 +518,6 @@ func (c *Controller) recordContainer(ctx context.Context, pod *corev1.Pod, conta
531518
return nil
532519
}
533520

534-
// aggregateMetadata returns aggregated metadata for a pod and its owners.
535-
func (c *Controller) aggregateMetadata(ctx context.Context, obj *metav1.PartialObjectMetadata) *AggregatePodMetadata {
536-
aggMetadata := &AggregatePodMetadata{
537-
RuntimeRisks: make(map[deploymentrecord.RuntimeRisk]bool),
538-
Tags: make(map[string]string),
539-
}
540-
queue := []*metav1.PartialObjectMetadata{obj}
541-
visited := make(map[types.UID]bool)
542-
543-
for len(queue) > 0 {
544-
current := queue[0]
545-
queue = queue[1:]
546-
547-
if visited[current.GetUID()] {
548-
slog.Warn("Already visited object, skipping to avoid cycles",
549-
"UID", current.GetUID(),
550-
"name", current.GetName(),
551-
)
552-
continue
553-
}
554-
visited[current.GetUID()] = true
555-
556-
extractMetadataFromObject(current, aggMetadata)
557-
c.addOwnersToQueue(ctx, current, &queue)
558-
}
559-
560-
return aggMetadata
561-
}
562-
563-
// addOwnersToQueue takes a current object and looks up its owners, adding them to the queue for processing
564-
// to collect their metadata.
565-
func (c *Controller) addOwnersToQueue(ctx context.Context, current *metav1.PartialObjectMetadata, queue *[]*metav1.PartialObjectMetadata) {
566-
ownerRefs := current.GetOwnerReferences()
567-
568-
for _, owner := range ownerRefs {
569-
ownerObj, err := c.getOwnerMetadata(ctx, current.GetNamespace(), owner)
570-
if err != nil {
571-
slog.Warn("Failed to get owner object for metadata collection",
572-
"namespace", current.GetNamespace(),
573-
"owner_kind", owner.Kind,
574-
"owner_name", owner.Name,
575-
"error", err,
576-
)
577-
continue
578-
}
579-
580-
if ownerObj == nil {
581-
continue
582-
}
583-
584-
*queue = append(*queue, ownerObj)
585-
}
586-
}
587-
588-
// getOwnerMetadata retrieves partial object metadata for an owner ref.
589-
func (c *Controller) getOwnerMetadata(ctx context.Context, namespace string, owner metav1.OwnerReference) (*metav1.PartialObjectMetadata, error) {
590-
gvr := schema.GroupVersionResource{
591-
Group: "apps",
592-
Version: "v1",
593-
}
594-
595-
switch owner.Kind {
596-
case "ReplicaSet":
597-
gvr.Resource = "replicasets"
598-
case "Deployment":
599-
gvr.Resource = "deployments"
600-
default:
601-
slog.Debug("Unsupported owner kind for runtime risk collection",
602-
"kind", owner.Kind,
603-
"name", owner.Name,
604-
)
605-
return nil, nil
606-
}
607-
608-
obj, err := c.metadataClient.Resource(gvr).Namespace(namespace).Get(ctx, owner.Name, metav1.GetOptions{})
609-
if err != nil {
610-
if k8serrors.IsNotFound(err) {
611-
slog.Debug("Owner object not found for metadata collection",
612-
"namespace", namespace,
613-
"owner_kind", owner.Kind,
614-
"owner_name", owner.Name,
615-
)
616-
return nil, nil
617-
}
618-
return nil, err
619-
}
620-
return obj, nil
621-
}
622-
623521
func getCacheKey(ev, dn, digest string) string {
624522
return ev + "||" + dn + "||" + digest
625523
}
@@ -676,7 +574,7 @@ func createInformerFactory(clientset kubernetes.Interface, namespace string, exc
676574

677575
// getARDeploymentName converts the pod's metadata into the correct format
678576
// for the deployment name for the artifact registry (this is not the same
679-
// as the K8s deployment's name!
577+
// as the K8s deployment's name!)
680578
// The deployment name must be unique within logical, physical environment and
681579
// the cluster.
682580
func getARDeploymentName(p *corev1.Pod, c corev1.Container, tmpl string) string {
@@ -728,78 +626,6 @@ func getDeploymentName(pod *corev1.Pod) string {
728626
return ""
729627
}
730628

731-
// extractMetadataFromObject extracts metadata from an object.
732-
func extractMetadataFromObject(obj *metav1.PartialObjectMetadata, aggPodMetadata *AggregatePodMetadata) {
733-
annotations := obj.GetAnnotations()
734-
735-
// Extract runtime risks
736-
if risks, exists := annotations[MetadataAnnotationPrefix+RuntimeRisksAnnotationKey]; exists {
737-
for _, risk := range strings.Split(risks, ",") {
738-
r := deploymentrecord.ValidateRuntimeRisk(risk)
739-
if r != "" {
740-
aggPodMetadata.RuntimeRisks[r] = true
741-
}
742-
}
743-
}
744-
745-
// Extract tags by sorted keys to ensure tags are deterministic
746-
// if over the limit and some are dropped, the same ones will be dropped each time.
747-
keys := make([]string, 0, len(annotations))
748-
for key := range annotations {
749-
keys = append(keys, key)
750-
}
751-
slices.Sort(keys)
752-
753-
for _, key := range keys {
754-
if len(aggPodMetadata.Tags) >= MaxCustomTags {
755-
break
756-
}
757-
758-
if strings.HasPrefix(key, MetadataAnnotationPrefix) {
759-
tagKey := strings.TrimPrefix(key, MetadataAnnotationPrefix)
760-
tagValue := annotations[key]
761-
762-
if RuntimeRisksAnnotationKey == tagKey {
763-
// ignore runtime risks for custom tags
764-
continue
765-
}
766-
if utf8.RuneCountInString(tagKey) > MaxCustomTagLength || utf8.RuneCountInString(tagValue) > MaxCustomTagLength {
767-
slog.Warn("Tag key or value exceeds max length, skipping",
768-
"object_name", obj.GetName(),
769-
"kind", obj.Kind,
770-
"tag_key", tagKey,
771-
"tag_value", tagValue,
772-
"key_length", utf8.RuneCountInString(tagKey),
773-
"value_length", utf8.RuneCountInString(tagValue),
774-
"max_length", MaxCustomTagLength,
775-
)
776-
continue
777-
}
778-
if tagKey == "" {
779-
slog.Warn("Tag key is empty, skipping",
780-
"object_name", obj.GetName(),
781-
"kind", obj.Kind,
782-
"annotation", key,
783-
"tag_key", tagKey,
784-
"tag_value", tagValue,
785-
)
786-
continue
787-
}
788-
if _, exists := aggPodMetadata.Tags[tagKey]; exists {
789-
slog.Debug("Duplicate tag key found, skipping",
790-
"object_name", obj.GetName(),
791-
"kind", obj.Kind,
792-
"tag_key", tagKey,
793-
"existing_value", aggPodMetadata.Tags[tagKey],
794-
"new_value", tagValue,
795-
)
796-
continue
797-
}
798-
aggPodMetadata.Tags[tagKey] = tagValue
799-
}
800-
}
801-
}
802-
803629
func podToPartialMetadata(pod *corev1.Pod) *metav1.PartialObjectMetadata {
804630
return &metav1.PartialObjectMetadata{
805631
TypeMeta: metav1.TypeMeta{

0 commit comments

Comments
 (0)