R/helpers.R

#' Create a backup of a table in gs://, versioned by the current timestamp
#'
#' @param project_id string The project id, as shown in the Google Cloud Console
#' @param dataset string Dataset in which the table is located
#' @param table string Table of interest
#' @param bucket string target gs:// bucket, which must be created beforehand
#'
#' @return the completed extract job, as a list in the Google Cloud job format
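#'
#' @examples
#' # Illustrative sketch with placeholder names: the bucket "my_backup_bucket"
#' # must already exist and a valid service token is assumed
#' bigrquery::set_service_token("my_service_token.json")
#' job <- gs_backup(
#'   project_id = "my_project",
#'   dataset = "my_dataset",
#'   table = "my_table",
#'   bucket = "my_backup_bucket"
#' )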

gs_backup <- function(project_id, dataset, table, bucket) {

  job_name <- paste0(project_id, ".", dataset, ".", table)

  # fail early if the table doesn't exist (the project, dataset or table name may be wrong)
  assertive::assert_all_are_true(
    bigrquery::exists_table(project = project_id, dataset = dataset, table = table)
  )

  job <- bigrquery::insert_extract_job(
    project = project_id,
    dataset = dataset,
    table = table,
    destination_uris = paste0(
      'gs://', bucket, '/', job_name, '_',
      format(Sys.time(), "%Y%m%d_%H%M%S"), '.csv.gz'
    ),
    print_header = TRUE,
    field_delimiter = ",",
    destination_format = "CSV",
    compression = "GZIP")

  job <- bigrquery::wait_for(job, quiet = TRUE)

  return(job)
}


#' Delete a table if it exists, backing it up to gs:// first
#'
#' @param project_id string
#' @param dataset string
#' @param table string
#' @param bucket string target gs:// bucket for the backup (defaults to "backup_deleted")
#'
#' @return the list of tables in the dataset before deletion
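#'
#' @examples
#' # Illustrative sketch with placeholder names; the default backup bucket
#' # "backup_deleted" must already exist
#' tables_before <- delete_if_exists(
#'   project_id = "my_project",
#'   dataset = "my_dataset",
#'   table = "my_table"
#' )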

delete_if_exists <- function(project_id, dataset, table, bucket = "backup_deleted") {

  # check if the dataset exists
  assertive::assert_all_are_true(
    bigrquery::exists_dataset(project = project_id, dataset = dataset)
  )

  # retrieve the list of tables in the dataset
  con <- DBI::dbConnect(
    drv = bigrquery::bigquery(),
    project = project_id, # generalize
    dataset = dataset
  )
  table_list <- bigrquery::dbListTables(conn = con)
  DBI::dbDisconnect(con)

  # delete the table if it exists
  if (table %in% table_list) {
    # back up the table to Google Storage before deleting, versioned by timestamp
    # TBD: add a parameter to skip the backup of deleted tables
    gs_backup(
      project_id = project_id,
      dataset = dataset,
      table = table,
      bucket = bucket)
    bigrquery::delete_table(project = project_id, dataset = dataset, table = table)
  }

  return(table_list)
}


#' Store the results of a query in a table
#' If the table already exists, it is backed up, removed and rewritten.
#'
#' @param project_id string
#' @param view_path string Path the table is written to, in the form Dataset_Name.Table_Name
#' @param query string Query in standard SQL
#' @param dataset string The dataset to write into
#' @param table string The destination table name
#'
#' @return data.frame with the first 100 rows of the written table

#' @examples
#' bigrquery::set_service_token("my_service_token.json")
#' project_id <- "my_project"
#' query <- "select * from `project.dataset.table`"
#'
#' res <- view_to_table(
#'   project_id = project_id,
#'   view_path = "dataset.table",
#'   query = query,
#'   dataset = "dataset",
#'   table = "table"
#' )

view_to_table <- function(project_id, view_path, query, dataset, table) {
  assertive::assert_all_are_true(
    bigrquery::exists_dataset(project = project_id, dataset = dataset)
    )

  table_list <- delete_if_exists(
    project_id = project_id,
    dataset = dataset,
    table = table) # extend to account for the bucket

  # Run the query, create the destination table and return the first hundred rows
  result <- bigrquery::query_exec(
    query = query,
    project = project_id, # generalize
    use_legacy_sql = FALSE,
    destination_table = view_path,
    max_pages = 1,
    page_size = 100)

  assertive::assert_all_are_true(nrow(result) > 0)
  assertive::assert_all_are_true(
    bigrquery::exists_table(
      project = project_id,
      dataset = dataset,
      table = table
      )
  )

  # TBD: Return a list containing more stuff useful for debugging
  return(result)
}


#' Download a full table to memory through gs:// and the local fs
#' Use case: a big table to be fully downloaded, for which the bq API would be too slow
#'
#' @param project_id string
#' @param dataset string
#' @param table string
#' @param bucket string gs:// bucket used for the temporary export
#' @param job_name string optional; a random name is generated if NULL
#' @param read_csv_col_types vector optional column types passed to readr::read_csv
#' @param read_csv_guess_max numeric number of rows used to guess column types
#' @param target_directory string directory where temporary files are stored
#' @param multi_file logical whether the export is split into multiple files
#'
#' @return data.frame with the full table in memory
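#'
#' @examples
#' # Illustrative sketch with placeholder names; the bucket must already exist
#' df <- download_table(
#'   project_id = "my_project",
#'   dataset = "my_dataset",
#'   table = "my_big_table",
#'   bucket = "my_transfer_bucket",
#'   multi_file = TRUE  # large tables must be exported to multiple files
#' )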

download_table <- function(
  project_id, dataset,
  table, bucket,
  job_name = NULL,
  read_csv_col_types = NULL,
  read_csv_guess_max = 10000,
  target_directory = getwd(),
  multi_file = FALSE
  ) {
  # set random job name
  if (is.null(job_name)) {
    job_name <- paste0(sample(LETTERS, 15), collapse = '')
  }

  # check if table exists (assert) ----
  assertive::assert_all_are_true(
    bigrquery::exists_table(project = project_id, dataset = dataset, table = table)
  )

  ## check if bucket exists ----
  ## modify in the future such that it creates the bucket as needed
  bucket_list <- googleCloudStorageR::gcs_list_buckets(projectId = project_id)
  assertive::assert_all_are_true(
    bucket %in% bucket_list$name
  )

  # Check if multiple files are to be exported; big tables must be split,
  # which requires a wildcard in the destination object name
  if (multi_file == TRUE) {
    job_name_gcs <- paste0(job_name, '_*')
  } else {
    job_name_gcs <- job_name
  }

  job <- bigrquery::insert_extract_job(
    project = project_id,
    dataset = dataset,
    table = table,
    destination_uris = paste0('gs://', bucket, '/', job_name_gcs, '.csv.gz'),
    print_header = TRUE,
    field_delimiter = ",",
    destination_format = "CSV",
    compression = "GZIP")
  job <- bigrquery::wait_for(job, quiet = TRUE)


  # import from gcs and tidy up
  objects <- googleCloudStorageR::gcs_list_objects(bucket)
  to_download <- grep(job_name, objects$name, value = TRUE)

  ## check for write permissions in the target directory ----
  # file.access() returns 0 on success and -1 on failure
  if (file.access(target_directory, 2) != 0) {
    Sys.chmod(target_directory, "777")
  }

  # download to the disk
  lapply(to_download, function(name){
    googleCloudStorageR::gcs_get_object(
      bucket = bucket,
      name,
      overwrite = TRUE,
      saveToDisk = paste0(target_directory, '/', name)
    )
    googleCloudStorageR::gcs_delete_object(name, bucket = bucket)
  })

  # Read the temporary files from the target directory and combine them
  df <- dplyr::bind_rows(
    lapply(to_download, function(file) {
      suppressMessages(
        readr::read_csv(
          file.path(target_directory, file),
          progress = FALSE,
          col_types = read_csv_col_types,
          guess_max = read_csv_guess_max)
      )
    })
  )

  ## Check that the result is non-empty
  assertive::assert_all_are_true(nrow(df) > 0)

  # Clear the temporary files from the target directory
  lapply(to_download, function(file) {
    file.remove(file.path(target_directory, file))
  })

  return(df)
}

#' Download query or view results to memory through BigQuery, gs:// and the local fs
#' Use case: large results to be fully downloaded,
#'   for which the bq API would be too slow
#' Note that it cleans up all temporary tables, objects and files after itself
#'
#' @param sql string the query to execute and store
#' @param project string
#' @param use_legacy_sql boolean
#' @param quiet boolean
#' @param target_dataset string dataset in which the temporary table is created
#' @param target_bucket string gs:// bucket used for the temporary export
#' @param job_name string optional; a random name is generated if NULL
#' @param read_csv_col_types vector optional column types passed to readr::read_csv
#' @param read_csv_guess_max numeric number of rows used to guess column types
#' @param target_directory string directory where temporary files are stored
#' @param multi_file logical whether the export is split into multiple files
#'
#' @return data.frame with the full table in memory
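#'
#' @examples
#' # Illustrative sketch with placeholder names; the scratch dataset and
#' # the bucket must already exist
#' df <- bq_query_gcs(
#'   sql = "select * from `project.dataset.table`",
#'   project = "my_project",
#'   target_dataset = "my_scratch_dataset",
#'   target_bucket = "my_transfer_bucket"
#' )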

# TBD: give more meaningful names to arguments
bq_query_gcs <- function(
  sql,
  project,
  use_legacy_sql = FALSE,
  quiet = FALSE,
  target_dataset,
  target_bucket,
  target_directory = getwd(),
  job_name = NULL,
  multi_file = FALSE,
  read_csv_col_types = NULL,
  read_csv_guess_max = 10000) {

  # set random job name
  if (is.null(job_name)) {
    job_name <- paste0(sample(LETTERS, 15), collapse = '')
  }

  # temporary table to write the query results (with gibberish name)
  destination_table <- paste0(target_dataset, '.', job_name)

  # write Query to temporary table
  job <- bigrquery::insert_query_job(sql,
    project = project,
    destination_table = destination_table,
    default_dataset = NULL,
    use_legacy_sql = use_legacy_sql)
  job <- bigrquery::wait_for(job, quiet = quiet)

  df <- download_table(
    project_id = project,
    dataset = target_dataset,
    table = job_name,
    bucket = target_bucket,
    job_name = job_name,
    read_csv_col_types = read_csv_col_types,
    read_csv_guess_max = read_csv_guess_max,
    target_directory = target_directory,
    multi_file = multi_file)

  # clean up the temporary table from BigQuery
  bigrquery::delete_table(project = project,
                          dataset = target_dataset,
                          table = job_name)

  return(df)
}