閱讀657 返回首頁    go iPhone_iPad_Mac_apple


《Spark1.6.1官方文檔》Spark1.6.1操作指南

本項目是 Apache Spark1.6.1方文檔的中文翻譯版,之前翻譯過spark1.5,由於網站有組織翻譯Spark1.6.1所以我又重新翻譯了一下,我翻譯的這一章和spark1.5一樣。本次翻譯主要針對對Spark感興趣和致力於從事大數據方法開發的人員提供有價值的中文資料,希望能夠對大家的工作和學習有所幫助。

這個翻譯本應該在二月份完成,由於一直出差加上加班導致短短一篇文章遲遲沒有完成,在此想大家表示深深的歉意。同時也希望有能力的童鞋能夠一起努力推動大數據在中國的發展,讓天下沒有難學的技術謝謝大家。

Quick Start

本篇文章介紹了Spark快速入門. 我們將第一次通過Spark的Shell介紹API(通過 Python 或Scala), 然後展示怎麼通過Java,Scala和Python寫應用. 看 編程指南了解更多信息.

跟著下麵指導, 第一步下載安裝包從 Spark官方網址. 我們不會使用HDFS,你可以下載一個匹配任何Hadoop版本的發行版本.

Spark Shell交互分析

基礎

Spark Shell提供了一個簡單學習API的方式, 同時它也是一個有用的交互分析工具. 它可以使用Scala(它是一種運行在Java虛擬機上並且可以使用Java依賴包)或者Python. 切換到Spark目錄並啟動按照以下方式啟動它:

./bin/spark-shell

Spark的主要抽象是一個分布式數據集被稱為彈性分布式數據集 (RDD). RDDs可以通過Hadoop上麵的輸入格式( 例如 HDFS 文件)或者通過其他的RDD轉換. 讓我們通過Spark目錄中的README文件創建一個新的RDD:

scala> val textFile = sc.textFile("README.md")
textFile: spark.RDD[String] = spark.MappedRDD@2ee9b6e3

RDDs 有actions(我翻譯成操作因子), 它能夠返回值, 和transformations(轉換), 它能夠返回一個新的RDDs. Let’s start with a few actions:

scala> textFile.count() // Number of items in this RDD
res0: Long = 126

scala> textFile.first() // First item in this RDD
res1: String = # Apache Spark

現在讓我們使用轉換操作.我們將使用 filter 轉換操作返回一個新的RDD包含這個文件的子集.

scala> val linesWithSpark = textFile.filter(line => line.contains("Spark"))
linesWithSpark: spark.RDD[String] = spark.FilteredRDD@7dd4af09

我們能夠通過鏈式編程把transformations 操作和 actions操作連接一起:

scala> textFile.filter(line => line.contains("Spark")).count() // How many lines contain "Spark"?
res3: Long = 15

更多的RDD操作

RDD操作因子和轉換因子可以用來進行更加複雜的計算.讓我們尋找到需要的數據:

scala> textFile.map(line => line.split(" ").size).reduce((a, b) => if (a > b) a else b)
res4: Long = 15

第一個map一行整數數值,創建一個新的RDD. reduce被調用來查找最多數據一行的數據個數map的參數和 reduce的參數是Scala函數自變量(閉包),我們能夠使用任何Scala或Java的語言的特性. 例如, 我們很容易的調用在其他地方定義的函數. 我們使用 Math.max()函數使代碼很容易理解:

scala> import java.lang.Math
import java.lang.Math

scala> textFile.map(line => line.split(" ").size).reduce((a, b) => Math.max(a, b))
res5: Int = 15

通用的數據處理流程是MapReduce,Hadoop廣泛使用. Spark很容易的繼承MapReduce計算流程:

scala> val wordCounts = textFile.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey((a, b) => a + b)
wordCounts: spark.RDD[(String, Int)] = spark.ShuffledAggregatedRDD@71f027b8

這裏, 我們組合了 flatMapmap, 和reduceByKey 轉換因子來計算文件中每個單詞的數量通過一個RDD (String, Int) 對. 通過Shell統計單數的數量, 我們能夠使用 collect 因子:

scala> wordCounts.collect()
res6: Array[(String, Int)] = Array((means,1), (under,2), (this,3), (Because,1), (Python,2), (agree,1), (cluster.,1), ...)

緩存

Spark也支持把一個數據集放到一個集群的緩存裏麵.當數據多次訪問它很有用,例如當你查詢一個常用的數據集或者運行一個 PageRank算法的時候. 舉一個簡單的例子, 讓我們把 linesWithSpark 數據集緩存起來:

scala> linesWithSpark.cache()
res7: spark.RDD[String] = spark.FilteredRDD@17e51082

scala> linesWithSpark.count()
res8: Long = 19

scala> linesWithSpark.count()
res9: Long = 19

它可能是不明智的用Spark瀏覽和緩存一個100行的數據. 這個部分通常運用在大的數據集上, 當集群有十個甚至成千上百個節點的時候. 你也能夠通過 bin/spark-shell來連接集群, 更多的描述請看 programming guide.

獨立的應用

假如我希望開發一個獨立的應用通過Spark API. 我們可以通過 Scala (with sbt), Java (with Maven), and Python來調用開發Spark API.

現在我們創建一個簡單的Spark應用,實際上, 它的名字是 SimpleApp.scala:

/* SimpleApp.scala */
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf

object SimpleApp {
  def main(args: Array[String]) {
    val logFile = "YOUR_SPARK_HOME/README.md" // Should be some file on your system
    val conf = new SparkConf().setAppName("Simple Application")
    val sc = new SparkContext(conf)
    val logData = sc.textFile(logFile, 2).cache()
    val numAs = logData.filter(line => line.contains("a")).count()
    val numBs = logData.filter(line => line.contains("b")).count()
    println("Lines with a: %s, Lines with b: %s".format(numAs, numBs))
  }
}

注意一個應用應該定義在一個 main()函數裏麵或者通過繼承scala.Appscala.App的子類可能不能正確的工作.

這個程序僅僅是統計README文件中包含‘a’和‘b’的數量.注意你需要替換你的本地的YOUR_SPARK_HOME 環境變量. 不同於前麵的Spark shell例子, 它需要初始化SparkContext, 我們需要初始化 SparkContext作為程序運行的一部分.

我們需要創建一個 SparkConf 對象,它包含了我們應用所包含的信息.

我們的應用依賴Spark API, 所以我們也包含sbt 配置文件, simple.sbt, 它包含了Spark的一些依賴. 這個文件增加一個倉庫,而這些倉庫是Spark運行不可缺少的一部分:

name := "Simple Project"

version := "1.0"

scalaVersion := "2.10.5"

libraryDependencies += "org.apache.spark" %% "spark-core" % "1.6.1"

為了使 sbt能夠正確的工作, 我們需要布局SimpleApp.scala 和simple.sbt按計劃執行這種類型結構.一旦配置正確, 我們能夠創建一個JAR包包含這個應用的代碼, 然後使用 spark-submit 來運行我們的程序.

# Your directory layout should look like this
$ find .
.
./simple.sbt
./src
./src/main
./src/main/scala
./src/main/scala/SimpleApp.scala

# Package a jar containing your application
$ sbt package
...
[info] Packaging {..}/{..}/target/scala-2.10/simple-project_2.10-1.0.jar

# Use spark-submit to run your application
$ YOUR_SPARK_HOME/bin/spark-submit \
  --class "SimpleApp" \
  --master local[4] \
  target/scala-2.10/simple-project_2.10-1.0.jar
...
Lines with a: 46, Lines with b: 23

下一章

配置運行你的第一個Spark程序!

  • 想深入的了解SparkAPI, 可以打開 Spark programming guide, 或者看“Programming Guides” 菜單了解其他組件.
  • 如果想在集群上麵運行引用, 可以點擊 deployment overview了解.
  • 最後, Spark包含幾個簡單的實例 (ScalaJavaPythonR). 你可以按照下麵的實例運行他們:
# For Scala and Java, use run-example:
./bin/run-example SparkPi

# For Python examples, use spark-submit directly:
./bin/spark-submit examples/src/main/python/pi.py

# For R examples, use spark-submit directly:
./bin/spark-submit examples/src/main/r/dataframe.R

 轉載自 並發編程網 - ifeve.com

最後更新:2017-05-19 16:02:15

  上一篇:go  《SAFe 4.0參考指南:精益軟件與係統工程的規模化敏捷框架》SAFe團隊層
  下一篇:go  《SLF4J官方文檔》本地化支持