关联规则二项集Hadoop实现

2019-03-28 13:20|来源: 网络

近期看mahout的关联规则源码,颇为头痛,本来打算写一个系列分析关联规则的源码的,但是后面看到有点乱了,可能是稍微有点复杂吧,所以就打算先实现最简单的二项集关联规则。

算法的思想还是参考上次的图片:

这里实现分为五个步骤:

  1. 针对原始输入计算每个项目出现的次数;
  2. 按出现次数从大到小(排除出现次数小于阈值的项目)生成frequence list file;
  3. 针对原始输入的事务进行按frequence list file进行排序并剪枝;
  4. 生成二项集规则;
  5. 计算二项集规则出现的次数,并删除小于阈值的二项集规则;

第一步的实现:包括步骤1和步骤2,代码如下:

GetFlist.java:

  1. package org.fansy.date1108.fpgrowth.twodimension;
  2. import java.io.BufferedReader;
  3. import java.io.IOException;
  4. import java.io.InputStreamReader;
  5. import java.util.ArrayList;
  6. import java.util.Comparator;
  7. import java.util.Iterator;
  8. import java.util.List;
  9. import java.util.PriorityQueue;
  10. import java.util.regex.Pattern;
  11. import org.apache.Hadoop.conf.Configuration;
  12. import org.apache.hadoop.fs.FSDataInputStream;
  13. import org.apache.hadoop.fs.FSDataOutputStream;
  14. import org.apache.hadoop.fs.FileSystem;
  15. import org.apache.hadoop.fs.Path;
  16. import org.apache.hadoop.io.IntWritable;
  17. import org.apache.hadoop.io.LongWritable;
  18. import org.apache.hadoop.io.Text;
  19. import org.apache.hadoop.mapreduce.Job;
  20. import org.apache.hadoop.mapreduce.Mapper;
  21. import org.apache.hadoop.mapreduce.Reducer;
  22. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
  23. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
  24. // the specific comparator
  25. class MyComparator implements Comparator<String>{
  26. private String splitter=",";
  27. public MyComparator(String splitter){
  28. this.splitter=splitter;
  29. }
  30. @Override
  31. publicint compare(String o1, String o2) {
  32. // TODO Auto-generated method stub
  33. String[] str1=o1.toString().split(splitter);
  34. String[] str2=o2.toString().split(splitter);
  35. int num1=Integer.parseInt(str1[1]);
  36. int num2=Integer.parseInt(str2[1]);
  37. if(num1>num2){
  38. return -1;
  39. }elseif(num1<num2){
  40. return1;
  41. }else{
  42. return str1[0].compareTo(str2[0]);
  43. }
  44. }
  45. }
  46. publicclass GetFList {
  47. /**
  48. * the program is based on the picture
  49. */
  50. // Mapper
  51. publicstaticclass MapperGF extends Mapper<LongWritable ,Text ,Text,IntWritable>{
  52. private Pattern splitter=Pattern.compile("[ ]*[ ,|\t]");
  53. privatefinal IntWritable newvalue=new IntWritable(1);
  54. publicvoid map(LongWritable key,Text value,Context context) throws IOException, InterruptedException{
  55. String [] items=splitter.split(value.toString());
  56. for(String item:items){
  57. context.write(new Text(item), newvalue);
  58. }
  59. }
  60. }
  61. // Reducer
  62. publicstaticclass ReducerGF extends Reducer<Text,IntWritable,Text ,IntWritable>{
  63. publicvoid reduce(Text key,Iterable<IntWritable> value,Context context) throws IOException, InterruptedException{
  64. int temp=0;
  65. for(IntWritable v:value){
  66. temp+=v.get();
  67. }
  68. context.write(key, new IntWritable(temp));
  69. }
  70. }
  71. publicstaticvoid main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
  72. // TODO Auto-generated method stub
  73. if(args.length!=3){
  74. System.out.println("Usage: <input><output><min_support>");
  75. System.exit(1);
  76. }
  77. String input=args[0];
  78. String output=args[1];
  79. int minSupport=0;
  80. try {
  81. minSupport=Integer.parseInt(args[2]);
  82. } catch (NumberFormatException e) {
  83. // TODO Auto-generated catch block
  84. minSupport=3;
  85. }
  86. Configuration conf=new Configuration();
  87. String temp=args[1]+"_temp";
  88. Job job=new Job(conf,"the get flist job");
  89. job.setJarByClass(GetFList.class);
  90. job.setMapperClass(MapperGF.class);
  91. job.setCombinerClass(ReducerGF.class);
  92. job.setReducerClass(ReducerGF.class);
  93. job.setOutputKeyClass(Text.class);
  94. job.setOutputValueClass(IntWritable.class);
  95. FileInputFormat.setInputPaths(job, new Path(input));
  96. FileOutputFormat.setOutputPath(job, new Path(temp));
  97. boolean succeed=job.waitForCompletion(true);
  98. if(succeed){
  99. // read the temp output and write the data to the final output
  100. List<String> list=readFList(temp+"/part-r-00000",minSupport);
  101. System.out.println("the frequence list has generated ... ");
  102. // generate the frequence file
  103. generateFList(list,output);
  104. System.out.println("the frequence file has generated ... ");
  105. }else{
  106. System.out.println("the job is failed");
  107. System.exit(1);
  108. }
  109. }
  110. // read the temp_output and return the frequence list
  111. publicstatic List<String> readFList(String input,int minSupport) throws IOException{
  112. // read the hdfs file
  113. Configuration conf=new Configuration();
  114. Path path=new Path(input);
  115. FileSystem fs=FileSystem.get(path.toUri(),conf);
  116. FSDataInputStream in1=fs.open(path);
  117. PriorityQueue<String> queue=new PriorityQueue<String>(15,new MyComparator("\t"));
  118. InputStreamReader isr1=new InputStreamReader(in1);
  119. BufferedReader br=new BufferedReader(isr1);
  120. String line;
  121. while((line=br.readLine())!=null){
  122. int num=0;
  123. try {
  124. num=Integer.parseInt(line.split("\t")[1]);
  125. } catch (NumberFormatException e) {
  126. // TODO Auto-generated catch block
  127. num=0;
  128. }
  129. if(num>minSupport){
  130. queue.add(line);
  131. }
  132. }
  133. br.close();
  134. isr1.close();
  135. in1.close();
  136. List<String> list=new ArrayList<String>();
  137. while(!queue.isEmpty()){
  138. list.add(queue.poll());
  139. }
  140. return list;
  141. }
  142. // generate the frequence file
  143. publicstaticvoid generateFList(List<String> list,String output) throws IOException{
  144. Configuration conf=new Configuration();
  145. Path path=new Path(output);
  146. FileSystem fs=FileSystem.get(path.toUri(),conf);
  147. FSDataOutputStream writer=fs.create(path);
  148. Iterator<String> i=list.iterator();
  149. while(i.hasNext()){
  150. writer.writeBytes(i.next()+"\n");// in the last line add a \n which is not supposed to exist
  151. }
  152. writer.close();
  153. }
  154. }

相关问答

更多
  • 1、HBase是Hadoop生态系统的一部分,又其他框架如PIG, HIVE等的支持,而Cassender上运行mapreduce相对比较复杂的。总体上来说,Cassender或许在存储上比较有效,但HBase的数据处理能力更强些。 2、HBase有Shell脚本和Web页面的处理能力,而Cassender没有Shell的支持,只有API,可用性上不如HBase。 3、Cassender的Schema发生变化时,需要集群重启,但Cassender宣称“写操作永不失败”,而HBase是有可能的。 4、场景:C ...
  • 具体参考Eclipse查看hadoop源代码出现Source not found,是因为没有添加.zip在我们hadoop编程中,经常遇到像看看hadoop的某个类中函数的功能。但是我们会遇到一种情况就是Source not found。遇到这个问题,该如何解决。因为我们已经引入了包,为什么会找不到。如果不了解怎么引入的可以参考:hadoop开发方式总结及操作指导http://www.aboutyun.com/thread-6950-1-1.html看到上面现象,是因为我们每天添加.zip。该如何添加zip ...
  • 关联规则类似于“A,B→C”,这意味着当A和B发生时C往往会发生。 项目集只是一个集合,例如“A,B,C”,并且如果它的项目倾向于同时出现,则频繁出现。 查找关联规则的常用方法是查找所有频繁项集,然后将它们后处理成规则。 An association rule is something like "A,B → C", meaning that C tends to occur when A and B occur. An itemset is just a collection such as "A,B,C ...
  • 这比http://en.wikipedia.org/wiki/Association_rule_learning有点宽泛,但希望有用。 一些早期的FOAF工作可能很有趣(SVD / PCA等): http://stderr.org/~elw/foaf/ http://www.scribd.com/doc/353326/The-Social-Semantics-of-LiveJournal-FOAF-Structure-and-Change-from-2004-to -2005 http://datamini ...
  • hadoop-core通常足以编写map-reduce作业。 由于在群集上运行作业时,hadoop库应该可用,因此可以在依赖项中添加provided 。 对于单元测试,您可以将org.apache.mrunit依赖项与test hadoop-core is usually enough to write map-reduce jobs. Since hadoop libraries should be available when you run ...
  • 与围绕Hadoop API的其他平台系列相比,在MapRed中操纵Intersection很难。 有人已经提到了Hive(如果你有SQL背景,超级容易做交叉),但你也可以考虑: 猪 级联 (特别是CoGroup,如果内存是一个问题,HashJoin,如果不是) It's tough to maneuver Intersection in MapRed compared to the other family of platforms around the Hadoop API. Someone alread ...
  • 在门户上配置它,然后使用“资源浏览器”(也在门户网站上)调查模板。 这应该给你答案。 Configure it on the portal, and then investigate the template using the 'resource explorer' (also on the portal). That should give you the answer.
  • 如果您的nubers是整数(为什么规范化为0?)而且很小,你可以“破解”这个限制: apple banana apple 变 apple banana apple_2 这将允许找到关联规则,如 banana => apple, apple_2 但你需要混合使用一些聪明的过滤器,以免得到无用的规则 apple_2 => apple You can "hack" around this limitation if your nubers are integer (why normalize to 0 1 ...
  • 是。 变更集本身在交付到流时关闭。 但其关联的工作项不是:您可以添加或删除与交付的变更集关联的一个或多个工作项。 话虽这么说,我有一个特殊的钩子,它使得该关联在交付时是强制性的:即,如果没有首先将您的更改集首先关联到工作项,则无法交付。 我不确定该挂钩是否是我的组织的自定义挂钩,但是在这里您可以检查它是否存在: 它在项目区管理下 Team Configuration / Operation Behavior / Source Control / Deliver (client) / Precondi ...
  • 经过更多关于这个主题的Google搜索和探索tfs API之后我最终得到了: 如果你所有的变更集都链接到工作项目(不是我的情况,但这是我原来想问的): // based on https://etoptanci.wordpress.com/2011/05/04/seeing-all-code-changes-for-a-work-item/ private static void GetChangesForWorkItem() { var configurationServer = TfsConfi ...