# Stand up a local Spark cluster via sparklyr, then attach SparkR to the
# same running cluster so both interfaces can operate on one backend.
# Recipe: https://github.com/WinVector/BigDataRStrata2017/blob/master/Exercises/solutions/06-Spark-Extension.md
sc <- sparklyr::spark_connect(master = "local",
                              version = "2.2.0")
SPARK_HOME <- sc$spark_home
# SparkR ships inside the Spark distribution itself; load it from there.
library(SparkR, lib.loc = paste0(SPARK_HOME, "/R/lib/"))
sr <- sparkR.session(master = "local", sparkHome = SPARK_HOME)

# Example survey data: two subjects, two survey categories each, plus two
# irrelevant columns to demonstrate column selection later.
local_data <- data.frame(
  subjectID       = c(1, 1, 2, 2),
  surveyCategory  = c("withdrawal behavior",
                      "positive re-framing",
                      "withdrawal behavior",
                      "positive re-framing"),
  assessmentTotal = c(5, 2, 3, 4),
  irrelevantCol1  = "irrel1",
  irrelevantCol2  = "irrel2",
  stringsAsFactors = FALSE
)

# Copy the data up to the cluster through the sparklyr connection ...
df <- dplyr::copy_to(sc,
                     local_data,
                     name = "df",
                     temporary = TRUE,
                     overwrite = FALSE)
# ... then round-trip it through parquet so SparkR sees the same table.
sparklyr::spark_write_parquet(df, "df_tmp")
dSparkR <- SparkR::read.df("df_tmp")

# Show our SparkR setup.

# Attach the packages used below and display the SparkR side of the setup.
library("wrapr")
library("rquery")
library("SparkR")
packageVersion("SparkR")
print(sr)
print(dSparkR)

# Register the SparkR data frame as a SQL temp view so it can be queried
# by name in Spark SQL.
SparkR::createOrReplaceTempView(dSparkR,
                                "dSparkR")

# Peek at the table contents through SparkR's SQL interface.
knitr::kable(head(SparkR::sql("SELECT * from dSparkR")))

# Run the same query as the rquery example.

# Per-unit scaling applied to assessmentTotal inside the softmax below.
scale <- 0.237

# Describe the target SQL dialect to rquery: Spark SQL quotes identifiers
# with backticks and string literals with double quotes.
# BUG FIX: the argument is `identifier_quote_char`; the original spelling
# ("indentifier_quote_char") matches no formal argument (it is not even a
# partial-match prefix), so the backtick quoting was never configured.
db_info <- rquery_db_info(identifier_quote_char = '`',
                          string_quote_char = '"')

# Declare the remote table (name + column set) without moving any data.
d <- rquery::table_source(
  table_name = "dSparkR",
  columns = colnames(dSparkR))

# Build the rquery operator tree (a query plan -- no data is touched yet):
#   1. per-subject softmax of assessmentTotal (scaled by `scale`) as
#      `probability`, plus a per-subject row count,
#   2. rank rows within each subject ordered by (probability, surveyCategory),
#   3. keep only the top-ranked row per subject (rank == count),
#   4. rename/select/order columns for presentation.
dq <- d %.>%
  extend_nse(.,
             probability :=
               exp(assessmentTotal * scale)/
               sum(exp(assessmentTotal * scale)),
             count := count(1),
             partitionby = 'subjectID') %.>%
  extend_nse(.,
             rank := rank(),
             partitionby = 'subjectID',
             orderby = c('probability', 'surveyCategory'))  %.>%
  rename_columns(., 'diagnosis' := 'surveyCategory') %.>%
  select_rows_nse(., rank == count) %.>%
  select_columns(., c('subjectID', 
                      'diagnosis', 
                      'probability')) %.>%
  order_by(., 'subjectID')

# Translate the operator tree into Spark SQL for the declared dialect.
sql <- rquery::to_sql(dq, db_info)

# Execute the generated SQL through SparkR and show the leading rows.
res <- SparkR::sql(sql)
knitr::kable(head(res))

# Tear down the local cluster.
sparklyr::spark_disconnect(sc)


# Source: YTLogos/rquery documentation built on May 19, 2019, 1:46 a.m.