Exporting HBase data to HDFS with MapReduce

The job below uses a TableMapper to scan an HBase table and write every cell to HDFS as plain text. Because the mapper output goes straight to the output files, no reducer is needed.
package com.zhiyou100.test;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class MRFromHBase {

    public static class MrFromHBaseMap extends TableMapper<Text, Text> {

        private Text outputKey = new Text();
        private Text outputValue = new Text();
        private Cell cell;
        private String rowKey;
        private String columnFamily;
        private String columnQualifier;
        private String columnValue;

        @Override
        protected void map(ImmutableBytesWritable key, Result value,
                Mapper<ImmutableBytesWritable, Result, Text, Text>.Context context)
                throws IOException, InterruptedException {
            // Iterate over every cell in the Result and emit one line per cell.
            CellScanner scanner = value.cellScanner();
            while (scanner.advance()) {
                cell = scanner.current();
                rowKey = Bytes.toString(CellUtil.cloneRow(cell));
                columnFamily = Bytes.toString(CellUtil.cloneFamily(cell));
                columnQualifier = Bytes.toString(CellUtil.cloneQualifier(cell));
                columnValue = Bytes.toString(CellUtil.cloneValue(cell));
                outputKey.set(rowKey);
                outputValue.set("columnFamily:" + columnFamily + ", columnQualifier:" + columnQualifier
                        + ", columnValue:" + columnValue);
                context.write(outputKey, outputValue);
            }
        }
    }

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration configuration = HBaseConfiguration.create();
        Job job = Job.getInstance(configuration);
        job.setJarByClass(MRFromHBase.class);
        job.setJobName("MapReduce: read data from HBase");
        // Map-only job: no reducer is needed.
        job.setNumReduceTasks(0);
        Scan scan = new Scan();
        TableMapReduceUtil.initTableMapperJob("bd17:fromjava", scan, MrFromHBaseMap.class, Text.class, Text.class, job);
        // Set the output path, deleting it first if it already exists.
        Path outputDir = new Path("/fromhbase");
        outputDir.getFileSystem(configuration).delete(outputDir, true);
        FileOutputFormat.setOutputPath(job, outputDir);
        // Exit with status 0 on success, 1 on failure.
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
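To try the job you first need some rows in bd17:fromjava. Below is a minimal sketch for populating the table with the HBase 1.x client API; the PutSampleData class name, the info column family, and the sample row are hypothetical stand-ins for whatever the table was actually created with.

package com.zhiyou100.test;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

public class PutSampleData {
    public static void main(String[] args) throws Exception {
        Configuration configuration = HBaseConfiguration.create();
        try (Connection connection = ConnectionFactory.createConnection(configuration);
                Table table = connection.getTable(TableName.valueOf("bd17:fromjava"))) {
            // "info" is a hypothetical column family; substitute the family
            // the table was actually created with.
            Put put = new Put(Bytes.toBytes("row1"));
            put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("name"), Bytes.toBytes("zhangsan"));
            put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("age"), Bytes.toBytes("20"));
            table.put(put);
        }
    }
}

Once the MapReduce job finishes, the results land in /fromhbase as part-m-* files, one tab-separated line per cell, keyed by row key.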