#' @name redshift_query_n
#' @title redshift_query_n
#' @param sql.string SQL Query string
#' @param conn Redshift database connection object
#' @param bucket AWS bucket to store unload files
#' @param aws.role AWS role ID as string
#' @param transform.function function object detailing additional transformations to perform on the data
#' @param parallel TRUE or FALSE, do you want to do parallel processing. Defaults to FALSE
#' @param cores Cores to utilise, defaults to NULL, if NULL will detect cores on machine and utilise all of them. If parallel = FALSE parameter is not needed.
#' @param package.list Names of the packages needed by the transformation function; they are loaded on the parallel workers. Not needed if parallel = FALSE. NB: If parallel = FALSE the packages must be loaded in the local environment.
#' @description Unloads the result of a Redshift query to an AWS S3 bucket, then retrieves the unloaded query components from that bucket and binds them into a single table. The retrieval can optionally be parallelised (parallel = TRUE) to speed it up. This means much larger datasets can be queried and compiled.
#' @examples
#'
#' # Firstly you'll need to set up AWS permissions
#'
#' Sys.setenv("AWS_ACCESS_KEY_ID" = "key",
#' "AWS_SECRET_ACCESS_KEY" = "key",
#' "AWS_DEFAULT_REGION" = "key")
#'
#' # Define SQL string for database query
#' # e.g.
#'
#' sql.string <- "SELECT * FROM some.table"
#'
#' # Setup a database connection object using redshift.connect
#'
#' conn <- redshift.connect("connection string")
#'
#' # You'll also need an existing bucket where you can deposit interim files
#' # e.g.
#'
#' bucket <- "an-aws-bucket"
#'
#' # Additional aws.role string
#'
#' aws.role <- "an-aws-role"
#'
#' # A transformation function used to make additional alterations on the data
#'
#' transform.function <- function(x) {data.table(x)}
#'
#' # Decide if you want to do parallel processing, if no
#'
#' parallel <- FALSE
#'
#' # if yes
#'
#' parallel <- TRUE
#'
#' # If parallel = TRUE
#' # Define number of threads, if undefined it'll detect the number of cores on the system and use these.
#'
#' cores <- 10
#'
#' # If parallel = TRUE
#' # A list of packages that must be used in the transformation function
#' # e.g.
#'
#' package.list <- "data.table"
#'
#' # Execute the function
#'
#' data <- redshift_query_n(sql.string, conn, bucket, aws.role, transform.function, parallel, cores, package.list)
#'
#' @export
redshift_query_n <- function(sql.string, conn, bucket, aws.role, transform.function = NULL, parallel = FALSE, cores = NULL, package.list = NULL) {
  # Overview:
  #   1. UNLOAD the query result to a random prefix inside `bucket`.
  #   2. Read every unloaded part back from S3 (optionally on a cluster),
  #      applying `transform.function` to each part when one is supplied.
  #   3. Bind the parts with rbindlist() and delete the interim S3 objects.
  #
  # Returns the bound table. If reading/binding fails, the underlying error is
  # reported via message() and the string "Data binding failed." is returned
  # instead, so that the interim S3 objects are still cleaned up.

  # Unloads `sql.string` to s3://<bucket>/parallel-query-dump/output/<random>/
  # and returns the keys of the objects found under that prefix (NULL if none).
  execute_redshift_query <- function(sql.string, conn, bucket, aws.role) {
    rand.prefix <- stri_rand_strings(1, 20)
    key.prefix <- paste0("parallel-query-dump/output/", rand.prefix, "/")
    redshift.unload(
      conn, sql.string,
      filename = paste0("s3://", bucket, "/", key.prefix),
      delim = ",", zip = FALSE,
      aws.role = paste0("arn:aws:iam::", aws.role, ":role/RedshiftCopyUnload")
    )
    entries <- aws.s3::get_bucket(bucket, key.prefix)
    # Entries without a usable Key yield NULL and are dropped by unlist().
    get_key <- function(idx) {
      tryCatch(entries[[idx]]$Key, error = function(e) NULL)
    }
    # seq_along() is safe when the listing is empty (seq(1, 0, 1) would error).
    unlist(lapply(seq_along(entries), get_key))
  }

  keys <- execute_redshift_query(sql.string, conn, bucket, aws.role)

  # Downloads one unloaded part and parses it as a header-less CSV.
  readDataFromS3 <- function(key) {
    cat(sprintf("reading from bucket %s key %s\n", bucket, key))
    obj <- aws.s3::get_object(key, bucket)
    read.csv(text = rawToChar(obj), header = FALSE, stringsAsFactors = FALSE)
  }

  # Per-key loader: read the part, then apply the optional transformation.
  load_key <- if (is.null(transform.function)) {
    readDataFromS3
  } else {
    function(key) transform.function(readDataFromS3(key))
  }

  # Same truth value as `parallel == TRUE` for every input the comparison
  # accepted, but NA / zero-length input now means "not parallel".
  use.parallel <- isTRUE(parallel == TRUE)

  if (use.parallel) {
    cores.n <- if (is.null(cores)) detectCores() else cores
    # Use the resolved core count; `cores` itself may be NULL.
    cluster <- makeCluster(cores.n)
    # Stop the workers however the function exits, including on error.
    on.exit(stopCluster(cluster), add = TRUE)
    registerDoParallel(cluster)
    cat(sprintf("Made cluster with %d cores\n", cores.n))
  }

  data <- tryCatch(
    {
      parts <- if (use.parallel && length(keys) > 0) {
        foreach(key = keys, .packages = package.list) %dopar% load_key(key)
      } else {
        lapply(keys, load_key)
      }
      rbindlist(parts)
    },
    error = function(err) {
      # Surface the real cause instead of discarding it.
      message("Data binding error: ", conditionMessage(err))
      print("Data binding failed.")
    }
  )

  print("Cleaning up")
  if (use.parallel && length(keys) > 0) {
    foreach(key = keys, .packages = "aws.s3") %dopar%
      aws.s3::delete_object(key, bucket, quiet = TRUE)
  } else {
    # Iterating the keys directly is a no-op when there are none
    # (1:NROW(keys) would run for indices 1 and 0).
    for (key in keys) {
      aws.s3::delete_object(key, bucket, quiet = TRUE)
    }
  }

  data
}
# Add the following code to your website.
# For more information on customizing the embed code, read Embedding Snippets.