Hadoop中的排序器/组合器/合并器

2019-03-28 14:14|来源: 网络

目前,海量数据处理主要存在二个问题:大规模计算(cpu+mem)、海量数据存储(disk),而Hadoop被专门设计用来针对海量数据的处理,它通过分布式文件系统解决海量数据的存储问题,组织成千上万个计算节点来共同完成一个任务解决了大规模计算问题。Hadoop的核心是MapReduce,而不是分布式文件系统HDFS,这是因为MapRduce所依赖的存储系统并不依赖于任何一个文件系统,甚至是分布式文件系统。MapReduce设计的核心思想map-reduce计算模型,即将任何作业的处理分为两个阶段——map阶段和reduce阶段,在每一个阶段都由若干个机器节点(普通PC)共同协作完成,每一个机器节点只负责整个任务的一部分,这样做就使的任务的处理时间随着工作节点数量的增加而呈线性减少。这中间在很多情况下会存在这样的问题,就是单个计算节点往往无法一次性就把它所需要的数据加载进内存,也就是我们通常所说的内存瓶颈。解决内存瓶颈问题一般都是采用“分割-合并”的策略,而Hadoop在内部正是通过排序器、组合器、合并器来实现这一策略的。可以这么说,map-reduce计算模型是Hadoop解决海量数据处理的战略布局, 排序器/组合器/合并器是Hadoop解决计算节点性能的战术实现。

     在本文,我将给大家详细的阐述Hadoop的大牛们是如何来设计key-value排序器、组合器、合并器的。

一、排序器(Sorter)

         对于Hadoop中排序器的设计,我不得不得先向Hadoop的设计大牛们深深的鞠上一躬,它牛B了,太精彩了。他们将排序问题抽象成了一个框架,或者说是一个排序模型:


         上面的排序模型将排序问题描述成这样的一个过程:排序器不断地对待排序的记录集进行某两条记录的比较与交换。也就是说,任何一种排序算法只对带排序的记录集进行两种基本操作:比较两条记录的大小、交换两条记录在带排序记录集中的位置,这就是Hadoop大牛们对排序问题的精简定义,这样的定义来源于他们对问题在最本质上的了解。我不得不说只有对问题有了最深入、最本质的了解之后才能设计出最抽象、最准确的解决方案,对已实现的解决方法的不断重构只能治标不能治本。在Sorter中实现某种排序算法,在Sortable中实现某种可排序的数据集,这样使得排序算法与待排序的数据集分开,排序算法永远也不知道也不需要知道它正在对哪个数据类型的数据集进行排序。而Hadoop在其内部这是这样设计的。

         Hadoop关于可排序的数据集定义了一个抽象接口IndexedSortable,也就是说任何能够排序的数据集必须要实现两个方法,一是能够比较它的数据集中任意两项的大小,二是能够交换它的数据集中任意两项的位置;排序算法的实现定义了一个抽象接口IndexedSorter,它定义了两个方法,其中的一个方法还可以报告当前排序的完成进度。


二、组合器(Combiner)

         Hadoop在其内部利用组合器来局部合并具有相同key的key-value记录,组合器的设计大致同排序器的设计一样,也是将合并操作和待合并的数据分开,这个组合器框架设计如下:

          Hadoop中的合并器一般合并的key-value数据来自与内存,然后把合并后的结果按照一定的组织格式写入磁盘文件作为中间数据,待最后汇总合并,我们把每一次合并后的这种中间结果看做一个段。在Hadoop中是这样实现这个组合框架的:

     

        Hadoop中的组合器的实现主要用来处理key-value类型的数据,它先把带合并的所有的key-value结合按照key值来聚合,这样就使得相同key值的key-value记录被保存到同一个迭代器(Iterator)中,然后把一个个迭代器结构组合器来处理,最后每一迭代器的处理结果被交给了收集器(Collector)来保存。


三、合并器(Merger)

        这里的合并器(Merger)不同于上面介绍的组合器(Combiner),它主要是正对上面组合器生成的最后结果进行合并也就是合并所有的中间段,这个框架在Hadoop内部是这样设计的:

     Hadoop中合并器(Merger)的设计类似于它的组合器(Combiner),唯一不同的就是这个聚合迭代器的来源略有不同,合并器中的每一个迭代器来自于所有的中间段中属于相同类的key-value,所以对每一中间段,合并器都生成了一个对应的段读取器来读取段(SegmentReader)中的key-value值。主要看一下与段读取器(SegmentReader)相关的实现类:


四、总结

       在本文,我系统的介绍了Hadoop对排序器/组合器/合并器的设计,Hadoop对它们的设计可以说是站在一个好高的层次,因此我们就可以把这些框架移植到其它引用场景下,甚至无需修改任何代码。

相关问答

更多
  • 我使用过之前没有工作的Eclipse Indego 3.7版本,所以我将Eclipse版本降级到3.3。 当我用Google搜索时,我发现Eclipse最新版本中的Hadoop插件发生了一些变化,这些版本在3.3之后发布 I have used Eclipse Indego 3.7 version which was not working previously so i degraded the Eclipse version to 3.3. As i googled i came to know tha ...
  • 执行全局排序的一种方法是使用定制分区程序并为您的reducer进行范围分区。 为了这个工作,你必须知道你的mapper输出键的范围。 您可以将您的密钥范围分成n个桶,其中n是减速器的数量。 根据密钥映射到的存储区,映射器输出会被路由到特定的还原器。 每个减速器的输出都被排序。 由于范围分区,所有减速机输出的集合被全局排序。 你所要做的就是按照与文件名中的5位数相同的顺序来取得Reducer输出文件。 需要注意的一件事是密钥分发中的偏差,这将导致群集中减速器的负载不均匀。 如果您有分配信息,即密钥的直方图,则 ...
  • 它们实现了部分相同的目的,但聚合器更通用,可用于组合器不能的情况。 所以这是肯定的,聚合器不会自动用作组合器。 如果您希望将它用作组合器,则必须指定它。 引用来自级联,“ 组合器仅限于关联和交换功能,如'sum'和'max'。并且为了工作,必须对Map任务发出的值进行序列化,排序(反序列化和比较),再次反序列化经营 “ They fulfill partly the same purpose but the aggregator is more generic and can be used in case ...
  • 比如,你的密钥是(Attribute1, Attribute2) 。 现在您可以使用Sort Comparator,首先按Attribute1排序,然后按Attribute2排序。 例如, Key = (2008,32) // year, temperature 现在,如果您想按年份然后按温度排序,可以使用Sort Comparator,如下所示: public static class KeyComparator extends WritableComparator { protected Ke ...
  • 这里描述的JobTracker有一个API,它为您提供有关群集本身的大量信息以及所有作业的详细信息。 特别是,如果您知道作业ID并且想要查找每个单独的地图的指标并减少任务,则可以调用getMapTaskReports ,它将返回此处详述的TaskReport实例,该实例可让您访问getFinishTime或getStartTime等方法。 例如: TaskReport[] maps = jobtracker.getMapTaskReports("your_job_id"); for (TaskReport ...
  • 你不能用当前的实现。 但是,人们已经“破解”了Hadoop代码以执行您想要执行的操作。 在MapReduce模型中,您需要等待所有映射器完成,因为密钥需要进行分组和排序; 另外,您可能会运行一些推测性映射器,但您还不知道哪个重复的映射器将首先完成。 但是,正如“打破MapReduce Stage Barrier”一文所指出的那样,对于某些应用程序来说,不等待映射器的所有输出都是有意义的。 如果你想实现这种行为(最有可能用于研究目的),那么你应该看一下实现ShuffleConsumerPlugin的org.a ...
  • Combiner应该只是一个Reducer ,因此实现了Reducer接口(没有Combiner接口)。 将组合步骤视为Mapper和Reducer之间的一种中间减少步骤。 以字数为例。 来自雅虎的教程 : 字数是组合器有用的主要示例。 清单1--3中的字数统计程序为它看到的每个单词的每个实例发出一个(字,1)对。 因此,如果同一文档包含3次单词“cat”,则该对(“cat”,1)将发出三次; 所有这些都被发送到Reducer。 通过使用组合器,可以将它们压缩成单个(“cat”,3)对以发送到Reducer ...
  • 两个映射器都无法同时读取同一文件。 解决方案(解决方法):创建输入文件的副本(在这种情况下,让重复的rec文件为rec1)。 然后使用rec和mapper2使用rec1提供mapper1。 两个映射器都是并行执行的,所以你不必担心reducer输出,因为两个映射器输出都将被洗牌,这样两个文件中的相同键都会转到同一个reducer。 所以输出就是你想要的。 希望这有助于面临类似问题的其他人。 Both mappers can't read the same file at the same time. Sol ...
  • 我认为之前关于SO的回答可以回答你的问题,只需稍加改动: 是否可以读取MongoDB数据,使用Hadoop处理它,并将其输出到RDBS(MySQL)? 主要区别在于您将OutputFormatClass设置为: job.setOutputFormatClass( SequenceFileOutputFormat.class ); 您还需要在要保存数据的HDFS上设置输出路径。 请参阅他们的WordCount示例以获取完整的代码示例,但使用上面的输出格式而不是MongoOutputFormat。 I thi ...
  • 目前尚不清楚您使用的是什么Python库,但假设您使用PySpark,您可以在客户端机器上复制或配置HADOOP_CONF_DIR ,并且可以与任何外部Hadoop系统进行通信。 至少,您需要配置core-site.xml以与HDFS和hive-site.xml进行通信以与Hive进行通信。 如果您使用的是PyHive库 ,则只需连接到user@hiveserver2:1000 It's not clear what Python library you are using, but assuming Py ...