Hadoop实现Secondary Sort

2019-03-28 13:49|来源: 网络

Hadoop中每个reduce的输入的key都是有序的,而value则是无序的。而且同一个job运行多次,由于map完成顺序不同,reduce收到的value顺序是不固定的。那如何才能实现reduce收到有序的value呢?这就需要Secondary Sort。

Secondary Sort要解决的问题:reduce收到的value有序。

这里举一个场景,来说明Secondary Sort是如何实现的。假设我们有若干公司若干部门的人数,数据样例如下:

公司名   部门的人数

Taobao 52
Taobao 31
Taobao 67
Alipay 10
Alipay 36
Alipay 29
B2B 120
B2B 72
Aliyun 13
Aliyun 32
Aliyun 3

我们想知道每个公司的最大部门(人数最多)的人数。即希望先按公司名group,然后对group内的人数降序排列,最后取每个group的第一个即可。

由于reduce收到的value是无序的,所以要对value进行排序,首先需要将value封装到key里面。即需要自定义key的类型,代码如下:

  1. import java.io.DataInput;  
  2. import java.io.DataOutput;  
  3. import java.io.IOException;  
  4.   
  5. import org.apache.hadoop.io.IntWritable;  
  6. import org.apache.hadoop.io.Text;  
  7. import org.apache.hadoop.io.WritableComparable;  
  8.   
  9. public class MyKey implements WritableComparable<MyKey> {  
  10.   public final Text first;  
  11.   public final IntWritable second;  
  12.   
  13.   public MyKey() {  
  14.     first = new Text();  
  15.     second = new IntWritable();  
  16.   }  
  17.   
  18.   public MyKey(Text first, IntWritable second) {  
  19.     this.first = first;  
  20.     this.second = second;  
  21.   }  
  22.   
  23.   @Override  
  24.   public void write(DataOutput out) throws IOException {  
  25.     first.write(out);  
  26.     second.write(out);  
  27.   }  
  28.   
  29.   @Override  
  30.   public void readFields(DataInput in) throws IOException {  
  31.     first.readFields(in);  
  32.     second.readFields(in);  
  33.   }  
  34.   
  35.   @Override  
  36.   public String toString() {  
  37.     return first + "\t" + second;  
  38.   }  
  39.   
  40.   @Override  
  41.   public int compareTo(MyKey tp) {  
  42.     int cmp = first.compareTo(tp.first);  
  43.     if (cmp != 0) {  
  44.       return cmp;  
  45.     }  
  46.     return -second.compareTo(tp.second);  
  47.   }  
  48. }

更多Hadoop相关信息见Hadoop 专题页面 http://www.linuxidc.com/topicnews.aspx?tid=13

相关问答

更多
  • 我建议你使用新的API 此示例基于新API public class ChainJobs extends Configured implements Tool { private static final String OUTPUT_PATH = "intermediate_output"; @Override public int run(String[] args) throws Exception { /* * Job 1 */ Configuration conf = ...
  • 如果您使用的是较旧的API( mapred.* ),请在作业conf中设置OutputKeyComparatorClass: jobConf.setOutputKeyComparatorClass(ReverseComparator.class); ReverseComparator可以是这样的: static class ReverseComparator extends WritableComparator { private static final Text.Comparator ...
  • 您对二级排序的理解是正确的。 在回答给定的2个场景之前,我想告诉你在Reducer的reduce()方法之前发生了什么。 每个reducer都将其相关的分区结果从reducer磁盘中的所有映射器和存储器复制为多个溢出文件。 后台线程合并所有这些溢出文件并创建单个排序文件。 最终sinlge排序文件中的记录首先按自然键排序。 然后,如果已配置,则每个键的记录在内部按分组键(第二个排序)排序。 因此,决定何时使用给定的2个场景的决定取决于给定键的单个排序文件中有多少reords。 reduce方法通过reduc ...
  • 比如,你的密钥是(Attribute1, Attribute2) 。 现在您可以使用Sort Comparator,首先按Attribute1排序,然后按Attribute2排序。 例如, Key = (2008,32) // year, temperature 现在,如果您想按年份然后按温度排序,可以使用Sort Comparator,如下所示: public static class KeyComparator extends WritableComparator { protected Ke ...
  • 更新 AWS刚刚宣布了亚马逊DynamoDB备受期待的全球二级索引的普遍可用性,这些索引正在解决下面进一步讨论的本地二级索引的局限性: 您现在可以使用项目主键以外的属性创建索引并执行查找。 [...] 现在,您可以在创建表时创建最多五个全局二级索引,每个表都引用一个哈希键或一个哈希键和一个范围键。 您还可以创建最多五个本地二级索引,并且可以选择将表的一些或全部属性投影到每个表的索引中。 有关这两种型号之间选择的更多详细信息,请参阅博客文章。 更正 正如vartec正确指出的那样 ,我已经超越了自己,在当地二 ...
  • 你可以做类似下面的事情。 currGrid.on('dgrid-sort', function (event) { event.preventDefault(); var sortSet = []; sortSet.push(event.sort[0]); sortSet.push({property: "scheduled"}); currGrid.set('sort', function (a, b) { var aValue, bValue, r ...
  • 正如Шах所述,使用一个比较器。 比较器在等于时返回0。 首先比较标题,如果它们相等,则进行时间比较。 我没有运行代码,但它应该是这样的: private static final Comparator TASKHOLDERCOMPARATOR_DUEDATE = new Comparator() { @Override public int compare(ViewHolder a, ViewHolder b) { int comparison = ...
  • 如果你需要基本上模拟 select year, day, count(*) as totalPerDay from DATA group by year, day 比你不需要二次排序。 但是,如果你需要生产像CUBE这样的东西,你需要在一个MR工作中计算每年的总数和每周的总数,那么就需要进行二次分类。 If you need to basically simulate select year, day, count(*) as totalPerDay from DATA group by year, da ...
  • 目前,手动或使用工具的静态代码审查似乎很有效。 我相信我违反了规则:当重写compareTo() ,不要忘记重写equals()和hashCode() 。 如果修复这个解决了问题,我会保持每个人的发布。 At the moment, it appears that static code review either manually or using tools works good. I believe I broke the rule: when overriding compareTo(), don' ...
  • 当您在产品集合上调用setOrder()时,它会检查给定属性是否为price ,如果是,则使用addAttributeToSort 。 这是因为按价格排序需要一些比通用sortOrder方法更具体的代码。 但是当您传递数组时,测试失败并使用公共代码,因此您可以尝试将两个setOrder()调用分开: if ($this->getCurrentOrder()) { $this->_collection->setOrder($this->getCurrentOrder(), $this->getCurr ...