// 通過reducer聯合產生寬表 (build a wide table with a reduce-side join)
public class ReducerJoin {
public static class ValueFlag implements Writable {
private String value;
private String flag;
public String getValue() {
return value;
}
public void setValue(String value) {
this.value = value;
}
public String getFlag() {
return flag;
}
public void setFlag(String flag) {
this.flag = flag;
}
public void write(DataOutput out) throws IOException {
out.writeUTF(value);
out.writeUTF(flag);
}
public void readFields(DataInput in) throws IOException {
this.value = in.readUTF();
this.flag = in.readUTF();
}
}
// map讀取兩個文件 根據來源把每個kv對打上標簽 輸出給reduce可以必須是關聯字段
public static class ReducerJoinMap extends Mapper<LongWritable, Text, Text, ValueFlag> {
private FileSplit fileSplit;
private String fileName;
private String[] infos;
private Text oKey = new Text();
private ValueFlag oValue = new ValueFlag();
@Override
protected void setup(Mapper<LongWritable, Text, Text, ValueFlag>.Context context)
throws IOException, InterruptedException {
fileSplit = (FileSplit) context.getInputSplit();
if (fileSplit.getPath().toString().contains("user-logs-large.txt")) {
fileName = "userLogsLarge";
} else if (fileSplit.getPath().toString().contains("user_info.txt")) {
fileName = "userInfo";
}
}
@Override
protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, ValueFlag>.Context context)
throws IOException, InterruptedException {
infos = value.toString().split("\\s");
oValue.setFlag(fileName);
if (fileName.equals("userLogsLarge")) {
// 解析user-logs-large.txt
oKey.set(infos[0]);
oValue.setValue(infos[1] + "\t" + infos[2]);
context.write(oKey, oValue);
} else if (fileName.equals("userInfo")) {
// 解析user_infos.txt
oKey.set(infos[0]);
oValue.setValue(infos[1] + "\t" + infos[2]);
context.write(oKey, oValue);
}
}
}
// 接受map發送過來的kv隊 根據value中的flag把同一個key對應的value分組
// 那麼兩組中的數據就是分別來自兩張表中的數據 對這兩組數據做笛卡爾乘機即完成關聯
public static class ReducerJoinReducer extends Reducer<Text, ValueFlag, AvroKey<UserActionLog>, NullWritable> {
private List<String> userLogsLargeList;
private List<String> userInfosList;
private NullWritable outValue = NullWritable.get();
private AvroKey<UserActionLog> outKey = new AvroKey<UserActionLog>();
private String[] infos;
@Override
protected void reduce(Text key, Iterable<ValueFlag> values,
Reducer<Text, ValueFlag, AvroKey<UserActionLog>, NullWritable>.Context context)
throws IOException, InterruptedException {
userLogsLargeList = new ArrayList<String>();
userInfosList = new ArrayList<String>();
for (ValueFlag value : values) {
if (value.getFlag().equals("userLogsLarge")) {
userLogsLargeList.add(value.getValue());
} else if (value.getFlag().equals("userInfo")) {
userInfosList.add(value.getValue());
}
}
// 對兩組中的數據進行笛卡爾乘積
for (String userlog : userLogsLargeList) {
for (String userinfo : userInfosList) {
// 構建一個useractionLog對象
UserActionLog.Builder build = UserActionLog.newBuilder();
// 從userlog中提取actiontyoe和ipaddress
infos = userlog.split("\\s");
build.setActionType(infos[0]);
build.setIpAddress(infos[1]);
// 從userinfo 提取gender 和privince
infos = userinfo.split("\\s");
if (infos[0].equals("man")) {
build.setGender(0);
} else {
build.setGender(1);
}
build.setProvience(infos[1]);
build.setUserName(key.toString());
UserActionLog userActionLog = build.build();
// 吧userAction封裝到Avrokey中
outKey.datum(userActionLog);
context.write(outKey, outValue);
}
}
}
}
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
Configuration configuration = new Configuration();
Job job = Job.getInstance(configuration);
job.setJarByClass(ReducerJoin.class);
job.setJobName("reducer聯合");
job.setMapperClass(ReducerJoinMap.class);
job.setReducerClass(ReducerJoinReducer.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(ValueFlag.class);
job.setOutputKeyClass(AvroKey.class);
job.setOutputValueClass(NullWriter.class);
//設置輸出的格式是avrokey
job.setOutputFormatClass(AvroKeyOutputFormat.class);
//設置輸出key的schema
AvroJob.setOutputKeySchema(job, UserActionLog.SCHEMA$);
FileInputFormat.addInputPath(job, new Path("/mapjoin"));
Path outputPath = new Path("/ReducerJoin");
outputPath.getFileSystem(configuration).delete(outputPath, true);
FileOutputFormat.setOutputPath(job, outputPath);
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
// 最後更新 (last updated): 2017-10-19 21:03:23