diff --git a/log/block_queue.h b/log/block_queue.h index 34c77bd5..20f87575 100644 --- a/log/block_queue.h +++ b/log/block_queue.h @@ -21,6 +21,7 @@ class block_queue { if (max_size <= 0) { + std::cerr << "Fatal Error: block_queue init (max_size <= 0)" << std::endl; exit(-1); } @@ -29,6 +30,7 @@ class block_queue m_size = 0; m_front = -1; m_back = -1; + m_close = false; } void clear() @@ -40,14 +42,17 @@ class block_queue m_mutex.unlock(); } - ~block_queue() - { + ~block_queue() { m_mutex.lock(); - if (m_array != NULL) + m_close = true; + m_cond.broadcast(); // 强行叫醒所有还在睡觉的 pop 线程 + if (m_array != NULL){ delete [] m_array; - + m_array = NULL; + } m_mutex.unlock(); } + //判断队列是否满了 bool full() { @@ -77,12 +82,15 @@ class block_queue bool front(T &value) { m_mutex.lock(); - if (0 == m_size) + // check: 如果队列已关闭或为空,直接返回 + if (m_close || 0 == m_size) { m_mutex.unlock(); return false; } - value = m_array[m_front]; + // 求队首元素索引 + int index = ( m_front +1 ) % m_max_size; + value = m_array[index]; m_mutex.unlock(); return true; } @@ -90,7 +98,8 @@ class block_queue bool back(T &value) { m_mutex.lock(); - if (0 == m_size) + // check: 如果队列已关闭或为空,直接返回 + if (m_close || 0 == m_size) { m_mutex.unlock(); return false; @@ -128,6 +137,13 @@ class block_queue { m_mutex.lock(); + + // check: 如果队列已关闭,直接返回 + if (m_close) { + m_mutex.unlock(); + return false; + } + if (m_size >= m_max_size) { @@ -141,7 +157,7 @@ class block_queue m_size++; - m_cond.broadcast(); + m_cond.signal(); // 减少唤醒多个线程带来的可能额外性能损耗 m_mutex.unlock(); return true; } @@ -151,8 +167,13 @@ class block_queue m_mutex.lock(); while (m_size <= 0) - { - + { + // check: 直接退出返回 + if (m_close) { + m_mutex.unlock(); + return false; + } + if (!m_cond.wait(m_mutex.get())) { m_mutex.unlock(); @@ -160,6 +181,12 @@ class block_queue } } + // check:从 wait 醒来后,可能资源已被析构,必须再次判断 + if (m_close) { + m_mutex.unlock(); + return false; + } + m_front = (m_front + 1) % m_max_size; item = m_array[m_front]; m_size--; @@ -173,20 +200,29 @@ class block_queue struct timespec t = {0, 0}; struct timeval now = {0, 0}; gettimeofday(&now, NULL); + + long long total_ns = (now.tv_usec * 1000LL) + ((ms_timeout % 1000) * 1000000LL); + t.tv_sec = now.tv_sec + (ms_timeout / 1000) + (total_ns / 1000000000LL); + t.tv_nsec = total_ns % 1000000000LL; + m_mutex.lock(); - if (m_size <= 0) - { - t.tv_sec = now.tv_sec + ms_timeout / 1000; - t.tv_nsec = (ms_timeout % 1000) * 1000; - if (!m_cond.timewait(m_mutex.get(), t)) - { + + // 用 while 对抗虚假唤醒 + while (m_size <= 0) { + // check: 直接退出返回 + if (m_close) { m_mutex.unlock(); return false; } + // timewait 内部是用绝对时间比较的,所以 t 算好一次就可以一直用 + if (!m_cond.timewait(m_mutex.get(), t)) { + m_mutex.unlock(); + return false; // 真正超时 + } } - - if (m_size <= 0) - { + + // check:从 timewait 醒来后,可能资源已被析构,必须再次判断 + if (m_close) { m_mutex.unlock(); return false; } @@ -207,6 +243,7 @@ class block_queue int m_max_size; int m_front; int m_back; + bool m_close; }; #endif