Skip to content

Commit 17d915b

Browse files
author
yuyangzi
committed
channel.close时不再清空缓存, 以便于还可以继续读
1 parent 093285d commit 17d915b

2 files changed

Lines changed: 336 additions & 59 deletions

File tree

libgo/routine_sync/channel.h

Lines changed: 86 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -33,10 +33,7 @@ class ChannelImplWithSignal : public DebuggerId<ChannelImplWithSignal<int>>
3333
RS_DBG(dbg_channel, "channel=%ld | %s | ptr(t)=0x%p | isWait=%d | abstime=%d | closed=%d | popWaiting_=%p | pushWaiting_=%p",
3434
id(), __func__, (void*)&t, isWait, !!abstime, closed_, (void*)popWaiting_, (void*)pushWaiting_);
3535

36-
if (closed_)
37-
return false;
38-
39-
if (pushWaiting_) { // 有push协程在等待, 读出来 & 清理
36+
if (pushWaiting_ && pushWaiting_ != (ConditionVariable*)kClosedWaiting) { // 有push协程在等待, 读出来 & 清理
4037
t = *pushQ_;
4138
pushQ_ = nullptr;
4239

@@ -51,6 +48,9 @@ class ChannelImplWithSignal : public DebuggerId<ChannelImplWithSignal<int>>
5148
return true;
5249
}
5350

51+
if (closed_)
52+
return false;
53+
5454
if (!isWait) {
5555
RS_DBG(dbg_channel, "channel=%ld | %s | not match && not wait | return false",
5656
id(), __func__);
@@ -68,15 +68,16 @@ class ChannelImplWithSignal : public DebuggerId<ChannelImplWithSignal<int>>
6868
id(), __func__);
6969

7070
(void)waiting.wait_until_p(lock, abstime, [&]{ return popWaiting_ != &waiting; });
71-
bool ok = popWaiting_ != &waiting;
71+
bool changed = (popWaiting_ != &waiting);
72+
bool ok = changed && popWaiting_ != (ConditionVariable*)kClosedWaiting;
7273

73-
RS_DBG(dbg_channel, "channel=%ld | %s | waked | matched=%d",
74-
id(), __func__, ok);
74+
RS_DBG(dbg_channel, "channel=%ld | %s | waked | closed=%d | changed=%d | ok=%d",
75+
id(), __func__, closed_, changed, ok);
7576

7677
if (ok) {
7778
// 成功
7879
t = std::move(temp); // 对外部T的写操作放到本线程来做, 降低使用难度
79-
} else {
80+
} else if (!changed) {
8081
// 超时,清理
8182
popQ_ = nullptr;
8283
popWaiting_ = nullptr;
@@ -93,13 +94,13 @@ class ChannelImplWithSignal : public DebuggerId<ChannelImplWithSignal<int>>
9394
RS_DBG(dbg_channel, "channel=%ld | %s | ptr(t)=0x%p | isWait=%d | abstime=%d | closed=%d | popWaiting_=%p | pushWaiting_=%p",
9495
id(), __func__, (void*)&t, isWait, !!abstime, closed_, (void*)popWaiting_, (void*)pushWaiting_);
9596

97+
if (closed_)
98+
return false;
99+
96100
if (!popWaiting_) {
97101
return pop_impl_with_signal_noqueued(t, isWait, abstime, lock);
98102
}
99103

100-
if (closed_)
101-
return false;
102-
103104
if (!isWait) {
104105
RS_DBG(dbg_channel, "channel=%ld | %s | pop contended && not wait | return false",
105106
id(), __func__);
@@ -119,8 +120,12 @@ class ChannelImplWithSignal : public DebuggerId<ChannelImplWithSignal<int>>
119120
}
120121
}
121122

122-
RS_DBG(dbg_channel, "channel=%ld | %s | waked | pop idle",
123-
id(), __func__);
123+
RS_DBG(dbg_channel, "channel=%ld | %s | waked | closed=%d | pop idle",
124+
id(), __func__, closed_);
125+
126+
if (closed_)
127+
return false;
128+
124129
return pop_impl_with_signal_noqueued(t, isWait, abstime, lock);
125130
}
126131

@@ -132,10 +137,7 @@ class ChannelImplWithSignal : public DebuggerId<ChannelImplWithSignal<int>>
132137
RS_DBG(dbg_channel, "channel=%ld | %s | ptr(t)=0x%p | isWait=%d | abstime=%d | closed=%d | popWaiting_=%p | pushWaiting_=%p",
133138
id(), __func__, (void*)&t, isWait, !!abstime, closed_, (void*)popWaiting_, (void*)pushWaiting_);
134139

135-
if (closed_)
136-
return false;
137-
138-
if (popWaiting_) { // 有pop协程在等待, 写入 & 清理
140+
if (popWaiting_ && popWaiting_ != (ConditionVariable*)kClosedWaiting) { // 有pop协程在等待, 写入 & 清理
139141
*popQ_ = t;
140142
popQ_ = nullptr;
141143

@@ -150,6 +152,9 @@ class ChannelImplWithSignal : public DebuggerId<ChannelImplWithSignal<int>>
150152
return true;
151153
}
152154

155+
if (closed_)
156+
return false;
157+
153158
if (!isWait) {
154159
RS_DBG(dbg_channel, "channel=%ld | %s | not match && not wait | return false",
155160
id(), __func__);
@@ -166,18 +171,20 @@ class ChannelImplWithSignal : public DebuggerId<ChannelImplWithSignal<int>>
166171
id(), __func__);
167172

168173
(void)waiting.wait_until_p(lock, abstime, [&]{ return pushWaiting_ != &waiting; });
169-
bool ok = pushWaiting_ != &waiting;
174+
bool changed = (pushWaiting_ != &waiting);
175+
bool ok = changed && pushWaiting_ != (ConditionVariable*)kClosedWaiting;
170176

171-
RS_DBG(dbg_channel, "channel=%ld | %s | waked | matched=%d",
172-
id(), __func__, ok);
177+
RS_DBG(dbg_channel, "channel=%ld | %s | waked | closed=%d | changed=%d | ok=%d",
178+
id(), __func__, closed_, changed, ok);
173179

174180
if (ok) {
175181
// 成功
176-
} else {
182+
} else if (!changed) {
177183
// 超时,清理
178184
pushQ_ = nullptr;
179185
pushWaiting_ = nullptr;
180186
}
187+
181188
return ok;
182189
}
183190

@@ -194,6 +201,9 @@ class ChannelImplWithSignal : public DebuggerId<ChannelImplWithSignal<int>>
194201
return push_impl_with_signal_noqueued(t, isWait, abstime, lock);
195202
}
196203

204+
if (closed_)
205+
return false;
206+
197207
if (!isWait) {
198208
RS_DBG(dbg_channel, "channel=%ld | %s | push contended && not wait | return false",
199209
id(), __func__);
@@ -213,11 +223,38 @@ class ChannelImplWithSignal : public DebuggerId<ChannelImplWithSignal<int>>
213223
}
214224
}
215225

216-
RS_DBG(dbg_channel, "channel=%ld | %s | waked | push idle",
217-
id(), __func__);
226+
RS_DBG(dbg_channel, "channel=%ld | %s | waked | closed=%d | push idle",
227+
id(), __func__, closed_);
228+
229+
if (closed_)
230+
return false;
231+
218232
return push_impl_with_signal_noqueued(t, isWait, abstime, lock);
219233
}
220234

235+
void impl_with_signal_close(std::unique_lock<Mutex> & lock)
236+
{
237+
long push_wakeup = 0;
238+
long pop_wakeup = 0;
239+
pushQ_ = nullptr;
240+
popQ_ = nullptr;
241+
if (pushWaiting_) {
242+
pushWaiting_->fast_notify_all(lock);
243+
pushWaiting_ = (ConditionVariable*)kClosedWaiting;
244+
push_wakeup = 1;
245+
}
246+
if (popWaiting_) {
247+
popWaiting_->fast_notify_all(lock);
248+
popWaiting_ = (ConditionVariable*)kClosedWaiting;
249+
pop_wakeup = 1;
250+
}
251+
252+
(void)push_wakeup;
253+
(void)pop_wakeup;
254+
RS_DBG(dbg_channel, "channel(...)=%ld | %s | no-cap branch | push-wakeup=%ld | pop-wakeup=%ld",
255+
id(), __func__, push_wakeup, pop_wakeup);
256+
}
257+
221258
protected:
222259
Mutex mtx_;
223260
ConditionVariable pushCv_;
@@ -228,6 +265,8 @@ class ChannelImplWithSignal : public DebuggerId<ChannelImplWithSignal<int>>
228265
T* popQ_ {nullptr};
229266
ConditionVariable* pushWaiting_ {nullptr};
230267
ConditionVariable* popWaiting_ {nullptr};
268+
269+
static const std::size_t kClosedWaiting = (std::size_t)-1;
231270
};
232271

233272
template <
@@ -248,6 +287,7 @@ class ChannelImpl : public ChannelImplWithSignal<T>
248287
using ChannelImplWithSignal<T>::pop_impl_with_signal;
249288
using ChannelImplWithSignal<T>::push_impl_with_signal;
250289
using ChannelImplWithSignal<T>::id;
290+
using ChannelImplWithSignal<T>::impl_with_signal_close;
251291

252292
explicit ChannelImpl(std::size_t capacity = 0)
253293
: cap_(capacity)
@@ -302,22 +342,16 @@ class ChannelImpl : public ChannelImplWithSignal<T>
302342
std::unique_lock<Mutex> lock(mtx_);
303343
closed_ = true;
304344
if (!cap_) {
305-
pushQ_ = nullptr;
306-
popQ_ = nullptr;
307-
if (pushWaiting_) {
308-
pushWaiting_->fast_notify_all(lock);
309-
pushWaiting_ = nullptr;
310-
}
311-
if (popWaiting_) {
312-
popWaiting_->fast_notify_all(lock);
313-
popWaiting_ = nullptr;
314-
}
345+
impl_with_signal_close(lock);
315346
}
316347

317-
pushCv_.fast_notify_all(lock);
318-
popCv_.fast_notify_all(lock);
319-
QueueT q;
320-
std::swap(q, q_);
348+
long push_wakeup = pushCv_.fast_notify_all(lock);
349+
long pop_wakeup = popCv_.fast_notify_all(lock);
350+
(void)push_wakeup;
351+
(void)pop_wakeup;
352+
353+
RS_DBG(dbg_channel, "channel(queue)=%ld | %s | cap=%lu | size=%lu | push-wakeup=%ld | pop-wakeup=%ld",
354+
id(), __func__, cap_, q_.size(), push_wakeup, pop_wakeup);
321355
}
322356

323357
private:
@@ -349,7 +383,7 @@ class ChannelImpl : public ChannelImplWithSignal<T>
349383
return false;
350384
}
351385

352-
auto p = [this]{ return q_.size() < cap_; };
386+
auto p = [this]{ return q_.size() < cap_ || closed_; };
353387

354388
RS_DBG(dbg_channel, "channel(queue)=%ld | %s | begin wait",
355389
id(), __func__);
@@ -404,7 +438,7 @@ class ChannelImpl : public ChannelImplWithSignal<T>
404438
return false;
405439
}
406440

407-
auto p = [this]{ return !q_.empty(); };
441+
auto p = [this]{ return !q_.empty() || closed_; };
408442

409443
RS_DBG(dbg_channel, "channel(queue)=%ld | %s | begin wait",
410444
id(), __func__);
@@ -418,7 +452,7 @@ class ChannelImpl : public ChannelImplWithSignal<T>
418452
if (status == std::cv_status::timeout)
419453
return false;
420454

421-
if (closed_)
455+
if (q_.empty())
422456
return false;
423457

424458
t = q_.front();
@@ -454,6 +488,7 @@ class ChannelImpl<nullptr_t, QueueT> : public ChannelImplWithSignal<nullptr_t>
454488
using ChannelImplWithSignal<nullptr_t>::pop_impl_with_signal;
455489
using ChannelImplWithSignal<nullptr_t>::push_impl_with_signal;
456490
using ChannelImplWithSignal<nullptr_t>::id;
491+
using ChannelImplWithSignal<nullptr_t>::impl_with_signal_close;
457492

458493
explicit ChannelImpl(std::size_t capacity = 0)
459494
: cap_(capacity), count_(0)
@@ -508,21 +543,16 @@ class ChannelImpl<nullptr_t, QueueT> : public ChannelImplWithSignal<nullptr_t>
508543
std::unique_lock<Mutex> lock(mtx_);
509544
closed_ = true;
510545
if (!cap_) {
511-
pushQ_ = nullptr;
512-
popQ_ = nullptr;
513-
if (pushWaiting_) {
514-
pushWaiting_->fast_notify_all(lock);
515-
pushWaiting_ = nullptr;
516-
}
517-
if (popWaiting_) {
518-
popWaiting_->fast_notify_all(lock);
519-
popWaiting_ = nullptr;
520-
}
546+
impl_with_signal_close(lock);
521547
}
522548

523-
pushCv_.fast_notify_all(lock);
524-
popCv_.fast_notify_all(lock);
525-
count_ = 0;
549+
long push_wakeup = pushCv_.fast_notify_all(lock);
550+
long pop_wakeup = popCv_.fast_notify_all(lock);
551+
(void)push_wakeup;
552+
(void)pop_wakeup;
553+
554+
RS_DBG(dbg_channel, "channel(void)=%ld | %s | cap=%lu | size=%lu | push-wakeup=%ld | pop-wakeup=%ld",
555+
id(), __func__, cap_, count_, push_wakeup, pop_wakeup);
526556
}
527557

528558
private:
@@ -554,7 +584,7 @@ class ChannelImpl<nullptr_t, QueueT> : public ChannelImplWithSignal<nullptr_t>
554584
return false;
555585
}
556586

557-
auto p = [this]{ return count_ < cap_; };
587+
auto p = [this]{ return count_ < cap_ || closed_; };
558588

559589
RS_DBG(dbg_channel, "channel(void)=%ld | %s | begin wait",
560590
id(), __func__);
@@ -607,7 +637,7 @@ class ChannelImpl<nullptr_t, QueueT> : public ChannelImplWithSignal<nullptr_t>
607637
return false;
608638
}
609639

610-
auto p = [this]{ return count_ > 0; };
640+
auto p = [this]{ return count_ > 0 || closed_; };
611641

612642
RS_DBG(dbg_channel, "channel(void)=%ld | %s | begin wait",
613643
id(), __func__);
@@ -621,7 +651,7 @@ class ChannelImpl<nullptr_t, QueueT> : public ChannelImplWithSignal<nullptr_t>
621651
if (status == std::cv_status::timeout)
622652
return false;
623653

624-
if (closed_)
654+
if (count_ <= 0)
625655
return false;
626656

627657
--count_;

0 commit comments

Comments
 (0)