2014年,Spark开源生态系统得到了大幅增长,已成为大数据领域最人气的开源项目之一,活跃在Hortonworks、IBM、Cloudera、MapR和Pivotal等众多知名大数据公司,更拥有Spark SQL、Spark Streaming、MLlib、GraphX等多个相关项目。同时值得一提的是,Spark贡献者中有一半左右的中国人。
短短四年时间,Spark不仅发展为Apache基金会的顶级开源项目,更通过其高性能内存计算及其丰富的生态快速赢得几乎所有大数据处理用户。2015年1月10日,一场基于Spark的高性能应用实践盛宴由Databricks软件工程师连城、百度高级工程师甄鹏、百度架构师孙垚光、百度美国研发中心高级架构师刘少山四位专家联手打造。
Databricks软件工程师连城——Spark SQL 1.2的提升和新特性
谈及Spark SQL 1.2的提升和新特性,连城主要总结了4个方面——External data source API(外部数据源API)、列式内存存储加强(Enhanced in-memory columnar storage)、Parquet支持加强(Enhanced Parquet support)和Hive支持加强(Enhanced Hive support)。
External data source API
连城表示,因为在处理很多外部数据源中出现的扩展问题,Spark在1.2版本发布了External data source API。通过External data source API,Spark将不同的外部数据源抽象成一个关系表格,从而实现更贴近无缝的操作。
External data source API在支持了多种如JSON、Avro、CSV等简单格式的同时,还实现了Parquet、ORC等的智能支持;同时,通过这个API,开发者还可以使用JDBC将HBase这样的外部系统对接到Spark中。
连城表示,在1.2版本之前,开发者其实已经实现了各种各样外部数据源的支持,因此,对比更原生的支持一些外部数据源,External data source API的意义更在于针对相应数据源进行的特殊优化,主要包括Column pruning(列剪枝)和Pushing predicates to datasources(将predicates贴近数据源)两个方面:
Column pruning。主要包括纵横的两种剪枝。在列剪枝中,Column pruning可以完全忽视无需处理的字段,从而显著地减少IO。同时,在某些条件查询中,基于Parquet、ORC等智能格式写入时记录的统计信息(比如最大值、最小值等),扫描可以跳过大段的数据,从而省略了大量的磁盘扫描负载。
Pushing predicates to datasources。在更复杂的SQL查询中,让过滤条件维度尽可能的接近数据源,从而减少磁盘和网络IO,最终提高整体端到端的性能。
使用External data source API之前
使用External data source API之后
搭载了如Parquet和ORC这样的智能格式
连城表示,在Spark 1.2版本中,External data source API并没有实现预期中的功能,在Roadmap中,First class分片支持(First class partitioning support with partition pruning)、Data sink(insertion)API、将Hive作为外部数据源等。
Enhanced in-memory columnar storage
连城表示,不管Shark,还是Spark,内存缓存表的支持都是非常重要的一个特性。他表示,虽然在1.1和之前版本中的列式内存表的性能已然不错,但是还会出现一些问题:第一,大数据量下缓存超大体积表时(虽然不推荐,但不缺现实用例),会出现OOM等问题;第二,在列式存储中,像Parquet、ORC这种收集统计信息然后通过这些信息做partition skipping等操作在之前版本中并没有完全实现。这些问题在1.2版本中都得到了解决,本节,连城主要介绍了语义统一、缓存实体化、基于缓存共享的查询计划、Cache大表时的OOM问题、表格统计(Table statistics)等方面。
缓存实体化。SQLContext.cacheTable(“tbl”)默认使用eager模式,缓存实体化将自动进行,不会再等到表被使用或触发时,避免手动做“SELECT COUNT(*) FROM src;”。同时,新增了“CACHE [LAZY] TABLE tbl [AS SELECT …]”这样的DML。
语义统一。早期时候,SchemaRDD.cache()和SQLContext.cacheTable(“tbl”)这两个语义是不同的。其中,SQLContext.cacheTable会去建立一些列式存储格式相关优化,而SchemaRDD.cache()却以一行一个对象的模式进行。在1.2版本中,这两个操作已被统一,同时各种cache操作都将得到一个统一的内存表。
基于缓存共享的查询计划。两个得到相同结果的cache语句将共享同一份缓存数据。
避免Cache大表时的OOM问题。优化内存表的建立和访问,减少开销,进一步提升性能;在缓存大表时,引入batched column buffer builder,将每一列切成多个batch,从而避免了OOM。
表格统计。Table statistics,类似Parquet、ORC使用的技术,在1.2版本中主要实现了Predicate pushdown(实现更快的表格扫描)和Auto broadcast join(实现更快的表格join)。
最后,连城还详细介绍了一些关于加强Parquet和Hive支持的实现,以及Spark未来的一些工作。
百度基础架构部高级工程师甄鹏——Spark在百度开放云BMR中的实战分享
百度分布式计算团队从2011年开始持续关注Spark,并于2014年将Spark正式引入百度分布式计算生态系统中,在国内率先面向开发者及企业用户推出了支持Spark并兼容开源接口的大数据处理产品BMR(Baidu MapReduce)。在甄鹏的分享中,我们主要了解了百度Spark 应用现状、百度开放云BMR和Spark> Spark在百度
甄鹏表示,当前百度的Spark集群由上千台物理主机(数万Cores,上百TBMemory)组成,日提交App在数百,已应用于凤巢、大搜索、直达号、百度大数据等业务。之以选择Spark,甄鹏总结了三个原因:快速高效、API 友好易用和组件丰富。
快速高效。首先,Spark使用了线程池模式,任务调度效率很高;其次,Spark可以最大限度地利用内存,多轮迭代任务执行效率高。
API友好易用。这主要基于两个方面:第一,Spark支持多门编程语言,可以满足不同语言背景的人使用;第二,Spark的表达能力非常丰富,并且封装了大量常用操作。
组件丰富。Spark生态圈当下已比较完善,在官方组件涵盖SQL、图计算、机器学习和实时计算的同时,还有着很多第三方开发的优秀组件,足以应对日常的数据处理需求。
百度开放云BMR
在BMR介绍中,甄鹏表示,虽然BMR被称为Baidu MapReduce,但是这个名称已经不能完全表示出这个平台:BMR是百度开放云的数据分析服务产品,基于百度多年大数据处理分析经验,面向企业和开发者提供按需部署的Hadoop&Spark集群计算服务,让客户具备海量数据分析和挖掘能力,从而提升业务竞争力。
如图所示,BMR基于BCC(百度云服务器),建立在HDFS和BOS(百度对象存储)分布式存储之上,其处理引擎包含了MapReduce和Spark,同时还使用了HBase数据库。在此之上,系统集成了Pig、Hive、SQL、Streaming、GraphX、MLLib等专有服务。在系统的最上层,BMR提供了一个基于Web的控制台,以及一个API形式的SDK。
在图片的最右边,Scheduler在BMR中起到了管理作用,使用它开发者可以编写比较复杂的作业流。
Spark> 类似于通常的云服务,BMR中的Spark同样随用随起,集群空闲即销毁,帮助用户节省预算。此外,集群创建可以在3到5分钟内完成,包含了完整的Spark+HDFS+YARN堆栈。同时,BMR也提供Long Running模式,并有多种套餐可选。
完善的报表服务,全方位监控
在安全上,用户拥有虚拟的独立网络,在同一用户全部集群可互联的同时,BMR用户间网络被完全隔离。同时,BMR还支持动态扩容,节点规模可弹性伸缩。除此之外,在实现Spark全组件支持的同时,BMR可无缝对接百度的对象存储BOS服务,借力百度多年的存储研发经验,保证数据存储的高可靠性。
百度基础架构部架构师孙垚光——百度高性能通用Shuffle服务
在2014 Sort Benchmark国际大赛上,百度成功夺冠,其幕后英雄无疑卓越的Shuffle机制,在孙垚光的分享中,我们对Shuffle的发展、细节和未来有了一次深度的接触。
Shuffle简介
孙垚光表示,简单来说,Shuffle就是按照一定的分组和规则Map一个数据,然后传入Reduce端。不管对于MapReduce还是Spark,Shuffle都是一个非常重要的阶段。然而,虽然Shuffle解决的问题相同,但是在Spark和MapReduce中,Shuffle流程(具体时间和细节)仍然存在一定的差别:
Baidu Shuffle发展历程
通过孙垚光了解到,Shuffle在百度的发展主要包括两个阶段:跟随社区和独立发展。从2008年百度的MapReduce/Hadoop起步开始,百度就开始跟随社区,使用社区版本,期间的主要工作包含Bug修复和性能优化两个方面(增加内存池、减少JVMGC,传输Server由Jetty换Netty,及批量传输、聚合数据等方面)。
分离了shuffle和Map/Reduce
在2012年开始,Baidu Shuffle开启独立发展阶段,主要源于下一代离线计算系统的开发,Shuffle被抽离为独立的ShuffleService服务,从而提高了集群资源的利用率。
截止此时,不管是社区版本(MapReduce/Spark),还是百度研发的ShuffleService,它们都是基于磁盘的PULL模式。基于磁盘,所有Map的数据都会放到磁盘,虽然Spark号称内存计算,但是涉及到Shuffle时还是会写磁盘。基于PULL,所有数据在放到Map端的磁盘之后,Reduce在使用时还需要主动的拉出来,因此会受到两个问题影响:首先,业务数据存储在Map端的服务器上,机器宕机时会不可避免丢失数据,这一点在大规模分布式集群中非常致命;其次,更重要的是,Shuffle阶段会产生大量的磁盘寻道(随机读)和数据重算(中间数据存在本地磁盘),举个例子,某任务有1百万个Map,1万个Reduce,如果一次磁盘寻道的时间是10毫秒,那么集群总共的磁盘寻道时间= 1000000 ×10000 ×0.01 = 1亿秒。
New Shuffle
基于这些问题,百度设计了基于内存的PUSH模式。新模式下,Map输出的数据将不落磁盘,并在内存中及时地Push给远端的Shuffle模块,从而将获得以下提升:
New Shuffle的优势
New Shuffle架构
如图所示,蓝色部分为New Shuffle部分,主要包含两个部分:数据写入和读取的API,Map端会使用这个接口来读取数据,Reduce会使用这个接口来读取数据;其次,最终重要的是,服务器端使用了典型的主从架构,用多个shuffle工作者节点来shuffle数据。同时,在系统设计中,Master非常有利于横向扩展,让shuffle不会成为整个分布式系统的瓶颈。
让New Shuffle模块专注于shuffle,不依赖于外部计算模块,从而计算模块可以专注于计算,同时还避免了磁盘IO。然而New Shuffle带来的问题也随之暴漏,其中影响比较重要的两个就是:慢节点和数据重复。
慢节点。以shuffle写入过程中出现慢节点为例,通常包含两个情况。首先,Shuffle自身慢节点,对比社区版本中只会影响到一个task,New Shuffle中常常会影响到一片集群。在这里,百度为每个Shuffle节点都配置了一个从节点,当Map检测到一个慢节点时,系统会自动切换到从节点。其次,DFS出现慢节点,这个情况下,Shuffle的从节点只能起到缓解作用。这种情况下,首先DFS系统会自动检测出慢节点,并进行替换。比如,传统的HDFS会以pipeline的形式进行写入,而DFS则转换为分发写。
在此之外,New Shuffle还需要解决更多问题,比如资源共享和隔离等。同时,基于New Shuffle的机制,New Shuffle还面临一些其他挑战,比如Reduce全启动、数据过于分散、对DFS压力过大、连接数等等。
数据重复。如上图所示,这些问题主要因为New Shuffle对上层组件缺少感知,这个问题的解决主要使用task id和block id进行去重。
New Shuffle展望
孙垚光表示,New Shuffle使用了通用的Writer和Reader接口,当下已经支持百度MR和DCE(DAG、C++),同时即将对开源Spark提供支持。在未来,New Shuffle无疑将成为更通用的组件,支持更多的计算模型。
百度美国硅谷研发中心高级架构师刘少山——Fast big data analytics with Spark>
Tachyon是一个分布式的内存文件系统,可以在集群里以访问内存的速度来访问存在Tachyon里的文件。Tachyon是架构在分布式文件存储和上层各种计算框架之间的中间件,主要负责将那些不需要落到DFS里的文件,落到分布式内存文件系统中,从而达到共享内存,以提高效率。1月10日下午的最后一场分享中,刘少山带来了一场Tachyon的深入解析。
Tachyon和Spark
刘少山表示,在Spark使用过程中,用户经常困扰于3个问题:首先,两个Spark 实例通过存储系统来共享数据,这个过程中对磁盘的操作会显著降低性能;其次,因为Spark崩溃所造成的数据丢失;最后,垃圾回收机制,如果两个Spark实例需求同样的数据,那么这个数据会被缓存两次,从而造成很大的内存压力,更降低性能。
使用Tachyon,存储可以从Spark中分离处理,让其更专注于计算,从而避免了上述的3个问题。
Tachyon架构
刘少山从Spark的角度分享了Tachyon的部署。在与Spark搭配使用时,系统会建立一个Tachyon的job,通过Tachyon Client来访问同一个机器上的Tachyon Worker,也就是机器上的内存。而Tachyon Client则会与Tachyon Master交互,来清楚每个分节点所包含的数据。由此可见,在整个Tachyon 系统中,Master、Client和Worker为最重要的三个部分。
Tachyon Master。Master主要部件是Inode和Master Worker Info:Inode会负责系统的监视,Master Worker Info则存储了所有Worker的信息。
Tachyon Worker。Worker主要负责存储,其中Worker Storage是最主要的数据结构,包含Local data folder和Under File System两个部分。其中Local data folder表示存在本地的Tachyon文件,Under File System则负责从HDFS中读取Worker中未发现的数据。
Tachyon Client。Client为上层用户提供了一个透明的机制,其TachyonFS接口负责数据请求。每个Client中有多个Tachyon File,其中Block In Stream负责文件读取(Local Block In Stream负责本地机器读取,Remote Block In Stream则负责读取远程机器);Block Out Stream主要负责将文件写到本地机器上。在Client上,Master Client会与Master交互,Worker Client则与Client交互。
Tachyon在百度
为什么要使用Tachyon,刘少山指出,在百度,计算集群和存储集群往往不在同一个地理位置的数据中心,在大数据分析时,远程数据读取将带来非常高的延时,特别是ad-hoc查询。因此,将Tachyon作为一个传输缓存层,百度通常会将之部署在计算集群上。首次查询时,数据会从远程存储取出,而在以后的查询中,数据就会从本地的Tacnyon上读取,从而大幅的改善了延时。
在百度,Tachyon的部署还处于初始阶段,大约部署了50台机器,主要服务于ad-hoc查询。
实践中遭遇的挑战
通过刘少山了解到,Tachyon的使用过程并不是一帆风顺,比如:因为Tachyon需求对Block完全读取,从而可能造成Blocks并未被缓存;有时候,虽然scheduler已经确认了数据存在本地,Spark workers仍然从远程blocks读取,而缓存命中率也只有可怜的33%(如果你需要的是2号block,Tachyon会分别从1、2、3号block读取,从而将block读取了3份)。因此,刘少山表示,如果要使用好Spark与Tachyon,一定要对用例和Tachyon进行充分的了解。
分享最后,刘少山还介绍了Hierarchical Storage Feature特性以及百度未来的工作,其中包括缓存替换策略等。