1.MapReduce运行模型总体概览:
mapreduceAllGraph.png
InputSplit: InputSplit是单个map任务的输入文件片,默认文件的一个block。
Map函数:数据处理逻辑的主体,用开发者开发。
Partition:map的结果发送到相应的reduce。
Combain:reduce之前进行一次预合并,减小网络IO。当然,部分场景不适合。
Shuffle:map输出数据按照Partition分发到各个reduce。
*reduce:将不同map汇总来的数据做reduce逻辑。
2.多reduce:
datatrans.png
3.经典wordcount:
wordcountdatatrans.png
mapreducedataStream.png
4.Map类的实现:
必须继承org.apache.hadoop.mapreduce.Mapper 类
map()函数,对于每一个输入K/V都会调用一次map函数,逻辑实现(必须)。
setup()函数,在task开始前调用一次,做maptask的一些初始化工作,如连接数据库、加载配置(可选)。
cleanup()函数,在task结束前调用一次,做maptask的收尾清理工作,如批处理的收尾,关闭连接等(可选)
Context上下文环境对象,包含task相关的配置、属性和状态等。
5.Reduce类的实现:
必须继承org.apache.hadoop.mapreduce.Reducer类。
reduce(key, Iterable<>values,Context context)对于每一个key值调用一次reduce函数。
setup():在task开始前调用一次,做reducetask的一些初始化工作。
cleanup():在task结束时调用一次,做reducetask的收尾清理工作。
6.作业整体配置:
参数解析:String[]otherArgs= new GenericOptionsParser(conf, args).getRemainingArgs();
创建job: Jobjob= Job.getInstance(conf, "word count");
设置map类,reduce类。
设置map和reduce输出的KV类型,二者输出类型一致的话则可以只设置Reduce的输出类型。
设置reduce的个数 :默认为1,综合考虑,建议单个reduce处理数据量<10G。不想启用reduce设置为0即可。
设置InputFormat
设置OutputFormat
设置输入,输出路径。
job.waitForCompletion(true) (同步提交)和job.submit()(异步提交)
wordcount:
```public class WordCountTask {
private static final Logger logger = Logger.getLogger(WordCountTask.class);
public static class WordCountMap extends Mapper{
private static final IntWritable one = new IntWritable(1);
private Text word = new Text();
[email protected]
protected void cleanup(Context context)
throws IOException, InterruptedException {
logger.info("mapTaskEnd.....");
}
protected void map(Object key, Text value, Context context)
throws IOException, InterruptedException {
StringTokenizer itr = new StringTokenizer(value.toString());
while (itr.hasMoreTokens()) {
this.word.set(itr.nextToken());
context.write(this.word, one);
}
}
[email protected]
protected void setup(Context context)
throws IOException, InterruptedException {
logger.info("mapTaskStart.....");
}
}
public static class WordCountReduce extends Reducer{
private IntWritable result = new IntWritable();
public void reduce(Text key, Iterable values, Context context)
throws IOException, InterruptedException
{
int sum = 0;
for (IntWritable val : values) {
sum += val.get();
}
this.result.set(sum);
context.write(key, this.result);
}
}
public static void main(String[] args)
throws Exception
{
Configuration conf = new Configuration();
String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
if (otherArgs.length < 2) {