diff --git a/flags/flags.go b/flags/flags.go index 5aa1bac627..f521405b96 100644 --- a/flags/flags.go +++ b/flags/flags.go @@ -345,6 +345,8 @@ type FlagsRemoteStore struct { ClientCert string `help:"Client certificate for mTLS"` ClientKey string `help:"Client key for mTLS"` + + UseV2Schema bool `name:"use-v2-schema" default:"false" help:"Use v2 Arrow schema with inline stacktraces and ListView deduplication (experimental)."` } // FlagsDebuginfo contains flags to configure debuginfo. diff --git a/go.mod b/go.mod index cacbb58681..479e6353d7 100644 --- a/go.mod +++ b/go.mod @@ -3,13 +3,13 @@ module github.com/parca-dev/parca-agent go 1.25.0 require ( - buf.build/gen/go/parca-dev/parca/grpc/go v1.5.1-20250212095114-4db6f2d46517.2 - buf.build/gen/go/parca-dev/parca/protocolbuffers/go v1.36.6-20250212095114-4db6f2d46517.1 + buf.build/gen/go/parca-dev/parca/grpc/go v1.6.1-20260225102827-5fda07223114.1 + buf.build/gen/go/parca-dev/parca/protocolbuffers/go v1.36.11-20260225102827-5fda07223114.1 buf.build/gen/go/prometheus/prometheus/protocolbuffers/go v1.36.6-20250320161912-af2aab87b1b3.1 github.com/KimMachineGun/automemlimit v0.7.3 github.com/alecthomas/kong v1.12.1 github.com/alecthomas/kong-yaml v0.2.0 - github.com/apache/arrow/go/v16 v16.1.0 + github.com/apache/arrow-go/v18 v18.5.2 github.com/armon/circbuf v0.0.0-20190214190532-5111143e8da2 github.com/common-nighthawk/go-figure v0.0.0-20210622060536-734e95fb86be github.com/containerd/containerd v1.7.29 @@ -64,6 +64,8 @@ require ( github.com/AdamKorcz/go-118-fuzz-build v0.0.0-20230306123547-8075edf89bb0 // indirect github.com/Microsoft/go-winio v0.6.2 // indirect github.com/Microsoft/hcsshim v0.12.9 // indirect + github.com/andybalholm/brotli v1.2.0 // indirect + github.com/apache/thrift v0.22.0 // indirect github.com/beorn7/perks v1.0.1 // indirect github.com/blang/semver/v4 v4.0.0 // indirect github.com/cenkalti/backoff/v4 v4.3.0 // indirect @@ -97,7 +99,7 @@ require ( github.com/goccy/go-json v0.10.5 // indirect 
github.com/golang/groupcache v0.0.0-20241129210726-2c02b8208cf8 // indirect github.com/golang/protobuf v1.5.4 // indirect - github.com/google/flatbuffers v25.2.10+incompatible // indirect + github.com/google/flatbuffers v25.12.19+incompatible // indirect github.com/google/gnostic-models v0.6.9 // indirect github.com/google/go-cmp v0.7.0 // indirect github.com/google/gofuzz v1.2.0 // indirect @@ -111,11 +113,14 @@ require ( github.com/josharian/native v1.1.0 // indirect github.com/jpillora/backoff v1.0.0 // indirect github.com/json-iterator/go v1.1.12 // indirect - github.com/klauspost/cpuid/v2 v2.2.11 // indirect + github.com/klauspost/asmfmt v1.3.2 // indirect + github.com/klauspost/cpuid/v2 v2.3.0 // indirect github.com/mailru/easyjson v0.9.0 // indirect github.com/mdlayher/kobject v0.0.0-20200520190114-19ca17470d7d // indirect github.com/mdlayher/netlink v1.7.2 // indirect github.com/mdlayher/socket v0.4.1 // indirect + github.com/minio/asm2plan9s v0.0.0-20200509001527-cdd76441f9d8 // indirect + github.com/minio/c2goasm v0.0.0-20190812172519-36a3d3bbc4f3 // indirect github.com/minio/sha256-simd v1.0.1 // indirect github.com/moby/docker-image-spec v1.3.1 // indirect github.com/moby/locker v1.0.1 // indirect @@ -134,7 +139,7 @@ require ( github.com/opencontainers/runtime-spec v1.2.1 // indirect github.com/opencontainers/selinux v1.13.0 // indirect github.com/pbnjay/memory v0.0.0-20210728143218-7b4eea64cf58 // indirect - github.com/pierrec/lz4/v4 v4.1.22 // indirect + github.com/pierrec/lz4/v4 v4.1.25 // indirect github.com/pkg/errors v0.9.1 // indirect github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect github.com/prometheus/client_model v0.6.2 // indirect diff --git a/go.sum b/go.sum index bce850a983..987f19a3ad 100644 --- a/go.sum +++ b/go.sum @@ -1,9 +1,9 @@ buf.build/gen/go/gogo/protobuf/protocolbuffers/go v1.36.6-20240617172848-e1dbca2775a7.1 h1:DHj/fDjM+Ij3KR1IpFs6WdNcCtD4+Th3tEWyx3Xgs14= 
buf.build/gen/go/gogo/protobuf/protocolbuffers/go v1.36.6-20240617172848-e1dbca2775a7.1/go.mod h1:iCb72C37pWGhjKDeq9IbcMqVJAnXXHs5tEjiePfouhk= -buf.build/gen/go/parca-dev/parca/grpc/go v1.5.1-20250212095114-4db6f2d46517.2 h1:AiTEEAQ8AHE5udDizcA2r3GNjxxlk4f2Buot/hA2oYM= -buf.build/gen/go/parca-dev/parca/grpc/go v1.5.1-20250212095114-4db6f2d46517.2/go.mod h1:U8BtFPtz71GSALR7K7ALn39RnvrKsD++lJzMI5Gf4fs= -buf.build/gen/go/parca-dev/parca/protocolbuffers/go v1.36.6-20250212095114-4db6f2d46517.1 h1:gXHJuGlWoXUkH9O9Qxw9skNAhIDgNgJx0tYSbI8fjIo= -buf.build/gen/go/parca-dev/parca/protocolbuffers/go v1.36.6-20250212095114-4db6f2d46517.1/go.mod h1:58eXMQL4tavOGzyXIveWU4f3yTFGmLIHxM9uk11DOIo= +buf.build/gen/go/parca-dev/parca/grpc/go v1.6.1-20260225102827-5fda07223114.1 h1:vxHTP5quJBa7/yMhL5KHfci9LxgB970bBJPByU9K9EQ= +buf.build/gen/go/parca-dev/parca/grpc/go v1.6.1-20260225102827-5fda07223114.1/go.mod h1:Gm9/4TuxSC1DmFMqCBuLOWQ2nbRPjt7rSEQiTIksYBY= +buf.build/gen/go/parca-dev/parca/protocolbuffers/go v1.36.11-20260225102827-5fda07223114.1 h1:lEamNY09GPFoJ3avHHtvnCR+mjx2sZx418nTVrgjC9Q= +buf.build/gen/go/parca-dev/parca/protocolbuffers/go v1.36.11-20260225102827-5fda07223114.1/go.mod h1:TR9iiFuhuMGsEil3U6KZnYmvZ1W2SY6uw5HPP+4mJU4= buf.build/gen/go/prometheus/prometheus/protocolbuffers/go v1.36.6-20250320161912-af2aab87b1b3.1 h1:EuFqAB/kfs/jh9aUGcvBjcxtU89wnXwsuQfcwGX1rhE= buf.build/gen/go/prometheus/prometheus/protocolbuffers/go v1.36.6-20250320161912-af2aab87b1b3.1/go.mod h1:ea/VK8bRnfyOuhQRzIk5hGigCqbZdzI8SHNLC3IyABU= cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= @@ -32,8 +32,12 @@ github.com/alecthomas/kong-yaml v0.2.0 h1:iiVVqVttmOsHKawlaW/TljPsjaEv1O4ODx6dlo github.com/alecthomas/kong-yaml v0.2.0/go.mod h1:vMvOIy+wpB49MCZ0TA3KMts38Mu9YfRP03Q1StN69/g= github.com/alecthomas/repr v0.4.0 h1:GhI2A8MACjfegCPVq9f1FLvIBS+DrQ2KQBFZP1iFzXc= github.com/alecthomas/repr v0.4.0/go.mod h1:Fr0507jx4eOXV7AlPV6AVZLYrLIuIeSOWtW57eE/O/4= 
-github.com/apache/arrow/go/v16 v16.1.0 h1:dwgfOya6s03CzH9JrjCBx6bkVb4yPD4ma3haj9p7FXI= -github.com/apache/arrow/go/v16 v16.1.0/go.mod h1:9wnc9mn6vEDTRIm4+27pEjQpRKuTvBaessPoEXQzxWA= +github.com/andybalholm/brotli v1.2.0 h1:ukwgCxwYrmACq68yiUqwIWnGY0cTPox/M94sVwToPjQ= +github.com/andybalholm/brotli v1.2.0/go.mod h1:rzTDkvFWvIrjDXZHkuS16NPggd91W3kUSvPlQ1pLaKY= +github.com/apache/arrow-go/v18 v18.5.2 h1:3uoHjoaEie5eVsxx/Bt64hKwZx4STb+beAkqKOlq/lY= +github.com/apache/arrow-go/v18 v18.5.2/go.mod h1:yNoizNTT4peTciJ7V01d2EgOkE1d0fQ1vZcFOsVtFsw= +github.com/apache/thrift v0.22.0 h1:r7mTJdj51TMDe6RtcmNdQxgn9XcyfGDOzegMDRg47uc= +github.com/apache/thrift v0.22.0/go.mod h1:1e7J/O1Ae6ZQMTYdy9xa3w9k+XHWPfRvdPyJeynQ+/g= github.com/armon/circbuf v0.0.0-20190214190532-5111143e8da2 h1:7Ip0wMmLHLRJdrloDxZfhMm0xrLXZS8+COSu2bXmEQs= github.com/armon/circbuf v0.0.0-20190214190532-5111143e8da2/go.mod h1:3U/XgcO3hCbHZ8TKRvWD2dDTCfh9M9ya+I9JpbB7O8o= github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= @@ -166,8 +170,8 @@ github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps= github.com/golang/snappy v1.0.0 h1:Oy607GVXHs7RtbggtPBnr2RmDArIsAefDwvrdWvRhGs= github.com/golang/snappy v1.0.0/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= -github.com/google/flatbuffers v25.2.10+incompatible h1:F3vclr7C3HpB1k9mxCGRMXq6FdUalZ6H/pNX4FP1v0Q= -github.com/google/flatbuffers v25.2.10+incompatible/go.mod h1:1AeVuKshWv4vARoZatz6mlQ0JxURH0Kv5+zNeJKJCa8= +github.com/google/flatbuffers v25.12.19+incompatible h1:haMV2JRRJCe1998HeW/p0X9UaMTK6SDo0ffLn2+DbLs= +github.com/google/flatbuffers v25.12.19+incompatible/go.mod h1:1AeVuKshWv4vARoZatz6mlQ0JxURH0Kv5+zNeJKJCa8= github.com/google/gnostic-models v0.6.9 h1:MU/8wDLif2qCXZmzncUQ/BOfxWfthHi63KqpoNbWqVw= github.com/google/gnostic-models v0.6.9/go.mod h1:CiWsm0s6BSQd1hRn8/QmxqB6BesYcbSZxsz9b0KuDBw= 
github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M= @@ -219,10 +223,12 @@ github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnr github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo= github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= +github.com/klauspost/asmfmt v1.3.2 h1:4Ri7ox3EwapiOjCki+hw14RyKk201CN4rzyCJRFLpK4= +github.com/klauspost/asmfmt v1.3.2/go.mod h1:AG8TuvYojzulgDAMCnYn50l/5QV3Bs/tp6j0HLHbNSE= github.com/klauspost/compress v1.18.4 h1:RPhnKRAQ4Fh8zU2FY/6ZFDwTVTxgJ/EMydqSTzE9a2c= github.com/klauspost/compress v1.18.4/go.mod h1:R0h/fSBs8DE4ENlcrlib3PsXS61voFxhIs2DeRhCvJ4= -github.com/klauspost/cpuid/v2 v2.2.11 h1:0OwqZRYI2rFrjS4kvkDnqJkKHdHaRnCm68/DY4OxRzU= -github.com/klauspost/cpuid/v2 v2.2.11/go.mod h1:hqwkgyIinND0mEev00jJYCxPNVRVXFQeu1XKlok6oO0= +github.com/klauspost/cpuid/v2 v2.3.0 h1:S4CRMLnYUhGeDFDqkGriYKdfoFlDnMtqTiI/sFzhA9Y= +github.com/klauspost/cpuid/v2 v2.3.0/go.mod h1:hqwkgyIinND0mEev00jJYCxPNVRVXFQeu1XKlok6oO0= github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= @@ -246,6 +252,10 @@ github.com/mdlayher/netlink v1.7.2 h1:/UtM3ofJap7Vl4QWCPDGXY8d3GIY2UGSDbK+QWmY8/ github.com/mdlayher/netlink v1.7.2/go.mod h1:xraEF7uJbxLhc5fpHL4cPe221LI2bdttWlU+ZGLfQSw= github.com/mdlayher/socket v0.4.1 h1:eM9y2/jlbs1M615oshPQOHZzj6R6wMT7bX5NPiQvn2U= github.com/mdlayher/socket v0.4.1/go.mod h1:cAqeGjoufqdxWkD7DkpyS+wcefOtmu5OQ8KuoJGIReA= +github.com/minio/asm2plan9s v0.0.0-20200509001527-cdd76441f9d8 h1:AMFGa4R4MiIpspGNG7Z948v4n35fFGB3RR3G/ry4FWs= +github.com/minio/asm2plan9s v0.0.0-20200509001527-cdd76441f9d8/go.mod h1:mC1jAcsrzbxHt8iiaC+zU4b1ylILSosueou12R++wfY= 
+github.com/minio/c2goasm v0.0.0-20190812172519-36a3d3bbc4f3 h1:+n/aFZefKZp7spd8DFdX7uMikMLXX4oubIzJF4kv/wI= +github.com/minio/c2goasm v0.0.0-20190812172519-36a3d3bbc4f3/go.mod h1:RagcQ7I8IeTMnF8JTXieKnO4Z6JCsikNEzj0DwauVzE= github.com/minio/sha256-simd v1.0.1 h1:6kaan5IFmwTNynnKKpDHe6FWHohJOHhCPchzK49dzMM= github.com/minio/sha256-simd v1.0.1/go.mod h1:Pz6AKMiUdngCLpeTL/RJY1M9rUuPMYujV5xJjtbRSN8= github.com/moby/docker-image-spec v1.3.1 h1:jMKff3w6PgbfSa69GfNg+zN/XLhfXJGnEx3Nl2EsFP0= @@ -303,8 +313,8 @@ github.com/parca-dev/opentelemetry-ebpf-profiler v0.0.202611-0.20260318195408-a0 github.com/parca-dev/opentelemetry-ebpf-profiler v0.0.202611-0.20260318195408-a0d0c958888a/go.mod h1:4DvUDuFwWO3OJoe824nY4Tw6jBrLGEY/hS6dsGDd0V8= github.com/pbnjay/memory v0.0.0-20210728143218-7b4eea64cf58 h1:onHthvaw9LFnH4t2DcNVpwGmV9E1BkGknEliJkfwQj0= github.com/pbnjay/memory v0.0.0-20210728143218-7b4eea64cf58/go.mod h1:DXv8WO4yhMYhSNPKjeNKa5WY9YCIEBRbNzFFPJbWO6Y= -github.com/pierrec/lz4/v4 v4.1.22 h1:cKFw6uJDK+/gfw5BcDL0JL5aBsAFdsIT18eRtLj7VIU= -github.com/pierrec/lz4/v4 v4.1.22/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= +github.com/pierrec/lz4/v4 v4.1.25 h1:kocOqRffaIbU5djlIBr7Wh+cx82C0vtFb0fOurZHqD0= +github.com/pierrec/lz4/v4 v4.1.25/go.mod h1:EoQMVJgeeEOMsCqCzqFm2O0cJvljX2nGZjcRIPL34O4= github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= @@ -353,6 +363,8 @@ github.com/x448/float16 v0.8.4 h1:qLwI1I70+NjRFUR3zs1JPUCgaCXSh3SW62uAKT1mSBM= github.com/x448/float16 v0.8.4/go.mod h1:14CWIYCyZA/cWjXOioeEpHeN/83MdbZDRQHoFcYsOfg= github.com/xyproto/ainur v1.3.3 h1:DjbkZg7iNblH1abwfIQG2siI0z3LOyVuWEmCq2S9GKc= github.com/xyproto/ainur v1.3.3/go.mod h1:Sn5x2wSx2Q9RoZHIqJr927vVeM0oKwBl4lCMG+My/rk= +github.com/xyproto/randomstring v1.0.5 h1:YtlWPoRdgMu3NZtP45drfy1GKoojuR7hmRcnhZqKjWU= 
+github.com/xyproto/randomstring v1.0.5/go.mod h1:rgmS5DeNXLivK7YprL0pY+lTuhNQW3iGxZ18UQApw/E= github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yusufpapurcu/wmi v1.2.4 h1:zFUKzehAFReQwLys1b/iSMl+JQGSCSjtVqQn9bBrPo0= @@ -499,8 +511,8 @@ golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8T golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20240903120638-7835f813f4da h1:noIWHXmPHxILtqtCOPIhSt0ABwskkZKjD3bXGnZGpNY= golang.org/x/xerrors v0.0.0-20240903120638-7835f813f4da/go.mod h1:NDW/Ps6MPRej6fsCIbMTohpP40sJ/P/vI1MoTEGwX90= -gonum.org/v1/gonum v0.16.0 h1:5+ul4Swaf3ESvrOnidPp4GZbzf0mxVQpDCYUQE7OJfk= -gonum.org/v1/gonum v0.16.0/go.mod h1:fef3am4MQ93R2HHpKnLk4/Tbh/s0+wqD5nfa6Pnwy4E= +gonum.org/v1/gonum v0.17.0 h1:VbpOemQlsSMrYmn7T2OUvQ4dqxQXU+ouZFQsZOx50z4= +gonum.org/v1/gonum v0.17.0/go.mod h1:El3tOrEuMpv2UdMrbNlKEh9vd86bmQ6vqIcDwxEOc1E= google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM= google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4= google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc= diff --git a/main.go b/main.go index ad841f5675..a5990d1f87 100644 --- a/main.go +++ b/main.go @@ -24,7 +24,7 @@ import ( telemetrygrpc "buf.build/gen/go/parca-dev/parca/grpc/go/parca/telemetry/v1alpha1/telemetryv1alpha1grpc" telemetrypb "buf.build/gen/go/parca-dev/parca/protocolbuffers/go/parca/telemetry/v1alpha1" _ "github.com/KimMachineGun/automemlimit" - "github.com/apache/arrow/go/v16/arrow/memory" + "github.com/apache/arrow-go/v18/arrow/memory" "github.com/armon/circbuf" "github.com/common-nighthawk/go-figure" "github.com/felixge/fgprof" @@ -424,6 +424,7 @@ func mainWithExitCode() 
flags.ExitCode { f.Metadata.DisableCPULabel, f.Metadata.DisableThreadIDLabel, f.Metadata.DisableThreadCommLabel, + f.RemoteStore.UseV2Schema, ) if err != nil { return flags.Failure("Failed to start reporting: %v", err) diff --git a/reporter/arrow.go b/reporter/arrow.go index cb09891091..18b734c077 100644 --- a/reporter/arrow.go +++ b/reporter/arrow.go @@ -5,12 +5,70 @@ import ( "slices" "unsafe" - "github.com/apache/arrow/go/v16/arrow" - "github.com/apache/arrow/go/v16/arrow/array" - "github.com/apache/arrow/go/v16/arrow/memory" + "github.com/apache/arrow-go/v18/arrow" + "github.com/apache/arrow-go/v18/arrow/array" + "github.com/apache/arrow-go/v18/arrow/memory" "golang.org/x/exp/maps" ) +func stringRunEndBuilder(arr array.Builder) *StringRunEndBuilder { + ree := arr.(*array.RunEndEncodedBuilder) + sb := ree.ValueBuilder().(*array.StringBuilder) + return &StringRunEndBuilder{ + ree: ree, + sb: sb, + } +} + +type StringRunEndBuilder struct { + ree *array.RunEndEncodedBuilder + sb *array.StringBuilder +} + +func (b *StringRunEndBuilder) Release() { + b.ree.Release() +} + +func (b *StringRunEndBuilder) NewArray() arrow.Array { + return b.ree.NewArray() +} + +func (b *StringRunEndBuilder) Len() int { + return b.ree.Len() +} + +func (b *StringRunEndBuilder) EnsureLength(l int) { + for b.ree.Len() < l { + b.AppendNull() + } +} + +func (b *StringRunEndBuilder) AppendNull() { + b.ree.AppendNull() +} + +func (b *StringRunEndBuilder) AppendString(v string) { + if b.sb.Len() > 0 && + !b.sb.IsNull(b.sb.Len()-1) && + v == b.sb.Value(int(b.sb.Len()-1)) { + b.ree.ContinueRun(1) + return + } + b.ree.Append(1) + b.sb.Append(v) +} + +func (b *StringRunEndBuilder) AppendStringN(v string, n uint64) { + if b.sb.Len() > 0 && + !b.sb.IsNull(b.sb.Len()-1) && + v == b.sb.Value(int(b.sb.Len()-1)) { + b.ree.ContinueRun(n) + return + } + b.ree.Append(n) + b.sb.Append(v) +} + func binaryDictionaryRunEndBuilder(arr array.Builder) *BinaryDictionaryRunEndBuilder { ree := 
arr.(*array.RunEndEncodedBuilder) bd := ree.ValueBuilder().(*array.BinaryDictionaryBuilder) @@ -105,6 +163,10 @@ func (b *Uint64RunEndBuilder) NewArray() arrow.Array { return b.ree.NewArray() } +func (b *Uint64RunEndBuilder) Append(v uint64) { + b.AppendN(v, 1) +} + func (b *Uint64RunEndBuilder) AppendN(v uint64, n uint64) { if b.ub.Len() > 0 && v == b.ub.Value(b.ub.Len()-1) { b.ree.ContinueRun(n) diff --git a/reporter/arrow_v2.go b/reporter/arrow_v2.go new file mode 100644 index 0000000000..ffab7581d1 --- /dev/null +++ b/reporter/arrow_v2.go @@ -0,0 +1,682 @@ +package reporter + +import ( + "slices" + + "github.com/apache/arrow-go/v18/arrow" + "github.com/apache/arrow-go/v18/arrow/array" + "github.com/apache/arrow-go/v18/arrow/bitutil" + "github.com/apache/arrow-go/v18/arrow/extensions" + "github.com/apache/arrow-go/v18/arrow/memory" + "go.opentelemetry.io/ebpf-profiler/libpf" + "golang.org/x/exp/maps" +) + +// V2 Schema Constants +const ( + MetadataSchemaVersionV2 = "v2" +) + +// FunctionV2 represents a function for deduplication purposes. +type FunctionV2 struct { + SystemName string + Filename string + StartLine uint64 +} + +// listEntryRef stores the offset and size for ListView deduplication. +type listEntryRef struct { + offset int + listSize int +} + +// V2 Schema Field Definitions using StringView for better memory efficiency + +var ( + // FilenameDictTypeV2 is a dictionary of filenames for efficient storage. + FilenameDictTypeV2 = &arrow.DictionaryType{ + IndexType: arrow.PrimitiveTypes.Uint32, + ValueType: arrow.BinaryTypes.String, + } + + // FunctionFieldTypeV2 defines the function struct type for v2. 
+ FunctionFieldTypeV2 = arrow.StructOf( + arrow.Field{Name: "system_name", Type: arrow.BinaryTypes.StringView, Nullable: true}, + arrow.Field{Name: "filename", Type: FilenameDictTypeV2, Nullable: true}, + arrow.Field{Name: "start_line", Type: arrow.PrimitiveTypes.Uint64, Nullable: false}, + ) + + // FunctionDictTypeV2 is a dictionary of functions for efficient storage. + FunctionDictTypeV2 = &arrow.DictionaryType{ + IndexType: arrow.PrimitiveTypes.Uint32, + ValueType: FunctionFieldTypeV2, + } + + // LineFieldTypeV2 defines the line struct type for v2. + LineFieldTypeV2 = arrow.StructOf( + arrow.Field{Name: "line", Type: arrow.PrimitiveTypes.Uint64, Nullable: false}, + arrow.Field{Name: "column", Type: arrow.PrimitiveTypes.Uint64, Nullable: false}, + arrow.Field{Name: "function", Type: FunctionDictTypeV2, Nullable: false}, + ) + + // FrameTypeDictTypeV2 is a dictionary of frame types with Uint32 index. + FrameTypeDictTypeV2 = &arrow.DictionaryType{ + IndexType: arrow.PrimitiveTypes.Uint32, + ValueType: arrow.BinaryTypes.String, + } + + // MappingFileDictTypeV2 is a dictionary of mapping files for efficient storage. + MappingFileDictTypeV2 = &arrow.DictionaryType{ + IndexType: arrow.PrimitiveTypes.Uint32, + ValueType: arrow.BinaryTypes.String, + } + + // MappingBuildIDDictTypeV2 is a dictionary of mapping build IDs for efficient storage. + MappingBuildIDDictTypeV2 = &arrow.DictionaryType{ + IndexType: arrow.PrimitiveTypes.Uint32, + ValueType: arrow.BinaryTypes.String, + } + + // LocationTypeV2 defines the location struct type for v2. 
+ LocationTypeV2 = arrow.StructOf( + arrow.Field{Name: "address", Type: arrow.PrimitiveTypes.Uint64, Nullable: false}, + arrow.Field{Name: "frame_type", Type: FrameTypeDictTypeV2, Nullable: true}, + arrow.Field{Name: "mapping_file", Type: MappingFileDictTypeV2, Nullable: true}, + arrow.Field{Name: "mapping_build_id", Type: MappingBuildIDDictTypeV2, Nullable: true}, + arrow.Field{Name: "lines", Type: arrow.ListViewOf(LineFieldTypeV2), Nullable: true}, + ) + + // LocationDictTypeV2 is a dictionary of locations for efficient storage. + LocationDictTypeV2 = &arrow.DictionaryType{ + IndexType: arrow.PrimitiveTypes.Uint32, + ValueType: LocationTypeV2, + } + + // StacktraceTypeV2 is a ListView of dictionary-encoded locations. + // - Dictionary deduplicates individual locations + // - ListView allows reusing offset/size for identical stacktraces + StacktraceTypeV2 = arrow.ListViewOf(LocationDictTypeV2) + + // StacktraceFieldV2 is the field definition for stacktraces in the v2 sample schema. + StacktraceFieldV2 = arrow.Field{ + Name: "stacktrace", + Type: StacktraceTypeV2, + Nullable: true, + } + + TimestampFieldV2 = arrow.Field{ + Name: "timestamp", + Type: &arrow.TimestampType{Unit: arrow.Nanosecond, TimeZone: "UTC"}, + } + + ProducerFieldV2 = arrow.Field{ + Name: "producer", + Type: arrow.RunEndEncodedOf(arrow.PrimitiveTypes.Int32, arrow.BinaryTypes.String), + } + + SampleTypeFieldV2 = arrow.Field{ + Name: "sample_type", + Type: arrow.RunEndEncodedOf(arrow.PrimitiveTypes.Int32, arrow.BinaryTypes.String), + } + + SampleUnitFieldV2 = arrow.Field{ + Name: "sample_unit", + Type: arrow.RunEndEncodedOf(arrow.PrimitiveTypes.Int32, arrow.BinaryTypes.String), + } + + PeriodTypeFieldV2 = arrow.Field{ + Name: "period_type", + Type: arrow.RunEndEncodedOf(arrow.PrimitiveTypes.Int32, arrow.BinaryTypes.String), + } + + PeriodUnitFieldV2 = arrow.Field{ + Name: "period_unit", + Type: arrow.RunEndEncodedOf(arrow.PrimitiveTypes.Int32, arrow.BinaryTypes.String), + } + + TemporalityFieldV2 = 
arrow.Field{ + Name: "temporality", + Nullable: true, + Type: arrow.RunEndEncodedOf(arrow.PrimitiveTypes.Int32, arrow.BinaryTypes.String), + } + + DurationFieldV2 = arrow.Field{ + Name: "duration", + Type: arrow.RunEndEncodedOf(arrow.PrimitiveTypes.Int32, arrow.PrimitiveTypes.Uint64), + } + + StacktraceIDFieldV2 = arrow.Field{ + Name: "stacktrace_id", + Type: extensions.NewUUIDType(), + } + + labelArrowTypeV2 = arrow.RunEndEncodedOf( + arrow.PrimitiveTypes.Int32, + &arrow.DictionaryType{ + IndexType: arrow.PrimitiveTypes.Uint32, + ValueType: arrow.BinaryTypes.String, + }, + ) +) + +// FunctionDictBuilderV2 deduplicates functions using a map. +type FunctionDictBuilderV2 struct { + mem memory.Allocator + index map[FunctionV2]uint32 + builder *array.StructBuilder + sysName *array.StringViewBuilder + filename *array.BinaryDictionaryBuilder + startLn *array.Uint64Builder +} + +// NewFunctionDictBuilderV2 creates a new FunctionDictBuilderV2. +func NewFunctionDictBuilderV2(mem memory.Allocator) *FunctionDictBuilderV2 { + builder := array.NewStructBuilder(mem, FunctionFieldTypeV2) + return &FunctionDictBuilderV2{ + mem: mem, + index: make(map[FunctionV2]uint32), + builder: builder, + sysName: builder.FieldBuilder(0).(*array.StringViewBuilder), + filename: builder.FieldBuilder(1).(*array.BinaryDictionaryBuilder), + startLn: builder.FieldBuilder(2).(*array.Uint64Builder), + } +} + +// AppendFunction adds a function and returns its dictionary index. +func (b *FunctionDictBuilderV2) AppendFunction(f FunctionV2) uint32 { + if idx, ok := b.index[f]; ok { + return idx + } + + idx := uint32(len(b.index)) + b.index[f] = idx + + b.builder.Append(true) + if f.SystemName == "" { + b.sysName.AppendNull() + } else { + b.sysName.AppendString(f.SystemName) + } + if f.Filename == "" { + b.filename.AppendNull() + } else { + b.filename.AppendString(f.Filename) + } + b.startLn.Append(f.StartLine) + + return idx +} + +// Len returns the number of unique functions. 
+func (b *FunctionDictBuilderV2) Len() int { + return len(b.index) +} + +// Release releases the builder resources. +func (b *FunctionDictBuilderV2) Release() { + b.builder.Release() +} + +// StacktraceDictBuilderV2 deduplicates stacktraces using TraceHash and ListView. +// Structure: ListView[Dictionary[Uint32, LocationTypeV2]] +// - Dictionary handles location-level deduplication (manual construction) +// - ListView handles stacktrace-level deduplication via offset/size reuse +// - Functions within lines are dictionary-encoded for deduplication +// +// Since arrow-go v18 doesn't provide a StructDictionaryBuilder, we build the dictionaries +// manually: separate arrays for values, uint32 arrays for indices. +type StacktraceDictBuilderV2 struct { + mem memory.Allocator + index map[libpf.TraceHash]listEntryRef + + // ListView components (built manually) + offsets *array.Int32Builder + sizes *array.Int32Builder + + // Dictionary indices for locations (what the ListView values reference) + indices *array.Uint32Builder + + // Location fields (individual builders, composed into struct at build time) + locAddress *array.Uint64Builder + locFrameType *array.BinaryDictionaryBuilder + locMappingFile *array.BinaryDictionaryBuilder + locMappingID *array.BinaryDictionaryBuilder + + // Lines list: offsets track where each location's lines start + lineListOffsets *array.Int32Builder + + // Line fields + lineNumber *array.Uint64Builder + lineColumn *array.Uint64Builder + + // Function dictionary encoding + funcIndices *array.Uint32Builder + funcDict *FunctionDictBuilderV2 + + // Track locations for deduplication: frame -> dictionary index + LocationIndex map[libpf.Frame]uint32 + + // Number of ListView entries + length int +} + +// NewStacktraceDictBuilderV2 creates a new StacktraceDictBuilderV2. 
+func NewStacktraceDictBuilderV2(mem memory.Allocator) *StacktraceDictBuilderV2 { + return &StacktraceDictBuilderV2{ + mem: mem, + index: make(map[libpf.TraceHash]listEntryRef), + offsets: array.NewInt32Builder(mem), + sizes: array.NewInt32Builder(mem), + indices: array.NewUint32Builder(mem), + locAddress: array.NewUint64Builder(mem), + locFrameType: array.NewBuilder(mem, FrameTypeDictTypeV2).(*array.BinaryDictionaryBuilder), + locMappingFile: array.NewBuilder(mem, MappingFileDictTypeV2).(*array.BinaryDictionaryBuilder), + locMappingID: array.NewBuilder(mem, MappingBuildIDDictTypeV2).(*array.BinaryDictionaryBuilder), + lineListOffsets: array.NewInt32Builder(mem), + lineNumber: array.NewUint64Builder(mem), + lineColumn: array.NewUint64Builder(mem), + funcIndices: array.NewUint32Builder(mem), + funcDict: NewFunctionDictBuilderV2(mem), + LocationIndex: make(map[libpf.Frame]uint32), + length: 0, + } +} + +// AppendStacktrace appends a stacktrace, reusing ListView dimensions for duplicates. +// The appendLocation callback is called for each frame; it handles dedup, frame +// resolution, and writing to the arrow builders, returning the dictionary index. 
+func (b *StacktraceDictBuilderV2) AppendStacktrace( + traceHash libpf.TraceHash, + frames libpf.Frames, + appendLocation func(frame libpf.Frame) uint32, +) { + if entry, ok := b.index[traceHash]; ok { + // Reuse existing ListView dimensions + b.offsets.Append(int32(entry.offset)) + b.sizes.Append(int32(entry.listSize)) + b.length++ + return + } + + // New stacktrace - resolve and append each frame + startOffset := b.indices.Len() + listSize := 0 + + for _, frameHandle := range frames { + frame := frameHandle.Value() + idx := appendLocation(frame) + b.indices.Append(idx) + listSize++ + } + + // Record the entry for future deduplication + b.index[traceHash] = listEntryRef{ + offset: startOffset, + listSize: listSize, + } + + // Append the dimensions for this new entry + b.offsets.Append(int32(startOffset)) + b.sizes.Append(int32(listSize)) + b.length++ +} + +// AppendNull appends an empty stacktrace (offset=0, size=0). +// NOTE(review): NewArray passes a nil validity bitmap for the stacktrace ListView, so this yields an empty list, not a true null — confirm intended. +func (b *StacktraceDictBuilderV2) AppendNull() { + b.offsets.Append(0) + b.sizes.Append(0) + b.length++ +} + +// Len returns the number of stacktraces appended. +func (b *StacktraceDictBuilderV2) Len() int { + return b.length +} + +// UniqueStacktraces returns the number of unique stacktraces. +func (b *StacktraceDictBuilderV2) UniqueStacktraces() int { + return len(b.index) +} + +// NewArray builds and returns the ListView[Dictionary[Uint32, LocationTypeV2]] array. +// This manually constructs the full array hierarchy since arrow-go v18 lacks a StructDictionaryBuilder. +// The hierarchy is: ListView → Dict[Uint32, LocationStruct] → ... → lines list → Dict[Uint32, FunctionStruct]. 
+func (b *StacktraceDictBuilderV2) NewArray() arrow.Array { + numLocations := b.locAddress.Len() + + // Build stacktrace ListView components + stOffsets := b.offsets.NewArray() + defer stOffsets.Release() + stSizes := b.sizes.NewArray() + defer stSizes.Release() + locIndices := b.indices.NewArray() + defer locIndices.Release() + + // Build function dictionary: Dict[Uint32, FunctionStruct] + funcValues := b.funcDict.builder.NewArray() + defer funcValues.Release() + funcIdxArr := b.funcIndices.NewArray() + defer funcIdxArr.Release() + funcDictArr := array.NewDictionaryArray(FunctionDictTypeV2, funcIdxArr, funcValues) + defer funcDictArr.Release() + + // Build line number and column arrays + lineNumArr := b.lineNumber.NewArray() + defer lineNumArr.Release() + lineColArr := b.lineColumn.NewArray() + defer lineColArr.Release() + numLines := lineNumArr.Len() + + // Build line struct: {line: Uint64, column: Uint64, function: Dict[Uint32, FunctionStruct]} + lineStructData := array.NewData( + LineFieldTypeV2, + numLines, + []*memory.Buffer{nil}, // validity (all lines valid) + []arrow.ArrayData{lineNumArr.Data(), lineColArr.Data(), funcDictArr.Data()}, + 0, 0, + ) + defer lineStructData.Release() + + // Build line ListView offsets and sizes + lineOffsetsArr := b.lineListOffsets.NewArray() + defer lineOffsetsArr.Release() + + // Compute sizes from consecutive offsets + lineOffsetsData := lineOffsetsArr.(*array.Int32).Int32Values() + lineSizesBuilder := array.NewInt32Builder(b.mem) + for i := 0; i < numLocations; i++ { + if i < numLocations-1 { + lineSizesBuilder.Append(lineOffsetsData[i+1] - lineOffsetsData[i]) + } else { + lineSizesBuilder.Append(int32(numLines) - lineOffsetsData[i]) + } + } + lineSizesArr := lineSizesBuilder.NewArray() + defer lineSizesArr.Release() + lineSizesBuilder.Release() + + // Build validity bitmap for lines ListView: locations with 0 lines + // are marked as null so the server-side async symbolizer attempts to + // symbolize those frames. 
+ lineSizesData := lineSizesArr.(*array.Int32).Int32Values() + var linesValidityBuf *memory.Buffer + linesNullCount := 0 + for i := 0; i < numLocations; i++ { + if lineSizesData[i] == 0 { + linesNullCount++ + } + } + if linesNullCount > 0 { + validityBytes := make([]byte, bitutil.BytesForBits(int64(numLocations))) + for i := 0; i < numLocations; i++ { + if lineSizesData[i] > 0 { + bitutil.SetBit(validityBytes, i) + } + } + linesValidityBuf = memory.NewBufferBytes(validityBytes) + } + + // Build lines ListView: ListView[LineStruct] + linesListData := array.NewData( + arrow.ListViewOf(LineFieldTypeV2), + numLocations, + []*memory.Buffer{ + linesValidityBuf, // validity (null where lines size is 0) + lineOffsetsArr.Data().Buffers()[1], // offsets buffer + lineSizesArr.Data().Buffers()[1], // sizes buffer + }, + []arrow.ArrayData{lineStructData}, + linesNullCount, 0, + ) + defer linesListData.Release() + + // Build location field arrays + addrArr := b.locAddress.NewArray() + defer addrArr.Release() + ftArr := b.locFrameType.NewArray() + defer ftArr.Release() + mfArr := b.locMappingFile.NewArray() + defer mfArr.Release() + midArr := b.locMappingID.NewArray() + defer midArr.Release() + + // Build location struct from individual field arrays + locStructData := array.NewData( + LocationTypeV2, + numLocations, + []*memory.Buffer{nil}, // validity (all locations valid) + []arrow.ArrayData{ + addrArr.Data(), + ftArr.Data(), + mfArr.Data(), + midArr.Data(), + linesListData, + }, + 0, 0, + ) + defer locStructData.Release() + locStructArr := array.MakeFromData(locStructData) + defer locStructArr.Release() + + // Build location dictionary: Dict[Uint32, LocationStruct] + locDictArr := array.NewDictionaryArray(LocationDictTypeV2, locIndices, locStructArr) + defer locDictArr.Release() + + // Build ListView + listViewData := array.NewData( + StacktraceTypeV2, + b.length, + []*memory.Buffer{ + nil, // validity bitmap (no nulls) + stOffsets.Data().Buffers()[1], // offsets buffer + 
stSizes.Data().Buffers()[1], // sizes buffer + }, + []arrow.ArrayData{locDictArr.Data()}, + 0, 0, + ) + defer listViewData.Release() + + return array.NewListViewData(listViewData) +} + +// Release releases all builder resources. +func (b *StacktraceDictBuilderV2) Release() { + b.offsets.Release() + b.sizes.Release() + b.indices.Release() + b.locAddress.Release() + b.locFrameType.Release() + b.locMappingFile.Release() + b.locMappingID.Release() + b.lineListOffsets.Release() + b.lineNumber.Release() + b.lineColumn.Release() + b.funcIndices.Release() + b.funcDict.Release() +} + +// SampleWriterV2 writes samples with inline stacktraces using the v2 schema. +type SampleWriterV2 struct { + mem memory.Allocator + + labelBuilders map[string]*BinaryDictionaryRunEndBuilder + + // Stacktrace with deduplication + Stacktrace *StacktraceDictBuilderV2 + StacktraceID *extensions.UUIDBuilder + + // Sample data fields (same as v1) + Value *array.Int64Builder + Producer *StringRunEndBuilder + SampleType *StringRunEndBuilder + SampleUnit *StringRunEndBuilder + PeriodType *StringRunEndBuilder + PeriodUnit *StringRunEndBuilder + Temporality *StringRunEndBuilder + Period *Int64RunEndBuilder + Duration *Uint64RunEndBuilder + Timestamp *array.TimestampBuilder +} + +// NewSampleWriterV2 creates a new SampleWriterV2. 
+func NewSampleWriterV2(mem memory.Allocator) *SampleWriterV2 {
+	return &SampleWriterV2{
+		mem:           mem,
+		labelBuilders: make(map[string]*BinaryDictionaryRunEndBuilder),
+		Stacktrace:    NewStacktraceDictBuilderV2(mem),
+		StacktraceID:  extensions.NewUUIDBuilder(mem),
+		Value:         array.NewInt64Builder(mem),
+		Producer:      stringRunEndBuilder(array.NewBuilder(mem, ProducerFieldV2.Type)),
+		SampleType:    stringRunEndBuilder(array.NewBuilder(mem, SampleTypeFieldV2.Type)),
+		SampleUnit:    stringRunEndBuilder(array.NewBuilder(mem, SampleUnitFieldV2.Type)),
+		PeriodType:    stringRunEndBuilder(array.NewBuilder(mem, PeriodTypeFieldV2.Type)),
+		PeriodUnit:    stringRunEndBuilder(array.NewBuilder(mem, PeriodUnitFieldV2.Type)),
+		Temporality:   stringRunEndBuilder(array.NewBuilder(mem, TemporalityFieldV2.Type)),
+		Period:        int64RunEndBuilder(array.NewBuilder(mem, PeriodField.Type)),
+		Duration:      uint64RunEndBuilder(array.NewBuilder(mem, DurationFieldV2.Type)),
+		Timestamp:     array.NewBuilder(mem, TimestampFieldV2.Type).(*array.TimestampBuilder),
+	}
+}
+
+// Label returns the label builder for the given label name, creating it if necessary.
+// The returned builder is padded to the current row count (Value.Len()) so that
+// per-row appends made by the caller stay aligned with the other columns.
+func (w *SampleWriterV2) Label(labelName string) *BinaryDictionaryRunEndBuilder {
+	b, ok := w.labelBuilders[labelName]
+	if !ok {
+		b = binaryDictionaryRunEndBuilder(array.NewBuilder(w.mem, labelArrowTypeV2))
+		w.labelBuilders[labelName] = b
+	}
+
+	// Backfill so this column is as long as the rows written before the label
+	// first appeared.
+	b.EnsureLength(w.Value.Len())
+	return b
+}
+
+// LabelAll sets a label value for all samples in the current batch.
+// It must be called after all per-row values have been appended (it derives the
+// run length from Value.Len()).
+func (w *SampleWriterV2) LabelAll(labelName, labelValue string) {
+	b, ok := w.labelBuilders[labelName]
+	if !ok {
+		b = binaryDictionaryRunEndBuilder(array.NewBuilder(w.mem, labelArrowTypeV2))
+		w.labelBuilders[labelName] = b
+	}
+
+	// Append one run covering every row not yet covered by this column, all
+	// sharing a single dictionary value.
+	// NOTE(review): assumes b.ree.Len() reports the column's logical length —
+	// confirm against BinaryDictionaryRunEndBuilder.
+	b.ree.Append(uint64(w.Value.Len() - b.ree.Len()))
+	b.bd.AppendString(labelValue)
+}
+
+// labelField returns the Arrow field definition for a label within the labels struct.
+func (w *SampleWriterV2) labelField(labelName string) arrow.Field {
+	return arrow.Field{Name: labelName, Type: labelArrowTypeV2, Nullable: true}
+}
+
+// SampleSchemaV2 creates the v2 sample schema with the given label fields.
+func SampleSchemaV2(profileLabelFields []arrow.Field) *arrow.Schema {
+	return arrow.NewSchema(ArrowSamplesFieldV2(profileLabelFields), newV2Metadata())
+}
+
+// ArrowSamplesFieldV2 returns the 13 fields of the v2 sample schema, in column
+// order: the labels struct first, then the stacktrace columns, then the
+// per-sample metadata columns.
+func ArrowSamplesFieldV2(profileLabelFields []arrow.Field) []arrow.Field {
+	return []arrow.Field{
+		{Name: "labels", Type: arrow.StructOf(profileLabelFields...), Nullable: false},
+		StacktraceFieldV2,
+		StacktraceIDFieldV2,
+		ValueField,
+		ProducerFieldV2,
+		SampleTypeFieldV2,
+		SampleUnitFieldV2,
+		PeriodTypeFieldV2,
+		PeriodUnitFieldV2,
+		TemporalityFieldV2,
+		PeriodField,
+		DurationFieldV2,
+		TimestampFieldV2,
+	}
+}
+
+// newV2Metadata returns schema metadata marking a record as v2.
+func newV2Metadata() *arrow.Metadata {
+	md := arrow.NewMetadata([]string{MetadataSchemaVersion}, []string{MetadataSchemaVersionV2})
+	return &md
+}
+
+// NewRecord builds and returns an Arrow record with all samples.
+// The returned record owns its columns and must be Released by the caller.
+func (w *SampleWriterV2) NewRecord() arrow.Record {
+	labelNames := maps.Keys(w.labelBuilders)
+	slices.Sort(labelNames)
+
+	labelChildArrays := make([]arrow.ArrayData, 0, len(labelNames))
+	labelFields := make([]arrow.Field, 0, len(labelNames))
+
+	length := w.Value.Len()
+	for _, labelName := range labelNames {
+		b := w.labelBuilders[labelName]
+
+		// Ensure all label arrays are backfilled to match the length
+		b.EnsureLength(length)
+		labelFields = append(labelFields, w.labelField(labelName))
+		arr := b.NewArray()
+		labelChildArrays = append(labelChildArrays, arr.Data())
+		defer arr.Release()
+	}
+
+	// Build the labels struct array
+	labelsStructType := arrow.StructOf(labelFields...)
+	labelsStructData := array.NewData(
+		labelsStructType,
+		length,
+		[]*memory.Buffer{nil}, // validity bitmap (no nulls)
+		labelChildArrays,
+		0, 0,
+	)
+	defer labelsStructData.Release()
+
+	// Collect the columns. Each array here carries one reference owned by this
+	// function; array.NewRecord retains its own references, so ours must be
+	// released afterwards or the backing buffers leak under a checked
+	// allocator. (Previously only the labels column was released.)
+	cols := []arrow.Array{
+		array.MakeFromData(labelsStructData),
+		w.Stacktrace.NewArray(),
+		w.StacktraceID.NewArray(),
+		w.Value.NewArray(),
+		w.Producer.NewArray(),
+		w.SampleType.NewArray(),
+		w.SampleUnit.NewArray(),
+		w.PeriodType.NewArray(),
+		w.PeriodUnit.NewArray(),
+		w.Temporality.NewArray(),
+		w.Period.NewArray(),
+		w.Duration.NewArray(),
+		w.Timestamp.NewArray(),
+	}
+
+	record := array.NewRecord(SampleSchemaV2(labelFields), cols, int64(length))
+
+	// Drop our references; the record keeps the columns alive.
+	for _, col := range cols {
+		col.Release()
+	}
+
+	return record
+}
+
+// Release releases all builder resources.
+func (w *SampleWriterV2) Release() { + for _, b := range w.labelBuilders { + b.Release() + } + w.Stacktrace.Release() + w.StacktraceID.Release() + w.Value.Release() + w.Producer.Release() + w.SampleType.Release() + w.SampleUnit.Release() + w.PeriodType.Release() + w.PeriodUnit.Release() + w.Temporality.Release() + w.Period.Release() + w.Duration.Release() + w.Timestamp.Release() +} diff --git a/reporter/arrow_v2_test.go b/reporter/arrow_v2_test.go new file mode 100644 index 0000000000..9e49a13694 --- /dev/null +++ b/reporter/arrow_v2_test.go @@ -0,0 +1,411 @@ +package reporter + +import ( + "testing" + + "github.com/apache/arrow-go/v18/arrow" + "github.com/apache/arrow-go/v18/arrow/array" + "github.com/apache/arrow-go/v18/arrow/memory" + "github.com/stretchr/testify/require" + "go.opentelemetry.io/ebpf-profiler/libpf" +) + +func TestFunctionDictBuilderV2_Deduplication(t *testing.T) { + mem := memory.NewGoAllocator() + builder := NewFunctionDictBuilderV2(mem) + defer builder.Release() + + // Add first function + f1 := FunctionV2{ + SystemName: "main", + Filename: "main.go", + StartLine: 10, + } + idx1 := builder.AppendFunction(f1) + require.Equal(t, uint32(0), idx1) + require.Equal(t, 1, builder.Len()) + + // Add same function again - should return same index + idx2 := builder.AppendFunction(f1) + require.Equal(t, uint32(0), idx2) + require.Equal(t, 1, builder.Len()) // Still only 1 unique function + + // Add different function + f2 := FunctionV2{ + SystemName: "helper", + Filename: "util.go", + StartLine: 5, + } + idx3 := builder.AppendFunction(f2) + require.Equal(t, uint32(1), idx3) + require.Equal(t, 2, builder.Len()) + + // Add first function again + idx4 := builder.AppendFunction(f1) + require.Equal(t, uint32(0), idx4) + require.Equal(t, 2, builder.Len()) +} + +// makeAppendLocation creates a test appendLocation callback that writes native-style +// locations (no lines) directly to the builder, using the builder's LocationIndex for dedup. 
+func makeAppendLocation(b *StacktraceDictBuilderV2, mappingFile, mappingBuildID string) func(libpf.Frame) uint32 { + return func(frame libpf.Frame) uint32 { + if idx, ok := b.LocationIndex[frame]; ok { + return idx + } + + idx := uint32(len(b.LocationIndex)) + b.LocationIndex[frame] = idx + + b.lineListOffsets.Append(int32(b.lineNumber.Len())) + b.locAddress.Append(uint64(frame.AddressOrLineno)) + b.locFrameType.AppendString(frame.Type.String()) + + if mappingFile == "" { + b.locMappingFile.AppendNull() + } else { + b.locMappingFile.AppendString(mappingFile) + } + + if mappingBuildID == "" { + b.locMappingID.AppendNull() + } else { + b.locMappingID.AppendString(mappingBuildID) + } + + // No lines for native-style locations + + return idx + } +} + +// makeAppendLocationWithLines creates a test appendLocation callback that writes +// locations with dictionary-encoded function info. +func makeAppendLocationWithLines(b *StacktraceDictBuilderV2) func(libpf.Frame) uint32 { + return func(frame libpf.Frame) uint32 { + if idx, ok := b.LocationIndex[frame]; ok { + return idx + } + + idx := uint32(len(b.LocationIndex)) + b.LocationIndex[frame] = idx + + b.lineListOffsets.Append(int32(b.lineNumber.Len())) + b.locAddress.Append(uint64(frame.AddressOrLineno)) + b.locFrameType.AppendString(frame.Type.String()) + + if frame.Type.IsAbort() { + b.locMappingFile.AppendString("agent-internal-error-frame") + b.locMappingID.AppendNull() + + b.lineNumber.Append(0) + b.lineColumn.Append(0) + b.funcIndices.Append(b.funcDict.AppendFunction(FunctionV2{ + SystemName: "aborted", + Filename: "", + StartLine: 0, + })) + } else { + + switch frame.Type { + case libpf.NativeFrame: + b.locMappingFile.AppendString("/usr/bin/app") + b.locMappingID.AppendString("build123") + // No lines + case libpf.KernelFrame: + b.locMappingFile.AppendString("[kernel.kallsyms]") + b.locMappingID.AppendNull() + + b.lineNumber.Append(uint64(frame.SourceLine)) + b.lineColumn.Append(0) + 
b.funcIndices.Append(b.funcDict.AppendFunction(FunctionV2{ + SystemName: frame.FunctionName.String(), + Filename: "", + StartLine: 0, + })) + default: + b.locMappingFile.AppendString(frame.Type.String()) + b.locMappingID.AppendNull() + + b.lineNumber.Append(uint64(frame.SourceLine)) + b.lineColumn.Append(0) + b.funcIndices.Append(b.funcDict.AppendFunction(FunctionV2{ + SystemName: frame.FunctionName.String(), + Filename: frame.SourceFile.String(), + StartLine: 0, + })) + } + } + + return idx + } +} + +func TestStacktraceDictBuilderV2_Deduplication(t *testing.T) { + mem := memory.NewGoAllocator() + builder := NewStacktraceDictBuilderV2(mem) + defer builder.Release() + + // Create test frames + frame1 := libpf.Frame{ + Type: libpf.NativeFrame, + AddressOrLineno: 0x1000, + } + frame2 := libpf.Frame{ + Type: libpf.NativeFrame, + AddressOrLineno: 0x2000, + } + + appendLocation := makeAppendLocation(builder, "/usr/bin/test", "abc123") + + // Create frames for first stacktrace + frames1 := libpf.Frames{} + frames1.Append(&frame1) + frames1.Append(&frame2) + + // Create a trace hash + hash1 := libpf.NewTraceHash(1, 2) + + // Append first stacktrace + builder.AppendStacktrace(hash1, frames1, appendLocation) + require.Equal(t, 1, builder.Len()) + require.Equal(t, 1, builder.UniqueStacktraces()) + + // Append same stacktrace again (same hash) - should reuse dimensions + builder.AppendStacktrace(hash1, frames1, appendLocation) + require.Equal(t, 2, builder.Len()) // Total appended + require.Equal(t, 1, builder.UniqueStacktraces()) // Still only 1 unique + + // Create different stacktrace + frame3 := libpf.Frame{ + Type: libpf.NativeFrame, + AddressOrLineno: 0x3000, + } + frames2 := libpf.Frames{} + frames2.Append(&frame3) + + hash2 := libpf.NewTraceHash(3, 4) + + // Append different stacktrace + builder.AppendStacktrace(hash2, frames2, appendLocation) + require.Equal(t, 3, builder.Len()) + require.Equal(t, 2, builder.UniqueStacktraces()) + + // Append first stacktrace again + 
builder.AppendStacktrace(hash1, frames1, appendLocation) + require.Equal(t, 4, builder.Len()) + require.Equal(t, 2, builder.UniqueStacktraces()) + + // Build the array to verify it works + arr := builder.NewArray() + require.NotNil(t, arr) + require.Equal(t, 4, arr.Len()) + arr.Release() +} + +func TestSampleWriterV2_Basic(t *testing.T) { + mem := memory.NewGoAllocator() + writer := NewSampleWriterV2(mem) + defer writer.Release() + + appendLocation := makeAppendLocation(writer.Stacktrace, "/usr/bin/test", "abc123") + + // Create a sample + frame := libpf.Frame{ + Type: libpf.NativeFrame, + AddressOrLineno: 0x1000, + } + frames := libpf.Frames{} + frames.Append(&frame) + hash := libpf.NewTraceHash(1, 2) + + // Add labels + writer.Label("service").AppendString("my-service") + writer.Label("pod").AppendString("pod-1") + + // Add sample data + writer.Stacktrace.AppendStacktrace(hash, frames, appendLocation) + writer.StacktraceID.AppendBytes([16]byte(hash.Bytes())) + writer.Value.Append(1) + writer.Producer.AppendString("parca_agent") + writer.SampleType.AppendString("samples") + writer.SampleUnit.AppendString("count") + writer.PeriodType.AppendString("cpu") + writer.PeriodUnit.AppendString("nanoseconds") + writer.Temporality.AppendString("delta") + writer.Period.Append(int64(1e9) / 19) + writer.Duration.Append(uint64(1e9)) + writer.Timestamp.Append(arrow.Timestamp(int64(1234567890))) + + // Build record + record := writer.NewRecord() + require.NotNil(t, record) + require.Equal(t, int64(1), record.NumRows()) + + // Verify schema + schema := record.Schema() + require.NotNil(t, schema) + + // Check metadata + val, ok := schema.Metadata().GetValue(MetadataSchemaVersion) + require.True(t, ok) + require.Equal(t, MetadataSchemaVersionV2, val) + + record.Release() +} + +func TestSampleWriterV2_MultipleFrameTypes(t *testing.T) { + mem := memory.NewGoAllocator() + writer := NewSampleWriterV2(mem) + defer writer.Release() + + appendLocation := 
makeAppendLocationWithLines(writer.Stacktrace) + + // Test with native frame + nativeFrame := libpf.Frame{ + Type: libpf.NativeFrame, + AddressOrLineno: 0x1000, + } + nativeFrames := libpf.Frames{} + nativeFrames.Append(&nativeFrame) + nativeHash := libpf.NewTraceHash(1, 1) + + writer.Stacktrace.AppendStacktrace(nativeHash, nativeFrames, appendLocation) + writer.StacktraceID.AppendBytes([16]byte(nativeHash.Bytes())) + writer.Value.Append(1) + writer.Producer.AppendString("parca_agent") + writer.SampleType.AppendString("samples") + writer.SampleUnit.AppendString("count") + writer.PeriodType.AppendString("cpu") + writer.PeriodUnit.AppendString("nanoseconds") + writer.Temporality.AppendString("delta") + writer.Period.Append(int64(1e9) / 19) + writer.Duration.Append(uint64(1e9)) + writer.Timestamp.Append(arrow.Timestamp(int64(1234567890))) + + // Test with kernel frame + kernelFrame := libpf.Frame{ + Type: libpf.KernelFrame, + AddressOrLineno: 0x2000, + FunctionName: libpf.Intern("do_syscall_64"), + SourceLine: 100, + } + kernelFrames := libpf.Frames{} + kernelFrames.Append(&kernelFrame) + kernelHash := libpf.NewTraceHash(2, 2) + + writer.Stacktrace.AppendStacktrace(kernelHash, kernelFrames, appendLocation) + writer.StacktraceID.AppendBytes([16]byte(kernelHash.Bytes())) + writer.Value.Append(1) + writer.Producer.AppendString("parca_agent") + writer.SampleType.AppendString("samples") + writer.SampleUnit.AppendString("count") + writer.PeriodType.AppendString("cpu") + writer.PeriodUnit.AppendString("nanoseconds") + writer.Temporality.AppendString("delta") + writer.Period.Append(int64(1e9) / 19) + writer.Duration.Append(uint64(1e9)) + writer.Timestamp.Append(arrow.Timestamp(int64(1234567891))) + + // Build and verify + record := writer.NewRecord() + require.NotNil(t, record) + require.Equal(t, int64(2), record.NumRows()) + + record.Release() +} + +func TestFunctionDictBuilderV2_UsedInStacktrace(t *testing.T) { + mem := memory.NewGoAllocator() + builder := 
NewStacktraceDictBuilderV2(mem) + defer builder.Release() + + // Create frames that share the same function at different addresses + frame1 := libpf.Frame{ + Type: libpf.KernelFrame, + AddressOrLineno: 0x1000, + FunctionName: libpf.Intern("do_syscall_64"), + SourceLine: 100, + } + frame2 := libpf.Frame{ + Type: libpf.KernelFrame, + AddressOrLineno: 0x2000, + FunctionName: libpf.Intern("do_syscall_64"), // same function, different address + SourceLine: 200, + } + frame3 := libpf.Frame{ + Type: libpf.KernelFrame, + AddressOrLineno: 0x3000, + FunctionName: libpf.Intern("sys_read"), // different function + SourceLine: 50, + } + + appendLocation := makeAppendLocationWithLines(builder) + + frames := libpf.Frames{} + frames.Append(&frame1) + frames.Append(&frame2) + frames.Append(&frame3) + + hash := libpf.NewTraceHash(1, 1) + builder.AppendStacktrace(hash, frames, appendLocation) + + // 3 unique locations (different addresses) + require.Equal(t, 3, len(builder.LocationIndex)) + + // But only 2 unique functions ("do_syscall_64" is deduplicated despite different lines) + require.Equal(t, 2, builder.funcDict.Len()) + + // Build the array to verify structure + arr := builder.NewArray() + require.NotNil(t, arr) + require.Equal(t, 1, arr.Len()) + arr.Release() +} + +func TestStacktraceDictBuilderV2_NullLinesForUnsymbolizedFrames(t *testing.T) { + mem := memory.NewGoAllocator() + builder := NewStacktraceDictBuilderV2(mem) + defer builder.Release() + + // Native frame: no lines appended → lines should be null + nativeFrame := libpf.Frame{ + Type: libpf.NativeFrame, + AddressOrLineno: 0x1000, + } + // Kernel frame: has lines → lines should be non-null + kernelFrame := libpf.Frame{ + Type: libpf.KernelFrame, + AddressOrLineno: 0x2000, + FunctionName: libpf.Intern("do_syscall_64"), + SourceLine: 100, + } + + appendLocation := makeAppendLocationWithLines(builder) + + frames := libpf.Frames{} + frames.Append(&nativeFrame) + frames.Append(&kernelFrame) + + hash := 
libpf.NewTraceHash(10, 20) + builder.AppendStacktrace(hash, frames, appendLocation) + + arr := builder.NewArray() + defer arr.Release() + require.Equal(t, 1, arr.Len()) + + // The stacktrace is a ListView of Dict[Uint32, LocationStruct]. + // Drill into the location dictionary values to inspect the lines field. + listView := arr.(*array.ListView) + locDict := listView.ListValues().(*array.Dictionary) + locStruct := locDict.Dictionary().(*array.Struct) + + // lines is the 5th field (index 4) in LocationTypeV2 + linesField := locStruct.Field(4) + + // Location 0 is the native frame → lines should be null + require.True(t, linesField.IsNull(0), "native frame (no lines) should have null lines for async symbolization") + + // Location 1 is the kernel frame → lines should be non-null + require.False(t, linesField.IsNull(1), "kernel frame (has lines) should have non-null lines") +} diff --git a/reporter/parca_reporter.go b/reporter/parca_reporter.go index d843d7527a..0b89654c16 100644 --- a/reporter/parca_reporter.go +++ b/reporter/parca_reporter.go @@ -25,10 +25,10 @@ import ( debuginfogrpc "buf.build/gen/go/parca-dev/parca/grpc/go/parca/debuginfo/v1alpha1/debuginfov1alpha1grpc" profilestoregrpc "buf.build/gen/go/parca-dev/parca/grpc/go/parca/profilestore/v1alpha1/profilestorev1alpha1grpc" profilestorepb "buf.build/gen/go/parca-dev/parca/protocolbuffers/go/parca/profilestore/v1alpha1" - "github.com/apache/arrow/go/v16/arrow" - "github.com/apache/arrow/go/v16/arrow/array" - "github.com/apache/arrow/go/v16/arrow/ipc" - "github.com/apache/arrow/go/v16/arrow/memory" + "github.com/apache/arrow-go/v18/arrow" + "github.com/apache/arrow-go/v18/arrow/array" + "github.com/apache/arrow-go/v18/arrow/ipc" + "github.com/apache/arrow-go/v18/arrow/memory" lru "github.com/elastic/go-freelru" "github.com/klauspost/compress/zstd" "github.com/parca-dev/oomprof/oomprof" @@ -91,10 +91,15 @@ type ParcaReporter struct { disableThreadIDLabel bool disableThreadCommLabel bool - // samples stores 
the so far received samples. + // samples stores the so far received samples (v1 schema). sampleWriter *SampleWriter sampleWriterMu sync.Mutex + // v2 schema support + useV2Schema bool + sampleWriterV2 *SampleWriterV2 + sampleWriterV2Mu sync.Mutex + // stacks stores known stacks. stacks *lru.SyncedLRU[libpf.TraceHash, libpf.Frames] @@ -230,13 +235,18 @@ func (r *ParcaReporter) ReportTraceEvent(trace *libpf.Trace, r.emptySamples.Inc() } + // Dispatch to v2 path if enabled + if r.useV2Schema { + return r.reportTraceEventV2(trace, meta, labelRetrievalResult) + } + r.sampleWriterMu.Lock() defer r.sampleWriterMu.Unlock() buf := [16]byte{} trace.Hash.PutBytes16(&buf) - writeSample := func(value, duration, per int64, producer, sampleType, sampleUnit, periodType, periodUnit string) { + writeSample := func(value int64, duration int64, per int64, producer, sampleType, sampleUnit, periodType, periodUnit string) { // Write labels for _, lbl := range labelRetrievalResult.labels { r.sampleWriter.Label(lbl.Name).AppendString(lbl.Value) @@ -271,11 +281,11 @@ func (r *ParcaReporter) ReportTraceEvent(trace *libpf.Trace, switch meta.Origin { case support.TraceOriginSampling: - writeSample(1, time.Second.Nanoseconds(), 1e9/int64(r.samplesPerSecond), "parca_agent", "samples", "count", "cpu", "nanoseconds") + writeSample(1, int64(time.Second.Nanoseconds()), 1e9/int64(r.samplesPerSecond), "parca_agent", "samples", "count", "cpu", "nanoseconds") r.sampleWriter.Temporality.AppendString("delta") r.cpuSamples.Inc() case support.TraceOriginOffCPU: - writeSample(meta.OffTime, time.Second.Nanoseconds(), 1e9/int64(r.samplesPerSecond), "parca_agent", "wallclock", "nanoseconds", "samples", "count") + writeSample(meta.OffTime, int64(time.Second.Nanoseconds()), 1e9/int64(r.samplesPerSecond), "parca_agent", "wallclock", "nanoseconds", "samples", "count") r.sampleWriter.Temporality.AppendString("delta") r.offcpuSamples.Inc() case support.TraceOriginMemory: @@ -302,7 +312,7 @@ func (r *ParcaReporter) 
ReportTraceEvent(trace *libpf.Trace,
 		}
 		r.memorySamples.Inc()
 	case support.TraceOriginCuda:
-		writeSample(meta.OffTime, time.Second.Nanoseconds(), 1e9/int64(r.samplesPerSecond), "parca_agent", "cuda", "nanoseconds", "cuda", "nanoseconds")
+		writeSample(meta.OffTime, int64(time.Second.Nanoseconds()), 1e9/int64(r.samplesPerSecond), "parca_agent", "cuda", "nanoseconds", "cuda", "nanoseconds")
 		r.sampleWriter.Temporality.AppendString("delta")
 		r.gpuSamples.Inc()
 	}
@@ -310,6 +320,227 @@ func (r *ParcaReporter) ReportTraceEvent(trace *libpf.Trace,
 	return nil
 }
 
+// reportTraceEventV2 handles trace events using the v2 schema with inline stacktraces.
+// It dispatches on the trace origin and appends one or more rows to the v2
+// sample writer while holding sampleWriterV2Mu.
+func (r *ParcaReporter) reportTraceEventV2(trace *libpf.Trace,
+	meta *samples.TraceEventMeta, labelResult labelRetrievalResult) error {
+
+	r.sampleWriterV2Mu.Lock()
+	defer r.sampleWriterV2Mu.Unlock()
+
+	switch meta.Origin {
+	case support.TraceOriginSampling:
+		r.writeSampleV2(trace, meta, labelResult, 1, uint64(time.Second.Nanoseconds()), 1e9/int64(r.samplesPerSecond), true, "parca_agent", "samples", "count", "cpu", "nanoseconds")
+	case support.TraceOriginOffCPU:
+		// Fix: match the v1 path, which reports 1e9/samplesPerSecond as the
+		// period for off-CPU samples; this previously passed a hard-coded 0.
+		r.writeSampleV2(trace, meta, labelResult, meta.OffTime, uint64(time.Second.Nanoseconds()), 1e9/int64(r.samplesPerSecond), true, "parca_agent", "wallclock", "nanoseconds", "samples", "count")
+	case support.TraceOriginMemory:
+		// Demoted from Infof: this fires for every memory trace event and
+		// would flood the logs at the default level.
+		log.Debugf("Received memory trace event for TID %d, PID %d, comm %s", meta.TID, meta.PID, meta.Comm)
+		memPeriod := int64(512 * 1024) // 512 KiB
+		if meta.Allocs != meta.Frees {
+			r.writeSampleV2(trace, meta, labelResult, int64(meta.Allocs-meta.Frees), 0, memPeriod, false, "memory", "inuse_objects", "count", "space", "bytes")
+		}
+		if meta.AllocBytes != meta.FreeBytes {
+			r.writeSampleV2(trace, meta, labelResult, int64(meta.AllocBytes-meta.FreeBytes), 0, memPeriod, false, "memory", "inuse_space", "bytes", "space", "bytes")
+		}
+		if r.reportAllocs {
+			r.writeSampleV2(trace, meta, labelResult, int64(meta.Allocs), 0, memPeriod, false, "memory", "alloc_objects", "count", 
"space", "bytes") + r.writeSampleV2(trace, meta, labelResult, int64(meta.AllocBytes), 0, memPeriod, false, "memory", "alloc_space", "bytes", "space", "bytes") + } + case support.TraceOriginCuda: + r.writeSampleV2(trace, meta, labelResult, meta.OffTime, uint64(time.Second.Nanoseconds()), 1, true, "parca_agent", "cuda", "nanoseconds", "cuda", "nanoseconds") + } + + return nil +} + +func (r *ParcaReporter) writeSampleV2( + trace *libpf.Trace, + meta *samples.TraceEventMeta, + labelResult labelRetrievalResult, + value int64, duration uint64, per int64, + delta bool, + producer, sampleType, sampleUnit, periodType, periodUnit string, +) { + for _, lbl := range labelResult.labels { + r.sampleWriterV2.Label(lbl.Name).AppendString(lbl.Value) + } + + for k, v := range trace.CustomLabels { + ks := k.String() + if !utf8.ValidString(ks) { + log.Warnf("ignoring non-UTF8 label: %s", hex.EncodeToString([]byte(ks))) + continue + } + vs, ok := maybeFixTruncation(v.String(), support.CustomLabelMaxValLen-1) + if !ok { + log.Warnf("ignoring non-UTF8 value for label %s: %s", ks, hex.EncodeToString([]byte(vs))) + continue + } + r.sampleWriterV2.Label(ks).AppendString(vs) + } + + r.sampleWriterV2.Stacktrace.AppendStacktrace(trace.Hash, trace.Frames, r.appendLocationV2) + r.sampleWriterV2.StacktraceID.AppendBytes([16]byte(trace.Hash.Bytes())) + + r.sampleWriterV2.Timestamp.Append(arrow.Timestamp(int64(meta.Timestamp))) + r.sampleWriterV2.Value.Append(value) + r.sampleWriterV2.SampleType.AppendString(sampleType) + r.sampleWriterV2.SampleUnit.AppendString(sampleUnit) + r.sampleWriterV2.PeriodType.AppendString(periodType) + r.sampleWriterV2.PeriodUnit.AppendString(periodUnit) + r.sampleWriterV2.Producer.AppendString(producer) + r.sampleWriterV2.Duration.Append(duration) + r.sampleWriterV2.Period.Append(per) + + if delta { + r.sampleWriterV2.Temporality.AppendString("delta") + } else { + r.sampleWriterV2.Temporality.AppendNull() + } +} + +// appendLocationV2 resolves a frame and appends it to 
the location dictionary.
+// It uses the libpf.Frame value as the deduplication key, skipping resolution
+// and arrow writes when the frame has already been seen.
+// Functions are dictionary-encoded via FunctionDictBuilderV2.
+//
+// Invariant: every new location appends exactly one entry to each of
+// lineListOffsets, locAddress, locFrameType, locMappingFile and locMappingID.
+// Line rows (lineNumber/lineColumn/funcIndices) are optional per location;
+// their count is implied by consecutive lineListOffsets values.
+func (r *ParcaReporter) appendLocationV2(frame libpf.Frame) uint32 {
+	b := r.sampleWriterV2.Stacktrace
+
+	if idx, ok := b.LocationIndex[frame]; ok {
+		return idx
+	}
+
+	idx := uint32(len(b.LocationIndex))
+	b.LocationIndex[frame] = idx
+
+	// Record line list offset for this location (before writing any lines)
+	b.lineListOffsets.Append(int32(b.lineNumber.Len()))
+	b.locAddress.Append(uint64(frame.AddressOrLineno))
+
+	if frame.Type.IsAbort() {
+		// Synthetic frame emitted when the unwinder aborted; keep the real
+		// frame type but mark the mapping as an internal error marker.
+		b.locFrameType.AppendString(frame.Type.String())
+		b.locMappingFile.AppendString("agent-internal-error-frame")
+		b.locMappingID.AppendNull()
+
+		b.lineNumber.Append(0)
+		b.lineColumn.Append(0)
+		b.funcIndices.Append(b.funcDict.AppendFunction(FunctionV2{
+			SystemName: "aborted",
+			Filename:   "",
+			StartLine:  0,
+		}))
+
+		return idx
+	}
+
+	switch frameKind := frame.Type; frameKind {
+	case libpf.NativeFrame:
+		b.locFrameType.AppendString(frame.Type.String())
+
+		var execInfo metadata.ExecInfo
+		var fid libpf.FileID
+		var exists bool
+
+		if frame.Mapping.Valid() {
+			m := frame.Mapping.Value()
+			if m.File != (libpf.FrameMappingFile{}) {
+				mf := m.File.Value()
+				fid = mf.FileID
+				execInfo, exists = r.executables.Get(mf.FileID)
+			}
+		}
+
+		if exists {
+			b.locMappingFile.AppendString(execInfo.FileName)
+			// Prefer the real build ID; fall back to the file ID so the
+			// mapping is still uniquely identifiable.
+			if execInfo.BuildID != "" {
+				b.locMappingID.AppendString(execInfo.BuildID)
+			} else {
+				b.locMappingID.AppendString(fid.StringNoQuotes())
+			}
+		} else {
+			b.locMappingFile.AppendString("UNKNOWN")
+			b.locMappingID.AppendNull()
+		}
+		// No lines for native frames
+
+	case libpf.KernelFrame:
+		b.locFrameType.AppendString(frame.Type.String())
+		b.locMappingFile.AppendString("[kernel.kallsyms]")
+		b.locMappingID.AppendNull()
+
+		var execInfo metadata.ExecInfo
+		var exists bool
+		if frame.Mapping.Valid() {
+			m := frame.Mapping.Value()
+			if m.File != (libpf.FrameMappingFile{}) {
+				mf := m.File.Value()
+				execInfo, exists = r.executables.Get(mf.FileID)
+			}
+		}
+		var moduleName string
+		if exists {
+			moduleName = execInfo.FileName
+		} else {
+			moduleName = "vmlinux"
+		}
+
+		var symbol string
+		var lineNumber uint64
+		if frame.FunctionName.String() != "" {
+			symbol = frame.FunctionName.String()
+			lineNumber = uint64(frame.SourceLine)
+		} else {
+			// Unsymbolized kernel frame: lineNumber stays 0.
+			symbol = "UNKNOWN"
+		}
+
+		b.lineNumber.Append(lineNumber)
+		b.lineColumn.Append(0)
+		b.funcIndices.Append(b.funcDict.AppendFunction(FunctionV2{
+			SystemName: symbol,
+			Filename:   moduleName,
+			StartLine:  0,
+		}))
+
+	case oomprofMemoryFrame:
+		// Special frame used to report OOMProf samples; reported as native.
+		b.locFrameType.AppendString(libpf.NativeFrame.String())
+		b.locMappingFile.AppendString(frame.SourceFile.String())
+		b.locMappingID.AppendString(frame.FunctionName.String())
+		// No lines for oomprof frames
+
+	default:
+		// Interpreted frames (Python, Ruby, etc.)
+		b.locFrameType.AppendString(frame.Type.String())
+		b.locMappingFile.AppendString(frameKind.String())
+		b.locMappingID.AppendNull()
+
+		var lineNumber uint64
+		var functionName, filePath string
+
+		if frame.FunctionName.String() != "" {
+			functionName = frame.FunctionName.String()
+			filePath = frame.SourceFile.String()
+			lineNumber = uint64(frame.SourceLine)
+		} else {
+			functionName = "UNREPORTED"
+			filePath = "UNREPORTED"
+		}
+
+		// Empty path causes the backend to crash
+		if filePath == "" {
+			filePath = "UNKNOWN"
+		}
+
+		b.lineNumber.Append(lineNumber)
+		b.lineColumn.Append(0)
+		b.funcIndices.Append(b.funcDict.AppendFunction(FunctionV2{
+			SystemName: functionName,
+			Filename:   filePath,
+			StartLine:  0,
+		}))
+	}
+
+	return idx
+}
+
 func (r *ParcaReporter) addMetadataForPID(ctx context.Context, pid libpf.PID, lb *labels.Builder) bool {
 	cache := true
@@ -616,6 +847,7 @@ func New(
 	disableCPULabel bool,
 	disableThreadIDLabel bool,
 	disableThreadCommLabel bool,
+	useV2Schema bool,
 ) (*ParcaReporter, error) {
 	if offlineModeConfig != 
nil && !disableSymbolUpload { return nil, errors.New("Illogical configuration: offline mode with symbol upload enabled") @@ -692,12 +924,23 @@ func New( reg.MustRegister(skippedByRelabeling) reg.MustRegister(samplesByType) + // Initialize sample writer based on schema version + var sampleWriter *SampleWriter + var sampleWriterV2 *SampleWriterV2 + if useV2Schema { + sampleWriterV2 = NewSampleWriterV2(mem) + } else { + sampleWriter = NewSampleWriter(mem) + } + r := &ParcaReporter{ stopSignal: make(chan libpf.Void), client: nil, executables: executables, labels: labels, - sampleWriter: NewSampleWriter(mem), + sampleWriter: sampleWriter, + useV2Schema: useV2Schema, + sampleWriterV2: sampleWriterV2, stacks: stacks, mem: mem, externalLabels: externalLabels, @@ -975,6 +1218,11 @@ func (r *ParcaReporter) Start(mainCtx context.Context) error { } func (r *ParcaReporter) logDataForOfflineMode(ctx context.Context, buf *bytes.Buffer) error { + // Dispatch to v2 path if enabled + if r.useV2Schema { + return r.logDataForOfflineModeV2(ctx, buf) + } + record, nLabelCols := r.buildSampleRecord(ctx) defer record.Release() @@ -1111,6 +1359,11 @@ func (r *ParcaReporter) logDataForOfflineMode(ctx context.Context, buf *bytes.Bu // reportDataToBackend creates and sends out an arrow record for a Parca backend. 
func (r *ParcaReporter) reportDataToBackend(ctx context.Context, buf *bytes.Buffer) error { + // Dispatch to v2 path if enabled + if r.useV2Schema { + return r.reportDataToBackendV2(ctx, buf) + } + record, _ := r.buildSampleRecord(ctx) defer record.Release() @@ -1297,7 +1550,7 @@ func (r *ParcaReporter) buildStacktraceRecord(ctx context.Context, stacktraceIDs w.FunctionName.AppendString("missing stacktrace") w.FunctionSystemName.AppendString("") w.FunctionFilename.AppendNull() - w.FunctionStartLine.Append(int64(0)) + w.FunctionStartLine.Append(0) w.IsComplete.Append(false) continue } @@ -1329,7 +1582,7 @@ func (r *ParcaReporter) buildStacktraceRecord(ctx context.Context, stacktraceIDs w.FunctionName.AppendString("aborted") w.FunctionSystemName.AppendString("") w.FunctionFilename.AppendNull() - w.FunctionStartLine.Append(int64(0)) + w.FunctionStartLine.Append(0) continue } switch frameKind := frame.Type; frameKind { @@ -1411,7 +1664,7 @@ func (r *ParcaReporter) buildStacktraceRecord(ctx context.Context, stacktraceIDs w.FunctionName.AppendString(symbol) w.FunctionSystemName.AppendString("") w.MappingFile.AppendString("[kernel.kallsyms]") - w.FunctionStartLine.Append(int64(0)) + w.FunctionStartLine.Append(0) case oomprofMemoryFrame: // This is a special frame that is used to report OOMProf samples. w.FrameType.AppendString(libpf.NativeFrame.String()) @@ -1458,7 +1711,7 @@ func (r *ParcaReporter) buildStacktraceRecord(ctx context.Context, stacktraceIDs w.FunctionName.AppendString(functionName) w.FunctionSystemName.AppendString("") w.FunctionFilename.AppendString(filePath) - w.FunctionStartLine.Append(int64(0)) + w.FunctionStartLine.Append(0) } } @@ -1467,3 +1720,138 @@ func (r *ParcaReporter) buildStacktraceRecord(ctx context.Context, stacktraceIDs return w.NewRecord(stacktraceIDs), nil } + +// buildSampleRecordV2 builds an Arrow record using the v2 schema with inline stacktraces. 
+func (r *ParcaReporter) buildSampleRecordV2(ctx context.Context) arrow.Record {
+	newWriter := NewSampleWriterV2(r.mem)
+
+	r.sampleWriterV2Mu.Lock() // swap in a fresh writer so appenders never block on batch finalization
+	w := r.sampleWriterV2
+	r.sampleWriterV2 = newWriter
+	r.sampleWriterV2Mu.Unlock()
+
+	defer w.Release()
+
+	// Complete the record with all values that are the same for all rows
+	rows := uint64(w.Value.Len())
+	r.writeCommonLabelsV2(w, rows)
+
+	return w.NewRecord()
+}
+
+// writeCommonLabelsV2 writes common labels to all rows in the v2 writer.
+func (r *ParcaReporter) writeCommonLabelsV2(w *SampleWriterV2, rows uint64) { // NOTE(review): rows is unused — LabelAll presumably spans all rows; confirm and drop the parameter if so
+	for _, label := range r.externalLabels {
+		w.LabelAll(label.Name, label.Value)
+	}
+}
+
+// logDataForOfflineModeV2 logs data for offline mode using the v2 schema.
+// V2 records are self-contained, so no separate stacktrace record is needed.
+func (r *ParcaReporter) logDataForOfflineModeV2(ctx context.Context, buf *bytes.Buffer) error {
+	record := r.buildSampleRecordV2(ctx)
+	defer record.Release()
+
+	if record.NumRows() == 0 {
+		log.Debugf("Skip logging batch with no samples")
+		return nil
+	}
+
+	buf.Reset()
+
+	w := ipc.NewWriter(buf,
+		ipc.WithSchema(record.Schema()),
+		ipc.WithAllocator(r.mem),
+	)
+
+	if err := w.Write(record); err != nil {
+		return fmt.Errorf("failed to write v2 samples: %w", err)
+	}
+
+	if err := w.Close(); err != nil {
+		return fmt.Errorf("failed to close v2 samples writer: %w", err)
+	}
+
+	r.offlineModeLogMu.Lock()
+	defer r.offlineModeLogMu.Unlock()
+	if r.offlineModeLogFile == nil { // lazily open a new log file for this batch series
+		fpath := fmt.Sprintf("%s/%d-%d%s", r.offlineModeConfig.StoragePath, time.Now().Unix(), os.Getpid(), DATA_FILE_EXTENSION)
+
+		logFile, err := setupOfflineModeLog(fpath)
+		if err != nil {
+			return fmt.Errorf("failed to set up offline mode log file: %w", err)
+		}
+		r.offlineModeLogFile = logFile
+		r.offlineModeLogPath = fpath
+		r.offlineModeLoggedStacks.Purge()
+		r.offlineModeNBatchesInCurrentFile = 0
+	}
+
+	sz := uint32(buf.Len()) // length-prefix framing: big-endian uint32 size, then the IPC payload
+	if err := binary.Write(r.offlineModeLogFile, binary.BigEndian, sz); err != nil {
+		return fmt.Errorf("failed to write to log %s: %w", r.offlineModeLogPath, err)
+	}
+
+	if _, err := r.offlineModeLogFile.Write(buf.Bytes()); err != nil {
+		return fmt.Errorf("failed to write to log %s: %w", r.offlineModeLogPath, err)
+	}
+
+	r.sampleWrites.Add(float64(record.NumRows()))
+	r.sampleWriteRequestBytes.Add(float64(buf.Len()))
+
+	// V2 records are self-contained, no separate stacktrace record needed
+	// We need to fsync before updating the number of records at the head of the file
+	if err := r.offlineModeLogFile.Sync(); err != nil {
+		return fmt.Errorf("failed to fsync log %s: %w", r.offlineModeLogPath, err)
+	}
+
+	r.offlineModeNBatchesInCurrentFile += 1
+	n := r.offlineModeNBatchesInCurrentFile
+	log.Debugf("wrote v2 batch %d", n)
+
+	if _, err := r.offlineModeLogFile.WriteAt([]byte{byte(n / 256), byte(n)}, 6); err != nil { // big-endian uint16 batch count at fixed header offset 6
+		return fmt.Errorf("failed to write to log %s: %w", r.offlineModeLogPath, err)
+	}
+
+	return nil
+}
+
+// reportDataToBackendV2 sends a v2 schema record to the backend.
+// V2 records are self-contained with inline stacktraces, so no back-and-forth is needed.
+func (r *ParcaReporter) reportDataToBackendV2(ctx context.Context, buf *bytes.Buffer) error {
+	record := r.buildSampleRecordV2(ctx)
+	defer record.Release()
+
+	if record.NumRows() == 0 {
+		log.Debugf("Skip sending of v2 profile with no samples")
+		return nil
+	}
+
+	buf.Reset()
+	w := ipc.NewWriter(buf,
+		ipc.WithSchema(record.Schema()),
+		ipc.WithAllocator(r.mem),
+		ipc.WithLZ4(), // LZ4-compress record body buffers on the wire
+	)
+
+	if err := w.Write(record); err != nil {
+		return fmt.Errorf("failed to write v2 record: %w", err)
+	}
+
+	if err := w.Close(); err != nil {
+		return fmt.Errorf("failed to close v2 record writer: %w", err)
+	}
+
+	if _, err := r.client.WriteArrow(ctx, &profilestorepb.WriteArrowRequest{
+		IpcBuffer: buf.Bytes(),
+	}); err != nil {
+		return err // returned unwrapped so callers can inspect the gRPC status directly
+	}
+
+	r.sampleWrites.Add(float64(record.NumRows()))
+	r.sampleWriteRequestBytes.Add(float64(buf.Len()))
+
+	log.Debugf("Sent v2 profile with %d samples", record.NumRows())
+
+	return nil
+}
diff --git a/uploader/log_uploader.go b/uploader/log_uploader.go
index 63fcf9c35d..a7cb6eeca8 100644
--- a/uploader/log_uploader.go
+++ b/uploader/log_uploader.go
@@ -13,10 +13,10 @@ import (
 	profilestoregrpc "buf.build/gen/go/parca-dev/parca/grpc/go/parca/profilestore/v1alpha1/profilestorev1alpha1grpc"
 	profilestorepb "buf.build/gen/go/parca-dev/parca/protocolbuffers/go/parca/profilestore/v1alpha1"
-	"github.com/apache/arrow/go/v16/arrow"
-	"github.com/apache/arrow/go/v16/arrow/array"
-	"github.com/apache/arrow/go/v16/arrow/ipc"
-	"github.com/apache/arrow/go/v16/arrow/memory"
+	"github.com/apache/arrow-go/v18/arrow"
+	"github.com/apache/arrow-go/v18/arrow/array"
+	"github.com/apache/arrow-go/v18/arrow/ipc"
+	"github.com/apache/arrow-go/v18/arrow/memory"
 	"github.com/dustin/go-humanize"
 	"github.com/klauspost/compress/zstd"
 	"github.com/parca-dev/parca-agent/flags"
@@ -204,6 +204,7 @@ func getLocationsReader(locations *array.List) (*locationsReader, error) {
 		return nil, fmt.Errorf("missing required field %q in line struct", "line")
 	}
 	lineNumber, ok := line.Field(fieldIdx).(*array.Int64)
+
 	if !ok {
 		return nil, fmt.Errorf("expected column line to be of type Int64, got %T", line.Field(fieldIdx))
 	}