#' Create a backup of a table in gs, versioned by current timestamp
#'
#' @param project_id string The project id which can be found in Google Console
#' @param dataset string Dataset in which table is located
#' @param table string Table of interest
#' @param bucket string target gs:// bucket; it must already exist
#'
#' @return list The completed extract job object, as returned by bigrquery::wait_for()
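#'
#' @examples
#' \dontrun{
#' # Minimal usage sketch: the project, dataset, table and bucket names are
#' # placeholders, and credentials are assumed to be set beforehand, e.g. via
#' # bigrquery::set_service_token().
#' backup_job <- gs_backup(
#'   project_id = "my_project",
#'   dataset = "my_dataset",
#'   table = "my_table",
#'   bucket = "my_backup_bucket"
#' )
#' }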
gs_backup <- function(project_id, dataset, table, bucket) {
  job_name <- paste0(project_id, ".", dataset, ".", table)
  # if the table doesn't exist, either the project, dataset or table name is wrong
  assertive::assert_all_are_true(
    bigrquery::exists_table(project = project_id, dataset = dataset, table = table)
  )
  job <- bigrquery::insert_extract_job(
    project = project_id,
    dataset = dataset,
    table = table,
    destination_uris = paste0('gs://', bucket, '/', job_name, '_', Sys.time(), '.csv.gz'),
    print_header = TRUE,
    field_delimiter = ",",
    destination_format = "CSV",
    compression = "GZIP")
  job <- bigrquery::wait_for(job, quiet = TRUE)
  return(job)
}
#' Delete a table if it exists and do a backup in gs://
#'
#' @param project_id string
#' @param dataset string
#' @param table string
#' @param bucket string target gs:// bucket (default "backup_deleted")
#'
#' @return initial list of tables in the dataset
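#'
#' @examples
#' \dontrun{
#' # Minimal usage sketch with placeholder names; assumes credentials are
#' # already configured and the backup bucket exists.
#' tables_before <- delete_if_exists(
#'   project_id = "my_project",
#'   dataset = "my_dataset",
#'   table = "my_table",
#'   bucket = "backup_deleted"
#' )
#' }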
delete_if_exists <- function(project_id, dataset, table, bucket = "backup_deleted") {
  # check that the dataset exists
  assertive::assert_all_are_true(
    bigrquery::exists_dataset(project = project_id, dataset = dataset)
  )
  # retrieve the list of tables in the dataset
  table_list <- bigrquery::dbListTables(
    conn = DBI::dbConnect(
      drv = bigrquery::bigquery(),
      project = project_id, # generalize
      dataset = dataset
    )
  )
  # delete the table if it exists
  if (table %in% table_list) {
    # back up the table to Google Storage, versioned by each deletion (IMPORTANT);
    # TODO: add a parameter controlling whether deleted tables are backed up
    gs_backup(
      project_id = project_id,
      dataset = dataset,
      table = table,
      bucket = bucket)
    bigrquery::delete_table(project = project_id, dataset = dataset, table = table)
  }
  return(table_list)
}
#' Store the results of a query in a table
#' If the table already exists, it is removed (after a backup) and the new data is written.
#'
#' @param project_id string
#' @param view_path string Destination table path in the form Dataset_Name.Table_Name
#' @param query string Query in standardSQL
#' @param dataset string A dataset to write into
#' @param table string The destination table name
#'
#' @return data.frame with the first 100 rows of the table written
#' @examples
#' \dontrun{
#' bigrquery::set_service_token("my_service_token.json")
#' project_id <- "my_project"
#' query <- "select * from `project.dataset.table`"
#'
#' res <- view_to_table(
#'   project_id = project_id,
#'   view_path = "dataset.table",
#'   query = query,
#'   dataset = "dataset",
#'   table = "table"
#' )
#' }
view_to_table <- function(project_id, view_path, query, dataset, table) {
  assertive::assert_all_are_true(
    bigrquery::exists_dataset(project = project_id, dataset = dataset)
  )
  table_list <- delete_if_exists(
    project_id = project_id,
    dataset = dataset,
    table = table) # extend to account for the bucket
  # Run query, create table and return the first hundred rows
  result <- bigrquery::query_exec(
    query = query,
    project = project_id, # generalize
    use_legacy_sql = FALSE,
    destination_table = view_path,
    max_pages = 1,
    page_size = 100)
  assertive::assert_all_are_true(nrow(result) > 0)
  assertive::assert_all_are_true(
    bigrquery::exists_table(
      project = project_id,
      dataset = dataset,
      table = table
    )
  )
  # TBD: Return a list containing more stuff useful for debugging
  return(result)
}
#' Download table to memory through gs:// and local fs
#' Use case: a big table that must be downloaded in full, where the BigQuery API would be too slow
#'
#' @param project_id string
#' @param dataset string
#' @param table string
#' @param bucket string gs:// bucket used for the temporary export; must already exist
#' @param job_name string optional name prefix for the exported files; a random one is generated if NULL
#' @param read_csv_col_types vector optional column types passed to readr::read_csv()
#' @param read_csv_guess_max numeric number of rows used to guess column types
#' @param target_directory string directory where temporary files are stored
#' @param multi_file logical whether the table is exported as multiple files
#'
#' @return data.frame with the full table in memory
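#'
#' @examples
#' \dontrun{
#' # Minimal usage sketch with placeholder names; assumes credentials are set,
#' # the bucket already exists and the target directory is writable.
#' df <- download_table(
#'   project_id = "my_project",
#'   dataset = "my_dataset",
#'   table = "my_big_table",
#'   bucket = "my_transfer_bucket",
#'   read_csv_guess_max = 50000
#' )
#' }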
download_table <- function(
  project_id, dataset,
  table, bucket,
  job_name = NULL,
  read_csv_col_types = NULL,
  read_csv_guess_max = 10000,
  target_directory = getwd(),
  multi_file = FALSE
) {
  # set random job name
  if (is.null(job_name)) {
    job_name <- paste0(sample(LETTERS, 15), collapse = '')
  }
  # check if table exists (assert) ----
  assertive::assert_all_are_true(
    bigrquery::exists_table(project = project_id, dataset = dataset, table = table)
  )
  ## check if bucket exists ----
  ## modify in the future such that it creates the bucket as needed
  bucket_list <- googleCloudStorageR::gcs_list_buckets(projectId = project_id)
  assertive::assert_all_are_true(
    bucket %in% bucket_list$name
  )
  # Check if multiple files are to be exported
  if (multi_file == TRUE) {
    job_name_gcs <- paste0(job_name, '_*')
  } else {
    job_name_gcs <- job_name
  }
  job <- bigrquery::insert_extract_job(
    project = project_id,
    dataset = dataset,
    table = table,
    destination_uris = paste0('gs://', bucket, '/', job_name_gcs, '.csv.gz'),
    print_header = TRUE,
    field_delimiter = ",",
    destination_format = "CSV",
    compression = "GZIP")
  job <- bigrquery::wait_for(job, quiet = TRUE)
  # import from gcs and tidy up
  objects <- googleCloudStorageR::gcs_list_objects(bucket)
  to_download <- grep(job_name, objects$name, value = TRUE)
  ## check if it has permissions to write into the target folder ----
  # file.access() returns 0 on success and -1 on failure
  if (file.access(target_directory, 2) != 0) {
    Sys.chmod(target_directory, "777")
  }
  # download to the disk
  lapply(to_download, function(name) {
    googleCloudStorageR::gcs_get_object(
      bucket = bucket,
      name,
      overwrite = TRUE,
      saveToDisk = file.path(target_directory, name)
    )
    googleCloudStorageR::gcs_delete_object(name, bucket = bucket)
  })
  # Read from temporary files
  df <- dplyr::bind_rows(
    lapply(to_download, function(file) {
      suppressMessages(
        readr::read_csv(file.path(target_directory, file),
                        progress = FALSE,
                        col_types = read_csv_col_types,
                        guess_max = read_csv_guess_max)
      )
    })
  )
  ## Check that df is non-empty
  assertive::assert_all_are_true(nrow(df) > 0)
  # Clear temporary files
  lapply(to_download, function(file) {
    file.remove(file.path(target_directory, file))
  })
  return(df)
}
#' Download view results to memory through BQ gs:// and local fs
#' Use case: large view/query results that must be downloaded in full, where
#' the BigQuery API would be too slow
#' Note that it cleans up all temporary tables and files after itself
#'
#' @param sql string the query you want to execute and store
#' @param project string
#' @param use_legacy_sql boolean
#' @param quiet boolean
#' @param target_dataset string dataset used for the temporary table
#' @param target_bucket string gs:// bucket used for the temporary export; must already exist
#' @param job_name string optional name prefix for the temporary table and exported files; a random one is generated if NULL
#' @param read_csv_col_types vector optional column types passed to readr::read_csv()
#' @param read_csv_guess_max numeric number of rows used to guess column types
#' @param target_directory string directory where temporary files are stored
#' @param multi_file logical whether the table is exported as multiple files
#'
#' @return data.frame with the full table in memory
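#'
#' @examples
#' \dontrun{
#' # Minimal usage sketch with placeholder names; assumes credentials are set
#' # and that the target dataset and bucket already exist.
#' df <- bq_query_gcs(
#'   sql = "select * from `my_project.my_dataset.my_table`",
#'   project = "my_project",
#'   target_dataset = "scratch_dataset",
#'   target_bucket = "my_transfer_bucket"
#' )
#' }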
# TBD: give more meaningful names to arguments
bq_query_gcs <- function(
    sql,
    project,
    use_legacy_sql = FALSE,
    quiet = FALSE,
    target_dataset,
    target_bucket,
    target_directory = getwd(),
    job_name = NULL,
    multi_file = FALSE,
    read_csv_col_types = NULL,
    read_csv_guess_max = 10000) {
  # set random job name
  if (is.null(job_name)) {
    job_name <- paste0(sample(LETTERS, 15), collapse = '')
  }
  # temporary table to write the query results (with gibberish name)
  destination_table <- paste0(target_dataset, '.', job_name)
  # write query results to the temporary table
  job <- bigrquery::insert_query_job(
    sql,
    project = project,
    destination_table = destination_table,
    default_dataset = NULL,
    use_legacy_sql = use_legacy_sql)
  job <- bigrquery::wait_for(job, quiet = quiet)
  # download the temporary table via gs:// and the local filesystem
  df <- download_table(
    project_id = project,
    dataset = target_dataset,
    table = job_name,
    bucket = target_bucket,
    job_name = job_name,
    read_csv_col_types = read_csv_col_types,
    read_csv_guess_max = read_csv_guess_max,
    target_directory = target_directory,
    multi_file = multi_file)
  # clean up the temporary table from BigQuery
  bigrquery::delete_table(project = project,
                          dataset = target_dataset,
                          table = job_name)
  return(df)
}