SparkR:数据科学家的新利器

R Worker

SparkR RDD API和Scala RDD API相比有两大不同:SparkR RDD是R对象的分布式数据集,SparkR RDD transformation操作应用的是R函数。SparkR RDD API的执行依赖于Spark Core但运行在JVM上的Spark Core既无法识别R对象的类型和格式,又不能执行R的函数,因此如何在Spark的分布式计算核心的基础上实现SparkR RDD API是SparkR架构设计的关键。

SparkR设计了Scala RRDD类,除了从数据源创建的SparkR RDD外,每个SparkR RDD对象概念上在JVM端有一个对应的RRDD对象。RRDD派生自RDD类,改写了RDD的compute()方法,在执行时会启动一个R worker进程,通过socket连接将父RDD的分区数据、序列化后的R函数以及其它信息传给R worker进程。R worker进程反序列化接收到的分区数据和R函数,将R函数应到到分区数据上,再把结果数据序列化成字节数组传回JVM端。

从这里可以看出,与Scala RDD API相比,SparkR RDD API的实现多了几项开销:启动R worker进程,将分区数据传给R worker和R worker将结果返回,分区数据的序列化和反序列化。这也是SparkR RDD API相比Scala RDD API有较大性能差距的原因。

DataFrame API的实现

由于SparkR DataFrame API不需要传入R语言的函数(UDF()方法和RDD相关方法除外),而且DataFrame中的数据全部是以JVM的数据类型存储,所以和SparkR RDD API的实现相比,SparkR DataFrame API的实现简单很多。R端的DataFrame对象就是对应的JVM端DataFrame对象的wrapper,一个DataFrame方法的实现基本上就是简单地调用JVM端DataFrame的相应方法。这种情况下,R Worker就不需要了。这是使用SparkR DataFrame API能获得和ScalaAPI近乎相同的性能的原因。

当然,DataFrame API还包含了一些RDD API,这些RDD API方法的实现是先将DataFrame转换成RDD,然后调用RDD 的相关方法。