258
技術社區[雲棲]
《Spark 官方文檔》Spark快速入門
快速入門
本教程是對Spark的一個快速簡介。首先,我們通過Spark的交互式shell介紹一下API(主要是Python或Scala),然後展示一下如何用Java、Scala、Python寫一個Spark應用。更完整參考看這裏:programming guide
首先,請到Spark website下載一個Spark發布版本,以便後續方便學習。我們暫時還不會用到HDFS,所以你可以使用任何版本的Hadoop。
使用Spark shell交互式分析
基礎
利用Spark shell 很容易學習Spark API,同時也Spark shell也是強大的交互式數據分析工具。Spark shell既支持Scala(Scala版本的shell在Java虛擬機中運行,所以在這個shell中可以引用現有的Java庫),也支持Python。在Spark目錄下運行下麵的命令可以啟動一個Spark shell:
./bin/spark-shell
Spark最主要的抽象概念是個分布式集合,也叫作彈性分布式數據集(Resilient Distributed Dataset – RDD)。RDD可以由Hadoop InputFormats讀取HDFS文件創建得來,或者從其他RDD轉換得到。下麵我們就先利用Spark源代碼目錄下的README文件來新建一個RDD:
scala> val textFile = sc.textFile("README.md")
textFile: spark.RDD[String] = spark.MappedRDD@2ee9b6e3
RDD有兩種算子,action算子(actions)返回結果,transformation算子(transformations)返回一個新RDD。我們先來看一下action算子:
scala> textFile.count() // Number of items in this RDD
res0: Long = 126
scala> textFile.first() // First item in this RDD
res1: String = # Apache Spark
再來看下如何使用transformation算子。我們利用filter這個transformation算子返回一個隻包含原始文件子集的新RDD。
scala> val linesWithSpark = textFile.filter(line => line.contains("Spark"))
linesWithSpark: spark.RDD[String] = spark.FilteredRDD@7dd4af09
把這兩個例子串起來,我們可以這樣寫:
scala> textFile.filter(line => line.contains("Spark")).count() // How many lines contain "Spark"?
res3: Long = 15
更多RDD算子
RDD action 和 transformation 算子可以做更加複雜的計算。下麵的代碼中,我們將找出文件中包含單詞數最多的行有多少個單詞:
scala> textFile.map(line => line.split(" ").size).reduce((a, b) => if (a > b) a else b)
res4: Long = 15
首先,用一個map算子將每一行映射為一個整數,返回一個新RDD。然後,用reduce算子找出這個RDD中最大的單詞數。map和reduce算組的參數都是scala 函數體(閉包),且函數體內可以使用任意的語言特性,或引用scala/java庫。例如,我們可以調用其他函數。為了好理解,下麵我們用Math.max作為例子:
scala> import java.lang.Math
import java.lang.Math
scala> textFile.map(line => line.split(" ").size).reduce((a, b) => Math.max(a, b))
res5: Int = 15
Hadoop上的MapReduce是大家耳熟能詳的一種通用數據流模式。而Spark能夠輕鬆地實現MapReduce流程:
scala> val wordCounts = textFile.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey((a, b) => a + b)
wordCounts: spark.RDD[(String, Int)] = spark.ShuffledAggregatedRDD@71f027b8
這個例子裏,我使用了flatMap
, map
, and reduceByKey
這幾個transformation算子,把每個單詞及其在文件中出現的次數轉成一個包含(String,int)鍵值對的RDD,計算出每個單詞在文件中出現的次數
scala> wordCounts.collect()
res6: Array[(String, Int)] = Array((means,1), (under,2), (this,3), (Because,1), (Python,2), (agree,1), (cluster.,1), ...)
緩存
Spark同樣支持把數據集拉到集群範圍的內存緩存中。這對於需要重複訪問的數據非常有用,比如:查詢一些小而”熱“(頻繁訪問)的數據集 或者 運行一些迭代算法(如 PageRank)。作為一個簡單的示例,我們把 linesWithSpark 這個數據集緩存一下:
scala> linesWithSpark.cache()
res7: spark.RDD[String] = spark.FilteredRDD@17e51082
scala> linesWithSpark.count()
res8: Long = 19
scala> linesWithSpark.count()
res9: Long = 19
用Spark來緩存一個100行左右的文件,看起來確實有點傻。但有趣的是,同樣的代碼可以用於緩存非常大的數據集,即使這些數據集可能分布在數十或數百個節點,也是一樣。你可以用 bin/spark-shell 連到一個集群上來驗證一下,更詳細的請參考:programming guide.
獨立的應用程序
假設我們想寫一個獨立的Spark應用程序。我們將快速的過一下一個簡單的應用程序,分別用Scala(sbt編譯),Java(maven編譯)和Python。
首先用Scala新建一個簡單的Spark應用 – 簡單到連名字都叫SimpleApp.scala
/* SimpleApp.scala */
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
object SimpleApp {
def main(args: Array[String]) {
val logFile = "YOUR_SPARK_HOME/README.md" // Should be some file on your system
val conf = new SparkConf().setAppName("Simple Application")
val sc = new SparkContext(conf)
val logData = sc.textFile(logFile, 2).cache()
val numAs = logData.filter(line => line.contains("a")).count()
val numBs = logData.filter(line => line.contains("b")).count()
println("Lines with a: %s, Lines with b: %s".format(numAs, numBs))
}
}
注意,應用程序需要定義一個main方法,而不是繼承scala.App。scala.App的子類可能不能正常工作。
這個程序,統計了Spark README文件中分別包含‘a’和’b’的行數。注意,你需要把YOUR_SPARK_HOME換成你的Spark安裝目錄。與之前用spark-shell不同,這個程序有一個單獨的SparkContext對象,我們初始化了這個SparkContext對象並將其作為程序的一部分。
我們把一個 SparkConf 對象傳給SparkContext的構造函數,SparkConf對象包含了我們這個應用程序的基本信息和配置。
我們的程序需要依賴Spark API,所以我們需要包含一個sbt配置文件,simple.sbt,在這個文件裏,我們可以配置Spark依賴項。這個文件同時也添加了Spark本身的依賴庫:
name := "Simple Project"
version := "1.0"
scalaVersion := "2.10.5"
libraryDependencies += "org.apache.spark" %% "spark-core" % "1.6.0"
為了讓sbt能正常工作,我們需要一個典型的目錄結構來放SimpleApp.scala和simple.sbt程序。一旦建立好目錄,我們就可以創建一個jar包,然後用spark-submit腳本運行我們的代碼。
# Your directory layout should look like this
$ find .
.
./simple.sbt
./src
./src/main
./src/main/scala
./src/main/scala/SimpleApp.scala
# Package a jar containing your application
$ sbt package
...
[info] Packaging {..}/{..}/target/scala-2.10/simple-project_2.10-1.0.jar
# Use spark-submit to run your application
$ YOUR_SPARK_HOME/bin/spark-submit \
--class "SimpleApp" \
--master local[4] \
target/scala-2.10/simple-project_2.10-1.0.jar
...
Lines with a: 46, Lines with b: 23
下一步
恭喜你!你的首個Spark應用已經跑起來了!
- 進一步的API參考,請看這裏:Spark programming guide,或者在其他頁麵上點擊 “Programming Guides”菜單
- 如果想了解集群上運行應用程序,請前往:deployment overview
- 最後,Spark examples子目錄下包含了多個示例,你可以這樣來運行這些例子:
# For Scala and Java, use run-example:
./bin/run-example SparkPi
# For Python examples, use spark-submit directly:
./bin/spark-submit examples/src/main/python/pi.py
# For R examples, use spark-submit directly:
./bin/spark-submit examples/src/main/r/dataframe.R
最後更新:2017-05-19 16:38:13