Skip to content

Commit 103437d

Browse files
committed
rename to aggregator, add testing
1 parent 460c064 commit 103437d

4 files changed

Lines changed: 408 additions & 21 deletions

File tree

cmd/deployment-tracker/main.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -122,8 +122,8 @@ func main() {
122122
os.Exit(1)
123123
}
124124

125-
// Create MetadataService
126-
metadataService := metadata.NewMetadataService(metadataClient)
125+
// Create metadata aggregator
126+
metadataAggregator := metadata.NewAggregator(metadataClient)
127127

128128
// Start the metrics server
129129
var promSrv = &http.Server{
@@ -164,7 +164,7 @@ func main() {
164164
cancel()
165165
}()
166166

167-
cntrl, err := controller.New(clientset, metadataService, namespace, excludeNamespaces, &cntrlCfg)
167+
cntrl, err := controller.New(clientset, metadataAggregator, namespace, excludeNamespaces, &cntrlCfg)
168168
if err != nil {
169169
slog.Error("Failed to create controller",
170170
"error", err)

internal/controller/controller.go

Lines changed: 13 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,10 @@ type ttlCache interface {
3939
Delete(k any)
4040
}
4141

42+
type podMetadataAggregator interface {
43+
AggregatePodMetadata(ctx context.Context, obj *metav1.PartialObjectMetadata) *metadata.AggregatePodMetadata
44+
}
45+
4246
// PodEvent represents a pod event to be processed.
4347
type PodEvent struct {
4448
Key string
@@ -48,20 +52,20 @@ type PodEvent struct {
4852

4953
// Controller is the Kubernetes controller for tracking deployments.
5054
type Controller struct {
51-
clientset kubernetes.Interface
52-
metadataService *metadata.MetadataService
53-
podInformer cache.SharedIndexInformer
54-
workqueue workqueue.TypedRateLimitingInterface[PodEvent]
55-
apiClient *deploymentrecord.Client
56-
cfg *Config
55+
clientset kubernetes.Interface
56+
metadataAggregator podMetadataAggregator
57+
podInformer cache.SharedIndexInformer
58+
workqueue workqueue.TypedRateLimitingInterface[PodEvent]
59+
apiClient *deploymentrecord.Client
60+
cfg *Config
5761
// best effort cache to avoid redundant posts
5862
// post requests are idempotent, so if this cache fails due to
5963
// restarts or other events, nothing will break.
6064
observedDeployments ttlCache
6165
}
6266

6367
// New creates a new deployment tracker controller.
64-
func New(clientset kubernetes.Interface, metadataService *metadata.MetadataService, namespace string, excludeNamespaces string, cfg *Config) (*Controller, error) {
68+
func New(clientset kubernetes.Interface, metadataAggregator podMetadataAggregator, namespace string, excludeNamespaces string, cfg *Config) (*Controller, error) {
6569
// Create informer factory
6670
factory := createInformerFactory(clientset, namespace, excludeNamespaces)
6771

@@ -94,7 +98,7 @@ func New(clientset kubernetes.Interface, metadataService *metadata.MetadataServi
9498

9599
cntrl := &Controller{
96100
clientset: clientset,
97-
metadataService: metadataService,
101+
metadataAggregator: metadataAggregator,
98102
podInformer: podInformer,
99103
workqueue: queue,
100104
apiClient: apiClient,
@@ -348,7 +352,7 @@ func (c *Controller) processEvent(ctx context.Context, event PodEvent) error {
348352
// Gather aggregate metadata for adds/updates
349353
var aggPodMetadata *metadata.AggregatePodMetadata
350354
if status != deploymentrecord.StatusDecommissioned {
351-
aggPodMetadata = c.metadataService.AggregatePodMetadata(ctx, podToPartialMetadata(pod))
355+
aggPodMetadata = c.metadataAggregator.AggregatePodMetadata(ctx, podToPartialMetadata(pod))
352356
}
353357

354358
// Record info for each container in the pod

internal/metadata/metadata.go

Lines changed: 12 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,8 @@ const (
2626
MaxCustomTagLength = 100
2727
)
2828

29-
type MetadataService struct {
29+
// Aggregator uses the Kubernetes metadata client to aggregate metadata for a pod and its ownership hierarchy.
30+
type Aggregator struct {
3031
metadataClient k8smetadata.Interface
3132
}
3233

@@ -36,13 +37,15 @@ type AggregatePodMetadata struct {
3637
Tags map[string]string
3738
}
3839

39-
func NewMetadataService(metadataClient k8smetadata.Interface) *MetadataService {
40-
return &MetadataService{
40+
// NewAggregator creates a new Aggregator with a Kubernetes metadata client.
41+
func NewAggregator(metadataClient k8smetadata.Interface) *Aggregator {
42+
return &Aggregator{
4143
metadataClient: metadataClient,
4244
}
4345
}
4446

45-
func (ms *MetadataService) AggregatePodMetadata(ctx context.Context, obj *metav1.PartialObjectMetadata) *AggregatePodMetadata {
47+
// AggregatePodMetadata takes a pod's partial object metadata and traverses its ownership hierarchy to return AggregatePodMetadata.
48+
func (m *Aggregator) AggregatePodMetadata(ctx context.Context, obj *metav1.PartialObjectMetadata) *AggregatePodMetadata {
4649
aggMetadata := &AggregatePodMetadata{
4750
RuntimeRisks: make(map[deploymentrecord.RuntimeRisk]bool),
4851
Tags: make(map[string]string),
@@ -64,19 +67,19 @@ func (ms *MetadataService) AggregatePodMetadata(ctx context.Context, obj *metav1
6467
visited[current.GetUID()] = true
6568

6669
extractMetadataFromObject(current, aggMetadata)
67-
ms.addOwnersToQueue(ctx, current, &queue)
70+
m.addOwnersToQueue(ctx, current, &queue)
6871
}
6972

7073
return aggMetadata
7174
}
7275

7376
// addOwnersToQueue takes a current object and looks up its owners, adding them to the queue for processing
7477
// to collect their metadata.
75-
func (ms *MetadataService) addOwnersToQueue(ctx context.Context, current *metav1.PartialObjectMetadata, queue *[]*metav1.PartialObjectMetadata) {
78+
func (m *Aggregator) addOwnersToQueue(ctx context.Context, current *metav1.PartialObjectMetadata, queue *[]*metav1.PartialObjectMetadata) {
7679
ownerRefs := current.GetOwnerReferences()
7780

7881
for _, owner := range ownerRefs {
79-
ownerObj, err := ms.getOwnerMetadata(ctx, current.GetNamespace(), owner)
82+
ownerObj, err := m.getOwnerMetadata(ctx, current.GetNamespace(), owner)
8083
if err != nil {
8184
slog.Warn("Failed to get owner object for metadata collection",
8285
"namespace", current.GetNamespace(),
@@ -96,7 +99,7 @@ func (ms *MetadataService) addOwnersToQueue(ctx context.Context, current *metav1
9699
}
97100

98101
// getOwnerMetadata retrieves partial object metadata for an owner ref.
99-
func (ms *MetadataService) getOwnerMetadata(ctx context.Context, namespace string, owner metav1.OwnerReference) (*metav1.PartialObjectMetadata, error) {
102+
func (m *Aggregator) getOwnerMetadata(ctx context.Context, namespace string, owner metav1.OwnerReference) (*metav1.PartialObjectMetadata, error) {
100103
gvr := schema.GroupVersionResource{
101104
Group: "apps",
102105
Version: "v1",
@@ -115,7 +118,7 @@ func (ms *MetadataService) getOwnerMetadata(ctx context.Context, namespace strin
115118
return nil, nil
116119
}
117120

118-
obj, err := ms.metadataClient.Resource(gvr).Namespace(namespace).Get(ctx, owner.Name, metav1.GetOptions{})
121+
obj, err := m.metadataClient.Resource(gvr).Namespace(namespace).Get(ctx, owner.Name, metav1.GetOptions{})
119122
if err != nil {
120123
if k8serrors.IsNotFound(err) {
121124
slog.Debug("Owner object not found for metadata collection",

0 commit comments

Comments
 (0)