閱讀959 返回首頁    go 阿裏雲 go 技術社區[雲棲]


並發編程(三): 使用C++11實現無鎖stack(lock-free stack)

前幾篇文章,我們討論了如何使用mutex保護數據及使用使用condition variable在多線程中進行同步。然而,使用mutex將會導致一下問題:

  • 等待互斥鎖會消耗寶貴的時間 — 有時候是很多時間。這種延遲會損害係統的scalability。尤其是在現在可用的core越多越多的情況下。
  • 低優先級的線程可以獲得互斥鎖,因此阻礙需要同一互斥鎖的高優先級線程。這個問題稱為優先級倒置(priority inversion )
  • 可能因為分配的時間片結束,持有互斥鎖的線程被取消調度。這對於等待同一互斥鎖的其他線程有不利影響,因為等待時間現在會更長。這個問題稱為鎖護送(lock convoying)

互斥鎖的問題還不隻這些。早在1994年10月,John D. Valois 在拉斯維加斯的並行和分布係統係統國際大會上的一篇論文—《Implementing Lock-Free Queues》已經研究了無鎖隊列的實現,有興趣的可以拜讀一下。

實現無鎖數據結構的基礎是CAS:Compare & Set,或是 Compare & Swap。CAS用C語言描述的代碼(來自Wikipedia Compare And Swap)

int compare_and_swap (int* reg, int oldval, int newval) 
{
  ATOMIC();
  int old_reg_val = *reg;
  if (old_reg_val == oldval) 
     *reg = newval;
  END_ATOMIC();
  return old_reg_val;
}

CAS是個原子操作,保證了如果需要更新的地址沒有被他人改動多,那麼它可以安全的寫入。而這也是我們對於某個數據或者數據結構加鎖要保護的內容,保證讀寫的一致性,不出現dirty data。現在幾乎所有的CPU指令都支持CAS的原子操作,X86下對應的是 CMPXCHG 匯編指令現在,我們將使用CAS來實現無鎖的stack,然後你就能夠理解CAS的用法了。

C++11中CAS實現:

template< class T>
struct atomic
{
public:
bool compare_exchange_weak( T& expected, T desired,
                            std::memory_order success,
                            std::memory_order failure );
bool compare_exchange_weak( T& expected, T desired,
                            std::memory_order success,
                            std::memory_order failure ) volatile;
bool compare_exchange_weak( T& expected, T desired,
                            std::memory_order order =
                                std::memory_order_seq_cst );
bool compare_exchange_weak( T& expected, T desired,
                            std::memory_order order =
                                std::memory_order_seq_cst ) volatile;
bool compare_exchange_strong( T& expected, T desired,
                              std::memory_order success,
                              std::memory_order failure );
bool compare_exchange_strong( T& expected, T desired,
                              std::memory_order success,
                              std::memory_order failure ) volatile;    
bool compare_exchange_strong( T& expected, T desired,
                              std::memory_order order =
                                  std::memory_order_seq_cst );
bool compare_exchange_strong( T& expected, T desired,
                              std::memory_order order =
                                  std::memory_order_seq_cst ) volatile;
...
};
Please refer to https://en.cppreference.com/w/cpp/atomic/atomic/compare_exchange to more information.

對上麵的版本進行一下說明。翻譯自上述url:

Atomically compares the value stored in *this with the value of expected, and if those are equal, replaces the former with desired (performs read-modify-write operation). Otherwise, loads the actual value stored in*this intoexpected (performs load operation).

自動的比較*this的值和expect的值,如果相等,那麼將*this的值替換為desired的值(進行讀-修改-寫操作)。否則如果不相等,那麼將*this的值存到expected處。

偽碼就是:

if *this == expected:

    *this = desired;

else:

    expected = *this;
The memory models for the read-modify-write and load operations aresuccess andfailure respectively. In the (2) and (4) versionsorder is used for both read-modify-write and load operations, except thatstd::memory_order_release andstd::memory_order_relaxed are used for the load operation iforder==std::memory_order_acq_rel, ororder==std::memory_order_release respectively.

success對應於read-modify-write的內存模型;failure則對應於失敗時的load。對於order = std::memory_order_seq_cst的函數,那麼該memory order適用於read-modify-write and load,除非是如果order==std::memory_order_acq_rel,那麼load將使用std::memory_order_release;如果order==std::memory_order_release,那麼load將使用std::memory_order_relaxed

更多信息memory order請閱讀:https://en.cppreference.com/w/cpp/atomic/memory_order

The weak forms (1-2) of the functions are allowed to fail spuriously, that is, act as if*this!= expected even if they are equal. When a compare-and-exchange is in a loop, the weak version will yield better performance on some platforms. When a weak compare-and-exchange would require a loop and a strong one would not, the strong one is preferable.

weak形式允許假失敗,該函數直接比較原子對象所封裝的值與參數 expected 的物理內容,所以某些情況下,對象的比較操作在使用 operator==() 判斷時相等,但 compare_exchange_weak 判斷時卻可能失敗,因為對象底層的物理內容中可能存在位對齊或其他邏輯表示相同但是物理表示不同的值(比如 true 和 2 或 3,它們在邏輯上都表示"真",但在物理上兩者的表示並不相同)。可以虛假的返回false(和expected相同)。若本atomic的T值和expected相同則用val值替換本atomic的T值,返回true;若不同則用本atomic的T值替換expected,返回false。  
與compare_exchange_weak 不同, strong版本的 compare-and-exchange 操作不允許(spuriously 地)返回 false,即原子對象所封裝的值與參數 expected 的物理內容相同,比較操作一定會為 true。不過在某些平台下,如果算法本身需要循環操作來做檢查, compare_exchange_weak 的性能會更好。因此對於某些不需要采用循環操作的算法而言, 通常采用compare_exchange_strong 更好

下麵代碼部分來自https://en.cppreference.com/w/cpp/atomic/atomic/compare_exchange。

#include <atomic>
#include <string>
#include <iostream>
using namespace std;
template<typename T>
struct node
{
    T data;
    node* next;
    node(const T& data) : data(data), next(nullptr) {}
};

template<typename T>
class stack
{
    std::atomic<node<T>*> head;

 public:
    stack():head(nullptr){}
    void push(const T& data);
    T pop();
}; 
注意在這裏添加了stack的構造函數,把head初始化為nullptr。如果不初始化它為nullptr,那麼使用鏈表存儲的stack將無法確定終點在哪兒。。。

首先看一下push的實現:

    void push(const T& data)
    {
        node<T>* new_node = new node<T>(data);

        // put the current value of head into new_node->next
        new_node->next = head.load(std::memory_order_relaxed);

        // now make new_node the new head, but if the head
        // is no longer what's stored in new_node->next
        // (some other thread must have inserted a node just now)
        // then put that new head into new_node->next and try again
        while(!head.compare_exchange_weak(new_node->next,
                                          new_node,
                                          std::memory_order_release,
                                          std::memory_order_relaxed))
                ; // the body of the loop is empty
    }
主要是理解這兩句:

head.compare_exchange_weak(new_node->next,
                           new_node,
可以簡單用一下代碼來概括該調用的效果:
if ( head == new_node->new){
     head = new_node;
     return true;
}
else{
    new_node->next = head;
    return false;
}
因此,如果沒有其他的線程push,那麼head將指向當前的new_node,push完成。否則,說明其他線程push過新數據,那麼將當前push的新節點重新放到頂端,此時的head是最新的head。這樣,通過CAS,我們可以實現了thread-safe stack。

接下來看一下pop:

    T pop()
    {
        while(1){
            auto result = head.load(std::memory_order_relaxed);
            if (result == nullptr)
              throw std::string("Cannot pop from empty stack");
            if(head.compare_exchange_weak(result,result->next,
                                          std::memory_order_release,
                                          std::memory_order_relaxed))
               return result->data;
        }
    }
我們為什麼要限製result != nullptr?因為有可能當前stack僅有一個元素,線程B在pop時被調度,線程A pop成功,那麼線程B再pop就會出問題。

其實,上述的pop可以簡化,因為result其實在failed時候已經更新為head了。因此簡化代碼可以是:

  T pop()
    {
      auto result = head.load(std::memory_order_relaxed);
      while( result != nullptr && !head.compare_exchange_weak(result,result->next,
                                          std::memory_order_release,
                                          std::memory_order_relaxed));
      if( result != nullptr)
        return result->data;
      else
        throw std::string("Cannot pop from empty stack");
    }

尊重原創,轉載請注明出處: anzhsoft https://blog.csdn.net/anzhsoft/article/details/19125619


參考資料:

1. https://en.wikipedia.org/wiki/Compare-and-swap

2. https://en.wikipedia.org/wiki/Fetch-and-add

3. https://en.cppreference.com/w/cpp/atomic/atomic/compare_exchange

4. https://technet.microsoft.com/zh-cn/hh874698


更多學習:

1. GCC實現 https://www.oschina.net/translate/a-fast-lock-free-queue-for-cpp?cmp

2. GCC實現 https://www.ibm.com/developerworks/cn/aix/library/au-multithreaded_structures2/index.html

陳皓同學的精彩博文: https://coolshell.cn/articles/8239.html

最後更新:2017-04-03 12:55:06

  上一篇:go 用戶 &#39;sa&#39; 登錄失敗。原因: 該帳戶被禁用。 (Microsoft SQL Server,錯誤: 18470)
  下一篇:go Android 關於 OnScrollListener 事件順序次數的簡要分析