知识点
相关文章
更多最近更新
更多Hadoop 之 Secondary Sort介绍
2019-03-28 14:12|来源: 网络
我们知道,在reduce之前,MP框架会对收到的<K,V>对按K进行排序,而对于一个特定的K来说,它的List<V>是没有被排过序的,就是说这些V是无序的,因为它们来自不同的Map端,而且很多应用也不依赖于K所对应的list<V>的顺序,但是有一些应用就要就要依赖于相同K的V的顺序,而且还要把他们聚合在一起,下面会提出这样一个问题,是参考Hadoop The Defiinitive Guide的第八章 (下载见 http://www.linuxidc.com/Linux/2012-01/51182.htm )。
1. 问题的提出
对于如下数据,我们要计算出每一年的最高温度值:- (1900,34)
- (1900,32)
- ....
- (1950, 0)
- (1950, 22)
- (1950, −11)
- (1949, 111)
- (1949, 78)
计算结果可能如下:
- 1901 317
- 1902 244
- 1903 289
- 1904 256
- ...
- 1949 111
2. Secondary Sort
Secondary Sort 实际上就是一种对Value进行二次排序,然后按key的特定部分进行聚合的方法,这里用到了一个组合Key的概念,就是把K与要排序的Value组合在一起,生成一个新的Key值,上面的例子中,新的组合key为<1900,32>,也就是<年份,温度>的组合,(1900, 35°C),(1900, 34°C),这样组合以后,生成一个新的key,但是这样组合以后,它们会被切分到不同的Reduce上,所以我们这里要写一个根据新组合的key的第一个参数(年份)来进行相应的partitioner,在JobConf中可以使用setPartitionerClass来进行设置,这样可以解决相同年份的key会被聚合在同一个Reduce上,但是还没有解析在同一个Reduce上,把部分key相同的记录聚合(group)在一起,所以这里我们要设置一个group的比较器,这样就可以把相同年份的记录聚合在一起,但对于相同key(这里是指key中第一个参数相同)的排序问题,我们要使用一个KeyComparator比较器来做,就是在group中对key进行二次排序,在上面例子中就是按key中第二个参数温度来降序排序,这里要注意的是这里的输入是key,而不是value,这就是我们为什么把value组合在key一起的原因,而写这个比较方法的时候,还要注意一定要符合Group方法的原因,如果group是按key的第一个参数来得,那这里key的比较就要在第一个参数相同的情况下,才能会第二个参数(value)进行比较,我想这里解释了为什么这种排序叫Secondary Sort的原因吧,在上面的例子中,key的比较是先比较第一个参数(年份),如果第一个参数相同,再比较第二个参数(温度),按第二个参数降序排列。所以一般要使用Secondary Sort,在JobConf要配置这三个参数
- setPartitionerClass // 这个是用来设置key的切分,上面例子中是按key中的第一个参数来切分
- setOutputValueGroupingComparator // 这里设置group,就是按key的哪一个参数进行聚合,上面的例子中也是按第一个参数年份进行聚合
- setOutputKeyComparatorClass // 这个是设置key的比较器,设置聚合的key的一个二次排序方法
3. 代码分析
- Example 8-9. Application to find the maximum temperature by sorting temperatures in the key
- public class MaxTemperatureUsingSecondarySort extends Configured implements Tool {
- // Map任务
- static class MaxTemperatureMapper extends MapReduceBase implements Mapper<LongWritable, Text, IntPair, NullWritable> {
- private NcdcRecordParser parser = new NcdcRecordParser();
- public void map(LongWritable key, Text value,
- OutputCollector<IntPair, NullWritable> output, Reporter reporter)
- throws IOException {
- parser.parse(value); // 解析输入的文本
- if (parser.isValidTemperature()) {
- // 这里把年份与温度组合成一个key,value为空
- output.collect(new IntPair(parser.getYearInt(),+ parser.getAirTemperature()), NullWritable.get());
- }
- }
- }
- // Reduce任务
- static class MaxTemperatureReducer extends MapReduceBase
- implements Reducer<IntPair, NullWritable, IntPair, NullWritable> {
- public void reduce(IntPair key, Iterator<NullWritable> values,
- OutputCollector<IntPair, NullWritable> output, Reporter reporter)
- throws IOException {
- // 输出聚合的key值,这里的key是先按年份进行聚合,所我们会看到相同所有年份相同的key会聚合在一起,而这些聚合后的key按温度进行降序按列
- // 所以聚合中第一个key为温度最高的,所以这里输出的key为这一年中温度最高的值
- output.collect(key, NullWritable.get());
- }
- }
- // 切分器,这里是按年份* 127 % reduceNum来进行切分的
- public static class FirstPartitioner
- implements Partitioner<IntPair, NullWritable> {
- @Override
- public void configure(JobConf job) {}
- @Override
- public int getPartition(IntPair key, NullWritable value, int numPartitions) {
- return Math.abs(key.getFirst() * 127) % numPartitions;
- }
- }
- // 聚合key的一个比较器
- public static class KeyComparator extends WritableComparator {
- protected KeyComparator() {
- super(IntPair.class, true);
- }
- @Override
- public int compare(WritableComparable w1, WritableComparable w2) {
- IntPair ip1 = (IntPair) w1;
- IntPair ip2 = (IntPair) w2;
- // 这里要注意的是,一定要在聚合参数相同的情况下,再比较另一个参数
- // 这里是先比较年份,再比较温度,按温度降序排序
- int cmp = IntPair.compare(ip1.getFirst(), ip2.getFirst());
- if (cmp != 0) {
- return cmp;
- }
- return -IntPair.compare(ip1.getSecond(), ip2.getSecond()); //reverse
- }
- }
- // 设置聚合比较器
- public static class GroupComparator extends WritableComparator {
- protected GroupComparator() {
- super(IntPair.class, true);
- }
- @Override
- public int compare(WritableComparable w1, WritableComparable w2) {
- IntPair ip1 = (IntPair) w1;
- IntPair ip2 = (IntPair) w2;
- // 这里是按key的第一个参数来聚合,就是年份
- return IntPair.compare(ip1.getFirst(), ip2.getFirst());
- }
- }
- @Override
- public int run(String[] args) throws IOException {
- JobConf conf = JobBuilder.parseInputAndOutput(this, getConf(), args);
- if (conf == null) {
- return -1;
- }
- conf.setMapperClass(MaxTemperatureMapper.class);
- conf.setPartitionerClass(FirstPartitioner.class); // 设置切分器
- conf.setOutputKeyComparatorClass(KeyComparator.class); // 设置key的比较器
- conf.setOutputValueGroupingComparator(GroupComparator.class); // 设置聚合比较器
- conf.setReducerClass(MaxTemperatureReducer.class);
- conf.setOutputKeyClass(IntPair.class); // 设置key的一个组合类型,如里这个类型实现了WritableComparable<T>的话,那就不要设置setOutputKeyComparatorClass了.
- conf.setOutputValueClass(NullWritable.class); // 输出的value为NULL,因为这里的实际value已经组合到了key中
- JobClient.runJob(conf);
- return 0;
- }
- public static void main(String[] args) throws Exception {
- int exitCode = ToolRunner.run(new MaxTemperatureUsingSecondarySort(), args);
- System.exit(exitCode);
- }
- }
- % hadoop jar job.jar MaxTemperatureUsingSecondarySort input/ncdc/all \
- > output-secondarysort
- % hadoop fs -cat output-secondarysort/part-* | sort | head
- 1901 317
- 1902 244
- 1903 289
- 1904 256
- 1905 283
- 1906 294
- 1907 283
- 1908 289
相关问答
更多-
Hadoop:安装问题(Hadoop: installation problems)[2022-02-15]
使用bash而不是sh来调用脚本。 这解决了我的问题。 Use bash and not sh to invoke the scripts. That solved my problem. -
您需要一个Zookeeper集群,但是,您可以添加一个名称节点来启用高可用性 You need a Zookeeper cluster, but yes, you can add a namenode to enable High Availability
-
TLDR :你只需要彻底移除你的WikiComparator ,而不是完全调用job.setGroupingComparatorClass 。 说明 :组比较器用于比较地图输出键,而不是地图输出值。 您的地图输出键是Text对象,值是WikiWritable对象。 这意味着传递给比较器进行反序列化的字节代表序列化的Text对象。 但是, WikiComparator使用反射来创建WikiWritable对象(如其构造函数中所述),然后尝试使用WikiWritable.readFields方法反序列化Text ...
-
HADOOP_SECONDARYNAMENODE_OPTS用于为辅助namenode java进程设置命令行属性。 如下 HADOOP_SECONDARYNAMENODE_OPTS="-Dcom.sun.management.jmxremote $HADOOP_SECONDARYNAMENODE_OPTS" 要启动辅助名称节点,您需要使用namenode详细信息维护core-site.xml,然后启动辅助名称节点。 如果您仍然遇到问题,请更新日志和配置文件夹 HADOOP_SECONDARYNAMENOD ...
-
您对二级排序的理解是正确的。 在回答给定的2个场景之前,我想告诉你在Reducer的reduce()方法之前发生了什么。 每个reducer都将其相关的分区结果从reducer磁盘中的所有映射器和存储器复制为多个溢出文件。 后台线程合并所有这些溢出文件并创建单个排序文件。 最终sinlge排序文件中的记录首先按自然键排序。 然后,如果已配置,则每个键的记录在内部按分组键(第二个排序)排序。 因此,决定何时使用给定的2个场景的决定取决于给定键的单个排序文件中有多少reords。 reduce方法通过reduc ...
-
如果你需要基本上模拟 select year, day, count(*) as totalPerDay from DATA group by year, day 比你不需要二次排序。 但是,如果你需要生产像CUBE这样的东西,你需要在一个MR工作中计算每年的总数和每周的总数,那么就需要进行二次分类。 If you need to basically simulate select year, day, count(*) as totalPerDay from DATA group by year, da ...
-
目前,手动或使用工具的静态代码审查似乎很有效。 我相信我违反了规则:当重写compareTo() ,不要忘记重写equals()和hashCode() 。 如果修复这个解决了问题,我会保持每个人的发布。 At the moment, it appears that static code review either manually or using tools works good. I believe I broke the rule: when overriding compareTo(), don' ...
-
当你设置不。 通过numReducetasks减少器,它只是对框架的一个提示。 我不保证你只得到指定的号码。 减速器实际上取决于减号。 在地图阶段之后获得的分区。 而且基于没有。 分区你会得到没有。 减速器 分区基于密钥发生,默认分区程序是散列分区程序。 因此,基于散列函数对键进行散列并将其分组。 当您谈论如此小的数据时,所有密钥都会转到同一个分区,因为框架会尽最大努力使处理尽可能高效,并为这么小的数据创建多个分区将是一个过度的问题。 When you set no. of reducers through ...
-
在映射器中,您可以将userid作为键, timestamp和serviceId作为按timestamp排序的值发出(为了执行排序操作,我假设每个用户的所有行都可以放在主内存中)。 然后,MR框架将负责将每个用户的所有不同行发送到单个reducer,您可以轻松地在那里执行分析。 如果每个用户的行数太多(比如数百万),则可以将userId-serviceId作为键发出,在reduce阶段之后,每个user-service都有一个行文件,并且该服务花费的时间也是如此。 如果需要,可以使用getmerge加入所有 ...
-
在MapReduce中使用分区程序进行二级排序有什么意义?(What is the point of using a Partitioner for Secondary Sorting in MapReduce?)[2022-01-16]
问题几乎和答案一样好:),你上面提到的一切都是正确的,我想解释这个概念的另一种方式应该有所帮助。 所以让我试一试。 让我们假设我们的二级排序是在一个由姓氏和名字组成的复合键上。 使用复合键,现在让我们看一下二级排序机制 分区器和组比较器仅使用自然键 ,分区器使用它将具有相同自然键的所有记录引导到单个reducer。 这种分区发生在Map Map中,来自各种Map任务的数据由reducers接收,然后将它们分组 ,然后发送到reduce方法 。 这个分组是组比较器的结果,如果我们不指定自定义组比较器,那么Ha ...