閱讀1000 返回首頁    go 人物


將HBase中的數據通過MapReduce導出到HDFS上

package com.zhiyou100.test;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * Map-only MapReduce job that scans the HBase table {@code bd17:fromjava} and
 * writes one text record per cell to the output directory {@code /fromhbase}.
 */
public class MRFromHBase {

    /**
     * Mapper that walks every cell of each scanned row and emits the row key as
     * the output key and a description of the cell (family, qualifier, value)
     * as the output value.
     */
    public static class MrFromHBaseMap extends TableMapper<Text, Text> {
        // Writable holders are reused for every emitted record instead of
        // allocating a new Text per cell.
        private final Text outputKey = new Text();
        private final Text outputValue = new Text();

        /**
         * Emits one (row key, cell description) pair for every cell in {@code value}.
         *
         * @param key     row key supplied by the table input format (not read here;
         *                the row key is taken from each cell instead)
         * @param value   the scanned row
         * @param context task context that receives the output records
         * @throws IOException          if advancing the cell scanner or writing fails
         * @throws InterruptedException if writing to the context is interrupted
         */
        @Override
        protected void map(ImmutableBytesWritable key, Result value,
                Mapper<ImmutableBytesWritable, Result, Text, Text>.Context context)
                throws IOException, InterruptedException {
            // Extract the data from the Result and emit it cell by cell.
            CellScanner scanner = value.cellScanner();
            while (scanner.advance()) {
                Cell cell = scanner.current();
                String rowKey = Bytes.toString(CellUtil.cloneRow(cell));
                String columnFamily = Bytes.toString(CellUtil.cloneFamily(cell));
                String columnQualify = Bytes.toString(CellUtil.cloneQualifier(cell));
                String columnValue = Bytes.toString(CellUtil.cloneValue(cell));

                outputKey.set(rowKey);
                // NOTE(review): the labels are concatenated with no separator between
                // fields; kept exactly as-is so the job's output format does not change.
                outputValue.set("columnFamily:" + columnFamily + "columnQualify:" + columnQualify + "columnValue:"
                        + columnValue);
                context.write(outputKey, outputValue);
            }
        }

    }

    /**
     * Configures and runs the job, then terminates the JVM with exit status
     * {@code 0} when the job succeeds and {@code 1} when it fails.
     *
     * @param args command-line arguments (unused)
     * @throws IOException            if job setup or file-system access fails
     * @throws ClassNotFoundException if a job class cannot be resolved on submission
     * @throws InterruptedException   if waiting for the job is interrupted
     */
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration configuration = HBaseConfiguration.create();
        Job job = Job.getInstance(configuration);
        job.setJarByClass(MRFromHBase.class);
        job.setJobName("mapreduce 從hbase中讀取數據");
        // No reducer is needed: mapper output is written directly.
        job.setNumReduceTasks(0);

        Scan scan = new Scan();
        TableMapReduceUtil.initTableMapperJob("bd17:fromjava", scan, MrFromHBaseMap.class, Text.class, Text.class, job);

        // Set the output path, removing any previous output at that location
        // first (recursive delete) so the job starts from a clean directory.
        Path outputDir = new Path("/fromhbase");
        outputDir.getFileSystem(configuration).delete(outputDir, true);
        FileOutputFormat.setOutputPath(job, outputDir);

        // waitForCompletion returns true on success; map that to the conventional
        // exit status (0 = success, non-zero = failure). The original had it inverted.
        boolean succeeded = job.waitForCompletion(true);
        System.exit(succeeded ? 0 : 1);
    }
}






最後更新:2017-11-03 00:03:46

  上一篇:go  阿裏巴巴Java開發規約插件使用
  下一篇:go  關於hbase 在mr中出現的問題