757
小米淨水器
並發編程(二):分析Boost對 互斥量和條件變量的封裝及實現生產者消費者問題
請閱讀上篇文章《並發編程實戰: POSIX 使用互斥量和條件變量實現生產者/消費者問題》。當然不閱讀亦不影響本篇文章的閱讀
。
Boost的互斥量,條件變量做了很好的封裝,因此比“原生的”POSIX mutex,condition variables好用。然後我們會通過分析boost相關源碼看一下boost linux是如何對pthread_mutex_t和pthread_cond_t進行的封裝。
首先看一下condition_variable_any的具體實現,代碼路徑:/boost/thread/pthread/condition_variable.hpp
class condition_variable_any
{
pthread_mutex_t internal_mutex;
pthread_cond_t cond;
condition_variable_any(condition_variable_any&);
condition_variable_any& operator=(condition_variable_any&);
public:
condition_variable_any()
{
int const res=pthread_mutex_init(&internal_mutex,NULL);
if(res)
{
boost::throw_exception(thread_resource_error());
}
int const res2=pthread_cond_init(&cond,NULL);
if(res2)
{
BOOST_VERIFY(!pthread_mutex_destroy(&internal_mutex));
boost::throw_exception(thread_resource_error());
}
}
~condition_variable_any()
{
BOOST_VERIFY(!pthread_mutex_destroy(&internal_mutex));
BOOST_VERIFY(!pthread_cond_destroy(&cond));
}
condition_variable_any的構造函數是對於內部使用的mutex和cond的初始化,對應的,析構函數則是這些資源的回收。
BOOST_VERIFY的實現:
#undef BOOST_VERIFY #if defined(BOOST_DISABLE_ASSERTS) || ( !defined(BOOST_ENABLE_ASSERT_HANDLER) && defined(NDEBUG) ) // 在任何情況下,expr一定會被求值。 #define BOOST_VERIFY(expr) ((void)(expr)) #else #define BOOST_VERIFY(expr) BOOST_ASSERT(expr) #endif因此不同於assert在Release版的被優化掉不同,我們可以放心的使用BOOST_VERITY,因此它的表達式肯定會被求值,而不用擔心assert的side effect。
接下來看一下condition_variable_any的核心實現:wait
template<typename lock_type>
void wait(lock_type& m)
{
int res=0;
{
thread_cv_detail::lock_on_exit<lock_type> guard;
detail::interruption_checker check_for_interruption(&internal_mutex,&cond);
guard.activate(m);
res=pthread_cond_wait(&cond,&internal_mutex);
this_thread::interruption_point();
}
if(res)
{
boost::throw_exception(condition_error());
}
}
首先看一下lock_on_exit:
namespace thread_cv_detail
{
template<typename MutexType>
struct lock_on_exit
{
MutexType* m;
lock_on_exit():
m(0)
{}
void activate(MutexType& m_)
{
m_.unlock();
m=&m_;
}
~lock_on_exit()
{
if(m)
{
m->lock();
}
}
};
}
代碼很簡單,實現了在調用activate時將傳入的lock解鎖,在該變量生命期結束時將guard的lock加鎖。
接下來的detail::interruption_checker check_for_interruption(&internal_mutex,&cond);是什麼意思呢?From /boost/thread/pthread/thread_data.hpp
class interruption_checker
{
thread_data_base* const thread_info;
pthread_mutex_t* m;
bool set;
void check_for_interruption()
{
if(thread_info->interrupt_requested)
{
thread_info->interrupt_requested=false;
throw thread_interrupted();
}
}
void operator=(interruption_checker&);
public:
explicit interruption_checker(pthread_mutex_t* cond_mutex,pthread_cond_t* cond):
thread_info(detail::get_current_thread_data()),m(cond_mutex),
set(thread_info && thread_info->interrupt_enabled)
{
if(set)
{
lock_guard<mutex> guard(thread_info->data_mutex);
check_for_interruption();
thread_info->cond_mutex=cond_mutex;
thread_info->current_cond=cond;
BOOST_VERIFY(!pthread_mutex_lock(m));
}
else
{
BOOST_VERIFY(!pthread_mutex_lock(m));
}
}
~interruption_checker()
{
if(set)
{
BOOST_VERIFY(!pthread_mutex_unlock(m));
lock_guard<mutex> guard(thread_info->data_mutex);
thread_info->cond_mutex=NULL;
thread_info->current_cond=NULL;
}
else
{
BOOST_VERIFY(!pthread_mutex_unlock(m));
}
}
代碼麵前,毫無隱藏。那句話就是此時如果有interrupt,那麼就interrupt吧。否則,lock傳入的mutex,也是為了res=pthread_cond_wait(&cond,&internal_mutex);做準備。
關於線程的中斷點,可以移步《【Boost】boost庫中thread多線程詳解5——談談線程中斷》。
對於boost::mutex,大家可以使用同樣的方法去解讀boost的實現,相對於condition variable,mutex的實現更加直觀。代碼路徑:/boost/thread/pthread/mutex.hpp。
namespace boost
{
class mutex
{
private:
mutex(mutex const&);
mutex& operator=(mutex const&);
pthread_mutex_t m;
public:
mutex()
{
int const res=pthread_mutex_init(&m,NULL);
if(res)
{
boost::throw_exception(thread_resource_error());
}
}
~mutex()
{
BOOST_VERIFY(!pthread_mutex_destroy(&m));
}
void lock()
{
int const res=pthread_mutex_lock(&m);
if(res)
{
boost::throw_exception(lock_error(res));
}
}
void unlock()
{
BOOST_VERIFY(!pthread_mutex_unlock(&m));
}
bool try_lock()
{
int const res=pthread_mutex_trylock(&m);
if(res && (res!=EBUSY))
{
boost::throw_exception(lock_error(res));
}
return !res;
}
typedef pthread_mutex_t* native_handle_type;
native_handle_type native_handle()
{
return &m;
}
typedef unique_lock<mutex> scoped_lock;
typedef detail::try_lock_wrapper<mutex> scoped_try_lock;
};
}
boost對於pthread_mutex_t和pthread_cond_t的封裝,方便了開發者的使用的資源的安全有效管理。當然,在不同的公司,可能也都有類似的封裝,學習boost的源碼,無疑可以加深我們的理解。在某些特定的場合,我們也可以學習boost的封裝方法,簡化我們的日常開發。
最後,奉上簡單的生產者、消費者的boost的實現,和前文《並發編程實戰: POSIX 使用互斥量和條件變量實現生產者/消費者問題》相比,我們可以看到boost簡化了mutex和condition variable的使用。以下代碼引自《Boost程序庫完全開發指南》:
#include <boost/thread.hpp>
#include <stack>
using std::stack;
using std::cout;
class buffer
{
private:
boost::mutex mu; // 條件變量需要配合互斥量
boost::condition_variable_any cond_put; // 生產者寫入
boost::condition_variable_any cond_get; // 消費者讀走
stack<int> stk;
int un_read;
int capacity;
bool is_full()
{
return un_read == capacity;
}
bool is_empty()
{
return 0 == un_read;
}
public:
buffer(size_t capacity) : un_read(0), capacity(capacity)
{}
void put(int x)
{
boost::mutex::scoped_lock lock(mu); // 這裏是讀鎖的門閂類
while (is_full())
{
cout << "full waiting..." << endl;
cond_put.wait(mu); // line:51
}
stk.push(x);
++un_read;
cond_get.notify_one();
}
void get(int *x)
{
boost::mutex::scoped_lock lock(mu); // 這裏是讀鎖的門閂類
while (is_empty())
{
cout << "empty waiting..." << endl;
cond_get.wait(mu);
}
*x = stk.top();
stk.pop();
--un_read;
cond_put.notify_one(); // 通知 51line可以寫入了
}
};
buffer buf(5);
void producer(int n)
{
for (int i = 0; i < n; ++i)
{
cout << "put : " << i << endl;
buf.put(i);
}
}
void consumer(int n)
{
int x;
for (int i = 0; i < n; ++i)
{
buf.get(&x);
cout << "get : " << x << endl;
}
}
int main()
{
boost::thread t1(producer, 20);
boost::thread t2(consumer, 10);
boost::thread t3(consumer, 10);
t1.join();
t2.join();
t3.join();
return 0;
}
最後說一句,condition_variable_any == condition, from /boost/thread/condition.hpp
namespace boost
{
typedef condition_variable_any condition;
}
最後更新:2017-04-03 12:55:02