Skip to content

Commit b044221

Browse files
author
yuanye
committed
add producer-consumer project
1 parent 07750a4 commit b044221

3 files changed

Lines changed: 193 additions & 0 deletions

File tree

producer-consumer/Makefile

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
2+
###########################################
3+
#Makefile for simple programs
4+
###########################################
5+
INC=
6+
LIB= -lpthread
7+
CC=g++ -std=c++0x
8+
# display all warnings
9+
CC_FLAG=-Wall -g
10+
11+
PRG=app
12+
OBJ=producer_consumer.o
13+
14+
$(PRG):$(OBJ)
15+
$(CC) $(INC) $(LIB) -o $@ $(OBJ) $(LIB)
16+
17+
.SUFFIXES: .c .o .cpp
18+
.cpp.o:
19+
$(CC) $(CC_FLAG) $(INC) -c $*.cpp -o $*.o
20+
21+
.PRONY:clean
22+
clean:
23+
@echo "Removing linked and compiled files......"
24+
rm -f $(OBJ) $(PRG)

producer-consumer/circular_buf.h

Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
#pragma once
2+
#include <stdint.h>
3+
#include <vector>
4+
#include <stdexcept>
5+
#include <sstream>
6+
7+
template<typename T>
8+
class CircularBuf {
9+
public:
10+
explicit CircularBuf(size_t n) {
11+
buf_ = std::vector<T>(n);
12+
item_num_ = before_ = rear_ = 0;
13+
}
14+
15+
inline bool empty() const {
16+
return item_num_ == 0;
17+
}
18+
19+
inline bool full() const {
20+
return item_num_ == buf_.size();
21+
}
22+
23+
inline size_t capacity() const {
24+
return buf_.size();
25+
}
26+
27+
inline size_t size() const {
28+
return item_num_;
29+
}
30+
31+
std::string str() const {
32+
auto elems = this->items();
33+
std::ostringstream oss;
34+
oss << "[";
35+
for (size_t i = 0; i < elems.size(); i++) {
36+
oss << elems[i];
37+
if (i != elems.size()-1) {
38+
oss << " ";
39+
}
40+
}
41+
oss << "]";
42+
return oss.str();
43+
}
44+
45+
inline std::vector<T> items() const {
46+
std::vector<T> ret;
47+
size_t count = item_num_;
48+
size_t curIdx = before_;
49+
while(count != 0) {
50+
curIdx = next(curIdx);
51+
ret.push_back(buf_[curIdx]);
52+
--count;
53+
}
54+
return ret;
55+
};
56+
57+
// return true on success
58+
inline void push(const T& t) {
59+
if (item_num_ >= buf_.size()) {
60+
throw std::logic_error("size exceeded.");
61+
}
62+
rear_ = next(rear_);
63+
buf_[rear_] = t;
64+
item_num_ +=1;
65+
}
66+
67+
inline T poll() {
68+
if (item_num_ == 0) {
69+
throw std::logic_error("container is empty.");
70+
}
71+
before_ = next(before_);
72+
item_num_ -= 1;
73+
return buf_[before_];
74+
}
75+
76+
inline T peek() const {
77+
if (item_num_ == 0) {
78+
throw std::logic_error("container is empty.");
79+
}
80+
return buf_[next(before_)];
81+
}
82+
private:
83+
size_t next(size_t cur) const {
84+
return cur+1 >= buf_.size()? 0:cur+1;
85+
}
86+
std::vector<T> buf_;
87+
size_t item_num_;
88+
size_t before_; // pop element from before_+1
89+
size_t rear_; // insert element to rear
90+
};
Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
2+
#include "circular_buf.h"
3+
#include <mutex>
4+
#include <condition_variable>
5+
#include <thread>
6+
#include <chrono>
7+
8+
std::mutex mu;
9+
std::condition_variable cv;
10+
11+
void produce(CircularBuf<int> &buf, int producer_id, int product_id, size_t milli_sec = 10) {
12+
std::unique_lock<std::mutex> ul(mu);
13+
cv.wait(ul, [&]{
14+
printf("Producer %d: buffer(at address %p) %zu / %zu\n",
15+
producer_id, &buf, buf.size(), buf.capacity());
16+
return !buf.full();
17+
});
18+
// not full
19+
std::this_thread::sleep_for(std::chrono::milliseconds(milli_sec));
20+
buf.push(product_id);
21+
printf("Producer %d: produced %d...\n", producer_id, product_id);
22+
ul.unlock();
23+
cv.notify_one();
24+
}
25+
26+
void consume(CircularBuf<int> &buf, int consumer_id, size_t milli_sec = 7) {
27+
std::unique_lock<std::mutex> ul(mu);
28+
cv.wait(ul, [&]{
29+
printf("Consumer %d: buffer(at address %p) %zu / %zu \n",
30+
consumer_id, &buf, buf.size(), buf.capacity());
31+
return !buf.empty();
32+
});
33+
// not empty
34+
std::this_thread::sleep_for(std::chrono::milliseconds(milli_sec));
35+
int num = buf.poll();
36+
printf("Consumer %d: consumed %d...\n", consumer_id, num);
37+
ul.unlock();
38+
cv.notify_one();
39+
}
40+
41+
int main(int argc, char const *argv[]) {
42+
CircularBuf<int> buf(5);
43+
44+
std::vector<std::thread> threads;
45+
int n = std::thread::hardware_concurrency();
46+
int product_num = 10;
47+
// half producer, half consumer
48+
for (int id = 0; id < n/2; id++) {
49+
threads.push_back(std::thread([&](int producer_id){
50+
// produce 10 products
51+
for (int i = 1; i <= product_num; i++) {
52+
produce(buf, producer_id, 500 + i);
53+
printf("++++ Producer %d: produced %dth product\n", producer_id, i);
54+
printf("Current buf: %s\n", buf.str().c_str());
55+
}
56+
printf("**** Producer %d quit\n", producer_id);
57+
}, id));
58+
59+
threads.push_back(std::thread([&](int consumer_id){
60+
// consume 10 products
61+
for (int i = 1; i <= product_num; i++) {
62+
consume(buf, consumer_id);
63+
printf("---- Consumer %d: consumed %dth product\n", consumer_id, i);
64+
printf("Current buf: %s\n", buf.str().c_str());
65+
}
66+
printf("**** Consumer %d quit.\n", consumer_id);
67+
}, id));
68+
}
69+
70+
printf("%lu threads created...\n", threads.size());
71+
72+
for (auto& thread : threads) {
73+
thread.join();
74+
}
75+
76+
return 0;
77+
}
78+
79+
// compiled with: clang++ producer_consumer.cpp -lpthread -o app -std=c++11

0 commit comments

Comments
 (0)