diff --git a/internal/etw/approvers/approver_test.go b/internal/etw/approvers/approver_test.go index 8a61dbc47..0829f82a4 100644 --- a/internal/etw/approvers/approver_test.go +++ b/internal/etw/approvers/approver_test.go @@ -29,6 +29,7 @@ import ( "github.com/rabbitstack/fibratus/pkg/sys/etw" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" "golang.org/x/sys/windows" ) @@ -142,12 +143,18 @@ func TestApproversFileEventWithRulesApproved(t *testing.T) { p.Approve(cr) assert.Equal(t, 1, len(p.fs.irps)) + sw := stackwalkRecord(t, stackwalkBuf) + rec, approved := p.Approve(sw) + require.Equal(t, event.StackWalkEventGUID, rec.Header.ProviderID) + require.False(t, approved) + foe := buildMatchingFileOpEnd(t, createBuf, uint64(windows.FILE_OPEN)) - rec, approved := p.Approve(foe) + rec, approved = p.Approve(foe) assert.True(t, approved, "Pending CreateFile should be approved") assert.Equal(t, event.CreateFileID, rec.Header.EventDescriptor.Opcode, "should return stored CreateFile record") assert.NotNil(t, rec.ExtendedData, "extended data should be attached") + assert.NotNil(t, rec.ReadEventHeaderFileExtendedDataItemsCallstack(), "file callstack should be attached") } func TestApproversFileEventWithRulesRejected(t *testing.T) { diff --git a/internal/etw/approvers/fs.go b/internal/etw/approvers/fs.go index 34f3681c5..eb190f4da 100644 --- a/internal/etw/approvers/fs.go +++ b/internal/etw/approvers/fs.go @@ -26,12 +26,15 @@ import ( "github.com/rabbitstack/fibratus/pkg/event" devmapper "github.com/rabbitstack/fibratus/pkg/fs" "github.com/rabbitstack/fibratus/pkg/sys/etw" + "github.com/rabbitstack/fibratus/pkg/util/va" "golang.org/x/sys/windows" ) var ( fsApproverApprovals = expvar.NewInt("approver.fs.approvals") fsApproverRejections = expvar.NewInt("approver.fs.rejections") + + fsApproverCallstackMisses = expvar.NewInt("approver.fs.callstack.misses") ) // irp acts as a scratch area for the pending IRP request @@ -39,9 +42,10 @@ var ( // The memory buffer backing the event record must outlive // event processors scope. type irp struct { - rec *etw.EventRecord - buf []byte // keeps the data buffer alive - items *etw.FileExtendedDataItems // keeps the extended data items alive + rec *etw.EventRecord + buf []byte // keeps the data buffer alive + items *etw.FileExtendedDataItems // keeps the extended data items alive + callstack []va.Address // CreateFile stack return addresses } // fs is the file system approver that accepts or discards @@ -80,7 +84,8 @@ func newFSApprover(r *config.RulesCompileResult, processors *processors.Chain) A } func (f *fs) Approve(r *etw.EventRecord) (*etw.EventRecord, bool) { - if r.Header.ProviderID != event.FileEventGUID { + if r.Header.ProviderID != event.FileEventGUID && + !(r.Header.ProviderID == event.StackWalkEventGUID && r.Header.EventDescriptor.Opcode == event.StackWalkID) { return r, true } @@ -91,6 +96,21 @@ func (f *fs) Approve(r *etw.EventRecord) (*etw.EventRecord, bool) { return r, false } + // decorate the pending CreateFile with the return addresses + // obtained from the StackWalk event by correlating the timestamp + if r.Header.ProviderID == event.StackWalkEventGUID && r.Header.EventDescriptor.Opcode == event.StackWalkID { + ts := r.ReadUint64(0) + for i, rec := range f.irps { + if rec.rec.Header.Timestamp == ts { + irp := f.irps[i] + irp.callstack = r.ReadCallstack(16) + f.irps[i] = irp + return r, false + } + } + return r, true + } + if r.Header.EventDescriptor.Opcode == event.FileOpEndID { disposition := r.ReadUint64(8) @@ -108,13 +128,17 @@ func (f *fs) Approve(r *etw.EventRecord) (*etw.EventRecord, bool) { return r, false } + if irp.callstack == nil { + fsApproverCallstackMisses.Add(1) + } + rec := irp.rec status := r.ReadUint32(16) // if the I/O status is different than file open // or the rules compilation result is not present // we'll allow events flow downstream processors if f.r == nil || disposition != windows.FILE_OPEN { - irp.items = etw.AppendEventHeaderFileExtendedDataItems(rec, disposition, status) + irp.items = etw.AppendEventHeaderFileExtendedDataItems(rec, disposition, status, irp.callstack) f.irps[i] = irp return rec, true } @@ -134,15 +158,11 @@ func (f *fs) Approve(r *etw.EventRecord) (*etw.EventRecord, bool) { // CreateFile operation delete(f.irps, i) fsApproverRejections.Add(1) - stackID := uint64(rec.Header.ProcessID + rec.Header.ThreadID) - if f.processors != nil { - f.processors.DequeueStackwalk(stackID) - } return rec, false } fsApproverApprovals.Add(1) - irp.items = etw.AppendEventHeaderFileExtendedDataItems(rec, disposition, status) + irp.items = etw.AppendEventHeaderFileExtendedDataItems(rec, disposition, status, irp.callstack) f.irps[i] = irp return rec, true } diff --git a/internal/etw/approvers/fs_test.go b/internal/etw/approvers/fs_test.go index 78b471266..3080ad307 100644 --- a/internal/etw/approvers/fs_test.go +++ b/internal/etw/approvers/fs_test.go @@ -35,6 +35,7 @@ func createFileRecord(t *testing.T, buf []byte) *etw.EventRecord { b := make([]byte, len(buf)) copy(b, buf) r := &etw.EventRecord{} + r.Header.Timestamp = 250273540393 // matches StackWalk timestamp parameter r.Header.ProviderID = event.FileEventGUID r.Header.EventDescriptor.Opcode = event.CreateFileID r.BufferLen = uint16(len(b)) @@ -58,6 +59,19 @@ func fileOpEndRecord(t *testing.T, buf []byte) *etw.EventRecord { return r } +func stackwalkRecord(t *testing.T, buf []byte) *etw.EventRecord { + t.Helper() + b := make([]byte, len(buf)) + copy(b, buf) + r := &etw.EventRecord{} + r.Header.ProviderID = event.StackWalkEventGUID + r.Header.EventDescriptor.Opcode = event.StackWalkID + r.BufferLen = uint16(len(b)) + r.Buffer = uintptr(unsafe.Pointer(&b[0])) + t.Cleanup(func() { _ = b }) + return r +} + var ( createBuf = []byte{ 200, 7, 94, 150, 141, 215, 255, 255, // Irp @@ -93,6 +107,33 @@ var ( 0, 0, 0, 0, 40, 0, 0, 0, // ExtraInformation 0, 0, 0, 0, // Status } + + stackwalkBuf = []byte{ + 41, 41, 119, 69, 58, 0, 0, 0, + 12, 15, 0, 0, + 160, 29, 0, 0, + + 148, 12, 254, 189, 5, 248, 255, 255, + 175, 137, 229, 189, 5, 248, 255, 255, + 134, 47, 242, 189, 5, 248, 255, 255, + 36, 109, 255, 80, 5, 248, 255, 255, + 175, 186, 130, 79, 5, 248, 255, 255, + 160, 177, 130, 79, 5, 248, 255, 255, + 224, 104, 137, 79, 5, 248, 255, 255, + 59, 149, 250, 189, 5, 248, 255, 255, + 179, 148, 250, 189, 5, 248, 255, 255, + 59, 168, 73, 190, 5, 248, 255, 255, + 218, 136, 73, 190, 5, 248, 255, 255, + 227, 101, 73, 190, 5, 248, 255, 255, + 196, 202, 73, 190, 5, 248, 255, 255, + 85, 217, 43, 190, 5, 248, 255, 255, + + 20, 69, 114, 9, 253, 127, 0, 0, + 31, 51, 25, 6, 253, 127, 0, 0, + 105, 96, 138, 36, 246, 127, 0, 0, + 109, 37, 146, 36, 246, 127, 0, 0, + 227, 37, 146, 36, 246, 127, 0, 0, + } ) // buildMatchingFileOpEnd builds a FileOpEnd whose IRP matches the given CreateFile buffer. diff --git a/internal/etw/processors/chain.go b/internal/etw/processors/chain.go index 04c872e78..ab94b445f 100644 --- a/internal/etw/processors/chain.go +++ b/internal/etw/processors/chain.go @@ -60,12 +60,6 @@ func (c *Chain) ProcessEvent(e *event.Event) (*event.Event, error) { return evt, nil } -func (c *Chain) DequeueStackwalk(stackID uint64) { - if c.fsProcessor != nil { - c.fsProcessor.(*fsProcessor).dequeueStackwalk(stackID) - } -} - // Close closes the processor chain and frees all allocated resources. func (c *Chain) Close() error { for _, processor := range c.processors { diff --git a/internal/etw/processors/fs_windows.go b/internal/etw/processors/fs_windows.go index 20dbebbb2..a121c19e0 100644 --- a/internal/etw/processors/fs_windows.go +++ b/internal/etw/processors/fs_windows.go @@ -20,8 +20,6 @@ package processors import ( "expvar" - "sync" - "time" "github.com/rabbitstack/fibratus/pkg/config" "github.com/rabbitstack/fibratus/pkg/event" @@ -53,13 +51,6 @@ type fsProcessor struct { psnap ps.Snapshotter config *config.Config - - // buckets stores stack walk events per stack id - buckets map[uint64][]*event.Event - mu sync.Mutex - purger *time.Ticker - - quit chan struct{} } // FileInfo stores file information obtained from event state. @@ -73,23 +64,16 @@ func newFsProcessor( psnap ps.Snapshotter, config *config.Config, ) Processor { - f := &fsProcessor{ - files: make(map[uint64]*FileInfo), - hsnap: hsnap, - psnap: psnap, - config: config, - buckets: make(map[uint64][]*event.Event), - purger: time.NewTicker(time.Second * 5), - quit: make(chan struct{}, 1), + return &fsProcessor{ + files: make(map[uint64]*FileInfo), + hsnap: hsnap, + psnap: psnap, + config: config, } - - go f.purge() - - return f } func (f *fsProcessor) ProcessEvent(e *event.Event) (*event.Event, bool, error) { - if e.Category == event.File || e.IsStackWalk() { + if e.Category == event.File { evt, err := f.processEvent(e) return evt, false, err } @@ -97,7 +81,7 @@ func (f *fsProcessor) ProcessEvent(e *event.Event) (*event.Event, bool, error) { } func (*fsProcessor) Name() ProcessorType { return Fs } -func (f *fsProcessor) Close() { f.quit <- struct{}{} } +func (f *fsProcessor) Close() {} func (f *fsProcessor) getFileInfo(name string, opts uint32) *FileInfo { return &FileInfo{Name: name, Type: fs.GetFileType(name, opts)} @@ -138,20 +122,6 @@ func (f *fsProcessor) processEvent(e *event.Event) (*event.Event, error) { } return e, f.psnap.AddMmap(e) - case event.StackWalk: - if !event.IsCurrentProcDropped(e.PID) { - f.mu.Lock() - defer f.mu.Unlock() - - // append the event to the bucket indexed by stack id - id := e.StackID() - q, ok := f.buckets[id] - if !ok { - f.buckets[id] = []*event.Event{e} - } else { - f.buckets[id] = append(q, e) - } - } case event.CreateFile: fileObject := e.Params.MustGetUint64(params.FileObject) @@ -170,30 +140,6 @@ func (f *fsProcessor) processEvent(e *event.Event) (*event.Event, error) { e.AppendEnum(params.FileType, uint32(fileinfo.Type), fs.FileTypes) } - // attach stack walk return addresses. CreateFile events - // represent an edge case in callstack enrichment. Since - // the events are delayed until the respective FileOpEnd - // event arrives, we enable stack tracing for CreateFile - // events. When the CreateFile event is generated, we store - // it in pending IRP map. Subsequently, the stack walk event - // is put inside the queue. After FileOpEnd event arrives, - // the previous stack walk for CreateFile is popped from - // the queue and the callstack parameter attached to the - // event. - if f.config.EventSource.StackEnrichment { - f.mu.Lock() - defer f.mu.Unlock() - - id := e.StackID() - q, ok := f.buckets[id] - if ok && len(q) > 0 { - var s *event.Event - s, f.buckets[id] = q[len(q)-1], q[:len(q)-1] - callstack := s.Params.MustGetSlice(params.Callstack) - e.AppendParam(params.Callstack, params.Slice, callstack) - } - } - return e, nil case event.ReleaseFile: fileReleaseCount.Add(1) @@ -267,16 +213,6 @@ func (f *fsProcessor) processEvent(e *event.Event) (*event.Event, error) { return e, nil } -func (f *fsProcessor) dequeueStackwalk(stackID uint64) { - f.mu.Lock() - defer f.mu.Unlock() - - q, ok := f.buckets[stackID] - if ok && len(q) > 0 { - f.buckets[stackID] = q[:len(q)-1] - } -} - func (f *fsProcessor) findFile(fileKey, fileObject uint64) *FileInfo { fileinfo, ok := f.files[fileKey] if ok { @@ -307,31 +243,3 @@ func (f *fsProcessor) getMappedFile(pid uint32, addr uint64) string { defer windows.Close(process) return fs.GetDevMapper().Convert(sys.GetMappedFile(process, uintptr(addr))) } - -func (f *fsProcessor) purge() { - for { - select { - case <-f.purger.C: - f.mu.Lock() - - // evict unmatched stack traces - for id, q := range f.buckets { - s := make([]*event.Event, 0, len(q)) - for _, evt := range q { - if time.Since(evt.Timestamp) <= time.Second*30 { - s = append(s, evt) - } - } - if len(s) == 0 { - delete(f.buckets, id) - } else { - f.buckets[id] = s - } - } - - f.mu.Unlock() - case <-f.quit: - return - } - } -} diff --git a/internal/etw/trace.go b/internal/etw/trace.go index c08634db6..b8b196dcc 100644 --- a/internal/etw/trace.go +++ b/internal/etw/trace.go @@ -64,7 +64,7 @@ func initEventTraceProps(c config.EventSourceConfig) etw.EventTraceProperties { Wnode: etw.WnodeHeader{ BufferSize: uint32(unsafe.Sizeof(etw.EventTraceProperties{})) + maxTracePropsSize, Flags: etw.WnodeTraceFlagGUID, - ClientContext: 1, // QPC clock resolution + ClientContext: 2, // System time: The system time provides a time stamp that tracks changes to the system’s clock }, BufferSize: bufferSize, LogFileMode: mode, diff --git a/pkg/event/event_windows.go b/pkg/event/event_windows.go index 7c1d21bfc..7879aa5b4 100644 --- a/pkg/event/event_windows.go +++ b/pkg/event/event_windows.go @@ -76,6 +76,14 @@ func New(seq uint64, r *etw.EventRecord) *Event { return e } +// RawTimestamp returns the raw event record system timestamp. +func (e *Event) RawTimestamp() uint64 { + nsec := e.Timestamp.UnixNano() + nsec /= 100 + nsec += 116444736000000000 + return uint64(nsec) +} + func (e *Event) adjustPID() { switch e.Category { case Module: @@ -253,14 +261,6 @@ func (e *Event) IsOpenDisposition() bool { return e.IsCreateFile() && e.Params.MustGetUint32(params.FileOperation) == windows.FILE_OPEN } -// StackID returns the integer that is used to stich the callstack present in the StackWalk event. -func (e *Event) StackID() uint64 { - if e.IsCreateProcess() { - return uint64(e.Params.MustGetPpid() + e.Tid) - } - return uint64(e.PID + e.Tid) -} - // StackPID returns the process id as seen the creator // from the callstack execution perspective. For example, // the pid associated with CreateProcess events is the diff --git a/pkg/event/param_decoder_windows.go b/pkg/event/param_decoder_windows.go index 3e8ad801c..1071a8335 100644 --- a/pkg/event/param_decoder_windows.go +++ b/pkg/event/param_decoder_windows.go @@ -30,7 +30,6 @@ import ( "github.com/rabbitstack/fibratus/pkg/util/key" "github.com/rabbitstack/fibratus/pkg/util/signature" "github.com/rabbitstack/fibratus/pkg/util/utf16" - "github.com/rabbitstack/fibratus/pkg/util/va" "golang.org/x/sys/windows" "golang.org/x/sys/windows/registry" ) @@ -169,10 +168,11 @@ func (d *ParamDecoder) DecodeFile(r *etw.EventRecord, e *Event) { e.AppendParam(params.FileShareMask, params.Flags, r.ReadUint32(28), WithFlags(FileShareModeFlags)) e.AppendParam(params.FilePath, params.DOSPath, r.ConsumeUTF16String(32)) - // read create disposition/status from extended data items + // read create disposition, status, and callstack from extended data items disposition, status := r.ReadEventHeaderFileExtendedDataItems() e.AppendParam(params.NTStatus, params.Status, status) e.AppendEnum(params.FileOperation, disposition, fs.FileCreateDispositions) + e.AppendParam(params.Callstack, params.Slice, r.ReadEventHeaderFileExtendedDataItemsCallstack()) case FileOpEndID: // typedef struct _PERFINFO_FILE_OPERATION_END { // ULONG_PTR Irp; @@ -537,22 +537,10 @@ func (d *ParamDecoder) DecodeStackwalk(r *etw.EventRecord, e *Event) { // ULONG ThreadId; // PVOID Addresses[1]; // Address of captured Stack address // } STACK_WALK_EVENT_DATA, *PSTACK_WALK_EVENT_DATA; - - // Skip TimeStamp (uint64) + e.AppendParam(params.CallstackTimestamp, params.Uint64, r.ReadUint64(0)) e.AppendParam(params.ProcessID, params.PID, r.ReadUint32(8)) e.AppendParam(params.ThreadID, params.TID, r.ReadUint32(12)) - - var n uint16 - var offset uint16 = 16 - - frames := (r.BufferLen - offset) / 8 - callstack := make([]va.Address, frames) - for n < frames { - callstack[n] = va.Address(r.ReadUint64(offset)) - offset += 8 - n++ - } - e.AppendParam(params.Callstack, params.Slice, callstack) + e.AppendParam(params.Callstack, params.Slice, r.ReadCallstack(16)) } // DecodeMemory decodes memory event payloads. diff --git a/pkg/event/param_decoder_windows_test.go b/pkg/event/param_decoder_windows_test.go index 09363a3cf..b408fe2e9 100644 --- a/pkg/event/param_decoder_windows_test.go +++ b/pkg/event/param_decoder_windows_test.go @@ -147,7 +147,7 @@ func TestDecodeFile(t *testing.T) { { name: "CreateFile", opcode: CreateFileID, assertions: func(t *testing.T, e *Event) { - assert.Len(t, e.Params, 9) + assert.Len(t, e.Params, 10) assert.Equal(t, uint64(0xffffd78d965e07c8), e.Params.MustGetUint64(params.FileIrpPtr)) assert.Equal(t, uint64(0xffffd78d920b6650), e.Params.MustGetUint64(params.FileObject)) assert.Equal(t, `\Device\HarddiskVolume3\WINDOWS\AppCompat\Programs\Amcache.hve`, e.Params.MustGetString(params.FilePath)) @@ -156,6 +156,7 @@ func TestDecodeFile(t *testing.T) { assert.Equal(t, uint32(6536), e.Params.MustGetTid()) assert.Equal(t, "SUPERSEDE", e.GetParamAsString(params.FileOperation)) assert.Equal(t, "Success", e.GetParamAsString(params.NTStatus)) + assert.Contains(t, e.Params, params.Callstack) }, buf: []byte{ 200, 7, 94, 150, 141, 215, 255, 255, @@ -798,7 +799,8 @@ func TestDecodeStackWalk(t *testing.T) { e := &Event{Params: make(Params)} paramDecoder.DecodeStackwalk(r, e) - assert.Len(t, e.Params, 3) + assert.Len(t, e.Params, 4) + assert.Equal(t, uint64(250273540393), e.Params.MustGetUint64(params.CallstackTimestamp)) assert.Equal(t, uint32(3852), e.Params.MustGetPid()) assert.Equal(t, uint32(7584), e.Params.MustGetTid()) assert.Equal(t, []va.Address{0xfffff805bdfe0c94, 0xfffff805bde589af, 0xfffff805bdf22f86, 0xfffff80550ff6d24, 0xfffff8054f82baaf, 0xfffff8054f82b1a0, 0xfffff8054f8968e0, 0xfffff805bdfa953b, 0xfffff805bdfa94b3, 0xfffff805be49a83b, 0xfffff805be4988da, 0xfffff805be4965e3, 0xfffff805be49cac4, 0xfffff805be2bd955, 0x7ffd09724514, 0x7ffd0619331f, 0x7ff6248a6069, 0x7ff62492256d, 0x7ff6249225e3}, e.Params.MustGetSlice(params.Callstack)) diff --git a/pkg/event/params/params_windows.go b/pkg/event/params/params_windows.go index 19c86c030..c11cfca0b 100644 --- a/pkg/event/params/params_windows.go +++ b/pkg/event/params/params_windows.go @@ -32,6 +32,8 @@ const ( ThreadID = "tid" // Callstack field represents the thread callstack. Callstack = "callstack" + // CallstackTimestamp field identifvies the callstack timestamp parameter. + CallstackTimestamp = "callstack_timestamp" // ProcessParentID field represents the parent process identifier. ProcessParentID = "ppid" // ProcessRealParentID field presents the real parent process identifier. diff --git a/pkg/event/queue.go b/pkg/event/queue.go index ddaa6c93c..a580e253d 100644 --- a/pkg/event/queue.go +++ b/pkg/event/queue.go @@ -130,6 +130,7 @@ func (q *Queue) push(e *Event) error { if q.enqueueAlways { enqueue = true } + for _, listener := range q.listeners { enq, err := listener.ProcessEvent(e) if err != nil { @@ -139,13 +140,11 @@ func (q *Queue) push(e *Event) error { enqueue = true } } - if q.stackEnrichment && e.IsTerminateThread() { - id := uint64(e.Params.MustGetPid() + e.Params.MustGetTid()) - q.decorator.RemoveBucket(id) - } + if enqueue || len(q.listeners) == 0 { q.q <- e eventsEnqueued.Add(1) } + return nil } diff --git a/pkg/event/stackwalk.go b/pkg/event/stackwalk.go index b45f9f974..124fecbeb 100644 --- a/pkg/event/stackwalk.go +++ b/pkg/event/stackwalk.go @@ -29,11 +29,11 @@ import ( ) // maxQueueTTLPeriod specifies the maximum period -// for the events to reside in the queue. -var maxQueueTTLPeriod = time.Second * 10 +// for the events to reside in the bucket queue +var maxQueueTTLPeriod = time.Millisecond * 500 -// flusherInterval specifies the interval for the queue flushing. -var flusherInterval = time.Second * 5 +// flusherInterval specifies the interval for the queue flushing +var flusherInterval = time.Millisecond * 250 // stackwalkFlushes computes overall flushes for unmatched stackwalk events var stackwalkFlushes = expvar.NewInt("stackwalk.flushes") @@ -44,19 +44,23 @@ var stackwalkFlushesProcs = expvar.NewMap("stackwalk.flushes.procs") // stackwalkFlushesEvents computes overall flushes for unmatched stackwalks per event type var stackwalkFlushesEvents = expvar.NewMap("stackwalk.flushes.events") -// stackwalkEnqueued counts the number of enqueued events in individual buckets -var stackwalkEnqueued = expvar.NewInt("stackwalk.enqueued") +// stackwalkUnordered counts the unordered stackwalk matches +var stackwalkUnorderedMatches = expvar.NewInt("stackwalk.unordered.matches") -// stackwalkBuckets counts the number of overall stackwalk buckets per stack id -var stackwalkBuckets = expvar.NewInt("stackwalk.buckets") +// stackwalkUnorderedEnqueued counts enqueued unordered stackwalk matches +var stackwalkUnorderedEnqueued = expvar.NewInt("stackwalk.unordered.enqueued") -// StackwalkDecorator maintains a FIFO queue where events -// eligible for stack enrichment are queued. Upon arrival +// stackwalkSurrogateProcessHits counts the number of hits where the callstack is attributed from +// the surrogate thread creation event +var stackwalkSurrogateProcessHits = expvar.NewInt("stackwalk.surrogate.process.hits") + +// StackwalkDecorator maintains a per-timestamp buckets where +// events eligible for stack enrichment are queued. Upon arrival // of the respective stack walk event, the acting event is -// popped from the queue and enriched with return addresses +// popped from the bucket and enriched with return addresses // which are later subject to symbolization. type StackwalkDecorator struct { - buckets map[uint64][]*Event + buckets map[uint64]*Event q *Queue mux sync.Mutex @@ -72,7 +76,7 @@ type StackwalkDecorator struct { func NewStackwalkDecorator(q *Queue) *StackwalkDecorator { s := &StackwalkDecorator{ q: q, - buckets: make(map[uint64][]*Event), + buckets: make(map[uint64]*Event), procs: make(map[uint32]*Event), flusher: time.NewTicker(flusherInterval), quit: make(chan struct{}, 1), @@ -95,17 +99,19 @@ func (s *StackwalkDecorator) Push(e *Event) { s.procs[e.Params.MustGetPid()] = e } - // append the event to the bucket indexed by stack id - id := e.StackID() - q, ok := s.buckets[id] - if !ok { - s.buckets[id] = []*Event{e} + id := e.RawTimestamp() + + // check if we have the previous stack walk event + if evt, ok := s.buckets[id]; ok && evt.IsStackWalk() { + stackwalkUnorderedMatches.Add(1) + callstack := evt.Params.MustGetSlice(params.Callstack) + e.AppendParam(params.Callstack, params.Slice, callstack) + delete(s.buckets, id) + s.q.push(e) } else { - s.buckets[id] = append(q, e) + // store the event to the bucket indexed by stack id + s.buckets[id] = e } - - stackwalkBuckets.Set(int64(len(s.buckets))) - stackwalkEnqueued.Add(int64(len(s.buckets[id]))) } // Pop receives the stack walk event and pops the oldest @@ -116,21 +122,17 @@ func (s *StackwalkDecorator) Pop(e *Event) *Event { s.mux.Lock() defer s.mux.Unlock() - id := e.StackID() - q, ok := s.buckets[id] + id := e.Params.MustGetUint64(params.CallstackTimestamp) + evt, ok := s.buckets[id] if !ok { + // enqueue if the stack walk arrived out of order + if e.Params.MustGetPid() != currentPid { + s.buckets[id] = e + stackwalkUnorderedEnqueued.Add(1) + } return e } - - var evt *Event - if len(q) > 0 { - evt, s.buckets[id] = q[0], q[1:] - stackwalkEnqueued.Add(-int64(len(s.buckets[id]))) - } - - if evt == nil { - return e - } + delete(s.buckets, id) if evt.IsSurrogateProcess() && s.procs[evt.Params.MustGetPid()] != nil { delete(s.procs, evt.Params.MustGetPid()) @@ -152,17 +154,8 @@ func (s *StackwalkDecorator) Pop(e *Event) *Event { ev.AppendParam(params.Callstack, params.Slice, callstack) _ = s.q.push(ev) delete(s.procs, pid) - // find the most recent CreateProcess event and - // remove it from buckets as we have the callstack - qu := s.buckets[ev.StackID()] - for i := len(qu) - 1; i >= 0; i-- { - proc := qu[i] - if !proc.IsCreateProcess() && proc.Params.MustGetPid() != pid { - continue - } - qu = append(qu[:i], qu[i+1:]...) - } - s.buckets[ev.StackID()] = qu + delete(s.buckets, ev.RawTimestamp()) + stackwalkSurrogateProcessHits.Add(1) } } @@ -174,14 +167,6 @@ func (s *StackwalkDecorator) Stop() { s.quit <- struct{}{} } -// RemoveBucket removes the bucket and all enqueued events. -func (s *StackwalkDecorator) RemoveBucket(id uint64) { - s.mux.Lock() - defer s.mux.Unlock() - delete(s.buckets, id) - stackwalkBuckets.Set(int64(len(s.buckets))) -} - func (s *StackwalkDecorator) doFlush() { for { select { @@ -209,20 +194,12 @@ func (s *StackwalkDecorator) flush() []error { expired := make([]*Event, 0) - for id, q := range s.buckets { - n := make([]*Event, 0, len(q)) - for _, evt := range q { - if time.Since(evt.Timestamp) < maxQueueTTLPeriod { - n = append(n, evt) - continue - } - expired = append(expired, evt) - } - if len(n) == 0 { - delete(s.buckets, id) - } else { - s.buckets[id] = n + for id, evt := range s.buckets { + if time.Since(evt.Timestamp) < maxQueueTTLPeriod { + continue } + expired = append(expired, evt) + delete(s.buckets, id) } s.mux.Unlock() @@ -232,14 +209,15 @@ func (s *StackwalkDecorator) flush() []error { // This allows incoming Push()/Pop() calls to proceed // while the channel send may block briefly for _, evt := range expired { + if evt.IsStackWalk() { + continue + } stackwalkFlushes.Add(1) + err := s.q.push(evt) if err != nil { errs = append(errs, err) } - if stackwalkEnqueued.Value() > 0 { - stackwalkEnqueued.Add(-1) - } if evt.PS != nil { stackwalkFlushesProcs.Add(evt.PS.Name, 1) } diff --git a/pkg/event/stackwalk_test.go b/pkg/event/stackwalk_test.go index 165581a9f..56a4e3bad 100644 --- a/pkg/event/stackwalk_test.go +++ b/pkg/event/stackwalk_test.go @@ -23,180 +23,240 @@ import ( "time" "github.com/rabbitstack/fibratus/pkg/event/params" - "github.com/rabbitstack/fibratus/pkg/fs" - pstypes "github.com/rabbitstack/fibratus/pkg/ps/types" - "github.com/rabbitstack/fibratus/pkg/util/va" + "github.com/rabbitstack/fibratus/pkg/util/filetime" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ) -func TestStackwalkDecorator(t *testing.T) { - q := NewQueue(50, false, true) - cd := NewStackwalkDecorator(q) - +func makeEvent(pid, tid uint32, cpu uint8, ts uint64, typ Type, pars ...Param) *Event { e := &Event{ - Type: CreateFile, - Tid: 2484, - PID: 859, - CPU: 1, - Seq: 2, - Name: "CreateFile", - Timestamp: time.Now(), - Category: File, - Params: Params{ - params.FileObject: {Name: params.FileObject, Type: params.Uint64, Value: uint64(12456738026482168384)}, - params.FilePath: {Name: params.FilePath, Type: params.UnicodeString, Value: "C:\\Windows\\system32\\user32.dll"}, - params.FileType: {Name: params.FileType, Type: params.AnsiString, Value: "file"}, - params.FileOperation: {Name: params.FileOperation, Type: params.Enum, Value: uint32(1), Enum: fs.FileCreateDispositions}, - }, + PID: pid, + Tid: tid, + CPU: cpu, + Type: typ, + Timestamp: filetime.ToEpoch(ts), + Params: Params{}, } - e1 := &Event{ - Type: CreateFile, - Tid: 2484, - PID: 859, - CPU: 1, - Seq: 3, - Name: "CreateFile", - Timestamp: time.Now(), - Category: File, - Params: Params{ - params.FileObject: {Name: params.FileObject, Type: params.Uint64, Value: uint64(12456738026482168384)}, - params.FilePath: {Name: params.FilePath, Type: params.UnicodeString, Value: "C:\\Windows\\system32\\kernel32.dll"}, - params.FileType: {Name: params.FileType, Type: params.AnsiString, Value: "file"}, - params.FileOperation: {Name: params.FileOperation, Type: params.Enum, Value: uint32(1), Enum: fs.FileCreateDispositions}, - }, + for _, par := range pars { + e.Params.Append(par.Name, par.Type, par.Value) } - cd.Push(e) - cd.Push(e1) - - assert.Len(t, cd.buckets[e.StackID()], 2) + return e +} - sw := &Event{ +func makeStackWalk(pid, tid uint32, cpu uint8, triggerTS uint64, addrs []uintptr) *Event { + e := &Event{ + PID: pid, + Tid: tid, + CPU: cpu, Type: StackWalk, - Tid: 2484, - PID: 859, - CPU: 1, - Seq: 4, - Name: "StackWalk", - Timestamp: time.Now(), - Params: Params{ - params.Callstack: {Name: params.Callstack, Type: params.Slice, Value: []va.Address{0x7ffb5eb70dc4, 0x7ffb5c191deb, 0x7ffb3138592e}}, - }, + Timestamp: filetime.ToEpoch(triggerTS + 50), // slight offset to prove we don't use StackWalk timestamp + Params: Params{}, } + e.Params.Append(params.CallstackTimestamp, params.Uint64, triggerTS) + e.Params.Append(params.ProcessID, params.PID, pid) + e.Params.Append(params.ThreadID, params.TID, tid) + e.Params.Append(params.Callstack, params.Slice, addrs) + return e +} - evt := cd.Pop(sw) - assert.Len(t, cd.buckets[e.StackID()], 1) - assert.Equal(t, CreateFile, evt.Type) - assert.True(t, evt.Params.Contains(params.Callstack)) - assert.Equal(t, "C:\\Windows\\system32\\user32.dll", evt.GetParamAsString(params.FilePath)) +func newTestDecorator() (*StackwalkDecorator, *Queue) { + q := NewQueue(100, true, true) + d := NewStackwalkDecorator(q) + return d, q } -func TestStackwalkDecoratorSurrogateProcess(t *testing.T) { - q := NewQueue(50, false, true) - cd := NewStackwalkDecorator(q) +func TestPushThenPop(t *testing.T) { + d, _ := newTestDecorator() + defer d.Stop() - e := &Event{ - Type: CreateProcess, - Tid: 2484, - PID: 859, - CPU: 1, - Seq: 2, - Name: "CreateProcess", - Timestamp: time.Now(), - Category: Process, - Params: Params{ - params.ProcessID: {Name: params.ProcessID, Type: params.PID, Value: uint32(859)}, - params.ProcessParentID: {Name: params.ProcessParentID, Type: params.PID, Value: uint32(4523)}, - params.ProcessRealParentID: {Name: params.ProcessRealParentID, Type: params.PID, Value: uint32(8846)}, - }, - } + const ts = uint64(133_000_000_000_000_000) + addrs := []uintptr{0xDEAD, 0xBEEF} + + e := makeEvent(100, 200, 3, ts, LoadModule) + d.Push(e) + + sw := makeStackWalk(100, 200, 3, ts, addrs) + got := d.Pop(sw) + + require.NotNil(t, got) + assert.Equal(t, e, got) + callstack, err := got.Params.GetSlice(params.Callstack) + require.NoError(t, err) + assert.Equal(t, addrs, callstack) +} + +func TestPopThenPush(t *testing.T) { + d, q := newTestDecorator() + defer d.Stop() - e1 := &Event{ - Type: CreateThread, - Tid: 2484, - PID: 1411, - CPU: 1, - Seq: 3, - Name: "CreateThread", - Timestamp: time.Now(), - Category: Thread, - PS: &pstypes.PS{ - Name: "svchost.exe", - Exe: `C:\WINDOWS\system32\svchost.exe`, - Cmdline: `C:\WINDOWS\system32\svchost.exe -k netsvcs -p -s seclogon`, - }, - Params: Params{ - params.ProcessID: {Name: params.ProcessID, Type: params.PID, Value: uint32(859)}, - }, + const ts = uint64(133_000_000_000_000_001) + addrs := []uintptr{0xCAFE, 0xBABE} + + sw := makeStackWalk(100, 200, 3, ts, addrs) + got := d.Pop(sw) + assert.Equal(t, sw, got, "Pop should return the stackwalk itself when trigger not yet seen") + + e := makeEvent(100, 200, 3, ts, LoadModule) + d.Push(e) + + // event should have been pushed directly to the queue with callstack attached + queued := <-q.Events() + require.NotNil(t, queued) + callstack, err := queued.Params.GetSlice(params.Callstack) + require.NoError(t, err) + assert.Equal(t, addrs, callstack) +} + +func TestSurrogateProcess(t *testing.T) { + d, _ := newTestDecorator() + defer d.Stop() + + const ts = uint64(133_000_000_000_000_010) + addrs := []uintptr{0xFACE} + + // surrogate CreateProcess event + pars := []Param{ + {Name: params.ProcessID, Type: params.PID, Value: uint32(500)}, + {Name: params.ProcessParentID, Type: params.PID, Value: uint32(1999)}, + {Name: params.ProcessRealParentID, Type: params.PID, Value: uint32(2929)}, } + e := makeEvent(500, 600, 1, ts, CreateProcess, pars...) + d.Push(e) - cd.Push(e) + sw := makeStackWalk(500, 600, 1, ts, addrs) + got := d.Pop(sw) + require.NotNil(t, got) - assert.Len(t, cd.buckets[e.StackID()], 1) - assert.Len(t, cd.buckets[e1.StackID()], 0) - assert.Len(t, cd.procs, 1) + // surrogate entry should be cleaned up after match + d.mux.Lock() + _, stillPresent := d.procs[500] + d.mux.Unlock() + assert.False(t, stillPresent, "surrogate proc entry should be deleted after pop") +} - cd.Push(e1) - assert.Len(t, cd.buckets[e1.StackID()], 1) +func TestCreateRemoteThreadForwardToSurrogate(t *testing.T) { + d, q := newTestDecorator() + defer d.Stop() - sw := &Event{ - Type: StackWalk, - Tid: 2484, - PID: 1411, - CPU: 1, - Seq: 4, - Name: "StackWalk", - Timestamp: time.Now(), - Params: Params{ - params.Callstack: {Name: params.Callstack, Type: params.Slice, Value: []va.Address{0x7ffb5eb70dc4, 0x7ffb5c191deb, 0x7ffb3138592e}}, - }, + const tsProc = uint64(133_000_000_000_000_020) + const tsThread = uint64(133_000_000_000_000_021) + addrs := []uintptr{0x1234, 0x5678} + + // park the surrogate CreateProcess event + pars := []Param{ + {Name: params.ProcessID, Type: params.PID, Value: uint32(1700)}, + {Name: params.ProcessParentID, Type: params.PID, Value: uint32(1999)}, + {Name: params.ProcessRealParentID, Type: params.PID, Value: uint32(2929)}, } + procEvt := makeEvent(1700, 800, 0, tsProc, CreateProcess, pars...) + d.Push(procEvt) + + // park the CreateRemoteThread event targeting the surrogate pid + threadEvt := makeEvent(700, 801, 0, tsThread, CreateThread, Param{Name: params.ProcessID, Type: params.PID, Value: uint32(1700)}) + d.Push(threadEvt) - thread := cd.Pop(sw) - proc := <-q.Events() - assert.Equal(t, CreateProcess, proc.Type) - assert.Equal(t, CreateThread, thread.Type) - assert.Len(t, cd.buckets[e.StackID()], 0) - assert.Len(t, cd.buckets[e1.StackID()], 0) - assert.True(t, proc.Params.Contains(params.Callstack)) - assert.True(t, thread.Params.Contains(params.Callstack)) + sw := makeStackWalk(700, 801, 0, tsThread, addrs) + d.Pop(sw) + + // the surrogate proc event should have been pushed to the queue with the callstack + queued := <-q.Events() + require.True(t, queued.IsCreateProcess()) + cs, err := queued.Params.GetSlice(params.Callstack) + require.NoError(t, err) + assert.Equal(t, addrs, cs) } -func init() { - maxQueueTTLPeriod = time.Second * 2 - flusherInterval = time.Second +func TestFlushExpiredEvents(t *testing.T) { + // shrink TTL and flusher interval for the test + origTTL := maxQueueTTLPeriod + origInterval := flusherInterval + maxQueueTTLPeriod = 100 * time.Millisecond + flusherInterval = 50 * time.Millisecond + defer func() { + maxQueueTTLPeriod = origTTL + flusherInterval = origInterval + }() + + d, q := newTestDecorator() + defer d.Stop() + + const ts = uint64(133_000_000_000_000_030) + e := makeEvent(900, 901, 0, ts, LoadModule) + d.Push(e) + + // wait for TTL + flusher to run + time.Sleep(300 * time.Millisecond) + + queued := q.Events() + require.Len(t, queued, 1, "expired event should be flushed to queue without callstack") + assert.Equal(t, e, <-queued) } -func TestStackwalkDecoratorFlush(t *testing.T) { - q := NewQueue(50, false, true) - q.RegisterListener(&DummyListener{}) - cd := NewStackwalkDecorator(q) - defer cd.Stop() +func TestFlushDoesNotExpireRecentEvents(t *testing.T) { + origTTL := maxQueueTTLPeriod + origInterval := flusherInterval + maxQueueTTLPeriod = 500 * time.Millisecond + flusherInterval = 100 * time.Millisecond + defer func() { + maxQueueTTLPeriod = origTTL + flusherInterval = origInterval + }() - e := &Event{ - Type: CreateFile, - Tid: 2484, - PID: 859, - CPU: 1, - Seq: 2, - Name: "CreateFile", - Timestamp: time.Now(), - Category: File, - Params: Params{ - params.FileObject: {Name: params.FileObject, Type: params.Uint64, Value: uint64(12456738026482168384)}, - params.FilePath: {Name: params.FilePath, Type: params.UnicodeString, Value: "C:\\Windows\\system32\\user32.dll"}, - params.FileType: {Name: params.FileType, Type: params.AnsiString, Value: "file"}, - params.FileOperation: {Name: params.FileOperation, Type: params.Enum, Value: uint32(1), Enum: fs.FileCreateDispositions}, - }, - } + d, q := newTestDecorator() + defer d.Stop() + + const ts = uint64(133_000_000_000_000_040) + e := makeEvent(902, 903, 0, ts, LoadModule) + d.Push(e) + + // wait less than TTL + time.Sleep(80 * time.Millisecond) + + assert.Empty(t, q.Events()) +} + +func TestPopSkipsSelfStackWalk(t *testing.T) { + d, _ := newTestDecorator() + defer d.Stop() + + const ts = uint64(133_000_000_000_000_050) + addrs := []uintptr{0xDEAD} + + sw := makeStackWalk(uint32(currentPid), 100, 0, ts, addrs) + d.Pop(sw) + + d.mux.Lock() + _, parked := d.buckets[ts] + d.mux.Unlock() + + assert.False(t, parked, "self-pid stackwalk should not be parked in buckets") +} + +func TestNoCallstackCrossContamination(t *testing.T) { + d, _ := newTestDecorator() + defer d.Stop() + + const tsA = uint64(133_000_000_000_000_060) + const tsB = uint64(133_000_000_000_000_061) + addrsA := []uintptr{0xAAAA} + addrsB := []uintptr{0xBBBB} + + eA := makeEvent(100, 200, 0, tsA, RegCreateKey) + eB := makeEvent(100, 200, 0, tsB, RegCreateKey) + d.Push(eA) + d.Push(eB) + + swB := makeStackWalk(100, 200, 0, tsB, addrsB) + swA := makeStackWalk(100, 200, 0, tsA, addrsA) + + gotB := d.Pop(swB) + gotA := d.Pop(swA) - cd.Push(e) - assert.Len(t, cd.buckets[e.StackID()], 1) - time.Sleep(time.Millisecond * 3100) + csA, _ := gotA.Params.GetSlice(params.Callstack) + csB, _ := gotB.Params.GetSlice(params.Callstack) - evt := <-q.Events() - assert.Len(t, cd.buckets[e.StackID()], 0) - assert.Equal(t, CreateFile, evt.Type) - assert.False(t, evt.Params.Contains(params.Callstack)) + assert.Equal(t, addrsA, csA, "event A must not receive event B's callstack") + assert.Equal(t, addrsB, csB, "event B must not receive event A's callstack") } diff --git a/pkg/sys/etw/types.go b/pkg/sys/etw/types.go index 738b1ba5b..2a217c920 100644 --- a/pkg/sys/etw/types.go +++ b/pkg/sys/etw/types.go @@ -575,21 +575,25 @@ const ( ExtTypeDisposition = 0x8000 // ExtTypeStatus represents a custom extended item type for the file system status. ExtTypeStatus = 0x8001 + // ExtTypeCallstack represents a customer extended item type for the file system event callstack. + ExtTypeCallstack = 0x8002 ) // FileExtendedDataItems stores file extended data items. type FileExtendedDataItems struct { status uint32 disposition uint32 + callstack []va.Address items []EventHeaderExtendedDataItem } // AppendEventHeaderFileExtendedDataItems appends custom file extendeed data items to the event record. -func AppendEventHeaderFileExtendedDataItems(r *EventRecord, disposition uint64, status uint32) *FileExtendedDataItems { +func AppendEventHeaderFileExtendedDataItems(r *EventRecord, disposition uint64, status uint32, callstack []va.Address) *FileExtendedDataItems { f := &FileExtendedDataItems{ disposition: uint32(disposition), status: status, - items: make([]EventHeaderExtendedDataItem, 2), + callstack: callstack, + items: make([]EventHeaderExtendedDataItem, 3), } f.items[0] = EventHeaderExtendedDataItem{ @@ -602,6 +606,15 @@ func AppendEventHeaderFileExtendedDataItems(r *EventRecord, disposition uint64, DataSize: 4, DataPtr: uint64(uintptr(unsafe.Pointer(&f.status))), } + if len(f.callstack) > 0 { + f.items[2] = EventHeaderExtendedDataItem{ + ExtType: ExtTypeCallstack, + DataSize: uint16(len(f.callstack) * 8), + DataPtr: uint64( + uintptr(unsafe.Pointer(&f.callstack[0])), + ), + } + } r.ExtendedDataCount = uint16(len(f.items)) r.ExtendedData = &f.items[0] @@ -636,6 +649,26 @@ func (r *EventRecord) ReadEventHeaderFileExtendedDataItems() (uint32, uint32) { return disposition, status } +// ReadEventHeaderFileExtendedDataItems reads the callstack from the custom file extended data items. +func (r *EventRecord) ReadEventHeaderFileExtendedDataItemsCallstack() []va.Address { + if r.ExtendedData == nil { + return nil + } + + items := unsafe.Slice(r.ExtendedData, r.ExtendedDataCount) + for _, item := range items { + if item.ExtType != ExtTypeCallstack { + continue + } + + n := int(item.DataSize) / 8 + + return unsafe.Slice((*va.Address)(unsafe.Pointer(uintptr(item.DataPtr))), n) + } + + return nil +} + // NewClassicEventID creates a new instance of classic event identifier. func NewClassicEventID(guid windows.GUID, typ uint16) ClassicEventID { return ClassicEventID{GUID: guid, Type: uint8(typ)} @@ -850,6 +883,22 @@ func (e *EventRecord) ReadSID(offset uint16, isWbemSid bool) ([]byte, uint16) { return b, end } +// ReadCallstack reads callstack return addresses at the given offset. +func (e *EventRecord) ReadCallstack(offset uint16) []va.Address { + var n uint16 + m := offset + + frames := (e.BufferLen - offset) / 8 + callstack := make([]va.Address, frames) + for n < frames { + callstack[n] = va.Address(e.ReadUint64(m)) + m += 8 + n++ + } + + return callstack +} + // EventExtendedItemStackTrace64 defines a call stack on a 64-bit machine. type EventExtendedItemStackTrace64 struct { // MatchID represents the unique identifier that you use to match