library(lubridate)
#library(tidyverse)
library(data.table)
library(DBI)
#library(reshape2) # data.table is better
library(openxlsx)
library(ggplot2)
library(ggthemes)
library(RMySQL)
library(anytime)
library(stringr)
#library(googlesheets)
library(countrycode)
library(reticulate)
library(rstudioapi)
# installation
# debian
# RStudio 1.2v (with cmd)
# debian GNU/Linux:: Microsft Store
# browseURL('https://rstudio.github.io/reticulate/index.html')
# library reticulate (python)
#mosaic::mPlot(bp)
# New Functions
ppy = function (...) reticulate::repl_python(...)
ppy3 = function(...) {
use_python("C:/MyProgramFiles/python36/python3.exe")
reticulate::repl_python(...)
}
prop.table2 = function(...) percent(prop.table(...))
countrycode2 = function(x) {
y = x
x = countrycode(x,'country.name','iso2c')
x[y == 'Internacional'] = 'com'
x[y == "United Kingdom"] = 'uk'
x = tolower(x)
x
}
pointofsale.ms = function(brand,market){
brand.v = c('edreams','opodo','govoyages','travellink')
names(brand.v) = toupper(substr(brand.v,1,1))
market = countrycode2(market)
brand = toupper(substr(brand,1,1));
brand = brand.v[brand]
pos = paste0(brand,'.',market)
pos
}
as.numeric2 = function(x, delete = c(intToUtf8(8364),','), sub = c('','')) {
Encoding(x) = 'UTF-8'
for(i in 1:length(delete)){
x = gsub(delete[i],sub[i],x,fixed=T)
}
x = as.numeric(x)
x
}
split2 <- function(x, by,vector = T, keep.by = TRUE,sorted = FALSE,...){
if(missing(by)) {
out = split(data.table(x,id=1:x[,.N]), by = 'id',keep.by = F,sorted = sorted)
if(vector) lapply(out, unlist)
} else
split(x, by = by, sorted = sorted, keep.by = keep.by, ...)
}
# guestrefSK <- function(x,pos=F){
# SubSingle <- unlist(strsplit(x,'|',fixed=T))
# n <- length(SubSingle)/2
# if(pos) list(SubSingle[2*(1:n)-1],SubSingle[2*(1:n)]) else
# SubSingle[2*(1:n)-1]
# }
guestrefSK <- function(x,pos=F){
out <- tstrsplit(x,'|',fixed=T,fill='')
if(pos) out else out[[1]]
}
percent <- function(x, digits = 2, format = "f", ...) {
paste0(formatC(100 * x, format = format, digits = digits, ...), "%")
}
na.convert <- function(x,y = 0) {
x[is.na(x)] <- y
x}
na.convert.mean <- function(x) {
y = mean(x, na.rm = T)
x[is.na(x)] <- y
x}
na.count <-function (x,DT = F) {
out <- sapply(x, function(y) sum(is.na(y)))
if(DT) data.table(variable = names(out),value = out) else
out
}
from_unixtime <- function(...,ts = F ) {
# ts = TimeStamp
if(ts) anytime(...)
else as.Date(anytime(...))
}
shell2 <- function(...,check = 'putty') {
if(!any(grepl(check,shell('tasklist',intern=T))))
shell(...)
}
match2 <- function(x,y) match(tolower(x),tolower(y))
# wrapper of data.table::split
split2 <- function(x, by,vector = T, keep.by = TRUE,sorted = FALSE,...){
if(missing(by)) {
out = split(data.table(x,id=1:x[,.N]), by = 'id',keep.by = F,sorted = sorted)
if(vector) lapply(out, unlist)
} else
split(x, by = by, sorted = sorted, keep.by = keep.by, ...)
}
.p = function(x,y = 'N') {
# DT[,.N,col][,.p(.SD)]
c(x,list(pp = prop.table2(x[[y]])))
}
#
get.fileextension = function(filename) {
out = sapply(strsplit(filename,'.',fixed = T),last)
paste0('.',out)
}
add.Sys.time2 = function(filename,ttable = F) {
suffix = get.fileextension(filename)
suffix.tmp = paste0('_',Sys.time2(ttable),suffix)
file.tmp = gsub(suffix,suffix.tmp,filename,fixed=T)
file.tmp
}
Sys.time2 <- function(sqlformat = F) {
out = format(Sys.time(),'%Y-%m-%d_%H-%M-%S')
if(sqlformat) out = gsub('-','_',out)
out
}
# Format
convertMonth <- function(x, isnum = T) {
stopifnot(class(x) %in% c('integer','numeric','Date'))
if(class(x) == 'Date') {
warning('Converting Date to mmonth')
month(x) + year(x)*12
} else
as.Date(paste(trunc(x/12) + (ceiling(x%%12/12)-1), x%%12 + (ceiling(x%%12/12)-1)*(-12),'01',sep='-'))
}
# convertMonth <- function(x) {
# as.Date(paste(trunc(x/12) + (ceiling(x%%12/12)-1), x%%12 + (ceiling(x%%12/12)-1)*(-12),'01',sep='-'))
# }
#
# convertMonth2 <- function(x) {
# as.Date(paste(trunc(x/12) - (x%%12 ==0), x%%12 + 12*(x%%12 == 0),'01',sep='-'))
# }
convertWeek <- function(x) {
as.Date(paste(trunc(x/52) + (ceiling(x%%52/52)-1), x%%52 + (ceiling(x%%52/52)-1)*(-52),'01',sep='-'))
}
convertTable = function(filename,deletepoint = T){
filename = last(strsplit(filename,'/')[[1]])
if(deletepoint) {
filename = strsplit(filename,'.',fixed=T)[[1]]
filename = paste0(filename[-length(filename)],collapse='.')
}
filename
}
# lapply2 <- function(A,ff,...){ # rewrites namescol of A into ff
# stopifnot(class(A)[1] =='data.table')
# namescol <- c(...)
# A[,(namescol):= lapply(.SD,ff),.SDcols = namescol]
# }
lapply2 <- function(A,ff,ss,...){ # rewrites namescol of A into ff
if(missing(ss)) ss = names(A)
stopifnot(class(A)[1] =='data.table')
if(class(ss) == 'numeric') namescol <- names(A)[! names(A) %in% c(...)] else
namescol <- c(ss,...)
A[,(namescol):= lapply(.SD,ff),.SDcols = namescol]
}
# Sql -------------------------------------------------------------------------------------------------------------------------
collapse <- function(x,string=T,brackets = T,sep = ", "){
l <- ifelse(brackets,'(','')
r <- ifelse(brackets,')','')
cc <- ifelse(string,"'","")
collp <- paste0(cc,sep,cc )
if(string) paste0(" ('",paste0(x,collapse=), "') ")
else paste0(" ( ",paste0(x,collapse=), ") ")
paste0(l,cc,paste0(x,collapse= collp ), cc, r )
}
to_unixtime <- function(dd){
as.numeric(as.POSIXct(dd))
}
groupstr <- function(x=1,y){ # x number of columns NOT in group by, y num. IN group by
if(missing(y)) {
y = x
x = 1}
paste0(' ',(x+1):(x+y),' ',collapse = ', ')
}
# Data table wrapers
dbGetQuery2 <- function(...)
data.table(dbGetQuery(...))
# dbGetQuery3 = function(con,sql,...){
# ff.aux = function(sql.i) dbGetQuery2(con,sql.i,...)
# out.list = lapply(sql,ff.aux)
# out = do.call(rbind,out.list)
# out
# }
dbGetQuery3 = function(con,sql,groupby = F , groupby.ff = function(x) sum(x,na.rm = T),...){
ff.aux = function(sql.i) dbGetQuery2(con,sql.i,...)
out.list = lapply(sql,ff.aux)
out = do.call(rbind,out.list)
if(groupby) {
metricnames = sapply(out,class)
metricnames = metricnames[metricnames %in% c('numeric','integer')]
nonmetricanames = names(out)[!names(out) %in% names(metricnames)]
#ff = function(x) sum(x,na.rm = T)
out = out[,lapply(.SD,groupby.ff),eval(nonmetricanames)]
out } else out
}
read.xlsx2 <- function(...)
data.table(read.xlsx(...))
# Explore conection
# cmd <- credentials bx
con <- dbConnect(RPresto::Presto(), host='localhost', port=8889, user=userold, password=" ", schema=" ",
catalog='hive', source='local')
bqbilling = last(strsplit(bqproject,'-')[[1]])
# standard
conbq <- DBI::dbConnect(bigrquery::dbi_driver(), dataset = bqdataset, project = bqproject ,
billing = bqbilling, use_legacy_sql = FALSE)
# legacy
conbq_l <- DBI::dbConnect(bigrquery::dbi_driver(), dataset = bqdataset, project = bqproject ,
billing = bqbilling, use_legacy_sql = T)
# Bq --------------------------------------------------------------------------------------------------------------
gatable <- function(PC,fullname = T){
if(any(duplicated(PC))) stop('duplicates in PC')
rows = match(PC,bqAcc$pos)
if(any(is.na(rows))) stop(paste0(collapse(head(PC[is.na(rows)])) ,' are not pointofsales'))
out = bqAcc[rows]$profile
names(out) = PC
if(fullname) out = paste0('`',out,'.ga_sessions_*','`')
out
}
bq_desc = function(tablename){
if(!grepl('.',tablename)) tablename = paste0(bqdataset,'.',bqtable)
filename = paste0(Sys.time2(),'.json')
id = terminalCreate()
ccmd = paste0('bq --format prettyjson show ' , tablename ,' > ',filename)
ccmd%$%.
json_desc = rjson::fromJSON(paste(readLines(filename), collapse=""))
df_desc = as.data.frame(json_desc)
df_desc.name = df_desc[1,grepl('schema.fields.name',names(df_desc))]
df_desc.type = df_desc[1,grepl('schema.fields.type',names(df_desc))]
dt_desc = data.table(name = t(as.matrix(df_desc.name)), type = t(as.matrix(df_desc.type)))
names(dt_desc) = c('name','type')
terminalKill(id)
dt_desc
}
createSchema <- function(A,filename){
# schema = rep('string',ncol(A))
# names(schema) = names(A)
schema = sapply(A,class)
schema[schema == 'character'] = 'string'
schema[schema %in% c("POSIXct", "POSIXt")] = 'timestamp'
schema[schema == 'Date'] = "DATETIME"
schema[schema == 'numeric'] = 'float'
schema[schema == 'integer'] = 'integer'
if(any(! schema %in% c('string','timestamp','float','integer'))) warning(paste0(collapse(schema[! schema %in% c('string','timestamp','float','integer')]),' not in bq data types '))
schema = data.table(description = NA,mode = 'NULLABLE',name = names(schema),type = schema)
#schema[name == 'GoogleSendDate',type := 'timestamp']
#schema[grepl('date',name,T), type := 'timestamp' ]
schemajs = jsonlite::toJSON(schema,pretty = T)
#filename = paste0('./output/',DataExtension,'_Schema.json')
write(schemajs,filename)
}
# Explore Tables -------------------------------------------------------------------------------------------------------------------
#tableMOM <- "hive.workspace.MoM_LTV_zeta"
#tableMOM <- "hive.workspace.MoM_LTV_eta"
#tableMOM <- "hive.workspace.MoM_LTV_iota"
#tableMOM <- "hive.workspace.MoM_LTV_nu"
#tableMOM <- "hive.workspace.MoM_LTV_chi"
#tableMOM <- "hive.workspace.MoM_LTV_omega"
#tableMOM = "hive.workspace.MoM_LTV2_alpha"
#tableMOM = "hive.workspace.MoM_LTV2_gamma"
#tableMOM = "hive.workspace.MoM_LTV2_theta"
#tableMOM = "hive.workspace.MoM_LTV2_iota"
tableMOM = "hive.workspace.MoM_LTV2_kappa"
tableMOM = "hive.workspace.MoM_LTV2_lambda"
#tableRM <- 'hive.workspace.NetRM_alpha'
#tableRM <- 'hive.workspace.NetRM_beta'
#tableRM <- 'hive.workspace.NetRM_zeta'
tableRM <- 'hive.workspace.NetRM_lambda'
tableMCh <- 'hive.workspace.marketingchannel_0014'
#tableUnlocked <- "hive.workspace.tableUnlocked_0001"
#tableUnlocked <- "hive.workspace.Unlocked_gamma"
#tableUnlocked <- "hive.workspace.Unlocked_delta"
tableUnlocked <- "hive.workspace.prime_eta"
#tableMS_raw = 'hive.workspace.primeraw_beta'
tableNPS <- "hive.workspace.tableNPS_0001"
tableEvents <- 'hive.workspace.events'
tableSearches <- "hive.workspace.searches_alpha"
tableCR <- 'hive.workspace.cr'
# tableSessionDE <- 'hive.workspace.sessiondataextensions' # DEPRECATED
# Console ----------------------------------------------------------------------
rmse <- function(pred,data){
stopifnot(class(pred) == 'numeric' & class(data) == 'factor')
data <- as.numeric(data) - 1
sqrt(mean((pred - data)^2))
}
Sys.time2 <- function(ttable = F) {
out = format(Sys.time(),'%Y-%m-%d_%H-%M-%S')
if(ttable) out = gsub('-','_',out)
out
}
shellR <- function(cmd, id, log = T, activate = F, wait = T, wait2 = 0.5, clear= F,caption = NULL,var=F){
cmd <- gsub('\n','',cmd)
#as.environment('package:rstudioapi')
if(var) cmd <- paste0(substitute(cmd),'="',cmd,'"')
if(missing(id)) id <- rstudioapi::terminalList()[1]
if(is.na(id)) id <- rstudioapi::terminalCreate(caption,F)
rstudioapi::terminalActivate(id,F)
if(clear) rstudioapi::terminalClear(id)
rstudioapi::terminalSend(id,paste0(cmd,'\n'))
# \n
Sys.sleep(wait2)
if(wait){
while(last(rstudioapi::terminalBuffer(id))!='$') {
#while(rstudioapi::terminalBusy(id)) {
Sys.sleep(0.2)
}
}
# Return the output of the console
logg <- rstudioapi::terminalBuffer(id)
nlast <- length(logg)
logg <- logg[-nlast]
logg <- logg[(max(c(1,which('$' == logg))) + 2):(nlast - 2)]
nlast <- length(logg)
if(activate & !var) rstudioapi::terminalActivate(id)
if(log) cat(logg[-nlast],sep = '\n')
invisible(logg[-nlast])
}
# Wrapper for shellR
# export
"%$%" <- function(cmd,opt){
# . runs command .. declare var
if(substitute(opt) == '.') shellR(cmd) else
if(substitute(opt) == '..') {
cmd <- paste0(substitute(cmd),'="',cmd,'"')
shellR(cmd) } else
stop('y not in c(. , ..)')
}
"%between%" <- function(x,y) {
stopifnot(length(y)==2)
between(x,y[1],y[2])
}
# quotes around
"%'%" <- function(x,y){
# . runs command .. declare var
if(substitute(y) == '.') paste0("'",x,"'") else
paste0(y,x,y)
}
# quotes around (biquery date format)
"%''%" <- function(x,y){
stopifnot(is.Date(x) == T)
x = format(x,'%Y%m%d')
# . runs command .. declare var
if(substitute(y) == '.') paste0("'",x,"'") else
paste0(y,x,y)
}
# quotes around
'%"%' <- function(x,y){
# . runs command .. declare var
if(substitute(y) == '.') paste0('"',x,'"') else
paste0(y,x,y)
}
# customdime
customdimensions = function(index,table = '') {
if(table != '') table = paste0(table,'.')
paste0("(SELECT MAX(IF(index=",index,", value, NULL)) FROM UNNEST(",table,"customDimensions))")
}
# wrapper of data.table::split
split2 <- function(x, by,vector = T, keep.by = TRUE,sorted = FALSE,...){
if(missing(by)) {
out = split(data.table(x,id=1:x[,.N]), by = 'id',keep.by = F,sorted = sorted)
if(vector) lapply(out, unlist)
} else
split(x, by = by, sorted = sorted, keep.by = keep.by, ...)
}
Jwrite <- function(A,filename){
A <- jsonlite::toJSON(as.matrix(A))
A <- gsub('],[',']\n[',A,fixed=T)
A <- gsub("[[","[",A,fixed=T)
A <- gsub("]]","]",A,fixed=T)
write(A,filename)
}
strpred <- function(pred_Cl){
pred_Cl <- substring(pred_Cl,2)
pred_Cl <- stringr::str_sub(pred_Cl,1,-2)
pred_Cl <- strsplit(pred_Cl,', ')[[1]]
as.numeric(pred_Cl)
}
# Ggplot2 -------------------------------------------------------------------------------
# ggplot as google
#ggplot = function(...) ggplot2::ggplot(...) + scale_color_gdocs() + scale_fill_gdocs()
theme_set(theme_gdocs())
blank_theme <- theme_minimal()+theme(
axis.title.x = element_blank(),
axis.title.y = element_blank(),
panel.border = element_blank(),
panel.grid= element_blank(),
axis.ticks = element_blank(),
plot.title=element_text(size=14, face="bold"),
axis.text = element_blank()
)
geom_pie = function(A,y,fill){
names(A)[names(A) == y] = 'value'
names(A)[names(A) == fill] = 'group'
pie = ggplot(A, aes(x="", y=value, fill=group)) +
geom_bar(width = 1, stat = "identity")
pie
pie = pie + coord_polar("y", start=0)
pie
pie = pie + geom_text(aes(y = value/nrow(A) + c(0, cumsum(value)[-length(value)]),
label = percent(value/100,0)), size=5) + blank_theme
pie
}
grep2 = function(patterns,x,...) {
for(i in 1:length(patterns))
x = grep(patterns[i],x,value = T,...)
x
}
round2 = function(x,p = 0.5){
y = trunc(x)
z = x - y
z = abs(z) > p
y + sign(y)*z
}
sum.is.na = function(x)
sum(is.na(x))
get.na.names = function(A, nnames = T, cutoff = 0.95) {
out = sapply(A, sum.is.na)/A[,.N]
if(nnames)
names(out)[out >= cutoff] else
which(out >= cutoff)
}
mode.s = function(x) {
out = names(which.max(table(x)))
class(out) = class(x)
out
}
# sql =================================================================================================
#cat('ExploreEtl(A,nametable,create = T,csv = F,csvfilename)')
ExploreEtl <- function(A,nametable,create = T,csv=F,filename,savemysql=F,types){
# conection
nametable <- tolower(nametable)
nametable <- gsub('hive.workspace.','',nametable,fixed=T)
shell2(cmd,wait=F)
#cmdmysql <- credentials
shell3 <- function(...,check = 'putty') {
if(sum(grepl(check,shell('tasklist',intern=T))) == 1)
shell(...)
}
shell3(cmdmysql,wait=F)
conmysql <- dbConnect(RMySQL::MySQL(), host="localhost", port= 3308, user= user, password = pswd_mysql, dbname="explore_tmp",
source='local')
# sql <- paste0("desc explore_tmp.prova_20171130_2017_11_30__12_04_57")
# dbGetQuery(conmysql,sql)
# Feature types --------------------------------------------------------------------------------------------------------------------------
if(grepl('.',nametable,fixed = T)) stop('tableIn without Schem or catalog')
mysqltable <- paste0('explore_tmp.',nametable,format(Sys.time(),'_%Y_%m_%d__%H_%M_%S')) # MySQL table
if(savemysql) mysqltable <- paste0('explore_tmp.',nametable) # MySQL table
prestotable <- paste0('hive.workspace.',nametable) # Presto Table
sql <- "show tables from hive.workspace"
existingTables <- dbGetQuery(con,sql)
dbDisconnect(con)
if(create & prestotable %in% existingTables) stop(paste0(prestotable, ' already exists, change nametable or create to F'))
if(csv) A <- fread(filename,nrows = 1)
#flush.console()
if(missing(types)){ types <- sapply(A, class)
stopifnot(types %in% c('numeric','character','integer'))
types[types=='integer'] <- 'numeric'
types <- factor(types, levels=c('character','numeric'))
levels(types) <- c('varchar(100)','double(100,10)') }
#stopifnot(is.factor(types))
string <- data.frame(names(A),types)
string <- paste0(string[,1],' ',string[,2])
string <- paste0(string,collapse = ', ')
# Create MySQL-Table ------------------------------------------------------------------------------------------------------
cat('MySQL\n')
sql <- paste0('show tables from explore_tmp;')
showtables <- dbGetQuery(conmysql,sql)
if(mysqltable %in% paste0('explore_tmp.',showtables$Table)){
sql <- paste0(
"drop table ", mysqltable, " ;"
)
dbGetQuery(conmysql,sql)
}
rm(showtables)
#if(create){
sql <- paste0(
"create table ", mysqltable, " ( ",
string, " );"
)
dbGetQuery(conmysql, sql)
#dbDisconnect(conmysql)
#}
if(!csv){
filename <- paste0("temp",format(Sys.time(),'_%Y_%m_%d__%H_%M_%S'),".csv")
fwrite(A,filename,showProgress = F)
}
# Upload data in MySQL ---------------------------------------------------------------------------------------------------------------
sql0 <- paste0("load data local infile '", filename, "' into table ", mysqltable," FIELDS TERMINATED BY ',' LINES TERMINATED BY '\r\n' IGNORE 1 lines")
sql1a <- paste0('@',1:length(names(A)),collapse=", ")
sql1b <- paste0(names(A)[1:length(names(A))],' = ', 'nullif(@',1:length(names(A)),",'')",collapse=", ")
sql1 <- paste0("(",sql1a,") set ",sql1b, ";")
sql <- paste0(sql0,sql1)
dbGetQuery(conmysql, sql)
if(!csv){
file.remove(filename)
}
#dbDisconnect(conmysql)
# levels(types) <- c("'",'')
#
# nrow(A)
# sum(sapply(A[1,], nchar))
# seqq <- seq(1,nrow(A),by=3*10^4)
# cat('Number iterations:', length(seqq),'\n')
#
# for(j in 1:length(seqq)){
# cat('\rIteration', j,'\r')
# a <- seqq[j]
# if(j < length(seqq)) b <- seqq[j+1] - 1 else b <- nrow(A)
# B <- A[a:b,]
# cols <- c( 1:ncol(B))
#
# # create a new column `x` with the three columns collapsed together
# #values <- apply( B[ , cols,with=F ] , 1 , paste , collapse = "-" )
# values <- paste0(types[1],B[[1]],types[1])
# values[is.na(B[[1]])] <- "NULL"
# for(i in 2:ncol(A)){
# aux <- paste0(',',types[i],B[[i]],types[i])
# aux[is.na(B[[i]])] <- ",NULL"
# values <- paste0(values, aux)
# }
#
# values <- paste(values,collapse = "), (")
# values <- paste0("(", values,")")
#
# sql <- paste0(
# "insert into ",table,
# " values ", values)
# dbGetQuery(con,sql)
# dbDisconnect(con)
# Import To Presto ------------------------------------------------------------------------------------------------------------
cat('Presto\n')
if(create){
sql <- paste0(
"create table ", prestotable," as
select *
from mysql.", mysqltable
)
dbGetQuery(con,sql)
dbDisconnect(con)
} else {
sql <- paste0(
"insert into ", prestotable,"
select *
from mysql.", mysqltable
)
dbGetQuery(con,sql)
dbDisconnect(con)
}
# Delete MySQL table
if(!savemysql){
sql <- paste0(
"drop table ", mysqltable, " ;"
)
dbGetQuery(conmysql,sql)
}
# Output Validation ---------------------------------------------------------------------------------------------------------------
# sql <- paste0(
# "select count(*) N from ",prestotable)
# n <- dbGetQuery(con,sql)$N
# dbDisconnect(con)
sql <- paste0(
"select * from ",prestotable, " limit 2")
print(dbGetQuery(con,sql))
dbDisconnect(con)
# cat('\n',prestotable,'nrow:',n,'TRUE','\n')
# print(headd)
dbDisconnect(conmysql)
}
cat('ExploreEtl(A,nametable,create = T,csv = F,csvfilename,savemysql=F)')
bq_load <- function(A,nametable,create=T,overwrite = F,filename,bigfile = F, PartitionField = F,schema=F){
nametablevec <- str_split(nametable,'[.]')[[1]]
stopifnot(length(nametablevec) <= 2)
if(length(nametablevec) == 1) nametablevec = c(bqdataset,nametablevec)
stopifnot(is.character(PartitionField) | PartitionField == F)
if(missing(A) & missing(filename)) stop('data.frame or filename')
if(missing(nametable) & missing(filename)) stop('nametable or filename')
if(missing(nametable)) nametable = convertTable(filename)
if(bigfile){
if(missing(filename)) {
missing.filename = missing(filename)
filename = paste0('deleteme_',Sys.time2(),'.csv')
fwrite(A,filename)
}
id <- terminalCreate(paste0('GCloud',Sys.time2()))
flags = rep('',20)
if(schema!=F) {
flags[1] = '--skip_leading_rows=1'
flags[3] = paste0('--schema ',schema)
} else flags[3] = '--autodetect'
if(overwrite) {
flags[1] = '--replace'
}
if(!create){
flags[1] = '--skip_leading_rows=1'
flags[3] = ''
}
if(PartitionField!=F)
flags[2] = paste0(" --time_partitioning_field ", PartitionField)
if(grepl('.',nametable))
shell.c = paste(
"bq load ",flags[2],flags[3],"--source_format CSV ", flags[1], paste0(nametablevec,collapse = '.'), filename )
shellR(shell.c,id)
Sys.sleep(5)
terminalKill(id)
if(missing.filename) file.remove(filename)
} else {
if(nametable != tolower(nametable)) warning('nametable has capital letters')
if(grepl('/',nametable)) {
warning('filename has /: filename to nametable')
nametable = convertTable(nametable)
}
#nametable <- tolower(nametable)
if(missing(A)) A <- fread(filename)
if(any(grepl('.',names(A),fixed=T) | grepl(' ',names(A),fixed=T))) warning(" '.' and ' ' are convert to '_' and '' in column names")
names(A) <- gsub('.','_',names(A),fixed = T)
names(A) <- gsub(' ','',names(A),fixed = T)
create_disposition = ifelse(create,"CREATE_IF_NEEDED","CREATE_NEVER")
write_disposition = ifelse(create,"WRITE_EMPTY","WRITE_APPEND")
if(overwrite) write_disposition = "WRITE_TRUNCATE"
invisible(bigrquery::insert_upload_job(bqproject, nametablevec[1], nametablevec[2], A, billing = bqproject,
create_disposition = create_disposition,
write_disposition = write_disposition))
}
}
bq_query = function(sql, filename, nn_bq = 1e7) {
if(substr(filename,1,2) == './')
filename = substr(filename,3,1e3)
filename = paste0(getwd(),'/',filename)
nn_bq = format(nn_bq, scientific = F)
cmd =
paste0('bq query --nouse_legacy --format csv --n ', nn_bq,' ',sql %'%., ' > ', filename %'%.)
cmd%$%.
cmd = paste0('wc -l ', filename %'%.)
nn_out = cmd %$% .
nn_out = strsplit(nn_out,' ',fixed=T)[[1]][1]
nn_out = as.numeric(nn_out) - 2
if(nn_out == nn_bq ) warning('max lines reached')
}
cat('BigQEtl(A,nametable,create=T,overwrite = F,csv=F,filename)')
# nametable <- "prova_20171220d"
# # filename <- "prova_20171220.csv"
# A <- data.table(a=1:4,b=letters[1:4])
# ExploreEtl(A,nametable)
Add the following code to your website.
For more information on customizing the embed code, read Embedding Snippets.