霧裏看花之 Python Asyncio
從 Twisted 框架借鑒一些經驗來理解 asynio 並非難事,但是,asyncio 包含眾多的元素,我開始動搖,不知道如何將這些孤立的零碎拚圖組合成一副完整的圖畫。我已沒有足夠的智力提出任何更好的建議,在這裏,隻想分享我的困惑,求大神指點。
原語
asyncio 通過協程coroutines 的幫助來實現異步 IO。最初它是通過 yield
和 yield from
表達式實現的一個庫,因為 Python 語言本身演進的緣故,現在它已經變成一個更複雜的怪獸。所以,為了在同一個頻道討論下去,你需要了解如下一些術語:
- 事件循環
- 事件循環策略
- awaitable
- 協程函數
- 老式協程函數
- 協程
- 協程封裝
- 生成器generator
- future
- 並發的future
- 任務task
- 句柄
- 執行器executor
- 傳輸transport
- 協議
此外,Python 還新增了一些新的特殊方法:
-
__aenter__
和__aenter__
,用於異步塊操作 -
__aiter__
和__anext__
,用於異步迭代器(異步循環和異步推導)。為了更強大些,協議已經改變過一次了。 在 Python 3.5 它返回一個 awaitable(這是個協程);在 3.6它返回一個新的異步生成器。 -
__await__
,用於自定義的 awaitable
你還需要了解相當多的內容,文檔涵蓋了那些部分。盡管如此,我做了一些額外說明以便對其有更好的理解:
事件循環
asyncio 事件循環和你第一眼看上去的略有不同。表麵看,每個線程都有一個事件循環,然而事實並非如此。我認為它們應該按照如下的方式工作:
- 如果是主線程,當調用
asyncio.get_event_loop()
時創建一個事件循環。 - 如果是其它線程,當調用
asyncio.get_event_loop()
時返回運行時錯誤。 - 當前線程可以使用
asyncio.set_event_loop()
在任何時間節點綁定事件循環。該事件循環可由asyncio.new_evet_loop()
函數創建。 - 事件循環可以在不綁定到當前線程的情況下使用。
-
asyncio.get_event_loop()
返回綁定線程的事件循環,而非當前運行的事件循環。
這些行為的組合是超混淆的,主要有以下幾個原因。 首先,你需要知道這些函數被委托到全局設置的底層事件循環策略。 默認是將事件循環綁定到線程。 或者,如果需要的話,可以在理論上將事件循環綁定到一個 greenlet 或類似的。 然而,重要的是要知道庫代碼不控製策略,因此不能推斷 asyncio 將適用於線程。
其次,asyncio 不需要通過策略將事件循環綁定到上下文。 事件循環可以單獨工作。 但是這正是庫代碼的第一個問題,因為協同程序或類似的東西並不知道哪個事件循環負責調度它。 這意味著,如果從協程中調用 asyncio.get_event_loop()
,你可能沒有機會取得事件循環。 這也是所有 API 均采用可選的顯式事件循環參數的原因。 舉例來說,要弄清楚當前哪個協程正在運行,不能使用如下調用:
def get_task():
loop = asyncio.get_event_loop()
try:
return asyncio.Task.get_current(loop)
except RuntimeError:
return None
相反,必須顯式地傳遞事件循環。 這進一步要求你在庫代碼中顯式地遍曆事件循環,否則可能發生很奇怪的事情。 我不知道這種設計的思想是什麼,但如果不解決這個問題(例如 get_event_loop()
返回實際運行的事件循環),那麼唯一有意義的其它方案是明確禁止顯式事件循環傳遞,並要求它綁定到當前上下文(線程等)。
由於事件循環策略不提供當前上下文的標識符,因此庫也不可能以任何方式“索引”到當前上下文。 也沒有回調函數用來監視這樣的上下文的拆除,這進一步限製了實際可以開展的操作。
awaitable 與協程coroutine
以我的愚見,Python 最大的設計錯誤是過度重載迭代器。它們現在不僅用於迭代,而且用於各種類型的協程。 Python 中迭代器最大的設計錯誤之一是如果 StopIteration
沒有被捕獲形成的空泡。 這可能導致非常令人沮喪的問題,其中某處的異常可能導致其它地方的生成器或協同程序中止。 這是一個長期存在的問題,基於 Python 的模板引擎如 Jinja 經常麵臨這種問題。 該模板引擎在內部渲染為生成器,並且當由於某種原因的模板引起 StopIteration
時,渲染就停止在那裏。
Python 慢慢認識到了過度重載的教訓。 首先在 3.x 版本加入 asyncio 模塊,並沒有語言級支持。 所以自始至終它不過僅僅是裝飾器和生成器而已。 為了實現 yield from
以及其它東西,StopIteration
再次重載。 這導致了令人困惑的行為,像這樣:
>>> def foo(n):
... if n in (0, 1):
... return [1]
... for item in range(n):
... yield item * 2
...
>>> list(foo(0))
[]
>>> list(foo(1))
[]
>>> list(foo(2))
[0, 2]
沒有錯誤,沒有警告。隻是不是你所期望的行為。 這是因為從一個作為生成器的函數中 return
的值實際上引發了一個帶有單個參數的 StopIteration
,它不是由迭代器協議捕獲的,而隻是在協程代碼中處理。
在 3.5 和 3.6 有很多改變,因為現在除了生成器我們還有協程對象。除了通過封裝生成器來生成協程,沒有其它可以直接生成協程的單獨對象。它是通過用給函數加 async
前綴來實現。 例如 async def x()
會產生這樣的協程。 現在在 3.6,將有單獨的異步生成器,它通過觸發 AsyncStopIteration
保持其獨立性。 此外,對於Python 3.5 和更高版本,導入新的 future 對象(generator_stop
),如果代碼在迭代步驟中觸發 StopIteration
,它將引發 RuntimeError
。
為什麼我提到這一切? 因為老的實現方式並未真的消失。 生成器仍然具有 send
和 throw
方法以及協程仍然在很大程度上表現為生成器。你需要知道這些東西,它們將在未來伴隨你相當長的時間。
為了統一很多這樣的重複,現在我們在 Python 中有更多的概念了:
- awaitable:具有
__await__
方法的對象。 由本地協同程序和舊式協同程序以及一些其它程序實現。 - 協程函數coroutinefunction:返回原生協程的函數。 不要與返回協程的函數混淆。
- 協程coroutine: 原生的協程程序。 注意,目前為止,當前文檔不認為老式 asyncio 協程是協程程序。 至少
inspect.iscoroutine
不認為它是協程。 盡管它被future/awaitable
分支接納。
特別令人困惑的是 asyncio.iscoroutinefunction
和inspect.iscoroutinefunction
正在做不同的事情,這與 inspect.iscoroutine
和 inspect.iscoroutinefunction
情況相同。 值得注意的是,盡管inspect
在類型檢查中不知道有關 asycnio 舊式協程函數的任何信息,但是當您檢查 awaitable 狀態時它顯然知道它們,即使它與 await
不一致。
協程封裝器coroutine wrapper
每當你運行 async def
,Python 就會調用一個線程局部的協程封裝器。它由sys.set_coroutine_wrapper
設置,並且它是可以包裝這些東西的一個函數。 看起來有點像如下代碼:
>>> import sys
>>> sys.set_coroutine_wrapper(lambda x: 42)
>>> async def foo():
... pass
...
>>> foo()
__main__:1: RuntimeWarning: coroutine 'foo' was never awaited
42
在這種情況下,我從來沒有實際調用原始的函數,隻是給你一個提示,說明這個函數可以做什麼。 目前我隻能說它總是線程局部有效,所以,如果替換事件循環策略,你需要搞清楚如何讓協程封裝器在相同的上下文同步更新。創建的新線程不會從父線程繼承那些標識。
這不要與 asyncio 協程封裝代碼混淆。
awaitable 和 future
有些東西是 awaitable 的。 據我所見,以下概念被認為是 awaitable:
- 原生的協程
- 配置了假的
CO_ITERABLE_COROUTINE
標識的生成器(文中有涉及) - 具有
__await__
方法的對象
除了生成器由於曆史遺留的原因不使用之外,其它的對象都使用 __await__
方法。CO_ITERABLE_COROUTINE
標誌來自哪裏?它來自一個協程封裝器(現在與sys.set_coroutine_wrapper
有些混淆),即 @asyncio.coroutine
。 通過一些間接方法,它使用types.coroutine
(現在與 types.CoroutineType
或 asyncio.coroutine
有些混淆)封裝生成器,並通過另外一個標誌 CO_ITERABLE_COROUTINE
重新創建內部代碼對象。
所以既然我們知道這些東西是什麼,那麼什麼是 future? 首先,我們需要澄清一件事情:在 Python 3 中,實際上有兩種(完全不兼容)的 future 類型:asyncio.futures.Future
和concurrent.futures.Future
。 其中一個出現在另一個之前,但它們都仍然在 asyncio 中使用。 例如,asyncio.run_coroutine_threadsafe()
將調度一個協程到在另一個線程中運行的事件循環,但它返回一個 concurrent.futures.Future
對象,而不是 asyncio.futures.Future
對象。 這是有道理的,因為隻有 concurrent.futures.Future
對象是線程安全的。
所以現在我們知道有兩個不兼容的 future,我們應該澄清哪個 future 在 asyncio 中。 老實說,我不完全確定差異在哪裏,但我打算暫時稱之為“最終”。它是一個最終將持有一個值的對象,當還在計算時你可以對最終結果做一些處理。 future 對象的一些變種稱為 deferred,還有一些叫做 promise。 我實在難以理解它們真正的區別。
你能用一個 future 對象做什麼? 你可以關聯一個準備就緒時將被調用的回調函數,或者你可以關聯一個 future 失敗時將被觸發的回調函數。 此外,你可以 await
它(它實現__await__
,因此可等待),此外,future 也可以取消。
那麼你怎樣才能得到這樣的 future 對象? 通過在 awaitable 對象上調用 asyncio.ensure_future
。它會把一個舊版的生成器轉變為 future 對象。 然而,如果你閱讀文檔,你會讀到asyncio.ensure_future
實際上返回一個task
(任務)。 那麼問題來了,什麼是任務?
任務
任務task某種意義上是一個封裝了協程的 futur 對象。它的工作方式和 future 類似,但它也有一些額外的方法來提取所包含的協程的當前堆棧。 我們已經見過了在前麵提到過的任務,因為它是通過Task.get_current
確定事件循環當前正在做什麼的主要方式。
在如何取消工作方麵,任務和 future 也有區別,但這超出了本文的範圍。“取消”是它們自己最大的問題。 如果你處於一個協程中,並且知道自己正在運行,你可以通過前麵提到的 Task.get_current
獲取自己的任務,但這需要你知道自己被派遣在哪個事件循環,該事件循環可能是、也可能不是已綁定的那個線程。
協程不可能知道它與哪個循環一起使用。task
也沒有提供該信息的公共 API。 然而,如果你確實可以獲得一個任務,你可以訪問 task._loop
,通過它反指到事件循環。
句柄
除了上麵提到的所有一切還有句柄。 句柄是等待執行的不透明對象,不可等待,但可以被取消。 特別是如果你使用 call_soon
或者 call_soon_threadsafe
(還有其它一些)調度執行一個調用,你可以獲得句柄,然後使用它盡力嚐試取消執行,但不能等待實際調用生效。
執行器Executor
因為你可以有多個事件循環,但這並不意味著每個線程理所當然地應用多個事件循環,最常見的情形還是一個線程一個事件循環。 那麼你如何通知另一個事件循環做一些工作? 你不能到另一個線程的事件循環中執行回調函數並獲取結果。 這種情況下,你需要使用執行器。
執行器Executor來自 concurrent.futures
,它允許你將工作安排到本身未發生事件的線程中。 例如,如果在事件循環中使用 run_in_executor
來調度將在另一個線程中調用的函數。 其返回結果是 asyncio 協程,而不是像 run_coroutine_threadsafe
這樣的並發協程。 我還沒有足夠的心智來弄清楚為什麼設計這樣的 API,應該如何使用,以及什麼時候使用。 文檔中建議執行器可以用於構建多進程。
傳輸和協議
我總是認為傳輸與協議也淩亂不堪,實際這部分內容基本上是對 Twisted 的逐字拷貝。詳情毋庸贅述,請直接閱讀相關文檔。
如何使用 asyncio
現在我們已經大致了解 asyncio,我發現了一些模式,人們似乎在寫 asyncio 代碼時使用:
- 將事件循環傳遞給所有協程。 這似乎是社區中一部分人的做法。 把事件循環信息提供給協程為協程獲取自己運行的任務提供了可能性。
- 或者你要求事件循環綁定到線程,這也能達到同樣的目的。 理想情況下兩者都支持。 可悲的是,社區已經分化。
- 如果想使用上下文數據(如線程本地數據),你可謂是運氣不佳。 最流行的變通方法顯然是 atlassian 的
aiolocals
,它基本上需要你手動傳遞上下文信息到協程,因為解釋器不為此提供支持。 這意味著如果你用一個工具類庫生成協程,你將失去上下文。 - 忽略 Python 中的舊式協程。 隻使用 3.5 版本中
async def
關鍵字和協程。 你總可能要用到它們,因為在老版本中,沒有異步上下文管理器,這是非常必要的資源管理。 - 學習重新啟動事件循環進行善後清理。 這部分功能和我預想的不同,我花了比較長的時間來厘清它的實現。清理操作的最好方式是不斷重啟事件循環直到沒有等待事件。 遺憾的是沒有什麼通用的模式來處理清理操作,你隻能用一些醜陋的臨時方案煳口度日。 例如 aiohttp 的 web 支持也做這個模式,所以如果你想要結合兩個清理邏輯,你可能需要重新實現它提供的工具助手,因為該助手功能實現後,它徹底破壞了事件循環的設計。 當然,它不是我見過的第一個幹這種壞事的庫 :(。
- 使用子進程是不明顯的。 你需要一個事件循環在主線程中運行,我想它是在監聽信號事件,然後分派到其它事件循環。 這需要通過
asyncio.get_child_watcher().attach_loop(...)
通知循環。 - 編寫同時支持異步和同步的代碼在某種程度上注定要失敗。 嚐試在同一個對象上支持
with
和async with
是危險的事情。 - 如果你想給一個協程起個更好的名字,弄清楚為什麼它沒有被等待,設置
__name__
沒有幫助。 你需要設置__qualname__
而不是打印出錯誤消息來。 - 有時內部類型交換會使你麻痹。 特別是
asyncio.wait()
函數將確保所有的事情都是 future,這意味著如果你傳遞協程,你將很難發現你的協程是否已經完成或者正在等待,因為輸入對象不再匹配輸出對象。 在這種情況下,唯一真正理智的做法是確保前期一切都是 future。
上下文數據
除了瘋狂的複雜性和對如何更好地編寫 API 缺乏理解,我最大的問題是完全缺乏對上下文本地數據的考慮。這是 Node 社區現在學習的東西。continuation-local-storage
存在,但該實現被接受的太晚。持續本地存儲和類似的概念常用於在並發環境中實施安全策略,並且該信息的損壞可能導致嚴重的安全問題。
事實上,Python 甚至沒有任何存儲,這令人失望至極。我正在研究這個內容,因為我正在調查如何最好地支持 Sentry's breadcrumbs 的 asyncio,然而我並沒有看到一個合理的方式做到這一點。在 asyncio 中沒有上下文的概念,沒有辦法從通用代碼中找出您正在使用的事件循環,並且如果沒有 monkeypatching(運行環境下的補丁),也無法獲取這些信息。
Node 當前正在經曆如何找到這個問題的長期解決方案的過程。這個問題不容忽視,因為它在所有生態係統中反複出現過,如 JavaScript、Python 和 .NET 環境。該問題被命名為異步上下文傳播,其解決方案有許多名稱。在 Go 中,需要使用上下文包,並明確地傳遞給所有 goroutine(不是一個完美的解決方案,但至少有一個)。.NET 具有本地調用上下文形式的最佳解決方案。它可以是線程上下文,Web 請求上下文或類似的東西,除非被抑製,否則它會自動傳播。微軟的解決方案是我們的黃金標準。我現在相信,微軟在 15 年前已經解決了該問題。
我不知道該生態係統是否還夠年輕,還可以添加邏輯調用上下文,可能現在仍然為時未晚。
原文發布時間為:2017-12-21
本文來自雲棲社區合作夥伴“Linux中國”
最後更新:2017-06-01 12:02:18