Skip to content

Commit 0a332dc

Browse files
fix: scope event reducers by agent and gpid
1 parent b20049a commit 0a332dc

File tree

4 files changed

+66
-2
lines changed

4 files changed

+66
-2
lines changed

server/ingester/event/decoder/file_agg_reducer.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,10 @@ func sameFileAggKey(a, b *dbwriter.FileAggEventStore) bool {
2929
if a == nil || b == nil {
3030
return false
3131
}
32-
return a.EventType == b.EventType &&
32+
return a.VTAPID == b.VTAPID &&
33+
a.RootPID == b.RootPID &&
34+
a.GProcessID == b.GProcessID &&
35+
a.EventType == b.EventType &&
3336
a.ProcessKName == b.ProcessKName &&
3437
a.AppInstance == b.AppInstance &&
3538
a.FileDir == b.FileDir &&

server/ingester/event/decoder/file_agg_reducer_test.go

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,9 @@ func makeRawFileEvent(processKName, eventType, fileName string, bytes uint32) *d
1313
e.IsFileEvent = true
1414
e.ProcessKName = processKName
1515
e.EventType = eventType
16+
e.VTAPID = 1
17+
e.RootPID = 42
18+
e.GProcessID = 100
1619
e.FileDir = "/tmp/"
1720
e.FileName = fileName
1821
e.Bytes = bytes
@@ -78,3 +81,41 @@ func TestFileAggReducerFlushesOnKeyChange(t *testing.T) {
7881
t.Fatalf("expected pending aggregate for codex-tui.log, got %+v", last)
7982
}
8083
}
84+
85+
func TestFileAggReducerFlushesOnDifferentGProcessID(t *testing.T) {
86+
reducer := NewFileAggReducer()
87+
first := makeRawFileEvent("codex", "write", "same.txt", 8)
88+
second := makeRawFileEvent("codex", "write", "same.txt", 8)
89+
second.GProcessID = 200
90+
91+
if flushed := reducer.Add(first); flushed != nil {
92+
t.Fatalf("unexpected flush on first event")
93+
}
94+
flushed := reducer.Add(second)
95+
if flushed == nil {
96+
t.Fatalf("expected flush when gprocess_id changes")
97+
}
98+
if flushed.GProcessID != 100 {
99+
t.Fatalf("flushed gprocess_id = %d, want 100", flushed.GProcessID)
100+
}
101+
flushed.Release()
102+
}
103+
104+
func TestFileAggReducerFlushesOnDifferentAgentID(t *testing.T) {
105+
reducer := NewFileAggReducer()
106+
first := makeRawFileEvent("codex", "write", "same.txt", 8)
107+
second := makeRawFileEvent("codex", "write", "same.txt", 8)
108+
second.VTAPID = 2
109+
110+
if flushed := reducer.Add(first); flushed != nil {
111+
t.Fatalf("unexpected flush on first event")
112+
}
113+
flushed := reducer.Add(second)
114+
if flushed == nil {
115+
t.Fatalf("expected flush when agent_id changes")
116+
}
117+
if flushed.VTAPID != 1 {
118+
t.Fatalf("flushed agent_id = %d, want 1", flushed.VTAPID)
119+
}
120+
flushed.Release()
121+
}

server/ingester/event/decoder/file_mgmt_reducer.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ func NewFileMgmtReducer() *FileMgmtReducer {
1616

1717
func (r *FileMgmtReducer) key(e *dbwriter.FileMgmtEventStore) string {
1818
return fmt.Sprintf("%d|%d|%s|%s|%s|%s",
19-
e.RootPID, e.GProcessID, e.MountSource, e.MountPoint, e.FileDir, e.FileName,
19+
e.VTAPID, e.RootPID, e.MountSource, e.MountPoint, e.FileDir, e.FileName,
2020
)
2121
}
2222

server/ingester/event/decoder/file_mgmt_reducer_test.go

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88

99
func makeFileMgmtEvent(processKName, eventType, fileName string) *dbwriter.FileMgmtEventStore {
1010
e := dbwriter.AcquireFileMgmtEventStore()
11+
e.VTAPID = 1
1112
e.ProcessKName = processKName
1213
e.EventType = eventType
1314
e.FileDir = "/tmp/"
@@ -49,3 +50,22 @@ func TestFileMgmtReducerSuppressesDuplicateCreateUntilDelete(t *testing.T) {
4950
out.Release()
5051
}
5152
}
53+
54+
func TestFileMgmtReducerKeepsDifferentAgentCreatesSeparate(t *testing.T) {
55+
reducer := NewFileMgmtReducer()
56+
first := makeFileMgmtEvent("bash", "create", "dup.txt")
57+
second := makeFileMgmtEvent("bash", "create", "dup.txt")
58+
second.VTAPID = 2
59+
60+
if out := reducer.Add(first); out == nil {
61+
t.Fatalf("first create should pass through")
62+
} else {
63+
out.Release()
64+
}
65+
66+
if out := reducer.Add(second); out == nil {
67+
t.Fatalf("create from another agent should not be suppressed")
68+
} else {
69+
out.Release()
70+
}
71+
}

0 commit comments

Comments
 (0)