R/library.R

library(lubridate)
#library(tidyverse)
library(data.table)
library(DBI)
#library(reshape2) # data.table is better
library(openxlsx)
library(ggplot2)
library(ggthemes)
library(RMySQL)
library(anytime)

library(stringr)
#library(googlesheets)

library(countrycode)
library(reticulate)
library(rstudioapi)


# installation
# debian
# RStudio 1.2v (with cmd)
# debian GNU/Linux:: Microsft Store
# browseURL('https://rstudio.github.io/reticulate/index.html')
# library reticulate (python)


#mosaic::mPlot(bp)
# New Functions
ppy = function (...) reticulate::repl_python(...)
ppy3 = function(...) {
  use_python("C:/MyProgramFiles/python36/python3.exe")
  reticulate::repl_python(...)
}
prop.table2 = function(...) percent(prop.table(...))
countrycode2 = function(x) {
  y = x
  x = countrycode(x,'country.name','iso2c')
  x[y == 'Internacional'] = 'com'
  x[y == "United Kingdom"] = 'uk'
  x = tolower(x)
  x
}

pointofsale.ms = function(brand,market){
  brand.v = c('edreams','opodo','govoyages','travellink')
  names(brand.v) = toupper(substr(brand.v,1,1))
  market = countrycode2(market)
  brand = toupper(substr(brand,1,1));
  brand = brand.v[brand]
  pos = paste0(brand,'.',market)
  pos
}


as.numeric2 = function(x, delete = c(intToUtf8(8364),','), sub = c('','')) {
  Encoding(x) = 'UTF-8'
  for(i in 1:length(delete)){
  x = gsub(delete[i],sub[i],x,fixed=T)
  }
  x = as.numeric(x)
  x
}
split2 <- function(x, by,vector = T, keep.by = TRUE,sorted = FALSE,...){
  if(missing(by)) {
    out = split(data.table(x,id=1:x[,.N]), by = 'id',keep.by = F,sorted = sorted)
    if(vector) lapply(out, unlist)
    } else
    split(x, by = by, sorted = sorted, keep.by = keep.by, ...)
}

# guestrefSK <- function(x,pos=F){
# SubSingle <- unlist(strsplit(x,'|',fixed=T))
# n <- length(SubSingle)/2
# if(pos) list(SubSingle[2*(1:n)-1],SubSingle[2*(1:n)]) else
#   SubSingle[2*(1:n)-1]
# }

guestrefSK <- function(x,pos=F){
  out <- tstrsplit(x,'|',fixed=T,fill='')
 if(pos) out else out[[1]]
}

percent <- function(x, digits = 2, format = "f", ...) {
  paste0(formatC(100 * x, format = format, digits = digits, ...), "%")
}

na.convert <- function(x,y = 0) {
  x[is.na(x)] <- y
  x}

na.convert.mean <- function(x) {
  y = mean(x, na.rm = T)
  x[is.na(x)] <- y
  x}


na.count <-function (x,DT = F) {
  out <- sapply(x, function(y) sum(is.na(y)))
  if(DT) data.table(variable = names(out),value = out) else
    out
}

from_unixtime <- function(...,ts = F ) {
  # ts = TimeStamp
  if(ts) anytime(...)
    else as.Date(anytime(...))
}
shell2 <- function(...,check = 'putty') {
  if(!any(grepl(check,shell('tasklist',intern=T))))
    shell(...)
}

match2 <- function(x,y) match(tolower(x),tolower(y))


# wrapper of data.table::split
split2 <- function(x, by,vector = T, keep.by = TRUE,sorted = FALSE,...){
  if(missing(by)) {
    out = split(data.table(x,id=1:x[,.N]), by = 'id',keep.by = F,sorted = sorted)
    if(vector) lapply(out, unlist)
    } else
    split(x, by = by, sorted = sorted, keep.by = keep.by, ...)
}

.p = function(x,y = 'N') {
  # DT[,.N,col][,.p(.SD)]
  c(x,list(pp = prop.table2(x[[y]])))
}
#

get.fileextension = function(filename) {
  out = sapply(strsplit(filename,'.',fixed = T),last)
  paste0('.',out)
}

add.Sys.time2 = function(filename,ttable = F) {
  suffix = get.fileextension(filename)
  suffix.tmp = paste0('_',Sys.time2(ttable),suffix)
  file.tmp = gsub(suffix,suffix.tmp,filename,fixed=T)
  file.tmp
}

Sys.time2 <- function(sqlformat = F) {
  out = format(Sys.time(),'%Y-%m-%d_%H-%M-%S')
  if(sqlformat) out = gsub('-','_',out)
  out
}

# Format
convertMonth <- function(x, isnum = T) {
  stopifnot(class(x) %in% c('integer','numeric','Date'))
  if(class(x) == 'Date') {
    warning('Converting Date to mmonth')
    month(x) + year(x)*12
    } else
      as.Date(paste(trunc(x/12) + (ceiling(x%%12/12)-1), x%%12 + (ceiling(x%%12/12)-1)*(-12),'01',sep='-'))
}
# convertMonth <- function(x) {
#     as.Date(paste(trunc(x/12) + (ceiling(x%%12/12)-1), x%%12 + (ceiling(x%%12/12)-1)*(-12),'01',sep='-'))
# }
#
# convertMonth2 <- function(x) {
#     as.Date(paste(trunc(x/12) - (x%%12 ==0), x%%12 + 12*(x%%12 == 0),'01',sep='-'))
# }


convertWeek <- function(x) {
    as.Date(paste(trunc(x/52) + (ceiling(x%%52/52)-1), x%%52 + (ceiling(x%%52/52)-1)*(-52),'01',sep='-'))
}

convertTable = function(filename,deletepoint = T){
  filename = last(strsplit(filename,'/')[[1]])
  if(deletepoint) {
    filename = strsplit(filename,'.',fixed=T)[[1]]
    filename = paste0(filename[-length(filename)],collapse='.')
  }
  filename
}

# lapply2 <- function(A,ff,...){ # rewrites namescol of A into ff
#   stopifnot(class(A)[1] =='data.table')
#   namescol <- c(...)
#   A[,(namescol):= lapply(.SD,ff),.SDcols = namescol]
# }
lapply2 <- function(A,ff,ss,...){ # rewrites namescol of A into ff
  if(missing(ss)) ss = names(A)
  stopifnot(class(A)[1] =='data.table')
  if(class(ss) == 'numeric') namescol <- names(A)[! names(A) %in%  c(...)] else
  namescol <- c(ss,...)
  A[,(namescol):= lapply(.SD,ff),.SDcols = namescol]
}

# Sql -------------------------------------------------------------------------------------------------------------------------
collapse <- function(x,string=T,brackets = T,sep = ", "){
  l <- ifelse(brackets,'(','')
  r <- ifelse(brackets,')','')
  cc <- ifelse(string,"'","")
  collp <- paste0(cc,sep,cc )

  if(string)  paste0(" ('",paste0(x,collapse=), "') ")
  else paste0(" ( ",paste0(x,collapse=), ") ")
   paste0(l,cc,paste0(x,collapse= collp ), cc, r )
}


to_unixtime <- function(dd){
  as.numeric(as.POSIXct(dd))
}

groupstr <- function(x=1,y){ # x number of columns NOT in group by, y num. IN group by
  if(missing(y)) {
    y = x
    x = 1}
  paste0(' ',(x+1):(x+y),' ',collapse = ', ')
}

# Data table wrapers
dbGetQuery2 <- function(...)
  data.table(dbGetQuery(...))
# dbGetQuery3 = function(con,sql,...){
#   ff.aux = function(sql.i) dbGetQuery2(con,sql.i,...)
#   out.list = lapply(sql,ff.aux)
#   out = do.call(rbind,out.list)
#   out
# }
dbGetQuery3 = function(con,sql,groupby  = F , groupby.ff = function(x) sum(x,na.rm = T),...){
  ff.aux = function(sql.i) dbGetQuery2(con,sql.i,...)
  out.list = lapply(sql,ff.aux)
  out = do.call(rbind,out.list)
  if(groupby) {
    metricnames = sapply(out,class)
    metricnames = metricnames[metricnames %in% c('numeric','integer')]
    nonmetricanames = names(out)[!names(out) %in% names(metricnames)]
    #ff = function(x) sum(x,na.rm = T)
    out = out[,lapply(.SD,groupby.ff),eval(nonmetricanames)]
    out } else out
}

read.xlsx2 <- function(...)
  data.table(read.xlsx(...))

# Explore conection
# cmd <- credentials bx

con <- dbConnect(RPresto::Presto(), host='localhost', port=8889, user=userold, password=" ", schema=" ",
  catalog='hive', source='local')


bqbilling = last(strsplit(bqproject,'-')[[1]])
# standard
conbq <- DBI::dbConnect(bigrquery::dbi_driver(), dataset = bqdataset, project = bqproject ,
                       billing = bqbilling, use_legacy_sql = FALSE)
# legacy
conbq_l <- DBI::dbConnect(bigrquery::dbi_driver(), dataset = bqdataset, project = bqproject ,
                       billing = bqbilling, use_legacy_sql = T)

# Bq --------------------------------------------------------------------------------------------------------------

gatable <- function(PC,fullname = T){
  if(any(duplicated(PC))) stop('duplicates in PC')
  rows = match(PC,bqAcc$pos)
  if(any(is.na(rows))) stop(paste0(collapse(head(PC[is.na(rows)])) ,' are not pointofsales'))
  out = bqAcc[rows]$profile
  names(out) = PC
  if(fullname) out = paste0('`',out,'.ga_sessions_*','`')
  out
}


bq_desc = function(tablename){
  if(!grepl('.',tablename)) tablename = paste0(bqdataset,'.',bqtable)
  filename = paste0(Sys.time2(),'.json')
  id = terminalCreate()
  ccmd = paste0('bq --format prettyjson show ' , tablename ,' > ',filename)
  ccmd%$%.
  json_desc = rjson::fromJSON(paste(readLines(filename), collapse=""))
  df_desc = as.data.frame(json_desc)
  df_desc.name = df_desc[1,grepl('schema.fields.name',names(df_desc))]
  df_desc.type = df_desc[1,grepl('schema.fields.type',names(df_desc))]
  dt_desc = data.table(name = t(as.matrix(df_desc.name)), type = t(as.matrix(df_desc.type)))
  names(dt_desc) = c('name','type')
  terminalKill(id)
  dt_desc
}

createSchema <- function(A,filename){
# schema = rep('string',ncol(A))
# names(schema) = names(A)

schema = sapply(A,class)
schema[schema == 'character'] = 'string'
schema[schema %in% c("POSIXct", "POSIXt")] = 'timestamp'
schema[schema == 'Date'] = "DATETIME"

schema[schema == 'numeric'] = 'float'
schema[schema == 'integer'] = 'integer'

if(any(! schema %in% c('string','timestamp','float','integer'))) warning(paste0(collapse(schema[! schema %in% c('string','timestamp','float','integer')]),' not in bq data types '))

schema = data.table(description = NA,mode = 'NULLABLE',name = names(schema),type = schema)
#schema[name == 'GoogleSendDate',type := 'timestamp']
#schema[grepl('date',name,T), type := 'timestamp' ]
schemajs = jsonlite::toJSON(schema,pretty = T)
#filename = paste0('./output/',DataExtension,'_Schema.json')
write(schemajs,filename)
}

# Explore Tables -------------------------------------------------------------------------------------------------------------------
#tableMOM <- "hive.workspace.MoM_LTV_zeta"
#tableMOM <- "hive.workspace.MoM_LTV_eta"
#tableMOM <- "hive.workspace.MoM_LTV_iota"
#tableMOM <- "hive.workspace.MoM_LTV_nu"
#tableMOM <- "hive.workspace.MoM_LTV_chi"
#tableMOM <- "hive.workspace.MoM_LTV_omega"
#tableMOM = "hive.workspace.MoM_LTV2_alpha"
#tableMOM = "hive.workspace.MoM_LTV2_gamma"
#tableMOM = "hive.workspace.MoM_LTV2_theta"
#tableMOM = "hive.workspace.MoM_LTV2_iota"
tableMOM = "hive.workspace.MoM_LTV2_kappa"
tableMOM = "hive.workspace.MoM_LTV2_lambda"

#tableRM <- 'hive.workspace.NetRM_alpha'
#tableRM <- 'hive.workspace.NetRM_beta'
#tableRM <- 'hive.workspace.NetRM_zeta'
tableRM <- 'hive.workspace.NetRM_lambda'

tableMCh <- 'hive.workspace.marketingchannel_0014'
#tableUnlocked <- "hive.workspace.tableUnlocked_0001"
#tableUnlocked <- "hive.workspace.Unlocked_gamma"
#tableUnlocked <- "hive.workspace.Unlocked_delta"
tableUnlocked <- "hive.workspace.prime_eta"
#tableMS_raw = 'hive.workspace.primeraw_beta'

tableNPS <- "hive.workspace.tableNPS_0001"
tableEvents <- 'hive.workspace.events'
tableSearches <- "hive.workspace.searches_alpha"
tableCR <- 'hive.workspace.cr'
# tableSessionDE <- 'hive.workspace.sessiondataextensions' # DEPRECATED

# Console ----------------------------------------------------------------------
rmse <- function(pred,data){
  stopifnot(class(pred) == 'numeric' & class(data) == 'factor')
  data <- as.numeric(data) - 1
  sqrt(mean((pred - data)^2))
}

Sys.time2 <- function(ttable = F) {
  out = format(Sys.time(),'%Y-%m-%d_%H-%M-%S')
  if(ttable) out = gsub('-','_',out)
  out
}
shellR <- function(cmd, id, log = T, activate = F, wait = T, wait2 = 0.5, clear= F,caption = NULL,var=F){
  cmd <- gsub('\n','',cmd)
  #as.environment('package:rstudioapi')
  if(var) cmd <- paste0(substitute(cmd),'="',cmd,'"')
  if(missing(id)) id <- rstudioapi::terminalList()[1]
  if(is.na(id)) id <- rstudioapi::terminalCreate(caption,F)
  rstudioapi::terminalActivate(id,F)
  if(clear) rstudioapi::terminalClear(id)
  rstudioapi::terminalSend(id,paste0(cmd,'\n'))
  # \n
  Sys.sleep(wait2)
  if(wait){
    while(last(rstudioapi::terminalBuffer(id))!='$') {
    #while(rstudioapi::terminalBusy(id)) {
      Sys.sleep(0.2)
      }
  }

  # Return the output of the console
  logg <- rstudioapi::terminalBuffer(id)
  nlast <- length(logg)
  logg <- logg[-nlast]
  logg <- logg[(max(c(1,which('$' == logg))) + 2):(nlast - 2)]
  nlast <- length(logg)

  if(activate & !var) rstudioapi::terminalActivate(id)
  if(log) cat(logg[-nlast],sep = '\n')

  invisible(logg[-nlast])
  }

# Wrapper for shellR
# export
"%$%" <- function(cmd,opt){
  # . runs command .. declare var
  if(substitute(opt) == '.') shellR(cmd) else
    if(substitute(opt) == '..') {
      cmd <- paste0(substitute(cmd),'="',cmd,'"')
     shellR(cmd) } else
      stop('y not in c(. , ..)')
}

"%between%" <- function(x,y) {
  stopifnot(length(y)==2)
  between(x,y[1],y[2])
}

# quotes around
"%'%" <- function(x,y){
  # . runs command .. declare var
  if(substitute(y) == '.') paste0("'",x,"'") else
    paste0(y,x,y)
}
# quotes around (biquery date format)
"%''%" <- function(x,y){
  stopifnot(is.Date(x) == T)
  x = format(x,'%Y%m%d')
  # . runs command .. declare var
  if(substitute(y) == '.') paste0("'",x,"'") else
    paste0(y,x,y)
}

# quotes around
'%"%' <- function(x,y){
  # . runs command .. declare var
  if(substitute(y) == '.') paste0('"',x,'"') else
    paste0(y,x,y)
}
# customdime
customdimensions = function(index,table = '') {
  if(table != '') table = paste0(table,'.')
  paste0("(SELECT MAX(IF(index=",index,", value, NULL)) FROM UNNEST(",table,"customDimensions))")
}
# wrapper of data.table::split
split2 <- function(x, by,vector = T, keep.by = TRUE,sorted = FALSE,...){
  if(missing(by)) {
    out = split(data.table(x,id=1:x[,.N]), by = 'id',keep.by = F,sorted = sorted)
    if(vector) lapply(out, unlist)
    } else
    split(x, by = by, sorted = sorted, keep.by = keep.by, ...)
}


Jwrite <- function(A,filename){
  A <- jsonlite::toJSON(as.matrix(A))
  A <- gsub('],[',']\n[',A,fixed=T)
  A <- gsub("[[","[",A,fixed=T)
  A <- gsub("]]","]",A,fixed=T)
  write(A,filename)
}

strpred <- function(pred_Cl){
  pred_Cl <- substring(pred_Cl,2)
  pred_Cl <- stringr::str_sub(pred_Cl,1,-2)
  pred_Cl <- strsplit(pred_Cl,', ')[[1]]
  as.numeric(pred_Cl)
}

# Ggplot2 -------------------------------------------------------------------------------

# ggplot as google
#ggplot = function(...) ggplot2::ggplot(...) + scale_color_gdocs() + scale_fill_gdocs()

theme_set(theme_gdocs())
blank_theme <-  theme_minimal()+theme(
  axis.title.x = element_blank(),
  axis.title.y = element_blank(),
  panel.border = element_blank(),
  panel.grid= element_blank(),
  axis.ticks = element_blank(),
  plot.title=element_text(size=14, face="bold"),
  axis.text = element_blank()
  )
geom_pie = function(A,y,fill){
  names(A)[names(A) == y] = 'value'
  names(A)[names(A) == fill] = 'group'
pie = ggplot(A, aes(x="", y=value, fill=group)) +
geom_bar(width = 1, stat = "identity")
pie
pie = pie + coord_polar("y", start=0)
pie

pie = pie + geom_text(aes(y = value/nrow(A) + c(0, cumsum(value)[-length(value)]),
            label = percent(value/100,0)), size=5) + blank_theme
pie
}

grep2 = function(patterns,x,...) {
  for(i in 1:length(patterns))
    x = grep(patterns[i],x,value = T,...)
  x
}

round2 = function(x,p = 0.5){
y = trunc(x)
z = x - y
z = abs(z) > p

y + sign(y)*z
}

sum.is.na = function(x)
  sum(is.na(x))

get.na.names = function(A, nnames = T, cutoff = 0.95) {
  out = sapply(A, sum.is.na)/A[,.N]
  if(nnames)
    names(out)[out >= cutoff] else
      which(out >= cutoff)
}


mode.s = function(x) {
  out = names(which.max(table(x)))
  class(out) = class(x)
  out
}

# sql =================================================================================================


#cat('ExploreEtl(A,nametable,create = T,csv = F,csvfilename)')
ExploreEtl <- function(A,nametable,create = T,csv=F,filename,savemysql=F,types){
#  conection
nametable <- tolower(nametable)
nametable <- gsub('hive.workspace.','',nametable,fixed=T)

shell2(cmd,wait=F)


#cmdmysql <- credentials

shell3 <- function(...,check = 'putty') {
  if(sum(grepl(check,shell('tasklist',intern=T))) == 1)
    shell(...)
}
shell3(cmdmysql,wait=F)
conmysql <- dbConnect(RMySQL::MySQL(), host="localhost", port= 3308, user= user, password = pswd_mysql, dbname="explore_tmp",
  source='local')

# sql <- paste0("desc explore_tmp.prova_20171130_2017_11_30__12_04_57")
# dbGetQuery(conmysql,sql)

# Feature types --------------------------------------------------------------------------------------------------------------------------
if(grepl('.',nametable,fixed = T)) stop('tableIn without Schem or catalog')
mysqltable <- paste0('explore_tmp.',nametable,format(Sys.time(),'_%Y_%m_%d__%H_%M_%S')) # MySQL table
if(savemysql) mysqltable <- paste0('explore_tmp.',nametable) # MySQL table

prestotable <- paste0('hive.workspace.',nametable) # Presto Table
sql <- "show tables from hive.workspace"
existingTables <- dbGetQuery(con,sql)
dbDisconnect(con)
if(create & prestotable %in% existingTables) stop(paste0(prestotable, ' already exists, change nametable or create to F'))
if(csv) A <- fread(filename,nrows = 1)
#flush.console()
if(missing(types)){ types <- sapply(A, class)
stopifnot(types %in% c('numeric','character','integer'))
types[types=='integer'] <- 'numeric'
types <- factor(types, levels=c('character','numeric'))
levels(types) <- c('varchar(100)','double(100,10)') }
#stopifnot(is.factor(types))
string <- data.frame(names(A),types)
string <- paste0(string[,1],' ',string[,2])
string <- paste0(string,collapse = ', ')

# Create MySQL-Table ------------------------------------------------------------------------------------------------------
cat('MySQL\n')
sql <- paste0('show tables from explore_tmp;')
showtables <- dbGetQuery(conmysql,sql)

if(mysqltable %in% paste0('explore_tmp.',showtables$Table)){
   sql <- paste0(
    "drop table ", mysqltable, " ;"
  )
  dbGetQuery(conmysql,sql)
}
rm(showtables)

#if(create){
sql <- paste0(
  "create table ", mysqltable, " ( ",
  string, " );"
)
dbGetQuery(conmysql, sql)
#dbDisconnect(conmysql)
#}
if(!csv){
  filename <- paste0("temp",format(Sys.time(),'_%Y_%m_%d__%H_%M_%S'),".csv")
  fwrite(A,filename,showProgress = F)
}



# Upload  data in MySQL ---------------------------------------------------------------------------------------------------------------
sql0 <- paste0("load data local infile '", filename, "' into table ", mysqltable," FIELDS TERMINATED BY ',' LINES TERMINATED BY '\r\n' IGNORE 1 lines")
sql1a <- paste0('@',1:length(names(A)),collapse=", ")
sql1b <- paste0(names(A)[1:length(names(A))],' = ', 'nullif(@',1:length(names(A)),",'')",collapse=", ")
sql1 <- paste0("(",sql1a,") set ",sql1b, ";")
sql <- paste0(sql0,sql1)
dbGetQuery(conmysql, sql)

if(!csv){
  file.remove(filename)
}
#dbDisconnect(conmysql)

# levels(types) <- c("'",'')
#
# nrow(A)
# sum(sapply(A[1,], nchar))
# seqq <- seq(1,nrow(A),by=3*10^4)
# cat('Number iterations:', length(seqq),'\n')
#
# for(j in 1:length(seqq)){
#  cat('\rIteration', j,'\r')
# a <- seqq[j]
# if(j < length(seqq)) b <- seqq[j+1] - 1 else b <- nrow(A)
# B <- A[a:b,]
# cols <- c( 1:ncol(B))
#
# # create a new column `x` with the three columns collapsed together
# #values <- apply( B[ , cols,with=F ] , 1 , paste , collapse = "-" )
# values <- paste0(types[1],B[[1]],types[1])
# values[is.na(B[[1]])] <- "NULL"
# for(i in 2:ncol(A)){
# aux <- paste0(',',types[i],B[[i]],types[i])
# aux[is.na(B[[i]])] <- ",NULL"
# values <- paste0(values, aux)
# }
#
# values <- paste(values,collapse = "), (")
# values <- paste0("(", values,")")
#
# sql <- paste0(
#   "insert into ",table,
#   " values ", values)
# dbGetQuery(con,sql)
# dbDisconnect(con)

# Import To Presto ------------------------------------------------------------------------------------------------------------
cat('Presto\n')
if(create){
  sql <- paste0(
    "create table ", prestotable," as
    select *
    from mysql.", mysqltable
  )
  dbGetQuery(con,sql)
  dbDisconnect(con)
} else {
  sql <- paste0(
    "insert into ", prestotable,"
    select *
    from  mysql.", mysqltable
  )
  dbGetQuery(con,sql)
  dbDisconnect(con)
}

# Delete MySQL table
if(!savemysql){
   sql <- paste0(
    "drop table ", mysqltable, " ;"
  )
  dbGetQuery(conmysql,sql)
}





# Output Validation ---------------------------------------------------------------------------------------------------------------

# sql <- paste0(
#   "select count(*) N from ",prestotable)
# n <- dbGetQuery(con,sql)$N
# dbDisconnect(con)

sql <- paste0(
  "select * from ",prestotable, " limit 2")
print(dbGetQuery(con,sql))
dbDisconnect(con)
# cat('\n',prestotable,'nrow:',n,'TRUE','\n')
# print(headd)

dbDisconnect(conmysql)
}

cat('ExploreEtl(A,nametable,create = T,csv = F,csvfilename,savemysql=F)')
bq_load <- function(A,nametable,create=T,overwrite = F,filename,bigfile = F, PartitionField = F,schema=F){
nametablevec <-  str_split(nametable,'[.]')[[1]]
stopifnot(length(nametablevec) <= 2)
if(length(nametablevec) == 1) nametablevec = c(bqdataset,nametablevec)

stopifnot(is.character(PartitionField) | PartitionField == F)
if(missing(A) & missing(filename)) stop('data.frame or filename')
if(missing(nametable) & missing(filename)) stop('nametable or filename')
if(missing(nametable)) nametable = convertTable(filename)
if(bigfile){
  if(missing(filename)) {
    missing.filename = missing(filename)
    filename = paste0('deleteme_',Sys.time2(),'.csv')
    fwrite(A,filename)
    }
  id <- terminalCreate(paste0('GCloud',Sys.time2()))
  flags = rep('',20)
  if(schema!=F) {
    flags[1] = '--skip_leading_rows=1'
    flags[3] = paste0('--schema ',schema)
        } else flags[3] = '--autodetect'
  if(overwrite) {
    flags[1] = '--replace'
        }
    if(!create){
      flags[1] = '--skip_leading_rows=1'
      flags[3] = ''
    }
  if(PartitionField!=F)
    flags[2] = paste0(" --time_partitioning_field ", PartitionField)
  if(grepl('.',nametable))
   shell.c = paste(
    "bq load ",flags[2],flags[3],"--source_format CSV ", flags[1], paste0(nametablevec,collapse = '.'), filename )
  shellR(shell.c,id)
  Sys.sleep(5)
  terminalKill(id)
  if(missing.filename) file.remove(filename)
} else {
if(nametable != tolower(nametable)) warning('nametable has capital letters')
if(grepl('/',nametable)) {
  warning('filename has /: filename to nametable')
  nametable = convertTable(nametable)
}
#nametable <- tolower(nametable)
if(missing(A)) A <- fread(filename)
if(any(grepl('.',names(A),fixed=T) | grepl(' ',names(A),fixed=T))) warning(" '.' and ' ' are convert to '_' and '' in column names")
names(A) <- gsub('.','_',names(A),fixed = T)
names(A) <- gsub(' ','',names(A),fixed = T)

create_disposition = ifelse(create,"CREATE_IF_NEEDED","CREATE_NEVER")
write_disposition = ifelse(create,"WRITE_EMPTY","WRITE_APPEND")
if(overwrite) write_disposition = "WRITE_TRUNCATE"
invisible(bigrquery::insert_upload_job(bqproject, nametablevec[1], nametablevec[2], A, billing = bqproject,
  create_disposition = create_disposition,
  write_disposition = write_disposition))
}
}


bq_query = function(sql, filename, nn_bq = 1e7) {
if(substr(filename,1,2) == './')
  filename = substr(filename,3,1e3)
filename = paste0(getwd(),'/',filename)
nn_bq = format(nn_bq, scientific = F)
cmd =
  paste0('bq query --nouse_legacy --format csv --n ', nn_bq,' ',sql %'%., ' > ', filename %'%.)
cmd%$%.

cmd = paste0('wc -l ', filename %'%.)
nn_out =  cmd %$% .
nn_out = strsplit(nn_out,' ',fixed=T)[[1]][1]
nn_out = as.numeric(nn_out) - 2
if(nn_out == nn_bq ) warning('max lines reached')

}
cat('BigQEtl(A,nametable,create=T,overwrite = F,csv=F,filename)')

# nametable <- "prova_20171220d"
# # filename <- "prova_20171220.csv"
# A <- data.table(a=1:4,b=letters[1:4])
# ExploreEtl(A,nametable)
Iuiu1234/iuiu documentation built on May 13, 2019, 11:08 a.m.