`
yo8237233
  • 浏览: 60896 次
  • 来自: 深圳
社区版块
存档分类
最新评论

MapReduce二次排序

 
阅读更多
     默认情况下,Map 输出的结果会对 Key 进行默认的排序,但是有时候需要对 Key 排序的同时再对 Value 进行排序,这时候就要用到二次排序了。下面让我们来介绍一下什么是二次排序。

二次排序原理
        我们把二次排序主要分为以下几个阶段。
Map 起始阶段
        在Map阶段,使用 job.setInputFormatClass() 定义的 InputFormat ,将输入的数据集分割成小数据块 split,同时 InputFormat 提供一个 RecordReader的实现。本课程中使用的是 TextInputFormat,它提供的 RecordReader 会将文本的行号作为 Key,这一行的文本作为 Value。这就是自定义 Mapper 的输入是 < LongWritable,Text> 的原因。 然后调用自定义 Mapper 的map方法,将一个个< LongWritable,Text>键值对输入给 Mapper 的 map方法。
Map 最后阶段
        在 Map 阶段的最后,会先调用 job.setPartitionerClass() 对这个 Mapper 的输出结果进行分区,每个分区映射到一个Reducer。每个分区内又调用 job.setSortComparatorClass() 设置的 Key 比较函数类排序。 可以看到,这本身就是一个二次排序。如果没有通过 job.setSortComparatorClass() 设置 Key 比较函数类,则使用 Key 实现的 compareTo() 方法。我们既可以使用 IntPair 实现的 compareTo() 方法,也可以专门定义 Key 比较函数类。
Reduce 阶段
        在 Reduce 阶段,reduce() 方法接受所有映射到这个 Reduce 的 map 输出后,也是会调用 job.setSortComparatorClass()方法设置的 Key 比较函数类,对所有数据进行排序。然后开始构造一个 Key 对应的 Value 迭代器。 这时就要用到分组,使用 job.setGroupingComparatorClass()方法设置分组函数类。只要这个比较器比较的两个 Key 相同,它们就属于同一组,它们的 Value 放在一个 Value 迭代器,而这个迭代器的 Key 使用属于同一个组的所有Key的第一个Key。最后就是进入 Reducer 的 reduce() 方法,reduce() 方法的输入是所有的 Key 和它的 Value 迭代器,同样注意输入与输出的类型必须与自定义的 Reducer 中声明的一致。

接下来我们通过数据示例,可以很直观的了解二次排序的原理。

输入文件sort.txt(下载)内容为:

40  20
40  10
40  30
40  5
30  30
30  20
30  10
30  40
50  20
50  50
50  10
50  60
        输出文件的内容(从小到大排序)如下:

30  10
30  20
30  30
30  40
==============================
40  5
40  10
40  20
40  30
============================== 
50  10
50  20
50  50
50  60

二次排序的具体流程
        在 MapReduce 中,所有的 Key 是需要被比较和排序的,而且是二次,先根据 Partitioner,再根据大小。而本例中也是要比较两次。先按照第一字段排序,然后再对第一字段相同的按照第二字段排序。根据这一点,我们可以构造一个复合类 IntPair ,它有两个字段,先利用分区对第一字段排序,再利用分区内的比较对第二字段排序。

代码实现
        Hadoop 的 example 包中自带了一个 MapReduce 的二次排序算法,下面这个示例对 example 包中的二次排序源码的改进。 我们按照以下几步完成二次排序:

        第一步:自定义IntPair类,将示例数据中的key/value封装成一个整体作为Key,同时实现 WritableComparable 接口并重写其方法。
/**
* 自己定义的key类应该实现WritableComparable接口
*/
public  class IntPair implements WritableComparable<IntPair>{
	int first;//第一个成员变量
	int second;//第二个成员变量
	public void set(int left, int right){
		first = left;
		second = right;
	}
	public int getFirst(){
		return first;
	}
	public int getSecond(){
		return second;
	}
	@Override
	//反序列化,从流中的二进制转换成IntPair
	public void readFields(DataInput in) throws IOException{
		first = in.readInt();
		second = in.readInt();
	}
	@Override
	//序列化,将IntPair转化成使用流传送的二进制
	public void write(DataOutput out) throws IOException{
		out.writeInt(first);
		out.writeInt(second);
	}
	@Override
	//key的比较
	public int compareTo(IntPair o)
	{
		// TODO Auto-generated method stub
		if (first != o.first){
			return first < o.first ? -1 : 1;
		}else if (second != o.second){
			return second < o.second ? -1 : 1;
		}else{
			return 0;
		}
	}
	
	@Override
	public int hashCode(){
		return first * 157 + second;
	}
	@Override
	public boolean equals(Object right){
		if (right == null)
			return false;
		if (this == right)
			return true;
		if (right instanceof IntPair){
			IntPair r = (IntPair) right;
			return r.first == first && r.second == second;
		}else{
			return false;
		}
	}
}


第二步:自定义分区函数类FirstPartitioner,根据 IntPair 中的first实现分区。

第三步:自定义 SortComparator 实现 IntPair 类中的first和second排序。本次中没有使用这种方法,而是使用 IntPair 中的compareTo()方法实现的。

第四步:自定义 GroupingComparator 类,实现分区内的数据分组。
/**
*继承WritableComparator
*/
public static class GroupingComparator extends WritableComparator{
        protected GroupingComparator(){
            super(IntPair.class, true);
        }
        @Override
        //Compare two WritableComparables.
        public int compare(WritableComparable w1, WritableComparable w2){
            IntPair ip1 = (IntPair) w1;
            IntPair ip2 = (IntPair) w2;
            int l = ip1.getFirst();
            int r = ip2.getFirst();
            return l == r ? 0 : (l < r ? -1 : 1);
        }
}


  第五步:编写 MapReduce 主程序实现二次排序。
public class SecondarySort{
    // 自定义map
    public static class Map extends Mapper<LongWritable, Text, IntPair, IntWritable>{
        private final IntPair intkey = new IntPair();
        private final IntWritable intvalue = new IntWritable();
        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException{
            String line = value.toString();
            StringTokenizer tokenizer = new StringTokenizer(line);
            int left = 0;
            int right = 0;
            if (tokenizer.hasMoreTokens()){
                left = Integer.parseInt(tokenizer.nextToken());
                if (tokenizer.hasMoreTokens())
                    right = Integer.parseInt(tokenizer.nextToken());
                intkey.set(left, right);
                intvalue.set(right);
                context.write(intkey, intvalue);
            }
        }
    }
    // 自定义reduce
    public static class Reduce extends Reducer< IntPair, IntWritable, Text, IntWritable>{
        private final Text left = new Text();      
        public void reduce(IntPair key, Iterable< IntWritable> values,Context context) throws IOException, InterruptedException{
            left.set(Integer.toString(key.getFirst()));
            for (IntWritable val : values){
                context.write(left, val);
            }
        }
    }
    /**
     * @param args
     */
    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException{
        // TODO Auto-generated method stub
        Configuration conf = new Configuration();

        Job job = new Job(conf, "secondarysort");
        job.setJarByClass(SecondarySort.class);
        
        FileInputFormat.setInputPaths(job, new Path(args[0]));//输入路径
        FileOutputFormat.setOutputPath(job, new Path(args[1]));//输出路径

        job.setMapperClass(Map.class);// Mapper
        job.setReducerClass(Reduce.class);// Reducer
        
        job.setPartitionerClass(FirstPartitioner.class);// 分区函数
        //job.setSortComparatorClass(KeyComparator.Class);//本课程并没有自定义SortComparator,而是使用IntPair自带的排序
        job.setGroupingComparatorClass(GroupingComparator.class);// 分组函数


        job.setMapOutputKeyClass(IntPair.class);
        job.setMapOutputValueClass(IntWritable.class);
        
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
       
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
分享到:
评论

相关推荐

    mapreduce二次排序

    mapreduce二次排序,年份升序,按照年份聚合,气温降序

    MapReduce模型--二次排序

    完整的二次排序具有多个层次的排序功能,可以有效提高系统的处理性能。 排序功能分别包括:排序分区、Key值排序、Key值分组 需要注意的是,这多个层次的排序功能均只能针对Key进行,而不能针对Value进行排序。在...

    hadoop 二次排序 原理

    Hadoop 大数据方向 mapreduce计算中的二次排序,讲解透彻

    MapReduce的小应用

    利用MapReduce框架实现了关于音乐播放网站的两个简单问题。主要解决了多个Map多个Reduce的连接问题,二次排序问题,关于Key降序排序的问题。

    mapreduce secondarysort

    mapreduce的二次排序,稍微有点难度,帮助你更好的理解它

    拓思爱诺大数据-第二次作业MapReduce编程

    四川大学IT企业实训,拓思爱诺大数据第二次作业,MapReduce编程,包括Hadoop wordcount程序,及flowcount流量统计程序,包括重写排序及分区函数

    大数据框架(HADOOP、HIVE、HBASE)优化和简历项目编写(视频+讲义+笔记)

    03_MapReduce 二次排序回顾及Reduce Join实现详解 04_MapReduce 中Map Join实现思路及伪代码详解 05_Hive重点知识回顾总结及小表与大表关联时MapJoin优化 06_Hive中大表与大表关联时SMB Join优化 07_Hive中高级...

    java大数据作业_5Mapreduce、数据挖掘

    课后作业 1.请找出日志中的访问者ip,访问时间,来源...6.简述二次排序算法 有输入数据如下所示: 1 2 2 3 2 1 4 6 3 1 3 8 3 2 需要使用二次排序算法,得到如下处理结果: 1 2 2 1 2 3 3 1 3 2 3 8 4 6 请简述处理过程

    mapreduce高级特性2

    mr各种应用场景的例子,1.1 内存排序1.2 mr数据类型1.3 自定义mr数据类型1.4 使用自定义数据实现内存排序1.5 二次排序1.6 使用自定义mr数据类型实现二次排序1.7 内存排序找出每一组中的最大值1.8 排序找出每一组中的...

    大数据技术 Hadoop开发者第二期 MapReduce HDFS Hive Mahout HBase 共64页.pdf

    10、Mapreduce中value集合的二次排序 ....................... - 38 - 11、Hive SQL手册翻译 ................................... - 47 - 12、Mahout Kmeans简介 .................................... - 57 -

    《数据算法Hadoop Spark大数据处理技巧》PDF 带目录!!

    目录 第1章二次排序:简介 19 第2章二次排序:详细示例 42 第3章 Top 10 列表 54 第4章左外连接 96 第5章反转排序 127 第6章移动平均 137 第7章购物篮分析 155 第8章共同好友 182 第9章使用MapReduce实现推荐引擎 ...

    Hadoop权威指南 第二版(中文版)

     二次排序  联接  map端联接  reduce端联接  边数据分布  利用JobConf来配置作业  分布式缓存  MapReduce库类 第9章 构建Hadoop集群  集群规范  网络拓扑  集群的构建和安装  安装Java  创建Hadoop用户...

    Hadoop硬实战 [(美)霍姆斯著][电子工业出版社][2015.01]_PDF电子书下载 带书签目录 高清完整版.rar )

    技术点21 二次排序的实现 4.2.2 整体并行排序 技术点22 通过多个reducer 对key 进行排序 4.3 抽样 技术点23 蓄水池抽样(reservoir 抽样) 4.4 本章小结 5 优化HDFS 处理大数据的技术 5.1 处理小文件 ...

    Hadoop实战(第2版)

    大数据模式4 处理大数据的MapReduce 模式4.1 Join4.1.1 Repartition Join技术点19 优化repartition join 4.1.2 Replicated Join 4.1.3 Semi-join技术点20 实现semi-join4.1.4 为你的数据挑选最优的...

    Hadoop权威指南(中文版)2015上传.rar

    二次排序 联接 map端联接 reduce端联接 边数据分布 利用JobConf来配置作业 分布式缓存 MapReduce库类 第9章 构建Hadoop集群 集群规范 网络拓扑 集群的构建和安装 安装Java 创建Hadoop用户 安装Hadoop 测试安装 SSH...

    hadoop开发者文档

    10、Mapreduce中value集合的二次排序 ....................... - 38 - 11、Hive SQL手册翻译 ................................... - 47 12、Mahout Kmeans简介 .................................... - 57 -

    大数据导论-6.1.4-熟悉大数据处理技术——大数据的处理模式.pptx

    一次MapReduce处理引擎的运行被称为MapReduce作业,它由映射(Map)和归约(Reduce)两部分任务组成,这两部分任务又被分为多个阶段。 一个作业 = 映射 + 归约 其中映射任务被分为映射(map)、合并(combine)和...

    代码之美(中文完整版).pdf

    5.6 版本4:第二次优化:避免重复验证 5.7 版本5:第三次优化:复杂度 O(1) 5.8 版本 6:第四次优化:缓存(Caching) 5.9 从故事中学到的 第6章 集成测试框架:脆弱之美 6.1. 三个类搞定一个验收测试框架 6.2. 框架...

    2017最新大数据架构师精英课程

    94_job二次排序5 t3 Z2 R- ]( a: s* c0 Z 95_从db输入数据进行mr计算: L. M4 I6 y, R2 l/ u/ L 96_输出数据到db中 97_NLineInputFormat& u( k1 T& z( O# P, S* y1 Y 98_KeyValueTextInputFormat* p$ O1 z- h, n" e( ...

Global site tag (gtag.js) - Google Analytics