R/to_spark_input.R

to_spark_input <- function(lazy, df, columns = SparkR::columns(df)) {
  spark_expr <- translate_spark_columns(lazy$expr, df, columns)
  eval(spark_expr, envir = lazy$env)
}

translate_spark_columns <- function(call, df, columns) {
  if(is.atomic(call)) return(call)
  
  if(is.symbol(call)) {
    name <- as.character(call)
    if(name %in% columns) {
      do.call(`$`, list(df, name))
    } else {
      call
    }
  } else if(is.call(call)) {
    name <- as.character(call[[1]])
    if(name[1] %in% c("$", "[[", "[")) {
      call
    } else {
      call[-1] <- lapply(call[-1], translate_spark_columns, df, columns)
      call
    }
  }
}
saurfang/SparkRext documentation built on May 29, 2019, 3:19 p.m.