k8s log acquisition datasource#4221

Open
sabban wants to merge 49 commits into master from k8s/acquisition
Conversation

@sabban (Contributor) commented Jan 22, 2026

k8s log acquisition datasource

@github-actions

@sabban: There is no 'kind' label on this PR. You need a 'kind' label to generate the release automatically.

  • /kind feature
  • /kind enhancement
  • /kind refactoring
  • /kind fix
  • /kind chore
  • /kind dependencies

I am a bot created to help the crowdsecurity developers manage community feedback and contributions. You can check out my manifest file to understand my behavior and what I can do. If you want to use this for your project, you can check out the BirthdayResearch/oss-governance-bot repository.

@github-actions

@sabban: There are no area labels on this PR. You can add as many areas as you see fit.

  • /area agent
  • /area local-api
  • /area cscli
  • /area appsec
  • /area security
  • /area configuration

@sabban (Contributor, Author) commented Jan 22, 2026

/kind feature
/area agent

@sabban sabban changed the title k8s log acquisition datasource [wip] k8s log acquisition datasource Jan 22, 2026
@sabban sabban marked this pull request as draft January 22, 2026 13:21
@sabban sabban changed the title [wip] k8s log acquisition datasource k8s log acquisition datasource Jan 22, 2026
@codecov bot commented Feb 1, 2026

Codecov Report

❌ Patch coverage is 1.53061% with 193 lines in your changes missing coverage. Please review.
✅ Project coverage is 62.88%. Comparing base (013cdea) to head (5c29612).
⚠️ Report is 1 commit behind head on master.

Files with missing lines Patch % Lines
pkg/acquisition/modules/kubernetes/run.go 0.00% 130 Missing ⚠️
pkg/acquisition/modules/kubernetes/config.go 0.00% 35 Missing ⚠️
pkg/acquisition/modules/kubernetes/utils.go 0.00% 12 Missing ⚠️
pkg/acquisition/modules/kubernetes/source.go 0.00% 8 Missing ⚠️
pkg/acquisition/modules/kubernetes/metrics.go 0.00% 6 Missing ⚠️
pkg/acquisition/modules/kubernetes/init.go 50.00% 0 Missing and 1 partial ⚠️
pkg/database/flush.go 0.00% 1 Missing ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##           master    #4221      +/-   ##
==========================================
- Coverage   63.77%   62.88%   -0.90%     
==========================================
  Files         455      479      +24     
  Lines       32650    33577     +927     
==========================================
+ Hits        20824    21116     +292     
- Misses       9752    10346     +594     
- Partials     2074     2115      +41     
Flag Coverage Δ
bats 46.42% <1.53%> (-0.30%) ⬇️
unit-linux 35.90% <1.53%> (-0.14%) ⬇️
unit-windows 24.31% <0.00%> (?)

Flags with carried forward coverage won't be shown.


@mmetc (Contributor) commented Feb 5, 2026

quick 2¢, I didn't follow the logic yet:

  • the package is not imported from pkg/acquisition
  • name of the package/datasource is way too verbose for comfort
  • log.Fatal should just return errors
  • vestigial presence of tomb
  • there is an empty .keep file
  • references to DockerAcquisition, DockerDatasource
  • does one shot acquisition make sense for this DS?
  • the schema file should go in pkg/acquisition/schemas/

@sabban sabban marked this pull request as ready for review February 17, 2026 08:32
@sabban sabban requested a review from mmetc February 17, 2026 09:05
Copilot AI left a comment

Pull request overview

This PR adds a new Kubernetes log acquisition datasource, wiring it into the build/component system, metrics, and acquisition config schema so CrowdSec can tail pod logs selected by namespace + label selector.

Changes:

  • Introduces a new kubernetes acquisition module that watches pods via informers and streams container logs into the pipeline.
  • Adds Prometheus acquisition metrics + component/build registrations for the new datasource.
  • Extends acquisition JSON schema to allow source: kubernetes entries and updates Go module dependencies to include client-go.

Reviewed changes

Copilot reviewed 15 out of 16 changed files in this pull request and generated 9 comments.

Summary per file:

  • pkg/metrics/acquisition_kubernetes.go: Adds a Prometheus counter for Kubernetes datasource lines read.
  • pkg/database/flush.go: Excludes loopback IP agents from the auto-delete query.
  • pkg/cwversion/component/component.go: Registers the new datasource_kubernetes component flag.
  • pkg/acquisition/schemas/kubernetes.yaml: Adds schema for Kubernetes acquisition configuration.
  • pkg/acquisition/schemas/datasource.yaml: References the new Kubernetes schema.
  • pkg/acquisition/modules/kubernetes/utils.go: Builds the Kubernetes REST config (in-cluster, falling back to kubeconfig).
  • pkg/acquisition/modules/kubernetes/source.go: Implements datasource identity/mode/uuid methods.
  • pkg/acquisition/modules/kubernetes/run.go: Implements informer-based pod tracking and log streaming.
  • pkg/acquisition/modules/kubernetes/metrics.go: Exposes Kubernetes acquisition metrics collectors.
  • pkg/acquisition/modules/kubernetes/init.go: Registers the datasource factory in the acquisition registry.
  • pkg/acquisition/modules/kubernetes/config.go: Adds YAML config parsing/defaults/validation for the datasource.
  • pkg/acquisition/modules/kubernetes.go: Build-tagged blank import to register the datasource.
  • pkg/acquisition/modules/journalctl/config.go: Minor formatting cleanup.
  • go.mod / go.sum: Adds Kubernetes client dependencies.
  • Makefile: Adds datasource_kubernetes to the component list.


err := fn()
if err != nil {
	return err
}

Copilot AI Apr 20, 2026


followPodLogs() never exits when the context is canceled: fn() returns nil when ctx.Err() != nil, but the outer for-loop immediately calls fn() again, resulting in a tight spin and preventing pod worker goroutines (and Stream shutdown) from completing. Update the loop to break/return when ctx is done (and consider a small backoff when restarting after EOF).

Suggested change

	- }
	- }
	+ if ctx.Err() != nil {
	+ 	return nil
	+ }
	+ select {
	+ case <-ctx.Done():
	+ 	return nil
	+ case <-time.After(time.Second):
	+ }

Comment on lines +148 to +160
sc := bufio.NewScanner(stream)
for sc.Scan() {
	if err := ctx.Err(); err != nil {
		return nil
	}
	if err := onLine(sc.Text(), ns+"/"+pod+"/"+container, labels, metricsLevel, out); err != nil {
		return err
	}
}
if ctx.Err() != nil {
	return nil
}
return sc.Err()

Copilot AI Apr 20, 2026


The bufio.Scanner over the pod log stream uses the default max token size (~64KiB); Kubernetes log lines can exceed this and will cause scanning to stop with an error. Configure Scanner.Buffer with a higher limit or switch to bufio.Reader to avoid unexpected stream termination on long lines.

Comment on lines +184 to +188
evt := pipeline.MakeEvent(true, pipeline.LOG, true)
evt.Line = l
evt.Process = true
evt.Type = pipeline.LOG
out <- evt

Copilot AI Apr 20, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

processLine() creates events with pipeline.MakeEvent(true, ...), which forces TIMEMACHINE mode regardless of the datasource configuration. This should use the configured UseTimeMachine flag (like other datasources do), otherwise live mode behavior will be incorrect.

Comment on lines +170 to +171
//dev/ source := pod.Namespace + "/" + pod.Name + "/" + cont.Name


Copilot AI Apr 20, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Leftover debug/commented line ("//dev/") should be removed to keep the module clean and avoid confusing readers about intended behavior.

Suggested change (delete the line)

	- //dev/ source := pod.Namespace + "/" + pod.Name + "/" + cont.Name
Comment on lines +51 to +52
home, _ := os.UserHomeDir()
c.KubeConfigFile = filepath.Join(home, ".kube", "config")

Copilot AI Apr 20, 2026


SetDefaults() ignores the error from os.UserHomeDir(); if it fails, kube_config becomes a relative path (".kube/config"), which is surprising and can break out-of-cluster usage. Handle the error explicitly (eg keep kube_config empty, or fall back to a well-defined absolute path).

Suggested change

	- home, _ := os.UserHomeDir()
	- c.KubeConfigFile = filepath.Join(home, ".kube", "config")
	+ home, err := os.UserHomeDir()
	+ if err == nil {
	+ 	c.KubeConfigFile = filepath.Join(home, ".kube", "config")
	+ }

func (c *Configuration) Validate() error {
	if c.Selector == "" {
		return errors.New("selector must be set in kubernetes acquisition")
	}

Copilot AI Apr 20, 2026


Validate() only checks Selector; it should also reject unsupported modes (eg anything other than tail) to match the schema/expected behavior and to be consistent with other datasources' config validation.

Suggested change

	- }
	- }
	+ if c.Mode != configuration.TAIL_MODE {
	+ 	return fmt.Errorf("unsupported mode %q in kubernetes acquisition, only %q is supported", c.Mode, configuration.TAIL_MODE)
	+ }

Comment on lines +79 to +84
err := s.UnmarshalConfig(yamlConfig)
if err != nil {
return err
}

s.logger = logger

Copilot AI Apr 20, 2026


Configure() sets s.logger after UnmarshalConfig(), so UnmarshalConfig() can't emit its trace log even when a logger is provided. Assign s.logger before calling UnmarshalConfig() to keep behavior consistent with other acquisition modules.

Suggested change

	- err := s.UnmarshalConfig(yamlConfig)
	- if err != nil {
	- 	return err
	- }
	- s.logger = logger
	+ s.logger = logger
	+ err := s.UnmarshalConfig(yamlConfig)
	+ if err != nil {
	+ 	return err
	+ }

Comment on lines +67 to +96
auth:
  type: object
  minProperties: 1
  additionalProperties: false
  description: >
    Inline Kubernetes client authentication overriding kubeconfig defaults.
    Mirrors clientcmd/api.Cluster and AuthInfo fields; keys match kubeconfig
    file entries (for example server, certificate-authority, client-certificate,
    token, exec, auth-provider, ...).
  properties:
    cluster:
      type: object
      additionalProperties: true
      description: >
        Optional cluster stanza (clientcmd/api.Cluster) with fields such as
        server, certificate-authority, tls-server-name, or
        insecure-skip-tls-verify.
    user:
      type: object
      additionalProperties: true
      description: >
        Optional user stanza (clientcmd/api.AuthInfo) with fields such as
        client-certificate, client-key, token, token-file, exec or auth-provider.
required:
  - source
  - selector
allOf:
  - description: auth and kube_config are mutually exclusive.
    not:
      required: [auth, kube_config]

Copilot AI Apr 20, 2026


The Kubernetes acquisition schema doesn't match the implemented configuration: the Go Configuration supports kube_context but the schema doesn't define it, while the schema defines an 'auth' block (and mutual exclusion with kube_config) that the module doesn't implement at all. Please update the schema to reflect actual supported fields/constraints, or implement the documented auth behavior.

Suggested change

	- auth:
	-   type: object
	-   minProperties: 1
	-   additionalProperties: false
	-   description: >
	-     Inline Kubernetes client authentication overriding kubeconfig defaults.
	-     Mirrors clientcmd/api.Cluster and AuthInfo fields; keys match kubeconfig
	-     file entries (for example server, certificate-authority, client-certificate,
	-     token, exec, auth-provider, ...).
	-   properties:
	-     cluster:
	-       type: object
	-       additionalProperties: true
	-       description: >
	-         Optional cluster stanza (clientcmd/api.Cluster) with fields such as
	-         server, certificate-authority, tls-server-name, or
	-         insecure-skip-tls-verify.
	-     user:
	-       type: object
	-       additionalProperties: true
	-       description: >
	-         Optional user stanza (clientcmd/api.AuthInfo) with fields such as
	-         client-certificate, client-key, token, token-file, exec or auth-provider.
	- required:
	-   - source
	-   - selector
	- allOf:
	-   - description: auth and kube_config are mutually exclusive.
	-     not:
	-       required: [auth, kube_config]
	+ kube_context:
	+   type: string
	+   description: >
	+     Name of the kubeconfig context to use when multiple contexts are available.
	+ required:
	+   - source
	+   - selector
Comment on lines +37 to +129
func (s *Source) Stream(ctx context.Context, out chan pipeline.Event) error {
	var wg sync.WaitGroup
	var mu sync.Mutex

	s.logger.WithFields(log.Fields{
		"namespace": s.config.Namespace,
		"selector":  s.config.Selector,
		"unique_id": s.config.UniqueId,
	}).Info("starting kubernetes acquisition")

	cfg, err := s.config.buildClientConfig(s.logger)
	if err != nil {
		return fmt.Errorf("building kubernetes client config for namespace=%q selector=%q unique_id=%q: %w", s.config.Namespace, s.config.Selector, s.config.UniqueId, err)
	}
	cs, err := kubernetes.NewForConfig(cfg)
	if err != nil {
		return fmt.Errorf("can't create a kubernetes client for namespace=%q selector=%q unique_id=%q: %w", s.config.Namespace, s.config.Selector, s.config.UniqueId, err)
	}

	cancels := map[types.UID]context.CancelFunc{}

	f := informers.NewSharedInformerFactoryWithOptions(cs, 0,
		informers.WithNamespace(s.config.Namespace),
		informers.WithTweakListOptions(func(o *metav1.ListOptions) {
			// We set the LabelSelector on the ListOptions to filter pods at the
			// API level, so we only get events for pods that match our
			// selector. This is more efficient than getting all pod events and
			// filtering them in our event handlers.
			o.LabelSelector = s.config.Selector
		}),
	)
	inf := f.Core().V1().Pods().Informer()

	// We ignore the ResourceEventHandlerRegistration returned by
	// AddEventHandler since we don't need to remove the handlers until shutdown,
	// and we will stop the entire informer at that time.
	s.logger.WithFields(log.Fields{
		"namespace": s.config.Namespace,
		"selector":  s.config.Selector,
		"unique_id": s.config.UniqueId,
	}).Info("adding kubernetes event handler")
	_, err = inf.AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc: func(obj any) {
			p := obj.(*corev1.Pod)
			s.logger.Debugf("ADD %s labels=%v", podRef(p), p.Labels)
			s.tailPod(ctx, cs, p, out, &wg, &mu, cancels)
		},
		UpdateFunc: func(oldObj, newObj any) {
			oldP := oldObj.(*corev1.Pod)
			newP := newObj.(*corev1.Pod)

			if oldP.Status.Phase != newP.Status.Phase {
				s.logger.Debugf("UPDATE phase %s -> %s", podRef(oldP), podRef(newP))
			} else {
				s.logger.Tracef("UPDATE %s", podRef(newP))
			}

			s.tailPod(ctx, cs, newP, out, &wg, &mu, cancels)
		},
		DeleteFunc: func(obj any) {
			pod, ok := obj.(*corev1.Pod)
			if !ok {
				t, _ := obj.(cache.DeletedFinalStateUnknown)
				pod, _ = t.Obj.(*corev1.Pod)
				s.logger.Debugf("DELETE(tombstone) %s", podRef(pod))
			} else {
				s.logger.Debugf("DELETE %s", podRef(pod))
			}

			if pod != nil {
				s.stopPod(pod, &mu, cancels)
			}
		},
	})
	if err != nil {
		return fmt.Errorf("while adding event handler for namespace=%q selector=%q unique_id=%q: %w", s.config.Namespace, s.config.Selector, s.config.UniqueId, err)
	}

	f.Start(ctx.Done())
	if !cache.WaitForCacheSync(ctx.Done(), inf.HasSynced) {
		return fmt.Errorf("cache sync failed for namespace=%q selector=%q unique_id=%q", s.config.Namespace, s.config.Selector, s.config.UniqueId)
	}

	<-ctx.Done()
	mu.Lock()
	for _, c := range cancels {
		c()
	}
	mu.Unlock()
	wg.Wait()

	return nil
}

Copilot AI Apr 20, 2026


This new datasource introduces non-trivial behavior (pod event handling, log streaming, shutdown semantics), but there are no tests covering config validation and context-cancel shutdown. Adding focused unit/integration tests (similar to other acquisition modules) would help prevent regressions (eg ensuring Stream returns on ctx cancel and no goroutine leaks).


3 participants