本文共 19215 字,大约阅读时间需要 64 分钟。
MapReduce源自Google的MapReduce论文,论文发表于2004年12月。Hadoop MapReduce可以说是Google MapReduce的一个开源实现。MapReduce优点在于可以将海量的数据进行离线处理,并且MapReduce也易于开发,因为MapReduce框架帮我们封装好了分布式计算的开发。而且对硬件设施要求不高,可以运行在廉价的机器上。MapReduce也有缺点,它最主要的缺点就是无法完成实时流式计算,只能离线处理。
MapReduce属于一种编程模型,用于大规模数据集(大于1TB)的并行运算。概念"Map(映射)"和"Reduce(归约)",是它们的主要思想,都是从函数式编程语言里借来的,还有从矢量编程语言里借来的特性。它极大地方便了编程人员在不会分布式并行编程的情况下,将自己的程序运行在分布式系统上。 当前的软件实现是指定一个Map(映射)函数,用来把一组键值对映射成一组新的键值对,指定并发的Reduce(归约)函数,用来保证所有映射的键值对中的每一个共享相同的键组。
MapReduce官方文档地址如下:
在学习MapReduce之前我们需要准备好Hadoop的环境,也就是需要先安装好HDFS以及YARN,环境的搭建方式可以参考我之前的两篇文章: 以及
在安装Hadoop时,它就自带有一个WordCount的案例,这个案例是统计文件中每个单词出现的次数,也就是词频统计,我们在学习大数据开发时,一般都以WordCount作为入门。
例如,我现在有一个test.txt,文件内容如下:
hello worldhello hadoophello MapReduce
现在的需求是统计这个文件中每个单词出现的次数。假设我现在写了一些代码实现了这个文件的词频统计,统计的结果如下:
hello 3world 1hadoop 1MapReduce 1
以上这就是一个词频统计的例子。
词频统计看起来貌似很简单的样子,一般不需要多少代码就能完成了,而且如果对shell脚本比较熟悉的话,甚至一句代码就能完成这个词频统计的功能。确实词频统计是不难,但是为什么还要用大数据技术去完成这个词频统计的功能呢?这是因为实现小文件的词频统计功能或许用简单的代码就能完成,但是如果是几百GB、TB甚至是PB级的大文件还能用简单的代码完成吗?这显然是不可能的,就算能也需要花费相当大的时间成本。
而大数据技术就是要解决这种处理海量数据的问题,MapReduce在其中就是充当一个分布式并行计算的角色,分布式并行计算能大幅度提高海量数据的处理速度,毕竟多个人干活肯定比一个人干活快。又回到我们上面所说到的词频统计的例子,在实际工作中很多场景的开发都是在WordCount的基础上进行改造的。例如,要从所有服务器的访问日志中统计出被访问得最多的url以及访问量最高的IP,这就是一个典型的WordCount应用场景,要知道即便是小公司的服务器访问日志通常也都是GB级别的。
使用MapReduce执行WordCount的流程示意图:
从上图中,可以看到,输入的数据集会被拆分为多个块,然后这些块都会被放到不同的节点上进行并行的计算。在Splitting这一环节会把单词按照分割符或者分割规则进行拆分,拆分完成后就到Mapping上了,到Mapping这个环节后会把相同的单词通过网络进行映射或者说传输到同一个节点上。接着这些相同的单词就会在Shuffling环节时进行洗牌也就是合并,合并完成之后就会进入Reducing环节,这一环节就是把所有节点合并后的单词再进行一次合并,也就是会输出到HDFS文件系统中的某一个文件中。大体来看就是一个拆分又合并的过程,所以MapReduce是分为map和Reduce的。最重要的是,要清楚这一流程都是分布式并行的,每个节点都不会互相依赖,都是相互独立的。
以上我们也提到了MapReduce是分为Map和Reduce的,也就是说一个MapReduce作业会被拆分成Map和Reduce阶段。Map阶段对应的就是一堆的Map Tasks,同样的Reduce阶段也是会对应一堆的Reduce Tasks。
其实简单来说这也是一个输入输出的流程,要注意的是在MapReduce框架中输入的数据集会被序列化成键/值对,map阶段完成后会对这些键值对进行排序,最后到reduce阶段中进行合并输出,输出的也是键/值对,官网文档写的流程如下:
(input)-> map -> -> combine -> -> reduce -> (output)
示意图:
我们可以看到有几个主要的点:
我们可以再来看一张图,假设我们手动设置了block与split的对应关系,一个block对应两个split:
上图中一个block对应两个split(默认是一对一),一个split则是对应一个Map Task。Map Task将数据分完组之后到Shuffle,Shuffle完成后就到Reduce上进行输出,而每一个Reduce Tasks会输出到一个文件上,上图中有三个Reduce Tasks,所以会输出到三个文件上。
MapReduce1.x架构图如下:
简单说明一下其中的几个组件:
MapReduce2.x架构图如下,可以看到JobTracker和TaskTracker已经不复存在了,取而代之的是ResourceManager和NodeManager。不仅架构变了,功能也变了,2.x之后新引入了YARN,在YARN之上我们可以运行不同的计算框架,不再是1.x那样只能运行MapReduce了:
关于MapReduce2.x的架构之前已经在一文中说明过了,这里就不再赘述了。
1.创建一个Maven工程,配置依赖如下:
cloudera https://repository.cloudera.com/artifactory/cloudera-repos/ true false UTF-8 2.6.0-cdh5.7.0 org.apache.hadoop hadoop-client ${hadoop.version} junit junit 4.10 test
2.创建一个类,开始编写我们wordcount的实现代码:
package org.zero01.hadoop.mapreduce;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.Mapper;import org.apache.hadoop.mapreduce.Reducer;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import java.io.IOException;/** * @program: hadoop-train * @description: 使用MapReduce开发WordCount应用程序 * @author: 01 * @create: 2018-03-31 14:03 **/public class WordCountApp { /** * Map: 读取输入的文件内容 */ public static class MyMapper extends Mapper{ LongWritable one = new LongWritable(1); protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { // 接收到的每一行数据 String line = value.toString(); // 按照指定的分割符进行拆分 String[] words = line.split(" "); for (String word : words) { // 通过上下文把map的处理结果输出 context.write(new Text((word)), one); } } } /** * Reduce: 归并操作 */ public static class MyReducer extends Reducer { protected void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException { long sum = 0; for (LongWritable value : values) { // 求key出现的次数总和 sum += value.get(); } // 将最终的统计结果输出 context.write(key, new LongWritable(sum)); } } /** * 定义Driver:封装了MapReduce作业的所有信息 */ public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { Configuration configuration = new Configuration(); // 创建Job,通过参数设置Job的名称 Job job = Job.getInstance(configuration, "wordcount"); // 设置Job的处理类 job.setJarByClass(WordCountApp.class); // 设置作业处理的输入路径 FileInputFormat.setInputPaths(job, new Path(args[0])); // 设置map相关参数 job.setMapperClass(MyMapper.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(LongWritable.class); // 设置reduce相关参数 job.setReducerClass(MyReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(LongWritable.class); // 设置作业处理完成后的输出路径 FileOutputFormat.setOutputPath(job, new Path(args[1])); System.exit(job.waitForCompletion(true) ? 0 : 1); }}
3.编写完成之后,在IDEA里通过Maven进行编译打包:
4.把打包好的jar包上传到服务器上:
测试文件内容如下:
[root@localhost ~]# hdfs dfs -text /test.txthello worldhadoop welcomehadoop hdfs mapreducehadoop hdfshello hadoop[root@localhost ~]#
5.然后执行如下命令执行Job:
[root@localhost ~]# hadoop jar ./hadoop-train-1.0.jar org.zero01.hadoop.mapreduce.WordCountApp /test.txt /output/wc
简单说明一下这个命令:
6.到YARN上查看任务执行的信息:
申请资源:
运行:
完成:
7.可以看到已经执行成功,命令行终端的日志输出内容如下:
18/03/31 22:55:51 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable18/03/31 22:55:52 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:803218/03/31 22:55:52 WARN mapreduce.JobResourceUploader: Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this.18/03/31 22:55:53 INFO input.FileInputFormat: Total input paths to process : 118/03/31 22:55:53 INFO mapreduce.JobSubmitter: number of splits:118/03/31 22:55:53 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1522505784761_000118/03/31 22:55:54 INFO impl.YarnClientImpl: Submitted application application_1522505784761_000118/03/31 22:55:54 INFO mapreduce.Job: The url to track the job: http://localhost:8088/proxy/application_1522505784761_0001/18/03/31 22:55:54 INFO mapreduce.Job: Running job: job_1522505784761_000118/03/31 22:56:06 INFO mapreduce.Job: Job job_1522505784761_0001 running in uber mode : false18/03/31 22:56:06 INFO mapreduce.Job: map 0% reduce 0%18/03/31 22:56:11 INFO mapreduce.Job: map 100% reduce 0%18/03/31 22:56:16 INFO mapreduce.Job: map 100% reduce 100%18/03/31 22:56:16 INFO mapreduce.Job: Job job_1522505784761_0001 completed successfully18/03/31 22:56:16 INFO mapreduce.Job: Counters: 49 File System Counters FILE: Number of bytes read=190 FILE: Number of bytes written=223169 FILE: Number of read operations=0 FILE: Number of large read operations=0 FILE: Number of write operations=0 HDFS: Number of bytes read=174 HDFS: Number of bytes written=54 HDFS: Number of read operations=6 HDFS: Number of large read operations=0 HDFS: Number of write operations=2 Job Counters Launched map tasks=1 Launched reduce tasks=1 Data-local map tasks=1 Total time spent by all maps in occupied slots (ms)=3151 Total time spent by all reduces in occupied slots (ms)=2359 Total time spent by all map tasks (ms)=3151 Total time spent by all reduce tasks (ms)=2359 Total vcore-seconds taken by all map tasks=3151 Total vcore-seconds taken by all reduce tasks=2359 Total megabyte-seconds taken by all map tasks=3226624 Total megabyte-seconds taken by all reduce tasks=2415616 Map-Reduce Framework Map input records=5 Map output records=11 Map output bytes=162 Map output materialized bytes=190 Input split bytes=100 Combine input records=0 Combine output records=0 Reduce input groups=6 Reduce shuffle bytes=190 Reduce input records=11 Reduce output records=6 Spilled Records=22 Shuffled Maps =1 Failed Shuffles=0 Merged Map outputs=1 GC time elapsed (ms)=233 CPU time spent (ms)=1860 Physical memory (bytes) snapshot=514777088 Virtual memory (bytes) snapshot=5571788800 Total committed heap usage (bytes)=471859200 Shuffle Errors BAD_ID=0 CONNECTION=0 IO_ERROR=0 WRONG_LENGTH=0 WRONG_MAP=0 WRONG_REDUCE=0 File Input Format Counters Bytes Read=74 File Output Format Counters Bytes Written=54
8.查看输出文件的内容:
[root@localhost ~]# hdfs dfs -ls /output/wc/Found 2 items-rw-r--r-- 1 root supergroup 0 2018-03-31 22:56 /output/wc/_SUCCESS-rw-r--r-- 1 root supergroup 54 2018-03-31 22:56 /output/wc/part-r-00000 # 执行结果的输出文件[root@localhost ~]# hdfs dfs -text /output/wc/part-r-00000 # 查看文件内容hadoop 4hdfs 2hello 2mapreduce 1welcome 1world 1[root@localhost ~]#
虽然我们已经成功通过编写java代码实现了wordcount功能,但是有一个问题,如果我们再执行刚刚那条命令,就会报如下错误:
[root@localhost ~]# hadoop jar ./hadoop-train-1.0.jar org.zero01.hadoop.mapreduce.WordCountApp /test.txt /output/wc18/04/01 00:30:12 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable18/04/01 00:30:12 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:803218/04/01 00:30:12 WARN security.UserGroupInformation: PriviledgedActionException as:root (auth:SIMPLE) cause:org.apache.hadoop.mapred.FileAlreadyExistsException: Output directory hdfs://192.168.77.130:8020/output/wc already existsException in thread "main" org.apache.hadoop.mapred.FileAlreadyExistsException: Output directory hdfs://192.168.77.130:8020/output/wc already exists at org.apache.hadoop.mapreduce.lib.output.FileOutputFormat.checkOutputSpecs(FileOutputFormat.java:146) at org.apache.hadoop.mapreduce.JobSubmitter.checkSpecs(JobSubmitter.java:270) at org.apache.hadoop.mapreduce.JobSubmitter.submitJobInternal(JobSubmitter.java:143) at org.apache.hadoop.mapreduce.Job$10.run(Job.java:1307) at org.apache.hadoop.mapreduce.Job$10.run(Job.java:1304) at java.security.AccessController.doPrivileged(Native Method) at javax.security.auth.Subject.doAs(Subject.java:422) at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1693) at org.apache.hadoop.mapreduce.Job.submit(Job.java:1304) at org.apache.hadoop.mapreduce.Job.waitForCompletion(Job.java:1325) at org.zero01.hadoop.mapreduce.WordCountApp.main(WordCountApp.java:86) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.apache.hadoop.util.RunJar.run(RunJar.java:221) at org.apache.hadoop.util.RunJar.main(RunJar.java:136)[root@localhost ~]#
在平时的MapReduce据程序开发中,这个异常非常地常见,这个异常是因为输出文件的存放目录已经存在:Output directory hdfs://192.168.77.130:8020/output/wc already exists
有两种方式可以解决这个问题:
我们来在代码中实现自动删除功能,在刚刚的代码中,加入如下内容:
.../** * 定义Driver:封装了MapReduce作业的所有信息 */public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { Configuration configuration = new Configuration(); // 准备清理已存在的输出目录 Path outputPath = new Path(args[1]); FileSystem fileSystem = FileSystem.get(configuration); if (fileSystem.exists(outputPath)) { fileSystem.delete(outputPath,true); System.out.println("output file exists, but is has deleted"); }...
编写完成之后重新将编辑后的jar包上传,再执行hadoop jar ./hadoop-train-1.0.jar org.zero01.hadoop.mapreduce.WordCountApp /test.txt /output/wc
命令,就不会再报错了。
Combiner类似于本地的Reduce,相当于是在Map阶段的时候就做一个Reduce的操作,它能够减少Map Task输出的数据量及网络传输量。
如下图:
在上图中,可以看到Mapper与Reducer之间有一层Combiner。Mapper先把数据在本地进行一个Combiner,也就是先做一个本地数据的合并,这个过程类似于Reduce只不过是本地的,也即是本节点。当Combiner合并完成之后,再把数据传输到Reducer上再一次进行最终的合并。这样Map Task输出的数据量就会大大减少,性能也会相应的提高,这一点可以从上图中看到。
我们来尝试一下在刚才开发的wordcount程序中,增加一层Combiner。增加Combiner很简单,只需要在设置map和reduce参数的代码之间增加一行代码即可,如下:
// 通过Job对象来设置Combiner处理类,在逻辑上和reduce是一样的job.setCombinerClass(MyReducer.class);
修改完成并重新上传jar包后,这时再执行wordcount程序,在终端的日志输出信息中,会发现Combiner相关的字段都有值,那么就代表我们的Combiner已经成功添加进去了:
Combiner的适用场景:
Combiner的不适用的场景:
Partitioner决定Map Task输出的数据交由哪个Reduce Task处理,也就是类似于制定一个分发规则。默认情况下的分发规则实现:分发的key的hash值对Reduce Task个数取模。
如下图:
上图中,把圆形数据放到了同一个Reduce Task上,把六边形数据放到了同一个Reduce Task上,剩下的图形数据则放到剩下的Reduce Task上, 这样的一个分发过程就是Partitioner。
例如,我现在有一组数据如下,这是今日各个手机品牌的销售量:
[root@localhost ~]# hdfs dfs -text /partitioner.txt xiaomi 200huawei 300xiaomi 100iphone7 300iphone7 500nokia 100[root@localhost ~]#
现在我有一个需求,就是将相同品牌的手机名称,分发到同一个Reduce上进行处理。这就需要用到Partitioner了,在我们之前的代码中增加如下内容:
public class WordCountApp { /** * Map: 读取输入的文件内容 */ public static class MyMapper extends Mapper{ protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { // 接收到的每一行数据 String line = value.toString(); // 按照指定的分割符进行拆分 String[] words = line.split(" "); // 通过上下文把map的处理结果输出 context.write(new Text((words[0])), new LongWritable(Long.parseLong(words[1]))); } } ... /** * Partitioner: 设定Map Task输出的数据的分发规则 */ public static class MyPartitioner extends Partitioner { public int getPartition(Text key, LongWritable value, int numPartitions) { if(key.toString().equals("xiaomi")){ return 0; } if(key.toString().equals("huawei")){ return 1; } if(key.toString().equals("iphone7")) { return 2; } return 3; } } /** * 定义Driver:封装了MapReduce作业的所有信息 */ public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { ... // 设置Job的partition job.setPartitionerClass(MyPartitioner.class); // 设置4个reducer,每个分区一个 job.setNumReduceTasks(4); ... }}
同样的,修改了代码后需要重新编译打包,把新的jar上传到服务器上。然后执行命令:
[root@localhost ~]# hadoop jar ./hadoop-train-1.0.jar org.zero01.hadoop.mapreduce.WordCountApp /partitioner.txt /output/wc
执行成功,此时可以看到/output/wc/
目录下有四个结果文件,这是因为我们在代码上设置了4个reducer,并且可以看到内容都是正确的:
[root@localhost ~]# hdfs dfs -ls /output/wc/Found 5 items-rw-r--r-- 1 root supergroup 0 2018-04-01 04:37 /output/wc/_SUCCESS-rw-r--r-- 1 root supergroup 11 2018-04-01 04:37 /output/wc/part-r-00000-rw-r--r-- 1 root supergroup 11 2018-04-01 04:37 /output/wc/part-r-00001-rw-r--r-- 1 root supergroup 13 2018-04-01 04:37 /output/wc/part-r-00002-rw-r--r-- 1 root supergroup 10 2018-04-01 04:37 /output/wc/part-r-00003[root@localhost ~]# for i in `seq 0 3`; do hdfs dfs -text /output/wc/part-r-0000$i; donexiaomi 300huawei 300iphone7 800nokia 100[root@localhost ~]#
JobHistory是一个Hadoop自带的历史服务器,它用于记录已运行完的MapReduce信息到指定的HDFS目录下。我们都知道,执行了MapReduce任务后,可以在YARN的管理页面上查看到任务的相关信息,但是由于JobHistory默认情况下是不开启的,所以我们无法通过点击History查看历史信息:
所以我们就需要打开这个服务,编辑配置文件内容:
[root@localhost ~]# cd /usr/local/hadoop-2.6.0-cdh5.7.0/etc/hadoop[root@localhost /usr/local/hadoop-2.6.0-cdh5.7.0/etc/hadoop]# vim mapred-site.xml # 增加如下内容mapreduce.jobhistory.address 192.168.77.130:10020 MapReduce JobHistory Server IPC host:port mapreduce.jobhistory.webapp.address 192.168.77.130:19888 MapReduce JobHistory Server IPC host:port mapreduce.jobhistory.done-dir /history/done [root@localhost /usr/local/hadoop-2.6.0-cdh5.7.0/etc/hadoop]# vim yarn-site.xml # 增加如下内容 mapreduce.jobhistory.intermediate-done-dir /history/done_intermediate [root@localhost /usr/local/hadoop-2.6.0-cdh5.7.0/etc/hadoop]# yarn.log-aggregation-enable true
编辑完配置文件后,重新启动YARN服务:
[root@localhost /usr/local/hadoop-2.6.0-cdh5.7.0/sbin]# ./stop-yarn.sh [root@localhost /usr/local/hadoop-2.6.0-cdh5.7.0/sbin]# ./start-yarn.sh
启动JobHistory服务:
[root@localhost /usr/local/hadoop-2.6.0-cdh5.7.0/sbin]# ./mr-jobhistory-daemon.sh start historyserverstarting historyserver, logging to /usr/local/hadoop-2.6.0-cdh5.7.0/logs/mapred-root-historyserver-localhost.out[root@localhost /usr/local/hadoop-2.6.0-cdh5.7.0/sbin]#
检查进程:
[root@localhost /usr/local/hadoop-2.6.0-cdh5.7.0/sbin]# jps2945 DataNode12946 JobHistoryServer3124 SecondaryNameNode12569 NodeManager13001 Jps2812 NameNode12463 ResourceManager[root@localhost /usr/local/hadoop-2.6.0-cdh5.7.0/sbin]#
然后执行一个案例测试一下:
[root@localhost /usr/local/hadoop-2.6.0-cdh5.7.0/share/hadoop/mapreduce]# hadoop jar ./hadoop-mapreduce-examples-2.6.0-cdh5.7.0.jar pi 3 4
任务执行成功后,这时候访问http://192.168.77.130:19888
就可以进入到JobHistory的web页面了:
能够正常访问就代表配置已经成功了,现在所有任务的执行日志都可以在这里进行查看,有利于我们日常开发中的排错,而且ui界面操作起来也要方便一些。
转载于:https://blog.51cto.com/zero01/2093445