閱讀827 返回首頁    go 技術社區[雲棲]


一個使用 asyncio 協程的網絡爬蟲(二)

協程

還記得我們對你許下的承諾麼?我們可以寫出這樣的異步代碼,它既有回調方式的高效,也有多線程代碼的簡潔。這個結合是同過一種稱為協程coroutine的模式來實現的。使用 Python3.4 標準庫 asyncio 和一個叫“aiohttp”的包,在協程中獲取一個網頁是非常直接的( @asyncio.coroutine 修飾符並非魔法。事實上,如果它修飾的是一個生成器函數,並且沒有設置 PYTHONASYNCIODEBUG 環境變量的話,這個修飾符基本上沒啥用。它隻是為了框架的其它部分方便,設置了一個屬性 _is_coroutine 而已。也可以直接使用 asyncio 和裸生成器,而沒有@asyncio.coroutine 修飾符):


  1. @asyncio.coroutine
  2. def fetch(self, url):
  3. response = yield from self.session.get(url)
  4. body = yield from response.read()

它也是可擴展的。在作者 Jesse 的係統上,與每個線程 50k 內存相比,一個 Python 協程隻需要 3k 內存。Python 很容易就可以啟動上千個協程。

協程的概念可以追溯到計算機科學的遠古時代,它很簡單,一個可以暫停和恢複的子過程。線程是被操作係統控製的搶占式多任務,而協程的多任務是可合作的,它們自己選擇什麼時候暫停去執行下一個協程。

有很多協程的實現。甚至在 Python 中也有幾種。Python 3.4 標準庫 asyncio 中的協程是建立在生成器之上的,這是一個 Future 類和“yield from”語句。從 Python 3.5 開始,協程變成了語言本身的特性(“PEP 492 Coroutines with async and await syntax” 中描述了 Python 3.5 內置的協程)。然而,理解 Python 3.4 中這個通過語言原有功能實現的協程,是我們處理 Python 3.5 中原生協程的基礎。

要解釋 Python 3.4 中基於生成器的協程,我們需要深入生成器的方方麵麵,以及它們是如何在 asyncio 中用作協程的。我很高興就此寫點東西,想必你也希望繼續讀下去。我們解釋了基於生成器的協程之後,就會在我們的異步網絡爬蟲中使用它們。

生成器如何工作

在你理解生成器之前,你需要知道普通的 Python 函數是怎麼工作的。正常情況下,當一個函數調用一個子過程,這個被調用函數獲得控製權,直到它返回或者有異常發生,才把控製權交給調用者:


  1. >>> def foo():
  2. ... bar()
  3. ...
  4. >>> def bar():
  5. ... pass

標準的 Python 解釋器是用 C 語言寫的。一個 Python 函數被調用所對應的 C 函數是 PyEval_EvalFrameEx。它獲得一個 Python 棧幀結構並在這個棧幀的上下文中執行 Python 字節碼。這裏是 foo 函數的字節碼:


  1. >>> import dis
  2. >>> dis.dis(foo)
  3. 2 0 LOAD_GLOBAL 0 (bar)
  4. 3 CALL_FUNCTION 0 (0 positional, 0 keyword pair)
  5. 6 POP_TOP
  6. 7 LOAD_CONST 0 (None)
  7. 10 RETURN_VALUE

foo 函數在它棧中加載 bar 函數並調用它,然後把 bar 的返回值從棧中彈出,加載 None 值到堆棧並返回。

當 PyEval_EvalFrameEx 遇到 CALL_FUNCTION 字節碼時,它會創建一個新的棧幀,並用這個棧幀遞歸的調用PyEval_EvalFrameEx 來執行 bar 函數。

非常重要的一點是,Python 的棧幀在堆中分配!Python 解釋器是一個標準的 C 程序,所以它的棧幀是正常的棧幀。但是 Python 的棧幀是在堆中處理。這意味著 Python 棧幀在函數調用結束後依然可以存在。我們在bar 函數中保存當前的棧幀,交互式的看看這種現象:


  1. >>> import inspect
  2. >>> frame = None
  3. >>> def foo():
  4. ... bar()
  5. ...
  6. >>> def bar():
  7. ... global frame
  8. ... frame = inspect.currentframe()
  9. ...
  10. >>> foo()
  11. >>> # The frame was executing the code for 'bar'.
  12. >>> frame.f_code.co_name
  13. 'bar'
  14. >>> # Its back pointer refers to the frame for 'foo'.
  15. >>> caller_frame = frame.f_back
  16. >>> caller_frame.f_code.co_name
  17. 'foo'

Figure 5.1 - Function Calls

Figure 5.1 - Function Calls

現在該說 Python 生成器了,它使用同樣構件——代碼對象和棧幀——去完成一個不可思議的任務。

這是一個生成器函數:


  1. >>> def gen_fn():
  2. ... result = yield 1
  3. ... print('result of yield: {}'.format(result))
  4. ... result2 = yield 2
  5. ... print('result of 2nd yield: {}'.format(result2))
  6. ... return 'done'
  7. ...

在 Python 把 gen_fn 編譯成字節碼的過程中,一旦它看到 yield 語句就知道這是一個生成器函數而不是普通的函數。它就會設置一個標誌來記住這個事實:


  1. >>> # The generator flag is bit position 5.
  2. >>> generator_bit = 1 << 5
  3. >>> bool(gen_fn.__code__.co_flags & generator_bit)
  4. True

當你調用一個生成器函數,Python 看到這個標誌,就不會實際運行它而是創建一個生成器:


  1. >>> gen = gen_fn()
  2. >>> type(gen)
  3. <class 'generator'>

Python 生成器封裝了一個棧幀和函數體代碼的引用:


  1. >>> gen.gi_code.co_name
  2. 'gen_fn'

所有通過調用 gen_fn 的生成器指向同一段代碼,但都有各自的棧幀。這些棧幀不再任何一個C函數棧中,而是在堆空間中等待被使用:

Figure 5.2 - Generators

Figure 5.2 - Generators

棧幀中有一個指向“最後執行指令”的指針。初始化為 -1,意味著它沒開始運行:


  1. >>> gen.gi_frame.f_lasti
  2. -1

當我們調用 send 時,生成器一直運行到第一個 yield 語句處停止,並且 send 返回 1,因為這是 gen 傳遞給 yield 表達式的值。


  1. >>> gen.send(None)
  2. 1

現在,生成器的指令指針是 3,所編譯的Python 字節碼一共有 56 個字節:


  1. >>> gen.gi_frame.f_lasti
  2. 3
  3. >>> len(gen.gi_code.co_code)
  4. 56

這個生成器可以在任何時候、任何函數中恢複運行,因為它的棧幀並不在真正的棧中,而是堆中。在調用鏈中它的位置也是不固定的,它不必遵循普通函數先進後出的順序。它像雲一樣自由。

我們可以傳遞一個值 hello 給生成器,它會成為 yield 語句的結果,並且生成器會繼續運行到第二個yield 語句處。


  1. >>> gen.send('hello')
  2. result of yield: hello
  3. 2

現在棧幀中包含局部變量 result


  1. >>> gen.gi_frame.f_locals
  2. {'result': 'hello'}

其它從 gen_fn 創建的生成器有著它自己的棧幀和局部變量。

當我們再一次調用 send,生成器繼續從第二個 yield 開始運行,以拋出一個特殊的 StopIteration 異常為結束。


  1. >>> gen.send('goodbye')
  2. result of 2nd yield: goodbye
  3. Traceback (most recent call last):
  4. File "<input>", line 1, in <module>
  5. StopIteration: done

這個異常有一個值 "done",它就是生成器的返回值。

使用生成器構建協程

所以生成器可以暫停,可以給它一個值讓它恢複,並且它還有一個返回值。這些特性看起來很適合去建立一個不使用那種亂糟糟的意麵似的回調異步編程模型。我們想創造一個這樣的“協程”:一個在程序中可以和其他過程合作調度的過程。我們的協程將會是標準庫 asyncio 中協程的一個簡化版本,我們將使用生成器,futures 和yield from 語句。

首先,我們需要一種方法去代表協程所需要等待的 future 事件。一個簡化的版本是:


  1. class Future:
  2. def __init__(self):
  3. self.result = None
  4. self._callbacks = []
  5. def add_done_callback(self, fn):
  6. self._callbacks.append(fn)
  7. def set_result(self, result):
  8. self.result = result
  9. for fn in self._callbacks:
  10. fn(self)

一個 future 初始化為“未解決的”,它通過調用 set_result 來“解決”。(這個 future 缺少很多東西,比如說,當這個 future 解決後,生成yield的協程應該馬上恢複而不是暫停,但是在我們的代碼中卻不沒有這樣做。參見 asyncio 的 Future 類以了解其完整實現。)

讓我們用 future 和協程來改寫我們的 fetcher。我們之前用回調寫的 fetch 如下:


  1. class Fetcher:
  2. def fetch(self):
  3. self.sock = socket.socket()
  4. self.sock.setblocking(False)
  5. try:
  6. self.sock.connect(('xkcd.com', 80))
  7. except BlockingIOError:
  8. pass
  9. selector.register(self.sock.fileno(),
  10. EVENT_WRITE,
  11. self.connected)
  12. def connected(self, key, mask):
  13. print('connected!')
  14. # And so on....

fetch 方法開始連接一個套接字,然後注冊 connected 回調函數,它會在套接字建立連接後調用。現在我們使用協程把這兩步合並:


  1. def fetch(self):
  2. sock = socket.socket()
  3. sock.setblocking(False)
  4. try:
  5. sock.connect(('xkcd.com', 80))
  6. except BlockingIOError:
  7. pass
  8. f = Future()
  9. def on_connected():
  10. f.set_result(None)
  11. selector.register(sock.fileno(),
  12. EVENT_WRITE,
  13. on_connected)
  14. yield f
  15. selector.unregister(sock.fileno())
  16. print('connected!')

現在,fetch 是一個生成器,因為它有一個 yield 語句。我們創建一個未決的 future,然後 yield 它,暫停fetch 直到套接字連接建立。內聯函數 on_connected 解決這個 future。

但是當 future 被解決,誰來恢複這個生成器?我們需要一個協程驅動器。讓我們叫它 “task”:


  1. class Task:
  2. def __init__(self, coro):
  3. self.coro = coro
  4. f = Future()
  5. f.set_result(None)
  6. self.step(f)
  7. def step(self, future):
  8. try:
  9. next_future = self.coro.send(future.result)
  10. except StopIteration:
  11. return
  12. next_future.add_done_callback(self.step)
  13. # Begin fetching http://xkcd.com/353/
  14. fetcher = Fetcher('/353/')
  15. Task(fetcher.fetch())
  16. loop()

task 通過傳遞一個 None 值給 fetch 來啟動它。fetch 運行到它 yeild 出一個 future,這個 future 被作為next_future 而捕獲。當套接字連接建立,事件循環運行回調函數 on_connected,這裏 future 被解決,step 被調用,fetch 恢複運行。

用 yield from 重構協程

一旦套接字連接建立,我們就可以發送 HTTP GET 請求,然後讀取服務器響應。不再需要哪些分散在各處的回調函數,我們把它們放在同一個生成器函數中:


  1. def fetch(self):
  2. # ... connection logic from above, then:
  3. sock.send(request.encode('ascii'))
  4. while True:
  5. f = Future()
  6. def on_readable():
  7. f.set_result(sock.recv(4096))
  8. selector.register(sock.fileno(),
  9. EVENT_READ,
  10. on_readable)
  11. chunk = yield f
  12. selector.unregister(sock.fileno())
  13. if chunk:
  14. self.response += chunk
  15. else:
  16. # Done reading.
  17. break

從套接字中讀取所有信息的代碼看起來很通用。我們能不把它從 fetch 中提取成一個子過程?現在該 Python 3 熱捧的 yield from 登場了。它能讓一個生成器委派另一個生成器。

讓我們先回到原來那個簡單的生成器例子:


  1. >>> def gen_fn():
  2. ... result = yield 1
  3. ... print('result of yield: {}'.format(result))
  4. ... result2 = yield 2
  5. ... print('result of 2nd yield: {}'.format(result2))
  6. ... return 'done'
  7. ...

為了從其他生成器調用這個生成器,我們使用 yield from 委派它:


  1. >>> # Generator function:
  2. >>> def caller_fn():
  3. ... gen = gen_fn()
  4. ... rv = yield from gen
  5. ... print('return value of yield-from: {}'
  6. ... .format(rv))
  7. ...
  8. >>> # Make a generator from the
  9. >>> # generator function.
  10. >>> caller = caller_fn()

這個 caller 生成器的行為的和它委派的生成器 gen 表現的完全一致:


  1. >>> caller.send(None)
  2. 1
  3. >>> caller.gi_frame.f_lasti
  4. 15
  5. >>> caller.send('hello')
  6. result of yield: hello
  7. 2
  8. >>> caller.gi_frame.f_lasti # Hasn't advanced.
  9. 15
  10. >>> caller.send('goodbye')
  11. result of 2nd yield: goodbye
  12. return value of yield-from: done
  13. Traceback (most recent call last):
  14. File "<input>", line 1, in <module>
  15. StopIteration

當 caller 自 gen 生成(yield),caller 就不再前進。注意到 caller 的指令指針保持15不變,就是yield from 的地方,即使內部的生成器 gen 從一個 yield 語句運行到下一個 yield,它始終不變。(事實上,這就是“yield from”在 CPython 中工作的具體方式。函數會在執行每個語句之前提升其指令指針。但是在外部生成器執行“yield from”後,它會將其指令指針減一,以保持其固定在“yield form”語句上。然後其生成其 caller。這個循環不斷重複,直到內部生成器拋出 StopIteration,這裏指向外部生成器最終允許它自己進行到下一條指令的地方。)從 caller 外部來看,我們無法分辨 yield 出的值是來自 caller 還是它委派的生成器。而從 gen 內部來看,我們也不能分辨傳給它的值是來自 caller 還是 caller 的外麵。yield from語句是一個光滑的管道,值通過它進出 gen,一直到 gen 結束。

協程可以用 yield from 把工作委派給子協程,並接收子協程的返回值。注意到上麵的 caller 打印出“return value of yield-from: done”。當 gen 完成後,它的返回值成為 caller 中 yield from 語句的值。


  1. rv = yield from gen

前麵我們批評過基於回調的異步編程模式,其中最大的不滿是關於 “堆棧撕裂stack ripping”:當一個回調拋出異常,它的堆棧回溯通常是毫無用處的。它隻顯示出事件循環運行了它,而沒有說為什麼。那麼協程怎麼樣?


  1. >>> def gen_fn():
  2. ... raise Exception('my error')
  3. >>> caller = caller_fn()
  4. >>> caller.send(None)
  5. Traceback (most recent call last):
  6. File "<input>", line 1, in <module>
  7. File "<input>", line 3, in caller_fn
  8. File "<input>", line 2, in gen_fn
  9. Exception: my error

這還是非常有用的,當異常拋出時,堆棧回溯顯示出 caller_fn 委派了 gen_fn。令人更欣慰的是,你可以在一次異常處理器中封裝這個調用到一個子過程中,像正常函數一樣:


  1. >>> def gen_fn():
  2. ... yield 1
  3. ... raise Exception('uh oh')
  4. ...
  5. >>> def caller_fn():
  6. ... try:
  7. ... yield from gen_fn()
  8. ... except Exception as exc:
  9. ... print('caught {}'.format(exc))
  10. ...
  11. >>> caller = caller_fn()
  12. >>> caller.send(None)
  13. 1
  14. >>> caller.send('hello')
  15. caught uh oh

所以我們可以像提取子過程一樣提取子協程。讓我們從 fetcher 中提取一些有用的子協程。我們先寫一個可以讀一塊數據的協程 read


  1. def read(sock):
  2. f = Future()
  3. def on_readable():
  4. f.set_result(sock.recv(4096))
  5. selector.register(sock.fileno(), EVENT_READ, on_readable)
  6. chunk = yield f # Read one chunk.
  7. selector.unregister(sock.fileno())
  8. return chunk

在 read 的基礎上,read_all 協程讀取整個信息:


  1. def read_all(sock):
  2. response = []
  3. # Read whole response.
  4. chunk = yield from read(sock)
  5. while chunk:
  6. response.append(chunk)
  7. chunk = yield from read(sock)
  8. return b''.join(response)

如果你換個角度看,拋開 yield form 語句的話,它們就像在做阻塞 I/O 的普通函數一樣。但是事實上,read 和 read_all 都是協程。yield from read 暫停 read_all 直到 I/O 操作完成。當read_all 暫停時,asyncio 的事件循環正在做其它的工作並等待其他的 I/O 操作。read 在下次循環中當事件就緒,完成 I/O 操作時,read_all 恢複運行。

最終,fetch 調用了 read_all


  1. class Fetcher:
  2. def fetch(self):
  3. # ... connection logic from above, then:
  4. sock.send(request.encode('ascii'))
  5. self.response = yield from read_all(sock)

神奇的是,Task 類不需要做任何改變,它像以前一樣驅動外部的 fetch 協程:


  1. Task(fetcher.fetch())
  2. loop()

當 read yield 一個 future 時,task 從 yield from 管道中接收它,就像這個 future 直接從 fetch yield 一樣。當循環解決一個 future 時,task 把它的結果送給 fetch,通過管道,read 接受到這個值,這完全就像 task 直接驅動 read 一樣:

Figure 5.3 - Yield From

Figure 5.3 - Yield From

為了完善我們的協程實現,我們再做點打磨:當等待一個 future 時,我們的代碼使用 yield;而當委派一個子協程時,使用 yield from。不管是不是協程,我們總是使用 yield form 會更精煉一些。協程並不需要在意它在等待的東西是什麼類型。

在 Python 中,我們從生成器和迭代器的高度相似中獲得了好處,將生成器進化成 caller,迭代器也可以同樣獲得好處。所以,我們可以通過特殊的實現方式來迭代我們的 Future 類:


  1. # Method on Future class.
  2. def __iter__(self):
  3. # Tell Task to resume me here.
  4. yield self
  5. return self.result

future 的 __iter__ 方法是一個 yield 它自身的一個協程。當我們將代碼替換如下時:


  1. # f is a Future.
  2. yield f

以及……:


  1. # f is a Future.
  2. yield from f

……結果是一樣的!驅動 Task 從它的調用 send 中接收 future,並當 future 解決後,它發回新的結果給該協程。

在每個地方都使用 yield from 的好處是什麼?為什麼比用 field 等待 future 並用 yield from 委派子協程更好?之所以更好的原因是,一個方法可以自由地改變其實行而不影響到其調用者:它可以是一個當 future 解決後返回一個值的普通方法,也可以是一個包含 yield from 語句並返回一個值的協程。無論是哪種情況,調用者僅需要 yield from 該方法以等待結果就行。

親愛的讀者,我們已經完成了對 asyncio 協程探索。我們深入觀察了生成器的機製,實現了簡單的 future 和 task。我們指出協程是如何利用兩個世界的優點:比線程高效、比回調清晰的並發 I/O。當然真正的 asyncio 比我們這個簡化版本要複雜的多。真正的框架需要處理zero-copy I/0、公平調度、異常處理和其他大量特性。

使用 asyncio 編寫協程代碼比你現在看到的要簡單的多。在前麵的代碼中,我們從基本原理去實現協程,所以你看到了回調,task 和 future,甚至非阻塞套接字和 select 調用。但是當用 asyncio 編寫應用,這些都不會出現在你的代碼中。我們承諾過,你可以像這樣下載一個網頁:


  1. @asyncio.coroutine
  2. def fetch(self, url):
  3. response = yield from self.session.get(url)
  4. body = yield from response.read()

對我們的探索還滿意麼?回到我們原始的任務:使用 asyncio 寫一個網絡爬蟲。

原文發布時間為:2017-03-05

本文來自雲棲社區合作夥伴“Linux中國”

最後更新:2017-05-25 17:01:45

  上一篇:go  一個使用 asyncio 協程的網絡爬蟲(一)
  下一篇:go  如何在 Ubuntu 上用 Yocto 創建你自己的嵌入式 Linux 發行版