Skip to content

Commit 4c935a4

Browse files
Merge pull request #7 from threadedstream/output_goroutine_debug
Output goroutine debug
2 parents eabdfdb + d3b7b82 commit 4c935a4

12 files changed

Lines changed: 2568 additions & 408 deletions

byte_merger.go

Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
package ppmerge
2+
3+
import (
4+
"bytes"
5+
"compress/gzip"
6+
"io"
7+
8+
"google.golang.org/protobuf/proto"
9+
)
10+
11+
type ByteProfileMerger struct {
12+
mergedProfile *MergedByteProfile
13+
}
14+
15+
func NewByteProfileMerger() *ByteProfileMerger {
16+
return &ByteProfileMerger{
17+
mergedProfile: new(MergedByteProfile),
18+
}
19+
}
20+
21+
func (bm *ByteProfileMerger) Merge(profiles ...[]byte) *MergedByteProfile {
22+
bm.mergedProfile.Profiles = make([][]byte, len(profiles))
23+
for i, p := range profiles {
24+
bm.mergedProfile.Profiles[i] = p
25+
}
26+
27+
return bm.mergedProfile
28+
}
29+
30+
func (bm *ByteProfileMerger) WriteCompressed(w io.Writer) error {
31+
// Write writes the profile as a gzip-compressed marshaled protobuf.
32+
zw := gzip.NewWriter(w)
33+
defer zw.Close()
34+
serialized, err := proto.Marshal(bm.mergedProfile)
35+
if err != nil {
36+
return err
37+
}
38+
39+
_, err = zw.Write(serialized)
40+
return err
41+
}
42+
43+
func (bm *ByteProfileMerger) WriteUncompressed(w io.Writer) error {
44+
serialized, err := bm.mergedProfile.MarshalVT()
45+
if err != nil {
46+
return err
47+
}
48+
_, err = w.Write(serialized)
49+
return err
50+
}
51+
52+
// ByteProfileUnPacker is the unpacker for MergedByteProfile
53+
type ByteProfileUnPacker struct {
54+
mergedProfile *MergedByteProfile
55+
}
56+
57+
// NewByteProfileUnPacker returns new ByteProfileUnPacker instance
58+
func NewByteProfileUnPacker(mergedProfile *MergedByteProfile) *ByteProfileUnPacker {
59+
return &ByteProfileUnPacker{
60+
mergedProfile: mergedProfile,
61+
}
62+
}
63+
64+
func (pu *ByteProfileUnPacker) UnpackRaw(compressedRawProfile []byte, idx uint64) ([]byte, error) {
65+
bb := bytes.NewBuffer(compressedRawProfile)
66+
67+
gzReader, err := gzip.NewReader(bb)
68+
if err != nil {
69+
return nil, err
70+
}
71+
72+
rawProfile, err := io.ReadAll(gzReader)
73+
if err != nil {
74+
return nil, err
75+
}
76+
77+
if pu.mergedProfile == nil {
78+
pu.mergedProfile = new(MergedByteProfile)
79+
}
80+
81+
if err = pu.mergedProfile.UnmarshalVT(rawProfile); err != nil {
82+
return nil, err
83+
}
84+
85+
return pu.Unpack(idx)
86+
}
87+
88+
func (pu *ByteProfileUnPacker) Unpack(idx uint64) ([]byte, error) {
89+
return pu.mergedProfile.Profiles[idx], nil
90+
}

goroutine_merger.go

Lines changed: 215 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,215 @@
1+
package ppmerge
2+
3+
import (
4+
"bytes"
5+
"compress/gzip"
6+
"io"
7+
8+
"google.golang.org/protobuf/proto"
9+
)
10+
11+
type GoroutineProfileMerger struct {
12+
mergedProfile *MergedGoroutineProfile
13+
stringTable map[string]uint64
14+
}
15+
16+
func NewGoroutineProfileMerger() *GoroutineProfileMerger {
17+
return &GoroutineProfileMerger{
18+
mergedProfile: MergedGoroutineProfileFromVTPool(),
19+
stringTable: map[string]uint64{
20+
"": 0,
21+
},
22+
}
23+
}
24+
25+
func (gpm *GoroutineProfileMerger) WriteCompressed(w io.Writer) error {
26+
// Write writes the profile as a gzip-compressed marshaled protobuf.
27+
zw := gzip.NewWriter(w)
28+
defer zw.Close()
29+
serialized, err := gpm.mergedProfile.MarshalVT()
30+
if err != nil {
31+
return err
32+
}
33+
34+
_, err = zw.Write(serialized)
35+
return err
36+
}
37+
38+
func (gpm *GoroutineProfileMerger) Merge(gps ...*GoroutineProfile) *MergedGoroutineProfile {
39+
gpm.mergedProfile.Totals = make([]uint64, 0, len(gps))
40+
gpm.mergedProfile.NumStacktraces = make([]uint64, 0, len(gps))
41+
42+
gpm.merge(gps...)
43+
44+
gpm.finalizeStringTable()
45+
return gpm.mergedProfile
46+
}
47+
48+
func (gpm *GoroutineProfileMerger) merge(gps ...*GoroutineProfile) {
49+
var resultStacktraces []*Stacktrace
50+
for _, gp := range gps {
51+
gpm.mergedProfile.Totals = append(gpm.mergedProfile.Totals, gp.Total)
52+
stacktraces := gp.GetStacktraces()
53+
54+
gpm.mergedProfile.NumStacktraces = append(gpm.mergedProfile.NumStacktraces, uint64(len(stacktraces)))
55+
56+
for _, st := range stacktraces {
57+
resultStacktrace := new(Stacktrace)
58+
resultStacktrace.Total = st.Total
59+
60+
resultStacktrace.PC = make([]uint64, len(st.PC))
61+
copy(resultStacktrace.PC, st.PC)
62+
63+
frames := st.GetFrames()
64+
if frames == nil {
65+
resultStacktraces = append(resultStacktraces, resultStacktrace)
66+
continue
67+
}
68+
resultStacktrace.Frames = make([]*Frame, 0, len(frames))
69+
for _, f := range frames {
70+
resultStacktrace.Frames = append(resultStacktrace.Frames, gpm.remapFrame(f, gp.StringTable))
71+
}
72+
resultStacktraces = append(resultStacktraces, resultStacktrace)
73+
}
74+
}
75+
76+
gpm.mergedProfile.Stacktraces = resultStacktraces
77+
}
78+
79+
func (gpm *GoroutineProfileMerger) remapFrame(frame *Frame, gpStringTable []string) *Frame {
80+
return &Frame{
81+
Address: frame.Address,
82+
FunctionName: gpm.putString(gpStringTable[frame.FunctionName]),
83+
Offset: frame.Offset,
84+
Filename: gpm.putString(gpStringTable[frame.Filename]),
85+
Line: frame.Line,
86+
}
87+
}
88+
89+
func (gpm *GoroutineProfileMerger) finalizeStringTable() {
90+
gpm.mergedProfile.StringTable = make([]string, len(gpm.stringTable))
91+
for k, v := range gpm.stringTable {
92+
gpm.mergedProfile.StringTable[v] = k
93+
}
94+
}
95+
96+
func (gpm *GoroutineProfileMerger) putString(val string) uint64 {
97+
if id, ok := gpm.stringTable[val]; ok {
98+
return id
99+
}
100+
id := uint64(len(gpm.stringTable))
101+
gpm.stringTable[val] = id
102+
return id
103+
}
104+
105+
type GoroutineProfileUnPacker struct {
106+
mergedProfile *MergedGoroutineProfile
107+
stringTable map[string]uint64
108+
}
109+
110+
func NewGoroutineProfileUnPacker(mergedProfile *MergedGoroutineProfile) *GoroutineProfileUnPacker {
111+
return &GoroutineProfileUnPacker{
112+
mergedProfile: mergedProfile,
113+
stringTable: map[string]uint64{
114+
"": 0,
115+
},
116+
}
117+
}
118+
119+
func (gpu *GoroutineProfileUnPacker) UnpackRaw(compressedRawProfile []byte, idx uint64) (*GoroutineProfile, error) {
120+
bb := bytes.NewBuffer(compressedRawProfile)
121+
122+
gzReader, err := gzip.NewReader(bb)
123+
if err != nil {
124+
return nil, err
125+
}
126+
127+
rawProfile, err := io.ReadAll(gzReader)
128+
if err != nil {
129+
return nil, err
130+
}
131+
132+
if gpu.mergedProfile == nil {
133+
gpu.mergedProfile = MergedGoroutineProfileFromVTPool()
134+
}
135+
136+
if err = proto.Unmarshal(rawProfile, gpu.mergedProfile); err != nil {
137+
return nil, err
138+
}
139+
140+
return gpu.Unpack(idx)
141+
}
142+
143+
func (gpu *GoroutineProfileUnPacker) Unpack(idx uint64) (*GoroutineProfile, error) {
144+
if idx >= uint64(len(gpu.mergedProfile.NumStacktraces)) {
145+
return nil, indexOutOfRangeErr
146+
}
147+
gp := GoroutineProfileFromVTPool()
148+
149+
gp.Total = gpu.mergedProfile.Totals[idx]
150+
151+
numStacktraces := gpu.mergedProfile.NumStacktraces[idx]
152+
153+
var offset uint64
154+
for i := uint64(0); i < idx; i++ {
155+
offset += gpu.mergedProfile.NumStacktraces[i]
156+
}
157+
158+
limit := offset + numStacktraces
159+
160+
gp.Stacktraces = make([]*Stacktrace, 0, numStacktraces)
161+
for offset < limit {
162+
gp.Stacktraces = append(gp.Stacktraces, gpu.remapStacktrace(gpu.mergedProfile.Stacktraces[offset]))
163+
offset++
164+
}
165+
166+
gpu.finalizeStringTable(gp)
167+
168+
return gp, nil
169+
}
170+
171+
func (gpu *GoroutineProfileUnPacker) remapStacktrace(st *Stacktrace) *Stacktrace {
172+
resultStacktrace := new(Stacktrace)
173+
resultStacktrace.Total = st.Total
174+
175+
resultStacktrace.PC = make([]uint64, len(st.PC))
176+
copy(resultStacktrace.PC, st.PC)
177+
178+
frames := st.GetFrames()
179+
if frames == nil {
180+
return resultStacktrace
181+
}
182+
183+
resultStacktrace.Frames = make([]*Frame, 0, len(st.Frames))
184+
for _, f := range frames {
185+
resultStacktrace.Frames = append(resultStacktrace.Frames, gpu.remapFrame(f))
186+
}
187+
188+
return resultStacktrace
189+
}
190+
191+
func (gpu *GoroutineProfileUnPacker) remapFrame(frame *Frame) *Frame {
192+
return &Frame{
193+
Address: frame.Address,
194+
FunctionName: gpu.putString(gpu.mergedProfile.StringTable[frame.FunctionName]),
195+
Offset: frame.Offset,
196+
Filename: gpu.putString(gpu.mergedProfile.StringTable[frame.Filename]),
197+
Line: frame.Line,
198+
}
199+
}
200+
201+
func (gpu *GoroutineProfileUnPacker) finalizeStringTable(gp *GoroutineProfile) {
202+
gp.StringTable = make([]string, len(gpu.stringTable))
203+
for k, v := range gpu.stringTable {
204+
gp.StringTable[v] = k
205+
}
206+
}
207+
208+
func (gpu *GoroutineProfileUnPacker) putString(val string) uint64 {
209+
if id, ok := gpu.stringTable[val]; ok {
210+
return id
211+
}
212+
id := uint64(len(gpu.stringTable))
213+
gpu.stringTable[val] = id
214+
return id
215+
}

0 commit comments

Comments
 (0)