Skip to content

Commit c5b4159

Browse files
Merge pull request #4 from threadedstream/use_vtprotobuf
use vtprotobuf client for optimizing memory usage
2 parents 2f8c981 + cb081a7 commit c5b4159

12 files changed

Lines changed: 8533 additions & 998 deletions

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ input profiles.
1818
In order to understand how merger works, we need to dig a bit deeper into a profile structure
1919

2020
```go
21-
// Profile is an in-memory representation of profile.proto.
21+
// Profile is an in-memory representation of merged_profile.proto.
2222
type Profile struct {
2323
SampleType []*ValueType
2424
DefaultSampleType string

go.mod

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,9 @@ go 1.21
55
require (
66
github.com/google/pprof v0.0.0-20220829040838-70bd9ae97f40
77
github.com/pkg/errors v0.9.1
8+
github.com/planetscale/vtprotobuf v0.6.0
89
github.com/stretchr/testify v1.9.0
9-
google.golang.org/protobuf v1.28.1
10+
google.golang.org/protobuf v1.31.0
1011
)
1112

1213
require (

go.sum

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,15 +7,17 @@ github.com/google/pprof v0.0.0-20220829040838-70bd9ae97f40 h1:ykKxL12NZd3JmWZnyq
77
github.com/google/pprof v0.0.0-20220829040838-70bd9ae97f40/go.mod h1:dDKJzRmX4S37WGHujM7tX//fmj1uioxKzKxz3lo4HJo=
88
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
99
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
10+
github.com/planetscale/vtprotobuf v0.6.0 h1:nBeETjudeJ5ZgBHUz1fVHvbqUKnYOXNhsIEabROxmNA=
11+
github.com/planetscale/vtprotobuf v0.6.0/go.mod h1:t/avpk3KcrXxUnYOhZhMXJlSEyie6gQbtLq5NM3loB8=
1012
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
1113
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
1214
github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg=
1315
github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
1416
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4=
1517
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
1618
google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw=
17-
google.golang.org/protobuf v1.28.1 h1:d0NfwRgPtno5B1Wa6L2DAG+KivqkdutMf1UhdNx175w=
18-
google.golang.org/protobuf v1.28.1/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I=
19+
google.golang.org/protobuf v1.31.0 h1:g0LDEJHgrBl9N9r17Ru3sqWhkIx2NB67okBHPwC7hs8=
20+
google.golang.org/protobuf v1.31.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I=
1921
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
2022
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
2123
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=

merge.go

Lines changed: 68 additions & 65 deletions
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ func (pu *ByteProfileUnPacker) UnpackRaw(compressedRawProfile []byte, idx uint64
5757
pu.mergedProfile = new(MergedByteProfile)
5858
}
5959

60-
if err = proto.Unmarshal(rawProfile, pu.mergedProfile); err != nil {
60+
if err = pu.mergedProfile.UnmarshalVT(rawProfile); err != nil {
6161
return nil, err
6262
}
6363

@@ -68,6 +68,7 @@ func (pu *ByteProfileUnPacker) Unpack(idx uint64) ([]byte, error) {
6868
return pu.mergedProfile.Profiles[idx], nil
6969
}
7070

71+
// ProfileUnPacker recovers any of the profiles stored inside mergedProfile
7172
// ProfileUnPacker recovers any of the profiles stored inside mergedProfile
7273
type ProfileUnPacker struct {
7374
mergedProfile *MergedProfile
@@ -266,7 +267,7 @@ func (pu *ProfileUnPacker) unpackLocation(p *profile.Profile, id uint64) *profil
266267
return loc
267268
}
268269

269-
func (pu *ProfileUnPacker) unpackLine(p *profile.Profile, line *Line) profile.Line {
270+
func (pu *ProfileUnPacker) unpackLine(p *profile.Profile, line *MergeLine) profile.Line {
270271
return profile.Line{
271272
Line: line.Line,
272273
Function: pu.unpackFunction(p, line.FunctionId),
@@ -277,7 +278,7 @@ func (pu *ProfileUnPacker) getString(id int) string {
277278
if id < 0 || id > len(pu.mergedProfile.StringTable) {
278279
return ""
279280
}
280-
return pu.mergedProfile.StringTable[id-1]
281+
return pu.mergedProfile.StringTable[id]
281282
}
282283

283284
func (pu *ProfileUnPacker) unpackFunction(p *profile.Profile, id uint64) *profile.Function {
@@ -360,7 +361,7 @@ func (bm *ByteProfileMerger) WriteCompressed(w io.Writer) error {
360361
}
361362

362363
func (bm *ByteProfileMerger) WriteUncompressed(w io.Writer) error {
363-
serialized, err := proto.Marshal(bm.mergedProfile)
364+
serialized, err := bm.mergedProfile.MarshalVT()
364365
if err != nil {
365366
return err
366367
}
@@ -392,7 +393,7 @@ func (pw *ProfileMerger) WriteCompressed(w io.Writer) error {
392393
// Write writes the profile as a gzip-compressed marshaled protobuf.
393394
zw := gzip.NewWriter(w)
394395
defer zw.Close()
395-
serialized, err := proto.Marshal(pw.mergedProfile)
396+
serialized, err := pw.mergedProfile.MarshalVT()
396397
if err != nil {
397398
return err
398399
}
@@ -402,15 +403,15 @@ func (pw *ProfileMerger) WriteCompressed(w io.Writer) error {
402403
}
403404

404405
func (pw *ProfileMerger) WriteUncompressed(w io.Writer) error {
405-
serialized, err := proto.Marshal(pw.mergedProfile)
406+
serialized, err := pw.mergedProfile.MarshalVT()
406407
if err != nil {
407408
return err
408409
}
409410
_, err = w.Write(serialized)
410411
return err
411412
}
412413

413-
func (pw *ProfileMerger) Merge(ps ...*profile.Profile) *MergedProfile {
414+
func (pw *ProfileMerger) Merge(ps ...*Profile) *MergedProfile {
414415
pw.mergedProfile.NumFunctions = make([]uint64, 0, len(ps))
415416
pw.mergedProfile.NumLocations = make([]uint64, 0, len(ps))
416417
pw.mergedProfile.NumSampleTypes = make([]uint64, 0, len(ps))
@@ -432,65 +433,66 @@ func (pw *ProfileMerger) Merge(ps ...*profile.Profile) *MergedProfile {
432433
pw.mergePeriods(ps...)
433434
pw.mergePeriodTypes(ps...)
434435

435-
pw.mergedProfile.StringTable = make([]string, len(pw.stringTable), len(pw.stringTable))
436+
pw.mergedProfile.StringTable = make([]string, len(pw.stringTable)+1)
437+
pw.mergedProfile.StringTable[0] = ""
436438
for st, id := range pw.stringTable {
437-
pw.mergedProfile.StringTable[id-1] = st
439+
pw.mergedProfile.StringTable[id] = st
438440
}
439441

440442
return pw.mergedProfile
441443
}
442444

443-
func (pw *ProfileMerger) mergeSamples(ps ...*profile.Profile) {
445+
func (pw *ProfileMerger) mergeSamples(ps ...*Profile) {
444446
// allocate samples slice beforehand
445447
size := 0
446448
for _, p := range ps {
447449
size += len(p.Sample)
448450
}
449-
pw.mergedProfile.Samples = make([]*Sample, 0, size)
451+
pw.mergedProfile.Samples = make([]*MergeSample, 0, size)
450452

451453
for _, p := range ps {
452454
for _, s := range p.Sample {
453-
pw.mergedProfile.Samples = append(pw.mergedProfile.Samples, pw.asMergedSample(s))
455+
pw.mergedProfile.Samples = append(pw.mergedProfile.Samples, pw.asMergedSample(s, p))
454456
}
455457
}
456458
}
457459

458-
func (pw *ProfileMerger) mergePeriodTypes(ps ...*profile.Profile) {
460+
func (pw *ProfileMerger) mergePeriodTypes(ps ...*Profile) {
459461
pw.mergedProfile.PeriodTypes = make([]int64, 0, len(ps)*2)
460462

461463
for _, p := range ps {
462464
pw.mergedProfile.PeriodTypes = append(pw.mergedProfile.PeriodTypes,
463-
int64(pw.putString(p.PeriodType.Type)),
464-
int64(pw.putString(p.PeriodType.Unit)),
465+
p.PeriodType.Type,
466+
p.PeriodType.Unit,
465467
)
466468
}
467469
}
468470

469-
func (pw *ProfileMerger) mergeTimeNanos(ps ...*profile.Profile) {
471+
func (pw *ProfileMerger) mergeTimeNanos(ps ...*Profile) {
470472
pw.mergedProfile.TimesNanos = make([]int64, 0, len(ps))
471473

472474
for _, p := range ps {
473475
pw.mergedProfile.TimesNanos = append(pw.mergedProfile.TimesNanos, p.TimeNanos)
474476
}
475477
}
476478

477-
func (pw *ProfileMerger) mergeDurationNanos(ps ...*profile.Profile) {
479+
func (pw *ProfileMerger) mergeDurationNanos(ps ...*Profile) {
478480
pw.mergedProfile.DurationsNanos = make([]int64, 0, len(ps))
479481

480482
for _, p := range ps {
481483
pw.mergedProfile.DurationsNanos = append(pw.mergedProfile.DurationsNanos, p.DurationNanos)
482484
}
483485
}
484486

485-
func (pw *ProfileMerger) mergePeriods(ps ...*profile.Profile) {
487+
func (pw *ProfileMerger) mergePeriods(ps ...*Profile) {
486488
pw.mergedProfile.Periods = make([]int64, 0, len(ps))
487489

488490
for _, p := range ps {
489491
pw.mergedProfile.Periods = append(pw.mergedProfile.Periods, p.Period)
490492
}
491493
}
492494

493-
func (pw *ProfileMerger) mergeSampleTypes(ps ...*profile.Profile) {
495+
func (pw *ProfileMerger) mergeSampleTypes(ps ...*Profile) {
494496
size := 0
495497
for _, p := range ps {
496498
size += len(p.SampleType)
@@ -501,24 +503,24 @@ func (pw *ProfileMerger) mergeSampleTypes(ps ...*profile.Profile) {
501503
for _, p := range ps {
502504
for _, vt := range p.SampleType {
503505
pw.mergedProfile.SampleType = append(pw.mergedProfile.SampleType,
504-
int64(pw.putString(vt.Type)),
505-
int64(pw.putString(vt.Unit)),
506+
int64(pw.getStringRef(uint64(vt.Type), p)),
507+
int64(pw.getStringRef(uint64(vt.Unit), p)),
506508
)
507509
}
508510
}
509511
}
510512

511-
func (pw *ProfileMerger) putMapping(src *profile.Mapping) uint64 {
513+
func (pw *ProfileMerger) putMapping(src *Mapping, p *Profile) uint64 {
512514
if src == nil {
513515
return math.MaxUint64
514516
}
515517

516-
mapping := &Mapping{
517-
MemoryStart: src.Start,
518-
MemoryLimit: src.Limit,
519-
FileOffset: src.Offset,
520-
Filename: int64(pw.putString(src.File)),
521-
BuildId: int64(pw.putString(src.BuildID)),
518+
mapping := &MergeMapping{
519+
MemoryStart: src.MemoryStart,
520+
MemoryLimit: src.MemoryLimit,
521+
FileOffset: src.FileOffset,
522+
Filename: int64(pw.getStringRef(uint64(src.Filename), p)),
523+
BuildId: int64(pw.getStringRef(uint64(src.BuildId), p)),
522524
HasFilenames: src.HasFilenames,
523525
HasFunctions: src.HasFunctions,
524526
HasInlineFrames: src.HasInlineFrames,
@@ -537,51 +539,52 @@ func (pw *ProfileMerger) putMapping(src *profile.Mapping) uint64 {
537539
return mapping.Id
538540
}
539541

540-
func (pw *ProfileMerger) asMergedSample(s *profile.Sample) *Sample {
541-
mergedProfileSample := &Sample{
542-
LocationId: make([]int64, 0, len(s.Location)),
542+
func (pw *ProfileMerger) asMergedSample(s *Sample, p *Profile) *MergeSample {
543+
mergedProfileSample := &MergeSample{
544+
LocationId: make([]int64, 0, len(s.LocationId)),
543545
Value: s.Value,
544546
}
545547

546-
for _, loc := range s.Location {
547-
mergedProfileSample.LocationId = append(mergedProfileSample.LocationId, int64(pw.putLocation(loc)))
548+
for _, locId := range s.LocationId {
549+
mergedProfileSample.LocationId = append(mergedProfileSample.LocationId, int64(pw.putLocation(p.Location[locId-1], p)))
548550
}
549551

550552
return mergedProfileSample
551553
}
552554

553-
func (pw *ProfileMerger) asMergedValueType(vt *profile.ValueType) *ValueType {
554-
return &ValueType{
555-
Type: int64(pw.putString(vt.Type)),
556-
Unit: int64(pw.putString(vt.Unit)),
555+
func (pw *ProfileMerger) asMergedValueType(vt *ValueType) *MergeValueType {
556+
return &MergeValueType{
557+
Type: vt.Type,
558+
Unit: vt.Unit,
557559
}
558560
}
559561

560-
func (pw *ProfileMerger) asMergedProfileLines(lines []profile.Line) []*Line {
561-
mergedProfileLines := make([]*Line, 0, len(lines))
562+
func (pw *ProfileMerger) asMergedProfileLines(lines []*Line, p *Profile) []*MergeLine {
563+
mergedProfileLines := make([]*MergeLine, 0, len(lines))
562564
for _, ln := range lines {
563-
mergedProfileLines = append(mergedProfileLines, pw.asMergedProfileLine(ln))
565+
mergedProfileLines = append(mergedProfileLines, pw.asMergedProfileLine(ln, p))
564566
}
565567
return mergedProfileLines
566568
}
567569

568-
func (pw *ProfileMerger) asMergedProfileLine(line profile.Line) *Line {
569-
return &Line{
570-
FunctionId: pw.putFunction(line.Function),
570+
func (pw *ProfileMerger) asMergedProfileLine(line *Line, p *Profile) *MergeLine {
571+
return &MergeLine{
572+
FunctionId: pw.putFunction(p.Function[line.FunctionId-1], p),
571573
Line: line.Line,
572574
}
573575
}
574576

575-
func (pw *ProfileMerger) putString(val string) int {
576-
id, ok := pw.stringTable[val]
577-
if !ok {
578-
id = len(pw.stringTable) + 1
579-
pw.stringTable[val] = id
577+
func (pw *ProfileMerger) getStringRef(id uint64, p *Profile) int {
578+
strVal := p.StringTable[id]
579+
if localId, ok := pw.stringTable[strVal]; ok {
580+
return localId
580581
}
581-
return id
582+
newId := len(pw.stringTable) + 1
583+
pw.stringTable[strVal] = newId
584+
return newId
582585
}
583586

584-
func (pw *ProfileMerger) getFunctionKey(fn *Function) functionKey {
587+
func (pw *ProfileMerger) getFunctionKey(fn *MergeFunction) functionKey {
585588
return functionKey{
586589
name: fn.Name,
587590
systemName: fn.SystemName,
@@ -590,7 +593,7 @@ func (pw *ProfileMerger) getFunctionKey(fn *Function) functionKey {
590593
}
591594
}
592595

593-
func (pw *ProfileMerger) getMappingKey(m *Mapping) mappingKey {
596+
func (pw *ProfileMerger) getMappingKey(m *MergeMapping) mappingKey {
594597
key := mappingKey{
595598
start: m.MemoryStart,
596599
limit: m.MemoryLimit,
@@ -607,7 +610,7 @@ func (pw *ProfileMerger) getMappingKey(m *Mapping) mappingKey {
607610
return key
608611
}
609612

610-
func (pw *ProfileMerger) getLocationKey(loc *Location) locationKey {
613+
func (pw *ProfileMerger) getLocationKey(loc *MergeLocation) locationKey {
611614
key := locationKey{
612615
mappingID: loc.MappingId,
613616
address: loc.Address,
@@ -626,27 +629,27 @@ func (pw *ProfileMerger) getLocationKey(loc *Location) locationKey {
626629
return key
627630
}
628631

629-
func (pw *ProfileMerger) putLine(src profile.Line) *Line {
630-
return &Line{
631-
FunctionId: pw.putFunction(src.Function),
632+
func (pw *ProfileMerger) putLine(src *Line, p *Profile) *MergeLine {
633+
return &MergeLine{
634+
FunctionId: pw.putFunction(p.Function[src.FunctionId-1], p),
632635
Line: src.Line,
633636
}
634637
}
635638

636-
func (pw *ProfileMerger) putLocation(src *profile.Location) uint64 {
639+
func (pw *ProfileMerger) putLocation(src *Location, p *Profile) uint64 {
637640
if src == nil {
638641
return math.MaxUint64
639642
}
640643

641-
loc := &Location{
642-
MappingId: pw.putMapping(src.Mapping),
644+
loc := &MergeLocation{
645+
MappingId: pw.putMapping(p.Mapping[src.MappingId-1], p),
643646
Address: src.Address,
644-
Line: make([]*Line, len(src.Line), len(src.Line)),
647+
Line: make([]*MergeLine, len(src.Line), len(src.Line)),
645648
IsFolded: src.IsFolded,
646649
}
647650

648651
for i, line := range src.Line {
649-
loc.Line[i] = pw.putLine(line)
652+
loc.Line[i] = pw.putLine(line, p)
650653
}
651654

652655
key := pw.getLocationKey(loc)
@@ -660,15 +663,15 @@ func (pw *ProfileMerger) putLocation(src *profile.Location) uint64 {
660663
return loc.Id
661664
}
662665

663-
func (pw *ProfileMerger) putFunction(src *profile.Function) uint64 {
666+
func (pw *ProfileMerger) putFunction(src *Function, p *Profile) uint64 {
664667
if src == nil {
665668
return math.MaxUint64
666669
}
667670

668-
f := &Function{
669-
Name: int64(pw.putString(src.Name)),
670-
SystemName: int64(pw.putString(src.SystemName)),
671-
Filename: int64(pw.putString(src.Filename)),
671+
f := &MergeFunction{
672+
Name: int64(pw.getStringRef(uint64(src.Name), p)),
673+
SystemName: int64(pw.getStringRef(uint64(src.SystemName), p)),
674+
Filename: int64(pw.getStringRef(uint64(src.Filename), p)),
672675
StartLine: src.StartLine,
673676
}
674677

0 commit comments

Comments
 (0)