在hadoop计算过程中,表拼接作为一类常见的操作,被广泛的使用,下面介绍一种基本的方法供参考,具体代码如下:
def mapper() :
"""
在query后加入A或B字段
"""
for line in sys.stdin:
line = line.rstrip()
if line == '' :
continue
fields = line.split('\t')
fields_len = len(fields)
if fields_len == 4 :
print '%s\t%s' % (fields[0], 'A')
if fields_len > 50 :
print '%s\t%s\t%s' % (fields[11], 'B', fields[2])
def reducer() :
"""
合并
"""
key = ''
value = 0
for line in sys.stdin :
line = line.rstrip()
if line == '' :
continue
fields = line.split('\t')
if len(fields) < 2 :
continue
if fields[1] == 'A' :
key = fields[0]
if fields[0] == key and fields[1] == 'B' :
value += string.atof(fields[2])
print 'final_sum : %.2f' % (value)
其中,mapper阶段,主要根据数据本身的特点,标注A或B这样的标签用以区分,并且把用于拼接的key作为第一列输出,标签和其他必要的属性列以此输出;
在本例中,列数为4的数据标记为A标签,列数大于50的数据标记为B标签;
利用hadoop中间阶段基于前两列(可设置)的排序,我们可以得到相同key下的A和B的数据以此排序;
reducer阶段,主要对key相同的数据进行一系列的操作,并且最终输出结果;
在本例中,相同key的B数据的第二列数值进行累加,并且不同key的也进行累加,最后输出全部累加值和;
当然,这些计算过程必须在一定的配置下才能完成,下一篇将会对基本的hadoop任务配置做简介。
No comments:
Post a Comment