Skip to content

Commit d8a24c5

Browse files
authored
Temporary object pools. Fixes and tuning (#47)
* Bytewise crc32 * Bump go-astikit to 0.30. Make crc32 generator. Remove old crc32 calculation func and corresponding tests/benchmarks. * Replace OpenFile with Create in crc32 generator. Some minor changes * Add pooling for packet slices and raw data payload. Replace map[uint16] with map[uint32] see runtime/map_fast32.go . Cut out mutexes. Make DemuxerData slices of known size. Bump GO to 1.19. Fix BenchmarkParsePSIData and BenchmarkDemuxer_NextData. Copy FirstPacket without payload to DemuxerData. * Move pools to separate file. Rollback to GO 1.13 * Comments and naming * Some formatting and esContexts map[uint32] * Remove packetSlice pool. Wrap tempPayload in object to reduce allocations. * Mark packetPool and programMap methods as Unlocked * Naming and comments * Naming --------- Co-authored-by: Danil Korymov <danil@24h.tv>
1 parent f2825ee commit d8a24c5

12 files changed

Lines changed: 153 additions & 97 deletions

data.go

Lines changed: 28 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@ package astits
33
import (
44
"encoding/binary"
55
"fmt"
6-
76
"github.com/asticode/go-astikit"
87
)
98

@@ -54,19 +53,28 @@ func parseData(ps []*Packet, prs PacketsParser, pm *programMap) (ds []*DemuxerDa
5453
l += len(p.Payload)
5554
}
5655

56+
// Get the slice for payload from pool
57+
payload := bytesPool.get(l)
58+
defer bytesPool.put(payload)
59+
5760
// Append payload
58-
var payload = make([]byte, l)
5961
var c int
6062
for _, p := range ps {
61-
c += copy(payload[c:], p.Payload)
63+
c += copy(payload.s[c:], p.Payload)
6264
}
6365

6466
// Create reader
65-
i := astikit.NewBytesIterator(payload)
67+
i := astikit.NewBytesIterator(payload.s)
6668

6769
// Parse PID
6870
pid := ps[0].Header.PID
6971

72+
// Copy first packet headers, so we can safely deallocate original payload
73+
fp := &Packet{
74+
Header: ps[0].Header,
75+
AdaptationField: ps[0].AdaptationField,
76+
}
77+
7078
// Parse payload
7179
if pid == PIDCAT {
7280
// Information in a CAT payload is private and dependent on the CA system. Use the PacketsParser
@@ -80,8 +88,8 @@ func parseData(ps []*Packet, prs PacketsParser, pm *programMap) (ds []*DemuxerDa
8088
}
8189

8290
// Append data
83-
ds = psiData.toData(ps[0], pid)
84-
} else if isPESPayload(payload) {
91+
ds = psiData.toData(fp, pid)
92+
} else if isPESPayload(payload.s) {
8593
// Parse PES data
8694
var pesData *PESData
8795
if pesData, err = parsePESData(i); err != nil {
@@ -90,19 +98,21 @@ func parseData(ps []*Packet, prs PacketsParser, pm *programMap) (ds []*DemuxerDa
9098
}
9199

92100
// Append data
93-
ds = append(ds, &DemuxerData{
94-
FirstPacket: ps[0],
95-
PES: pesData,
96-
PID: pid,
97-
})
101+
ds = []*DemuxerData{
102+
{
103+
FirstPacket: fp,
104+
PES: pesData,
105+
PID: pid,
106+
},
107+
}
98108
}
99109
return
100110
}
101111

102112
// isPSIPayload checks whether the payload is a PSI one
103113
func isPSIPayload(pid uint16, pm *programMap) bool {
104114
return pid == PIDPAT || // PAT
105-
pm.exists(pid) || // PMT
115+
pm.existsUnlocked(pid) || // PMT
106116
((pid >= 0x10 && pid <= 0x14) || (pid >= 0x1e && pid <= 0x1f)) //DVB
107117
}
108118

@@ -125,15 +135,18 @@ func isPSIComplete(ps []*Packet) bool {
125135
l += len(p.Payload)
126136
}
127137

138+
// Get the slice for payload from pool
139+
payload := bytesPool.get(l)
140+
defer bytesPool.put(payload)
141+
128142
// Append payload
129-
var payload = make([]byte, l)
130143
var o int
131144
for _, p := range ps {
132-
o += copy(payload[o:], p.Payload)
145+
o += copy(payload.s[o:], p.Payload)
133146
}
134147

135148
// Create reader
136-
i := astikit.NewBytesIterator(payload)
149+
i := astikit.NewBytesIterator(payload.s)
137150

138151
// Get next byte
139152
b, err := i.NextByte()

data_psi.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -458,6 +458,7 @@ func parsePSISectionSyntaxData(i *astikit.BytesIterator, h *PSISectionHeader, sh
458458
// toData parses the PSI tables and returns a set of DemuxerData
459459
func (d *PSIData) toData(firstPacket *Packet, pid uint16) (ds []*DemuxerData) {
460460
// Loop through sections
461+
ds = make([]*DemuxerData, 0, len(d.Sections))
461462
for _, s := range d.Sections {
462463
// No data
463464
if s.Syntax == nil || s.Syntax.Data == nil {

data_psi_test.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -376,8 +376,9 @@ func TestWritePSIData(t *testing.T) {
376376
}
377377

378378
func BenchmarkParsePSIData(b *testing.B) {
379+
pb := psiBytes()
379380
b.ReportAllocs()
380381
for i := 0; i < b.N; i++ {
381-
parsePSIData(astikit.NewBytesIterator(psiBytes()))
382+
parsePSIData(astikit.NewBytesIterator(pb))
382383
}
383384
}

data_test.go

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -44,10 +44,15 @@ func TestParseData(t *testing.T) {
4444
}
4545
ds, err = parseData(ps, nil, pm)
4646
assert.NoError(t, err)
47-
assert.Equal(t, []*DemuxerData{{FirstPacket: ps[0], PES: pesWithHeader(), PID: uint16(256)}}, ds)
47+
assert.Equal(t, []*DemuxerData{
48+
{
49+
FirstPacket: &Packet{Header: ps[0].Header, AdaptationField: ps[0].AdaptationField},
50+
PES: pesWithHeader(),
51+
PID: uint16(256),
52+
}}, ds)
4853

4954
// PSI
50-
pm.set(uint16(256), uint16(1))
55+
pm.setUnlocked(uint16(256), uint16(1))
5156
p = psiBytes()
5257
ps = []*Packet{
5358
{
@@ -61,7 +66,10 @@ func TestParseData(t *testing.T) {
6166
}
6267
ds, err = parseData(ps, nil, pm)
6368
assert.NoError(t, err)
64-
assert.Equal(t, psi.toData(ps[0], uint16(256)), ds)
69+
assert.Equal(t, psi.toData(
70+
&Packet{Header: ps[0].Header, AdaptationField: ps[0].AdaptationField},
71+
uint16(256),
72+
), ds)
6573
}
6674

6775
func TestIsPSIPayload(t *testing.T) {
@@ -73,7 +81,7 @@ func TestIsPSIPayload(t *testing.T) {
7381
}
7482
}
7583
assert.Equal(t, []int{0, 16, 17, 18, 19, 20, 30, 31}, pids)
76-
pm.set(uint16(1), uint16(0))
84+
pm.setUnlocked(uint16(1), uint16(0))
7785
assert.True(t, isPSIPayload(uint16(1), pm))
7886
}
7987

demuxer.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -139,7 +139,7 @@ func (dmx *Demuxer) NextData() (d *DemuxerData, err error) {
139139
if err == ErrNoMorePackets {
140140
for {
141141
// Dump packet pool
142-
if ps = dmx.packetPool.dump(); len(ps) == 0 {
142+
if ps = dmx.packetPool.dumpUnlocked(); len(ps) == 0 {
143143
break
144144
}
145145

@@ -165,7 +165,7 @@ func (dmx *Demuxer) NextData() (d *DemuxerData, err error) {
165165
}
166166

167167
// Add packet to the pool
168-
if ps = dmx.packetPool.add(p); len(ps) == 0 {
168+
if ps = dmx.packetPool.addUnlocked(p); len(ps) == 0 {
169169
continue
170170
}
171171

@@ -195,7 +195,7 @@ func (dmx *Demuxer) updateData(ds []*DemuxerData) (d *DemuxerData) {
195195
for _, pgm := range v.PAT.Programs {
196196
// Program number 0 is reserved to NIT
197197
if pgm.ProgramNumber > 0 {
198-
dmx.programMap.set(pgm.ProgramMapID, pgm.ProgramNumber)
198+
dmx.programMap.setUnlocked(pgm.ProgramMapID, pgm.ProgramNumber)
199199
}
200200
}
201201
}

demuxer_test.go

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -95,8 +95,11 @@ func TestDemuxerNextData(t *testing.T) {
9595
ds = append(ds, d)
9696
}
9797
}
98-
assert.Equal(t, psi.toData(p, PIDPAT), ds)
99-
assert.Equal(t, map[uint16]uint16{0x3: 0x2, 0x5: 0x4}, dmx.programMap.p)
98+
assert.Equal(t, psi.toData(
99+
&Packet{Header: p.Header, AdaptationField: p.AdaptationField},
100+
PIDPAT,
101+
), ds)
102+
assert.Equal(t, map[uint32]uint16{0x3: 0x2, 0x5: 0x4}, dmx.programMap.p)
100103

101104
// No more packets
102105
_, err = dmx.NextData()
@@ -158,7 +161,7 @@ func TestDemuxerNextDataPATPMT(t *testing.T) {
158161
func TestDemuxerRewind(t *testing.T) {
159162
r := bytes.NewReader([]byte("content"))
160163
dmx := NewDemuxer(context.Background(), r)
161-
dmx.packetPool.add(&Packet{Header: PacketHeader{PID: 1}})
164+
dmx.packetPool.addUnlocked(&Packet{Header: PacketHeader{PID: 1}})
162165
dmx.dataBuffer = append(dmx.dataBuffer, &DemuxerData{})
163166
b := make([]byte, 2)
164167
_, err := r.Read(b)
@@ -184,11 +187,10 @@ func BenchmarkDemuxer_NextData(b *testing.B) {
184187
w.Write(b2)
185188

186189
r := bytes.NewReader(buf.Bytes())
190+
dmx := NewDemuxer(context.Background(), r)
187191

188192
for i := 0; i < b.N; i++ {
189193
r.Seek(0, io.SeekStart)
190-
dmx := NewDemuxer(context.Background(), r)
191-
192194
for _, s := range psi.Sections {
193195
if !s.Header.TableID.isUnknown() {
194196
dmx.NextData()

muxer.go

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,8 @@ type Muxer struct {
4545
buf bytes.Buffer
4646
bufWriter *astikit.BitsWriter
4747

48-
esContexts map[uint16]*esContext
48+
// We use map[uint32] instead map[uint16] as go runtime provide optimized hash functions for (u)int32/64 keys
49+
esContexts map[uint32]*esContext
4950
tablesRetransmitCounter int
5051
}
5152

@@ -90,14 +91,14 @@ func NewMuxer(ctx context.Context, w io.Writer, opts ...func(*Muxer)) *Muxer {
9091
patCC: newWrappingCounter(0b1111),
9192
pmtCC: newWrappingCounter(0b1111),
9293

93-
esContexts: map[uint16]*esContext{},
94+
esContexts: map[uint32]*esContext{},
9495
}
9596

9697
m.bufWriter = astikit.NewBitsWriter(astikit.BitsWriterOptions{Writer: &m.buf})
9798
m.bitsWriter = astikit.NewBitsWriter(astikit.BitsWriterOptions{Writer: m.w})
9899

99100
// TODO multiple programs support
100-
m.pm.set(pmtStartPID, programNumberStart)
101+
m.pm.setUnlocked(pmtStartPID, programNumberStart)
101102
m.pmUpdated = true
102103

103104
for _, opt := range opts {
@@ -125,7 +126,7 @@ func (m *Muxer) AddElementaryStream(es PMTElementaryStream) error {
125126

126127
m.pmt.ElementaryStreams = append(m.pmt.ElementaryStreams, &es)
127128

128-
m.esContexts[es.ElementaryPID] = newEsContext(&es)
129+
m.esContexts[uint32(es.ElementaryPID)] = newEsContext(&es)
129130
// invalidate pmt cache
130131
m.pmtBytes.Reset()
131132
m.pmtUpdated = true
@@ -146,7 +147,7 @@ func (m *Muxer) RemoveElementaryStream(pid uint16) error {
146147
}
147148

148149
m.pmt.ElementaryStreams = append(m.pmt.ElementaryStreams[:foundIdx], m.pmt.ElementaryStreams[foundIdx+1:]...)
149-
delete(m.esContexts, pid)
150+
delete(m.esContexts, uint32(pid))
150151
m.pmtBytes.Reset()
151152
m.pmtUpdated = true
152153
return nil
@@ -162,7 +163,7 @@ func (m *Muxer) SetPCRPID(pid uint16) {
162163
// Currently only PES packets are supported
163164
// Be aware that after successful call WriteData will set d.AdaptationField.StuffingLength value to zero
164165
func (m *Muxer) WriteData(d *MuxerData) (int, error) {
165-
ctx, ok := m.esContexts[d.PID]
166+
ctx, ok := m.esContexts[uint32(d.PID)]
166167
if !ok {
167168
return 0, ErrPIDNotFound
168169
}
@@ -320,7 +321,7 @@ func (m *Muxer) WriteTables() (int, error) {
320321
}
321322

322323
func (m *Muxer) generatePAT() error {
323-
d := m.pm.toPATData()
324+
d := m.pm.toPATDataUnlocked()
324325

325326
versionNumber := m.patVersion.get()
326327
if m.pmUpdated {

packet_pool.go

Lines changed: 18 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@ package astits
22

33
import (
44
"sort"
5-
"sync"
65
)
76

87
// packetAccumulator keeps track of packets for a single PID and decides when to flush them
@@ -26,7 +25,12 @@ func (b *packetAccumulator) add(p *Packet) (ps []*Packet) {
2625

2726
// Empty buffer if we detect a discontinuity
2827
if hasDiscontinuity(mps, p) {
29-
mps = make([]*Packet, 0, cap(mps))
28+
// Reset current slice or make new
29+
if cap(mps) > 0 {
30+
mps = mps[:0]
31+
} else {
32+
mps = make([]*Packet, 0, 10)
33+
}
3034
}
3135

3236
// Throw away packet if it's the same as the previous one
@@ -44,7 +48,7 @@ func (b *packetAccumulator) add(p *Packet) (ps []*Packet) {
4448

4549
// Check if PSI payload is complete
4650
if b.programMap != nil &&
47-
(b.pid == PIDPAT || b.programMap.exists(b.pid)) &&
51+
(b.pid == PIDPAT || b.programMap.existsUnlocked(b.pid)) &&
4852
isPSIComplete(mps) {
4953
ps = mps
5054
mps = nil
@@ -56,24 +60,23 @@ func (b *packetAccumulator) add(p *Packet) (ps []*Packet) {
5660

5761
// packetPool represents a queue of packets for each PID in the stream
5862
type packetPool struct {
59-
b map[uint16]*packetAccumulator // Indexed by PID
60-
m *sync.Mutex
63+
// We use map[uint32] instead map[uint16] as go runtime provide optimized hash functions for (u)int32/64 keys
64+
b map[uint32]*packetAccumulator // Indexed by PID
6165

6266
programMap *programMap
6367
}
6468

6569
// newPacketPool creates a new packet pool with an optional parser and programMap
6670
func newPacketPool(programMap *programMap) *packetPool {
6771
return &packetPool{
68-
b: make(map[uint16]*packetAccumulator),
69-
m: &sync.Mutex{},
72+
b: make(map[uint32]*packetAccumulator),
7073

7174
programMap: programMap,
7275
}
7376
}
7477

75-
// add adds a new packet to the pool
76-
func (b *packetPool) add(p *Packet) (ps []*Packet) {
78+
// addUnlocked adds a new packet to the pool
79+
func (b *packetPool) addUnlocked(p *Packet) (ps []*Packet) {
7780
// Throw away packet if error indicator
7881
if p.Header.TransportErrorIndicator {
7982
return
@@ -85,33 +88,27 @@ func (b *packetPool) add(p *Packet) (ps []*Packet) {
8588
return
8689
}
8790

88-
// Lock
89-
b.m.Lock()
90-
defer b.m.Unlock()
91-
9291
// Make sure accumulator exists
93-
acc, ok := b.b[p.Header.PID]
92+
acc, ok := b.b[uint32(p.Header.PID)]
9493
if !ok {
9594
acc = newPacketAccumulator(p.Header.PID, b.programMap)
96-
b.b[p.Header.PID] = acc
95+
b.b[uint32(p.Header.PID)] = acc
9796
}
9897

9998
// Add to the accumulator
10099
return acc.add(p)
101100
}
102101

103-
// dump dumps the packet pool by looking for the first item with packets inside
104-
func (b *packetPool) dump() (ps []*Packet) {
105-
b.m.Lock()
106-
defer b.m.Unlock()
102+
// dumpUnlocked dumps the packet pool by looking for the first item with packets inside
103+
func (b *packetPool) dumpUnlocked() (ps []*Packet) {
107104
var keys []int
108105
for k := range b.b {
109106
keys = append(keys, int(k))
110107
}
111108
sort.Ints(keys)
112109
for _, k := range keys {
113-
ps = b.b[uint16(k)].q
114-
delete(b.b, uint16(k))
110+
ps = b.b[uint32(k)].q
111+
delete(b.b, uint32(k))
115112
if len(ps) > 0 {
116113
return
117114
}

0 commit comments

Comments
 (0)