閱讀789 返回首頁    go 阿裏雲 go 技術社區[雲棲]


理解Python並發編程-PoolExecutor篇

之前我們使用多線程(threading)和多進程(multiprocessing)完成常規的需求,在啟動的時候start、jon等步驟不能省,複雜的需要還要用1-2個隊列。隨著需求越來越複雜,如果沒有良好的設計和抽象這部分的功能層次,代碼量越多調試的難度就越大。有沒有什麼好的方法把這些步驟抽象一下呢,讓我們不關注這些細節,輕裝上陣呢?

答案是:有的

從Python3.2開始一個叫做concurrent.futures被納入了標準庫,而在Python2它屬於第三方的futures庫,需要手動安裝:


pip install futures

```                                             


這個模塊中有2個類:ThreadPoolExecutor和ProcessPoolExecutor,也就是對threading和multiprocessing的進行了高級別的抽象,

暴露出統一的接口,幫助開發者非常方便的實現異步調用:


```python

import time

from concurrent.futures import ProcessPoolExecutor, as_completed


NUMBERS = range(25, 38)



def fib(n):

    if n<= 2:

        return 1

    return fib(n-1) + fib(n-2)



start = time.time()


with ProcessPoolExecutor(max_workers=3) as executor:

    for num, result in zip(NUMBERS, executor.map(fib, NUMBERS)):

        print 'fib({}) = {}'.format(num, result)


print 'COST: {}'.format(time.time() - start)


感受下是不是很輕便呢?看一下花費的時間:


 python fib_executor.py

fib(25) = 75025

fib(26) = 121393

fib(27) = 196418

fib(28) = 317811

fib(29) = 514229

fib(30) = 832040

fib(31) = 1346269

fib(32) = 2178309

fib(33) = 3524578

fib(34) = 5702887

fib(35) = 9227465

fib(36) = 14930352

fib(37) = 24157817

COST: 10.8920350075


除了用map,另外一個常用的方法是submit。如果你要提交的任務的函數是一樣的,就可以簡化成map。但是假如提交的任務函數是不一樣的,或者執行的過程之可能出現異常(使用map執行過程中發現問題會直接拋出錯誤)就要用到submit:


from concurrent.futures import ThreadPoolExecutor, as_completed


NUMBERS = range(30, 35)



def fib(n):

    if n == 34:

        raise Exception("Don't do this")

    if n<= 2:

        return 1

    return fib(n-1) + fib(n-2)



with ThreadPoolExecutor(max_workers=3) as executor:

    future_to_num = {executor.submit(fib, num): num for num in NUMBERS}

    for future in as_completed(future_to_num):

        num = future_to_num[future]

        try:

            result = future.result()

        except Exception as e:

            print 'raise an exception: {}'.format(e)

        else:

            print 'fib({}) = {}'.format(num, result)



with ThreadPoolExecutor(max_workers=3) as executor:

    for num, result in zip(NUMBERS, executor.map(fib, NUMBERS)):

        print 'fib({}) = {}'.format(num, result)


執一下:


python fib_executor_with_raise.py

fib(30) = 832040

fib(31) = 1346269

raise an exception: Don't do this

fib(32) = 2178309

fib(33) = 3524578

Traceback (most recent call last):

  File "fib_executor_with_raise.py", line 28, in <module>

    for num, result in zip(NUMBERS, executor.map(fib, NUMBERS)):

  File "/Library/Python/2.7/site-packages/concurrent/futures/_base.py", line 580, in map

    yield future.result()

  File "/Library/Python/2.7/site-packages/concurrent/futures/_base.py", line 400, in result

    return self.__get_result()

  File "/Library/Python/2.7/site-packages/concurrent/futures/_base.py", line 359, in __get_result

    reraise(self._exception, self._traceback)

  File "/Library/Python/2.7/site-packages/concurrent/futures/_compat.py", line 107, in reraise

    exec('raise exc_type, exc_value, traceback', {}, locals_)

  File "/Library/Python/2.7/site-packages/concurrent/futures/thread.py", line 61, in run

    result = self.fn(*self.args, **self.kwargs)

  File "fib_executor_with_raise.py", line 9, in fib

    raise Exception("Don't do this")

Exception: Don't do this


可以看到,第一次捕捉到了異常,但是第二次執行的時候錯誤直接拋出來了。

上麵說到的map,有些同學馬上會說,這不是進程(線程)池的效果嗎?看起來確實是的:


import time

from multiprocessing.pool import Pool


NUMBERS = range(25, 38)



def fib(n):

    if n<= 2:

        return 1

    return fib(n-1) + fib(n-2)



start = time.time()


pool = Pool(3)

results = pool.map(fib, NUMBERS)

for num, result in zip(NUMBERS, pool.map(fib, NUMBERS)):

    print 'fib({}) = {}'.format(num, result)


print 'COST: {}'.format(time.time() - start)


好像代碼量更小喲。好吧,看一下花費的時間:




python fib_pool.py

fib(25) = 75025

fib(26) = 121393

fib(27) = 196418

fib(28) = 317811

fib(29) = 514229

fib(30) = 832040

fib(31) = 1346269

fib(32) = 2178309

fib(33) = 3524578

fib(34) = 5702887

fib(35) = 9227465

fib(36) = 14930352

fib(37) = 24157817

COST: 17.1342718601




WhatTF竟然花費了1.7倍的時間。為什麼?

BTW,有興趣的同學可以對比下ThreadPool和ThreadPoolExecutor,由於GIL的緣故,對比的差距一定會更多。

原理

我們就拿ProcessPoolExecutor介紹下它的原理,引用官方代碼注釋中的流程圖:


|======================= In-process =====================|== Out-of-process ==|


+----------+     +----------+       +--------+     +-----------+    +---------+

|          |  => | Work Ids |    => |        |  => | Call Q    | => |         |

|          |     +----------+       |        |     +-----------+    |         |

|          |     | ...      |       |        |     | ...       |    |         |

|          |     | 6        |       |        |     | 5, call() |    |         |

|          |     | 7        |       |        |     | ...       |    |         |

| Process  |     | ...      |       | Local  |     +-----------+    | Process |

|  Pool    |     +----------+       | Worker |                      |  #1..n  |

| Executor |                        | Thread |                      |         |

|          |     +----------- +     |        |     +-----------+    |         |

|          | <=> | Work Items | <=> |        | <=  | Result Q  | <= |         |

|          |     +------------+     |        |     +-----------+    |         |

|          |     | 6: call()  |     |        |     | ...       |    |         |

|          |     |    future  |     |        |     | 4, result |    |         |

|          |     | ...        |     |        |     | 3, except |    |         |

+----------+     +------------+     +--------+     +-----------+    +---------+


我們結合源碼和上麵的數據流分析一下:

  1. executor.map會創建多個_WorkItem對象,每個對象都傳入了新創建的一個Future對象。
  2. 把每個_WorkItem對象然後放進一個叫做「Work Items」的dict中,鍵是不同的「Work Ids」。
  3. 創建一個管理「Work Ids」隊列的線程「Local worker thread」,它能做2件事:
    1. 從「Work Ids」隊列中獲取Work Id, 通過「Work Items」找到對應的_WorkItem。如果這個Item被取消了,就從「Work Items」裏麵把它刪掉,否則重新打包成一個_CallItem放入「Call Q」這個隊列。executor的那些進程會從隊列中取_CallItem執行,並把結果封裝成_ResultItems放入「Result Q」隊列中。
    2. 從「Result Q」隊列中獲取_ResultItems,然後從「Work Items」更新對應的Future對象並刪掉入口。

看起來就是一個「生產者/消費者」模型罷了,錯了。我們要注意,整個過程並不是多個進程與任務+結果-2個隊列直接通信的,而是通過一個中間的「Local worker thread」,它就是讓效率提升的重要原因之一!!!

設想,當某一段程序提交了一個請求,期望得到一個答複。但服務程序對這個請求可能很慢,在傳統的單線程環境下,調用函數是同步的,也就是說它必須等到服務程序返回結果後,才能進行其他處理。而在Future模式下,調用方式改為異步,而原先等待返回的時間段,在主調用函數中,則可用於處理其他事物。

Future

Future是常見的一種並發設計模式,在多個其他語言中都可以見到這種解決方案。

一個Future對象代表了一些尚未就緒(完成)的結果,在「將來」的某個時間就緒了之後就可以獲取到這個結果。比如上麵的例子,我們期望並發的執行一些參數不同的fib函數,獲取全部的結果。傳統模式就是在等待queue.get返回結果,這個是同步模式,而在Future模式下,調用方式改為異步,而原先等待返回的時間段,由於「Local worker thread」的存在,這個時候可以完成其他工作

在tornado中也有對應的實現。2013年的時候,我曾經寫過一篇博客使用tornado讓你的請求異步非阻塞,最後也提到了用concurrent.futures實現異步非阻塞的完成耗時任務。

最後更新:2017-06-19 22:32:06

  上一篇:go  [MySQL 優化] --order by 原理
  下一篇:go  時間序列數據的存儲和計算 - 開源時序數據庫解析(二)