閱讀786 返回首頁    go 阿裏雲 go 技術社區[雲棲]


大數據上雲那些事兒:上雲工具之爬蟲(Scrapy)數據

一、 Scrapy簡單介紹

Scrapy是一個用 Python 寫的 Crawler Framework ,簡單輕巧,並且非常方便。
Scrapy 使用 Twisted 這個異步網絡庫來處理網絡通訊,架構清晰,並且包含了各種中間件接口,可以靈活的完成各種需求。整體架構如下圖所示:
scrapy

綠線是數據流向,首先從初始 URL 開始,Scheduler 會將其交給 Downloader 進行下載,下載之後會交給 Spider 進行分析,Spider 分析出來的結果有兩種:一種是需要進一步抓取的鏈接,例如之前分析的“下一頁”的鏈接,這些東西會被傳回 Scheduler ;另一種是需要保存的數據,它們則被送到 Item Pipeline 那裏,那是對數據進行後期處理(詳細分析、過濾、存儲等)的地方。另外,在數據流動的通道裏還可以安裝各種中間件,進行必要的處理。

二、Scrapy環境安裝

係統環境要求:

Linux

軟件環境要求:

  1. 已安裝:Python 2.7 ( 下載地址:https://www.python.org/ftp/python/2.7.13/Python-2.7.13.tgz)
  2. 已安裝:pip (可參考:https://pip.pypa.io/en/stable/installing/ 進行安裝

Scrapy安裝

執行安裝命令:

pip install Scrapy

Scrapy校驗

執行命令:

scrapy

執行結果:
scrapy2

ODPS Python安裝

執行安裝命令:

pip install pyodps

ODPS Python校驗

執行命令:

python -c "from odps import ODPS"

執行結果:無報錯,即為安裝成功

三、 創建Scrapy項目

在你想要創建Scrapy項目的目錄下,執行:

scrapy startproject hr_scrapy_demo

看一下Scrapy創建項目後的目錄結構:

hr_scrapy_demo /
    scrapy.cfg              # 全局配置文件
    hr_scrapy_demo /                # 項目下的Python模塊,你可以從這裏引用該Python模塊
        __init__.py
        items.py            # 自定義的Items
        pipelines.py        # 自定義的Pipelines
        settings.py         # 自定義的項目級配置信息
        spiders/            # 自定義的spiders
            __init__.py

四、 創建OdpsPipelines

在hr_scrapy_demo/pipelines.py中,我們可以自定義我們的數據處理pipelines,以下是我之前編寫好的一個OdpsPipeline,該Pipeline可以用於將我們采集到的item保存到ODPS中,但也有幾點需要說明:
1. ODPS中的表必須已經提前創建好。
2. Spider中采集到的item必須包含該表的所有字段,且名字必須一致,否則會拋出異常。
3. 支持分區表和無分區表。

將下麵代碼替換掉你項目中的pipelines.py


# -*- coding: utf-8 -*-

# Define your item pipelines here
#
# Don't forget to add your pipeline to the ITEM_PIPELINES setting
# See: https://doc.scrapy.org/en/latest/topics/item-pipeline.html

from odps import ODPS
import logging

logger = logging.getLogger('OdpsPipeline')
class OdpsPipeline(object):
    collection_name = 'odps'
    records = []

    def __init__(self, odps_endpoint, odps_project,accessid,accesskey,odps_table,odps_partition=None,buffer=1000):
        self.odps_endpoint = odps_endpoint
        self.odps_project = odps_project
        self.accessid = accessid
        self.accesskey = accesskey
        self.odps_table = odps_table
        self.odps_partition = odps_partition
        self.buffer = buffer

    @classmethod
    def from_crawler(cls, crawler):
        return cls(
            odps_endpoint=crawler.settings.get('ODPS_ENDPOINT'),
            odps_project=crawler.settings.get('ODPS_PROJECT'),
            accessid=crawler.settings.get('ODPS_ACCESSID'),
            accesskey=crawler.settings.get('ODPS_ACCESSKEY'),
            odps_table=crawler.settings.get('ODPS_TABLE'),
            odps_partition=crawler.settings.get('ODPS_PARTITION'),
            buffer=crawler.settings.get('WRITE_BUFFER')
        )

    def open_spider(self, spider):
        self.odps = ODPS(self.accessid,self.accesskey,project=self.odps_project,endpoint=self.odps_endpoint)
        self.table = self.odps.get_table(self.odps_table)
        if(self.odps_partition is not None and self.odps_partition != ""):
            self.table.create_partition(self.odps_partition,if_not_exists=True)

    def close_spider(self, spider):
        self.write_to_odps()

    '''
        將數據寫入odps
    '''
    def write_to_odps(self):
        if(len(self.records) is None or len(self.records) == 0):
            return
        if(self.odps_partition is None or self.odps_partition == ""):
            with self.table.open_writer() as writer:
                writer.write(self.records)
                logger.info("write to odps {0} records. ".format(len(self.records)))
                self.records = []
        else:
            with self.table.open_writer(partition=self.odps_partition) as writer:
                writer.write(self.records)
                logger.info("write to odps {0} records. ".format(len(self.records)))
                self.records = []

    def isPartition(self,name):
        for pt in self.table.schema.partitions:
            if(pt.name == name):
                return True
        return False

    def process_item(self, item, spider):
        cols = []
        for col in self.table.schema.columns:
            if(self.isPartition(col.name)):
                continue
            c = None
            for key in item.keys():
                if(col.name == key):
                    c = item[key]
                    break
            if(c is None):
                raise Exception("{0} column not found in item.".format(col.name))
            cols.append(c)
        self.records.append(self.table.new_record(cols))
        #logger.info("records={0} : buffer={1}".format(len(self.records),self.buffer))
        if( len(self.records) >= int(self.buffer)):
            self.write_to_odps()
        return item

注冊Pipeline 到hr_scrapy_demo/setting.py,修改ITEM_PIPELINES的值為:

# Configure item pipelines
# See https://scrapy.readthedocs.org/en/latest/topics/item-pipeline.html
ITEM_PIPELINES = {
    'hr_scrapy_demo.pipelines.OdpsPipeline': 300,
}
#300代表Pipeline的優先級,可以同時存在多個pipeline,依據該數值從小到大依次執行pipeline

五、 配置ODPS 基本信息

hr_scrapy_demo/setting.py中,添加參數如下:

ODPS_PROJECT = 'your odps project name'
ODPS_ACCESSID = 'accessid'
ODPS_ACCESSKEY = 'accesskey'
ODPS_ENDPOINT = 'https://service.odps.aliyun.com/api'
#注:如果爬蟲運行在ECS上,可將ODPS_ENDPOINT修改為內網地址:
#ODPS_ENDPOINT = 'https:// odps-ext.aliyun-inc.com/api'

六、創建自己的Spiders

Spider主要用於采集網站數據,並解析網站數據轉換為相應的items,再交由Pipelines進行處理。針對每個需要采集的網站,我們都需要單獨創建對應的Spider。
以下是一個Spider示例,以采集南方新聞網的要聞信息為依據。


# -*- coding:utf-8 -*-  
import scrapy
import logging

logger = logging.getLogger('NanfangSpider')

class NanfangSpider(scrapy.Spider):
    name = "nanfang"

    '''
        設置你要采集的其實網址,可以是多個.
        此處以南方新聞網-要聞-首頁為例.
    '''
    start_urls = [
            'https://www.southcn.com/pc2016/yw/node_346416.htm'
            ]

    '''
        [ODPS配置信息]
        ODPS_TABLE:ODPS表名
        ODPS_PARTITION:ODPS表的分區值(可選)
        WRITE_BUFFER:寫入緩存(默認1000條)
    '''
    custom_settings = {
        'ODPS_TABLE':'hr_scrapy_nanfang_news',
        #'ODPS_PARTITION':'pt=20170209',
        'WRITE_BUFFER':'1000'
    }

    '''
        ODPS Demo DDL:
        drop table if exists hr_scrapy_nanfang_news;
        create table hr_scrapy_nanfang_news
        (
            title string,
            source string,
            times string,
            url string,
            editor string,
            content string
        );
    '''

    '''
        對start_urls的url的解析方法,返回結果為item.
        關於具體解析API可參考:https://doc.scrapy.org/en/latest/intro/tutorial.html
    '''
    def parse(self, response):

        #查找網頁中DIV元素,且其class=j-link,並對其進行遍曆
        for quote in response.css("div.j-link"):
            #查找該DIV中的所有<a>超鏈接,並獲取其href
            href = quote.css("a::attr('href')").extract_first()

            #進入該href鏈接,此處跳轉到方法:parse_details,對其返回HTML進行再次處理。
            yield scrapy.Request(response.urljoin(href),callback=self.parse_details)

        #查找下一頁的連接,此處用xpath方式獲取,因css語法簡單,無法獲取
        nexthref = response.xpath(u'//div[@]//center/a[last()][text()="\u4e0b\u4e00\u9875"]/@href').extract_first()

        #如找到下一頁,則跳轉到下一頁,並繼續由parse對返回HTML進行處理。
        if(nexthref is not None):
            yield scrapy.Request(response.urljoin(nexthref),callback=self.parse)

    '''
        新聞詳情頁處理方法
    '''
    def parse_details(self, response):
        #找到正文
        main_div = response.css("div.main")

        #因新聞詳情也可能有分頁,獲取下一頁的鏈接
        next_href = main_div.xpath(u'//div[@]/center/a[last()][text()="\u4e0b\u4e00\u9875"]/@href').extract_first()

        #獲取正文內容,僅取DIV內所有<p>元素下的文本。
        content = main_div.xpath('//div[@]//p//text()').extract()
        content = "\n".join(content)

        if(next_href is None):
            #最後一頁,則獲取所有內容,返回item
            title = main_div.css('div.m-article h2::text').extract_first()
            source = main_div.css('div.meta span[]::text').extract_first()
            times = main_div.css('div.meta span[]::text').extract_first()
            url = response.url
            editor = main_div.css('div.m-editor::text').extract_first()
            item = {}
            if('item' in response.meta):
                item = response.meta['item']
            item['title'] = title
            item['source'] = source
            item['times'] = times
            item['url'] = url
            item['editor'] = editor
            if('content' in item):
                item['content'] += '\n'+content
            else:
                item['content'] = content
            yield item

        else:
            #非最後一頁 ,則取出當前頁content,並拚接,然後跳轉到下一頁
            request = scrapy.Request(response.urljoin(next_href),
                                 callback=self.parse_details)
            item = {}
            if('item' in response.meta and 'content' in response.meta['item']):
                item = response.meta['item']
                item['content'] += '\n'+content
            else:
                item['content'] = content
            request.meta['item'] = item
            yield request

七、 運行Scrapy

切換到你的工程目錄下,執行以下命令:
Scrapy crawl nanfang –loglevel INFO
執行結果如下圖所示:
image

八、 驗證爬取結果

待數據采集完成之後,登陸DATA IDE查看采集內容:
image

本文演示僅為一個簡單的案例,實際生產還需考慮多線程處理,網站校驗,分布式爬取等。

最後更新:2017-06-28 15:31:51

  上一篇:go  Recorder of Join
  下一篇:go  企業建設H5響應式網站的5大好處