大數據上雲那些事兒:上雲工具之爬蟲(Scrapy)數據
一、 Scrapy簡單介紹
Scrapy是一個用 Python 寫的 Crawler Framework ,簡單輕巧,並且非常方便。
Scrapy 使用 Twisted 這個異步網絡庫來處理網絡通訊,架構清晰,並且包含了各種中間件接口,可以靈活的完成各種需求。整體架構如下圖所示:
綠線是數據流向,首先從初始 URL 開始,Scheduler 會將其交給 Downloader 進行下載,下載之後會交給 Spider 進行分析,Spider 分析出來的結果有兩種:一種是需要進一步抓取的鏈接,例如之前分析的“下一頁”的鏈接,這些東西會被傳回 Scheduler ;另一種是需要保存的數據,它們則被送到 Item Pipeline 那裏,那是對數據進行後期處理(詳細分析、過濾、存儲等)的地方。另外,在數據流動的通道裏還可以安裝各種中間件,進行必要的處理。
二、Scrapy環境安裝
係統環境要求:
Linux
軟件環境要求:
- 已安裝:Python 2.7 ( 下載地址:https://www.python.org/ftp/python/2.7.13/Python-2.7.13.tgz)
- 已安裝:pip (可參考:https://pip.pypa.io/en/stable/installing/ 進行安裝
Scrapy安裝
執行安裝命令:
pip install Scrapy
Scrapy校驗
執行命令:
scrapy
ODPS Python安裝
執行安裝命令:
pip install pyodps
ODPS Python校驗
執行命令:
python -c "from odps import ODPS"
執行結果:無報錯,即為安裝成功
三、 創建Scrapy項目
在你想要創建Scrapy項目的目錄下,執行:
scrapy startproject hr_scrapy_demo
看一下Scrapy創建項目後的目錄結構:
hr_scrapy_demo /
scrapy.cfg # 全局配置文件
hr_scrapy_demo / # 項目下的Python模塊,你可以從這裏引用該Python模塊
__init__.py
items.py # 自定義的Items
pipelines.py # 自定義的Pipelines
settings.py # 自定義的項目級配置信息
spiders/ # 自定義的spiders
__init__.py
四、 創建OdpsPipelines
在hr_scrapy_demo/pipelines.py中,我們可以自定義我們的數據處理pipelines,以下是我之前編寫好的一個OdpsPipeline,該Pipeline可以用於將我們采集到的item保存到ODPS中,但也有幾點需要說明:
1. ODPS中的表必須已經提前創建好。
2. Spider中采集到的item必須包含該表的所有字段,且名字必須一致,否則會拋出異常。
3. 支持分區表和無分區表。
將下麵代碼替換掉你項目中的pipelines.py
# -*- coding: utf-8 -*-
# Define your item pipelines here
#
# Don't forget to add your pipeline to the ITEM_PIPELINES setting
# See: https://doc.scrapy.org/en/latest/topics/item-pipeline.html
from odps import ODPS
import logging
logger = logging.getLogger('OdpsPipeline')
class OdpsPipeline(object):
collection_name = 'odps'
records = []
def __init__(self, odps_endpoint, odps_project,accessid,accesskey,odps_table,odps_partition=None,buffer=1000):
self.odps_endpoint = odps_endpoint
self.odps_project = odps_project
self.accessid = accessid
self.accesskey = accesskey
self.odps_table = odps_table
self.odps_partition = odps_partition
self.buffer = buffer
@classmethod
def from_crawler(cls, crawler):
return cls(
odps_endpoint=crawler.settings.get('ODPS_ENDPOINT'),
odps_project=crawler.settings.get('ODPS_PROJECT'),
accessid=crawler.settings.get('ODPS_ACCESSID'),
accesskey=crawler.settings.get('ODPS_ACCESSKEY'),
odps_table=crawler.settings.get('ODPS_TABLE'),
odps_partition=crawler.settings.get('ODPS_PARTITION'),
buffer=crawler.settings.get('WRITE_BUFFER')
)
def open_spider(self, spider):
self.odps = ODPS(self.accessid,self.accesskey,project=self.odps_project,endpoint=self.odps_endpoint)
self.table = self.odps.get_table(self.odps_table)
if(self.odps_partition is not None and self.odps_partition != ""):
self.table.create_partition(self.odps_partition,if_not_exists=True)
def close_spider(self, spider):
self.write_to_odps()
'''
將數據寫入odps
'''
def write_to_odps(self):
if(len(self.records) is None or len(self.records) == 0):
return
if(self.odps_partition is None or self.odps_partition == ""):
with self.table.open_writer() as writer:
writer.write(self.records)
logger.info("write to odps {0} records. ".format(len(self.records)))
self.records = []
else:
with self.table.open_writer(partition=self.odps_partition) as writer:
writer.write(self.records)
logger.info("write to odps {0} records. ".format(len(self.records)))
self.records = []
def isPartition(self,name):
for pt in self.table.schema.partitions:
if(pt.name == name):
return True
return False
def process_item(self, item, spider):
cols = []
for col in self.table.schema.columns:
if(self.isPartition(col.name)):
continue
c = None
for key in item.keys():
if(col.name == key):
c = item[key]
break
if(c is None):
raise Exception("{0} column not found in item.".format(col.name))
cols.append(c)
self.records.append(self.table.new_record(cols))
#logger.info("records={0} : buffer={1}".format(len(self.records),self.buffer))
if( len(self.records) >= int(self.buffer)):
self.write_to_odps()
return item
注冊Pipeline 到hr_scrapy_demo/setting.py,修改ITEM_PIPELINES的值為:
# Configure item pipelines
# See https://scrapy.readthedocs.org/en/latest/topics/item-pipeline.html
ITEM_PIPELINES = {
'hr_scrapy_demo.pipelines.OdpsPipeline': 300,
}
#300代表Pipeline的優先級,可以同時存在多個pipeline,依據該數值從小到大依次執行pipeline
五、 配置ODPS 基本信息
hr_scrapy_demo/setting.py中,添加參數如下:
ODPS_PROJECT = 'your odps project name'
ODPS_ACCESSID = 'accessid'
ODPS_ACCESSKEY = 'accesskey'
ODPS_ENDPOINT = 'https://service.odps.aliyun.com/api'
#注:如果爬蟲運行在ECS上,可將ODPS_ENDPOINT修改為內網地址:
#ODPS_ENDPOINT = 'https:// odps-ext.aliyun-inc.com/api'
六、創建自己的Spiders
Spider主要用於采集網站數據,並解析網站數據轉換為相應的items,再交由Pipelines進行處理。針對每個需要采集的網站,我們都需要單獨創建對應的Spider。
以下是一個Spider示例,以采集南方新聞網的要聞信息為依據。
# -*- coding:utf-8 -*-
import scrapy
import logging
logger = logging.getLogger('NanfangSpider')
class NanfangSpider(scrapy.Spider):
name = "nanfang"
'''
設置你要采集的其實網址,可以是多個.
此處以南方新聞網-要聞-首頁為例.
'''
start_urls = [
'https://www.southcn.com/pc2016/yw/node_346416.htm'
]
'''
[ODPS配置信息]
ODPS_TABLE:ODPS表名
ODPS_PARTITION:ODPS表的分區值(可選)
WRITE_BUFFER:寫入緩存(默認1000條)
'''
custom_settings = {
'ODPS_TABLE':'hr_scrapy_nanfang_news',
#'ODPS_PARTITION':'pt=20170209',
'WRITE_BUFFER':'1000'
}
'''
ODPS Demo DDL:
drop table if exists hr_scrapy_nanfang_news;
create table hr_scrapy_nanfang_news
(
title string,
source string,
times string,
url string,
editor string,
content string
);
'''
'''
對start_urls的url的解析方法,返回結果為item.
關於具體解析API可參考:https://doc.scrapy.org/en/latest/intro/tutorial.html
'''
def parse(self, response):
#查找網頁中DIV元素,且其class=j-link,並對其進行遍曆
for quote in response.css("div.j-link"):
#查找該DIV中的所有<a>超鏈接,並獲取其href
href = quote.css("a::attr('href')").extract_first()
#進入該href鏈接,此處跳轉到方法:parse_details,對其返回HTML進行再次處理。
yield scrapy.Request(response.urljoin(href),callback=self.parse_details)
#查找下一頁的連接,此處用xpath方式獲取,因css語法簡單,無法獲取
nexthref = response.xpath(u'//div[@]//center/a[last()][text()="\u4e0b\u4e00\u9875"]/@href').extract_first()
#如找到下一頁,則跳轉到下一頁,並繼續由parse對返回HTML進行處理。
if(nexthref is not None):
yield scrapy.Request(response.urljoin(nexthref),callback=self.parse)
'''
新聞詳情頁處理方法
'''
def parse_details(self, response):
#找到正文
main_div = response.css("div.main")
#因新聞詳情也可能有分頁,獲取下一頁的鏈接
next_href = main_div.xpath(u'//div[@]/center/a[last()][text()="\u4e0b\u4e00\u9875"]/@href').extract_first()
#獲取正文內容,僅取DIV內所有<p>元素下的文本。
content = main_div.xpath('//div[@]//p//text()').extract()
content = "\n".join(content)
if(next_href is None):
#最後一頁,則獲取所有內容,返回item
title = main_div.css('div.m-article h2::text').extract_first()
source = main_div.css('div.meta span[]::text').extract_first()
times = main_div.css('div.meta span[]::text').extract_first()
url = response.url
editor = main_div.css('div.m-editor::text').extract_first()
item = {}
if('item' in response.meta):
item = response.meta['item']
item['title'] = title
item['source'] = source
item['times'] = times
item['url'] = url
item['editor'] = editor
if('content' in item):
item['content'] += '\n'+content
else:
item['content'] = content
yield item
else:
#非最後一頁 ,則取出當前頁content,並拚接,然後跳轉到下一頁
request = scrapy.Request(response.urljoin(next_href),
callback=self.parse_details)
item = {}
if('item' in response.meta and 'content' in response.meta['item']):
item = response.meta['item']
item['content'] += '\n'+content
else:
item['content'] = content
request.meta['item'] = item
yield request
七、 運行Scrapy
切換到你的工程目錄下,執行以下命令:
Scrapy crawl nanfang –loglevel INFO
執行結果如下圖所示:
八、 驗證爬取結果
本文演示僅為一個簡單的案例,實際生產還需考慮多線程處理,網站校驗,分布式爬取等。
最後更新:2017-06-28 15:31:51