装房和城乡建设部网站,网站展示 包括什么,邢台seo服务公司,wordpress4.9博客模板3.6.1OutputFormat接口实现类
OutputFormat是MapReduce输出的基类#xff0c;所有实现MapReduce输出都实现了OutputFormat接口。下面我们介绍几种常见的OutputFormat实现类。
1、文本输出TextOutputFormat
默认的输出格式是TextOutputFormat#xff0c;它把每条记录写为文…3.6.1OutputFormat接口实现类
OutputFormat是MapReduce输出的基类所有实现MapReduce输出都实现了OutputFormat接口。下面我们介绍几种常见的OutputFormat实现类。
1、文本输出TextOutputFormat
默认的输出格式是TextOutputFormat它把每条记录写为文本行。它的键和值可以是任意类型疑问TextOutputFormat调用toString()方法把他们转换为字符串。
2、SequenceFileOutputFormat
将SequenceFileOutputFormat输出作为后续MapReduce任务的输入这便是一种好的输出格式因为它的格式紧凑很容易被压缩。
3、自定义OutputFormat
根据用户需求自定义实现输出。
3.6.2自定义OutputFormat
1、使用场景
为了实现控制最终文件的输出路径和输出格式可以自定义OutputFormat。
例如要在一个MapReduce程序中根据数据的不同输出两类结果到不同的目录这类灵活的输出需求可以通过自定义OutputFormat来实现。
2、自定义OUtputFormat步骤
1自定义一个类继承FileOutputFormat。
2改写RecordWriter具体改写输出数据的方法write()。
3.6.3自定义OutputFormat案例实操
1、需求
过滤输入的log日志包含atguigu的网站输出到e:/atguigu.log不包含atguigu的网站输出到e:/other.log。
1输入数据
http://www.baidu.com
http://www.google.com
http://cn.bing.com
http://www.atguigu.com
http://www.sohu.com
http://www.sina.com
http://www.sin2a.com
http://www.sin2desa.com
http://www.sindsafa.com2期望输出数据
http://www.atguigu.comhttp://cn.bing.com
http://www.baidu.com
http://www.google.com
http://www.sin2a.com
http://www.sin2desa.com
http://www.sina.com
http://www.sindsafa.com
http://www.sohu.com2、需求分析 3、案例实操
1编写FilterMapper类
package com.cuiyf41.output;import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;import java.io.IOException;public class FilterMapper extends MapperLongWritable, Text, Text, NullWritable {Overrideprotected void map(LongWritable key, Text value, MapperLongWritable, Text, Text, NullWritable.Context context) throws IOException, InterruptedException {// 写出context.write(value, NullWritable.get());}
}2编写FilterReducer类
package com.cuiyf41.output;import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;import java.io.IOException;public class FilterReducer extends ReducerText, NullWritable, Text, NullWritable {Text k new Text();Overrideprotected void reduce(Text key, IterableNullWritable values, ReducerText, NullWritable, Text, NullWritable.Context context) throws IOException, InterruptedException {// 1 获取一行String line key.toString();// 2 拼接line line \r\n;// 3 设置keyk.set(line);// 4 输出context.write(k, NullWritable.get());}
}3自定义一个OutputFormat类
package com.atguigu.mapreduce.outputformat;
import java.io.IOException;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;public class FilterOutputFormat extends FileOutputFormatText, NullWritable{Overridepublic RecordWriterText, NullWritable getRecordWriter(TaskAttemptContext job) throws IOException, InterruptedException {// 创建一个RecordWriterreturn new FilterRecordWriter(job);}
}4编写RecordWriter类
package com.cuiyf41.output;import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;import java.io.IOException;public class FilterRecordWriter extends RecordWriterText, NullWritable {FSDataOutputStream atguiguOut null;FSDataOutputStream otherOut null;public FilterRecordWriter(TaskAttemptContext job) {// 1 获取文件系统FileSystem fs;try {fs FileSystem.get(job.getConfiguration());// 2 创建输出文件路径Path atguiguPath new Path(e:/atguigu.log);Path otherPath new Path(e:/other.log);// 3 创建输出流atguiguOut fs.create(atguiguPath);otherOut fs.create(otherPath);} catch (IOException e) {e.printStackTrace();}}Overridepublic void write(Text key, NullWritable value) throws IOException, InterruptedException {// 判断是否包含“atguigu”输出到不同文件if (key.toString().contains(atguigu)) {atguiguOut.write(key.toString().getBytes());} else {otherOut.write(key.toString().getBytes());}}Overridepublic void close(TaskAttemptContext context) throws IOException, InterruptedException {// 关闭资源IOUtils.closeStream(atguiguOut);IOUtils.closeStream(otherOut);}
}5编写FilterDriver类
package com.cuiyf41.output;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import java.io.IOException;public class FilterDriver {public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {// 输入输出路径需要根据自己电脑上实际的输入输出路径设置args new String[] { e:/input/log.txt, e:/output2 };Configuration conf new Configuration();Job job Job.getInstance(conf);job.setJarByClass(FilterDriver.class);job.setMapperClass(FilterMapper.class);job.setReducerClass(FilterReducer.class);job.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(NullWritable.class);job.setOutputKeyClass(Text.class);job.setOutputValueClass(NullWritable.class);// 要将自定义的输出格式组件设置到job中job.setOutputFormatClass(FilterOutputFormat.class);Path input new Path(args[0]);Path output new Path(args[1]);// 如果输出路径存在则进行删除FileSystem fs FileSystem.get(conf);if (fs.exists(output)) {fs.delete(output,true);}FileInputFormat.setInputPaths(job, input);// 虽然我们自定义了outputformat但是因为我们的outputformat继承自fileoutputformat// 而fileoutputformat要输出一个_SUCCESS文件所以在这还得指定一个输出目录FileOutputFormat.setOutputPath(job, output);boolean result job.waitForCompletion(true);System.exit(result ? 0 : 1);}
}