Skip to content

Commit cb18376

Browse files
committed
将读写锁替换为WaitGroup
1 parent ed9d620 commit cb18376

3 files changed

Lines changed: 47 additions & 62 deletions

File tree

ring.go

Lines changed: 4 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ import (
99

1010
type RingItem struct {
1111
avformat.AVPacket
12-
sync.RWMutex
12+
sync.WaitGroup
1313
*bytes.Buffer
1414
}
1515

@@ -27,7 +27,7 @@ func NewRing(exp int) (r *Ring) {
2727
r.Size = 1 << exp
2828
r.buffer = make([]RingItem, r.Size)
2929
r.RingItem = &r.buffer[0]
30-
r.Lock()
30+
r.Add(1)
3131
return
3232
}
3333
func (r *Ring) offset(v int) int {
@@ -71,14 +71,8 @@ func (r *Ring) GoBack() {
7171
func (r *Ring) NextW() {
7272
item := r.RingItem
7373
r.GoNext()
74-
r.RingItem.Lock()
75-
item.Unlock()
76-
}
77-
78-
// NextR 读下一个
79-
func (r *Ring) NextR() {
80-
r.RingItem.RUnlock()
81-
r.GoNext()
74+
r.RingItem.Add(1)
75+
item.Done()
8276
}
8377

8478
func (r *Ring) GetBuffer() *bytes.Buffer {

stream.go

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -29,10 +29,9 @@ func FindStream(streamPath string) *Stream {
2929
//GetStream 根据流路径获取流,如果不存在则创建一个新的
3030
func GetStream(streamPath string) (result *Stream) {
3131
item, loaded := streamCollection.LoadOrStore(streamPath, &Stream{
32-
Subscribers: make(map[string]*Subscriber),
33-
Control: make(chan interface{}),
34-
AVRing: NewRing(config.RingSize),
35-
WaitingMutex: new(sync.RWMutex),
32+
Subscribers: make(map[string]*Subscriber),
33+
Control: make(chan interface{}),
34+
AVRing: NewRing(config.RingSize),
3635
StreamInfo: StreamInfo{
3736
StreamPath: streamPath,
3837
SubscriberInfo: make([]*SubscriberInfo, 0),
@@ -42,7 +41,7 @@ func GetStream(streamPath string) (result *Stream) {
4241
if !loaded {
4342
Summary.Streams = append(Summary.Streams, &result.StreamInfo)
4443
result.Context, result.Cancel = context.WithCancel(context.Background())
45-
result.WaitingMutex.Lock() //等待发布者
44+
result.WaitPub.Add(1) //等待发布者
4645
go result.Run()
4746
}
4847
return
@@ -60,7 +59,7 @@ type Stream struct {
6059
AudioTag *AVPacket // 每个音频包都是这样的结构,区别在于Payload的大小.FMS在发送AAC sequence header,需要加上 AudioTags,这个tag 1个字节(8bits)的数据
6160
FirstScreen *Ring //最近的关键帧位置,首屏渲染
6261
AVRing *Ring //数据环
63-
WaitingMutex *sync.RWMutex //用于订阅和等待发布者
62+
WaitPub sync.WaitGroup //用于订阅和等待发布者
6463
UseTimestamp bool //是否采用数据包中的时间戳
6564
}
6665

@@ -326,7 +325,7 @@ func (r *Stream) PushVideo(timestamp uint32, payload []byte) {
326325
}
327326
if video.IsKeyFrame {
328327
if r.FirstScreen == nil {
329-
defer r.WaitingMutex.Unlock()
328+
defer r.WaitPub.Done()
330329
r.FirstScreen = video.Clone()
331330
} else {
332331
oldNumber := r.FirstScreen.Number

subscriber.go

Lines changed: 37 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,8 @@ type Subscriber struct {
2929
Cancel context.CancelFunc
3030
Sign string
3131
OffsetTime uint32
32+
startTime uint32
33+
avformat.SendPacket
3234
}
3335

3436
// IsClosed 检查订阅者是否已经关闭
@@ -52,55 +54,45 @@ func (s *Subscriber) Subscribe(streamPath string) (err error) {
5254
}
5355
GetStream(streamPath).Subscribe(s)
5456
defer s.UnSubscribe(s)
55-
//加锁解锁的目的是等待发布者首屏数据,如果发布者尚为发布,则会等待,否则就会往下执行
56-
s.WaitingMutex.RLock()
57-
s.WaitingMutex.RUnlock()
58-
sendPacket := avformat.NewSendPacket(s.VideoTag, 0)
59-
defer sendPacket.Recycle()
60-
s.OnData(sendPacket)
57+
//等待发布者首屏数据,如果发布者尚为发布,则会等待,否则就会往下执行
58+
s.WaitPub.Wait()
59+
s.sendAv(s.VideoTag, 0)
6160
packet := s.FirstScreen.Clone()
62-
startTime := packet.Timestamp
63-
packet.RLock()
64-
sendPacket.AVPacket = &packet.AVPacket
65-
s.OnData(sendPacket)
66-
packet.NextR()
67-
atsent := false
68-
dropping := false
69-
droped := 0
70-
for {
71-
select {
72-
case <-s.Done():
73-
return s.Err()
74-
default:
75-
s.TotalPacket++
76-
packet.RLock()
77-
if !dropping {
78-
if packet.Type == avformat.FLV_TAG_TYPE_AUDIO && !atsent {
79-
sendPacket.AVPacket = s.AudioTag
80-
sendPacket.Timestamp = 0
81-
s.OnData(sendPacket)
82-
atsent = true
83-
}
84-
sendPacket.AVPacket = &packet.AVPacket
85-
sendPacket.Timestamp = packet.Timestamp - startTime
86-
s.OnData(sendPacket)
87-
if s.checkDrop(packet) {
88-
dropping = true
89-
droped = 0
90-
}
91-
packet.NextR()
92-
} else if packet.IsKeyFrame {
93-
//遇到关键帧则退出丢帧
94-
dropping = false
95-
//fmt.Println("drop package ", droped)
96-
s.TotalDrop += droped
97-
packet.RUnlock()
98-
} else {
99-
droped++
100-
packet.NextR()
61+
s.startTime = packet.Timestamp
62+
s.send(packet)
63+
packet.GoNext()
64+
for atsent, dropping, droped := false, false, 0; s.Err() == nil; packet.GoNext() {
65+
s.TotalPacket++
66+
if !dropping {
67+
if packet.Type == avformat.FLV_TAG_TYPE_AUDIO && !atsent {
68+
s.sendAv(s.AudioTag, 0)
69+
atsent = true
10170
}
71+
s.send(packet)
72+
if s.checkDrop(packet) {
73+
dropping = true
74+
droped = 0
75+
}
76+
} else if packet.IsKeyFrame {
77+
//遇到关键帧则退出丢帧
78+
dropping = false
79+
//fmt.Println("drop package ", droped)
80+
s.TotalDrop += droped
81+
s.send(packet)
82+
} else {
83+
droped++
10284
}
10385
}
86+
return s.Err()
87+
}
88+
func (s *Subscriber) sendAv(packet *avformat.AVPacket, t uint32) {
89+
s.AVPacket = packet
90+
s.Timestamp = t
91+
s.OnData(&s.SendPacket)
92+
}
93+
func (s *Subscriber) send(packet *Ring) {
94+
packet.Wait()
95+
s.sendAv(&packet.AVPacket, packet.Timestamp-s.startTime)
10496
}
10597
func (s *Subscriber) checkDrop(packet *Ring) bool {
10698
pIndex := s.AVRing.Index

0 commit comments

Comments
 (0)