# devtools::install_github("tidyverse/dplyr")
# devtools::install_github("tidyverse/dbplyr")
# devtools::install_github("rstudio/sparklyr")
# See also: https://github.com/rstudio/sparklyr/issues/721
suppressPackageStartupMessages(library("dplyr"))
library("sparklyr")

devtools::session_info()

# more memory as suggested in https://github.com/rstudio/sparklyr/issues/783
config <- spark_config()
config[["sparklyr.shell.driver-memory"]] <- "8G"

sc <- sparklyr::spark_connect(version='2.1.0',
                              hadoop_version='2.7',
                              master = "local",
                              config = config)
print(sc)



#' Compute union_all of tables.  Cut down from \code{replyr::replyr_union_all()} for debugging.
#'
#' @param sc remote data source tables are on (and where to copy-to and work), NULL for local tables.
#' @param tabA not-NULL table with at least 1 row on sc data source, and columns \code{c("car", "fact", "value")}.
#' @param tabB not-NULL table with at least 1 row on same data source as tabA and columns \code{c("car", "fact", "value")}.
#' @param tempName name for temp table
#' @return table with all rows of tabA and tabB (union_all).
#'
#' @export
example_union_all <- function(sc, tabA, tabB, tempName) {
  cols <- intersect(colnames(tabA), colnames(tabB))
  expectedCols <- c("car", "fact", "value")
  if((length(cols)!=length(expectedCols)) ||
     (!all.equal(cols, expectedCols))) {
    stop(paste("example_union_all: column set must be exactly",
               paste(expectedCols, collapse = ', ')))
  }
  mergeColName <- 'exampleunioncol'
  # build a 2-row table to control the union
  controlTable <- data.frame(exampleunioncol= c('a', 'b'),
                             stringsAsFactors = FALSE)
  if(!is.null(sc)) {
    controlTable <- copy_to(sc, controlTable,
                            name= tempName,
                            temporary=TRUE)
  }
  # decorate left and right tables for the merge
  tabA <- tabA %>%
    select(one_of(cols)) %>%
    mutate(exampleunioncol = as.character('a'))
  tabB <- tabB %>%
    select(one_of(cols)) %>%
    mutate(exampleunioncol = as.character('b'))
  # do the merges
  joined <- controlTable %>%
    left_join(tabA, by=mergeColName) %>%
    left_join(tabB, by=mergeColName, suffix = c('_a', '_b'))
  # coalesce the values
  joined <- joined %>%
    mutate(car = ifelse(exampleunioncol=='a', car_a, car_b))
  joined <- joined %>%
    mutate(fact = ifelse(exampleunioncol=='a', fact_a, fact_b))
  joined <- joined %>%
    mutate(value = ifelse(exampleunioncol=='a', value_a, value_b))
  joined %>%
    select(one_of(cols))
}


mtcars2 <- mtcars %>%
  mutate(car = row.names(mtcars))

frameList <- mtcars2 %>%
  tidyr::gather(key='fact', value='value', -car) %>%
  split(., .$fact)

frameListS <- lapply(names(frameList),
                     function(ni) {
                       copy_to(sc, frameList[[ni]], ni)
                     }
)

count <- 1
for(rep in 1:20) {
  print(paste('start rep', rep, base::date()))
  nm <- paste('tmp', count, sep='_')
  count <- count + 1
  res <- compute(frameListS[[1]], name=nm)
  for(i in (2:length(frameListS))) {
    print(paste(' start phase', rep, i, base::date()))
    oi <- frameListS[[i]]
    nm2 <- paste('ctmp', count, sep='_')
    count <- count + 1
    res <- example_union_all(sc, res, oi, nm2)
    prevNM <- nm
    nm <- paste('tmp', count, sep='_')
    count <- count + 1
    res <- compute(res, name=nm)
    dplyr::db_drop_table(sc, nm2)
    dplyr::db_drop_table(sc, prevNM)
    print(paste(' done phase', rep, i, base::date()))
  }
  print(head(res))
  dplyr::db_drop_table(sc, nm)
  print(paste('done rep', rep, base::date()))
}


WinVector/replyr documentation built on Oct. 22, 2020, 8:07 p.m.