MapReduce的一对多连接操作

2019-03-28 12:54|来源: 网络

问题描述:
一个trade table表
product1"trade1
product2"trade2
product3"trade3
一个pay table表
product1"pay1
product2"pay2
product2"pay3
product1"pay4
product3"pay5
product3"pay6

建立两个表之间的连接,该两表是一对多关系的
如下:
trade1pay1
trade1pay4
trade2pay2
...

思路:

为了将两个表整合到一起,由于有相同的第一列,且第一个表与第二个表是一对多关系的。
这里依然采用分组,以及组内排序,只要保证一方最先到达reduce端,则就可以进行迭代处理了。
为了保证第一个表先到达reduce端,可以为定义一个组合键,包含两个值,第一个值为product,第二个值为0或者1,来分别代表第一个表和第二个表,只要按照组内升序排列即可。

具体代码:

自定义组合键策略

package whut.onetomany;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.Hadoop.io.WritableComparable;
public class TextIntPair implements WritableComparable{
    //product1 0/1
    private String firstKey;//product1
    private int secondKey;//0,1;0代表是trade表,1代表是pay表
    //只需要保证trade表在pay表前面就行,则只需要对组顺序排列
                                                           
    public String getFirstKey() {
        return firstKey;
    }
    public void setFirstKey(String firstKey) {
        this.firstKey = firstKey;
    }
    public int getSecondKey() {
        return secondKey;
    }
    public void setSecondKey(int secondKey) {
        this.secondKey = secondKey;
    }
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(firstKey);
        out.writeInt(secondKey);
    }
    @Override
    public void readFields(DataInput in) throws IOException {
        // TODO Auto-generated method stub
        firstKey=in.readUTF();
        secondKey=in.readInt();
    }
                                                           
    @Override
    public int compareTo(Object o) {
        // TODO Auto-generated method stub
        TextIntPair tip=(TextIntPair)o;
        return this.getFirstKey().compareTo(tip.getFirstKey());
    }
}

分组策略

package whut.onetomany;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
public class TextComparator extends WritableComparator{
    protected TextComparator() {
        super(TextIntPair.class,true);//注册比较器
    }
    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        // TODO Auto-generated method stub
        TextIntPair tip1=(TextIntPair)a;
        TextIntPair tip2=(TextIntPair)b;
        return tip1.getFirstKey().compareTo(tip2.getFirstKey());
    }
}

组内排序策略:目的是保证第一个表比第二个表先到达
package whut.onetomany;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
//分组内部进行排序,按照第二个字段进行排序
public class TextIntComparator extends WritableComparator {
    public TextIntComparator()
    {
        super(TextIntPair.class,true);
    }
    //这里可以进行排序的方式管理
    //必须保证是同一个分组的
    //a与b进行比较
    //如果a在前b在后,则会产生升序
    //如果a在后b在前,则会产生降序
    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        // TODO Auto-generated method stub
        TextIntPair ti1=(TextIntPair)a;
        TextIntPair ti2=(TextIntPair)b;
        //首先要保证是同一个组内,同一个组的标识就是第一个字段相同
        if(!ti1.getFirstKey().equals(ti2.getFirstKey()))
          return ti1.getFirstKey().compareTo(ti2.getFirstKey());
        else
          return ti1.getSecondKey()-ti2.getSecondKey();//0,-1,1
    }
                                     
}

分区策略:

package whut.onetomany;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;
public class PartitionByText extends Partitioner<TextIntPair, Text> {
    @Override
    public int getPartition(TextIntPair key, Text value, int numPartitions) {
        // TODO Auto-generated method stub
        return (key.getFirstKey().hashCode()&Integer.MAX_VALUE)%numPartitions;
    }
}

相关阅读:

Hadoop权威指南(中文版-带目录索引)PDF http://www.linuxidc.com/Linux/2013-05/84948.htm
Hadoop权威指南(中文第2版)PDF http://www.linuxidc.com/Linux/2012-07/65972.htm
采用MapReduce与Hadoop进行大数据分析  http://www.linuxidc.com/Linux/2013-07/87312.htm

相关问答

更多
  • 都可以,简单的直接用txt打开java文件,写好后打包成class文件,就可以运行了。你看他原来在哪里放class文件的,你就放在那里
  • 先看一个标准的hbase作为数据读取源和输出目标的样例: Configuration conf = HBaseConfiguration.create(); Job job = new Job(conf, "job name "); job.setJarByClass(test.class); Scan scan = new Scan(); TableMapReduceUtil.initTableMapperJob(inputTable, scan, mapper.class, Writable.class ...
  • 本书Hadoop:权威指南是一个很好的开始。 介绍性的章节应该对你非常有用,以确定MapReduce的用处以及何时使用它。 更高级的章节比字数有更多更实际的例子。 如果你想更深入地研究,你可能想用MapReduce来检查数据密集型文本处理 。 这肯定有很多“真实世界”用例,但听起来你并不感兴趣做文本处理。 举个例子,要实现的主要内容是: 映射阶段主要用于解析,转换数据和过滤数据。 考虑记录,无共享的方法来记录处理。 在字数统计中,这是解析该行并分离出单词。 缩小阶段全部关于聚合:计数,平均,最小/最大等。在 ...
  • 有关mongodb锁定的详细信息,请参阅并发页面。 对于您的情况,map-reduce命令在相关集合运行时对其进行读写锁定。 map-reduce命令的部分可以是并发的,但在一般情况下,它在运行时被锁定。 See the concurrency page for details on mongodb locking. For your case, the map-reduce command takes a read and write lock for the relevant collections w ...
  • 找到解决方案 看到这个后纱线显示数据节点id / name为localhost Call From localhost.localdomain/127.0.0.1 to localhost.localdomain:56148 failed on connection exception: java.net.ConnectException: Connection refused; master和slave都在/ etc / hostname中拥有localhost.localdomain的主机名。 我将s ...
  • '默认'加入将是洗牌加入,又名。 作为共同连接。 请参阅JoinOperator.java 。 它依靠M / R shuffle来分割数据,并在Reduce端完成连接。 正如洗牌过程中的数据大小复制一样,速度很慢。 更好的选择是MapJoin,参见MapJoinOpertator.java 。 如果你只有一个大表和一个或多个小表加入(例如,典型的星型模式),这将起作用。 首先扫描小表,然后构建散列表并将其上载到HDFS缓存中,然后启动M / R作业,只需要拆分一个表(大表)即可。 比shuffle join ...
  • 第一步 - 通过此链接了解Hbase表如何成为映射器的源 : Hbase表作为Mapper源 下一步针对您的具体问题:每个表将有2个Mapper(即2个输入)。 您可以根据表中的值进行缩减。 您可能需要一个复杂的键或值来指定哪个表值来自Reducer。 在reducer中,如果值/键来自两个表,即两个映射器,那么你可以使用Hbase API(上面的链接解释了它的机器人)来写入表。 First step - go through this link to understand how Hbase Table ...
  • 回答你的问题: 是的,我在自己的MapReduce管道中使用operation.db.Put ,并且ndb模型很好。 不,缓存似乎不会干扰数据库操作。 不, db和ndb是一样的。 这可能是由于最终的一致性。 由于您使用MapReduce迭代实体,因此您可能不使用祖先查询。 因此,您无法确保立即看到您的实体被删除。 可能还有其他因素。 见下文。 MapReduce非常适合批处理,因此您可以走在正确的道路上。 您遇到的似乎没有被删除的实体的问题可能是由于多种原因造成的。 以下是一些: 最终的一致性 - 正如我 ...
  • cd ~/riak erl -name zed@127.0.0.1 -setcookie riak -pa apps/riak/ebin 在shell中: # connect to the server > {ok, Client} = riak:client_connect('riak@127.0.0.1'). {ok,{riak_client,'riak@127.0.0.1',<<6,201,208,64>>}} # create and insert objects > Client:put(ri ...
  • 你的问题有点宽泛,但我仍会尝试回答。 Hadoop为任何算法执行磁盘读/写操作是因为Hadoop执行面向磁盘的处理并且它是基于这个原则构建的。 这也是为什么开发spark,将计算从磁盘移动到内存以便它可以减少面向磁盘的计算的延迟开销的原因之一。 现在,对于每个MapReduce迭代,从/到磁盘的读/写操作有助于系统的健壮性和可靠性。考虑一个最简单的例子,工作节点有2个容器,这意味着两个独立的JVM将在同一台机器上运行,并且它们将访问该节点上可用的相同数据源。 因此,如果Hadoop在每次更改时都不会在磁盘上 ...