MapReduce是一种可用于数据处理的编程模型。该模型比较简单,但用于编写有用的程序并不简单。Hadoop可以运行由各种语言编写的MapReduce程序。本章中,我们将看到用Java、Ruby、Python 和C++语言编写的同一个程序。最重要的是,MapReduce程序本质上是并行运行的,因此可以将大规模的数据分析任务交给任何一个拥有足够多机器的运营商。MapReduce的优势在于处理大规模数据集,所以这里先来看一个数据集。
一个气象数据集
在我们的例子里,要编写一个挖掘气象数据的程序。分布在全球各地的很多气象传感器每隔一小时收集气象数据,进而获取了大量的日志数据。由于这些数据是半结构化数据且是按照记录方式存储的,因此非常适合使用MapReduce来处理。
数据的格式
我们将使用国家气候数据中心(National Climatic Data Center,简称NCDC,网址为http://www.ncdc.noaa.gov/)提供的数据。这些数据按行并以ASCII编码存储,其中每一行是一条记录。该存储格式能够支持众多气象要素,其中许多要素可以有选择性地列入收集范围或其数据所需的存储长度是可变的。为了简单起见,我们重点讨论一些基本要素(如气温等),这些要素始终都有且长度固定。
例2-1显示了一行采样数据,其中重要字段已突出显示。该行数据已被分成很多行以突出显示每个字段,在实际文件中,这些字段被整合成一行且没有任何分隔符。
例2-1. 国家气候数据中心数据记录的格式
0057 332130 # USAF weather station identifier
99999 # WBAN weather station identifier
19500101 # observation date
0300 # observation time
4 +51317 # latitude (degrees x 1000)
+028783 # longitude (degrees x 1000) F
M-12
+0171 # elevation (meters)
99999
V020
320 # wind direction (degrees)
1 # quality code
N
0072
1
00450 # sky ceiling height (meters)
1 # quality code
C
N
010000 # visibility distance (meters)
1 # quality code
N
9 -0128 # air temperature (degrees Celsius x 10)
1 # quality code -0139
# dew point temperature (degrees Celsius x 10)
1 # quality code 10268
# atmospheric pressure (hectopascals x 10)
1 # quality code
数据文件按照日期和气象站进行组织。从1901 年到2001 年,每一年都有一个目录,每一个目录中包含各个气象站该年气象数据的打包文件及其说明文件。例如,1999年对应文件夹下面包含如下记录:
% ls raw/1990 | head
010010-99999-1990.gz
010014-99999-1990.gz
010015-99999-1990.gz
010016-99999-1990.gz
010017-99999-1990.gz
010030-99999-1990.gz
010040-99999-1990.gz
010080-99999-1990.gz
010100-99999-1990.gz
010150-99999-1990.gz
因为有成千上万个气象台,所以整个数据集由大量的小容量文件组成。通常情况下,处理少量的大型文件显得更容易且有效,因此,这些数据需要经过预处理,将每年的数据文件拼接成一个独立文件。具体做法请参见附录C。
使用Unix工具进行数据分析
该数据集中每年全球气温的最高记录是多少?我们先不使用Hadoop来回答这一问题,因为只有提供性能基准和结果检查工具,才能和Hadoop进行有效对比。
传统处理按行存储数据的工具是awk。例2-2是一个用于计算每年最高气温的程序脚本。
例2-2. 该程序从NCDC气象记录中找出每年最高气温
#!/usr/bin/env bash
for year in all/*
do
echo -ne `basename $year .gz`"\t"
gunzip -c $year | \
awk'{ temp = substr($0, 88, 5) + 0;
q = substr($0, 93, 1);
if ( temp!=9999 && q ~ /[01459]/ && temp > max) max = temp}
END { print max }'
done
该脚本循环遍历按年压缩的数据文件,首先显示年份,然后使用awk脚本处理每个文件。awk 脚本从数据中提取两个字段:气温和质量代码。气温值通过加上一个0 转换为整数。接着测试气温值是否有效(用值9999 替代NCDC 数据集中缺少的记录),通过质量代码检测读取的数值是否可疑或错误。如果数据读取正确,那么该值将与目前读取到的最大气温值进行比较,如果该值比原先的最大值大,就替换目前的最大值。处理完文件中所有的行后,再执行END块中的代码并打印出最大气温值。
下面是某次运行结果的起始部分:
% ./max_temperature.sh
1901 317
1902 244
1903 289
1904 256
1905 283
...
由于源文件中的气温值被放大了10倍,所以1901年的最高气温是 31.7°C (20世纪初记录的气温数据比较少,所以该结果是可能的)。使用亚马逊的EC2 High-CPU Extra Large Instance运行该程序,查找一个世纪以来气象数据中的最大气温值需要42分钟。
为了加快处理,我们需要并行运行部分程序。从理论上讲,这很简单:我们可以通过使用计算机上所有可用的硬件线程来处理,其中每个线程处理不同年份的数据。但是,其中依旧存在一些问题。
首先,将任务划分成大小相同的作业块通常并不容易或明显。在我们的例子中,不同年份数据文件的大小差异很大,因此部分线程会比其他线程更早运行结束。即使让它们继续下一步的工作,整个运行时间依旧由处理最长文件所需的时间决定。另一种更好的方法是将输入数据分成固定大小的块,然后把每块分配到各个进程,这样一来,即使有些进程能处理更多数据,我们也可以为它们分配更多的数据。
其次,将独立进程运行的结果合并后,可能还需要进一步的处理。在我们的例子中,每年的结果独立于其他年份,并可能将所有结果拼接起来,然后按年份进行排序。如果使用固定大小块的方法,则需要特定的方法来合并结果。在这个例子中,某年的数据通常被分割成几个块,每个块进行独立处理。我们将最终获得每个数据块中的最高气温,所以最后一步是寻找这些分块数据中的最大值作为该年的最高气温,其他年份的数据均需如此处理。
最后,我们依旧受限于一台计算机的处理能力。如果手上拥有的所有处理器都用上,至少也需要20分钟,结果也就只能这样了。我们不能使它更快。另外,某些数据集的增长会超出一台计算机的处理能力。当我们开始使用多台计算机时,整个大环境中的其他因素将对其产生影响,其中最主要的是协调性和可靠性两大因素。哪个进程负责运行整个作业?我们如何处理失败的进程?
因此,尽管可以实现并行处理,但实际上非常复杂。使用Hadoop之类的框架来实现并行数据处理将很有帮助。