分片、Block与Map数量
Block,即HDFS上的基本存储单元,默认大小是64MB。我们都知道一个block对应一个map任务,但实际过程是先将文件分片,然后每个分片启动一个map任务,只不过通常情况下分片的大小和Block是一致的。可以通过参数来将分片的大小进行改变,相应的启动的map数量也会改变,其对应关系如下:
最小分片大小 | 最大分片大小 | 块大小 | 分片大小 | 说明 |
---|---|---|---|---|
1L(默认值) | Long.MAX_VALUE(默认值) | 64MB(默认值) | 64MB | |
1L(默认值) | Long.MAX_VALUE(默认值) | 128MB | 128MB | |
128MB | Long.MAX_VALUE(默认值) | 64MB(默认值) | 128MB | 通过设置最小分片值来改变分片大小 |
1L(默认值) | 32MB | 64MB(默认值) | 32MB | 通过设置最大分片值来改变分片大小 |
如上所示,当分片值是128MB而block大小是64MB时,这时启动的map数量比默认情况下要少一半,但本地开销会变大,因为涉及到数据的传输等过程。当分片值是32MB而block大小是64MB时,这时启动的map数量比默认情况下要多一倍,也就是一个block启动了2个map来进行处理。
若遇到一个文件分为多个block存放,但启动任务时候又必须放在一个map中进行处理,则可以通过将分片最小值设置成一个大于该文件的值来实现。
对应API和参数如下:
- FileInputFormat.setMaxInputSplitSize(..),mapred.max.split.size,设置分片最大值
- FileInputFormat.setMinInputSplitSize(..),mapred.min.split.size,设置分片最小值
处理未知的脏数据
一般而言,脏数据过滤是每个任务通过编码来实现的,例如过滤掉维度数量不符的数据,过滤掉某些维度为空的数据,等等。但还有一些脏数据却无法通过编码来处理,因为这类脏数据是无法预料的,例如因为网络传输出错而导致的超长行,或者特殊字符(EOF等等)。一个超长的行很大可能会造成序列化、网络传输等过程中内存溢出。
对于这样的情况,若能准确的分析出问题所在,也可以通过代码来实现脏数据的过滤。如,使用TextInputFormat、NLineInputFormat时,可以通过设置mapred.linerecordreader.maxlength参数来防止“超长“行类的脏数据。但对于客观原因上无法定位其原因的脏数据,则只能采用SkipBadRecords工具类来避免出错了。
SkipBadRecords主要API:
- setAttemptsToStartSkipping(..),对应参数mapred.skip.attempts.to.start.skipping,默认值为2,设置开始skip操作前的尝试次数
- setMapperMaxSkipRecords(..),对应参数mapred.skip.map.max.skip.records,默认值为0,设置map出错时跳过的记录数
- setReducerMaxSkipGroups(..),对应参数mapred.skip.reduce.max.skip.groups,默认值为0,设置reduce出错是跳过的group数(shuffle分组)
- setSkipOutputPath(..),对应参数mapred.skip.out.dir,默认值为输出路径下的_logs文件夹
- ...
Map和Reduce任务执行失败4次则会宣告彻底失败(可通过参数mapred.map.max.attemps和mapred.reduce.max.attemps来修改),若开启skip mode(将map和reduce跳过的记录/group数量调为大于零的整数),则会出现下列过程:
- 第一次尝试,失败
- 第二次尝试,失败
- 第三次尝试,开启skip mode,任然失败,但失败记录被tasktracker保存
- 第四次尝试,开启skip mode,跳过上次失败的记录一定范围内的数据
可以发现若采用默认参数:2次失败开启skip mode、4次失败则任务失败,则会造成只能处理一条未知脏数据。有此可见skip mode对于脏数据较多的情况是不合适的。
输入输出格式
Hadoop支持多种输入格式化方法,默认是TextInputFormat,可以通过
KeyValueTextInputFormat
KeyValueTextInputFormat与默认的TextInputFormat的区别就是:TextInputFormat的key为当前行的偏移量,value为本行数据;KeyValueTextInputFormat的key为分隔符前的数据,value为分隔符后面的数据。
使用KeyValueTextInputFormat需要注意以下三行代码必须要有:
1 2 3 |
conf.set("mapreduce.input.keyvaluelinerecordreader.key.value.separator", ","); job.setInputFormatClass(KeyValueTextInputFormat.class); KeyValueTextInputFormat.addInputPath(job, new Path("/home/yurnom/data.txt")); |
第一行为设置分隔符,默认分隔符为制表符(\t),若为默认分隔符可以不设置。第二行表示采用KeyValueTextInputFormat来格式化输入,如不设置则将采用默认的TextInputFormat。
较早版本中,分隔符的设置方法如下:
1 |
conf.set("key.value.separator.in.input.line", ","); |
NLineInputFormat
通常来说,一个Hadoop Job有多少个Map是根据输入数据有多少个Block而定的,例如TextInputFormat、KeyValueTextInputFormat。这种情况下每个Map处理多少行数据是不确定的,或者说是根据Block大小来确定的。若希望每个Map处理的数据行数一样,则可以采用NLineInputFormat来处理。若一个文件有1千万行数据,设置每个Map处理100W行数据,则最终会启动10个Map来处理数据。
1 2 3 |
job.setInputFormatClass(NLineInputFormat.class); NLineInputFormat.addInputPath(job, new Path("/home/yurnom/data.txt")); NLineInputFormat.setNumLinesPerSplit(job, 5); |
第一行表示采用NLineInputFormat来格式化输入,如不设置则将采用默认的TextInputFormat。第三行设置每个Map处理多少行数据(N lines),最后一个Map处理的数据可能会小于N。N也可以通过参数来设置,如下。
1 |
conf.set("mapreduce.input.lineinputformat.linespermap", "5"); |
较早版本中采用的参数如下:
1 |
conf.set("mapred.line.input.format.linesprermap", "5"); |
MultipleInputs
MultipleInputs用于不同的数据源/数据格式的场景。MultipleInputs可以设置多个InputFormat和多个Mapper。具体如下所示:
1 2 3 4 5 6 |
//job.setMapperClass(My1Mapper.class); //job.setInputFormatClass(NLineInputFormat.class); MultipleInputs.addInputPath(job, new Path("/home/yurnom/c1.data"), NLineInputFormat.class, My1Mapper.class); MultipleInputs.addInputPath(job, new Path("/home/yurnom/c2.data"), TextInputFormat.class, My2Mapper.class); |
当采用FileInputFormat时会报
MultipleOutputs
MultipleOutputs可以将一个reduce的结果存放在不同的文件中。具体实现如下:
1 2 3 4 5 |
MultipleOutputs.addNamedOutput(job, "short", TextOutputFormat.class, Text.class, Text.class); //设置short组 MultipleOutputs.addNamedOutput(job, "long", TextOutputFormat.class, Text.class, Text.class); //设置long组 FileOutputFormat.setOutputPath(job, new Path("/home/yurnom/c.data")); |
Reduce代码实现如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 |
public static class MyReducer extends Reducer<Text, Text, Text, Text> { private MultipleOutputs outputs; @Override protected void setup(Context context) throws IOException, InterruptedException { outputs = new MultipleOutputs(context); } @Override public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException { if(key.toString().length() < 2) { //key的长度小于2则输入至short组,否则输入到long组 outputs.write("short", key, getValue(values)); } else { outputs.write("long", key, getValue(values)); } } @Override protected void cleanup(Context context) throws IOException, InterruptedException { outputs.close();//关闭MultipleOutputs,否则最后缓存的数据会丢失 } private Text getValue(Iterable<Text> values) { StringBuilder sb = new StringBuilder(); for(Text t : values) { sb.append(t.toString()).append(" "); } return new Text(sb.toString()); } } |
同MultipleInputs一样,当使用FileOutputFormat时会报错,原因不详。MultipleInputs和MultipleOutputs可结合使用。