使用Hadoop的datajoin包进行关系型join操作

2019-03-28 13:29|来源: 网络

datajoin包在Hadoop的contrib目录下,我们也可以在src下面看见其源码,它的源码很小,我建议大体看看以了解其原理。

利用datajoin进行join操作,在《Hadoop in action》里面已经讲的十分清楚,在这里只提及值得注意的几个地方。

  1. TaggedMapOutput的目的是标识数据,让我们知道哪个记录是从哪里来的。
  2. DataJoinMapperBase类中的generateInputTag在map任务开始时被调用(还没进行map函数),其目的是生成tag,并自动存储于DataJoinMapperBase类的inputTag中。 
  3. generateTaggedMapOutput()用于生存带标签的数据,这可以让我们知道数据的来源,在我们想进行left join /right join的时候很有用,同时对于过滤数据等操作也有帮助。
  4. datajoin在当前是用旧API写的,也就是说Mapper子类是实现Mapper接口而不是扩展Mapper虚类,但是MapperChain.addMapper虽然是在旧API目录下面,但是却只支持扩展虚类的方式,相信当你数据流比较长的时候这会给你带来麻烦,这个问题我没有解决,重写datajoin包可能是一种好的方式,但你也可以手动执行多个作业来间接达到目的。

下面是我自己想的一个练习,由于上述的第4点的限制,这其实是一个不完整的练习。

数据是《hadoop in action》的数据:

datajoin_customers文件 datajoin_orders文件
10001,Stephanie Leung,555-555-5555
10002,Edward Kim,123-456-7890
10003,Jose Madriz,281-330-8004
10004,David Stork,408-555-0000
102343,Posa Wu, 12387887989
10003,A,12.95,02-Jun-2008
10001,B,88.25,20-May-2008
10002,C,32.00,30-Nov-2007
10003,D,25.02,22-Jan-2009
21312,F,32.00,23-Jan-2010

题目:选出id为10,000到1000,000,000的用户数据进行right join(保留右边,左边为如果没有对应用户信息则设为NULL)。

说明:我们设想订单可以是匿名用户购买的(比如淘宝网),现在我希望知道这个id范围的一些订单信息。

  1. import java.io.DataInput;  
  2. import java.io.DataOutput;  
  3. import java.io.IOException;  
  4.   
  5. import org.apache.hadoop.conf.Configuration;  
  6. import org.apache.hadoop.conf.Configured;  
  7. import org.apache.hadoop.contrib.utils.join.DataJoinMapperBase;  
  8. import org.apache.hadoop.contrib.utils.join.DataJoinReducerBase;  
  9. import org.apache.hadoop.contrib.utils.join.TaggedMapOutput;  
  10. import org.apache.hadoop.fs.Path;  
  11. import org.apache.hadoop.io.Text;  
  12. import org.apache.hadoop.io.Writable;  
  13. import org.apache.hadoop.mapred.FileInputFormat;  
  14. import org.apache.hadoop.mapred.FileOutputFormat;  
  15. import org.apache.hadoop.mapred.JobClient;  
  16. import org.apache.hadoop.mapred.JobConf;  
  17. import org.apache.hadoop.mapred.TextInputFormat;  
  18. import org.apache.hadoop.mapred.TextOutputFormat;  
  19.   
  20. import org.apache.hadoop.util.Tool;  
  21. import org.apache.hadoop.util.ToolRunner;  
  22.   
  23.   
  24. public class DataJoin extends Configured implements Tool{  
  25.   
  26.     public static class Mapper extends DataJoinMapperBase{  
  27.   
  28.         protected Text generateInputTag(String inputFile) {  
  29.             return new Text(inputFile);  
  30.         }  
  31.   
  32.         protected TaggedMapOutput generateTaggedMapOutput(Object value) {  
  33.             TaggedMapOutput ret = new MyTaggedWritable((Text)value);  
  34.             String ck = ((Text)value).toString().split(","2)[0];  
  35.             if(10000 > Long.valueOf(ck) || 1000000000 < Long.valueOf(ck)){  
  36.                 return null;  
  37.            }  
  38.             ret.setTag(new Text(this.inputTag));  
  39.               
  40.             return ret;  
  41.         }  
  42.         protected Text generateGroupKey(TaggedMapOutput aRecord) {  
  43.             return new Text(aRecord.getData().toString().split(",")[0]);  
  44.         }  
  45.     }  
  46.       
  47.       
  48.     public static class Reducer extends DataJoinReducerBase{  
  49.   
  50.         protected TaggedMapOutput combine(Object[] tags, Object[] values) {  
  51.             if(tags.length < 1)  
  52.                 return null;  
  53.             if(tags.length == 1 && ((Text)tags[0]).toString().endsWith("customers")){  
  54.                 return null;  
  55.             }  
  56.               
  57.               
  58.             String retStr = "";  
  59.             if(tags.length == 1 && ((Text)tags[0]).toString().endsWith("orders")){  
  60.                 retStr = "NULL,";  
  61.             }  
  62.             for(int i = 0; i < values.length; i++){  
  63.                 if(i >  0)  
  64.                     retStr += ",";  
  65.                 retStr +=((MyTaggedWritable)values[i]).getData().toString().split(",",2)[1];  
  66.             }  
  67.             TaggedMapOutput ret = new MyTaggedWritable(new Text(retStr));  
  68.             ret.setTag((Text)tags[0]);  
  69.             return ret;  
  70.         }  
  71.           
  72.     }  
  73.     public static class MyTaggedWritable extends TaggedMapOutput{  
  74.   
  75.         public Text data;  
  76.           
  77.         public MyTaggedWritable(){  
  78.             this.data = new Text("");  //必须有,否则反序列化时出错。   
  79.         }  
  80.           
  81.         public MyTaggedWritable(Text data){  
  82.             this.data = data;  
  83.         }  
  84.           
  85.         public void readFields(DataInput in) throws IOException {  
  86.             this.tag.readFields(in);  
  87.             this.data.readFields(in);  
  88.         }  
  89.   
  90.         public void write(DataOutput out) throws IOException {  
  91.             this.tag.write(out);  
  92.             this.data.write(out);  
  93.         }  
  94.   
  95.         public Writable getData() {  
  96.             return this.data;  
  97.         }  
  98.           
  99.     }  
  100.     public int run(String[] arg0) throws Exception {  
  101.         Configuration conf = getConf();  
  102.         JobConf job = new JobConf(conf, DataJoin.class);  
  103.         job.setJarByClass(DataJoin.class);  
  104.           
  105.         Path in = new Path(arg0[0]);  
  106.         Path out = new Path(arg0[1]);  
  107.         FileInputFormat.setInputPaths(job,in);  
  108.         FileOutputFormat.setOutputPath(job, out);  
  109.           
  110.         job.setJobName("DataJoin");  
  111.         job.setMapperClass(Mapper.class);  
  112.         job.setReducerClass(Reducer.class);  
  113.         job.setInputFormat(TextInputFormat.class);  
  114.         job.setOutputFormat(TextOutputFormat.class);  
  115.         job.setOutputKeyClass(Text.class);  
  116.         job.setMapOutputKeyClass(Text.class);  
  117.         job.setMapOutputValueClass(MyTaggedWritable.class);  
  118.         job.setOutputValueClass(Text.class);  
  119.         job.set("mapred.textoutputformat.separator"",");  
  120.         JobClient.runJob(job);  
  121.         return 0;  
  122.     }  
  123.       
  124.     public static void main(String args[]) throws Exception{  
  125.         int res = ToolRunner.run(new Configuration(), new DataJoin(),args);  
  126.         System.exit(res);  
  127.     }  
  128.   
  129. }  

在generateTaggedMapOutput中我们进行数据过滤,把10000到1000,000,000间的数据选出来,然后在combine进行left join,我们知道combine函数是决定联结方式的地方。

相关问答

更多
  • Spark是UC Berkeley AMP lab所开源的类Hadoop MapReduce的通用的并行计算框架,Spark基于map reduce算法实现的分布式计算,拥有Hadoop MapReduce所具有的优点;但不同于MapReduce的是Job中间输出结果可以保存在内存中,从而不再需要读写HDFS,因此Spark能更好地适用于数据挖掘与机器学习等需要迭代的map reduce的算法。 优势应该在于分布式架构比较相似能快速上手吧
  • cygwin是在windowns下模拟linux操作系统的,跟hadoop没有任何关系,自然也没有支持不支持hadoop之说。在线安装不上可能是网络原因
  • 使用sqoop进行定时的数据抽取工作,并存放到hive数据仓库中,使用hive的hql进行数据汇总。这个方案中可以使用hive on tez 或者hive on spark进行计算性能提速 可以试试。
  • 换个角度讲, 当C#在微软平台上成为主流开发语言的时候, python已经渐渐的成为了linux应用程序的主流开发语言之一了. 原因很简单, perl在淡出, ruby未发力, shell不够用, php不合适, java不解释. 记得Redhat 7的字符界面安装程序就是python写的, 那几乎是我第一次听说python的年代了.
  • Hadoop自身是否有记录不清楚,但Hadoop是部署在linux上的,可以通过linux的历史命令查看。 1、history 2、fc -l 可以用grep过滤,例如: history | grep 'hadoop' 或history | grep 'hdfs'
  • 网上下载hadoop压缩包(以1.0.1为例),解压后:src/mapred/org/apache/hadoop/mapreduce 和src/core/org/apache/hadoop/conf (fs、io、util)
  • 使用pig脚本的简单方法: PAGERANK = LOAD 'hdfs/pagerank/dataset/location' USING PigStorage(',') AS (page:chararray, rank:float); WORDS_TO_PAGES = LOAD 'hdfs/words/dataset/location' USING PigStorage(',') AS (word:chararray, pages:chararray); PAGES_MATCHING = ...
  • 您需要TaggedWritable的默认构造函数(Hadoop使用反射来创建此对象,并且需要默认构造函数(无args)。 您还有一个问题,即您的readFields方法,您在可写接口上调用data.readFields(in) - 但不知道实际的运行时类data 。 我建议您在输出数据对象本身之前写出数据类名,或者查看GenericWritable类(您需要扩展它以定义可以使用的可允许可写类的集合)。 所以你可以修改如下: public static class TaggedWritable extends ...
  • TaggedWritable类没有空构造函数,因此在应该读取序列化数据的reduce阶段,app会因为无法通过反射创建TaggedWritable键入键而TaggedWritable 。 您应该添加一个空构造函数。 您的地图阶段已成功完成,因为在地图阶段,您的映射器会TaggedWritable创建TaggedWritable类型的键。 This code solves the problem and gives the expected result. It is from here, public st ...
  • 地图端加入 在地图侧(片段复制)连接中,您在内存中保存一个数据集(例如哈希表),并在另一个数据集上连接,逐个记录。 在猪,你会写 edges_from_list = JOIN a_follows_b BY user_a_id, some_list BY user_id using 'replicated'; 注意小数据集在右侧 。 这非常有效,因为没有网络开销和最小的CPU需求。 减少加入 在reduce-side连接中,您使用hadoop的标准合并排序对连接键进行分组。