实例讲解Hadoop中的map/reduce查询(Python语言实现)

2019-03-28 14:15|来源: 网络

条件,假设你已经装好了 Hadoop集群,配好了hdfs并可以正常运行。

$hadoop dfs -ls /data/dw/explorer
Found 1 items
drwxrwxrwx     - rsync supergroup                    0 2011-11-30 01:06 /data/dw/explorer/20111129


$ hadoop dfs -ls /data/dw/explorer/20111129
Found 4 items
-rw-r--r--     3 rsync supergroup     12294748 2011-11-29 21:10 /data/dw/explorer/20111129/explorer_20111129_19_part-00000.lzo
-rw-r--r--     3 rsync supergroup             1520 2011-11-29 21:11 /data/dw/explorer/20111129/explorer_20111129_19_part-00000.lzo.index
-rw-r--r--     3 rsync supergroup     12337366 2011-11-29 22:09 /data/dw/explorer/20111129/explorer_20111129_20_part-00000.lzo
-rw-r--r--     3 rsync supergroup             1536 2011-11-29 22:10 /data/dw/explorer/20111129/explorer_20111129_20_part-00000.lzo.index

数据格式如下

20111129/23:59:54 111.161.25.184 182.132.25.243 <Log_Explorer ProductVer="5.05.1026.1111" UUID="{C9B80A9B-704E-B106-9134-1ED3581D0123}"><UserDoubleClick FileExt="mp3" AssociateKey="Audio.mp3" Count="1"/></Log_Explorer>



1.map脚本取数据explorer_map.py

#!/usr/bin/python
#-*-coding:UTF-8 -*-
import sys
import cElementTree

debug = False#设置lzo文件偏移位
if debug:
        lzo = 0
else:
        lzo = 1

for line in sys.stdin:
        try:
                flags = line[:-1].split('\t')
#hadoop查询走标准输入,数据以\t分隔,去掉每行中的\n
                if len(flags) == 0:
                        break
                if len(flags) != 11+lzo:
#hadoop采用lzo则偏移位+1,lzo设置为False则+1
                        continue
                stat_date=flags[0+lzo]#日期
                stat_date_bar = stat_date[:4]+"-"+stat_date[4:6]+'-'+stat_date[6:8]#拼成2011-11-29格式
                version = flags[4+lzo]
                xmlstr = flags[10+lzo]
                #xmlstr=line
                dom = cElementTree.fromstring(xmlstr)
#xml字段对象,以下均为取值操作
                uuid = dom.attrib['UUID']
                node = dom.find('UserDoubleClick')
                associateKey=node.get('AssociateKey')
                associateKeys=associateKey.split('.')
                player = associateKeys[0]
                fileext=node.get('FileExt')
                count=node.get('Count')
                print stat_date_bar+','+version+','+fileext+','+player+','+associateKey+'\t'+count
#输出map后的数据,这里map不对数据做任何处理,只做取值,拼接操作
#将\t前的字符串作为key输入reduce,\t后的count作为reduce计算用的value
except Exception,e:
print e
#抛出异常        

2.reduce脚本计算结果并输出explorer_red.py

#!/usr/bin/python
#-*-coding:UTF-8 -*-
import sys
import cElementTree
import os
import string

res = {}

for line in sys.stdin:
        try:
                flags = line[:-1].split('\t')
#拆分\t以获得map传过来的key和value
                if len(flags) != 2:
#\t切割后,如果数据有问题,元素多于2或者少于2则认为数据不合法,跳出继续下一行
                        continue
                skey= flags[0]
#取出第一个元素作为key
                count=int(flags[1])
#取出第二个元素作为value
                if res.has_key(skey) == False:
                        res[skey]=0
                res[skey] += count
#计算count总和
        except Exception,e:
                pass
#不抛出,继续执行

for key in res.keys():
        print key+','+'%s' % res[key]
#格式化输出,以放入临时文件

3.放入crontab执行的脚本

#!/bin/sh

[ $1 ] && day=$1 DATE=`date -d "$1" +%Y%m%d`
[ $1 ] || day=`date -d "1 day ago" +%Y%m%d`     DATE=`date -d "1 day ago" +%Y%m%d`
#取昨天日期

cd /opt/modules/hadoop/hadoop-0.20.203.0/
#进入hadoop工作目录
bin/hadoop jar contrib/streaming/hadoop-streaming-0.20.203.0.jar -file /home/rsync/explorer/explorer_map.py -file /home/rsync/explorer/explorer_red.py -mapper /home/rsync/explorer/explorer_map.py -reducer /home/rsync/explorer/explorer_red.py -inputformat com.hadoop.mapred.DeprecatedLzoTextInputFormat -input /data/dw/explorer/$DATE -output /tmp/explorer_$DATE
#执行map/reduce,并将排序完结果放入hdfs:///tmp/explorer

bin/hadoop fs -copyToLocal /tmp/explorer_$DATE /tmp
#将m/r结果从hdfs://tmp/explorer_$DATE 保存到本地/tmp下
bin/hadoop dfs -rmr /tmp/explorer_$DATE
#删除hdfs下临时文件夹

cd
#返回自身目录
cd explorer
#进入explorer文件夹
./rm.py $DATE
执行入库和删除临时文件夹脚本


4.将/tmp生成的结果入库并删除临时文件夹

#!/usr/bin/python

import os
import sys
import string

if len(sys.argv) == 2:
                date = sys.argv[1:][0] #取脚本参数
                os.system ("mysql -h192.168.1.229 -ujobs -p223238 -P3306    bf5_data    -e \"load data local infile '/tmp/explorer_"+date+"/part-00000' into table explorer FIELDS TERMINATED
BY '\,' (stat_date,ver,FileExt,player,AssociateKey,count)\"")#执行入库sql语句,并用load方式将数据加载到统计表中
                os.system ("rm -rf /tmp/explorer_"+date)#删除map/reduce过的数据
else:
                print "Argv error"

#因为没有安装MySQLdb包,所以用运行脚本的方式加载数据。

原始数据和最后完成的输出数据对比,红色为原数据,绿色为输出数据

20111129/23:59:54 111.161.25.184 182.132.25.243 <Log_Explorer ProductVer="5.05.1026.1111" UUID="{C9B80A9B-704E-B106-9134-1ED3581D0123}"><UserDoubleClick FileExt="mp3" AssociateKey="Audio.mp3" Count="1"/></Log_Explorer>

-----------------------------------------------------------

2011-11-29,5.05.1026.1111,mp3,Audio,Audio.mp3,1


5.调试技巧

因为这种方式比较抽象,所以你很难得到一个直观的调试过程。建议调试如下

#将hadoop中的数据文本copy出来一个,lzo需要解压缩,然后将map中的debug模式置为True,也就是不加hadoop中的lzo偏移量。
#用head输入hadoop里的文件,通过管道操作放入map/reduce中执行,看输出结果

$head explorer_20111129 | explorer_map.py | explorer_red.py

一天的数据大概几十个G,以前用awk和perl脚本跑需要至少半小时以上,改用map/reduce方式后,大概20几秒跑完,效率还是提高了很多的。

相关问答

更多
  • 找到离存数据最近的一台机器运行和这个数据相关的map任务,reduce是按照你整理出的key有多少个来决定的。一个机器很难说,处理的快的处理多一点,保持所有机器使用平衡。 上面你都自己写了20个map,和文件大小个数有关,和数据条数无关。 要看你选择的输入格式是什么,默认是行偏移量,然后由你编写map函数,指定key和value是什么。相同的key整合起来传给reduce,由reduce进行下一步处理,最后输出到指定的地方。
  • 首先hadoop框架要求程序员将函数分成两部分,即map和reduce函数。 map阶段:就是将输入通过map函数处理得出中间结果并通过hadoop框架分配到不同的reduce。 reduce阶段:就是将中间结果通过reduce函数处理得到最后的结果。 以wordcount为例,在map阶段,map函数在每个单词后面加上一个1;在reduce阶段,reduce函数将相同单词后面的1都加起来。其中hadoop框架实现过程中的排序,分配等,当然这些也可以通过自定义的函数来控制。
  • 您好,我来为您解答:   用 Java 实现的,开源的,支持 Fedora、Ubuntu 等 Linux 平台!   GNU/Linux是产品开发和运行的平台。 Hadoop已在有2000个节点的GNU/Linux主机组成的集群系统上得到验证。   Win32平台是作为开发平台支持的。由于分布式操作尚未在Win32平台上充分测试,所以还不作为一个生产平台被支持。   希望我的回答对你有帮助。
  • 有2种方式来实现Map/Reduce 1.java的方式 2.Hadoop Streaming, SHELL/Python/ruby等各种支持 标准输入输出的语言 ~如果你认可我的回答,请及时点击【采纳为满意回答】按钮 ~~手机提问的朋友在客户端右上角评价点【满意】即可。 ~你的采纳是我前进的动力 ~~O(∩_∩)O,记得好评和采纳,互相帮助,谢谢。
  • shanthanu,你的第一个问题是 问)哪种脚本语言对hadoop有用? A)大多数脚本语言,如php,python,perl,ruby bash都很好。 任何能够从stdin读取,写入sdtout和parse选项卡以及新行字符的语言都可以工作:Hadoop Streaming只是将键值对的字符串表示与一个选项卡连接到必须在每个任务跟踪器节点上可执行的任意程序。 在用于设置hadoop集群的大多数Linux发行版中,已经安装了python,bash,ruby,perl ......但是没有什么能阻止你为自 ...
  • hadoop-streaming的-file选项仅适用于本地文件。 但请注意,其帮助文本提到-file标志已被弃用,以支持generic -files选项。 使用generic -files选项允许我们指定一个远程(hdfs / gs)文件来进行舞台。 另请注意,通用选项必须位于应用程序特定标志之前。 您的调用将变为: hadoop jar /usr/lib/hadoop-mapreduce/hadoop-streaming.jar \ -files gs://bucket-name/intro_t ...
  • 我认为你没有使用reducer,mapper输出是最终输出 如果你引入减速器,则会发生改组和排序,你会获得所需的输出。 请参考以下问题:减速机的改组和分拣阶段的目的是什么 示例Reducer实现: public class Reduce extends Reducer { public void reduce(Text key, Iterable values, Context context) throws IOExceptio ...
  • 这是除了最佳之外的一切,因为地图输出必须始终复制到另一台服务器。 但您可以简单地修改服务器上的mapred-site.xml。 mapred.tasktracker.map.tasks.maximum 4 The maximum number of map tasks that will be run simultaneously by a task tracker.
  • 我建议通过Pig或Hive运行它,因为你可以用几行来解决这种问题。 如果做不到这一点,我会做以下事情。 在已连接的数据上运行另一个MapReduce作业,并执行以下操作:在映射器中,对于每个输入拆分,保留每个国家/地区ID处理的最小订单,最大订单和元组数(具有唯一用户ID的行)的选项卡。 只有少数几个国家/地区,因此您可以在整个地图工作中将这些统计信息保存在内存中。 在拆分结束时,将累计的统计数据输出到按国家/地区ID键入的reducer。 然后,reducer简单地组合来自每个拆分的聚合数据,以找到全局m ...
  • 我通过以下方式在Eclipse中开发Cassandra / Hadoop应用程序: 使用maven(m2e)为我的Eclipse项目收集和配置依赖项(Hadoop,Cassandra,Pig等) 创建测试用例(src / test / java中的类)来测试我的映射器和缩减器。 诀窍是使用扩展RecordWriter和StatusReporter的内部类动态构建上下文对象。 如果执行此操作,则在调用setup / map / cleanup或setup / reduce / cleanup之后,您可以断言正 ...