# R/write_csv_to_hive3.R

# Defines functions: write_csv_to_hive3

# Documented in: write_csv_to_hive3

#' @title Write a CSV file to Hive 3
#'
#' @details Uploads a CSV file to the edge node, copies it into HDFS, exposes
#' it through a temporary external (staging) table and loads it into a managed
#' table. The staging table, the HDFS copy and the copy on the edge node are
#' removed when the function exits, whether it succeeded or failed. This
#' assumes that when you log into Hive/Hadoop, the login is similar to
#' `XXXXX@edge.hadoop.co.com`.
#'
#' Column types are derived from the classes `readr::read_csv()` assigns:
#' `integer` becomes `INT`, `numeric` becomes `FLOAT`, everything else
#' (including `character` and `Date`) becomes `STRING`.
#'
#' @param csv_name string name of the file with .csv extension
#' @param csv_folder string path to folder where CSV file is stored. Defaults to
#'   current working directory represented by ".". Do not place a "/" at the end
#'   of the path.
#' @param id string ID of user. Password will be requested at function call.
#' @param schema string schema name in hive
#' @param table string name for the table in hive.  One will be created if it
#'   does not exist.
#' @param server string server extension or path
#' @param append_data logical, defaults to FALSE for overwrite; TRUE appends the
#'   data to the existing table.
#'
#' @return Invisibly returns `NULL`; the function is called for its side
#'   effects. An error is raised if the CSV file is missing, the password
#'   prompt is cancelled, or a remote command exits with a non-zero status.
#'
#' @examples
#' \dontrun{
#' library(ssh)
#' library(dplyr)
#' library(readr)
#' library(askpass)
#' library(magrittr)
#'
#' csv_name <- 'file.csv'
#' csv_folder <- getwd()
#' id <- 'XXXXX'
#' server <- 'edge.hadoop.co.com'
#' schema <- 'schema'
#' table <- 'table'
#' write_csv_to_hive3(csv_name = csv_name,
#'                    csv_folder = csv_folder,
#'                    id = id,
#'                    schema = schema,
#'                    table = table,
#'                    server = server,
#'                    append_data = FALSE)
#' }
#'
#' @importFrom magrittr %>%
#' @import ssh
#' @import dplyr
#' @import readr
#' @import askpass
#' @export

write_csv_to_hive3 <- function(csv_name,csv_folder = ".",
                              id,
                              schema,
                              table,
                              server,
                              append_data = FALSE) {

  # build parameters for table names
  schema_table <- paste0(tolower(schema), ".", table) # Managed table
  schema_table_stg <- paste0(schema_table, "_stg") # External (staging) table
  csv_file <- file.path(csv_folder, csv_name) # Build path to file

  # Fail before prompting for credentials if there is nothing to upload.
  if (!file.exists(csv_file)) {
    stop("CSV file not found: ", csv_file, call. = FALSE)
  }

  # get password (askpass returns NULL when the prompt is cancelled)
  .pwd <- askpass::askpass('password')
  if (is.null(.pwd)) {
    stop("Password entry was cancelled.", call. = FALSE)
  }

  # build metadata ------------------------------------------------------------
  df <- readr::read_csv(file = csv_file)
  col_names <- names(df)
  if (length(col_names) == 0L) {
    stop("CSV file has no columns: ", csv_file, call. = FALSE)
  }
  # class() may return several classes (e.g. POSIXct/POSIXt); only the first
  # one is used so that every column maps to exactly one Hive type.
  col_types <- tolower(vapply(df, function(x) class(x)[1], character(1)))
  hive_col_types <- rep("STRING", length(col_types)) # default, incl. dates
  hive_col_types[col_types == "integer"] <- "INT"
  # NOTE(review): R numerics are doubles; FLOAT is kept for compatibility with
  # tables already created by this function, DOUBLE may be a better fit.
  hive_col_types[col_types == "numeric"] <- "FLOAT"
  cols_for_hive <- paste(col_names, hive_col_types, collapse = ",\n")

  # ssh to hive ---------------------------------------------------------------
  login <- paste0(toupper(id), '@', server)
  session <- ssh::ssh_connect(login, passwd = .pwd)
  rm(.pwd) # not needed once the session is authenticated

  edge_dir <- sprintf("/home_dir/%s/write_csv_to_hive", tolower(id))
  name_node_file <- tempfile(fileext = ".txt")
  hdfs_dir <- NULL      # set once the HDFS target is known
  stg_created <- FALSE  # set once creation of the staging table is attempted

  # Best-effort remote command used during cleanup: never raises.
  cleanup_remote <- function(command) {
    try(ssh::ssh_exec_wait(session, command = command), silent = TRUE)
  }

  # Step #3 (clean up) is registered up front so that it also runs when an
  # earlier step fails: drop the staging table, remove the HDFS and edge-node
  # copies of the CSV, close the session and delete the local temp file.
  on.exit({
    if (stg_created) {
      cleanup_remote(
        sprintf("hive -e 'drop table if exists %s;'", schema_table_stg)
      )
    }
    if (!is.null(hdfs_dir)) {
      cleanup_remote(paste('hdfs dfs -rm -r', hdfs_dir))
    }
    cleanup_remote(sprintf('rm -rf %s', edge_dir))
    try(ssh::ssh_disconnect(session), silent = TRUE)
    unlink(name_node_file)
  }, add = TRUE)

  # Run a remote command and stop on a non-zero exit status, so that a failed
  # upload is never followed by an INSERT OVERWRITE from an empty stage table.
  run_remote <- function(command, what, ...) {
    status <- ssh::ssh_exec_wait(session, command = command, ...)
    if (!identical(as.integer(status), 0L)) {
      stop(sprintf("Remote step failed (%s); exit status %s.", what, status),
           call. = FALSE)
    }
    invisible(status)
  }

  # make directory for SCP to edge node (-p: do not fail if it already exists)
  run_remote(paste('mkdir -p', edge_dir), "create edge node directory")

  # upload csv file to edge node
  ssh::scp_upload(session, csv_file, to = edge_dir)

  # prep hdfs and copy csv file to hdfs from edge node:
  # hdfs dfs -put -f <local path> <hdfs path> <=> moves file to folder
  run_remote('hdfs getconf -confKey fs.defaultFS', "look up HDFS name node",
             std_out = name_node_file)
  name_node <- trimws(readr::read_file(name_node_file))
  if (!nzchar(name_node)) {
    stop("Could not determine the HDFS name node (fs.defaultFS).",
         call. = FALSE)
  }
  hdfs_dir <- paste0(
    file.path(name_node, "user", toupper(id), "hive", table), "/"
  )
  run_remote(paste('hdfs dfs -mkdir -p', hdfs_dir), "create HDFS directory")
  run_remote(
    paste('hdfs dfs -put -f', file.path(edge_dir, csv_name), hdfs_dir),
    "copy CSV to HDFS"
  )

  # Step #1: build external table ---------------------------------------------
  query_external <- paste0(
    "hive -e ",
    "'CREATE EXTERNAL TABLE IF NOT EXISTS ", schema_table_stg, " (\n",
    cols_for_hive,
    ') COMMENT "TABLE CREATED BY R CODE" \n',
    "ROW FORMAT DELIMITED\n",
    'FIELDS TERMINATED BY ","\n',
    "STORED AS TEXTFILE ",
    "\n LOCATION ",
    '"', file.path('', "user", toupper(id), "hive", table), '/"',
    '\n tblproperties ("skip.header.line.count"="1")',
    ";'"
  )

  stg_created <- TRUE
  run_remote(query_external, "create staging table")

  # Step #2: build the managed table ------------------------------------------
  query_managed <- paste0(
    "hive -e ",
    "'create table if not exists ", schema_table, " (\n",
    cols_for_hive,
    ') COMMENT "TABLE CREATED BY R CODE" \n',
    ";'"
  )

  run_remote(query_managed, "create managed table")

  # overwrite/append managed table with data from staged external table
  insert_clause <- if (isTRUE(append_data)) {
    ' INSERT INTO TABLE '
  } else {
    ' INSERT OVERWRITE TABLE '
  }

  load_managed <- paste0(
    "hive -e ",
    "'", insert_clause, schema_table,
    " \n select * from ", schema_table_stg, ";'"
  )

  run_remote(load_managed, "load managed table")

  # Step #3: clean up happens in the on.exit() handler registered above.
  invisible(NULL)
}
# Fredo-XVII/RToolShed documentation built on March 17, 2024, 12:15 p.m.