library(SparkR)#初始化SparkContextsc <- sparkR.init("local", "RWordCount") #从HDFS上的一个文本文件创建RDD lines <- textFile(sc, "hdfs://localhost:9000/my_text_file")#调用RDD的transformation和action方法来计算word count#transformation用的函数是R代码words <- flatMap(lines, function(line) { strsplit(line, " ")[[1]] })wordCount <- lapply(words, function(word) { list(word, 1L) })counts <- reduceByKey(wordCount, "+", 2L)output <- collect(counts)
基于DataFrame API的示例
基于DataFrame API的SparkR程序首先创建SparkContext,然后创建SQLContext,用SQLContext来创建DataFrame,再操作DataFrame里的数据。下面是用SparkR DataFrame API计算平均年龄的示例:
library(SparkR)#初始化SparkContext和SQLContextsc <- sparkR.init("local", "AverageAge") sqlCtx <- sparkRSQL.init(sc)#从当前目录的一个JSON文件创建DataFramedf <- jsonFile(sqlCtx, "person.json")#调用DataFrame的操作来计算平均年龄df2 <- agg(df, age="avg")averageAge <- collect(df2)[1, 1]
对于上面两个示例要注意的一点是SparkR RDD和DataFrame API的调用形式和Java/Scala API有些不同。假设rdd为一个RDD对象,在Java/Scala API中,调用rdd的map()方法的形式为:rdd.map(…),而在SparkR中,调用的形式为:map(rdd, …)。这是因为SparkR使用了R的S4对象系统来实现RDD和DataFrame类。
8/11 首页 上一页 6 7 8 9 10 11 下一页 尾页