Skip to content

Commit 460c064

Browse files
committed
create metadata service
1 parent 86eb953 commit 460c064

4 files changed

Lines changed: 225 additions & 195 deletions

File tree

cmd/deployment-tracker/main.go

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,8 @@ import (
1313
"time"
1414

1515
"github.com/github/deployment-tracker/internal/controller"
16-
"k8s.io/client-go/metadata"
16+
"github.com/github/deployment-tracker/internal/metadata"
17+
k8smetadata "k8s.io/client-go/metadata"
1718

1819
"github.com/prometheus/client_golang/prometheus/promhttp"
1920
"k8s.io/client-go/kubernetes"
@@ -114,13 +115,16 @@ func main() {
114115
}
115116

116117
// Create metadata client
117-
metadataClient, err := metadata.NewForConfig(k8sCfg)
118+
metadataClient, err := k8smetadata.NewForConfig(k8sCfg)
118119
if err != nil {
119120
slog.Error("Error creating Kubernetes metadata client",
120121
"error", err)
121122
os.Exit(1)
122123
}
123124

125+
// Create MetadataService
126+
metadataService := metadata.NewMetadataService(metadataClient)
127+
124128
// Start the metrics server
125129
var promSrv = &http.Server{
126130
Addr: ":" + metricsPort,
@@ -160,7 +164,7 @@ func main() {
160164
cancel()
161165
}()
162166

163-
cntrl, err := controller.New(clientset, metadataClient, namespace, excludeNamespaces, &cntrlCfg)
167+
cntrl, err := controller.New(clientset, metadataService, namespace, excludeNamespaces, &cntrlCfg)
164168
if err != nil {
165169
slog.Error("Failed to create controller",
166170
"error", err)

internal/controller/controller.go

Lines changed: 14 additions & 192 deletions
Original file line numberDiff line numberDiff line change
@@ -8,15 +8,12 @@ import (
88
"slices"
99
"strings"
1010
"time"
11-
"unicode/utf8"
1211

12+
"github.com/github/deployment-tracker/internal/metadata"
1313
"github.com/github/deployment-tracker/pkg/deploymentrecord"
1414
"github.com/github/deployment-tracker/pkg/dtmetrics"
1515
"github.com/github/deployment-tracker/pkg/ociutil"
16-
"k8s.io/apimachinery/pkg/runtime/schema"
17-
"k8s.io/apimachinery/pkg/types"
1816
amcache "k8s.io/apimachinery/pkg/util/cache"
19-
"k8s.io/client-go/metadata"
2017

2118
corev1 "k8s.io/api/core/v1"
2219
k8serrors "k8s.io/apimachinery/pkg/api/errors"
@@ -34,14 +31,6 @@ const (
3431
EventCreated = "CREATED"
3532
// EventDeleted indicates that a pod has been deleted.
3633
EventDeleted = "DELETED"
37-
// MetadataAnnotationPrefix is the annotation key prefix for deployment record metadata like runtime risk and tags.
38-
MetadataAnnotationPrefix = "metadata.github.com/"
39-
// RuntimeRisksAnnotationKey is the tag key for runtime risks. Comes after MetadataAnnotationPrefix.
40-
RuntimeRisksAnnotationKey = "runtime-risks"
41-
// MaxCustomTags is the maximum number of custom tags per deployment record.
42-
MaxCustomTags = 5
43-
// MaxCustomTagLength is the maximum length for a custom tag key or value.
44-
MaxCustomTagLength = 100
4534
)
4635

4736
type ttlCache interface {
@@ -57,28 +46,22 @@ type PodEvent struct {
5746
DeletedPod *corev1.Pod // Only populated for delete events
5847
}
5948

60-
// AggregatePodMetadata represents combined metadata for a pod and its ownership hierarchy.
61-
type AggregatePodMetadata struct {
62-
RuntimeRisks map[deploymentrecord.RuntimeRisk]bool
63-
Tags map[string]string
64-
}
65-
6649
// Controller is the Kubernetes controller for tracking deployments.
6750
type Controller struct {
68-
clientset kubernetes.Interface
69-
metadataClient metadata.Interface
70-
podInformer cache.SharedIndexInformer
71-
workqueue workqueue.TypedRateLimitingInterface[PodEvent]
72-
apiClient *deploymentrecord.Client
73-
cfg *Config
51+
clientset kubernetes.Interface
52+
metadataService *metadata.MetadataService
53+
podInformer cache.SharedIndexInformer
54+
workqueue workqueue.TypedRateLimitingInterface[PodEvent]
55+
apiClient *deploymentrecord.Client
56+
cfg *Config
7457
// best effort cache to avoid redundant posts
7558
// post requests are idempotent, so if this cache fails due to
7659
// restarts or other events, nothing will break.
7760
observedDeployments ttlCache
7861
}
7962

8063
// New creates a new deployment tracker controller.
81-
func New(clientset kubernetes.Interface, metadataClient metadata.Interface, namespace string, excludeNamespaces string, cfg *Config) (*Controller, error) {
64+
func New(clientset kubernetes.Interface, metadataService *metadata.MetadataService, namespace string, excludeNamespaces string, cfg *Config) (*Controller, error) {
8265
// Create informer factory
8366
factory := createInformerFactory(clientset, namespace, excludeNamespaces)
8467

@@ -111,7 +94,7 @@ func New(clientset kubernetes.Interface, metadataClient metadata.Interface, name
11194

11295
cntrl := &Controller{
11396
clientset: clientset,
114-
metadataClient: metadataClient,
97+
metadataService: metadataService,
11598
podInformer: podInformer,
11699
workqueue: queue,
117100
apiClient: apiClient,
@@ -363,9 +346,9 @@ func (c *Controller) processEvent(ctx context.Context, event PodEvent) error {
363346
var lastErr error
364347

365348
// Gather aggregate metadata for adds/updates
366-
var aggPodMetadata *AggregatePodMetadata
349+
var aggPodMetadata *metadata.AggregatePodMetadata
367350
if status != deploymentrecord.StatusDecommissioned {
368-
aggPodMetadata = c.aggregateMetadata(ctx, podToPartialMetadata(pod))
351+
aggPodMetadata = c.metadataService.AggregatePodMetadata(ctx, podToPartialMetadata(pod))
369352
}
370353

371354
// Record info for each container in the pod
@@ -405,7 +388,7 @@ func (c *Controller) deploymentExists(ctx context.Context, namespace, name strin
405388
}
406389

407390
// recordContainer records a single container's deployment info.
408-
func (c *Controller) recordContainer(ctx context.Context, pod *corev1.Pod, container corev1.Container, status, eventType string, aggPodMetadata *AggregatePodMetadata) error {
391+
func (c *Controller) recordContainer(ctx context.Context, pod *corev1.Pod, container corev1.Container, status, eventType string, aggPodMetadata *metadata.AggregatePodMetadata) error {
409392
var cacheKey string
410393

411394
dn := getARDeploymentName(pod, container, c.cfg.Template)
@@ -521,7 +504,7 @@ func (c *Controller) recordContainer(ctx context.Context, pod *corev1.Pod, conta
521504
case deploymentrecord.StatusDecommissioned:
522505
cacheKey = getCacheKey(EventDeleted, dn, digest)
523506
c.observedDeployments.Set(cacheKey, true, 2*time.Minute)
524-
// If there was a previous created event, remove that
507+
// If there was a previous create event, remove that
525508
cacheKey = getCacheKey(EventCreated, dn, digest)
526509
c.observedDeployments.Delete(cacheKey)
527510
default:
@@ -531,95 +514,6 @@ func (c *Controller) recordContainer(ctx context.Context, pod *corev1.Pod, conta
531514
return nil
532515
}
533516

534-
// aggregateMetadata returns aggregated metadata for a pod and its owners.
535-
func (c *Controller) aggregateMetadata(ctx context.Context, obj *metav1.PartialObjectMetadata) *AggregatePodMetadata {
536-
aggMetadata := &AggregatePodMetadata{
537-
RuntimeRisks: make(map[deploymentrecord.RuntimeRisk]bool),
538-
Tags: make(map[string]string),
539-
}
540-
queue := []*metav1.PartialObjectMetadata{obj}
541-
visited := make(map[types.UID]bool)
542-
543-
for len(queue) > 0 {
544-
current := queue[0]
545-
queue = queue[1:]
546-
547-
if visited[current.GetUID()] {
548-
slog.Warn("Already visited object, skipping to avoid cycles",
549-
"UID", current.GetUID(),
550-
"name", current.GetName(),
551-
)
552-
continue
553-
}
554-
visited[current.GetUID()] = true
555-
556-
extractMetadataFromObject(current, aggMetadata)
557-
c.addOwnersToQueue(ctx, current, &queue)
558-
}
559-
560-
return aggMetadata
561-
}
562-
563-
// addOwnersToQueue takes a current object and looks up its owners, adding them to the queue for processing
564-
// to collect their metadata.
565-
func (c *Controller) addOwnersToQueue(ctx context.Context, current *metav1.PartialObjectMetadata, queue *[]*metav1.PartialObjectMetadata) {
566-
ownerRefs := current.GetOwnerReferences()
567-
568-
for _, owner := range ownerRefs {
569-
ownerObj, err := c.getOwnerMetadata(ctx, current.GetNamespace(), owner)
570-
if err != nil {
571-
slog.Warn("Failed to get owner object for metadata collection",
572-
"namespace", current.GetNamespace(),
573-
"owner_kind", owner.Kind,
574-
"owner_name", owner.Name,
575-
"error", err,
576-
)
577-
continue
578-
}
579-
580-
if ownerObj == nil {
581-
continue
582-
}
583-
584-
*queue = append(*queue, ownerObj)
585-
}
586-
}
587-
588-
// getOwnerMetadata retrieves partial object metadata for an owner ref.
589-
func (c *Controller) getOwnerMetadata(ctx context.Context, namespace string, owner metav1.OwnerReference) (*metav1.PartialObjectMetadata, error) {
590-
gvr := schema.GroupVersionResource{
591-
Group: "apps",
592-
Version: "v1",
593-
}
594-
595-
switch owner.Kind {
596-
case "ReplicaSet":
597-
gvr.Resource = "replicasets"
598-
case "Deployment":
599-
gvr.Resource = "deployments"
600-
default:
601-
slog.Debug("Unsupported owner kind for runtime risk collection",
602-
"kind", owner.Kind,
603-
"name", owner.Name,
604-
)
605-
return nil, nil
606-
}
607-
608-
obj, err := c.metadataClient.Resource(gvr).Namespace(namespace).Get(ctx, owner.Name, metav1.GetOptions{})
609-
if err != nil {
610-
if k8serrors.IsNotFound(err) {
611-
slog.Debug("Owner object not found for metadata collection",
612-
"namespace", namespace,
613-
"owner_kind", owner.Kind,
614-
"owner_name", owner.Name,
615-
)
616-
return nil, nil
617-
}
618-
return nil, err
619-
}
620-
return obj, nil
621-
}
622-
623517
func getCacheKey(ev, dn, digest string) string {
624518
return ev + "||" + dn + "||" + digest
625519
}
@@ -676,7 +570,7 @@ func createInformerFactory(clientset kubernetes.Interface, namespace string, exc
676570

677571
// getARDeploymentName converts the pod's metadata into the correct format
678572
// for the deployment name for the artifact registry (this is not the same
679-
// as the K8s deployment's name!
573+
// as the K8s deployment's name!)
680574
// The deployment name must be unique within logical, physical environment and
681575
// the cluster.
682576
func getARDeploymentName(p *corev1.Pod, c corev1.Container, tmpl string) string {
@@ -728,78 +622,6 @@ func getDeploymentName(pod *corev1.Pod) string {
728622
return ""
729623
}
730624

731-
// extractMetadataFromObject extracts metadata from an object.
732-
func extractMetadataFromObject(obj *metav1.PartialObjectMetadata, aggPodMetadata *AggregatePodMetadata) {
733-
annotations := obj.GetAnnotations()
734-
735-
// Extract runtime risks
736-
if risks, exists := annotations[MetadataAnnotationPrefix+RuntimeRisksAnnotationKey]; exists {
737-
for _, risk := range strings.Split(risks, ",") {
738-
r := deploymentrecord.ValidateRuntimeRisk(risk)
739-
if r != "" {
740-
aggPodMetadata.RuntimeRisks[r] = true
741-
}
742-
}
743-
}
744-
745-
// Extract tags by sorted keys to ensure tags are deterministic:
746-
// if over the limit and some are dropped, the same ones will be dropped each time.
747-
keys := make([]string, 0, len(annotations))
748-
for key := range annotations {
749-
keys = append(keys, key)
750-
}
751-
slices.Sort(keys)
752-
753-
for _, key := range keys {
754-
if len(aggPodMetadata.Tags) >= MaxCustomTags {
755-
break
756-
}
757-
758-
if strings.HasPrefix(key, MetadataAnnotationPrefix) {
759-
tagKey := strings.TrimPrefix(key, MetadataAnnotationPrefix)
760-
tagValue := annotations[key]
761-
762-
if RuntimeRisksAnnotationKey == tagKey {
763-
// ignore runtime risks for custom tags
764-
continue
765-
}
766-
if utf8.RuneCountInString(tagKey) > MaxCustomTagLength || utf8.RuneCountInString(tagValue) > MaxCustomTagLength {
767-
slog.Warn("Tag key or value exceeds max length, skipping",
768-
"object_name", obj.GetName(),
769-
"kind", obj.Kind,
770-
"tag_key", tagKey,
771-
"tag_value", tagValue,
772-
"key_length", utf8.RuneCountInString(tagKey),
773-
"value_length", utf8.RuneCountInString(tagValue),
774-
"max_length", MaxCustomTagLength,
775-
)
776-
continue
777-
}
778-
if tagKey == "" {
779-
slog.Warn("Tag key is empty, skipping",
780-
"object_name", obj.GetName(),
781-
"kind", obj.Kind,
782-
"annotation", key,
783-
"tag_key", tagKey,
784-
"tag_value", tagValue,
785-
)
786-
continue
787-
}
788-
if _, exists := aggPodMetadata.Tags[tagKey]; exists {
789-
slog.Debug("Duplicate tag key found, skipping",
790-
"object_name", obj.GetName(),
791-
"kind", obj.Kind,
792-
"tag_key", tagKey,
793-
"existing_value", aggPodMetadata.Tags[tagKey],
794-
"new_value", tagValue,
795-
)
796-
continue
797-
}
798-
aggPodMetadata.Tags[tagKey] = tagValue
799-
}
800-
}
801-
}
802-
803625
func podToPartialMetadata(pod *corev1.Pod) *metav1.PartialObjectMetadata {
804626
return &metav1.PartialObjectMetadata{
805627
TypeMeta: metav1.TypeMeta{

0 commit comments

Comments
 (0)