阅读831 返回首页    go 阿里云 go 技术社区[云栖]


通过avro合并大文件 并计算文件词频

通过avro合并大文件 并计算文件词频

public class AvroFilemr {
// 读取avro文件 每读取一条记录 其实是一个小文件,对其进行wordcount解析
// 并以单词,1 的形式发送到reducer

public static class AvroFilemrMap extends Mapper<AvroKey<SmallFile>, NullWritable, Text, IntWritable> {
    private Text outKey = new Text();
    private final IntWritable ONE = new IntWritable(1);
    private String[] infos;
    private ByteBuffer content;

    @Override
    protected void map(AvroKey<SmallFile> key, NullWritable value,
            Mapper<AvroKey<SmallFile>, NullWritable, Text, IntWritable>.Context context)
            throws IOException, InterruptedException {

        content = key.datum().getContent();
        infos = new String(content.array()).split("\\s");
        for (String string : infos) {
            outKey.set(string);
            context.write(outKey, ONE);
        }
    }
}

// 把wordcount的计算结果 以word_count.avsc的模式输出成avro文件
public static class AvroFilemrReduce extends Reducer<Text, IntWritable, AvroKey<GenericRecord>, NullWritable> {
    private int sum;
    private Schema writeSchema;
    private GenericRecord record;
    private AvroKey<GenericRecord> outKey = new AvroKey<GenericRecord>();
    private NullWritable outValue = NullWritable.get();

    @Override
    protected void setup(Reducer<Text, IntWritable, AvroKey<GenericRecord>, NullWritable>.Context context)
            throws IOException, InterruptedException {
        Parser parser = new Parser();
        writeSchema = parser.parse(new File("src/main/avro/wordCount.avsc"));
        record = new GenericData.Record(writeSchema);

    }

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values,
            Reducer<Text, IntWritable, AvroKey<GenericRecord>, NullWritable>.Context context)
            throws IOException, InterruptedException {

        sum = 0;
        for (IntWritable value : values) {
            sum += value.get();

        }
        record.put("word", key.toString());
        record.put("count", sum);
        outKey.datum(record);
        context.write(outKey, outValue);
    }
}
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
    Configuration configuration =new Configuration();
    Job job=Job.getInstance(configuration);
    job.setJarByClass(AvroFilemr.class);
    job.setJobName("读avro文件计算并把结果写入到新的avro文件");

    job.setMapperClass(AvroFilemrMap.class);
    job.setReducerClass(AvroFilemrReduce.class);

    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(IntWritable.class);
    job.setOutputKeyClass(AvroKey.class);
    job.setOutputValueClass(NullWritable.class);

    job.setInputFormatClass(AvroKeyInputFormat.class);
    job.setOutputFormatClass(AvroKeyOutputFormat.class);

    AvroJob.setInputKeySchema(job, SmallFile.getClassSchema());
    Parser parser =new Parser();
    AvroJob.setOutputKeySchema(job, parser.parse(new File("src/main/avro/wordCount.avsc")));

    FileInputFormat.addInputPath(job, new Path("/AvroMErgeSmallFile"));
    Path outputPath =new Path("/AvroFilemr");
    outputPath.getFileSystem(configuration).delete(outputPath, true);
    FileOutputFormat.setOutputPath(job, outputPath);

    System.exit(job.waitForCompletion(true)?0:1);
}

}

最后更新:2017-10-20 09:33:23

  上一篇:go  重读avro文件 对文件进行简单的mr计算
  下一篇:go  如何在阿里云服务器centos 7上安装Apache