之前离职的哥们的mr任务留了一堆的坑,他把value当成排序过的,于是reduce里面全部是如此统计dau、设备数的:
1 2 3 4 5 6 7 8 9 10 11 12 13 |
String lastId = ""; long dau = 0; for (Text value: values) { String id = value.toString(); if (!id.equals(lastId)) { if (id.startsWith("u#")) { dau = dau + 1; } else if (id.startsWith("o#")) { dau = dau + 1; } lastId = id; } } |
心好累,手动微笑。
正所谓前人挖坑后人填,我不入地狱谁入地狱。于是开始一个个mr的改代码。
方法一:用set统计
用set的好处就是改动极小,但存在oom的风险。实际跑了下线上的数据,果然oom了。摔!
方法二:bloom filter
好处是改动也不大,也不会oom,但就统计的结果可能会比实际的值要小。考虑到数据量也没有大到要用bloom filter的地步,且希望数据尽量的精准,放弃!
方法三:mr二次排序
好处是所有需求都满足了,坏处就是改动太大!
我理解的二次排序:根据多列来进行排序,实现的方法就是将这多列抽象为一个class,写好class的compareTo方法,这样就能根据多列来排序了。同时需要保证原来的逻辑不变,即只根据原有的key来partition,并且根据原有的key来进行group。
示例:
原文件:
key1 v3
key2 v8
key1 v1
key2 v2
key2 v1
key2 v7
key1 v6
最终结果:
key1 v1
key1 v3
key1 v6
key2 v1
key2 v2
key2 v7
key2 v8
key的设置
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 |
public class CombinationKey implements WritableComparable<CombinationKey> { private String appkey; private String deviceId; @Override public int compareTo(CombinationKey o) { int appkeyRes = appkey.compareTo(o.appkey); if(appkeyRes != 0) return appkeyRes; else return deviceId.compareTo(o.deviceId); } public CombinationKey() { } @Override public void write(DataOutput out) throws IOException { out.writeUTF(appkey); out.writeUTF(deviceId); } @Override public void readFields(DataInput in) throws IOException { appkey = in.readUTF(); deviceId = in.readUTF(); } } |
分区函数的设置
1 2 3 4 5 6 7 |
public class CombinationPartitioner<T> extends Partitioner<CombinationKey, T> { @Override public int getPartition(CombinationKey combinationKey, T value, int numPartitions) { return (combinationKey.getAppkey().hashCode() & Integer.MAX_VALUE) % numPartitions; } } |
分组函数的设置
1 2 3 4 5 6 7 8 9 10 11 12 13 14 |
public class GroupingComparator extends WritableComparator { protected GroupingComparator() { super(CombinationKey.class, true); } @Override public int compare(WritableComparable w1, WritableComparable w2) { CombinationKey ip1 = (CombinationKey) w1; CombinationKey ip2 = (CombinationKey) w2; String l = ip1.getAppkey(); String r = ip2.getAppkey(); return l.compareTo(r); } } |
若在reduce开始write一行标识行"-------",不加
-------
key1 v1
-------
key1 v3
-------
key1 v6
-------
key2 v1
-------
key2 v2
-------
key2 v7
-------
key2 v8
若添加
-------
key1 v1
key1 v3
key1 v6
-------
key2 v1
key2 v2
key2 v7
key2 v8
job设置
至此,二次排序就搞定了。记得在job中设置:
1 2 |
job.setPartitionerClass(CombinationPartitioner.class); job.setGroupingComparatorClass(GroupingComparator.class); |