# ---- Session setup ---------------------------------------------------------
# Record the start time, attach the packages used below, report package
# versions, and open a local Spark connection wrapped as an rquery handle.
base::date()

library("vtreat")
packageVersion("vtreat")
library("rquery")
packageVersion("rquery")
packageVersion("cdata")
packageVersion("sparklyr")

# Start from sparklyr's default configuration and override the logging,
# core-count and memory settings (2 cores, "4G" for every memory option).
conf <- sparklyr::spark_config()
conf$sparklyr.log.console <- FALSE
conf$spark.yarn.am.cores <- 2
conf$spark.executor.cores <- 2
conf$spark.executor.memory <- "4G"
conf$spark.yarn.am.memory <- "4G"
# Back-ticks are required because these option names contain a hyphen
# (or are simply quoted the same way for consistency).
conf$`sparklyr.shell.driver-memory` <- "4G"
conf$`sparklyr.shell.executor-memory` <- "4G"
conf$`spark.yarn.executor.memoryOverhead` <- "4G"

raw_connection <- sparklyr::spark_connect(
  version = "2.2.0",
  master = "local",
  config = conf
)

# The result of rq_connection_tests() is handed to rquery as the
# connection options for this handle.
db_opts <- rq_connection_tests(raw_connection)
db <- rquery_db_info(
  connection = raw_connection,
  is_dbi = TRUE,
  connection_options = db_opts
)
base::date()
# ---- Load KDD2009 data into Spark -----------------------------------------
# Read the training features and churn labels from local files, attach the
# label as column `churn`, convert integer columns to numeric, and copy the
# frame to the database as temporary table "kdd2009".
base::date()

# Named `data_dir` rather than `dir` so base::dir() is not shadowed.
data_dir <- "~/Documents/work/PracticalDataScienceWithR/zmPDSwR/KDD2009/"

d <- read.table(
  paste0(data_dir, "orange_small_train.data.gz"),
  header = TRUE,
  sep = "\t",
  na.strings = c("NA", ""),
  stringsAsFactors = FALSE
)
churn <- read.table(
  paste0(data_dir, "orange_small_train_churn.labels.txt"),
  header = FALSE,
  sep = "\t"
)

# Guard against silent recycling: `$<-` on a data.frame recycles the value
# when its length divides the row count, which would mis-align the labels.
stopifnot(nrow(churn) == nrow(d))
d$churn <- churn$V1

# Store every integer column as double before copying to the database.
for (ci in colnames(d)) {
  if (is.integer(d[[ci]])) {
    d[[ci]] <- as.numeric(d[[ci]])
  }
}

rq_copy_to(db, "kdd2009", d, overwrite = TRUE, temporary = TRUE)

# Drop the local copies; later steps work from the database table.
rm(list = c("data_dir", "d", "churn"))
base::date()
# ---- Split into train / test / variable-design sets -----------------------
# Tag every row of "kdd2009" with a random `sample_col`, materialize the
# result, and carve it into three disjoint tables by thresholding that column.
base::date()

# Generator for temporary table names; every name it hands out is dumped
# and removed during cleanup at the end of the script.
tmp_name_gen <- wrapr::mk_tmp_name_source("kddvtreat")

d <- db_td(db, "kdd2009") %.>%
  extend_nse(., sample_col = random())

cat(format(d))
#cat(to_sql(d, db))

# Materialize once so each row keeps the same sample_col value in all of the
# selections below.
d <- materialize(db, d, table_name = tmp_name_gen())

y_name <- "churn"
vars <- setdiff(column_names(d), c(y_name, "sample_col"))

# sample_col <= 0.5 : training rows
d_train <- d %.>%
  select_rows_nse(., sample_col <= 0.5) %.>%
  materialize(db, ., table_name = tmp_name_gen())

# sample_col > 0.9 : test rows
d_test <- d %.>%
  select_rows_nse(., sample_col > 0.9) %.>%
  materialize(db, ., table_name = tmp_name_gen())

# 0.5 < sample_col <= 0.9 : rows reserved for designing variable treatments
d_variable_design <- d %.>%
  select_rows_nse(., (sample_col > 0.5) & (sample_col <= 0.9)) %.>%
  materialize(db, ., table_name = tmp_name_gen())

base::date()
# ---- Design vtreat treatment plans ----------------------------------------
# Build treatment plans on the variable-design table in groups of 10
# variables, merge their score frames, and keep the derived variables that
# pass a significance and effect-size filter.
base::date()

# Local worker cluster used by vtreat; stopped in the cleanup step.
cl <- parallel::makeCluster(4)
print(length(vars))

# design treatments in small groups to manage memory
vgroups <- split(vars, ceiling(seq_along(vars) / 10))
treatment_plans <- lapply(
  vgroups,
  function(vi) {
    # Bring only the outcome plus this group's columns into local memory.
    di <- d_variable_design %.>%
      select_columns(., c(y_name, vi)) %.>%
      execute(db, .)
    # 1 is passed as the outcome target value for the categorical outcome.
    vtreat::designTreatmentsC(
      di, vi, y_name, 1,
      parallelCluster = cl,
      verbose = FALSE
    )
  }
)
base::date()

base::date()
# get unified scoreFrame
scoreFrame <- lapply(treatment_plans, function(tpi) tpi$scoreFrame)
scoreFrame <- do.call(rbind, scoreFrame)
base::date()

# try to get Bonferroni-corrected valid derived variables.
approximate_df <- length(vars) + nrow(scoreFrame)
threshold <- 1 / (1 + approximate_df)
# which() drops rows where the condition is NA, so a missing sig/rsq value
# cannot inject an NA variable name into the selection.
keep <- which(
  (scoreFrame$varMoves) &
    (scoreFrame$sig < threshold) &
    (scoreFrame$rsq >= 1.0e-3)
)
newvars <- unique(scoreFrame$varName[keep])
print(length(newvars))
base::date()
# ---- Apply the treatment plans to the training table ----------------------
# Convert the vtreat plans (restricted to `newvars`) into an rquery plan,
# apply it to d_train in the database, and inspect the first rows.
base::date()

# A one-row sample of d_train, passed to rquery_prepare() as `col_sample`.
col_sample <- execute(db, d_train, limit = 1L)
rqplan <- as_rquery_plan(treatment_plans, var_restriction = newvars)

# Disabled diagnostic: build the operator tree without running it, print it,
# and render its diagram. Flip FALSE to TRUE to use interactively.
if (FALSE) {
  ops <- rquery_prepare(
    db, rqplan, d_train, "dtrain_prepped",
    col_sample = col_sample,
    return_ops = TRUE
  )
  cat(format(ops))
  ops %.>%
    rquery::op_diagram(.) %.>%
    DiagrammeR::grViz(.)
  # sql <- rquery::to_sql(ops, db)
  # cat(sql)
}
base::date()

# Replace d_train with the treated table, stored under a fresh temp name.
d_train <- rquery_prepare(
  db, rqplan, d_train, tmp_name_gen(),
  col_sample = col_sample,
  temporary = TRUE,
  overwrite = TRUE
)
length(column_names(d_train))
base::date()

# cdata::qlook(db, d_train$table_name)
# rquery::rlook(db, d_train$table_name)
# Pull 10 rows locally and show their structure (list.len raised so that
# str() does not truncate the column listing).
d_train %.>%
  orderby(., limit = 10) %.>%
  execute(db, .) %.>%
  str(., list.len = 10000)
base::date()
# ---- Cleanup ---------------------------------------------------------------
# Stop the worker cluster, remove every table whose name was issued by
# tmp_name_gen, then close the Spark connection.
parallel::stopCluster(cl)

for (ti in tmp_name_gen(dumpList = TRUE)) {
  rq_remove_table(db = db, table_name = ti)
}

sparklyr::spark_disconnect(raw_connection)
Add the following code to your website.
For more information on customizing the embed code, read Embedding Snippets.