您好,欢迎访问三七文档
当前位置:首页 > 商业/管理/HR > 信息化管理 > MapRedue作业过程
MapRedue作业过程1.Hadoop作业构成1.1Hadoop作业执行流程用户配置并将一个Hadoop作业提到Hadoop框架中,Hadoop框架会把这个作业分解成一系列maptasks和reducetasks。Hadoop框架负责task分发和执行,结果收集和作业进度监控。下图给出了一个作业从开始执行到结束所经历的阶段和每个阶段被谁控制(用户orHadoop框架)。下图详细给出了用户编写MapRedue作业时需要进行那些工作以及Hadoop框架自动完成的工作:在编写MapReduce程序时,用户分别通过InputFormat和OutputFormat指定输入和输出格式,并定义Mapper和Reducer指定map阶段和reduce阶段的要做的工作。在Mapper或者Reducer中,用户只需指定一对key/value的处理逻辑,Hadoop框架会自动顺序迭代解析所有key/value,并将每对key/value交给Mapper或者Reducer处理。表面上看来,Hadoop限定数据格式必须为key/value形式,过于简单,很难解决复杂问题,实际上,可以通过组合的方法使key或者value(比如在key或者value中保存多个字段,每个字段用分隔符分开,或者value是个序列化后的对象,在Mapper中使用时,将其反序列化等)保存多重信息,以解决输入格式较复杂的应用。2.2用户的工作用户编写MapReduce需要实现的类或者方法有:(1)InputFormat接口用户需要实现该接口以指定输入文件的内容格式。该接口有两个方法publicinterfaceInputFormatK,V{InputSplit[]getSplits(JobConfjob,intnumSplits)throwsIOException;RecordReaderK,VgetRecordReader(InputSplitsplit,JobConfjob,Reporterreporter)throwsIOException;}其中getSplits函数将所有输入数据分成numSplits个split,每个split交给一个maptask处理。getRecordReader函数提供一个用户解析split的迭代器对象,它将split中的每个record解析成key/value对。Hadoop本身提供了一些InputFormat:(2)Mapper接口用户需继承Mapper接口实现自己的Mapper,Mapper中必须实现的函数是voidmap(K1key,V1value,OutputCollectorK2,V2output,Reporterreporter)throwsIOException其中,K1V1是通过Inputformat中的RecordReader对象解析处理的,OutputCollector获取map()的输出结果,Reporter保存了当前task处理进度。Hadoop本身提供了一些Mapper供用户使用:(3)Partitioner接口用户需继承该接口实现自己的Partitioner以指定maptask产生的key/value对交给哪个reducetask处理,好的Partitioner能让每个reducetask处理的数据相近,从而达到负载均衡。Partitioner中需实现的函数是getPartition(K2key,V2value,intnumPartitions)该函数返回K2V2对应的reducetaskID。用户如果不提供Partitioner,Hadoop会使用默认的(实际上是个hash函数)。(4)CombinerCombiner使得maptask与reducetask之间的数据传输量大大减小,可明显提高性能。大多数情况下,Combiner与Reducer相同。(5)Reducer接口用户需继承Reducer接口实现自己的Mapper,Mapper中必须实现的函数是voidreduce(K2key,IteratorV2values,OutputCollectorK3,V3output,Reporterreporter)throwsIOExceptionHadoop本身提供了一些Reducer供用户使用:(6)OutputFormat用户通过OutputFormat指定输出文件的内容格式,不过它没有split。每个reducetask将其数据写入自己的文件,文件名为part-nnnnn,其中nnnnn为reducetask的ID。Hadoop本身提供了几个OutputFormat:3.分布式缓存Haoop中自带了一个分布式缓存,即DistributedCache对象,方便maptask之间或者reducetask之间共享一些信息,比如某些实际应用中,所有maptask要读取同一个配置文件或者字典,则可将该配置文件或者字典放到分布式缓存中。2、Map-Reduce例子WordCount/***MapReduceBase类:实现了Mapper和Reducer接口的基类(其中的方法只是实现接口,而未作任何事情)*Mapper接口:*WritableComparable接口:实现WritableComparable的类可以相互比较。所有被用作key的类应该实现此接口。*Reporter则可用于报告整个应用的运行进度,本例中未使用。*/publicstaticclassMapextendsMapReduceBaseimplementsMapperLongWritable,Text,Text,IntWritable{/***LongWritable,IntWritable,Text均是Hadoop中实现的用于封装Java数据类型的类,这些类实现了WritableComparable接口,*都能够被串行化从而便于在分布式环境中进行数据交换,你可以将它们分别视为long,int,String的替代品。*/}/***Mapper接口中的map方法:*voidmap(K1key,V1value,OutputCollectorK2,V2output,Reporterreporter)*映射一个单个的输入k/v对到一个中间的k/v对*输出对不需要和输入对是相同的类型,输入对可以映射到0个或多个输出对。*OutputCollector接口:收集Mapper和Reducer输出的k,v对。*OutputCollector接口的collect(k,v)方法:增加一个(k,v)对到output*/publicvoidmap(LongWritablekey,Textvalue,OutputCollectorText,IntWritableoutput,Reporterreporter)throwsIOException{Stringline=value.toString();StringTokenizertokenizer=newStringTokenizer(line);while(tokenizer.hasMoreTokens()){word.set(tokenizer.nextToken());output.collect(word,one);}}}publicstaticclassReduceextendsMapReduceBaseimplementsReducerText,IntWritable,Text,IntWritable{publicvoidreduce(Textkey,IteratorIntWritablevalues,OutputCollectorText,IntWritableoutput,Reporterreporter)throwsIOException{intsum=0;while(values.hasNext()){sum+=values.next().get();}output.collect(key,newIntWritable(sum));}}publicstaticvoidmain(String[]args)throwsException{/***JobConf:map/reduce的job配置类,向hadoop框架描述map-reduce执行的工作*构造方法:JobConf()、JobConf(ClassexampleClass)、JobConf(Configurationconf)等*/JobConfconf=newJobConf(WordCount.class);conf.setJobName(wordcount);//设置一个用户定义的job名称conf.setOutputKeyClass(Text.class);//为job的输出数据设置Key类conf.setOutputValueClass(IntWritable.class);//为job输出设置value类conf.setMapperClass(Map.class);//为job设置Mapper类conf.setCombinerClass(Reduce.class);//为job设置Combiner类conf.setReducerClass(Reduce.class);//为job设置Reduce类conf.setInputFormat(TextInputFormat.class);//为map-reduce任务设置InputFormat实现类conf.setOutputFormat(TextOutputFormat.class);//为map-reduce任务设置OutputFormat实现类/***InputFormat描述map-reduce中对job的输入定义*setInputPaths():为map-reducejob设置路径数组作为输入列表*setInputPath():为map-reducejob设置路径数组作为输出列表*/FileInputFormat.setInputPaths(conf,newPath(args[0]));FileOutputFormat.setOutputPath(conf,newPath(args[1]));JobClient.runJob(conf);//运行一个job}}分析WordCount程序我们先来看看Hadoop自带的示例程序WordCount,这个程序用于统计一批文本文件中单词出现的频率,完整的代码可在下载的Hadoop安装包中得到(在src/examples目录中)。1.实现Map类见代码清单1。这个类实现Mapper接口中的map方法,输入参数中的value是文本文件中的一行,利用StringTokenizer将这个字符串拆成单词,然后将输出结果单词,1写入到org.apache.hadoop.mapred.OutputCollector中。OutputCollector由Hadoop框架提供,负责收集Mapper和Reducer的输出数据,实现map函数和reduce函数时,只需要简单地将其输出的对往OutputCollector中一丢即可,剩余的事框架自会帮你处理好。代码中LongWritable,IntWritable,Text均是Hadoop中实现的用于封装Java数据类型的类,这些类都能够被串行化从而便于在分布式环境中进行数据交换,你可以将它们分别视为long,int,String的替代品。Reporter则可用于报告整个应用的运行进度,本例中未使用。publicstaticclassMapClassextendsMapReduceBaseimplementsMapper{privatefinalstaticIntWritableone=newIntWritable(1);privateTextword=newText();publicvoidmap(LongWritablekey,Textvalue,Outpu
本文标题:MapRedue作业过程
链接地址:https://www.777doc.com/doc-2886775 .html