閱讀258 返回首頁    go 技術社區[雲棲]


《Spark 官方文檔》Spark快速入門

快速入門

本教程是對Spark的一個快速簡介。首先,我們通過Spark的交互式shell介紹一下API(主要是Python或Scala),然後展示一下如何用Java、Scala、Python寫一個Spark應用。更完整參考看這裏:programming guide

首先,請到Spark website下載一個Spark發布版本,以便後續方便學習。我們暫時還不會用到HDFS,所以你可以使用任何版本的Hadoop。

 

使用Spark shell交互式分析

基礎

利用Spark shell 很容易學習Spark API,同時也Spark shell也是強大的交互式數據分析工具。Spark shell既支持Scala(Scala版本的shell在Java虛擬機中運行,所以在這個shell中可以引用現有的Java庫),也支持Python。在Spark目錄下運行下麵的命令可以啟動一個Spark shell:

./bin/spark-shell

Spark最主要的抽象概念是個分布式集合,也叫作彈性分布式數據集(Resilient Distributed Dataset – RDD)。RDD可以由Hadoop InputFormats讀取HDFS文件創建得來,或者從其他RDD轉換得到。下麵我們就先利用Spark源代碼目錄下的README文件來新建一個RDD:

scala> val textFile = sc.textFile("README.md")
textFile: spark.RDD[String] = spark.MappedRDD@2ee9b6e3

RDD有兩種算子,action算子(actions)返回結果,transformation算子(transformations)返回一個新RDD。我們先來看一下action算子:

scala> textFile.count() // Number of items in this RDD
res0: Long = 126

scala> textFile.first() // First item in this RDD
res1: String = # Apache Spark

再來看下如何使用transformation算子。我們利用filter這個transformation算子返回一個隻包含原始文件子集的新RDD。

scala> val linesWithSpark = textFile.filter(line => line.contains("Spark"))
linesWithSpark: spark.RDD[String] = spark.FilteredRDD@7dd4af09

把這兩個例子串起來,我們可以這樣寫:

scala> textFile.filter(line => line.contains("Spark")).count() // How many lines contain "Spark"?
res3: Long = 15

更多RDD算子

RDD action 和 transformation 算子可以做更加複雜的計算。下麵的代碼中,我們將找出文件中包含單詞數最多的行有多少個單詞:

scala> textFile.map(line => line.split(" ").size).reduce((a, b) => if (a > b) a else b)
res4: Long = 15

首先,用一個map算子將每一行映射為一個整數,返回一個新RDD。然後,用reduce算子找出這個RDD中最大的單詞數。map和reduce算組的參數都是scala 函數體(閉包),且函數體內可以使用任意的語言特性,或引用scala/java庫。例如,我們可以調用其他函數。為了好理解,下麵我們用Math.max作為例子:

scala> import java.lang.Math
import java.lang.Math

scala> textFile.map(line => line.split(" ").size).reduce((a, b) => Math.max(a, b))
res5: Int = 15

Hadoop上的MapReduce是大家耳熟能詳的一種通用數據流模式。而Spark能夠輕鬆地實現MapReduce流程:

scala> val wordCounts = textFile.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey((a, b) => a + b)
wordCounts: spark.RDD[(String, Int)] = spark.ShuffledAggregatedRDD@71f027b8

這個例子裏,我使用了flatMapmap, and reduceByKey 這幾個transformation算子,把每個單詞及其在文件中出現的次數轉成一個包含(String,int)鍵值對的RDD,計算出每個單詞在文件中出現的次數

scala> wordCounts.collect()
res6: Array[(String, Int)] = Array((means,1), (under,2), (this,3), (Because,1), (Python,2), (agree,1), (cluster.,1), ...)

緩存

Spark同樣支持把數據集拉到集群範圍的內存緩存中。這對於需要重複訪問的數據非常有用,比如:查詢一些小而”熱“(頻繁訪問)的數據集 或者 運行一些迭代算法(如 PageRank)。作為一個簡單的示例,我們把 linesWithSpark 這個數據集緩存一下:

scala> linesWithSpark.cache()
res7: spark.RDD[String] = spark.FilteredRDD@17e51082

scala> linesWithSpark.count()
res8: Long = 19

scala> linesWithSpark.count()
res9: Long = 19

用Spark來緩存一個100行左右的文件,看起來確實有點傻。但有趣的是,同樣的代碼可以用於緩存非常大的數據集,即使這些數據集可能分布在數十或數百個節點,也是一樣。你可以用 bin/spark-shell 連到一個集群上來驗證一下,更詳細的請參考:programming guide.

獨立的應用程序

假設我們想寫一個獨立的Spark應用程序。我們將快速的過一下一個簡單的應用程序,分別用Scala(sbt編譯),Java(maven編譯)和Python。

首先用Scala新建一個簡單的Spark應用 – 簡單到連名字都叫SimpleApp.scala

/* SimpleApp.scala */
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf

object SimpleApp {
  def main(args: Array[String]) {
    val logFile = "YOUR_SPARK_HOME/README.md" // Should be some file on your system
    val conf = new SparkConf().setAppName("Simple Application")
    val sc = new SparkContext(conf)
    val logData = sc.textFile(logFile, 2).cache()
    val numAs = logData.filter(line => line.contains("a")).count()
    val numBs = logData.filter(line => line.contains("b")).count()
    println("Lines with a: %s, Lines with b: %s".format(numAs, numBs))
  }
}

注意,應用程序需要定義一個main方法,而不是繼承scala.App。scala.App的子類可能不能正常工作。

這個程序,統計了Spark README文件中分別包含‘a’和’b’的行數。注意,你需要把YOUR_SPARK_HOME換成你的Spark安裝目錄。與之前用spark-shell不同,這個程序有一個單獨的SparkContext對象,我們初始化了這個SparkContext對象並將其作為程序的一部分。

我們把一個 SparkConf 對象傳給SparkContext的構造函數,SparkConf對象包含了我們這個應用程序的基本信息和配置。

我們的程序需要依賴Spark API,所以我們需要包含一個sbt配置文件,simple.sbt,在這個文件裏,我們可以配置Spark依賴項。這個文件同時也添加了Spark本身的依賴庫:

name := "Simple Project"

version := "1.0"

scalaVersion := "2.10.5"

libraryDependencies += "org.apache.spark" %% "spark-core" % "1.6.0"

為了讓sbt能正常工作,我們需要一個典型的目錄結構來放SimpleApp.scala和simple.sbt程序。一旦建立好目錄,我們就可以創建一個jar包,然後用spark-submit腳本運行我們的代碼。

# Your directory layout should look like this
$ find .
.
./simple.sbt
./src
./src/main
./src/main/scala
./src/main/scala/SimpleApp.scala

# Package a jar containing your application
$ sbt package
...
[info] Packaging {..}/{..}/target/scala-2.10/simple-project_2.10-1.0.jar

# Use spark-submit to run your application
$ YOUR_SPARK_HOME/bin/spark-submit \
  --class "SimpleApp" \
  --master local[4] \
  target/scala-2.10/simple-project_2.10-1.0.jar
...
Lines with a: 46, Lines with b: 23

下一步

恭喜你!你的首個Spark應用已經跑起來了!

  • 進一步的API參考,請看這裏:Spark programming guide,或者在其他頁麵上點擊 “Programming Guides”菜單
  • 如果想了解集群上運行應用程序,請前往:deployment overview
  • 最後,Spark examples子目錄下包含了多個示例,你可以這樣來運行這些例子:
# For Scala and Java, use run-example:
./bin/run-example SparkPi

# For Python examples, use spark-submit directly:
./bin/spark-submit examples/src/main/python/pi.py

# For R examples, use spark-submit directly:
./bin/spark-submit examples/src/main/r/dataframe.R

轉載自 並發編程網 - ifeve.com

最後更新:2017-05-19 16:38:13

  上一篇:go  簡化SLF4J和通用日誌工具的區別
  下一篇:go  《Maven官方指南》翻譯邀請