FieldSelection
FieldSelection包含FieldSelectionMapper、FieldSelectionReducer和FieldSelectionHelper,根据字面意思就可以了解其作用:用于选择field。直接上示例:
测试数据
1 2 3 4 5 6 7 |
G1 G2 G3 G4 G5 G6 A1 A2 A3 A4 A5 A6 E1 E2 E3 E4 E5 E6 B1 B2 B3 B4 B5 B6 F1 F2 F3 F4 F5 F6 C1 C2 C3 C4 C5 C6 D1 D2 D3 D4 D5 D6 |
代码示例
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 |
public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); conf.set(FieldSelectionHelper.MAP_OUTPUT_KEY_VALUE_SPEC, "0,3:1,2,4-"); //conf.set(FieldSelectionHelper.REDUCE_OUTPUT_KEY_VALUE_SPEC, "0,2,1:4,3"); conf.set("mapred.textoutputformat.separator", "|"); Job job = new Job(conf, "GroupTest"); job.setJarByClass(CombinerTest.class); job.setMapperClass(FieldSelectionMapper.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(Text.class); //job.setReducerClass(FieldSelectionReducer.class); //job.setOutputKeyClass(Text.class); //job.setOutputValueClass(Text.class); job.setNumReduceTasks(0); FileInputFormat.setInputPaths(job, new Path("/home/yurnom/data.txt")); FileOutputFormat.setOutputPath(job, new Path("/home/yurnom/fieldsel")); System.exit(job.waitForCompletion(true) ? 0 : 1); } |
其中第三行用于选择输出的field。冒号之前的为key的field,之后为value的field。"0,3:1,2,4-"的意思为:选择第1、4列为key,选择第2、3、5及其以后的列为value。此外还可以使用类似“4-7”这样的方式来选择一个范围。为了直观的区分key和value,第五行将key和value的分隔符设置为“|”。
代码中没有使用reduce,先来看map的输出。
输出结果
1 2 3 4 5 6 7 |
G1 G4|G2 G3 G5 G6 A1 A4|A2 A3 A5 A6 E1 E4|E2 E3 E5 E6 B1 B4|B2 B3 B5 B6 F1 F4|F2 F3 F5 F6 C1 C4|C2 C3 C5 C6 D1 D4|D2 D3 D5 D6 |
如上所述,择第1、4列为key,选择第2、3、5及其以后的列为value。由于没有reduce,没有group阶段,所以没有排序。
现将以上代码中注视掉的部分放开,并注视掉15行,运行结果如下:
1 2 3 4 5 6 7 |
A1 A2 A4|A5 A3 B1 B2 B4|B5 B3 C1 C2 C4|C5 C3 D1 D2 D4|D5 D3 E1 E2 E4|E5 E3 F1 F2 F4|F5 F3 G1 G2 G4|G5 G3 |
JobControl与ControlledJob
当任务过于复杂,需要多个MapReduce按照一定的顺序或流程来执行时,除了编写一个个MR任务并用shell脚本按顺序启动外,还可以用JobControl来控制。
旧版本API中还可以采用ChainMapper和ChainReducer来处理流程性的任务,但新版本API中据我所知1.0.4、1.2.1版本中都是没有实现的,在2.4.1版本中才可以发现新版本API的ChainMapper和ChainReducer(2.4.1之前哪个版本才出现没有考证)。
下面给出JobControl的简单使用示例:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 |
Configuration conf01 = new Configuration(); Job job01 = new Job(conf01, "JobControlTest01"); ... ControlledJob cjob01 = new ControlledJob(job01.getConfiguration()); cjob01.setJob(job01); Configuration conf02 = new Configuration(); Job job02 = new Job(conf02, "JobControlTest02"); ... ControlledJob cjob02 = new ControlledJob(job02.getConfiguration()); cjob02.setJob(job02); cjob02.addDependingJob(cjob01); JobControl jobControl = new JobControl("JobControlTest"); jobControl.addJob(cjob01); jobControl.addJob(cjob02); |
Mapper
InverseMapper
InverseMapper
1 2 3 4 5 |
@Override public void map(K key, V value, Context context ) throws IOException, InterruptedException { context.write(value, key); } |
TokenCounterMapper
WordCount示例中Map的实现,利用StringTokenizer分割字符串,然后输出并计数为1。
1 2 3 4 5 6 7 8 9 10 11 12 |
private final static IntWritable one = new IntWritable(1); private Text word = new Text(); @Override public void map(Object key, Text value, Context context ) throws IOException, InterruptedException { StringTokenizer itr = new StringTokenizer(value.toString()); while (itr.hasMoreTokens()) { word.set(itr.nextToken()); context.write(word, one); } } |
MultithreadedMapper
当Map任务不是CPU密集型任务时,例如Map需要从数据库中读取数据,可以采用MultithreadedMapper来提高throughput。通过
Reducer
Hadoop自带的Reducer主要有IntSumReducer和LongSumReducer,根据字面意思就可以了解到该reducer的作用就是简单的做sum操作。
Tool、ToolRunner和GenericOptionsParser
若运行job时看到以下信息,则代表该MapReduce任务没有按照官方推荐的方式来运行。
1 |
14/09/04 18:03:56 WARN mapred.JobClient: Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same. |
可以通过如下方式启动Job:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 |
public class MyRunner extends Configured implements Tool { @Override public int run(String[] args) throws Exception { Configuration conf = getConf(); Job job = new Job(conf, "MyRunner"); ... FileInputFormat.addInputPath(job, new Path("/home/yurnom/data.txt")); FileOutputFormat.setOutputPath(job, new Path("/home/yurnom/test")); return job.waitForCompletion(true)? 0: 1; } public static void main(String[] args) throws Exception { int exitCode = ToolRunner.run(new MyRunner(), args); ToolRunner.printGenericCommandUsage(System.out); System.exit(exitCode); } } |
GenericOptionsParser为一个解析Hadoop命令选项的辅助类,通常不直接调用,而是通过代码15行的方式来隐式的使用。
参考资料
Hadoop权威指南