Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion internal/etw/approvers/approver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/rabbitstack/fibratus/pkg/sys/etw"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"golang.org/x/sys/windows"
)

Expand Down Expand Up @@ -142,12 +143,18 @@ func TestApproversFileEventWithRulesApproved(t *testing.T) {
p.Approve(cr)
assert.Equal(t, 1, len(p.fs.irps))

sw := stackwalkRecord(t, stackwalkBuf)
rec, approved := p.Approve(sw)
require.Equal(t, event.StackWalkEventGUID, rec.Header.ProviderID)
require.False(t, approved)

foe := buildMatchingFileOpEnd(t, createBuf, uint64(windows.FILE_OPEN))
rec, approved := p.Approve(foe)
rec, approved = p.Approve(foe)
assert.True(t, approved, "Pending CreateFile should be approved")
assert.Equal(t, event.CreateFileID, rec.Header.EventDescriptor.Opcode,
"should return stored CreateFile record")
assert.NotNil(t, rec.ExtendedData, "extended data should be attached")
assert.NotNil(t, rec.ReadEventHeaderFileExtendedDataItemsCallstack(), "file callstack should be attached")
}

func TestApproversFileEventWithRulesRejected(t *testing.T) {
Expand Down
40 changes: 30 additions & 10 deletions internal/etw/approvers/fs.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,22 +26,26 @@
"github.com/rabbitstack/fibratus/pkg/event"
devmapper "github.com/rabbitstack/fibratus/pkg/fs"
"github.com/rabbitstack/fibratus/pkg/sys/etw"
"github.com/rabbitstack/fibratus/pkg/util/va"
"golang.org/x/sys/windows"
)

var (
fsApproverApprovals = expvar.NewInt("approver.fs.approvals")
fsApproverRejections = expvar.NewInt("approver.fs.rejections")

fsApproverCallstackMisses = expvar.NewInt("approver.fs.callstack.misses")
)

// irp acts as a scratch area for the pending IRP request
// that is used as a signal to promote the file operation.
// The memory buffer backing the event record must outlive
// event processors scope.
type irp struct {
rec *etw.EventRecord
buf []byte // keeps the data buffer alive
items *etw.FileExtendedDataItems // keeps the extended data items alive
rec *etw.EventRecord
buf []byte // keeps the data buffer alive
items *etw.FileExtendedDataItems // keeps the extended data items alive
callstack []va.Address // CreateFile stack return addresses
}

// fs is the file system approver that accepts or discards
Expand Down Expand Up @@ -80,7 +84,8 @@
}

func (f *fs) Approve(r *etw.EventRecord) (*etw.EventRecord, bool) {
if r.Header.ProviderID != event.FileEventGUID {
if r.Header.ProviderID != event.FileEventGUID &&
!(r.Header.ProviderID == event.StackWalkEventGUID && r.Header.EventDescriptor.Opcode == event.StackWalkID) {

Check failure on line 88 in internal/etw/approvers/fs.go

View workflow job for this annotation

GitHub Actions / lint

QF1001: could apply De Morgan's law (staticcheck)
return r, true
}

Expand All @@ -91,6 +96,21 @@
return r, false
}

// decorate the pending CreateFile with the return addresses
// obtained from the StackWalk event by correlating the timestamp
if r.Header.ProviderID == event.StackWalkEventGUID && r.Header.EventDescriptor.Opcode == event.StackWalkID {
ts := r.ReadUint64(0)
for i, rec := range f.irps {
if rec.rec.Header.Timestamp == ts {
irp := f.irps[i]
irp.callstack = r.ReadCallstack(16)
f.irps[i] = irp
return r, false
}
}
return r, true
}

if r.Header.EventDescriptor.Opcode == event.FileOpEndID {
disposition := r.ReadUint64(8)

Expand All @@ -108,13 +128,17 @@
return r, false
}

if irp.callstack == nil {
fsApproverCallstackMisses.Add(1)
}

rec := irp.rec
status := r.ReadUint32(16)
// if the I/O status is different than file open
// or the rules compilation result is not present
// we'll allow events flow downstream processors
if f.r == nil || disposition != windows.FILE_OPEN {
irp.items = etw.AppendEventHeaderFileExtendedDataItems(rec, disposition, status)
irp.items = etw.AppendEventHeaderFileExtendedDataItems(rec, disposition, status, irp.callstack)
f.irps[i] = irp
return rec, true
}
Expand All @@ -134,15 +158,11 @@
// CreateFile operation
delete(f.irps, i)
fsApproverRejections.Add(1)
stackID := uint64(rec.Header.ProcessID + rec.Header.ThreadID)
if f.processors != nil {
f.processors.DequeueStackwalk(stackID)
}
return rec, false
}

fsApproverApprovals.Add(1)
irp.items = etw.AppendEventHeaderFileExtendedDataItems(rec, disposition, status)
irp.items = etw.AppendEventHeaderFileExtendedDataItems(rec, disposition, status, irp.callstack)
f.irps[i] = irp
return rec, true
}
Expand Down
41 changes: 41 additions & 0 deletions internal/etw/approvers/fs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ func createFileRecord(t *testing.T, buf []byte) *etw.EventRecord {
b := make([]byte, len(buf))
copy(b, buf)
r := &etw.EventRecord{}
r.Header.Timestamp = 250273540393 // matches StackWalk timestamp parameter
r.Header.ProviderID = event.FileEventGUID
r.Header.EventDescriptor.Opcode = event.CreateFileID
r.BufferLen = uint16(len(b))
Expand All @@ -58,6 +59,19 @@ func fileOpEndRecord(t *testing.T, buf []byte) *etw.EventRecord {
return r
}

func stackwalkRecord(t *testing.T, buf []byte) *etw.EventRecord {
t.Helper()
b := make([]byte, len(buf))
copy(b, buf)
r := &etw.EventRecord{}
r.Header.ProviderID = event.StackWalkEventGUID
r.Header.EventDescriptor.Opcode = event.StackWalkID
r.BufferLen = uint16(len(b))
r.Buffer = uintptr(unsafe.Pointer(&b[0]))
t.Cleanup(func() { _ = b })
return r
}

var (
createBuf = []byte{
200, 7, 94, 150, 141, 215, 255, 255, // Irp
Expand Down Expand Up @@ -93,6 +107,33 @@ var (
0, 0, 0, 0, 40, 0, 0, 0, // ExtraInformation
0, 0, 0, 0, // Status
}

stackwalkBuf = []byte{
41, 41, 119, 69, 58, 0, 0, 0,
12, 15, 0, 0,
160, 29, 0, 0,

148, 12, 254, 189, 5, 248, 255, 255,
175, 137, 229, 189, 5, 248, 255, 255,
134, 47, 242, 189, 5, 248, 255, 255,
36, 109, 255, 80, 5, 248, 255, 255,
175, 186, 130, 79, 5, 248, 255, 255,
160, 177, 130, 79, 5, 248, 255, 255,
224, 104, 137, 79, 5, 248, 255, 255,
59, 149, 250, 189, 5, 248, 255, 255,
179, 148, 250, 189, 5, 248, 255, 255,
59, 168, 73, 190, 5, 248, 255, 255,
218, 136, 73, 190, 5, 248, 255, 255,
227, 101, 73, 190, 5, 248, 255, 255,
196, 202, 73, 190, 5, 248, 255, 255,
85, 217, 43, 190, 5, 248, 255, 255,

20, 69, 114, 9, 253, 127, 0, 0,
31, 51, 25, 6, 253, 127, 0, 0,
105, 96, 138, 36, 246, 127, 0, 0,
109, 37, 146, 36, 246, 127, 0, 0,
227, 37, 146, 36, 246, 127, 0, 0,
}
)

// buildMatchingFileOpEnd builds a FileOpEnd whose IRP matches the given CreateFile buffer.
Expand Down
6 changes: 0 additions & 6 deletions internal/etw/processors/chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,12 +60,6 @@ func (c *Chain) ProcessEvent(e *event.Event) (*event.Event, error) {
return evt, nil
}

func (c *Chain) DequeueStackwalk(stackID uint64) {
if c.fsProcessor != nil {
c.fsProcessor.(*fsProcessor).dequeueStackwalk(stackID)
}
}

// Close closes the processor chain and frees all allocated resources.
func (c *Chain) Close() error {
for _, processor := range c.processors {
Expand Down
106 changes: 7 additions & 99 deletions internal/etw/processors/fs_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,6 @@ package processors

import (
"expvar"
"sync"
"time"

"github.com/rabbitstack/fibratus/pkg/config"
"github.com/rabbitstack/fibratus/pkg/event"
Expand Down Expand Up @@ -53,13 +51,6 @@ type fsProcessor struct {
psnap ps.Snapshotter

config *config.Config

// buckets stores stack walk events per stack id
buckets map[uint64][]*event.Event
mu sync.Mutex
purger *time.Ticker

quit chan struct{}
}

// FileInfo stores file information obtained from event state.
Expand All @@ -73,31 +64,24 @@ func newFsProcessor(
psnap ps.Snapshotter,
config *config.Config,
) Processor {
f := &fsProcessor{
files: make(map[uint64]*FileInfo),
hsnap: hsnap,
psnap: psnap,
config: config,
buckets: make(map[uint64][]*event.Event),
purger: time.NewTicker(time.Second * 5),
quit: make(chan struct{}, 1),
return &fsProcessor{
files: make(map[uint64]*FileInfo),
hsnap: hsnap,
psnap: psnap,
config: config,
}

go f.purge()

return f
}

func (f *fsProcessor) ProcessEvent(e *event.Event) (*event.Event, bool, error) {
if e.Category == event.File || e.IsStackWalk() {
if e.Category == event.File {
evt, err := f.processEvent(e)
return evt, false, err
}
return e, true, nil
}

func (*fsProcessor) Name() ProcessorType { return Fs }
func (f *fsProcessor) Close() { f.quit <- struct{}{} }
func (f *fsProcessor) Close() {}

func (f *fsProcessor) getFileInfo(name string, opts uint32) *FileInfo {
return &FileInfo{Name: name, Type: fs.GetFileType(name, opts)}
Expand Down Expand Up @@ -138,20 +122,6 @@ func (f *fsProcessor) processEvent(e *event.Event) (*event.Event, error) {
}

return e, f.psnap.AddMmap(e)
case event.StackWalk:
if !event.IsCurrentProcDropped(e.PID) {
f.mu.Lock()
defer f.mu.Unlock()

// append the event to the bucket indexed by stack id
id := e.StackID()
q, ok := f.buckets[id]
if !ok {
f.buckets[id] = []*event.Event{e}
} else {
f.buckets[id] = append(q, e)
}
}
case event.CreateFile:
fileObject := e.Params.MustGetUint64(params.FileObject)

Expand All @@ -170,30 +140,6 @@ func (f *fsProcessor) processEvent(e *event.Event) (*event.Event, error) {
e.AppendEnum(params.FileType, uint32(fileinfo.Type), fs.FileTypes)
}

// attach stack walk return addresses. CreateFile events
// represent an edge case in callstack enrichment. Since
// the events are delayed until the respective FileOpEnd
// event arrives, we enable stack tracing for CreateFile
// events. When the CreateFile event is generated, we store
// it in pending IRP map. Subsequently, the stack walk event
// is put inside the queue. After FileOpEnd event arrives,
// the previous stack walk for CreateFile is popped from
// the queue and the callstack parameter attached to the
// event.
if f.config.EventSource.StackEnrichment {
f.mu.Lock()
defer f.mu.Unlock()

id := e.StackID()
q, ok := f.buckets[id]
if ok && len(q) > 0 {
var s *event.Event
s, f.buckets[id] = q[len(q)-1], q[:len(q)-1]
callstack := s.Params.MustGetSlice(params.Callstack)
e.AppendParam(params.Callstack, params.Slice, callstack)
}
}

return e, nil
case event.ReleaseFile:
fileReleaseCount.Add(1)
Expand Down Expand Up @@ -267,16 +213,6 @@ func (f *fsProcessor) processEvent(e *event.Event) (*event.Event, error) {
return e, nil
}

func (f *fsProcessor) dequeueStackwalk(stackID uint64) {
f.mu.Lock()
defer f.mu.Unlock()

q, ok := f.buckets[stackID]
if ok && len(q) > 0 {
f.buckets[stackID] = q[:len(q)-1]
}
}

func (f *fsProcessor) findFile(fileKey, fileObject uint64) *FileInfo {
fileinfo, ok := f.files[fileKey]
if ok {
Expand Down Expand Up @@ -307,31 +243,3 @@ func (f *fsProcessor) getMappedFile(pid uint32, addr uint64) string {
defer windows.Close(process)
return fs.GetDevMapper().Convert(sys.GetMappedFile(process, uintptr(addr)))
}

func (f *fsProcessor) purge() {
for {
select {
case <-f.purger.C:
f.mu.Lock()

// evict unmatched stack traces
for id, q := range f.buckets {
s := make([]*event.Event, 0, len(q))
for _, evt := range q {
if time.Since(evt.Timestamp) <= time.Second*30 {
s = append(s, evt)
}
}
if len(s) == 0 {
delete(f.buckets, id)
} else {
f.buckets[id] = s
}
}

f.mu.Unlock()
case <-f.quit:
return
}
}
}
2 changes: 1 addition & 1 deletion internal/etw/trace.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ func initEventTraceProps(c config.EventSourceConfig) etw.EventTraceProperties {
Wnode: etw.WnodeHeader{
BufferSize: uint32(unsafe.Sizeof(etw.EventTraceProperties{})) + maxTracePropsSize,
Flags: etw.WnodeTraceFlagGUID,
ClientContext: 1, // QPC clock resolution
ClientContext: 2, // System time: The system time provides a time stamp that tracks changes to the system’s clock
},
BufferSize: bufferSize,
LogFileMode: mode,
Expand Down
Loading
Loading