Skip to content

Commit 7036e96

Browse files
committed
Add asid filtering for UDTF heap functions
Signed-off-by: Dom Del Nano <ddelnano@gmail.com>
1 parent 1c39c87 commit 7036e96

2 files changed

Lines changed: 59 additions & 17 deletions

File tree

src/pxl_scripts/px/collect_agent_heaps/collect_agent_heaps.pxl

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,14 +17,13 @@
1717
import px
1818

1919

20-
# TODO(ddelnano): asid is unused until gh#2245 is addressed.
2120
def collect_pprofs(asid: int):
2221
df = px.GetAgentStatus()
2322
df.ip_address = px.pluck_array(px.split(df.ip_address, ":"), 0)
2423
df.hostname_by_ip = px.pod_id_to_node_name(px.ip_to_pod_id(df.ip_address))
2524
df.hostname = px.select(df.hostname_by_ip == "", df.hostname, df.hostname_by_ip)
2625
df = df[['asid', 'hostname']]
27-
heap_stats = px._HeapGrowthStacks()
26+
heap_stats = px._HeapGrowthStacks(asid)
2827
df = df.merge(heap_stats, how='inner', left_on='asid', right_on='asid')
2928
df.asid = df.asid_x
3029
return df[['asid', 'hostname', 'heap']]

src/vizier/funcs/internal/debug.h

Lines changed: 58 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,45 @@ namespace funcs {
3838

3939
constexpr int kMaxBufferSize = 1024 * 1024;
4040

41-
class HeapStatsUDTF final : public carnot::udf::UDTF<HeapStatsUDTF> {
41+
// Base template class for Heap UDTFs that provides ASID filtering functionality
42+
template <typename TDerived>
43+
class HeapUDTFWithAsidFilter : public carnot::udf::UDTF<TDerived> {
44+
public:
45+
using FunctionContext = typename carnot::udf::UDTF<TDerived>::FunctionContext;
46+
using RecordWriter = typename carnot::udf::UDTF<TDerived>::RecordWriter;
47+
using UDTFArg = carnot::udf::UDTFArg;
48+
49+
Status Init(FunctionContext* ctx, types::Int64Value asid) {
50+
asid_ = asid.val;
51+
return InitImpl(ctx);
52+
}
53+
54+
static constexpr auto InitArgs() {
55+
return MakeArray(UDTFArg::Make<types::INT64>(
56+
"asid", "Whether to filter the result set for a specific asid", -1));
57+
}
58+
59+
// This method handles the ASID filtering and delegates to NextRecordImpl
60+
bool NextRecord(FunctionContext* ctx, RecordWriter* rw) {
61+
auto asid = ctx->metadata_state()->asid();
62+
if (asid_ != -1 && asid != asid_) {
63+
return false;
64+
}
65+
return NextRecordImpl(ctx, rw);
66+
}
67+
68+
protected:
69+
// Derived classes must implement this method
70+
virtual bool NextRecordImpl(FunctionContext* ctx, RecordWriter* rw) = 0;
71+
72+
// Derived classes can optionally override this to do additional initialization
73+
virtual Status InitImpl(FunctionContext* /*ctx*/) { return Status::OK(); }
74+
75+
private:
76+
int64_t asid_ = -1;
77+
};
78+
79+
class HeapStatsUDTF final : public HeapUDTFWithAsidFilter<HeapStatsUDTF> {
4280
public:
4381
static constexpr auto Executor() { return carnot::udfspb::UDTFSourceExecutor::UDTF_ALL_AGENTS; }
4482

@@ -49,7 +87,8 @@ class HeapStatsUDTF final : public carnot::udf::UDTF<HeapStatsUDTF> {
4987
"The pretty heap stats"));
5088
}
5189

52-
bool NextRecord(FunctionContext* ctx, RecordWriter* rw) {
90+
protected:
91+
bool NextRecordImpl(FunctionContext* ctx, RecordWriter* rw) override {
5392
#ifdef TCMALLOC
5493
std::string buf(kMaxBufferSize, '\0');
5594
MallocExtension::instance()->GetStats(&buf[0], buf.size());
@@ -66,7 +105,7 @@ class HeapStatsUDTF final : public carnot::udf::UDTF<HeapStatsUDTF> {
66105
}
67106
};
68107

69-
class HeapSampleUDTF final : public carnot::udf::UDTF<HeapSampleUDTF> {
108+
class HeapSampleUDTF final : public HeapUDTFWithAsidFilter<HeapSampleUDTF> {
70109
public:
71110
static constexpr auto Executor() { return carnot::udfspb::UDTFSourceExecutor::UDTF_ALL_AGENTS; }
72111

@@ -77,7 +116,8 @@ class HeapSampleUDTF final : public carnot::udf::UDTF<HeapSampleUDTF> {
77116
"The pretty heap stats"));
78117
}
79118

80-
bool NextRecord(FunctionContext* ctx, RecordWriter* rw) {
119+
protected:
120+
bool NextRecordImpl(FunctionContext* ctx, RecordWriter* rw) override {
81121
#ifdef TCMALLOC
82122
std::string buf;
83123
MallocExtension::instance()->GetHeapSample(&buf);
@@ -93,7 +133,7 @@ class HeapSampleUDTF final : public carnot::udf::UDTF<HeapSampleUDTF> {
93133
}
94134
};
95135

96-
class HeapGrowthStacksUDTF final : public carnot::udf::UDTF<HeapGrowthStacksUDTF> {
136+
class HeapGrowthStacksUDTF final : public HeapUDTFWithAsidFilter<HeapGrowthStacksUDTF> {
97137
public:
98138
static constexpr auto Executor() { return carnot::udfspb::UDTFSourceExecutor::UDTF_ALL_AGENTS; }
99139

@@ -104,7 +144,8 @@ class HeapGrowthStacksUDTF final : public carnot::udf::UDTF<HeapGrowthStacksUDTF
104144
"The pretty heap stats"));
105145
}
106146

107-
bool NextRecord(FunctionContext* ctx, RecordWriter* rw) {
147+
protected:
148+
bool NextRecordImpl(FunctionContext* ctx, RecordWriter* rw) override {
108149
#ifdef TCMALLOC
109150
std::string buf;
110151
MallocExtension::instance()->GetHeapGrowthStacks(&buf);
@@ -314,7 +355,7 @@ class AgentProcSMapsUDTF final : public carnot::udf::UDTF<AgentProcSMapsUDTF> {
314355
int current_idx_ = 0;
315356
};
316357

317-
class HeapReleaseFreeMemoryUDTF final : public carnot::udf::UDTF<HeapReleaseFreeMemoryUDTF> {
358+
class HeapReleaseFreeMemoryUDTF final : public HeapUDTFWithAsidFilter<HeapReleaseFreeMemoryUDTF> {
318359
public:
319360
static constexpr auto Executor() { return carnot::udfspb::UDTFSourceExecutor::UDTF_ALL_AGENTS; }
320361

@@ -323,16 +364,15 @@ class HeapReleaseFreeMemoryUDTF final : public carnot::udf::UDTF<HeapReleaseFree
323364
"The short ID of the agent", types::SemanticType::ST_ASID));
324365
}
325366

326-
Status Init(FunctionContext*) { return Status::OK(); }
327-
328-
bool NextRecord(FunctionContext* ctx, RecordWriter* rw) {
367+
protected:
368+
bool NextRecordImpl(FunctionContext* ctx, RecordWriter* rw) override {
329369
px::ReleaseFreeMemory();
330370
rw->Append<IndexOf("asid")>(ctx->metadata_state()->asid());
331371
return false;
332372
}
333373
};
334374

335-
class HeapRangesUDTF final : public carnot::udf::UDTF<HeapRangesUDTF> {
375+
class HeapRangesUDTF final : public HeapUDTFWithAsidFilter<HeapRangesUDTF> {
336376
public:
337377
static constexpr auto Executor() { return carnot::udfspb::UDTFSourceExecutor::UDTF_ALL_AGENTS; }
338378
static constexpr auto OutputRelation() {
@@ -352,7 +392,8 @@ class HeapRangesUDTF final : public carnot::udf::UDTF<HeapRangesUDTF> {
352392
types::SemanticType::ST_NONE));
353393
}
354394

355-
Status Init(FunctionContext*) {
395+
protected:
396+
Status InitImpl(FunctionContext*) override {
356397
#ifdef TCMALLOC
357398
auto range_func = [](void* udtf, const ::base::MallocRange* range) {
358399
static_cast<HeapRangesUDTF*>(udtf)->ranges_.push_back(*range);
@@ -361,7 +402,8 @@ class HeapRangesUDTF final : public carnot::udf::UDTF<HeapRangesUDTF> {
361402
#endif
362403
return Status::OK();
363404
}
364-
bool NextRecord(FunctionContext* ctx, RecordWriter* rw) {
405+
406+
bool NextRecordImpl(FunctionContext* ctx, RecordWriter* rw) override {
365407
#ifdef TCMALLOC
366408
if (idx_ >= ranges_.size()) {
367409
return false;
@@ -387,7 +429,7 @@ class HeapRangesUDTF final : public carnot::udf::UDTF<HeapRangesUDTF> {
387429
#endif
388430
};
389431

390-
class HeapStatsNumericUDTF final : public carnot::udf::UDTF<HeapStatsNumericUDTF> {
432+
class HeapStatsNumericUDTF final : public HeapUDTFWithAsidFilter<HeapStatsNumericUDTF> {
391433
public:
392434
static constexpr auto Executor() { return carnot::udfspb::UDTFSourceExecutor::UDTF_ALL_AGENTS; }
393435

@@ -414,7 +456,8 @@ class HeapStatsNumericUDTF final : public carnot::udf::UDTF<HeapStatsNumericUDTF
414456
"Number of unmapped bytes in tcmalloc's pageheap", types::SemanticType::ST_BYTES));
415457
}
416458

417-
bool NextRecord(FunctionContext* ctx, RecordWriter* rw) {
459+
protected:
460+
bool NextRecordImpl(FunctionContext* ctx, RecordWriter* rw) override {
418461
#ifdef TCMALLOC
419462
size_t current_allocated_bytes, heap_size, central_cache_free, transfer_cache_free,
420463
thread_cache_free, pageheap_free, pageheap_unmapped;

0 commit comments

Comments
 (0)