前言
战5渣,意思是战斗力只有5的渣渣。战斗力多少算高没有一般的定论,但战斗力只有5的肯定是渣渣,如下图所示。战5渣系列(真心希望这个系列的文章越少越好)用来记录工作中遇到的那些想抽自己脸的错误经历,并希望以此为戒。
背景简介
出错部分的需求简介:统计某个时间范围内(范围A)新装的电话号码在某个时间范围内(范围B)的通话频率,并过滤出通话频率小于阀值的电话号码。
若是采用关系型数据库来处理,由于通话记录的量十分巨大,oracle基本上处于工作不能的状态。于是采用MR来进行处理,相当于一个非常简单的join操作。
由于时间范围A内的新装电话号码数量并不大,基本都在1W条以内,所以可以采用map端join来进行处理:map阶段处理的是每一条通话记录,在map的setup阶段将所有号码读出并放入hashmap中,遍历通话记录时若该记录的号码在hashmap中时,将相应的计数值加1。最后在map的cleanup阶段,遍历该hashmap,并输出所有的记录。
reduce阶段则统计每条记录的总频率,然后将小于阀值的记录输出。
Mapper
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 |
public static class LessThanMapper extends TableMapper<Text, IntWritable> { private static HashMap<String, Integer> rawMap = new HashMap<String, Integer>(); private static int threshold = 1; @Override protected void setup(Context context) throws IOException, InterruptedException { String yyyymm = context.getConfiguration().get(HBaseConnUtil.YYYYMM); rawMap = HBaseConnUtil.getRaw(yyyymm);//通过传递的参数获取电话号码(map大小1W左右) threshold = context.getConfiguration().getInt(HBaseConnUtil.THRESHOLD, 1); } @Override protected void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException { String servId = Bytes.toString(value.getValue(Bytes.toBytes("cf"), Bytes.toBytes("SERV_ID"))); if(servId != null && rawMap.containsKey(servId)) {//hashmap中存在则计数+1 rawMap.put(servId, rawMap.get(servId) + 1); } } @Override protected void cleanup(Context context) throws IOException, InterruptedException { for(Map.Entry<String, Integer> e : rawMap.entrySet()) {//遍历hashmap并输出 if(e.getValue() < threshold) {//若map阶段就大于阀值则reduce阶段必然大于阀值,故过滤掉 context.write(new Text(e.getKey()), new IntWritable(e.getValue())); } } } } |
出错的代码就在以上代码中。先说说出错的现象,测试时号码总数为9051:
参数(通话次数小于N) | 结果 | 分析 |
---|---|---|
N=0 | 0 | 正确,没有通话次数小于0的号码 |
N=1 | 9051 | 居然将所有的号码都输出了,难道恰好这些号码在这个时间内都没通话? |
N=2 | 8614 | ##¥¥@%……!&居然比上面的小,你们是子集关系啊大哥,有比子集小的父集么? |
N=3 | 8223 | 好吧 |
N=3 | 8141 | 大哥我错了 |
N=90 | 8725 | 还玩起抛物线了? |
N=9000 | 9051 | 终于正常了,没有哪个号码通话超过9000次 |
在排除完可能数据没有导入成功或rowkey的设计和使用不统一的可能后,发现另外还有2个现象,这2个现象也是后来发现问题的关键:
- 本地测试一切都正常
- 有一次测试时数据量小只有一个map,结果正常;其它的测试一般都是25+个map,结果一直不正确
好吧,这时战斗力只有5的我终于不得不怀疑是自己的代码问题了,于是开始看Mapper。半小时后无果,遂尝试将mapper和reducer中的threshold判断都删除,输出结果正确。删除mapper的判断,加上reducer的判断,输出结果正确。删除reducer的判断,加上mapper的判断,输出结果错误。至此,终于发现出错的代码为上述代码中的22行,至于为啥,战斗力只有5的我一时半会还是没想明白。后来急忙的将正确的结果交付后开始仔细思考,发现哈哈哈我的战斗力果然只有5啊(等等,一起排错的几个同事到现在还没发现,岂不是战斗值更低)。
分布式的世界水很深
一个Hadoop的Job会启动多个map?若输入是hdfs文件,则有多少个block就启动多少个;若输入是hbase,则有多少个region就启动多少个。这些mapper运行在不同的机器上面,通常一台机器上会有多个mapper。上述错误是一个典型的用单线程的思想来考虑分布式所造成的。实际运算过程中,多个mapper分布在不同的机器上,每个mapper都有自己的输出,这些输出最终汇总到1个或多个reducer(可设置)中,代码22行中的判断只是本mapper中该记录超过了阀值,于是不输出,但另外一个mapper中,该记录可能低于阀值,于是输出,最后汇总到reduce时,恰好该记录还是低于阀值,于是便出现在了最终结果中。如下所示:
mapper01 | mapper02 | |
---|---|---|
记录<电话号码,通话频次> | <n01,0> | <n01,1> |
<n02,1> | <n02,3> | |
<n03,3> | <n03,0> | |
<n04,0> | <n04,2> | |
<n05,0> | <n05,0> | |
频次<1输出 | <n01,0> | <n03,0> |
<n04,0> | <n05,0> | |
<n05,0> | ||
reducer阶段输出 | n01, n03, n04, n05 | |
实际正确的输出 | n05 |