base::date()

library("vtreat")
packageVersion("vtreat")
library("rquery")
packageVersion("rquery")
packageVersion("cdata")
packageVersion("sparklyr")

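# configure Spark memory and core settings for this example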
conf <- sparklyr::spark_config()
conf$sparklyr.log.console <- FALSE
conf$spark.yarn.am.cores <- 2
conf$spark.executor.cores <- 2
conf$spark.executor.memory <- "4G"
conf$spark.yarn.am.memory <- "4G" 
conf$`sparklyr.shell.driver-memory` <- "4G"
conf$`sparklyr.shell.executor-memory` <- "4G"
conf$`spark.yarn.executor.memoryOverhead` <- "4G"

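# connect to a local Spark instance and wrap the connection as an rquery database handle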
raw_connection <- sparklyr::spark_connect(version = '2.2.0',
                                          master = "local",
                                          config = conf)
db_opts <- rq_connection_tests(raw_connection)
db <- rquery_db_info(connection = raw_connection,
                     is_dbi = TRUE,
                     connection_options = db_opts)
base::date()
base::date()
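# read the KDD2009 data and churn labels from local tab-separated files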
dir <- '~/Documents/work/PracticalDataScienceWithR/zmPDSwR/KDD2009/' 

d <- read.table(paste(dir,'orange_small_train.data.gz',sep=''),
               header=TRUE, sep='\t', na.strings=c('NA',''), 
               stringsAsFactors=FALSE)
churn <- read.table(paste(dir,'orange_small_train_churn.labels.txt', sep=''),
                   header=FALSE, sep='\t')
d$churn <- churn$V1

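# convert integer columns to numeric before copying the data to Spark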
for(ci in colnames(d)) {
  if(is.integer(d[[ci]])) {
    d[[ci]] <- as.numeric(d[[ci]])
  }
}
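# copy the prepared data to Spark as a temporary table, then drop the local copies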
rq_copy_to(db, "kdd2009", d,
           overwrite = TRUE,
           temporary = TRUE)
rm(list = c("dir", "d", "churn"))
base::date()
base::date()
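# generator for unique temporary table names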
tmp_name_gen <- wrapr::mk_tmp_name_source("kddvtreat")

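# add a random column used to split the data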
d <- db_td(db, "kdd2009") %.>%
  extend_nse(., sample_col = random())

cat(format(d))
#cat(to_sql(d, db))

d <- materialize(db, d, table_name = tmp_name_gen())

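# outcome column and candidate explanatory variables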
y_name <- "churn"
vars <- setdiff(column_names(d), c(y_name, "sample_col"))

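# split into training (~50%), test (~10%), and variable-design (~40%) subsets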
d_train <- d %.>%
  select_rows_nse(., sample_col <= 0.5) %.>%
  materialize(db, ., table_name = tmp_name_gen())

d_test <- d %.>%
  select_rows_nse(., sample_col > 0.9) %.>%
  materialize(db, ., table_name = tmp_name_gen())

d_variable_design <- d %.>%
  select_rows_nse(., (sample_col > 0.5) & (sample_col <= 0.9)) %.>%
  materialize(db, ., table_name = tmp_name_gen())
base::date()
base::date()
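# start a parallel cluster to speed up treatment design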
cl <- parallel::makeCluster(4)
print(length(vars))
# design treatments in small groups to manage memory
vgroups <- split(vars, ceiling(seq_len(length(vars))/10))
treatment_plans <- lapply(vgroups,
                          function(vi) {
                            di <- d_variable_design %.>%
                              select_columns(., c(y_name, vi)) %.>%
                              execute(db, .)
                            vtreat::designTreatmentsC(di, vi, y_name, outcometarget = 1,
                                                      parallelCluster = cl,
                                                      verbose = FALSE)
                          })
base::date()

base::date()
# get unified scoreFrame
scoreFrame <- lapply(treatment_plans,
                     function(tpi) {
                       tpi$scoreFrame
                     })
scoreFrame <- do.call(rbind, scoreFrame)
base::date()


# try to get Bonferroni-corrected valid derived variables.
approximate_df <- length(vars) + nrow(scoreFrame)
threshold <- 1/(1 + approximate_df)
newvars <- scoreFrame$varName[ (scoreFrame$varMoves) &
                                 (scoreFrame$sig < threshold) & 
                                 (scoreFrame$rsq >= 1.0e-3) ]
newvars <- unique(newvars)
print(length(newvars))
base::date()
base::date()

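# capture the column structure with a one-row sample, and convert the treatment plans into an rquery operator plan restricted to the selected variables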
col_sample <- execute(db, d_train, limit = 1L)
rqplan <- as_rquery_plan(treatment_plans, var_restriction = newvars)

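# optionally inspect the generated rquery operator pipeline (block not run)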
if(FALSE) {
  ops <- rquery_prepare(db, rqplan, d_train, 
                        "dtrain_prepped", 
                        col_sample = col_sample,
                        return_ops = TRUE)
  cat(format(ops))
  ops %.>%
    rquery::op_diagram(.) %.>%
    DiagrammeR::grViz(.)
  # sql <- rquery::to_sql(ops, db)
  # cat(sql)
}

base::date()


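# apply the treatment plan to the training data inside the database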
d_train <- rquery_prepare(db, rqplan, d_train, 
                          tmp_name_gen(), 
                          col_sample = col_sample,
                          temporary = TRUE, 
                          overwrite = TRUE)
length(column_names(d_train))

base::date()

# cdata::qlook(db, d_train$table_name)
# rquery::rlook(db, d_train$table_name)
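# look at the first few rows of the prepared training data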
d_train %.>% 
  orderby(., limit = 10) %.>% 
  execute(db, .) %.>% 
  str(., list.len = 10000)
base::date()
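# clean up: stop the parallel cluster, drop temporary tables, and disconnect from Spark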
parallel::stopCluster(cl)
for(ti in tmp_name_gen(dumpList = TRUE)) {
  rq_remove_table(db = db, table_name = ti)
}
sparklyr::spark_disconnect(raw_connection)

