Hadoop大数据技术开发实战
上QQ阅读APP看书,第一时间看更新

5.6 案例分析:二次排序

MapReduce在传递<key,value>对时默认按照key进行排序,而有时候除了key以外,还需要根据value或value中的某一个字段进行排序,基于这种需求进行的自定义排序称为“二次排序”。

例如有以下数据:

现需要对上述数据先按照第一字段进行升序排列,若第一字段相同,则按照第二字段进行降序排列,期望的输出结果如下:

1. 设计思路

由于MapReduce中主要是对key的比较和排序,因此可以将需要排序的两个字段组合成一个复合key,而value值不变,则组合后的<key,value>对形如<(key,value),value>。

在编程时可以自定义一个类MyKeyPair,该类中包含要排序的两个字段,然后将该类作为<key,value>对中的key(Hadoop中的任何类型都可以作为key),形如<MyKeyPair,value>,相当于自定义key的类型。由于所有的key是可序列化并且可比较的,因此自定义的key需要实现接口WritableComparable。

与按照一个字段排序相比,本次二次排序需要自定义的地方如下:

  •  自定义组合key类,需要实现WritableComparable接口。
  •  自定义分区类,按照第一个字段进行分区,需要继承Partitioner类。
  •  自定义分组类,按照第一个字段进行分组,需要继承WritableComparator类。
2. 编写程序

(1)自定义组合key类。

新建自定义组合key类MyKeyPair.java,该类需要实现Hadoop提供的org.apache.hadoop.io.WritableComparable接口,该接口继承了org.apache.hadoop.io.Writable接口和java.lang.Comparable接口,定义源码如下:

然后需要实现WritableComparable接口中的序列化方法write()、反序列化方法readFields()、比较方法compareTo()。write()方法用于将数据写入输出流;readFields()方法用于从输入流读取数据;compareTo()方法用于将两个对象进行比较,以便能够进行排序。

自定义组合key类MyKeyPair.java的源码如下:

(2)自定义分区类。

新建自定义分区类MyPartitioner.java,该类需要继承Hadoop提供的org.apache.hadoop.mapreduce.Partitioner类,并实现其中的抽象方法getPartition()。Partitioner类是一个抽象泛型类,用于控制对Map任务输出结果的分区,泛型的两个参数分别表示<key,value>对中key的类型和value的类型。Partitioner类的源码如下:

关于MapReduce的分区规则可参考本章5.1.3节的MapReduce工作原理,此处不再赘述。

自定义分区类MyPartitioner.java的源码如下:

上述代码继承Partitioner类的同时指定了<key,value>对中key的类型为MyKeyPair,value的类型为IntWritable。

(3)自定义分组类。

新建自定义分组类MyGroupComparator.java,该类需要继承Hadoop提供的org.apache.hadoop.io.WritableComparator类,并重写其中的compare()方法,以实现按照指定的字段进行分组。

自定义分组类MyGroupComparator.java的源码如下:

上述代码首先通过构造方法指定了<key,value>对中key的类型为MyKeyPair,由于MapReduce默认以<key,value>对中的key值进行分组,因此接下来重写了compare()方法,实现了按照MyKeyPair对象中的first字段进行对比,若值相等则会将当前<key,value>对分为一组。

(4)定义Mapper类。

新建Mapper类MyMapper.java,实现将输入的数据封装为<MyKeyPair, IntWritable>形式的<key,value>对进行输出,即输出的key的类型为MyKeyPair,输出的value的类型为IntWritable。

Mapper类MyMapper.java的源码如下:

(5)定义Reducer类。

新建Reducer类MyReducer.java,将接收到的分组后的<key,value-list>对循环进行输出。

Reducer类MyReducer.java的源码如下:

上述代码将MyKeyPair类型的key中的first字段值作为输出的key,输出的value从集合values中进行遍历。

(6)定义应用程序主类。

新建应用程序主类MySecondSortApp.java,在该类中需要指定自定义的分区类和分组类,同时需要显式设置Map任务输出的key和value的类型。

应用程序主类MySecondSortApp.java的源码如下:

上述代码解析如下:

❶ 设置map()方法输出的key和value的类型。若将此省略,则默认采用❷中设置的输出类型。也就是说,若map()方法和reduce()方法的输出类型一致,可以省略对map()方法输出类型的设置。若map()方法和reduce()方法实际的输出类型与此处的设置不匹配,则程序运行过程中将会报错。

在MapReduce程序运行的过程中会通过JobConf类获取map()方法的输出类型,获取map()方法输出key的类型的源码如下:

从上述源码可以看出,当没有设置map()方法的输出类型时,会调用getOutputKeyClass()方法使用reduce()方法的输出类型。

❸ 在执行MapReduce程序时,会首先从HDFS中读取数据块,然后按行拆分成<key,value>对,这个过程则是由TextInputFormat类完成的。TextInputFormat类继承了抽象类FileInputFormat<K,V>,而FileInputFormat<K, V>又继承了抽象类InputFormat<K, V>,抽象类InputFormat<K, V>中定义了两个方法:getSplits()和createRecordReader()。getSplits()方法负责将HDFS数据解析为InputSplit集合,createRecordReader()方法负责将一个InputSplit解析为一个<key,value>对记录。抽象类InputFormat<K, V>的源码如下:

3. 程序运行

程序的打包和执行参考前面的“单词计数”和“数据去重”案例,此处不再赘述。

执行完成后,查看执行结果,如图5-11所示。

图5-11 查看二次排序程序执行结果