閱讀639 返回首頁    go 阿裏雲 go 技術社區[雲棲]


強者聯盟——Python語言結合Spark框架

引言:Spark由AMPLab實驗室開發,其本質是基於內存的快速迭代框架,“迭代”是機器學習最大的特點,因此非常適合做機器學習。得益於在數據科學中強大的表現,Python語言的粉絲遍布天下,如今又遇上強大的分布式內存計算框架Spark,兩個領域的強者走到一起,自然能碰出更加強大的火花(Spark可以翻譯為火花),因此本文主要講述了PySpark。
本文選自《全棧數據之門》。

全棧框架

  Spark由AMPLab實驗室開發,其本質是基於內存的快速迭代框架,“迭代”是機器學習最大的特點,因此非常適合做機器學習。
  框架由Scala語言開發,原生提供4種API,Scala、Java、Python以及最近版本開始支持的R。Python不是Spark的“親兒子”,在支持上要略差一些,但基本上常用的接口都支持。得益於在數據科學中強大的表現,Python語言的粉絲遍布天下,如今又遇上強大的分布式內存計算框架Spark,兩個領域的強者走到一起,自然能碰出更加強大的火花(Spark可以翻譯為火花),因此PySpark是本節的主角。
  在Hadoop發行版中,CDH5和HDP2都已經集成了Spark,隻是集成的版本比官方的版本要略低一些。當前最新的HDP2.4已經集成了1.6.1(官方最新為2.0),可以看出,Hortonworks的更新速度非常快,緊跟上遊的步伐。
  除Hadoop的Map-Reduce計算框架之外,Spark能異軍突起,而且慢慢地建立自己的全棧生態,那還真得了解下Spark到底提供了哪些全棧的技術。Spark目前主要提供了以下6大功能。

  1. Spark Core: RDD及其算子。
  2. Spark-SQL: DataFrame與SQL。
  3. Spark ML(MLlib): 機器學習框架。
  4. Spark Streaming: 實時計算框架。
  5. Spark GraphX: 圖計算框架。
  6. PySpark(SparkR): Spark之上的Python與R框架。

從RDD的離線計算到Streaming的實時計算;從DataFrame及SQL的支持,到MLlib機器學習框架;從GraphX的圖計算到對統計學家最愛的R的支持,可以看出Spark在構建自己的全棧數據生態。從當前學術界與工業界的反饋來看,Spark也已經做到了。

環境搭建

  是騾子是馬,拉出來遛一遛就知道了。要嚐試使用Spark是非常簡單的事情,一台機器就可以做測試和開發了。
  訪問網站https://spark.apache.org/downloads.html,下載預編譯好的版本,解壓即可以使用。選擇最新的穩定版本,注意選擇“Pre-built”開頭的版本,比如當前最新版本是1.6.1,通常下載spark-1.6.1-bin-hadoop2.6.tgz文件,文件名中帶“-bin-”即是預編譯好的版本,不需要另外安裝Scala環境,也不需要編譯,直接解壓到某個目錄即可。
  假設解壓到目錄/opt/spark,那麼在$HOME目錄的.bashrc文件中添加一個PATH:

【圖1】
  記得source一下.bashrc文件,讓環境變量生效:
【圖2】
  接著執行命令pyspark或者spark-shell,如果看到了Spark那帥帥的文本Logo和相應的命令行提示符>>>,則說明成功進入交互式界麵,即配置成功。
  pyspark與spark-shell都能支持交互式測試,此時便可以進行測試了。相比於Hadoop來說,基本上是零配置即可以開始測試。
  spark-shell測試:
【圖3】
  pyspark測試:
【圖4】

分布式部署

  上麵的環境測試成功,證明Spark的開發與測試環境已經配置好了。但是說好的分布式呢?我把別人的庫都拖下來了,就是想嚐試Spark的分布式環境,你就給我看這個啊?
  上麵說的是單機的環境部署,可用於開發與測試,隻是Spark支持的部署方式的其中一種。這種是local方式,好處是用一台筆記本電腦就可以運行程序並在上麵進行開發。雖然是單機,但有一個非常有用的特性,那就是可以實現多進程,比如8核的機器,隻需要運行代碼的時候指定--master local[*],就可以用8個進程的方式運行程序。*代表使用全部CPU核心,也可以使用如local[4],意為隻使用4個核心。
  單機的local模式寫的代碼,隻需要做少量的修改即可運行在分布式環境中。Spark的分布式部署支持好幾種方式,如下所示。

  Standalone:本身自帶的集群(方便測試和Spark本身框架的推廣)。
  Mesos:一個新的資源管理框架。
  YARN:Hadoop上新生的資源與計算管理框架,可以理解為Hadoop的操作係統,
  可以支持各種不同的計算框架。
  EC2:亞馬遜的機器環境的部署。
  從難易程度上來說,Standalone分布式最簡單,直接把解壓好的包複製到各台機器上去,配置好master文件和slave文件,指示哪台機器做master,哪些機器做salve。然後在master機器上,通過自帶的腳本啟動集群即可。
  從使用率上來說,應該是YARN被使用得最多,因為通常是直接使用發行版本中的Spark集成套件,CDH和HDP中都已經把Spark和YARN集成了,不用特別關注。
  分布式的優勢在於多CPU與更大的內存,從CPU的角度再來看Spark的三種方式。

  • 本機單CPU:“local”,數據文件在本機。
  • 本機多CPU:“local[4]”,數據文件在本機。
  • Standalone集群多CPU:“spark://master-ip:7077”,需要每台機器都能訪問數據文件。      YARN集群多CPU:使用“yarn-client”提交,需要每台機器都能訪問到數據文件。   交互式環境的部署也與上麵的部署有關係,直接使用spark-shell或者pyspark是local的方式啟動,如果需要啟動單機多核或者集群模式,需要指定--master參數,如下所示。

【圖5】
  如果使用pyspark,並且習慣了IPython的交互式風格,還可以加上環境變量來啟動IPython的交互式,或者使用IPython提供的Notebook:

【圖6】
  IPython風格如下所示:

【圖7】

示例分析

  環境部署是新手最頭痛的問題,前麵環境已經部署好了,接下來才是正題。因為Scala較Python複雜得多,因此先學習使用PySpark來寫程序。
  Spark有兩個最基礎的概念,sc與RDD。sc是SparkContext的縮寫,顧名思義,就是Spark上下文語境,sc連接到集群並做相應的參數配置,後麵所有的操作都在這個上下文語境中進行,是一切Spark的基礎。在啟動交互式界麵的時候,注意有一句提示:

  

SparkContext available as sc, HiveContext available as sqlContext.

  
  意思是,sc這個變量代表了SparkContext上下文,可以直接使用,在啟動交互式的時候,已經初始化好了。
如果是非交互式環境,需要在自己的代碼中進行初始化:

【圖8】
  RDD是Resilient Distributed Datasets(彈性分布式數據集)的縮寫,是Spark中最主要的數據處理對象。生成RDD的方式有很多種,其中最主要的一種是通過讀取文件來生成:

【圖9】
  讀取joy.txt文件後,就是一個RDD,此時的RDD的內容就是一個字符串,包含了文件的全部內容。
  還記得前麵使用Python來編寫的WordCount代碼嗎?通過Hadoop的Streaming接口提到Map-Reduce計算框架上執行,那段代碼可不太好理解,現在簡單的版本來了。
  WordCount例子的代碼如下所示:
【圖10】
  在上麵的代碼中,我個人喜歡用括號的閉合來進行分行,而不是在行尾加上續行符。
  PySpark中大量使用了匿名函數lambda,因為通常都是非常簡單的處理。核心代碼解讀如下。

  1. flatMap:對lines數據中的每行先選擇map(映射)操作,即以空格分割成一係列單詞形成一個列表。然後執行flat(展開)操作,將多行的列表展開,形成一個大列表。此時的數據結構為:['one','two','three',...]。
  2. map:對列表中的每個元素生成一個key-value對,其中value為1。此時的數據結構為:[('one', 1), ('two',1), ('three',1),...],其中的'one'、'two'、'three'這樣的key,可能會出現重複。
  3. reduceByKey:將上麵列表中的元素按key相同的值進行累加,其數據結構為:[('one', 3), ('two', 8), ('three', 1), ...],其中'one', 'two','three'這樣的key不會出現重複。

最後使用了wc.collect()函數,它告訴Spark需要取出所有wc中的數據,將取出的結果當成一個包含元組的列表來解析。
相比於用Python手動實現的版本,Spark實現的方式不僅簡單,而且很優雅。

兩類算子

  Spark的基礎上下文語境為sc,基礎的數據集為RDD,剩下的就是對RDD所做的操作了。
  對RDD所做的操作有transform與action,也稱為RDD的兩個基本算子。
  transform是轉換、變形的意思,即將RDD通過某種形式進行轉換,得到另外一個RDD,比如對列表中的數據使用map轉換,變成另外一個列表。
  當然,Spark能在Hadoop的Map-Reduce模型中脫穎而出的一個重要因素就是其強大的算子。Spark並沒有強製將其限定為Map和Reduce模型,而是提供了更加強大的變換能力,使得其代碼簡潔而優雅。
  下麵列出了一些常用的transform。

  • map(): 映射,類似於Python的map函數。
  • filter(): 過濾,類似於Python的filter函數。
  • reduceByKey(): 按key進行合並。
  • groupByKey(): 按key進行聚合。

RDD一個非常重要的特性是惰性(Lazy)原則。在一個RDD上執行一個transform後,並不立即運行,而是遇到action的時候,才去一層層構建運行的DAG圖,DAG圖也是Spark之所以快的原因。

  • first(): 返回RDD裏麵的第一個值。
  • take(n): 從RDD裏麵取出前n個值。
  • collect(): 返回全部的RDD元素。
  • sum(): 求和。
  • count(): 求個數。

回到前麵的WordCount例子,程序隻有在遇到wc.collect()這個需要取全部數據的action時才執行前麵RDD的各種transform,通過構建執行依賴的DAG圖,也保證了運行效率。

map與reduce

  初始的數據為一個列表,列表裏麵的每一個元素為一個元組,元組包含三個元素,分別代表id、name、age字段。RDD正是對這樣的基礎且又複雜的數據結構進行處理,因此可以使用pprint來打印結果,方便更好地理解數據結構,其代碼如下:

【圖11】
  parallelize這個算子將一個Python的數據結構序列化成一個RDD,其接受一個列表參數,還支持在序列化的時候將數據分成幾個分區(partition)。分區是Spark運行時的最小粒度結構,多個分區會在集群中進行分布式並行計算。
  使用Python的type方法打印數據類型,可知base為一個RDD。在此RDD之上,使用了一個map算子,將age增加3歲,其他值保持不變。map是一個高階函數,其接受一個函數作為參數,將函數應用於每一個元素之上,返回應用函數用後的新元素。此處使用了匿名函數lambda,其本身接受一個參數v,將age字段v[2]增加3,其他字段原樣返回。從結果來看,返回一個PipelineRDD,其繼承自RDD,可以簡單理解成是一個新的RDD結構。
  要打印RDD的結構,必須用一個action算子來觸發一個作業,此處使用了collect來獲取其全部的數據。
  接下來的操作,先使用map取出數據中的age字段v[2],接著使用一個reduce算子來計算所有的年齡之和。reduce的參數依然為一個函數,此函數必須接受兩個參數,分別去迭代RDD中的元素,從而聚合出結果。效果與Python中的reduce相同,最後隻返回一個元素,此處使用x+y計算其age之和,因此返回為一個數值,執行結果如下圖所示。
圖片描述

AMPLab的野心

  AMPLab除了最著名的Spark外,他們還希望基於內存構建一套完整的數據分析生態係統,可以參考https://amplab.cs.berkeley.edu/software/上的介紹。
  他們的目的就是BDAS(Berkeley Data Analytics Stack),基於內存的全棧大數據分析。前麵介紹過的Mesos是集群資源管理器。還有Tachyon,是基於內存的分布式文件係統,類似於Hadoop的HDFS文件係統,而Spark Streaming則類似於Storm實時計算。
  強大的全棧式Spark,撐起了大數據的半壁江山。

  本文選自《全棧數據之門》,點此鏈接可在博文視點官網查看此書。
                    圖片描述
  想及時獲得更多精彩文章,可在微信中搜索“博文視點”或者掃描下方二維碼並關注。
                       圖片描述

最後更新:2017-04-07 21:23:50

  上一篇:go Cloud computing gets foggy
  下一篇:go 四方聯合啟動醫保移動支付試點 激活移動醫療產業鏈