成都建设网站哪些公司好,wordpress图片清晰度,公司网页网站建设 ppt,网站的区别MapReduce概述
是一个分布式的编程框架#xff0c;MapReduce核心功能是将用户编写的业务逻辑代码和自带默认组件整合成一个完整的分布式运算程序#xff0c;并发运行在一个Hadoop集群上。
优点#xff1a; 易于编程#xff0c;简单的实现一些接口#xff0c;就可以完成一…MapReduce概述
是一个分布式的编程框架MapReduce核心功能是将用户编写的业务逻辑代码和自带默认组件整合成一个完整的分布式运算程序并发运行在一个Hadoop集群上。
优点 易于编程简单的实现一些接口就可以完成一个分布式程序。良好的扩展性可以增加机器来扩展计算能力。高容错性子任务失败后可以重试4次。 缺点 不擅长实时计算不能进行流式计算MapReduce的输入数据集是静态的不能动态变化。不擅长DAG有向无环图下一段计算的起始数据取决于上一个阶段的结果。
MapReduce核心思想
Map: 读单词进行分区Shuffle排序是框架内固定的代码必须排序。进行快排Reduce对区间有序的内容进行归并排序累加单词在MapReduce过程中只能有一个Map和一个Reduce
MapReduce进程
MrAppMaster负责整个程序的过程调度及状态协调。MapTask负责Map阶段的整个数据处理流程。ReduceTask负责Reduce阶段的整个数据处理流程。
序列化 变量类型后面加上Writable转换成可以序列化的类型String类型有点特别相应的序列化类型为Textjava类型转hadoop类型 private IntWritable key new IntWritable();key.set(java_value);构造器转换 new intWritable(1); hadoop类型转java类型 int value key.get();
WordCount案例
Driver类的8个步骤
该类中的步骤是使用hadoop框架的核心这8个步骤是写死的无法更改具体为
获取配置信息获取job对象实例指定本程序的jar包所在的本地路径关联Mapper/Reducer业务类指定Mapper输出数据的kv类型指定最终输出数据的kv类型部分案例不需要reduce这个步骤指定job的输入原始文件所在目录指定job的输出结果所在目录提交作业
WordCountMapper类的实现
继承mapper类选择mapreduce包新版本老版本的叫mapred包根据业务需求设定泛型的具体类型输入的kv类型输出的kv类型。 输入类型 keyIn:起始偏移量是字节偏移量。一般不参与计算类型为longwritablevalueIn每一行数据类型为Text 输出类型 keyOut: 每个单词valueOut:数字1 重写map方法ctrl o 快捷键重写方法 Overrideprotected void map(LongWritable key, Text value, MapperLongWritable, Text, Text, IntWritable.Context context) throws IOException, InterruptedException {//1.获取一行// atguigu atguiguString line value.toString();//2.切割//atguigu//atguiguString[] words line.split( );//3.循环写出for (String word : words) {//封装outKeycontext.write(new Text(word),new IntWritable(1));}}WordCountReducer类的实现
继承Reduce类设置输入输出类型 输入 跟Map的输出类型对应即可输出 keyInt, 单词类型为TextkeyOut, 次数类型为IntWritable 重写reduce方法 Overrideprotected void reduce(Text key, IterableIntWritable values, ReducerText, IntWritable, Text, IntWritable.Context context) throws IOException, InterruptedException {int sum 0;//atguigu(1,1)//累加for (IntWritable value : values) {sum value.get();}//写出context.write(key, new IntWritable(sum);//注意}在处理大数据时不要在循环中new对象创建对象是很消耗资源的。可以使用ctrl alt F将这两个变量提升为全局变量作为Reducer类的属性值。但其实还有更好的方法可以使用Mapper类中的setup()方法来实现该需求。该方法是框架里面原本就设定好的方法在map阶段前只会执行1次。
private Text outKey;private IntWritable outValue;Overrideprotected void setup(MapperLongWritable, Text, Text, IntWritable.Context context) throws IOException, InterruptedException {outKey new Text();outValue new IntWritable(1);}Driver类的实现
该类的写法基本上是固定的不同需求只需要在此基础上修改一下map和reduce业务类即可。
public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {//1.获取JobConfiguration conf new Configuration();Job job Job.getInstance();//2.设置jar包路径绑定driver类job.setJarByClass(WordCountDriver.class);//3.关联mapper和reducerjob.setMapperClass(WordCountMapper.class);job.setReducerClass(WordCountReducer.class);//4.设置map的kv类型job.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(IntWritable.class);//5.设置最终输出的kv类型job.setOutputKeyClass(Text.class);job.setOutputValueClass(IntWritable.class);//6.设置输入路径和输出路径FileInputFormat.setInputPaths(job, new Path(D:\\inputOutput\\input\\wordcount));FileOutputFormat.setOutputPath(job, new Path(D:\\inputOutput\\output\\output666));//7.提交jobjob.waitForCompletion(true);}打包本地程序到集群中运行
修改本地程序Driver类中的输入输出路径
//6.设置输入路径和输出路径
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));使用maven的package命令打包本地程序成.jar文件生成的jar包在target目录下使用jar命令解压运行jar 包名 数据输入路径 数据输出路径
可以直接在window中的IDEA中发送任务到Linux集群中但配置方式较为烦琐生产环境中使用该方法的人较少。
序列化
java序列化
序列化目的是将内存的对象放进磁盘中进行保存序列化实际上是为了传输对象中的属性值而不是方法。序列化本质上是一种数据传输技术IO流。
ObjectOutputStream 序列化流 writeObject()ObjectInputStream 反序列化流 readObject(Object obj)序列化前提条件 实现Serializable接口空参构造器私有的属性准备get和set方法
总结java序列化是一个比较重的序列化序列化的内容很多比如属性校验血缘关系元数据。
hadoop序列化
特点 轻量化只有属性值和校验
hadoop中自定义bean对象步骤
实现writable接口空参构造器私有的属性get和set方法重写write方法和readFields方法write方法出现的属性顺序必须和readFields的读取顺序一致如果自定义的bean对象要作为reduce的输出结果需要重写toString方法否则存入磁盘的是地址值如果自定义的bean对象要作为map输出结果中的key进行输出并进行reduce操作必须实现comparable接口。
MapReduce框架原理
粗略流程mapreduce map reduce大体流程mapreduce inputFormat -- map -- shuffle(排序) -- reduce --outputFormat源码流程 inputformatmap mapsort: 按照字典序进行快排 reduce copy拉取map的处理结果sort由于结果是局部有序不是整体有序进行归并排序reduce之后再进行数据合并规约
InputFormat/OutputFormat基类
实现类有TextInputFormat和TextOutputFormat, 其中重点是切片逻辑和读写逻辑读写部分的代码框架已经写死了主要关注如何切片即可。
切片与MapTask并行度决定机制
数据块Block是物理上真的分开存储了。 数据切片只是逻辑上进行分片处理每个数据切片对应一个MapTask。
正常来说如果数据有300M我们按照常理来说会平均划分成3 x 100 M但是物理上每个物理块是128M每个MapTask进行计算时需要从另外那个主机读取数据跨越主机读取数据需要进行网络IO这是很慢的。 所有MR选择的是按照128M来进行切分尽管这样会导致划分的数据块并不是十分均匀但是对于网络IO的延迟来说还是可以接受的。
Hadoop提交流程源码和切片源码
提交源码主要debug节点
job.waitForCompletion() JobState枚举类DEFINE、RUNING submit()connect() 匿名内部类、new方法 无法进入打好断点点击快速运行进入ctrl alt 左方向键可以返回原先的位置initProviderList(): 添加了本地客户端协议和Yarn客户端协议create(conf): 根据配置文件来决定代码的运行环境Yarn分布式环境/本地单机环境 submitter 根据运行环境获取相应的提交器checkSpecs(job)检查输出路径是否正确jobStagingArea: job的临时运行区域给定一个绝对路径的目录D:\tmp\hadoop\mapred\staging里面存放了 local: 切片结果8个配置文件总和yarn切片结果8个配置文件总和jar包 copyAndConfigureFiles(): 读取任务需要的支持文件读取到job的临时运行环境中在Yarn环境中会上传jar包到该路径中writeSplits()给数据添加切片标记实际还未切分会生成切片文件到临时区域中writeConf(): 写配置文件到目录中
提交源码总结
mapreduce.framework.name这个参数决定了运行环境切片个数决定了MapTask个数