浅谈Hadoop

  1.MapReduce运行模型总体概览:

物联网

  mapreduceAllGraph.png

  InputSplit: InputSplit是单个map任务的输入文件片,默认文件的一个block。

  Map函数:数据处理逻辑的主体,用开发者开发。

  Partition:map的结果发送到相应的reduce。

  Combain:reduce之前进行一次预合并,减小网络IO。当然,部分场景不适合。

  Shuffle:map输出数据按照Partition分发到各个reduce。

  *reduce:将不同map汇总来的数据做reduce逻辑。

  2.多reduce:

物联网

  datatrans.png

  3.经典wordcount:

物联网

  wordcountdatatrans.png

物联网

  mapreducedataStream.png

  4.Map类的实现:

  必须继承org.apache.hadoop.mapreduce.Mapper 类

  map()函数,对于每一个输入K/V都会调用一次map函数,逻辑实现(必须)。

  setup()函数,在task开始前调用一次,做maptask的一些初始化工作,如连接数据库、加载配置(可选)。

  cleanup()函数,在task结束前调用一次,做maptask的收尾清理工作,如批处理的收尾,关闭连接等(可选)

  Context上下文环境对象,包含task相关的配置、属性和状态等。

  5.Reduce类的实现:

  必须继承org.apache.hadoop.mapreduce.Reducer类。

  reduce(key, Iterable<>values,Context context)对于每一个key值调用一次reduce函数。

  setup():在task开始前调用一次,做reducetask的一些初始化工作。

  cleanup():在task结束时调用一次,做reducetask的收尾清理工作。

  6.作业整体配置:

  参数解析:String[]otherArgs= new GenericOptionsParser(conf, args).getRemainingArgs();

  创建job: Jobjob= Job.getInstance(conf, "word count");

  设置map类,reduce类。

  设置map和reduce输出的KV类型,二者输出类型一致的话则可以只设置Reduce的输出类型。

  设置reduce的个数 :默认为1,综合考虑,建议单个reduce处理数据量<10G。不想启用reduce设置为0即可。

  设置InputFormat

  设置OutputFormat

  设置输入,输出路径。

  job.waitForCompletion(true) (同步提交)和job.submit()(异步提交)

  wordcount:

  ```public class WordCountTask {

  private static final Logger logger = Logger.getLogger(WordCountTask.class);

  public static class WordCountMap extends Mapper{

  private static final IntWritable one = new IntWritable(1);

  private Text word = new Text();

[email protected]

  protected void cleanup(Context context)

  throws IOException, InterruptedException {

  logger.info("mapTaskEnd.....");

  }

  protected void map(Object key, Text value, Context context)

  throws IOException, InterruptedException {

  StringTokenizer itr = new StringTokenizer(value.toString());

  while (itr.hasMoreTokens()) {

  this.word.set(itr.nextToken());

  context.write(this.word, one);

  }

  }

[email protected]

  protected void setup(Context context)

  throws IOException, InterruptedException {

  logger.info("mapTaskStart.....");

  }

  }

  public static class WordCountReduce extends Reducer{

  private IntWritable result = new IntWritable();

  public void reduce(Text key, Iterable values, Context context)

  throws IOException, InterruptedException

  {

  int sum = 0;

  for (IntWritable val : values) {

  sum += val.get();

  }

  this.result.set(sum);

  context.write(key, this.result);

  }

  }

  public static void main(String[] args)

  throws Exception

  {

  Configuration conf = new Configuration();

  String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();

  if (otherArgs.length < 2) {