閱讀189 返回首頁    go 阿裏雲 go 技術社區[雲棲]


Hadoop的環境搭建,和編寫一個簡單的hadoop job

hadoop 入門:

0hadoop的簡要介紹
google之所以能夠成功,一個重要的技術就是map-reduce。map-reduce是google為大規模的、分布式數據進行處理的一種編程模式。
而本文介紹的hadoop是apache的開源map-reduce實現。本文不過多的介紹map-reduce,主要精力放在hadoop的配置和編寫一個簡單的haoop程序上
對map-recude感興趣的朋友可以進一步閱讀參考文獻。
1 hadoop服務器的安裝:
hadoop是一個分布式的處理框架,本文先介紹的是一個簡單的偽分布式hadoop(安裝在一個linux機器上)

配置環境是ubuntu
創建一個新文件/etc/sources.list.d/cloudera.list
把下邊的內容複製到新文件:
 
deb https://archive.cloudera.com/debian intrepid-cdh3 contrib
deb-src https://archive.cloudera.com/debian intrepid-cdh3 contrib
 
 然後打開teminal輸入下邊的命令:
$ curl -s https://archive.cloudera.com/debian/archive.key | \
sudo apt-key add - sudo apt-get update

然後,安裝采用偽分布式配置的 Hadoop(所有 Hadoop 守護進程在同一個主機上運行):

$ sudo apt-get install hadoop-0.20-conf-pseudo
 

 確保係統已經安裝了sshd(如果沒有,請先安裝)。
 設置不需要密碼的ssh:
     
$ sudo su -
# ssh-keygen -t dsa -P '' -f ~/.ssh/id_dsa
# cat ~/.ssh/id_dsa.pub >> ~/.ssh/authorized_keys

啟動hadoop:
首先對namenode進行格式化:
# hadoop-0.20 namenode -format

Hadoop 提供一些簡化啟動的輔助工具。這些工具分為啟動(比如 start-dfs)和停止(比如 stop-dfs)兩類。下麵的簡單腳本說明如何啟動 Hadoop 節點:

# /usr/lib/hadoop-0.20/bin/start-dfs.sh
# /usr/lib/hadoop-0.20/bin/start-mapred.sh
#
 
輸入命令jps可以查看守護進程是否正在運行;

編寫一個hadoop程序:
作為聯係,我們從網上下載一個cvs格式的數據文件:
https://earthquake.usgs.gov/research/data/pager/EXPO_CAT_2007_12.csv
cvs是以逗號進行列分割的數據文件。
使用opencvs可以很方便的處理cvs格式的數據。
opencvs可以從sourceforge上下載。
opencvs可以把一個string以逗號進行分割成一個string數組
隻擴展 Hadoop 的 Mapper 類。然後我可以使用泛型來為傳出鍵和值指定顯式類。類型子句也指定了傳入鍵和值,這對於讀取文件分別是字節數和文本行數。

EarthQuakesPerDateMapper 類擴展了 Hadoop 的 Mapper 對象。它顯式地將其輸出鍵指定為一個 Text 對象,將其值指定為一個 IntWritable,這是一個 Hadoop 特定類,實質上是一個整數。還要注意,class 子句的前兩個類型是 LongWritable 和 Text,分別是字節數和文本行數。

由於類定義中的類型子句,我將傳入 map 方法的參數類型設置為在 context.write 子句內帶有該方法的輸出。如果我想指定其他內容,將會出現一個編譯器問題,或 Hadoop 將輸出一個錯誤消息,描述類型不匹配的消息。

一個mapper的實現:

public class EarthQuakesPerDateMapper extends
  Mapper<LongWritable, Text, Text, IntWritable> {
 @Override
 protected void map(LongWritable key, Text value, Context context)
   throws IOException, InterruptedException {

  if (key.get() > 0) {
   try {
    CSVParser parser = new CSVParser();
    String[] lines = parser.parseLine(value.toString());
    lines = new CSVParser().parseLine(lines[0]);
    SimpleDateFormat formatter = new SimpleDateFormat("yyyyMMddHHmm");
    Date dt = formatter.parse(lines[0]);
    formatter.applyPattern("dd-MM-yyyy");

    String dtstr = formatter.format(dt);
    context.write(new Text(dtstr), new IntWritable(1));
   } catch (java.text.ParseException e) {
    // TODO Auto-generated catch block
    //e.printStackTrace();
   }
  }
 }
}
reduce 實現如下 所示。與 Hadoop 的 Mapper 一樣,Reducer 被參數化了:前兩個參數是傳入的鍵類型(Text)和值類型(IntWritable),後兩個參數是輸出類型:鍵和值,這在本例中是相同的。


public class EarthQuakesPerDateReducer extends
  Reducer<Text, IntWritable, Text, IntWritable> {
 @Override
 protected void reduce(Text key, Iterable<IntWritable> values,
   Context context) throws IOException, InterruptedException {
  int count = 0;
  for (IntWritable value : values) {
   count++;
  }
  context.write(key, new IntWritable(count));
 }
}

寫好mapper和reducer之後,就可以定義一個hadoop job了。

public class EarthQuakesPerDayJob {
 public static void main(String[] args) throws Throwable {
  Job job = new Job();
  job.setJarByClass(EarthQuakesPerDayJob.class);
  FileInputFormat.addInputPath(job, new Path(args[0]));
  FileOutputFormat.setOutputPath(job, new Path(args[1]));

  job.setMapperClass(EarthQuakesPerDateMapper.class);
  job.setReducerClass(EarthQuakesPerDateReducer.class);
  job.setOutputKeyClass(Text.class);
  job.setOutputValueClass(IntWritable.class);

  System.exit(job.waitForCompletion(true) ? 0 : 1);
 }
}

在linux上執行hadoop:
$> export HADOOP_CLASSPATH=lib/opencsv-2.3.jar
$> hadoop jar hadoop.jar in out
在程序所在目錄定義一個子目錄in,把剛才所下載的cvs文件放到in目錄下。
in就是程序數據的輸入目錄,out是輸出目錄,注意這個out文件夾是程序建立的,不可以手動建立。
運行是會看到:
11/09/05 08:47:26 INFO jvm.JvmMetrics: Initializing JVM Metrics with processName=JobTracker, sessionId=
11/09/05 08:47:26 WARN mapred.JobClient: Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same.
11/09/05 08:47:26 INFO input.FileInputFormat: Total input paths to process : 1
11/09/05 08:47:26 INFO mapred.JobClient: Running job: job_local_0001
11/09/05 08:47:26 INFO input.FileInputFormat: Total input paths to process : 1
11/09/05 08:47:26 INFO mapred.MapTask: io.sort.mb = 100
11/09/05 08:47:27 INFO mapred.MapTask: data buffer = 79691776/99614720
11/09/05 08:47:27 INFO mapred.MapTask: record buffer = 262144/327680
11/09/05 08:47:27 INFO mapred.JobClient:  map 0% reduce 0%
11/09/05 08:47:28 INFO mapred.MapTask: Starting flush of map output
11/09/05 08:47:28 INFO mapred.MapTask: Finished spill 0
11/09/05 08:47:28 INFO mapred.TaskRunner: Task:attempt_local_0001_m_000000_0 is done. And is in the process of commiting
11/09/05 08:47:28 INFO mapred.LocalJobRunner:
11/09/05 08:47:28 INFO mapred.TaskRunner: Task 'attempt_local_0001_m_000000_0' done.
11/09/05 08:47:29 INFO mapred.LocalJobRunner:
11/09/05 08:47:29 INFO mapred.Merger: Merging 1 sorted segments
11/09/05 08:47:29 INFO mapred.Merger: Down to the last merge-pass, with 1 segments left of total size: 97887 bytes
11/09/05 08:47:29 INFO mapred.LocalJobRunner:
11/09/05 08:47:29 INFO mapred.TaskRunner: Task:attempt_local_0001_r_000000_0 is done. And is in the process of commiting
11/09/05 08:47:29 INFO mapred.LocalJobRunner:
11/09/05 08:47:29 INFO mapred.TaskRunner: Task attempt_local_0001_r_000000_0 is allowed to commit now
11/09/05 08:47:29 INFO output.FileOutputCommitter: Saved output of task 'attempt_local_0001_r_000000_0' to out1
11/09/05 08:47:29 INFO mapred.LocalJobRunner: reduce > reduce
11/09/05 08:47:29 INFO mapred.TaskRunner: Task 'attempt_local_0001_r_000000_0' done.
11/09/05 08:47:29 INFO mapred.JobClient:  map 100% reduce 100%
11/09/05 08:47:29 INFO mapred.JobClient: Job complete: job_local_0001
11/09/05 08:47:29 INFO mapred.JobClient: Counters: 12
11/09/05 08:47:29 INFO mapred.JobClient:   FileSystemCounters
11/09/05 08:47:29 INFO mapred.JobClient:     FILE_BYTES_READ=11961631
11/09/05 08:47:29 INFO mapred.JobClient:     FILE_BYTES_WRITTEN=9370383
11/09/05 08:47:29 INFO mapred.JobClient:   Map-Reduce Framework
11/09/05 08:47:29 INFO mapred.JobClient:     Reduce input groups=142
11/09/05 08:47:29 INFO mapred.JobClient:     Combine output records=0
11/09/05 08:47:29 INFO mapred.JobClient:     Map input records=5639
11/09/05 08:47:29 INFO mapred.JobClient:     Reduce shuffle bytes=0
11/09/05 08:47:29 INFO mapred.JobClient:     Reduce output records=142
11/09/05 08:47:29 INFO mapred.JobClient:     Spilled Records=11274
11/09/05 08:47:29 INFO mapred.JobClient:     Map output bytes=86611
11/09/05 08:47:29 INFO mapred.JobClient:     Combine input records=0
11/09/05 08:47:29 INFO mapred.JobClient:     Map output records=5637
11/09/05 08:47:29 INFO mapred.JobClient:     Reduce input records=5637

運行完成後:
cd到out目錄下,會看到一個part-r-00000文件。
輸入命令:cat part-r-00000
可以看到hadoopjob的運行結果。

參考資料:
Java 開發 2.0: 用 Hadoop MapReduce 進行大數據分析

用 Hadoop 進行分布式數據處理,第 1 部分: 入門

MapReduce 編程模型在日誌分析方麵的應用

 

最後更新:2017-04-02 06:51:55

  上一篇:go XSLT之計數循環
  下一篇:go ImageView 用法總結