

An analysis of the C++11 thread code

This article analyzes the LLVM libc++ implementation: https://libcxx.llvm.org/

class thread

The thread class directly wraps a pthread_t, which on Linux is actually an unsigned long int.

class  thread
{
    pthread_t __t_;  // the native POSIX thread handle
public:
    id get_id() const _NOEXCEPT {return __t_;}
    // ... (rest of the class omitted)
};

A std::unique_ptr wraps the user-supplied thread function, and the thread itself is created with pthread_create:

template <class _Fp>
void*
__thread_proxy(void* __vp)
{
    __thread_local_data().reset(new __thread_struct);
    std::unique_ptr<_Fp> __p(static_cast<_Fp*>(__vp));
    (*__p)();
    return nullptr;
}

template <class _Fp>
thread::thread(_Fp __f)
{
    std::unique_ptr<_Fp> __p(new _Fp(__f));
    int __ec = pthread_create(&__t_, 0, &__thread_proxy<_Fp>, __p.get());
    if (__ec == 0)
        __p.release();
    else
        __throw_system_error(__ec, "thread constructor failed");
}
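To make the ownership hand-off easier to see, here is a minimal standalone sketch of the same idiom using only pthreads and the standard library. The names thread_proxy_sketch, start_thread_sketch and run_ownership_demo are my own, not libc++'s, and the thread-local bookkeeping of the real code is left out.

```cpp
#include <pthread.h>
#include <atomic>
#include <memory>
#include <system_error>
#include <utility>

// Entry point with the signature pthread_create expects. It takes ownership
// of the heap-allocated callable back, so the callable is freed when the
// proxy returns.
template <class F>
void* thread_proxy_sketch(void* vp) {
    std::unique_ptr<F> p(static_cast<F*>(vp));
    (*p)();
    return nullptr;
}

// Hypothetical helper (not part of libc++): start a thread running f.
template <class F>
pthread_t start_thread_sketch(F f) {
    std::unique_ptr<F> p(new F(std::move(f)));
    pthread_t t;
    int ec = pthread_create(&t, nullptr, &thread_proxy_sketch<F>, p.get());
    if (ec != 0)  // no thread was created, so p still owns and frees the copy
        throw std::system_error(ec, std::system_category(), "pthread_create failed");
    p.release();  // the new thread now owns the callable
    return t;
}

// Runs a small callable on a new thread and returns the value it stored.
int run_ownership_demo() {
    std::atomic<int> value{0};
    pthread_t t = start_thread_sketch([&value] { value.store(42); });
    pthread_join(t, nullptr);
    return value.load();
}
```

The point of the design is that release() is only called once pthread_create has succeeded, so there is exactly one owner of the callable at every moment.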

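thread::joinable(), thread::join(), thread::detach()

Next, let's look at the thread::joinable(), thread::join() and thread::detach() functions.

These likewise call the corresponding POSIX functions. After join() is called, __t_ is set to 0, so a later call to joinable() returns false. The __t_ member is not synchronized with any memory barrier, which feels like it could be a problem if the same thread object is accessed from several threads at once.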

bool joinable() const {return __t_ != 0;}
void
thread::join()
{
    int ec = pthread_join(__t_, 0);
    __t_ = 0;
}

void
thread::detach()
{
    int ec = EINVAL;
    if (__t_ != 0)
    {
        ec = pthread_detach(__t_);
        if (ec == 0)
            __t_ = 0;
    }
    if (ec)
        throw system_error(error_code(ec, system_category()), "thread::detach failed");
}
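The behaviour described above can be observed from user code. The following is a small sketch using only the public std::thread interface; the two function names are made up for this illustration.

```cpp
#include <system_error>
#include <thread>

// Returns true if joinable() flips from true to false across join().
bool joinable_flips_after_join() {
    std::thread t([] {});
    bool before = t.joinable();  // a started thread is joinable
    t.join();
    bool after = t.joinable();   // the handle was reset by join()
    return before && !after;
}

// Returns true if detach() on a thread object that owns no thread throws.
bool detach_without_thread_throws() {
    std::thread t;  // default-constructed: not joinable
    try {
        t.detach();
    } catch (const std::system_error&) {
        return true;
    }
    return false;
}
```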

thread::hardware_concurrency()

The thread::hardware_concurrency() function returns the number of processors currently available.

It calls sysconf(_SC_NPROCESSORS_ONLN). According to the man page:

        - _SC_NPROCESSORS_ONLN
              The number of processors currently online (available).

unsigned
thread::hardware_concurrency() _NOEXCEPT
{
    long result = sysconf(_SC_NPROCESSORS_ONLN);
    // sysconf returns -1 if the name is invalid, the option does not exist or
    // does not have a definite limit.
    // if sysconf returns some other negative number, we have no idea
    // what is going on. Default to something safe.
    if (result < 0)
        return 0;
    return static_cast<unsigned>(result);
}
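Because the function returns 0 when the count cannot be determined, callers usually need a fallback. A minimal sketch, where the helper name worker_count_sketch and the fallback value of 1 are my own choices:

```cpp
#include <thread>

// Picks a worker count, falling back to 1 when the hint is unavailable.
unsigned worker_count_sketch() {
    unsigned n = std::thread::hardware_concurrency();
    return n == 0 ? 1u : n;  // 0 means "unknown", not "no processors"
}
```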

this_thread::sleep_for and this_thread::sleep_until

The sleep_for function actually calls nanosleep:

void
sleep_for(const chrono::nanoseconds& ns)
{
    using namespace chrono;
    if (ns > nanoseconds::zero())
    {
        seconds s = duration_cast<seconds>(ns);
        timespec ts;
        typedef decltype(ts.tv_sec) ts_sec;
        _LIBCPP_CONSTEXPR ts_sec ts_sec_max = numeric_limits<ts_sec>::max();
        if (s.count() < ts_sec_max)
        {
            ts.tv_sec = static_cast<ts_sec>(s.count());
            ts.tv_nsec = static_cast<decltype(ts.tv_nsec)>((ns-s).count());
        }
        else
        {
            ts.tv_sec = ts_sec_max;
            ts.tv_nsec = giga::num - 1;
        }

        while (nanosleep(&ts, &ts) == -1 && errno == EINTR)
            ;
    }
}
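The core of sleep_for is the conversion from a nanosecond count to a timespec, clamped so that tv_sec cannot overflow. Pulled out on its own it looks roughly like this; to_timespec_sketch is a name I made up, and the sketch assumes a non-negative duration, as the check at the top of sleep_for guarantees.

```cpp
#include <time.h>
#include <chrono>
#include <limits>

// Splits a non-negative duration into whole seconds plus leftover
// nanoseconds, saturating at the largest value tv_sec can hold.
timespec to_timespec_sketch(std::chrono::nanoseconds ns) {
    using namespace std::chrono;
    timespec ts;
    typedef decltype(ts.tv_sec) sec_t;
    const sec_t sec_max = std::numeric_limits<sec_t>::max();
    seconds s = duration_cast<seconds>(ns);  // truncates toward zero
    if (s.count() < sec_max) {
        ts.tv_sec = static_cast<sec_t>(s.count());
        ts.tv_nsec = static_cast<decltype(ts.tv_nsec)>((ns - s).count());
    } else {
        ts.tv_sec = sec_max;
        ts.tv_nsec = 999999999;  // same value as giga::num - 1
    }
    return ts;
}
```

For example, 1500 milliseconds becomes tv_sec = 1 and tv_nsec = 500000000.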
The sleep_until function uses mutex, condition_variable and unique_lock, and in the end still calls pthread_cond_timedwait:

template <class _Clock, class _Duration>
void
sleep_until(const chrono::time_point<_Clock, _Duration>& __t)
{
    using namespace chrono;
    mutex __mut;
    condition_variable __cv;
    unique_lock<mutex> __lk(__mut);
    while (_Clock::now() < __t)
        __cv.wait_until(__lk, __t);
}

void
condition_variable::__do_timed_wait(unique_lock<mutex>& lk,
     chrono::time_point<chrono::system_clock, chrono::nanoseconds> tp) _NOEXCEPT
{
    using namespace chrono;
    if (!lk.owns_lock())
        __throw_system_error(EPERM,
                            "condition_variable::timed wait: mutex not locked");
    nanoseconds d = tp.time_since_epoch();
    if (d > nanoseconds(0x59682F000000E941))
        d = nanoseconds(0x59682F000000E941);
    timespec ts;
    seconds s = duration_cast<seconds>(d);
    typedef decltype(ts.tv_sec) ts_sec;
    _LIBCPP_CONSTEXPR ts_sec ts_sec_max = numeric_limits<ts_sec>::max();
    if (s.count() < ts_sec_max)
    {
        ts.tv_sec = static_cast<ts_sec>(s.count());
        ts.tv_nsec = static_cast<decltype(ts.tv_nsec)>((d - s).count());
    }
    else
    {
        ts.tv_sec = ts_sec_max;
        ts.tv_nsec = giga::num - 1;
    }
    int ec = pthread_cond_timedwait(&__cv_, lk.mutex()->native_handle(), &ts);
    if (ec != 0 && ec != ETIMEDOUT)
        __throw_system_error(ec, "condition_variable timed_wait failed");
}
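The same pattern can be written with the public standard library types only. This is a sketch of the technique, not the libc++ code itself; sleep_until_sketch and sleeps_at_least_20ms are names invented here. Nobody ever notifies the condition variable, so each wait ends either on timeout or on a spurious wakeup, and the loop re-checks the clock in both cases.

```cpp
#include <chrono>
#include <condition_variable>
#include <mutex>

// Blocks the calling thread until Clock::now() reaches t.
template <class Clock, class Duration>
void sleep_until_sketch(const std::chrono::time_point<Clock, Duration>& t) {
    std::mutex mut;
    std::condition_variable cv;
    std::unique_lock<std::mutex> lk(mut);
    while (Clock::now() < t)  // guards against spurious wakeups
        cv.wait_until(lk, t);
}

// Returns true if a 20 ms sleep really lasted at least 20 ms.
bool sleeps_at_least_20ms() {
    using namespace std::chrono;
    steady_clock::time_point start = steady_clock::now();
    sleep_until_sketch(start + milliseconds(20));
    return steady_clock::now() - start >= milliseconds(20);
}
```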

Implementation of std::notify_all_at_thread_exit

First, an example that shows what the notify_all_at_thread_exit function is actually for:

#include <mutex>
#include <thread>
#include <condition_variable>
 
std::mutex m;
std::condition_variable cv;
 
bool ready = false;
ComplexType result;  // some arbitrary type
 
void thread_func()
{
    std::unique_lock<std::mutex> lk(m);
    // assign a value to result using thread_local data
    result = function_that_uses_thread_locals();
    ready = true;
    std::notify_all_at_thread_exit(cv, std::move(lk));
} // 1. destroy thread_locals, 2. unlock mutex, 3. notify cv
 
int main()
{
    std::thread t(thread_func);
    t.detach();
 
    // do other work
    // ...
 
    // wait for the detached thread
    std::unique_lock<std::mutex> lk(m);
    while(!ready) {
        cv.wait(lk);
    }
    process(result); // result is ready and thread_local destructors have finished
}

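As the example shows, std::notify_all_at_thread_exit in effect registers a (condition_variable, mutex) pair, and when the thread exits, notify_all is called on the condition variable.

Now let's look at the actual implementation:

It is built on Thread-specific Data. For details see: https://www.ibm.com/developerworks/cn/linux/thread/posix_threadapi/part2/

Personally I think "thread-specific data" is the better name for it (rather than something like "thread-private data"), because other threads can still access it; it is not "exclusive" to one thread.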
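For readers who have not used Thread-specific Data before, here is a minimal sketch of the POSIX API from man pthread_key_create. The key is created with a destructor, and that destructor runs automatically when a thread that stored a non-null value exits. The names g_key, destroy_value, tsd_worker and run_tsd_demo are made up for this illustration.

```cpp
#include <pthread.h>
#include <atomic>

static pthread_key_t g_key;               // one key, a separate value per thread
static std::atomic<int> g_destroyed{0};   // counts destructor calls
static std::atomic<int> g_seen{0};        // value read back inside the thread

// Called by the pthread runtime at thread exit for a non-null value.
static void destroy_value(void* p) {
    delete static_cast<int*>(p);
    ++g_destroyed;
}

static void* tsd_worker(void*) {
    pthread_setspecific(g_key, new int(42));
    g_seen = *static_cast<int*>(pthread_getspecific(g_key));
    return nullptr;  // destroy_value runs after this return
}

// Returns how many times the destructor ran, or -1 on setup failure.
int run_tsd_demo() {
    g_destroyed = 0;
    if (pthread_key_create(&g_key, destroy_value) != 0)
        return -1;
    pthread_t t;
    if (pthread_create(&t, nullptr, tsd_worker, nullptr) != 0)
        return -1;
    pthread_join(t, nullptr);
    pthread_key_delete(g_key);
    return g_destroyed.load();
}
```

This exit-time destructor is the hook that makes "do something when the thread exits" possible without the thread function itself having to remember it.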

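In short, when a std::thread is constructed, a __thread_struct_imp object is created for the new thread (this is what the new __thread_struct stored in __thread_proxy above is for).

Inside the __thread_struct_imp object, a vector holds pair<condition_variable*, mutex*> entries: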

class  __thread_struct_imp
{
    typedef vector<__assoc_sub_state*,
                          __hidden_allocator<__assoc_sub_state*> > _AsyncStates;
    typedef vector<pair<condition_variable*, mutex*>,
               __hidden_allocator<pair<condition_variable*, mutex*> > > _Notify;

    _AsyncStates async_states_;
    _Notify notify_;
    // ... (rest of the class omitted)
};

When notify_all_at_thread_exit is called, the condition_variable and the mutex are pushed into that vector:

void
__thread_struct_imp::notify_all_at_thread_exit(condition_variable* cv, mutex* m)
{
    notify_.push_back(pair<condition_variable*, mutex*>(cv, m));
}

When the thread exits, the __thread_struct_imp is deleted, which means its destructor is called.

The destructor walks the vector, unlocks each mutex and calls notify_all() on each condition_variable:

__thread_struct_imp::~__thread_struct_imp()
{
    for (_Notify::iterator i = notify_.begin(), e = notify_.end();
            i != e; ++i)
    {
        i->second->unlock();
        i->first->notify_all();
    }
    for (_AsyncStates::iterator i = async_states_.begin(), e = async_states_.end();
            i != e; ++i)
    {
        (*i)->__make_ready();
        (*i)->__release_shared();
    }
}
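To tie the pieces together, here is a simplified sketch of the same mechanism. Note that it swaps in a C++11 thread_local object for the pthread key that libc++ uses, and it ignores the async states entirely; ExitNotifier, exit_notifier, notify_all_at_exit_sketch and demo_exit_notifier are hypothetical names of my own.

```cpp
#include <condition_variable>
#include <mutex>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical stand-in for __thread_struct_imp: remembers the registered
// pairs and unlocks/notifies them when the owning thread exits.
class ExitNotifier {
    std::vector<std::pair<std::condition_variable*, std::mutex*>> notify_;
public:
    void add(std::condition_variable* cv, std::mutex* m) {
        notify_.emplace_back(cv, m);
    }
    ~ExitNotifier() {
        for (auto& p : notify_) {
            p.second->unlock();     // the exiting thread still holds the mutex
            p.first->notify_all();
        }
    }
};

// One notifier per thread, destroyed during that thread's exit.
ExitNotifier& exit_notifier() {
    thread_local ExitNotifier n;
    return n;
}

void notify_all_at_exit_sketch(std::condition_variable& cv,
                               std::unique_lock<std::mutex> lk) {
    // release() gives up ownership without unlocking the mutex.
    exit_notifier().add(&cv, lk.release());
}

// Returns true once the waiter has observed the flag set by the worker.
bool demo_exit_notifier() {
    std::mutex m;
    std::condition_variable cv;
    bool ready = false;
    std::thread t([&] {
        std::unique_lock<std::mutex> lk(m);
        ready = true;
        notify_all_at_exit_sketch(cv, std::move(lk));
    });
    bool seen;
    {
        std::unique_lock<std::mutex> lk(m);
        cv.wait(lk, [&] { return ready; });
        seen = ready;
    }
    t.join();  // joined here only so that m and cv outlive the worker
    return seen;
}
```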

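I extracted some of the more detailed wrapper code and put it in a gist: https://gist.github.com/hengyunabc/d48fbebdb9bddcdf05e9


Some other odds and ends:

For thread yield, detach and join, you can refer directly to the man pages: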

pthread_yield:

       pthread_yield() causes the calling thread to relinquish the CPU.  The
       thread is placed at the end of the run queue for its static priority
       and another thread is scheduled to run.  For further details, see
       sched_yield(2)

pthread_detach:

       The pthread_detach() function marks the thread identified by thread
       as detached.  When a detached thread terminates, its resources are
       automatically released back to the system without the need for
       another thread to join with the terminated thread.

       Attempting to detach an already detached thread results in
       unspecified behavior.

pthread_join:

       The pthread_join() function waits for the thread specified by thread
       to terminate.  If that thread has already terminated, then
       pthread_join() returns immediately.  The thread specified by thread
       must be joinable.

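Summary:

My personal feeling is that join and detach are not actually of much use. In the vast majority of cases, a thread should simply be detached right after it is created.

For synchronization of the kind join provides, a mutex or a similar primitive is the better choice.

References: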

https://en.cppreference.com/w/cpp/thread/notify_all_at_thread_exit

https://man7.org/linux/man-pages/man3/pthread_detach.3.html

https://man7.org/linux/man-pages/man3/pthread_join.3.html

https://stackoverflow.com/questions/19744250/c11-what-happens-to-a-detached-thread-when-main-exits

https://man7.org/linux/man-pages/man3/pthread_yield.3.html

https://man7.org/linux/man-pages/man2/sched_yield.2.html

https://www.ibm.com/developerworks/cn/linux/thread/posix_threadapi/part2/

man pthread_key_create


Last updated: 2017-04-03 06:03:06
