diff --git a/news/899.feature.rst b/news/899.feature.rst new file mode 100644 index 0000000000..493702beb0 --- /dev/null +++ b/news/899.feature.rst @@ -0,0 +1,4 @@ +Add ``allocation_timestamps`` support to ``Tracker`` and a ``speedscope`` +output format for ``memray transform``. Speedscope exports now fall back to +temporal allocation records to preserve chronological ordering when a capture +does not include per-allocation timestamps. diff --git a/src/memray/_ipython/flamegraph.py b/src/memray/_ipython/flamegraph.py index 31f0aee22f..2aabef9f55 100644 --- a/src/memray/_ipython/flamegraph.py +++ b/src/memray/_ipython/flamegraph.py @@ -20,6 +20,11 @@ from memray.commands.common import warn_if_not_enough_symbols from memray.reporters.flamegraph import FlameGraphReporter +_typed_cell_magic = cast( + Callable[[Callable[..., Any]], Callable[..., Any]], + cell_magic, +) + TEMPLATE = """\ from memray import Tracker, FileFormat with Tracker( @@ -133,7 +138,7 @@ def argument_parser() -> argparse.ArgumentParser: @magics_class class FlamegraphMagics(Magics): - @cell_magic # type: ignore + @_typed_cell_magic def memray_flamegraph(self, line: str, cell: str) -> None: """Memory profile the code in the cell and display a flame graph.""" if self.shell is None: diff --git a/src/memray/_memray.pyi b/src/memray/_memray.pyi index e25e04c7e2..957e626bf4 100644 --- a/src/memray/_memray.pyi +++ b/src/memray/_memray.pyi @@ -48,6 +48,8 @@ class AllocationRecord: @property def native_segment_generation(self) -> int: ... @property + def timestamp_us(self) -> int: ... + @property def thread_name(self) -> str: ... def hybrid_stack_trace( self, @@ -245,6 +247,7 @@ class Tracker: memory_interval_ms: int = ..., follow_fork: bool = ..., trace_python_allocators: bool = ..., + allocation_timestamps: bool = ..., file_format: FileFormat = ..., reference_tracking: bool = ..., track_object_lifetimes: bool = ..., @@ -258,6 +261,7 @@ class Tracker: memory_interval_ms: int = ..., follow_fork: bool = ..., trace_python_allocators: bool = ..., + allocation_timestamps: bool = ..., file_format: FileFormat = ..., reference_tracking: bool = ..., ) -> None: ... @@ -335,6 +339,7 @@ class RecordWriterTestHarness: file_path: str, native_traces: bool = False, trace_python_allocators: bool = False, + allocation_timestamps: bool = False, file_format: FileFormat = FileFormat.ALL_ALLOCATIONS, main_tid: int = 1, skipped_frames: int = 0, @@ -358,6 +363,7 @@ class RecordWriterTestHarness: size: int, allocator: int, native_frame_id: int = 0, + timestamp_us: int = 0, ) -> bool: ... def write_frame_push( self, diff --git a/src/memray/_memray.pyx b/src/memray/_memray.pyx index eee0661463..d7e43cad96 100644 --- a/src/memray/_memray.pyx +++ b/src/memray/_memray.pyx @@ -337,6 +337,10 @@ cdef class AllocationRecord: def native_segment_generation(self): return self._tuple[7] + @property + def timestamp_us(self): + return self._tuple[8] + @property def thread_name(self): if self.tid == -1: @@ -385,7 +389,7 @@ cdef class AllocationRecord: def __repr__(self): return (f"AllocationRecord") + f"allocations={self.n_allocations}, timestamp_us={self.timestamp_us}>") @cython.freelist(1024) @@ -732,6 +736,8 @@ cdef class Tracker: created during the tracking session and still alive at the end (or in other words, what objects are leaked by the code being tracked). Defaults to False. + allocation_timestamps (bool): Whether or not to record a timestamp for + every allocation and deallocation event. Defaults to False. follow_fork (bool): Whether or not to continue tracking in a subprocess that is forked from the tracked process (see :ref:`Tracking across Forks`). Defaults to False. @@ -752,6 +758,7 @@ cdef class Tracker: cdef unsigned int _memory_interval_ms cdef bool _follow_fork cdef bool _trace_python_allocators + cdef bool _allocation_timestamps cdef object _previous_profile_func cdef object _previous_thread_profile_func cdef unique_ptr[RecordWriter] _writer @@ -780,6 +787,7 @@ cdef class Tracker: def __cinit__(self, object file_name=None, *, object destination=None, bool native_traces=False, unsigned int memory_interval_ms = 10, bool follow_fork=False, bool trace_python_allocators=False, + bool allocation_timestamps=False, bool track_object_lifetimes=False, FileFormat file_format=FileFormat.ALL_ALLOCATIONS): if (file_name, destination).count(None) != 1: @@ -798,6 +806,7 @@ cdef class Tracker: self._memory_interval_ms = memory_interval_ms self._follow_fork = follow_fork self._trace_python_allocators = trace_python_allocators + self._allocation_timestamps = allocation_timestamps if file_name is not None: destination = FileDestination(path=file_name) @@ -809,6 +818,11 @@ cdef class Tracker: if file_format != FileFormat.ALL_ALLOCATIONS: raise RuntimeError("AGGREGATED_ALLOCATIONS requires an output file") + if allocation_timestamps and file_format != FileFormat.ALL_ALLOCATIONS: + raise RuntimeError( + "allocation_timestamps requires FileFormat.ALL_ALLOCATIONS" + ) + self._writer = move( createRecordWriter( move(self._make_writer(destination)), @@ -817,6 +831,7 @@ cdef class Tracker: file_format, trace_python_allocators, track_object_lifetimes, + allocation_timestamps, ) ) @@ -861,6 +876,7 @@ cdef class Tracker: self._follow_fork, self._trace_python_allocators, self._track_object_lifetimes, + self._allocation_timestamps, ) return self @@ -956,6 +972,7 @@ cdef _create_metadata(header, peak_memory): has_native_traces=header["native_traces"], trace_python_allocators=header["trace_python_allocators"], file_format=FileFormat(header["file_format"]), + has_allocation_timestamps=header["has_allocation_timestamps"], ) @@ -1806,6 +1823,7 @@ cdef class RecordWriterTestHarness: str file_path, bool native_traces=False, bool trace_python_allocators=False, + bool allocation_timestamps=False, track_object_lifetimes=False, records.FileFormat file_format=records.FileFormat.ALL_ALLOCATIONS, records.thread_id_t main_tid=1, @@ -1828,6 +1846,7 @@ cdef class RecordWriterTestHarness: file_format, trace_python_allocators, track_object_lifetimes, + allocation_timestamps, ) self._writer.get().setMainTidAndSkippedFrames(main_tid, skipped_frames) self.write_header(False) @@ -1892,13 +1911,15 @@ cdef class RecordWriterTestHarness: def write_allocation_record(self, records.thread_id_t tid, uintptr_t address, size_t size, unsigned char allocator, - size_t native_frame_id=0) -> bool: + size_t native_frame_id=0, + uint64_t timestamp_us=0) -> bool: """Write a native allocation record to the file.""" cdef records.AllocationRecord record record.address = address record.size = size record.allocator = allocator record.native_frame_id = native_frame_id + record.timestamp_us = timestamp_us return self._writer.get().writeThreadSpecificRecord(tid, record) def write_frame_push( diff --git a/src/memray/_memray/record_reader.cpp b/src/memray/_memray/record_reader.cpp index 936255b858..64eecd4d15 100644 --- a/src/memray/_memray/record_reader.cpp +++ b/src/memray/_memray/record_reader.cpp @@ -98,7 +98,10 @@ RecordReader::readHeader(HeaderRecord& header) sizeof(header.trace_python_allocators)) || !d_input->read( reinterpret_cast(&header.track_object_lifetimes), - sizeof(header.track_object_lifetimes))) + sizeof(header.track_object_lifetimes)) + || !d_input->read( + reinterpret_cast(&header.has_allocation_timestamps), + sizeof(header.has_allocation_timestamps))) { throw std::ios_base::failure("Failed to read input file header."); } @@ -318,6 +321,17 @@ RecordReader::parseAllocationRecord(AllocationRecord* record, unsigned int flags return false; } + if (d_header.has_allocation_timestamps) { + uint64_t delta_us = 0; + if (!readVarint(&delta_us)) { + return false; + } + d_last.allocation_timestamp_us += delta_us; + record->timestamp_us = d_last.allocation_timestamp_us; + } else { + record->timestamp_us = 0; + } + return true; } @@ -342,6 +356,7 @@ RecordReader::processAllocationRecord(const AllocationRecord& record) d_latest_allocation.native_segment_generation = 0; } d_latest_allocation.n_allocations = 1; + d_latest_allocation.timestamp_us = record.timestamp_us; return true; } @@ -1151,7 +1166,7 @@ RecordReader::dumpAllRecords() " n_allocations=%zd n_frames=%zd start_time=%lld end_time=%lld" " pid=%d main_tid=%lu skipped_frames_on_main_tid=%zd" " command_line=%s python_allocator=%s trace_python_allocators=%s" - " track_object_lifetimes=%s\n", + " track_object_lifetimes=%s has_allocation_timestamps=%s\n", (int)sizeof(d_header.magic), d_header.magic, d_header.version, @@ -1168,7 +1183,8 @@ RecordReader::dumpAllRecords() d_header.command_line.c_str(), python_allocator.c_str(), d_header.trace_python_allocators ? "true" : "false", - d_header.track_object_lifetimes ? "true" : "false"); + d_header.track_object_lifetimes ? "true" : "false", + d_header.has_allocation_timestamps ? "true" : "false"); switch (d_header.file_format) { case FileFormat::ALL_ALLOCATIONS: @@ -1222,11 +1238,21 @@ RecordReader::dumpAllRecordsFromAllAllocationsFile() ""; allocator = unknownAllocator.c_str(); } - printf("address=%p size=%zd allocator=%s native_frame_id=%zd\n", - (void*)record.address, - record.size, - allocator, - record.native_frame_id); + if (d_header.has_allocation_timestamps) { + printf("address=%p size=%zd allocator=%s native_frame_id=%zd timestamp_us=%" PRIu64 + "\n", + (void*)record.address, + record.size, + allocator, + record.native_frame_id, + record.timestamp_us); + } else { + printf("address=%p size=%zd allocator=%s native_frame_id=%zd\n", + (void*)record.address, + record.size, + allocator, + record.native_frame_id); + } } break; case RecordType::FRAME_PUSH: { printf("FRAME_PUSH "); diff --git a/src/memray/_memray/record_writer.cpp b/src/memray/_memray/record_writer.cpp index 390552e779..8aeade9437 100644 --- a/src/memray/_memray/record_writer.cpp +++ b/src/memray/_memray/record_writer.cpp @@ -65,7 +65,8 @@ class StreamingRecordWriter : public RecordWriter const std::string& command_line, bool native_traces, bool trace_python_allocators, - bool track_object_lifetimes); + bool track_object_lifetimes, + bool has_allocation_timestamps); StreamingRecordWriter(StreamingRecordWriter& other) = delete; StreamingRecordWriter(StreamingRecordWriter&& other) = delete; @@ -128,7 +129,8 @@ class AggregatingRecordWriter : public RecordWriter const std::string& command_line, bool native_traces, bool trace_python_allocators, - bool track_object_lifetimes); + bool track_object_lifetimes, + bool has_allocation_timestamps); AggregatingRecordWriter(StreamingRecordWriter& other) = delete; AggregatingRecordWriter(StreamingRecordWriter&& other) = delete; @@ -181,8 +183,10 @@ createRecordWriter( bool native_traces, FileFormat file_format, bool trace_python_allocators, - bool track_object_lifetimes) + bool track_object_lifetimes, + bool has_allocation_timestamps) { + has_allocation_timestamps = file_format == FileFormat::ALL_ALLOCATIONS && has_allocation_timestamps; switch (file_format) { case FileFormat::ALL_ALLOCATIONS: return std::make_unique( @@ -190,14 +194,16 @@ createRecordWriter( command_line, native_traces, trace_python_allocators, - track_object_lifetimes); + track_object_lifetimes, + has_allocation_timestamps); case FileFormat::AGGREGATED_ALLOCATIONS: return std::make_unique( std::move(sink), command_line, native_traces, trace_python_allocators, - track_object_lifetimes); + track_object_lifetimes, + has_allocation_timestamps); default: throw std::runtime_error("Invalid file format enumerator"); } @@ -208,7 +214,8 @@ StreamingRecordWriter::StreamingRecordWriter( const std::string& command_line, bool native_traces, bool trace_python_allocators, - bool track_object_lifetimes) + bool track_object_lifetimes, + bool has_allocation_timestamps) : RecordWriter(std::move(sink)) , d_stats({0, 0, duration_cast(system_clock::now().time_since_epoch()).count()}) { @@ -225,7 +232,8 @@ StreamingRecordWriter::StreamingRecordWriter( 0, getPythonAllocator(), trace_python_allocators, - track_object_lifetimes}; + track_object_lifetimes, + has_allocation_timestamps}; strncpy(d_header.magic, MAGIC, sizeof(d_header.magic)); } @@ -398,6 +406,8 @@ StreamingRecordWriter::writeThreadSpecificRecord(thread_id_t tid, const Allocati // [native_id] - Delta-encoded native_frame_id (only if native traces enabled // AND not a simple deallocator) // [size] - Varint-encoded size (only if not a simple deallocator) + // [timestamp] - Delta-encoded timestamp in microseconds since tracker start + // (only if allocation timestamps are enabled) // // Example sequences: // - Cached malloc(256): [0b10011110] [size:256] @@ -415,15 +425,24 @@ StreamingRecordWriter::writeThreadSpecificRecord(thread_id_t tid, const Allocati int pointer_cache_index = pointerCacheIndex(record.address); token |= (pointer_cache_index & 0x0f) << 3; - return writeSimpleType(token) - && (pointer_cache_index != -1 - || writeIntegralDelta(&d_last.data_pointer, record.address >> 3)) - && (allocator_id < 8 || writeSimpleType(record.allocator)) - && (!d_header.native_traces - || hooks::allocatorKind(record.allocator) == hooks::AllocatorKind::SIMPLE_DEALLOCATOR - || writeIntegralDelta(&d_last.native_frame_id, record.native_frame_id)) - && (hooks::allocatorKind(record.allocator) == hooks::AllocatorKind::SIMPLE_DEALLOCATOR - || writeVarint(record.size)); + if (!writeSimpleType(token) + || (pointer_cache_index == -1 && !writeIntegralDelta(&d_last.data_pointer, record.address >> 3)) + || (allocator_id >= 8 && !writeSimpleType(record.allocator)) + || (d_header.native_traces + && hooks::allocatorKind(record.allocator) != hooks::AllocatorKind::SIMPLE_DEALLOCATOR + && !writeIntegralDelta(&d_last.native_frame_id, record.native_frame_id)) + || (hooks::allocatorKind(record.allocator) != hooks::AllocatorKind::SIMPLE_DEALLOCATOR + && !writeVarint(record.size))) + { + return false; + } + if (d_header.has_allocation_timestamps) { + if (!writeVarint(record.timestamp_us - d_last.allocation_timestamp_us)) { + return false; + } + d_last.allocation_timestamp_us = record.timestamp_us; + } + return true; } bool @@ -489,7 +508,8 @@ RecordWriter::writeHeaderCommon(const HeaderRecord& header) or !writeString(header.command_line.c_str()) or !writeSimpleType(header.pid) or !writeSimpleType(header.main_tid) or !writeSimpleType(header.skipped_frames_on_main_tid) or !writeSimpleType(header.python_allocator) or !writeSimpleType(header.trace_python_allocators) - or !writeSimpleType(header.track_object_lifetimes)) + or !writeSimpleType(header.track_object_lifetimes) + or !writeSimpleType(header.has_allocation_timestamps)) { return false; } @@ -517,7 +537,8 @@ StreamingRecordWriter::cloneInChildProcess() d_header.command_line, d_header.native_traces, d_header.trace_python_allocators, - d_header.track_object_lifetimes); + d_header.track_object_lifetimes, + d_header.has_allocation_timestamps); } AggregatingRecordWriter::AggregatingRecordWriter( @@ -525,7 +546,8 @@ AggregatingRecordWriter::AggregatingRecordWriter( const std::string& command_line, bool native_traces, bool trace_python_allocators, - bool track_object_lifetimes) + bool track_object_lifetimes, + bool has_allocation_timestamps) : RecordWriter(std::move(sink)) { memcpy(d_header.magic, MAGIC, sizeof(d_header.magic)); @@ -538,6 +560,7 @@ AggregatingRecordWriter::AggregatingRecordWriter( d_header.python_allocator = getPythonAllocator(); d_header.trace_python_allocators = trace_python_allocators; d_header.track_object_lifetimes = track_object_lifetimes; + d_header.has_allocation_timestamps = has_allocation_timestamps; d_stats.start_time = duration_cast(system_clock::now().time_since_epoch()).count(); } @@ -675,7 +698,8 @@ AggregatingRecordWriter::cloneInChildProcess() d_header.command_line, d_header.native_traces, d_header.trace_python_allocators, - d_header.track_object_lifetimes); + d_header.track_object_lifetimes, + d_header.has_allocation_timestamps); } bool @@ -757,6 +781,7 @@ AggregatingRecordWriter::writeThreadSpecificRecord(thread_id_t tid, const Alloca } allocation.native_segment_generation = d_mappings_by_generation.size(); allocation.n_allocations = 1; + allocation.timestamp_us = record.timestamp_us; d_high_water_mark_aggregator.addAllocation(allocation); return true; } diff --git a/src/memray/_memray/record_writer.h b/src/memray/_memray/record_writer.h index a8d3f34d10..5d32e41129 100644 --- a/src/memray/_memray/record_writer.h +++ b/src/memray/_memray/record_writer.h @@ -64,7 +64,8 @@ createRecordWriter( bool native_traces, FileFormat file_format, bool trace_python_allocators, - bool track_object_lifetimes); + bool track_object_lifetimes, + bool has_allocation_timestamps); template bool inline RecordWriter::writeSimpleType(const T& item) diff --git a/src/memray/_memray/record_writer.pxd b/src/memray/_memray/record_writer.pxd index 4f102501d0..a8928b757e 100644 --- a/src/memray/_memray/record_writer.pxd +++ b/src/memray/_memray/record_writer.pxd @@ -55,4 +55,5 @@ cdef extern from "record_writer.h" namespace "memray::tracking_api": FileFormat file_format, bool trace_python_allocators, bool track_object_lifetimes, + bool has_allocation_timestamps, ) except+ diff --git a/src/memray/_memray/records.cpp b/src/memray/_memray/records.cpp index 2f182a6b29..fb87ee706f 100644 --- a/src/memray/_memray/records.cpp +++ b/src/memray/_memray/records.cpp @@ -15,7 +15,7 @@ Allocation::toPythonObject() const // operations speeds up the parsing moderately. Additionally, some of // the types we need to convert from are not supported by PyBuildValue // natively. - PyObject* tuple = PyTuple_New(8); + PyObject* tuple = PyTuple_New(9); if (tuple == nullptr) { return nullptr; } @@ -51,6 +51,9 @@ Allocation::toPythonObject() const elem = PyLong_FromSize_t(native_segment_generation); __CHECK_ERROR(elem); PyTuple_SET_ITEM(tuple, 7, elem); + elem = PyLong_FromUnsignedLongLong(timestamp_us); + __CHECK_ERROR(elem); + PyTuple_SET_ITEM(tuple, 8, elem); #undef __CHECK_ERROR return tuple; } @@ -67,6 +70,7 @@ AggregatedAllocation::contributionToHighWaterMark() const frame_index, native_segment_generation, n_allocations_in_high_water_mark, + 0, }; } @@ -82,6 +86,7 @@ AggregatedAllocation::contributionToLeaks() const frame_index, native_segment_generation, n_allocations_leaked, + 0, }; } diff --git a/src/memray/_memray/records.h b/src/memray/_memray/records.h index 310e9506c6..5b837f08b4 100644 --- a/src/memray/_memray/records.h +++ b/src/memray/_memray/records.h @@ -18,7 +18,7 @@ namespace memray::tracking_api { extern const char MAGIC[7]; // Value assigned in records.cpp -const int CURRENT_HEADER_VERSION = 12; +const int CURRENT_HEADER_VERSION = 13; using frame_id_t = size_t; using thread_id_t = unsigned long; @@ -113,6 +113,7 @@ struct HeaderRecord PythonAllocatorType python_allocator{}; bool trace_python_allocators{}; bool track_object_lifetimes{false}; + bool has_allocation_timestamps{false}; }; struct MemoryRecord @@ -134,6 +135,7 @@ struct AllocationRecord size_t size; hooks::Allocator allocator; frame_id_t native_frame_id{0}; + uint64_t timestamp_us{0}; }; struct Allocation @@ -146,6 +148,7 @@ struct Allocation size_t frame_index{0}; size_t native_segment_generation{0}; size_t n_allocations{1}; + uint64_t timestamp_us{0}; PyObject* toPythonObject() const; }; @@ -308,6 +311,7 @@ struct DeltaEncodedFields uintptr_t data_pointer{}; frame_id_t native_frame_id{}; int code_firstlineno{}; + uint64_t allocation_timestamp_us{}; }; template diff --git a/src/memray/_memray/records.pxd b/src/memray/_memray/records.pxd index 3544041831..94a4bc6f04 100644 --- a/src/memray/_memray/records.pxd +++ b/src/memray/_memray/records.pxd @@ -39,6 +39,7 @@ cdef extern from "records.h" namespace "memray::tracking_api": size_t size Allocator allocator frame_id_t native_frame_id + uint64_t timestamp_us struct Frame: code_object_id_t code_object_id @@ -89,6 +90,7 @@ cdef extern from "records.h" namespace "memray::tracking_api": int python_allocator bool trace_python_allocators bool track_object_lifetimes + bool has_allocation_timestamps cdef cppclass Allocation: thread_id_t tid @@ -99,6 +101,7 @@ cdef extern from "records.h" namespace "memray::tracking_api": size_t frame_index size_t native_segment_generation size_t n_allocations + uint64_t timestamp_us object toPythonObject() diff --git a/src/memray/_memray/tracking_api.cpp b/src/memray/_memray/tracking_api.cpp index e63bfaffae..3dc1692e2b 100644 --- a/src/memray/_memray/tracking_api.cpp +++ b/src/memray/_memray/tracking_api.cpp @@ -771,13 +771,15 @@ Tracker::Tracker( unsigned int memory_interval, bool follow_fork, bool trace_python_allocators, - bool reference_tracking) + bool reference_tracking, + bool allocation_timestamps) : d_writer(std::move(record_writer)) , d_unwind_native_frames(native_traces) , d_memory_interval(memory_interval) , d_follow_fork(follow_fork) , d_trace_python_allocators(trace_python_allocators) , d_reference_tracking(reference_tracking) +, d_allocation_timestamps(allocation_timestamps) { static std::once_flag once; call_once(once, [] { @@ -1067,7 +1069,8 @@ Tracker::childFork() old_tracker->d_memory_interval, old_tracker->d_follow_fork, old_tracker->d_trace_python_allocators, - old_tracker->d_reference_tracking)); + old_tracker->d_reference_tracking, + old_tracker->d_allocation_timestamps)); StopTheWorldGuard stop_the_world; std::unique_lock lock(*s_mutex); @@ -1122,13 +1125,23 @@ Tracker::trackAllocationImpl( return d_writer->writeRecord(UnresolvedNativeFrame{ip, index}); }); } - AllocationRecord record{reinterpret_cast(ptr), size, func, native_index}; + AllocationRecord record{ + reinterpret_cast(ptr), + size, + func, + native_index, + d_allocation_timestamps ? currentTimestampUs() : 0}; if (!d_writer->writeThreadSpecificRecord(thread_id(), record)) { std::cerr << "Failed to write output, deactivating tracking" << std::endl; deactivate(); } } else { - AllocationRecord record{reinterpret_cast(ptr), size, func}; + AllocationRecord record{ + reinterpret_cast(ptr), + size, + func, + 0, + d_allocation_timestamps ? currentTimestampUs() : 0}; if (!d_writer->writeThreadSpecificRecord(thread_id(), record)) { std::cerr << "Failed to write output, deactivating tracking" << std::endl; deactivate(); @@ -1140,7 +1153,12 @@ void Tracker::trackDeallocationImpl(void* ptr, size_t size, hooks::Allocator func) { registerCachedThreadName(); - AllocationRecord record{reinterpret_cast(ptr), size, func}; + AllocationRecord record{ + reinterpret_cast(ptr), + size, + func, + 0, + d_allocation_timestamps ? currentTimestampUs() : 0}; if (!d_writer->writeThreadSpecificRecord(thread_id(), record)) { std::cerr << "Failed to write output, deactivating tracking" << std::endl; deactivate(); @@ -1441,7 +1459,8 @@ Tracker::createTracker( unsigned int memory_interval, bool follow_fork, bool trace_python_allocators, - bool reference_tracking) + bool reference_tracking, + bool allocation_timestamps) { s_instance_owner.reset(new Tracker( std::move(record_writer), @@ -1449,7 +1468,8 @@ Tracker::createTracker( memory_interval, follow_fork, trace_python_allocators, - reference_tracking)); + reference_tracking, + allocation_timestamps)); StopTheWorldGuard stop_the_world; std::unique_lock lock(*s_mutex); @@ -1472,6 +1492,14 @@ Tracker::getTracker() return s_instance; } +uint64_t +Tracker::currentTimestampUs() const +{ + return std::chrono::duration_cast( + std::chrono::steady_clock::now() - d_monotonic_start) + .count(); +} + static struct { PyMemAllocatorEx raw; diff --git a/src/memray/_memray/tracking_api.h b/src/memray/_memray/tracking_api.h index e9febf2ac2..dd91d87469 100644 --- a/src/memray/_memray/tracking_api.h +++ b/src/memray/_memray/tracking_api.h @@ -4,6 +4,7 @@ #include #include +#include #include #include #include @@ -239,7 +240,8 @@ class Tracker unsigned int memory_interval, bool follow_fork, bool trace_python_allocators, - bool reference_tracking); + bool reference_tracking, + bool allocation_timestamps); static PyObject* destroyTracker(); static Tracker* getTracker(); @@ -442,8 +444,10 @@ class Tracker const bool d_follow_fork; const bool d_trace_python_allocators; const bool d_reference_tracking; + const bool d_allocation_timestamps; linker::SymbolPatcher d_patcher; std::unique_ptr d_background_thread; + const std::chrono::steady_clock::time_point d_monotonic_start{std::chrono::steady_clock::now()}; std::unordered_map d_code_object_cache; code_object_id_t d_next_code_object_id{1}; @@ -476,9 +480,11 @@ class Tracker unsigned int memory_interval, bool follow_fork, bool trace_python_allocators, - bool reference_tracking); + bool reference_tracking, + bool allocation_timestamps); static bool areNativeTracesEnabled(); + uint64_t currentTimestampUs() const; }; } // namespace memray::tracking_api diff --git a/src/memray/_memray/tracking_api.pxd b/src/memray/_memray/tracking_api.pxd index 2a748d4b0e..82efbd8575 100644 --- a/src/memray/_memray/tracking_api.pxd +++ b/src/memray/_memray/tracking_api.pxd @@ -23,6 +23,7 @@ cdef extern from "tracking_api.h" namespace "memray::tracking_api": bool follow_fork, bool trace_pymalloc, bool reference_tracking, + bool allocation_timestamps, ) except+ @staticmethod diff --git a/src/memray/_metadata.py b/src/memray/_metadata.py index 96d6416b86..3dba100441 100644 --- a/src/memray/_metadata.py +++ b/src/memray/_metadata.py @@ -22,3 +22,4 @@ class Metadata: has_native_traces: bool trace_python_allocators: bool file_format: FileFormat + has_allocation_timestamps: bool = False diff --git a/src/memray/commands/run.py b/src/memray/commands/run.py index 4f006cf1ca..5092f84b87 100644 --- a/src/memray/commands/run.py +++ b/src/memray/commands/run.py @@ -48,6 +48,8 @@ def _run_tracker( kwargs["follow_fork"] = True if args.trace_python_allocators: kwargs["trace_python_allocators"] = True + if args.allocation_timestamps: + kwargs["allocation_timestamps"] = True if args.aggregate: kwargs["file_format"] = FileFormat.AGGREGATED_ALLOCATIONS tracker = Tracker(destination=destination, native_traces=args.native, **kwargs) @@ -84,6 +86,7 @@ def _child_process( port: int, native: bool, trace_python_allocators: bool, + allocation_timestamps: bool, run_as_module: bool, run_as_cmd: bool, quiet: bool, @@ -93,6 +96,7 @@ def _child_process( args = argparse.Namespace( native=native, trace_python_allocators=trace_python_allocators, + allocation_timestamps=allocation_timestamps, follow_fork=False, aggregate=False, run_as_module=run_as_module, @@ -112,7 +116,7 @@ def _run_child_process_and_attach(args: argparse.Namespace) -> None: raise MemrayCommandError(f"Invalid port: {port}", exit_code=1) arguments = ( - f"{port},{args.native},{args.trace_python_allocators}," + f"{port},{args.native},{args.trace_python_allocators},{args.allocation_timestamps}," f"{args.run_as_module},{args.run_as_cmd},{args.quiet}," f"{args.script!r},{args.script_args}" ) @@ -252,6 +256,12 @@ def prepare_parser(self, parser: argparse.ArgumentParser) -> None: help="Record allocations made by the pymalloc allocator", default=False, ) + parser.add_argument( + "--allocation-timestamps", + action="store_true", + help="Record a timestamp for every allocation and deallocation event", + default=False, + ) parser.add_argument( "-q", "--quiet", @@ -325,6 +335,8 @@ def run(self, args: argparse.Namespace, parser: argparse.ArgumentParser) -> None parser.error("--follow-fork cannot be used with the live TUI") if args.aggregate and (args.live_mode or args.live_remote_mode): parser.error("--aggregate cannot be used with the live TUI") + if args.aggregate and args.allocation_timestamps: + parser.error("--allocation-timestamps requires non-aggregated output") with contextlib.suppress(OSError): if args.run_as_cmd and pathlib.Path(args.script).exists(): parser.error("remove the option -c to run a file") diff --git a/src/memray/commands/transform.py b/src/memray/commands/transform.py index b740807a91..3b6f324d8f 100644 --- a/src/memray/commands/transform.py +++ b/src/memray/commands/transform.py @@ -1,14 +1,23 @@ import argparse import importlib.util +import os import shutil import sys +from pathlib import Path +from typing import Callable +from typing import Optional +from typing import cast from rich import print as pprint +from memray import FileReader from memray._errors import MemrayCommandError +from memray._memray import FileFormat from ..reporters.transform import TransformReporter from .common import HighWatermarkCommand +from .common import warn_if_file_is_not_aggregated_and_is_too_big +from .common import warn_if_not_enough_symbols class TransformCommand(HighWatermarkCommand): @@ -72,3 +81,116 @@ def post_run_gprof2dot(self) -> None: print() print("To generate a graph from the transform file, run for example:") print(f"{command} -f json {self.output_file} | dot -Tpng -o output.png") + + def write_report( + self, + result_path: Path, + output_file: Path, + show_memory_leaks: bool, + temporary_allocation_threshold: int, + merge_threads: Optional[bool] = None, + inverted: Optional[bool] = None, + temporal: bool = False, + max_memory_records: Optional[int] = None, + no_web: bool = False, + ) -> None: + if self.reporter_name != "speedscope": + return super().write_report( + result_path=result_path, + output_file=output_file, + show_memory_leaks=show_memory_leaks, + temporary_allocation_threshold=temporary_allocation_threshold, + merge_threads=merge_threads, + inverted=inverted, + temporal=temporal, + max_memory_records=max_memory_records, + no_web=no_web, + ) + + try: + reporter_factory = cast( + Callable[..., TransformReporter], self.reporter_factory + ) + if max_memory_records is None: + reader = FileReader(os.fspath(result_path), report_progress=True) + else: + reader = FileReader( + os.fspath(result_path), + report_progress=True, + max_memory_records=max_memory_records, + ) + merge_threads = True if merge_threads is None else merge_threads + inverted = False if inverted is None else inverted + + native_traces = reader.metadata.has_native_traces + if native_traces: + warn_if_not_enough_symbols() + + if not temporal and temporary_allocation_threshold < 0: + warn_if_file_is_not_aggregated_and_is_too_big(reader, result_path) + + memory_records = tuple(reader.get_memory_snapshots()) + + use_temporal_fallback = ( + reader.metadata.file_format == FileFormat.ALL_ALLOCATIONS + and not reader.metadata.has_allocation_timestamps + and temporary_allocation_threshold < 0 + ) + + if use_temporal_fallback: + if show_memory_leaks: + temporal_allocations = reader.get_temporal_allocation_records( + merge_threads=merge_threads + ) + reporter = reporter_factory( + temporal_allocations, + memory_records=memory_records, + native_traces=native_traces, + ) + else: + ( + temporal_allocations, + high_water_mark_by_snapshot, + ) = reader.get_temporal_high_water_mark_allocation_records( + merge_threads=merge_threads + ) + reporter = reporter_factory( + temporal_allocations, + memory_records=memory_records, + native_traces=native_traces, + high_water_mark_by_snapshot=high_water_mark_by_snapshot, + ) + else: + if show_memory_leaks: + snapshot_allocations = reader.get_leaked_allocation_records( + merge_threads=merge_threads + ) + elif temporary_allocation_threshold >= 0: + snapshot_allocations = reader.get_temporary_allocation_records( + threshold=temporary_allocation_threshold, + merge_threads=merge_threads, + ) + else: + snapshot_allocations = reader.get_high_watermark_allocation_records( + merge_threads=merge_threads + ) + reporter = reporter_factory( + snapshot_allocations, + memory_records=memory_records, + native_traces=native_traces, + ) + except OSError as e: + raise MemrayCommandError( + f"Failed to parse allocation records in {result_path}\nReason: {e}", + exit_code=1, + ) + + with open(os.fspath(output_file.expanduser()), "w") as f: + reporter.render( + outfile=f, + metadata=reader.metadata, + show_memory_leaks=show_memory_leaks, + merge_threads=merge_threads, + inverted=inverted, + no_web=no_web, + ) diff --git a/src/memray/reporters/transform.py b/src/memray/reporters/transform.py index 323902264f..46686fbbdd 100644 --- a/src/memray/reporters/transform.py +++ b/src/memray/reporters/transform.py @@ -4,22 +4,30 @@ from typing import Dict from typing import Iterable from typing import List +from typing import Optional from typing import TextIO from typing import Tuple +from typing import Union +from typing import cast from memray import AllocationRecord from memray import AllocatorType from memray import MemorySnapshot from memray import Metadata +from memray._memray import TemporalAllocationRecord +from memray._version import __version__ from memray.reporters.common import format_thread_name Location = Tuple[str, str] +FrameLocation = Tuple[str, str, int] +FrameSample = Tuple[int, ...] class TransformReporter: SUFFIX_MAP = { "gprof2dot": ".json", "csv": ".csv", + "speedscope": ".speedscope.json", } def __init__( @@ -29,13 +37,15 @@ def __init__( format: str, native_traces: bool, memory_records: Iterable[MemorySnapshot], + high_water_mark_by_snapshot: Optional[List[int]] = None, **kwargs: Any, ) -> None: super().__init__() self.allocations = allocations self.format = format self.native_traces = native_traces - self.memory_records = memory_records + self.memory_records = tuple(memory_records) + self.high_water_mark_by_snapshot = high_water_mark_by_snapshot def render_as_gprof2dot( self, @@ -46,11 +56,7 @@ def render_as_gprof2dot( all_locations: List[Dict[str, str]] = [] events = [] for record in self.allocations: - stack_trace = ( - tuple(record.hybrid_stack_trace()) - if self.native_traces - else record.stack_trace() - ) + stack_trace = self._stack_trace_for_record(record) call_chain = [] for func, mod, _ in stack_trace: location = (func, mod) @@ -73,6 +79,276 @@ def render_as_gprof2dot( } json.dump(result, outfile) + def _stack_trace_for_record( + self, record: Union[AllocationRecord, TemporalAllocationRecord] + ) -> Tuple[Tuple[str, str, int], ...]: + return ( + tuple(record.hybrid_stack_trace()) + if self.native_traces + else tuple(record.stack_trace()) + ) + + def _speedscope_sample_for_record( + self, + record: Union[AllocationRecord, TemporalAllocationRecord], + *, + location_to_index: Dict[FrameLocation, int], + frames: List[Dict[str, Any]], + ) -> FrameSample: + stack_trace = self._stack_trace_for_record(record) + if not stack_trace: + return () + + # Speedscope sampled stacks are root-to-leaf. Memray exposes leaf-to-root. + sample = [] + for func, mod, line in reversed(stack_trace): + location = (func, mod, line) + index = location_to_index.get(location) + if index is None: + index = len(frames) + frame: Dict[str, Any] = {"name": func} + if mod: + frame["file"] = mod + if line > 0: + frame["line"] = line + frames.append(frame) + location_to_index[location] = index + sample.append(index) + return tuple(sample) + + def _add_speedscope_sample( + self, + *, + sample: FrameSample, + size: int, + n_allocations: int, + order_key: int, + sample_weights: Dict[FrameSample, List[int]], + sample_order: Dict[FrameSample, int], + ) -> None: + if not sample: + return + + aggregated = sample_weights.setdefault(sample, [0, 0]) + aggregated[0] += size + aggregated[1] += n_allocations + + existing_order = sample_order.get(sample) + if existing_order is None or order_key < existing_order: + sample_order[sample] = order_key + + def _aggregate_snapshot_speedscope_samples( + self, + allocations: Iterable[AllocationRecord], + *, + metadata: Optional[Metadata], + ) -> Tuple[List[Dict[str, Any]], List[Tuple[FrameSample, List[int]]]]: + location_to_index: Dict[FrameLocation, int] = {} + frames: List[Dict[str, Any]] = [] + sample_weights: Dict[FrameSample, List[int]] = {} + sample_order: Dict[FrameSample, int] = {} + has_exact_timestamps = ( + metadata is not None and metadata.has_allocation_timestamps + ) + + for sequence, record in enumerate(allocations): + sample = self._speedscope_sample_for_record( + record, + location_to_index=location_to_index, + frames=frames, + ) + order_key = record.timestamp_us if has_exact_timestamps else sequence + self._add_speedscope_sample( + sample=sample, + size=record.size, + n_allocations=record.n_allocations, + order_key=order_key, + sample_weights=sample_weights, + sample_order=sample_order, + ) + + ordered_samples = sorted( + sample_weights.items(), + key=lambda item: (sample_order[item[0]], item[0]), + ) + return frames, ordered_samples + + def _snapshot_order_key(self, snapshot_index: int) -> int: + if 0 <= snapshot_index < len(self.memory_records): + # Convert ms-since-epoch to µs for comparison with timestamp_us + return self.memory_records[snapshot_index].time * 1000 + if self.memory_records: + return self.memory_records[-1].time * 1000 + return snapshot_index + + def _peak_snapshot_index(self) -> int: + high_water_mark_by_snapshot = self.high_water_mark_by_snapshot or [0] + return max( + range(len(high_water_mark_by_snapshot)), + key=high_water_mark_by_snapshot.__getitem__, + ) + + def _contribution_for_temporal_record( + self, + record: TemporalAllocationRecord, + *, + show_memory_leaks: bool, + peak_snapshot: Optional[int] = None, + ) -> Tuple[int, int, Optional[int]]: + size = 0 + n_allocations = 0 + first_snapshot = None + + if show_memory_leaks: + for interval in record.intervals: + if interval.deallocated_before_snapshot is not None: + continue + size += interval.n_bytes + n_allocations += interval.n_allocations + snapshot = interval.allocated_before_snapshot + if first_snapshot is None or snapshot < first_snapshot: + first_snapshot = snapshot + return size, n_allocations, first_snapshot + + if peak_snapshot is None: + peak_snapshot = self._peak_snapshot_index() + for interval in record.intervals: + if interval.allocated_before_snapshot > peak_snapshot: + continue + if ( + interval.deallocated_before_snapshot is not None + and peak_snapshot >= interval.deallocated_before_snapshot + ): + continue + size += interval.n_bytes + n_allocations += interval.n_allocations + snapshot = interval.allocated_before_snapshot + if first_snapshot is None or snapshot < first_snapshot: + first_snapshot = snapshot + return size, n_allocations, first_snapshot + + def _aggregate_temporal_speedscope_samples( + self, + allocations: Iterable[TemporalAllocationRecord], + *, + show_memory_leaks: bool, + ) -> Tuple[List[Dict[str, Any]], List[Tuple[FrameSample, List[int]]]]: + location_to_index: Dict[FrameLocation, int] = {} + frames: List[Dict[str, Any]] = [] + sample_weights: Dict[FrameSample, List[int]] = {} + sample_order: Dict[FrameSample, int] = {} + + peak_snapshot = None if show_memory_leaks else self._peak_snapshot_index() + + for sequence, record in enumerate(allocations): + sample = self._speedscope_sample_for_record( + record, + location_to_index=location_to_index, + frames=frames, + ) + ( + size, + n_allocations, + first_snapshot, + ) = self._contribution_for_temporal_record( + record, + show_memory_leaks=show_memory_leaks, + peak_snapshot=peak_snapshot, + ) + if size <= 0 and n_allocations <= 0: + continue + + order_key = ( + self._snapshot_order_key(first_snapshot) + if first_snapshot is not None + else sequence + ) + self._add_speedscope_sample( + sample=sample, + size=size, + n_allocations=n_allocations, + order_key=order_key, + sample_weights=sample_weights, + sample_order=sample_order, + ) + + ordered_samples = sorted( + sample_weights.items(), + key=lambda item: (sample_order[item[0]], item[0]), + ) + return frames, ordered_samples + + def _create_speedscope_profile( + self, + *, + name: str, + unit: str, + sample_weights: List[Tuple[FrameSample, List[int]]], + weight_index: int, + ) -> Dict[str, Any]: + samples: List[List[int]] = [] + weights: List[int] = [] + + for sample, aggregated in sample_weights: + weight = aggregated[weight_index] + if weight <= 0: + continue + samples.append(list(sample)) + weights.append(weight) + + return { + "type": "sampled", + "name": name, + "unit": unit, + "startValue": 0, + "endValue": sum(weights), + "samples": samples, + "weights": weights, + } + + def render_as_speedscope( + self, + outfile: TextIO, + **kwargs: Any, + ) -> None: + metadata = kwargs.get("metadata") + show_memory_leaks = kwargs.get("show_memory_leaks", False) + allocations = list(self.allocations) + + if allocations and self._is_temporal_record(allocations[0]): + frames, sample_weights = self._aggregate_temporal_speedscope_samples( + cast(List[TemporalAllocationRecord], allocations), + show_memory_leaks=show_memory_leaks, + ) + else: + frames, sample_weights = self._aggregate_snapshot_speedscope_samples( + allocations, + metadata=metadata, + ) + + result = { + "$schema": "https://www.speedscope.app/file-format-schema.json", + "shared": {"frames": frames}, + "profiles": [ + self._create_speedscope_profile( + name="Memory", + unit="bytes", + sample_weights=sample_weights, + weight_index=0, + ), + self._create_speedscope_profile( + name="Allocations", + unit="none", + sample_weights=sample_weights, + weight_index=1, + ), + ], + "name": metadata.command_line if metadata is not None else "memray", + "activeProfileIndex": 0, + "exporter": f"memray@{__version__}", + } + json.dump(result, outfile) + def render( self, outfile: TextIO, @@ -108,11 +384,7 @@ def render_as_csv( ] ) for record in self.allocations: - stack_trace = ( - tuple(record.hybrid_stack_trace()) - if self.native_traces - else record.stack_trace() - ) + stack_trace = self._stack_trace_for_record(record) writer.writerow( [ AllocatorType(record.allocator).name, @@ -123,3 +395,9 @@ def render_as_csv( "|".join(f"{func};{mod};{line}" for func, mod, line in stack_trace), ] ) + + @staticmethod + def _is_temporal_record( + record: Union[AllocationRecord, TemporalAllocationRecord] + ) -> bool: + return hasattr(record, "intervals") diff --git a/tests/integration/test_api.py b/tests/integration/test_api.py index bdb0c88120..b39184b950 100644 --- a/tests/integration/test_api.py +++ b/tests/integration/test_api.py @@ -105,3 +105,33 @@ def test_aggregated_capture_with_socket_destination(): file_format=FileFormat.AGGREGATED_ALLOCATIONS, ): # pragma: no cover pass + + +def test_allocation_timestamps_are_exposed_in_records(tmp_path): + allocator = MemoryAllocator() + result_file = tmp_path / "test.bin" + + with Tracker(result_file, allocation_timestamps=True): + allocator.valloc(1234) + allocator.free() + + with FileReader(result_file) as reader: + records = list(filter_relevant_allocations(reader.get_allocation_records())) + assert reader.metadata.has_allocation_timestamps is True + + assert len(records) == 2 + assert records[0].timestamp_us > 0 + assert records[1].timestamp_us >= records[0].timestamp_us + + +def test_allocation_timestamps_require_all_allocations(tmp_path): + result_file = tmp_path / "test.bin" + + with pytest.raises( + RuntimeError, match="allocation_timestamps requires FileFormat.ALL_ALLOCATIONS" + ): + Tracker( + result_file, + allocation_timestamps=True, + file_format=FileFormat.AGGREGATED_ALLOCATIONS, + ) diff --git a/tests/integration/test_main.py b/tests/integration/test_main.py index 8d569fa68e..6f584600e5 100644 --- a/tests/integration/test_main.py +++ b/tests/integration/test_main.py @@ -1773,3 +1773,42 @@ def test_report_leaks_argument(self, tmp_path, simple_test_file): if "" in output_text: pytest.xfail("Hybrid stack generation is not fully working") assert str(source_file) in output_text + + def test_report_speedscope_argument(self, tmp_path, simple_test_file): + results_file, source_file = generate_sample_results( + tmp_path, simple_test_file, native=True + ) + + subprocess.run( + [ + sys.executable, + "-m", + "memray", + "transform", + "speedscope", + str(results_file), + ], + check=True, + capture_output=True, + text=True, + ) + + output_file = tmp_path / "memray-speedscope-result.speedscope.json" + assert output_file.exists() + + output_text = output_file.read_text() + if "" in output_text: + pytest.xfail("Hybrid stack generation is not fully working") + + output_data = json.loads(output_text) + assert output_data["$schema"] == ( + "https://www.speedscope.app/file-format-schema.json" + ) + assert [profile["type"] for profile in output_data["profiles"]] == [ + "sampled", + "sampled", + ] + assert any( + frame.get("file") == str(source_file) + for frame in output_data["shared"]["frames"] + ) diff --git a/tests/integration/test_record_writer.py b/tests/integration/test_record_writer.py index 58b4b7a9e8..fc90d8287f 100644 --- a/tests/integration/test_record_writer.py +++ b/tests/integration/test_record_writer.py @@ -142,7 +142,7 @@ def test_write_basic_records(tmp_path): assert header_fields == [ ("magic", "memray"), - ("version", "12"), + ("version", "13"), ("python_version", f"{sys.hexversion:08x}"), ("native_traces", "true"), ("file_format", "ALL_ALLOCATIONS"), @@ -157,6 +157,7 @@ def test_write_basic_records(tmp_path): ("python_allocator", allocator), ("trace_python_allocators", "true"), ("track_object_lifetimes", "false"), + ("has_allocation_timestamps", "false"), ] expected_parse_output = """ @@ -195,6 +196,40 @@ def test_write_basic_records(tmp_path): assert records == expected_records +def test_write_basic_records_with_allocation_timestamps(tmp_path): + output_file = tmp_path / "timestamps.memray" + + writer = RecordWriterTestHarness( + str(output_file), + file_format=FileFormat.ALL_ALLOCATIONS, + allocation_timestamps=True, + ) + + assert writer.write_allocation_record( + 1, 0x1000, 1024, AllocatorType.MALLOC, timestamp_us=11 + ) + assert writer.write_allocation_record( + 1, 0x1000, 0, AllocatorType.FREE, timestamp_us=29 + ) + assert writer.write_trailer() + + header_fields, records = parse_capture_file(output_file) + + assert dict(header_fields)["has_allocation_timestamps"] == "true" + assert records == [ + "CONTEXT_SWITCH tid=1", + ( + "ALLOCATION address=0x1000 size=1024 allocator=malloc " + "native_frame_id=0 timestamp_us=11" + ), + ( + "ALLOCATION address=0x1000 size=0 allocator=free " + "native_frame_id=0 timestamp_us=29" + ), + "TRAILER", + ] + + def test_write_aggregated_records(tmp_path): """Test writing aggregated records to a file.""" # GIVEN @@ -233,7 +268,7 @@ def test_write_aggregated_records(tmp_path): assert header_fields == [ ("magic", "memray"), - ("version", "12"), + ("version", "13"), ("python_version", f"{sys.hexversion:08x}"), ("native_traces", "false"), ("file_format", "AGGREGATED_ALLOCATIONS"), @@ -248,6 +283,7 @@ def test_write_aggregated_records(tmp_path): ("python_allocator", allocator), ("trace_python_allocators", "false"), ("track_object_lifetimes", "false"), + ("has_allocation_timestamps", "false"), ] records = sort_runs_of_same_record_type(records) diff --git a/tests/unit/test_cli.py b/tests/unit/test_cli.py index 942a12f438..88f029bf2c 100644 --- a/tests/unit/test_cli.py +++ b/tests/unit/test_cli.py @@ -80,6 +80,20 @@ def test_run_with_pymalloc_tracing( trace_python_allocators=True, ) + def test_run_with_allocation_timestamps( + self, getpid_mock, runpy_mock, tracker_mock, validate_mock + ): + getpid_mock.return_value = 0 + assert 0 == main(["run", "--allocation-timestamps", "-m", "foobar"]) + runpy_mock.run_module.assert_called_with( + "foobar", run_name="__main__", alter_sys=True + ) + tracker_mock.assert_called_with( + destination=FileDestination("memray-foobar.0.bin", overwrite=False), + native_traces=False, + allocation_timestamps=True, + ) + def test_run_override_output( self, getpid_mock, runpy_mock, tracker_mock, validate_mock ): @@ -168,7 +182,7 @@ def test_run_with_live( sys.executable, "-c", "from memray.commands.run import _child_process;" - "_child_process(1234,False,False,False,False,False," + "_child_process(1234,False,False,False,False,False,False," "'./directory/foobar.py',['arg1', 'arg2'])", ], stderr=-1, @@ -209,7 +223,7 @@ def test_run_with_live_and_trace_python_allocators( sys.executable, "-c", "from memray.commands.run import _child_process;" - "_child_process(1234,False,True,False,False,False," + "_child_process(1234,False,True,False,False,False,False," "'./directory/foobar.py',['arg1', 'arg2'])", ], stderr=-1, @@ -331,6 +345,15 @@ def test_run_with_trace_python_allocators_and_live_remote_mode( trace_python_allocators=True, ) + def test_run_with_aggregate_and_allocation_timestamps( + self, getpid_mock, runpy_mock, tracker_mock, validate_mock, capsys + ): + with pytest.raises(SystemExit): + main(["run", "--aggregate", "--allocation-timestamps", "-m", "foobar"]) + + captured = capsys.readouterr() + assert "--allocation-timestamps requires non-aggregated output" in captured.err + class TestFlamegraphSubCommand: @staticmethod diff --git a/tests/unit/test_highwatermark_command.py b/tests/unit/test_highwatermark_command.py index 57ff404179..ef6ada5bda 100644 --- a/tests/unit/test_highwatermark_command.py +++ b/tests/unit/test_highwatermark_command.py @@ -12,6 +12,7 @@ from memray._errors import MemrayCommandError from memray._memray import FileFormat from memray.commands.common import HighWatermarkCommand +from memray.commands.transform import TransformCommand class TestFilenameValidation: @@ -245,3 +246,73 @@ def test_tracker_and_reporter_interactions_for_temporary_allocations( reporter_factory_mock.assert_called_once() reporter_factory_mock().render.assert_called_once() + + +class TestTransformSpeedscopeFallback: + def test_uses_temporal_high_water_mark_without_exact_timestamps(self, tmp_path): + reporter_factory_mock = Mock() + command = TransformCommand() + command.reporter_name = "speedscope" + command.reporter_factory = reporter_factory_mock + result_path = tmp_path / "results.bin" + output_file = tmp_path / "output.txt" + result_path.touch() + + with patch("memray.commands.transform.FileReader") as reader_mock: + reader_mock.return_value.metadata.has_native_traces = False + reader_mock.return_value.metadata.file_format = FileFormat.ALL_ALLOCATIONS + reader_mock.return_value.metadata.has_allocation_timestamps = False + reader_mock.return_value.get_memory_snapshots.return_value = ["memory"] + ( + reader_mock.return_value.get_temporal_high_water_mark_allocation_records.return_value + ) = ("temporal", [0, 1]) + + command.write_report( + result_path=result_path, + output_file=output_file, + show_memory_leaks=False, + temporary_allocation_threshold=-1, + ) + + reader_mock.return_value.get_temporal_high_water_mark_allocation_records.assert_called_once_with( + merge_threads=True + ) + reader_mock.return_value.get_high_watermark_allocation_records.assert_not_called() + reporter_factory_mock.assert_called_once_with( + "temporal", + high_water_mark_by_snapshot=[0, 1], + memory_records=("memory",), + native_traces=False, + ) + reporter_factory_mock().render.assert_called_once() + + def test_uses_snapshot_high_water_mark_with_exact_timestamps(self, tmp_path): + reporter_factory_mock = Mock() + command = TransformCommand() + command.reporter_name = "speedscope" + command.reporter_factory = reporter_factory_mock + result_path = tmp_path / "results.bin" + output_file = tmp_path / "output.txt" + result_path.touch() + + with patch("memray.commands.transform.FileReader") as reader_mock: + reader_mock.return_value.metadata.has_native_traces = False + reader_mock.return_value.metadata.file_format = FileFormat.ALL_ALLOCATIONS + reader_mock.return_value.metadata.has_allocation_timestamps = True + reader_mock.return_value.get_memory_snapshots.return_value = ["memory"] + reader_mock.return_value.get_high_watermark_allocation_records.return_value = ( + "snapshot" + ) + + command.write_report( + result_path=result_path, + output_file=output_file, + show_memory_leaks=False, + temporary_allocation_threshold=-1, + ) + + reader_mock.return_value.get_high_watermark_allocation_records.assert_called_once_with( + merge_threads=True + ) + reader_mock.return_value.get_temporal_high_water_mark_allocation_records.assert_not_called() + reporter_factory_mock().render.assert_called_once() diff --git a/tests/unit/test_stats_reporter.py b/tests/unit/test_stats_reporter.py index b54493e8d3..63dec28f6e 100644 --- a/tests/unit/test_stats_reporter.py +++ b/tests/unit/test_stats_reporter.py @@ -438,11 +438,12 @@ def test_stats_output_json(fake_stats, tmp_path): "peak_memory": 1500000, "command_line": "fake stats", "pid": 123456, + "main_thread_id": 0x1, "python_allocator": "pymalloc", "has_native_traces": False, "trace_python_allocators": True, "file_format": 0, - "main_thread_id": 0x1, + "has_allocation_timestamps": False, }, } actual = json.loads(output_file.read_text()) diff --git a/tests/unit/test_transform_reporter.py b/tests/unit/test_transform_reporter.py index c1e109e7a6..cb4acec912 100644 --- a/tests/unit/test_transform_reporter.py +++ b/tests/unit/test_transform_reporter.py @@ -1,10 +1,17 @@ import csv import json +from datetime import datetime from io import StringIO from memray import AllocatorType +from memray import MemorySnapshot +from memray import Metadata +from memray._memray import FileFormat +from memray._version import __version__ from memray.reporters.transform import TransformReporter from tests.utils import MockAllocationRecord +from tests.utils import MockInterval +from tests.utils import MockTemporalAllocationRecord class TestGprof2DotTransformReporter: @@ -373,3 +380,261 @@ def test_multiple_stack_frames(self): assert output_data == [ ["MALLOC", "1", "1024", "1", "0x1", "me;foo.py;12|you;bar.py;21"] ] + + +class TestSpeedscopeTransformReporter: + def test_empty_report(self): + reporter = TransformReporter( + [], format="speedscope", memory_records=[], native_traces=False + ) + output = StringIO() + + reporter.render_as_speedscope(output) + output.seek(0) + + output_data = json.loads(output.read()) + assert output_data == { + "$schema": "https://www.speedscope.app/file-format-schema.json", + "activeProfileIndex": 0, + "exporter": f"memray@{__version__}", + "name": "memray", + "profiles": [ + { + "endValue": 0, + "name": "Memory", + "samples": [], + "startValue": 0, + "type": "sampled", + "unit": "bytes", + "weights": [], + }, + { + "endValue": 0, + "name": "Allocations", + "samples": [], + "startValue": 0, + "type": "sampled", + "unit": "none", + "weights": [], + }, + ], + "shared": {"frames": []}, + } + + def test_stacks_are_written_root_to_leaf(self): + peak_allocations = [ + MockAllocationRecord( + tid=1, + address=0x1000000, + size=1024, + allocator=AllocatorType.MALLOC, + stack_id=1, + n_allocations=1, + _stack=[ + ("leaf", "leaf.py", 30), + ("root", "root.py", 10), + ], + ), + ] + output = StringIO() + + reporter = TransformReporter( + peak_allocations, + format="speedscope", + memory_records=[], + native_traces=False, + ) + + reporter.render_as_speedscope(output) + output.seek(0) + + output_data = json.loads(output.read()) + assert output_data["shared"]["frames"] == [ + {"file": "root.py", "line": 10, "name": "root"}, + {"file": "leaf.py", "line": 30, "name": "leaf"}, + ] + assert output_data["profiles"][0]["samples"] == [[0, 1]] + assert output_data["profiles"][0]["weights"] == [1024] + assert output_data["profiles"][1]["samples"] == [[0, 1]] + assert output_data["profiles"][1]["weights"] == [1] + + def test_identical_stacks_are_aggregated(self): + peak_allocations = [ + MockAllocationRecord( + tid=1, + address=0x1000000, + size=1024, + allocator=AllocatorType.MALLOC, + stack_id=1, + n_allocations=1, + _stack=[ + ("leaf", "leaf.py", 30), + ("root", "root.py", 10), + ], + ), + MockAllocationRecord( + tid=1, + address=0x2000000, + size=2048, + allocator=AllocatorType.CALLOC, + stack_id=2, + n_allocations=4, + _stack=[ + ("leaf", "leaf.py", 30), + ("root", "root.py", 10), + ], + ), + ] + output = StringIO() + + reporter = TransformReporter( + peak_allocations, + format="speedscope", + memory_records=[], + native_traces=False, + ) + + reporter.render_as_speedscope(output) + output.seek(0) + + output_data = json.loads(output.read()) + assert output_data["profiles"][0]["samples"] == [[0, 1]] + assert output_data["profiles"][0]["weights"] == [3072] + assert output_data["profiles"][0]["endValue"] == 3072 + assert output_data["profiles"][1]["samples"] == [[0, 1]] + assert output_data["profiles"][1]["weights"] == [5] + assert output_data["profiles"][1]["endValue"] == 5 + + def test_stacks_with_exact_timestamps_are_ordered_by_timestamp(self): + peak_allocations = [ + MockAllocationRecord( + tid=1, + address=0x1000000, + size=1024, + allocator=AllocatorType.MALLOC, + stack_id=1, + n_allocations=1, + _stack=[("late", "late.py", 30)], + ), + MockAllocationRecord( + tid=1, + address=0x2000000, + size=2048, + allocator=AllocatorType.CALLOC, + stack_id=2, + n_allocations=2, + _stack=[("early", "early.py", 10)], + ), + ] + peak_allocations[0].timestamp_us = 50 + peak_allocations[1].timestamp_us = 10 + + reporter = TransformReporter( + peak_allocations, + format="speedscope", + memory_records=[], + native_traces=False, + ) + output = StringIO() + + reporter.render_as_speedscope( + output, + metadata=Metadata( + start_time=datetime(2024, 1, 1, 0, 0, 0), + end_time=datetime(2024, 1, 1, 0, 0, 1), + total_allocations=2, + total_frames=2, + peak_memory=3072, + command_line="memray", + pid=1, + main_thread_id=1, + python_allocator="pymalloc", + has_native_traces=False, + trace_python_allocators=False, + file_format=FileFormat.ALL_ALLOCATIONS, + has_allocation_timestamps=True, + ), + ) + output.seek(0) + + output_data = json.loads(output.read()) + assert output_data["shared"]["frames"] == [ + {"file": "late.py", "line": 30, "name": "late"}, + {"file": "early.py", "line": 10, "name": "early"}, + ] + assert output_data["profiles"][0]["samples"] == [[1], [0]] + assert output_data["profiles"][0]["weights"] == [2048, 1024] + assert output_data["profiles"][1]["samples"] == [[1], [0]] + assert output_data["profiles"][1]["weights"] == [2, 1] + + def test_temporal_fallback_orders_by_snapshot_time(self): + allocations = [ + MockTemporalAllocationRecord( + tid=1, + allocator=AllocatorType.MALLOC, + stack_id=1, + intervals=[MockInterval(1, None, 1, 200)], + _stack=[("later", "later.py", 20)], + ), + MockTemporalAllocationRecord( + tid=1, + allocator=AllocatorType.CALLOC, + stack_id=2, + intervals=[MockInterval(0, None, 1, 100)], + _stack=[("earlier", "earlier.py", 10)], + ), + ] + reporter = TransformReporter( + allocations, + format="speedscope", + memory_records=[ + MemorySnapshot(100, 0, 0), + MemorySnapshot(110, 0, 0), + ], + native_traces=False, + high_water_mark_by_snapshot=[100, 300], + ) + output = StringIO() + + reporter.render_as_speedscope(output, show_memory_leaks=False) + output.seek(0) + + output_data = json.loads(output.read()) + assert output_data["profiles"][0]["samples"] == [[1], [0]] + assert output_data["profiles"][0]["weights"] == [100, 200] + assert output_data["profiles"][1]["samples"] == [[1], [0]] + assert output_data["profiles"][1]["weights"] == [1, 1] + + def test_temporal_leak_fallback_omits_freed_intervals(self): + allocations = [ + MockTemporalAllocationRecord( + tid=1, + allocator=AllocatorType.MALLOC, + stack_id=1, + intervals=[MockInterval(0, None, 1, 100)], + _stack=[("leaked", "leaked.py", 10)], + ), + MockTemporalAllocationRecord( + tid=1, + allocator=AllocatorType.CALLOC, + stack_id=2, + intervals=[MockInterval(0, 1, 1, 200)], + _stack=[("freed", "freed.py", 20)], + ), + ] + reporter = TransformReporter( + allocations, + format="speedscope", + memory_records=[MemorySnapshot(100, 0, 0)], + native_traces=False, + ) + output = StringIO() + + reporter.render_as_speedscope(output, show_memory_leaks=True) + output.seek(0) + + output_data = json.loads(output.read()) + assert output_data["profiles"][0]["samples"] == [[0]] + assert output_data["profiles"][0]["weights"] == [100] + assert output_data["profiles"][1]["samples"] == [[0]] + assert output_data["profiles"][1]["weights"] == [1] diff --git a/tests/utils.py b/tests/utils.py index 0f8d866339..b0513833d0 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -52,8 +52,27 @@ def filter_relevant_pymalloc_allocations(records, size): ) +class _MockStackTraceMixin: + @staticmethod + def _get_stack_trace(stack, max_stacks): + if max_stacks == 0: + return stack + else: + return stack[:max_stacks] + + def stack_trace(self, max_stacks=0): + if self._stack is None: + raise AssertionError("did not expect a call to `stack_trace`") + return self._get_stack_trace(self._stack, max_stacks) + + def hybrid_stack_trace(self, max_stacks=0): + if self._hybrid_stack is None: + raise AssertionError("did not expect a call to `hybrid_stack_trace`") + return self._get_stack_trace(self._hybrid_stack, max_stacks) + + @dataclass -class MockAllocationRecord: +class MockAllocationRecord(_MockStackTraceMixin): """Mimics :py:class:`memray._memray.AllocationRecord`.""" tid: int @@ -65,23 +84,26 @@ class MockAllocationRecord: _stack: Optional[List[Tuple[str, str, int]]] = None _hybrid_stack: Optional[List[Tuple[str, str, int]]] = None thread_name: str = "" + timestamp_us: int = 0 - @staticmethod - def __get_stack_trace(stack, max_stacks): - if max_stacks == 0: - return stack - else: - return stack[:max_stacks] - def stack_trace(self, max_stacks=0): - if self._stack is None: - raise AssertionError("did not expect a call to `stack_trace`") - return self.__get_stack_trace(self._stack, max_stacks) +@dataclass +class MockInterval: + allocated_before_snapshot: int + deallocated_before_snapshot: Optional[int] + n_allocations: int + n_bytes: int - def hybrid_stack_trace(self, max_stacks=0): - if self._hybrid_stack is None: - raise AssertionError("did not expect a call to `hybrid_stack_trace`") - return self.__get_stack_trace(self._hybrid_stack, max_stacks) + +@dataclass +class MockTemporalAllocationRecord(_MockStackTraceMixin): + tid: int + allocator: AllocatorType + stack_id: int + intervals: List[MockInterval] + _stack: Optional[List[Tuple[str, str, int]]] = None + _hybrid_stack: Optional[List[Tuple[str, str, int]]] = None + thread_name: str = "" @contextmanager