Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
75 changes: 56 additions & 19 deletions log/block_queue.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ class block_queue
{
if (max_size <= 0)
{
std::cerr << "Fatal Error: block_queue init (max_size <= 0)" << std::endl;
exit(-1);
}

Expand All @@ -29,6 +30,7 @@ class block_queue
m_size = 0;
m_front = -1;
m_back = -1;
m_close = false;
}

void clear()
Expand All @@ -40,14 +42,17 @@ class block_queue
m_mutex.unlock();
}

~block_queue()
{
~block_queue() {
m_mutex.lock();
if (m_array != NULL)
m_close = true;
m_cond.broadcast(); // 强行叫醒所有还在睡觉的 pop 线程
if (m_array != NULL){
delete [] m_array;

m_array = NULL;
}
m_mutex.unlock();
}

//判断队列是否满了
bool full()
{
Expand Down Expand Up @@ -77,20 +82,24 @@ class block_queue
bool front(T &value)
{
m_mutex.lock();
if (0 == m_size)
// check: 如果队列已关闭或为空,直接返回
if (m_close || 0 == m_size)
{
m_mutex.unlock();
return false;
}
value = m_array[m_front];
// 求队首元素索引
int index = ( m_front +1 ) % m_max_size;
value = m_array[index];
m_mutex.unlock();
return true;
}
//返回队尾元素
bool back(T &value)
{
m_mutex.lock();
if (0 == m_size)
// check: 如果队列已关闭或为空,直接返回
if (m_close || 0 == m_size)
{
m_mutex.unlock();
return false;
Expand Down Expand Up @@ -128,6 +137,13 @@ class block_queue
{

m_mutex.lock();

// check: 如果队列已关闭,直接返回
if (m_close) {
m_mutex.unlock();
return false;
}

if (m_size >= m_max_size)
{

Expand All @@ -141,7 +157,7 @@ class block_queue

m_size++;

m_cond.broadcast();
m_cond.signal(); // 减少唤醒多个线程带来的可能额外性能损耗
m_mutex.unlock();
return true;
}
Expand All @@ -151,15 +167,26 @@ class block_queue

m_mutex.lock();
while (m_size <= 0)
{

{
// check: 直接退出返回
if (m_close) {
m_mutex.unlock();
return false;
}

if (!m_cond.wait(m_mutex.get()))
{
m_mutex.unlock();
return false;
}
}

// check:从 wait 醒来后,可能资源已被析构,必须再次判断
if (m_close) {
m_mutex.unlock();
return false;
}

m_front = (m_front + 1) % m_max_size;
item = m_array[m_front];
m_size--;
Expand All @@ -173,20 +200,29 @@ class block_queue
struct timespec t = {0, 0};
struct timeval now = {0, 0};
gettimeofday(&now, NULL);

long long total_ns = (now.tv_usec * 1000LL) + ((ms_timeout % 1000) * 1000000LL);
t.tv_sec = now.tv_sec + (ms_timeout / 1000) + (total_ns / 1000000000LL);
t.tv_nsec = total_ns % 1000000000LL;

m_mutex.lock();
if (m_size <= 0)
{
t.tv_sec = now.tv_sec + ms_timeout / 1000;
t.tv_nsec = (ms_timeout % 1000) * 1000;
if (!m_cond.timewait(m_mutex.get(), t))
{

// 用 while 对抗虚假唤醒
while (m_size <= 0) {
// check: 直接退出返回
if (m_close) {
m_mutex.unlock();
return false;
}
// timewait 内部是用绝对时间比较的,所以 t 算好一次就可以一直用
if (!m_cond.timewait(m_mutex.get(), t)) {
m_mutex.unlock();
return false; // 真正超时
}
}

if (m_size <= 0)
{
// check:从 timewait 醒来后,可能资源已被析构,必须再次判断
if (m_close) {
m_mutex.unlock();
return false;
}
Expand All @@ -207,6 +243,7 @@ class block_queue
int m_max_size;
int m_front;
int m_back;
bool m_close;
};

#endif