Skip to content
This repository was archived by the owner on Jul 27, 2023. It is now read-only.

Commit 9afcd7c

Browse files
authored
Merge pull request #215 from DataDog/shang/fix-chunk-bug
[proc] put containers and processes running in containers in the same chunk
2 parents 6fe361f + 110630f commit 9afcd7c

5 files changed

Lines changed: 405 additions & 70 deletions

File tree

checks/container.go

Lines changed: 20 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -58,10 +58,10 @@ func (c *ContainerCheck) Run(cfg *config.AgentConfig, groupID int32) ([]model.Me
5858
}
5959

6060
groupSize := len(ctrList) / cfg.MaxPerMessage
61-
if len(ctrList) != cfg.MaxPerMessage {
61+
if len(ctrList) != cfg.MaxPerMessage*groupSize {
6262
groupSize++
6363
}
64-
chunked := fmtContainers(ctrList, c.lastRates, c.lastRun, groupSize)
64+
chunked := chunkContainers(ctrList, c.lastRates, c.lastRun, groupSize, cfg.MaxPerMessage)
6565
messages := make([]model.MessageBody, 0, groupSize)
6666
totalContainers := float64(0)
6767
for i := 0; i < groupSize; i++ {
@@ -83,13 +83,9 @@ func (c *ContainerCheck) Run(cfg *config.AgentConfig, groupID int32) ([]model.Me
8383
return messages, nil
8484
}
8585

86-
// fmtContainers formats and chunks the ctrList into a slice of chunks using a specific
87-
// number of chunks. len(result) MUST EQUAL chunks.
88-
func fmtContainers(ctrList []*containers.Container, lastRates map[string]util.ContainerRateMetrics, lastRun time.Time, chunks int) [][]*model.Container {
89-
perChunk := (len(ctrList) / chunks) + 1
90-
chunked := make([][]*model.Container, chunks)
91-
chunk := make([]*model.Container, 0, perChunk)
92-
i := 0
86+
// fmtContainers loops through container list and converts them to a list of container objects
87+
func fmtContainers(ctrList []*containers.Container, lastRates map[string]util.ContainerRateMetrics, lastRun time.Time) []*model.Container {
88+
containers := make([]*model.Container, 0, len(ctrList))
9389
for _, ctr := range ctrList {
9490
lastCtr, ok := lastRates[ctr.ID]
9591
if !ok {
@@ -112,7 +108,7 @@ func fmtContainers(ctrList []*containers.Container, lastRates map[string]util.Co
112108
tags = []string{}
113109
}
114110

115-
chunk = append(chunk, &model.Container{
111+
containers = append(containers, &model.Container{
116112
Id: ctr.ID,
117113
Type: ctr.Type,
118114
CpuLimit: float32(ctr.CPULimit),
@@ -135,15 +131,26 @@ func fmtContainers(ctrList []*containers.Container, lastRates map[string]util.Co
135131
Started: ctr.StartedAt,
136132
Tags: tags,
137133
})
134+
}
135+
return containers
136+
}
137+
138+
// chunkContainers formats and chunks the ctrList into a slice of chunks using a specific number of chunks.
139+
func chunkContainers(ctrList []*containers.Container, lastRates map[string]util.ContainerRateMetrics, lastRun time.Time, chunks, perChunk int) [][]*model.Container {
140+
chunked := make([][]*model.Container, 0, chunks)
141+
chunk := make([]*model.Container, 0, perChunk)
142+
143+
containers := fmtContainers(ctrList, lastRates, lastRun)
138144

145+
for _, ctr := range containers {
146+
chunk = append(chunk, ctr)
139147
if len(chunk) == perChunk {
140-
chunked[i] = chunk
148+
chunked = append(chunked, chunk)
141149
chunk = make([]*model.Container, 0, perChunk)
142-
i++
143150
}
144151
}
145152
if len(chunk) > 0 {
146-
chunked[i] = chunk
153+
chunked = append(chunked, chunk)
147154
}
148155
return chunked
149156
}

checks/container_nolinux.go

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -42,13 +42,12 @@ func (c *ContainerCheck) Run(cfg *config.AgentConfig, groupID int32) ([]model.Me
4242
return nil, nil
4343
}
4444

45-
// fmtContainers formats and chunks the containers into a slice of chunks using a specific
45+
// chunkContainers formats and chunks the containers into a slice of chunks using a specific
4646
// number of chunks. len(result) MUST EQUAL chunks.
47-
func fmtContainers(
48-
ctrList []*containers.Container,
49-
lastRates map[string]util.ContainerRateMetrics,
50-
lastRun time.Time,
51-
chunks int,
52-
) [][]*model.Container {
47+
func chunkContainers(ctrList []*containers.Container, lastRates map[string]util.ContainerRateMetrics, lastRun time.Time, chunks, perChunk int) [][]*model.Container {
5348
return make([][]*model.Container, chunks)
5449
}
50+
51+
func fmtContainers(ctrList []*containers.Container, lastRates map[string]util.ContainerRateMetrics, lastRun time.Time) []*model.Container {
52+
return make([]*model.Container, 0)
53+
}

checks/container_test.go

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -35,27 +35,31 @@ func TestContainerChunking(t *testing.T) {
3535
last map[string]util.ContainerRateMetrics
3636
chunks int
3737
expected int
38+
maxSize int
3839
}{
3940
{
4041
cur: []*containers.Container{ctrs[0], ctrs[1], ctrs[2]},
4142
last: util.ExtractContainerRateMetric([]*containers.Container{ctrs[0], ctrs[1], ctrs[2]}),
4243
chunks: 2,
4344
expected: 3,
45+
maxSize: 2,
4446
},
4547
{
4648
cur: []*containers.Container{ctrs[0], ctrs[1], ctrs[2]},
4749
last: util.ExtractContainerRateMetric([]*containers.Container{ctrs[0], ctrs[2]}),
4850
chunks: 2,
4951
expected: 3,
52+
maxSize: 2,
5053
},
5154
{
5255
cur: []*containers.Container{ctrs[0], ctrs[2]},
5356
last: util.ExtractContainerRateMetric([]*containers.Container{ctrs[0], ctrs[1], ctrs[2]}),
54-
chunks: 20,
57+
chunks: 2,
5558
expected: 2,
59+
maxSize: 1,
5660
},
5761
} {
58-
chunked := fmtContainers(tc.cur, tc.last, lastRun, tc.chunks)
62+
chunked := chunkContainers(tc.cur, tc.last, lastRun, tc.chunks, tc.maxSize)
5963
assert.Len(t, chunked, tc.chunks, "len test %d", i)
6064
total := 0
6165
for _, c := range chunked {
@@ -74,22 +78,22 @@ func TestContainerChunking(t *testing.T) {
7478
}
7579
}
7680

77-
func TestContainerAddressList(t *testing.T) {
81+
func TestContainerAddresses(t *testing.T) {
7882
ctr := makeContainer("haha")
7983
ctr.AddressList = []containers.NetworkAddress{containers.NetworkAddress{IP: net.ParseIP("192.168.128.141"), Port: 443, Protocol: "TCP"}}
80-
results := fmtContainers([]*containers.Container{ctr}, map[string]util.ContainerRateMetrics{}, time.Now(), 1)
81-
assert.Equal(t, 1, len(results[0]))
84+
results := fmtContainers([]*containers.Container{ctr}, map[string]util.ContainerRateMetrics{}, time.Now())
85+
assert.Equal(t, 1, len(results))
8286
addrs := []*model.ContainerAddr{
8387
&model.ContainerAddr{Ip: "192.168.128.141", Port: int32(443), Protocol: model.ConnectionType_tcp},
8488
}
85-
assert.Equal(t, results[0][0].Addresses, addrs)
89+
assert.Equal(t, results[0].Addresses, addrs)
8690
}
8791

8892
func TestContainerNils(t *testing.T) {
8993
// Make sure formatting doesn't crash with nils
9094
cur := []*containers.Container{&containers.Container{}}
9195
last := map[string]util.ContainerRateMetrics{}
92-
fmtContainers(cur, last, time.Now(), 10)
96+
chunkContainers(cur, last, time.Now(), 10, 10)
9397
fmtContainerStats(cur, last, time.Now(), 10)
9498
// Make sure we get values when we have nils in last.
9599
cur = []*containers.Container{
@@ -103,7 +107,7 @@ func TestContainerNils(t *testing.T) {
103107
CPU: &metrics.CgroupTimesStat{},
104108
},
105109
}
106-
fmtContainers(cur, last, time.Now(), 10)
110+
chunkContainers(cur, last, time.Now(), 10, 10)
107111
fmtContainerStats(cur, last, time.Now(), 10)
108112
}
109113

checks/process.go

Lines changed: 94 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@ import (
1515
"github.com/DataDog/datadog-process-agent/util"
1616
)
1717

18+
const emptyCtrID = ""
19+
1820
// Process is a singleton ProcessCheck.
1921
var Process = &ProcessCheck{}
2022

@@ -76,28 +78,10 @@ func (p *ProcessCheck) Run(cfg *config.AgentConfig, groupID int32) ([]model.Mess
7678
return nil, nil
7779
}
7880

79-
chunkedProcs := fmtProcesses(cfg, procs, p.lastProcs,
80-
ctrList, cpuTimes[0], p.lastCPUTime, p.lastRun)
81-
// In case we skip every process..
82-
if len(chunkedProcs) == 0 {
83-
return nil, nil
84-
}
85-
groupSize := len(chunkedProcs)
86-
chunkedContainers := fmtContainers(ctrList, p.lastCtrRates, p.lastRun, groupSize)
87-
messages := make([]model.MessageBody, 0, groupSize)
88-
totalProcs, totalContainers := float64(0), float64(0)
89-
for i := 0; i < groupSize; i++ {
90-
totalProcs += float64(len(chunkedProcs[i]))
91-
totalContainers += float64(len(chunkedContainers[i]))
92-
messages = append(messages, &model.CollectorProc{
93-
HostName: cfg.HostName,
94-
Info: p.sysInfo,
95-
Processes: chunkedProcs[i],
96-
Containers: chunkedContainers[i],
97-
GroupId: groupID,
98-
GroupSize: int32(groupSize),
99-
})
100-
}
81+
procsByCtr := fmtProcesses(cfg, procs, p.lastProcs, ctrList, cpuTimes[0], p.lastCPUTime, p.lastRun)
82+
containers := fmtContainers(ctrList, p.lastCtrRates, p.lastRun)
83+
84+
messages, totalProcs, totalContainers := createProcCtrMessages(procsByCtr, containers, cfg, p.sysInfo, groupID)
10185

10286
// Store the last state for comparison on the next run.
10387
// Note: not storing the filtered in case there are new processes that haven't had a chance to show up twice.
@@ -106,28 +90,102 @@ func (p *ProcessCheck) Run(cfg *config.AgentConfig, groupID int32) ([]model.Mess
10690
p.lastCPUTime = cpuTimes[0]
10791
p.lastRun = time.Now()
10892

109-
statsd.Client.Gauge("datadog.process.containers.host_count", totalContainers, []string{}, 1)
110-
statsd.Client.Gauge("datadog.process.processes.host_count", totalProcs, []string{}, 1)
93+
statsd.Client.Gauge("datadog.process.containers.host_count", float64(totalContainers), []string{}, 1)
94+
statsd.Client.Gauge("datadog.process.processes.host_count", float64(totalProcs), []string{}, 1)
11195
log.Debugf("collected processes in %s", time.Now().Sub(start))
11296
return messages, nil
11397
}
11498

99+
func createProcCtrMessages(
100+
procsByCtr map[string][]*model.Process,
101+
containers []*model.Container,
102+
cfg *config.AgentConfig,
103+
sysInfo *model.SystemInfo,
104+
groupID int32,
105+
) ([]model.MessageBody, int, int) {
106+
totalProcs, totalContainers := 0, 0
107+
msgs := make([]*model.CollectorProc, 0)
108+
109+
// we first split non-container processes in chunks
110+
chunks := chunkProcesses(procsByCtr[emptyCtrID], cfg.MaxPerMessage)
111+
for _, c := range chunks {
112+
msgs = append(msgs, &model.CollectorProc{
113+
HostName: cfg.HostName,
114+
Info: sysInfo,
115+
Processes: c,
116+
GroupId: groupID,
117+
})
118+
}
119+
120+
ctrProcs := make([]*model.Process, 0)
121+
ctrs := make([]*model.Container, 0, len(containers))
122+
for _, ctr := range containers {
123+
if procs, ok := procsByCtr[ctr.Id]; ok {
124+
ctrProcs = append(ctrProcs, procs...)
125+
}
126+
ctrs = append(ctrs, ctr)
127+
}
128+
129+
if len(ctrs) > 0 {
130+
msgs = append(msgs, &model.CollectorProc{
131+
HostName: cfg.HostName,
132+
Info: sysInfo,
133+
Processes: ctrProcs,
134+
Containers: ctrs,
135+
GroupId: groupID,
136+
})
137+
}
138+
139+
// fill in GroupSize for each CollectorProc and convert them to final messages
140+
// also count containers and processes
141+
messages := make([]model.MessageBody, 0, len(msgs))
142+
for _, m := range msgs {
143+
m.GroupSize = int32(len(msgs))
144+
messages = append(messages, m)
145+
totalProcs += len(m.Processes)
146+
totalContainers += len(m.Containers)
147+
}
148+
149+
return messages, totalProcs, totalContainers
150+
}
151+
152+
// chunkProcesses split non-container processes into chunks and return a list of chunks
153+
func chunkProcesses(procs []*model.Process, size int) [][]*model.Process {
154+
chunkCount := len(procs) / size
155+
if chunkCount*size < len(procs) {
156+
chunkCount++
157+
}
158+
chunks := make([][]*model.Process, 0, chunkCount)
159+
160+
for i := 0; i < len(procs); i += size {
161+
end := i + size
162+
if end > len(procs) {
163+
end = len(procs)
164+
}
165+
chunks = append(chunks, procs[i:end])
166+
}
167+
168+
return chunks
169+
}
170+
171+
// fmtProcesses goes through each process, converts them to process object and group them by containers
172+
// non-container processes would be in a single group with key as empty string ""
115173
func fmtProcesses(
116174
cfg *config.AgentConfig,
117175
procs, lastProcs map[int32]*process.FilledProcess,
118176
ctrList []*containers.Container,
119177
syst2, syst1 cpu.TimesStat,
120178
lastRun time.Time,
121-
) [][]*model.Process {
179+
) map[string][]*model.Process {
122180
cidByPid := make(map[int32]string, len(ctrList))
123181
for _, c := range ctrList {
124182
for _, p := range c.Pids {
125183
cidByPid[p] = c.ID
126184
}
127185
}
128186

129-
chunked := make([][]*model.Process, 0)
130-
chunk := make([]*model.Process, 0, cfg.MaxPerMessage)
187+
procsByCtr := make(map[string][]*model.Process)
188+
131189
for _, fp := range procs {
132190
if skipProcess(cfg, fp, lastProcs) {
133191
continue
@@ -136,7 +194,7 @@ func fmtProcesses(
136194
// Hide blacklisted args if the Scrubber is enabled
137195
fp.Cmdline = cfg.Scrubber.ScrubProcessCommand(fp)
138196

139-
chunk = append(chunk, &model.Process{
197+
proc := &model.Process{
140198
Pid: fp.Pid,
141199
Command: formatCommand(fp),
142200
User: formatUser(fp),
@@ -149,17 +207,17 @@ func fmtProcesses(
149207
VoluntaryCtxSwitches: uint64(fp.CtxSwitches.Voluntary),
150208
InvoluntaryCtxSwitches: uint64(fp.CtxSwitches.Involuntary),
151209
ContainerId: cidByPid[fp.Pid],
152-
})
153-
if len(chunk) == cfg.MaxPerMessage {
154-
chunked = append(chunked, chunk)
155-
chunk = make([]*model.Process, 0, cfg.MaxPerMessage)
156210
}
211+
_, ok := procsByCtr[proc.ContainerId]
212+
if !ok {
213+
procsByCtr[proc.ContainerId] = make([]*model.Process, 0)
214+
}
215+
procsByCtr[proc.ContainerId] = append(procsByCtr[proc.ContainerId], proc)
157216
}
158-
if len(chunk) > 0 {
159-
chunked = append(chunked, chunk)
160-
}
217+
161218
cfg.Scrubber.IncrementCacheAge()
162-
return chunked
219+
220+
return procsByCtr
163221
}
164222

165223
func formatCommand(fp *process.FilledProcess) *model.Command {

0 commit comments

Comments
 (0)