Sunday, March 17, 2013

hadoop streaming命令参数简介


在hadoop计算中,可以通过正确使用streaming提供的各项命令参数来方便快捷的实现某些需求,下面具体介绍下:
  • hadoop streaming简介
    • Hadoop MapReduce是对MapReduce框架的一个java开源实现,并且可以运行非java语言编写的用户程序。它提供了一个非常有用的工具 - streaming,可以用STDIN (标准输入)STDOUT (标准输出)与用户编写的MapReduce进行数据的交换。任何能够使用STDINSTDOUT的编程语言都可以用来编写MapReduce程序,比如我们用Pythonsys.stdinsys.stdout,或者是C中的stdinstdout
  • hadoop streaming命令
    • 使用下面的命令运行streaming MapReduce程序:
    • $Hadoop_HOME/bin/hadoop streaming args
    • 其中,args是streaming参数
  • 重点介绍几个参数
    • -file & -cacheFile & -cacheArchive:这系列参数用来设置需要分发的文件:本地文件、HDFS文件和HDFS压缩文件;
      • -file localfie:经常用来分发mapper和reducer程序或者脚本;
      • -cacheFile hdfs://host:port/path/to/file#filename:选择在计算节点下的缓存文件作为本地文件处理,streaming程序可以通过./filename直接访问文件,而且是从本地读取文件,需要注意的是不能向-cacheFile分发的文件写入任何内容。
      • -cacheFile hdfs://host:port/path/to/link#linename:如果要分发的文件有目录结构,可以先将整个目录打包并上传到HDFS上,然后通过上面的命令分发压缩包。例如本地有目录app,里面有文件dict,首先需要进入到app目录下,使用命令tar -zcvf app.tar.gz ./*,然后上传至HDFS上,设置后可通过./app/dict访问文件dict,需要注意的是本地打包是需要进入app目录进行打包;
    • -jobconf | -D NAME=VALUE:指定作业参数,NAME是参数名,VALUE是参数值;
      • -jobconf mapred.map.tasks=M:设置map任务的的总数为M个,reduce设置类似;
      • -jobconf mapred.job.map.capacity=M:设置同时最多运行M个map任务,reduce设置类似;
    • -combiner:指定combiner JAVA类,主要用于对中间过程的输出进行本地的局部聚集,这个功能尚未使用过,待后续补充;
    • -partitioner:指定partitioner JAVA类,streaming提供了一些实用的partitioner实现,常用的有KeyFieldBasedPartitioner,实例如下:
    • $HADOOP_HOME/bin/hadoop streaming \
             -D stream.map.output.field.separator=. \
             -D stream.num.map.output.key.fields=4 \
             -D map.output.key.field.separator=. \
             -D num.key.fields.for.partition=2 \
      -input /user/test/input -output /user/test/output \
      -mapper “mymapper.sh” -reducer “ myreducer.sh” \
      -partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner \
      -file /home/work/mymapper.sh \
      -file /home/work/myreducer.sh \
      -jobconf mapred.job.name=”key-partition-demo”

      其中,-D stream.map.output.field.separator=.和-D stream.num.map.output.key.fields=4与上面的定制输出数据分隔方式意义相同,指定map的输出行第4个英文句号”.”之前为key,后面为value。-partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner指定使用KeyFieldBasedPartitioner,-D map.output.key.field.separator=.指定key的内部用英文句号”.”分隔,-D num.key.fields.for.partition=2指定将key分隔出来的前两个部分而不是整个key用于Partitioner做partition
先介绍到这里,后续再补充下combiner的实际用法,或者其他实用的参数。


Thursday, March 14, 2013

hadoop表拼接简介

在hadoop计算过程中,表拼接作为一类常见的操作,被广泛的使用,下面介绍一种基本的方法供参考,具体代码如下:



def mapper() :
    """
    在query后加入A或B字段
    """
    for line in sys.stdin:
        line = line.rstrip()
        if line == '' :
            continue
        fields = line.split('\t')
        fields_len = len(fields)
        if fields_len == 4 :
            print '%s\t%s' % (fields[0], 'A')
        if fields_len > 50  :
            print '%s\t%s\t%s' % (fields[11], 'B', fields[2])



def reducer() :
    """
    合并
    """
    key = ''
    value = 0
    for line in sys.stdin :
        line = line.rstrip()
        if line == '' :
            continue
        fields = line.split('\t')
        if len(fields) < 2 :
            continue
        if fields[1] == 'A' :
            key = fields[0]
        if fields[0] == key and fields[1] == 'B' :
            value += string.atof(fields[2])

    print 'final_sum : %.2f' % (value)




其中,mapper阶段,主要根据数据本身的特点,标注A或B这样的标签用以区分,并且把用于拼接的key作为第一列输出,标签和其他必要的属性列以此输出;
在本例中,列数为4的数据标记为A标签,列数大于50的数据标记为B标签;
利用hadoop中间阶段基于前两列(可设置)的排序,我们可以得到相同key下的A和B的数据以此排序;
reducer阶段,主要对key相同的数据进行一系列的操作,并且最终输出结果;
在本例中,相同key的B数据的第二列数值进行累加,并且不同key的也进行累加,最后输出全部累加值和;
当然,这些计算过程必须在一定的配置下才能完成,下一篇将会对基本的hadoop任务配置做简介。