閱讀531 返回首頁    go 阿裏雲 go 技術社區[雲棲]


通過reducer聯合產生寬表

public class ReducerJoin {

public static class ValueFlag implements Writable {
    private String value;
    private String flag;

    public String getValue() {
        return value;
    }

    public void setValue(String value) {
        this.value = value;
    }

    public String getFlag() {
        return flag;
    }

    public void setFlag(String flag) {
        this.flag = flag;
    }

    public void write(DataOutput out) throws IOException {
        out.writeUTF(value);
        out.writeUTF(flag);

    }

    public void readFields(DataInput in) throws IOException {
        this.value = in.readUTF();
        this.flag = in.readUTF();

    }

}

// map讀取兩個文件 根據來源把每個kv對打上標簽 輸出給reduce可以必須是關聯字段
public static class ReducerJoinMap extends Mapper<LongWritable, Text, Text, ValueFlag> {
    private FileSplit fileSplit;
    private String fileName;
    private String[] infos;
    private Text oKey = new Text();
    private ValueFlag oValue = new ValueFlag();

    @Override
    protected void setup(Mapper<LongWritable, Text, Text, ValueFlag>.Context context)
            throws IOException, InterruptedException {
        fileSplit = (FileSplit) context.getInputSplit();
        if (fileSplit.getPath().toString().contains("user-logs-large.txt")) {
            fileName = "userLogsLarge";
        } else if (fileSplit.getPath().toString().contains("user_info.txt")) {
            fileName = "userInfo";
        }

    }

    @Override
    protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, ValueFlag>.Context context)
            throws IOException, InterruptedException {
        infos = value.toString().split("\\s");

        oValue.setFlag(fileName);
        if (fileName.equals("userLogsLarge")) {
            // 解析user-logs-large.txt
            oKey.set(infos[0]);
            oValue.setValue(infos[1] + "\t" + infos[2]);
            context.write(oKey, oValue);
        } else if (fileName.equals("userInfo")) {
            // 解析user_infos.txt
            oKey.set(infos[0]);
            oValue.setValue(infos[1] + "\t" + infos[2]);
            context.write(oKey, oValue);
        }

    }

}

// 接受map發送過來的kv隊 根據value中的flag把同一個key對應的value分組
// 那麼兩組中的數據就是分別來自兩張表中的數據 對這兩組數據做笛卡爾乘機即完成關聯
public static class ReducerJoinReducer extends Reducer<Text, ValueFlag, AvroKey<UserActionLog>, NullWritable> {

    private List<String> userLogsLargeList;
    private List<String> userInfosList;
    private NullWritable outValue = NullWritable.get();
    private AvroKey<UserActionLog> outKey = new AvroKey<UserActionLog>();
    private String[] infos;

    @Override
    protected void reduce(Text key, Iterable<ValueFlag> values,
            Reducer<Text, ValueFlag, AvroKey<UserActionLog>, NullWritable>.Context context)
            throws IOException, InterruptedException {

        userLogsLargeList = new ArrayList<String>();
        userInfosList = new ArrayList<String>();

        for (ValueFlag value : values) {

            if (value.getFlag().equals("userLogsLarge")) {
                userLogsLargeList.add(value.getValue());
            } else if (value.getFlag().equals("userInfo")) {
                userInfosList.add(value.getValue());
            }
        }
        // 對兩組中的數據進行笛卡爾乘積
        for (String userlog : userLogsLargeList) {
            for (String userinfo : userInfosList) {
                // 構建一個useractionLog對象
                UserActionLog.Builder build = UserActionLog.newBuilder();

                // 從userlog中提取actiontyoe和ipaddress
                infos = userlog.split("\\s");
                build.setActionType(infos[0]);
                build.setIpAddress(infos[1]);
                // 從userinfo 提取gender 和privince
                infos = userinfo.split("\\s");
                if (infos[0].equals("man")) {
                    build.setGender(0);
                } else {
                    build.setGender(1);
                }
                build.setProvience(infos[1]);
                build.setUserName(key.toString());
                UserActionLog userActionLog = build.build();
                // 吧userAction封裝到Avrokey中
                outKey.datum(userActionLog);
                context.write(outKey, outValue);
            }
        }

    }

}

public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {

    Configuration configuration = new Configuration();
    Job job = Job.getInstance(configuration);
    job.setJarByClass(ReducerJoin.class);
    job.setJobName("reducer聯合");

    job.setMapperClass(ReducerJoinMap.class);
    job.setReducerClass(ReducerJoinReducer.class);

    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(ValueFlag.class);
    job.setOutputKeyClass(AvroKey.class);
    job.setOutputValueClass(NullWriter.class);
    //設置輸出的格式是avrokey
    job.setOutputFormatClass(AvroKeyOutputFormat.class);
    //設置輸出key的schema
    AvroJob.setOutputKeySchema(job, UserActionLog.SCHEMA$);
    FileInputFormat.addInputPath(job, new Path("/mapjoin"));
    Path outputPath = new Path("/ReducerJoin");
    outputPath.getFileSystem(configuration).delete(outputPath, true);
    FileOutputFormat.setOutputPath(job, outputPath);

    System.exit(job.waitForCompletion(true) ? 0 : 1);
}

}

最後更新:2017-10-19 21:03:23

  上一篇:go  如何使用putty連接阿裏雲服務器
  下一篇:go  通過Avro 將文件合並