Hadoop之combiner和partitioner

2019-03-28 14:04|来源: 网络

1. Combiner

通常,每一个map可能会产生大量的输出,combiner的作用就是在map端对输出先做一次合并,以减少传输到reducer的数据量。

我们以计算特定key对应值的平均值为例,展示一下combiner的用法:

class Mapper
    method Map(string t, integer r)
    Emit(string t, integer r)

class Combiner
    method Combine(string t, integers [r1, r2, . . .])
    sum ← 0
    cnt ← 0
    for all integer r ∈ integers [r1, r2, . . .] do
        sum ← sum + r
        cnt ← cnt + 1
    Emit(string t, pair (sum, cnt))  // Separate sum and count

class Reducer
    method Reduce(string t, pairs [(s1, c1), (s2, c2) . . .])
    sum ← 0
    cnt ← 0
    for all pair (s, c) ∈ pairs [(s1, c1), (s2, c2) . . .] do
        sum ← sum + s
        cnt ← cnt + c
    ravg ← sum/cnt
    Emit(string t, integer ravg)

乍一看应该没有问题,但是不幸的是,这个combiner是不正确的。因为框架要求,combiner的输入输出类型必须和mapper的输出以及reducer的输入类型一致。而上面的伪代码中,mapper的输出类型为<string, integer>,而combiner的输出类型为<string, pair<integer, integer>>,这样的话,combiner就不能正常工作。

改变的方式非常简单,把mapper的输出包装一下即可:

class Mapper
    method Map(string t, integer r)
        Emit(string t, pair (r, 1))

class Combiner
    method Combine(string t, pairs [(s1, c1), (s2, c2) . . .])
    sum ← 0
    cnt ← 0
    for all pair (s, c) ∈ pairs [(s1, c1), (s2, c2) . . .] do
        sum ← sum + s
        cnt ← cnt + c
    Emit(string t, pair (sum, cnt))

class Reducer
    method Reduce(string t, pairs [(s1, c1), (s2, c2) . . .])
    sum ← 0
    cnt ← 0
    for all pair (s, c) ∈ pairs [(s1, c1), (s2, c2) . . .] do
        sum ← sum + s
        cnt ← cnt + c
    ravg ← sum/cnt
    Emit(string t, integer ravg)


2. Partitioner

首先需要继承自Partitioner类(在0.19中为Partitioner接口),并重载它的getPartition方法:

[java]
  1. public static class CatPartitioner extends Partitioner<Text, Text> {  
  2.   
  3.     @Override  
  4.     public int getPartition(Text key, Text value, int numPartitions) {  
  5.         String[] parts = key.toString().split("-");  
  6.         if (parts.length == 2) {  
  7.             return Math.abs(parts[0].hashCode()) % numPartitions;  
  8.         }  
  9.         return Math.abs(key.toString().hashCode()) % numPartitions;  
  10.     }  
  11. }  

然后在job配置中设置Partitioner Class:

[java]
  1. job.setPartitionerClass(CatPartitioner.class); 

相关问答

更多
  • 1. 想使用Partitioner,首先需要知道这个东西是做什么的。 Partitioner partitions the key space. Partitioner controls the partitioning of the keys of the intermediate map-outputs. The key (or a subset of the key) is used to derive the partition, typically by a hash function. The ...
  • Combiner,Combiner号称本地的Reduce,Reduce最终的输入,是Combiner的输出。 Combiner是用reducer来定义的,多数的情况下Combiner和reduce处理的是同一种逻辑,所以job.setCombinerClass()的参数可以直接使用定义的reduce。 当然也可以单独去定义一个有别于reduce的Combiner,继承Reducer,写法基本上定义reduce一样。
  • (Partition)分区出现的必要性,如何使用Hadoop产生一个全局排序的文件?最简单的方法就是使用一个分区,但是该方法在处理大型文件时效率极低,因为一台机器必须处理所有输出文件,从而完全丧失了MapReduce所提供的并行架构的优势。 事实上我们可以这样做,首先创建一系列排好序的文件;其次,串联这些文件(类似于归并排序);最后得到一个全局有序的文件。主要的思路是使用一个partitioner来描述全局排序的输出。比方说我们有1000个1-10000的数据,跑10个ruduce任务, 如果我们运行进行p ...
  • combiner是要在maper和reduce端都运行的的汇总,所以他要和reduce一致,实现方法也要一致,这样说你可能还不明白,你需要了解mapreducer的洗牌机制
  • 直接回答你的问题是=> COMBINER 详细信息:组合器可以在地图阶段被视为微型减速器。 他们在映射器结果进一步分发之前对映射器结果执行本地减少。 一旦Combiner功能被执行,它就会被传递给Reducer以进行进一步的工作。 在哪里 当我们在减速器上工作时,分区器进入画面。 因此,分区器决定哪个还原器负责特定的密钥。 他们基本上使用映射器结果(如果使用组合器,然后使用组合器结果)并基于密钥将其发送给负责的Reducer。 为了更好的理解,你可以参考下面的图片,我从雅虎开发者教程(HAD)上找到了这个图 ...
  • 在执行映射之后,在每个节点上执行组合器。 然而,只有在总和的最后才需要除以5(不同课程的数量)。 因此,您只能在减速器的末端划分,而不能在合并器的末尾划分。 基本上你可以: 拆下合成器(但保留减速器) 定义一个与reducer完全相同但在末尾不分割的reducer The combiner is executed on each node after executing the map. Yet need to divide by 5 (the number of different courses) on ...
  • Combiner应该只是一个Reducer ,因此实现了Reducer接口(没有Combiner接口)。 将组合步骤视为Mapper和Reducer之间的一种中间减少步骤。 以字数为例。 来自雅虎的教程 : 字数是组合器有用的主要示例。 清单1--3中的字数统计程序为它看到的每个单词的每个实例发出一个(字,1)对。 因此,如果同一文档包含3次单词“cat”,则该对(“cat”,1)将发出三次; 所有这些都被发送到Reducer。 通过使用组合器,可以将它们压缩成单个(“cat”,3)对以发送到Reducer ...
  • 问题最终出现在自定义密钥(IntermediaryKey)的序列化/反序列化中。 正在阅读“useBothGUIDFlag”变量,与其本应相反。 在reducer中获取“mapred.task.partition”属性值有助于注意到已发生此交换。 具有相反“useBothGUIDFlag”值的键似乎将转到正确的reducer。 The problem ended up being in the serialization/deserialization of the custom key (Intermed ...
  • 1 /响应已在此部分中指定:“在每个分区中,后台线程按键执行内存中排序,如果有组合器功能,则在排序输出上运行。” 首先,分区是在内存中创建的,如果有自定义组合器,它将在内存中执行,结果将在最后溢出到磁盘。 2 /自定义组合器和自定义分区将在驱动程序类中指定时存在。 job.setCombinerClass(MyCombiner.class); job.setPartitionerClass(MyPartitioner.class); 如果没有指定自定义组合器,则不执行组合器。 如果没有指定自定义分区程序, ...
  • 一旦你打破了一点点,它就不像你想象的那么复杂。 taggedKey.getJoinKey().hashCode()只返回一个整数。 每个对象都有一个hashCode()函数,它只返回一个有希望对该对象本身唯一的数字。 您可以查看TaggedKey的源代码,看看它是如何工作的,但是您需要知道的是它根据对象的内容返回一个整数。 %运算符执行模数除法,即执行除法后返回余数的位置。 (8%3 = 2,15%7 = 1等)。 所以假设你有3个分区器(numPartitions = 3)。 每次使用3进行模数除法,无论 ...