閱讀331 返回首頁    go 阿裏雲 go 技術社區[雲棲]


一個使用 asyncio 協程的網絡爬蟲(一)

介紹

經典的計算機科學強調高效的算法,盡可能快地完成計算。但是很多網絡程序的時間並不是消耗在計算上,而是在等待許多慢速的連接或者低頻事件的發生。這些程序暴露出一個新的挑戰:如何高效的等待大量網絡事件。一個現代的解決方案是異步 I/O。

這一章我們將實現一個簡單的網絡爬蟲。這個爬蟲隻是一個原型式的異步應用,因為它等待許多響應而隻做少量的計算。一次爬的網頁越多,它就能越快的完成任務。如果它為每個動態的請求啟動一個線程的話,隨著並發請求數量的增加,它會在耗盡套接字之前,耗盡內存或者線程相關的資源。使用異步 I/O 可以避免這個的問題。

我們將分三個階段展示這個例子。首先,我們會實現一個事件循環並用這個事件循環和回調來勾畫出一隻網絡爬蟲。它很有效,但是當把它擴展成更複雜的問題時,就會導致無法管理的混亂代碼。然後,由於 Python 的協程不僅有效而且可擴展,我們將用 Python 的生成器函數實現一個簡單的協程。在最後一個階段,我們將使用 Python 標準庫“asyncio”中功能完整的協程, 並通過異步隊列完成這個網絡爬蟲。(在 PyCon 2013 上,Guido 介紹了標準的 asyncio 庫,當時稱之為“Tulip”。)

任務

網絡爬蟲尋找並下載一個網站上的所有網頁,也許還會把它們存檔,為它們建立索引。從根 URL 開始,它獲取每個網頁,解析出沒有遇到過的鏈接加到隊列中。當網頁沒有未見到過的鏈接並且隊列為空時,它便停止運行。

我們可以通過同時下載大量的網頁來加快這一過程。當爬蟲發現新的鏈接,它使用一個新的套接字並行的處理這個新鏈接,解析響應,添加新鏈接到隊列。當並發很大時,可能會導致性能下降,所以我們會限製並發的數量,在隊列保留那些未處理的鏈接,直到一些正在執行的任務完成。

傳統方式

怎麼使一個爬蟲並發?傳統的做法是創建一個線程池,每個線程使用一個套接字在一段時間內負責一個網頁的下載。比如,下載 xkcd.com 網站的一個網頁:


  1. def fetch(url):
  2. sock = socket.socket()
  3. sock.connect(('xkcd.com', 80))
  4. request = 'GET {} HTTP/1.0\r\nHost: xkcd.com\r\n\r\n'.format(url)
  5. sock.send(request.encode('ascii'))
  6. response = b''
  7. chunk = sock.recv(4096)
  8. while chunk:
  9. response += chunk
  10. chunk = sock.recv(4096)
  11. # Page is now downloaded.
  12. links = parse_links(response)
  13. q.add(links)

套接字操作默認是阻塞的:當一個線程調用一個類似 connect 和 recv 方法時,它會阻塞,直到操作完成。(即使是 send 也能被阻塞,比如接收端在接受外發消息時緩慢而係統的外發數據緩存已經滿了的情況下)因此,為了同一時間內下載多個網頁,我們需要很多線程。一個複雜的應用會通過線程池保持空閑的線程來分攤創建線程的開銷。同樣的做法也適用於套接字,使用連接池。

到目前為止,使用線程的是成本昂貴的,操作係統對一個進程、一個用戶、一台機器能使用線程做了不同的硬性限製。在 作者 Jesse 的係統中,一個 Python 線程需要 50K 的內存,開啟上萬個線程就會失敗。每個線程的開銷和係統的限製就是這種方式的瓶頸所在。

在 Dan Kegel 那一篇很有影響力的文章“The C10K problem”中,它提出了多線程方式在 I/O 並發上的局限性。他在開始寫道,

網絡服務器到了要同時處理成千上萬的客戶的時代了,你不這樣認為麼?畢竟,現在網絡規模很大了。

Kegel 在 1999 年創造出“C10K”這個術語。一萬個連接在今天看來還是可接受的,但是問題依然存在,隻不過大小不同。回到那時候,對於 C10K 問題,每個連接啟一個線程是不切實際的。現在這個限製已經成指數級增長。確實,我們的玩具網絡爬蟲使用線程也可以工作的很好。但是,對於有著千萬級連接的大規模應用來說,限製依然存在:它會消耗掉所有線程,即使套接字還夠用。那麼我們該如何解決這個問題?

異步

異步 I/O 框架在一個線程中完成並發操作。讓我們看看這是怎麼做到的。

異步框架使用非阻塞套接字。異步爬蟲中,我們在發起到服務器的連接前把套接字設為非阻塞:


  1. sock = socket.socket()
  2. sock.setblocking(False)
  3. try:
  4. sock.connect(('xkcd.com', 80))
  5. except BlockingIOError:
  6. pass

對一個非阻塞套接字調用 connect 方法會立即拋出異常,即使它可以正常工作。這個異常複現了底層 C 語言函數令人厭煩的行為,它把 errno 設置為 EINPROGRESS,告訴你操作已經開始。

現在我們的爬蟲需要一種知道連接何時建立的方法,這樣它才能發送 HTTP 請求。我們可以簡單地使用循環來重試:


  1. request = 'GET {} HTTP/1.0\r\nHost: xkcd.com\r\n\r\n'.format(url)
  2. encoded = request.encode('ascii')
  3. while True:
  4. try:
  5. sock.send(encoded)
  6. break # Done.
  7. except OSError as e:
  8. pass
  9. print('sent')

這種方法不僅消耗 CPU,也不能有效的等待多個套接字。在遠古時代,BSD Unix 的解決方法是 select,這是一個 C 函數,它在一個或一組非阻塞套接字上等待事件發生。現在,互聯網應用大量連接的需求,導致select 被 poll 所代替,在 BSD 上的實現是 kqueue ,在 Linux 上是 epoll。它們的 API 和 select相似,但在大數量的連接中也能有較好的性能。

Python 3.4 的 DefaultSelector 會使用你係統上最好的 select 類函數。要注冊一個網絡 I/O 事件的提醒,我們會創建一個非阻塞套接字,並使用默認 selector 注冊它。


  1. from selectors import DefaultSelector, EVENT_WRITE
  2. selector = DefaultSelector()
  3. sock = socket.socket()
  4. sock.setblocking(False)
  5. try:
  6. sock.connect(('xkcd.com', 80))
  7. except BlockingIOError:
  8. pass
  9. def connected():
  10. selector.unregister(sock.fileno())
  11. print('connected!')
  12. selector.register(sock.fileno(), EVENT_WRITE, connected)

我們不理會這個偽造的錯誤,調用 selector.register,傳遞套接字文件描述符和一個表示我們想要監聽什麼事件的常量表達式。為了當連接建立時收到提醒,我們使用 EVENT_WRITE :它表示什麼時候這個套接字可寫。我們還傳遞了一個 Python 函數 connected,當對應事件發生時被調用。這樣的函數被稱為回調

在一個循環中,selector 接收到 I/O 提醒時我們處理它們。


  1. def loop():
  2. while True:
  3. events = selector.select()
  4. for event_key, event_mask in events:
  5. callback = event_key.data
  6. callback()

connected 回調函數被保存在 event_key.data 中,一旦這個非阻塞套接字建立連接,它就會被取出來執行。

不像我們前麵那個快速輪轉的循環,這裏的 select 調用會暫停,等待下一個 I/O 事件,接著執行等待這些事件的回調函數。沒有完成的操作會保持掛起,直到進到下一個事件循環時執行。

到目前為止我們展現了什麼?我們展示了如何開始一個 I/O 操作和當操作準備好時調用回調函數。異步框架,它在單線程中執行並發操作,其建立在兩個功能之上,非阻塞套接字和事件循環。

我們這裏達成了“並發性concurrency”,但不是傳統意義上的“並行性parallelism”。也就是說,我們構建了一個可以進行重疊 I/O 的微小係統,它可以在其它操作還在進行的時候就開始一個新的操作。它實際上並沒有利用多核來並行執行計算。這個係統是用於解決I/O 密集I/O-bound問題的,而不是解決 CPU 密集CPU-bound問題的。(Python 的全局解釋器鎖禁止在一個進程中以任何方式並行執行 Python 代碼。在 Python 中並行化 CPU 密集的算法需要多個進程,或者以將該代碼移植為 C 語言並行版本。但是這是另外一個話題了。)

所以,我們的事件循環在並發 I/O 上是有效的,因為它並不用為每個連接撥付線程資源。但是在我們開始前,我們需要澄清一個常見的誤解:異步比多線程快。通常並不是這樣的,事實上,在 Python 中,在處理少量非常活躍的連接時,像我們這樣的事件循環是慢於多線程的。在運行時環境中是沒有全局解釋器鎖的,在同樣的負載下線程會執行的更好。異步 I/O 真正適用於事件很少、有許多緩慢或睡眠的連接的應用程序。(Jesse 在“什麼是異步,它如何工作,什麼時候該用它?”一文中指出了異步所適用和不適用的場景。Mike Bayer 在“異步 Python 和數據庫”一文中比較了不同負載情況下異步 I/O 和多線程的不同。)

回調

用我們剛剛建立的異步框架,怎麼才能完成一個網絡爬蟲?即使是一個簡單的網頁下載程序也是很難寫的。

首先,我們有一個尚未獲取的 URL 集合,和一個已經解析過的 URL 集合。


  1. urls_todo = set(['/'])
  2. seen_urls = set(['/'])

seen_urls 集合包括 urls_todo 和已經完成的 URL。用根 URL / 初始化它們。

獲取一個網頁需要一係列的回調。在套接字連接建立時會觸發 connected 回調,它向服務器發送一個 GET 請求。但是它要等待響應,所以我們需要注冊另一個回調函數;當該回調被調用,它仍然不能讀取到完整的請求時,就會再一次注冊回調,如此反複。

讓我們把這些回調放在一個 Fetcher 對象中,它需要一個 URL,一個套接字,還需要一個地方保存返回的字節:


  1. class Fetcher:
  2. def __init__(self, url):
  3. self.response = b'' # Empty array of bytes.
  4. self.url = url
  5. self.sock = None

我們的入口點在 Fetcher.fetch


  1. # Method on Fetcher class.
  2. def fetch(self):
  3. self.sock = socket.socket()
  4. self.sock.setblocking(False)
  5. try:
  6. self.sock.connect(('xkcd.com', 80))
  7. except BlockingIOError:
  8. pass
  9. # Register next callback.
  10. selector.register(self.sock.fileno(),
  11. EVENT_WRITE,
  12. self.connected)

fetch 方法從連接一個套接字開始。但是要注意這個方法在連接建立前就返回了。它必須將控製返回到事件循環中等待連接建立。為了理解為什麼要這樣做,假設我們程序的整體結構如下:


  1. # Begin fetching http://xkcd.com/353/
  2. fetcher = Fetcher('/353/')
  3. fetcher.fetch()
  4. while True:
  5. events = selector.select()
  6. for event_key, event_mask in events:
  7. callback = event_key.data
  8. callback(event_key, event_mask)

當調用 select 函數後,所有的事件提醒才會在事件循環中處理,所以 fetch 必須把控製權交給事件循環,這樣我們的程序才能知道什麼時候連接已建立,接著循環調用 connected 回調,它已經在上麵的 fetch 方法中注冊過。

這裏是我們的 connected 方法的實現:


  1. # Method on Fetcher class.
  2. def connected(self, key, mask):
  3. print('connected!')
  4. selector.unregister(key.fd)
  5. request = 'GET {} HTTP/1.0\r\nHost: xkcd.com\r\n\r\n'.format(self.url)
  6. self.sock.send(request.encode('ascii'))
  7. # Register the next callback.
  8. selector.register(key.fd,
  9. EVENT_READ,
  10. self.read_response)

這個方法發送一個 GET 請求。一個真正的應用會檢查 send 的返回值,以防所有的信息沒能一次發送出去。但是我們的請求很小,應用也不複雜。它隻是簡單的調用 send,然後等待響應。當然,它必須注冊另一個回調並把控製權交給事件循環。接下來也是最後一個回調函數 read_response,它處理服務器的響應:


  1. # Method on Fetcher class.
  2. def read_response(self, key, mask):
  3. global stopped
  4. chunk = self.sock.recv(4096) # 4k chunk size.
  5. if chunk:
  6. self.response += chunk
  7. else:
  8. selector.unregister(key.fd) # Done reading.
  9. links = self.parse_links()
  10. # Python set-logic:
  11. for link in links.difference(seen_urls):
  12. urls_todo.add(link)
  13. Fetcher(link).fetch() # <- New Fetcher.
  14. seen_urls.update(links)
  15. urls_todo.remove(self.url)
  16. if not urls_todo:
  17. stopped = True

這個回調在每次 selector 發現套接字可讀時被調用,可讀有兩種情況:套接字接受到數據或它被關閉。

這個回調函數從套接字讀取 4K 數據。如果不到 4k,那麼有多少讀多少。如果比 4K 多,chunk 中隻包 4K 數據並且這個套接字保持可讀,這樣在事件循環的下一個周期,會再次回到這個回調函數。當響應完成時,服務器關閉這個套接字,chunk 為空。

這裏沒有展示的 parse_links 方法,它返回一個 URL 集合。我們為每個新的 URL 啟動一個 fetcher。注意一個使用異步回調方式編程的好處:我們不需要為共享數據加鎖,比如我們往 seen_urls 增加新鏈接時。這是一種非搶占式的多任務,它不會在我們代碼中的任意一個地方被打斷。

我們增加了一個全局變量 stopped,用它來控製這個循環:


  1. stopped = False
  2. def loop():
  3. while not stopped:
  4. events = selector.select()
  5. for event_key, event_mask in events:
  6. callback = event_key.data
  7. callback()

一旦所有的網頁被下載下來,fetcher 停止這個事件循環,程序退出。

這個例子讓異步編程的一個問題明顯的暴露出來:意大利麵代碼。

我們需要某種方式來表達一係列的計算和 I/O 操作,並且能夠調度多個這樣的係列操作讓它們並發的執行。但是,沒有線程你不能把這一係列操作寫在一個函數中:當函數開始一個 I/O 操作,它明確的把未來所需的狀態保存下來,然後返回。你需要考慮如何寫這個狀態保存的代碼。

讓我們來解釋下這到底是什麼意思。先來看一下在線程中使用通常的阻塞套接字來獲取一個網頁時是多麼簡單。


  1. # Blocking version.
  2. def fetch(url):
  3. sock = socket.socket()
  4. sock.connect(('xkcd.com', 80))
  5. request = 'GET {} HTTP/1.0\r\nHost: xkcd.com\r\n\r\n'.format(url)
  6. sock.send(request.encode('ascii'))
  7. response = b''
  8. chunk = sock.recv(4096)
  9. while chunk:
  10. response += chunk
  11. chunk = sock.recv(4096)
  12. # Page is now downloaded.
  13. links = parse_links(response)
  14. q.add(links)

在一個套接字操作和下一個操作之間這個函數到底記住了什麼狀態?它有一個套接字,一個 URL 和一個可增長的response。運行在線程中的函數使用編程語言的基本功能來在棧中的局部變量保存這些臨時狀態。這樣的函數也有一個“continuation”——它會在 I/O 結束後執行這些代碼。運行時環境通過線程的指令指針來記住這個 continuation。你不必考慮怎麼在 I/O 操作後恢複局部變量和這個 continuation。語言本身的特性幫你解決。

但是用一個基於回調的異步框架時,這些語言特性不能提供一點幫助。當等待 I/O 操作時,一個函數必須明確的保存它的狀態,因為它會在 I/O 操作完成之前返回並清除棧幀。在我們基於回調的例子中,作為局部變量的替代,我們把 sock 和 response 作為 Fetcher 實例 self 的屬性來存儲。而作為指令指針的替代,它通過注冊 connected 和 read_response 回調來保存它的 continuation。隨著應用功能的增長,我們需要手動保存的回調的複雜性也會增加。如此繁複的記賬式工作會讓編碼者感到頭痛。

更糟糕的是,當我們的回調函數拋出異常會發生什麼?假設我們沒有寫好 parse_links 方法,它在解析 HTML 時拋出異常:


  1. Traceback (most recent call last):
  2. File "loop-with-callbacks.py", line 111, in <module>
  3. loop()
  4. File "loop-with-callbacks.py", line 106, in loop
  5. callback(event_key, event_mask)
  6. File "loop-with-callbacks.py", line 51, in read_response
  7. links = self.parse_links()
  8. File "loop-with-callbacks.py", line 67, in parse_links
  9. raise Exception('parse error')
  10. Exception: parse error

這個堆棧回溯隻能顯示出事件循環調用了一個回調。我們不知道是什麼導致了這個錯誤。這條鏈的兩邊都被破壞:不知道從哪來也不知到哪去。這種丟失上下文的現象被稱為“堆棧撕裂stack ripping”,經常會導致無法分析原因。它還會阻止我們為回調鏈設置異常處理,即那種用“try / except”塊封裝函數調用及其調用樹。(對於這個問題的更複雜的解決方案,參見 https://www.tornadoweb.org/en/stable/stack_context.html )

所以,除了關於多線程和異步哪個更高效的長期爭議之外,還有一個關於這兩者之間的爭論:誰更容易跪了。如果在同步上出現失誤,線程更容易出現數據競爭的問題,而回調因為"堆棧撕裂stack ripping"問題而非常難於調試。

原文發布時間為:2017-03-04

本文來自雲棲社區合作夥伴“Linux中國”

最後更新:2017-05-25 17:01:46

  上一篇:go  如何用 R 語言的 Shiny 庫編寫 web 程序
  下一篇:go  一個使用 asyncio 協程的網絡爬蟲(二)