R/dplyr_utils.R

#---- tbl -----
tbl.src_connectR <-
  function(src, from, ...) {
    tryCatch({
      dbplyr::tbl_sql("connectR", src = src, from = from, ...)
    })
  }

#---- copy_to ----

copy_to.src_connectR <-
  function(conn,
           df,
           name = deparse(substitute(df)),
           overwrite = FALSE,
           temporary = FALSE,
           types = NULL,
           indexes = NULL,
           unique_indexes = NULL,
           force = FALSE,
           append = FALSE,
           primary = NULL,
           ...) {
    assertthat::assert_that(
      is.data.frame(df),
      assertthat::is.string(name),
      assertthat::is.flag(temporary),
      assertthat::is.flag(overwrite),
      assertthat::is.flag(append)
    )
    
    # cannot add indexes to volatile tables
    if (temporary == TRUE &&
        (!is.null(indexes) || !is.null(unique_indexes))) {
      stop(paste0(
        conn$info$dbms.name,
        " cannot add indexes to volatile tables."
      ),
      call. = FALSE)
    }
    
    # prevent confusion from having multiple classes
    class(df) <- "data.frame"
    
    #overwrite functionality
    if (append == TRUE && overwrite == TRUE) {
      stop("Overwrite and append can't both be TRUE.")
    }
    
    if (conn$info$dbms.name == "PostgreSQL") {
      names(df) <- tolower(names(df))
    }
    
    #column name formatting
    db_rename_column(conn, df) -> names(df)
    
    vars <- names(df)
    
    # PRIMARY INDEX
    if (!is.null(primary)) {
      if (overwrite == TRUE) {
        db_drop_table(conn, name)
      }
      assertthat::assert_that(all(tolower(unlist(primary)) %in% tolower(vars)))
      
      db_create_primary(conn,
                        name = name,
                        value = df,
                        primary = primary)
      
      DBI::dbWriteTable(
        conn = conn$con,
        name = name,
        value = df,
        temporary = temporary,
        types = types,
        unique_indexes = unique_indexes,
        indexes = indexes,
        force = force,
        append = TRUE,
        ...
      )
      #crude method to add indexes
      if (!is.null(indexes) | !is.null(unique_indexes)) {
        if (is.null(unique_indexes)) {
          assertthat::assert_that(all(tolower(unlist(indexes)) %in% tolower(vars)))
          db_create_indexes(conn$con, name, indexes, unique = FALSE)
        } else {
          assertthat::assert_that(all(tolower(unlist(
            unique_indexes
          )) %in% tolower(vars)))
          db_create_indexes(conn$con, name, unique_indexes, unique = TRUE)
        }
      }
      
      print(
        paste0(
          "data.frame ",
          name,
          " with rows: ",
          nrow(df),
          " size: ",
          format(object.size(df), units = "auto"),
          " has been created in ",
          conn$info$dbms.name
        )
      )
      
      # No Primary Index
    } else{
      assertthat::assert_that(all(tolower(unlist(primary)) %in% tolower(vars)))
      
      DBI::dbWriteTable(
        conn = conn$con,
        name = name,
        value = df,
        overwrite = overwrite,
        temporary = temporary,
        types = types,
        force = force,
        append = append,
        ...
      )
      #crude method to add indexes
      if (!is.null(indexes) | !is.null(unique_indexes)) {
        if (is.null(unique_indexes)) {
          assertthat::assert_that(all(tolower(unlist(indexes)) %in% tolower(vars)))
          db_create_indexes(conn$con, name, indexes, unique = FALSE)
        } else {
          assertthat::assert_that(all(tolower(unlist(
            unique_indexes
          )) %in% tolower(vars)))
          db_create_indexes(conn$con, name, unique_indexes, unique = TRUE)
        }
      }
      
      print(
        paste0(
          "data.frame ",
          name,
          " with rows: ",
          nrow(df),
          " size: ",
          format(object.size(df), units = "auto"),
          " has been created in ",
          conn$info$dbms.name
        )
      )
    }
  }

#---- copied from dplyr ----
"%||%" <-
  function(x, y) {
    if (is.null(x)) {
      y
    } else {
      x
    }
  }


sql_mutating_join <-
  function(type,
           x,
           y,
           by = NULL,
           copy = FALSE,
           suffix = c(".x", ".y"),
           auto_index = FALSE,
           ...) {
    by <- dplyr:::common_by(by, x, y)
    y <-
      dplyr:::auto_copy(x, y, copy, indexes = if (auto_index)
        list(by$y))
    sql <-
      sql_join(x$src$con,
               x,
               y,
               type = type,
               by = by,
               suffix = suffix)
    update(tbl(x$src, sql), group_by = groups(x))
  }

#---- db_drop_table ----
db_drop_table.src_connectR <-
  function(conn,
           table,
           force = FALSE,
           purge = FALSE,
           ...) {
    SQL <-
      sql(paste0("DROP TABLE ", if (force) {
        "IF EXISTS "
      }, table, if (purge) {
        " PURGE"
      }))
    DBI::dbExecute(conn$con, SQL)
    print(paste0(
      "Table ",
      table,
      " has been deleted from the ",
      conn$info$dbms.name
    ))
  }

#---- db_drop_view ----
db_drop_view <-
  function(conn,
           table,
           force = FALSE,
           purge = FALSE,
           ...) {
    UseMethod("db_drop_view")
  }

db_drop_view.src_connectR <-
  function(conn,
           table,
           force = FALSE,
           purge = FALSE,
           ...) {
    SQL <-
      sql(paste0("DROP VIEW ", if (force) {
        "IF EXISTS "
      }, table, if (purge) {
        " PURGE"
      }))
    DBI::dbExecute(conn$con, SQL)
    print(paste0("View ", table, " has been deleted from the ", conn$info$dbms.name))
  }

#---- sql_escape_ident ----
#
sql_escape_ident.src_connectR <-
  function(con, x) {
    dbplyr::sql_quote(x, "`")
  }

#---- sql_escape_string ----
sql_escape_string.src_connectR <-
  function(con, x) {
    dbplyr::sql_quote(x, "'")
  }

#---- compute ----
compute.tbl_connectR <-
  function(x,
           name,
           primary = NULL,
           temporary = FALSE,
           append = FALSE,
           overwrite = FALSE,
           force = FALSE,
           type = NULL,
           data = FALSE,
           query = FALSE,
           ...) {
    # TBD: add params to control location and other CREATE TABLE options
    
    assertthat::assert_that(
      assertthat::is.string(name),
      assertthat::is.flag(temporary),
      assertthat::is.flag(append),
      assertthat::is.flag(force),
      assertthat::is.flag(data),
      assertthat::is.flag(query)
    )
    
    if (is.null(type)) {
      type <- "table"
    }
    
    if (temporary) {
      stop(
        "connectR does not support temporary tables. Set temporary = FALSE in compute().",
        call. = FALSE
      )
    }
    
    #overwrite functionality
    if (append == TRUE && overwrite == TRUE) {
      stop("Overwrite and append can't both be TRUE.")
    }
    
    #primary functionality
    if (append == TRUE  && !is.null(primary)) {
      stop("can't create a primary index when appending to existing table",
           call. = FALSE)
    }
    
    if (trimws(tolower(type)) == "view" && !is.null(primary)) {
      stop("can't create a primary index when on a view",
           call. = FALSE)
    }
    
    
    if (trimws(tolower(type)) == "table" && overwrite == TRUE) {
      db_drop_table(x$src, name)
    } else if (trimws(tolower(type)) == "view" &&
               overwrite == TRUE) {
      db_drop_view(x$src, name)
    }
    
    #variables from query
    vars <- dbplyr::op_vars(x)
    assertthat::assert_that(all(tolower(unlist(primary)) %in% tolower(vars)))
    
    x_aliased <- select(x,!!!rlang::syms(vars))
    SQL <- dbplyr::db_sql_render(x$src$con, x_aliased$ops)
    
    # TBD: implement db_compute.impala_connection and call it here instead of db_save_query
    
    
    
    name <- db_save_query(
      conn = x$src,
      sql = SQL,
      name = name,
      primary = primary,
      temporary = FALSE,
      append = append,
      force = force,
      type = trimws(type),
      data = data,
      query = query,
      ...
    )
    if (query == TRUE)
      (return(name))
  }


#---- db_save_query ----
db_save_query.src_connectR <-
  function(conn,
           sql,
           name,
           primary = NULL,
           temporary = FALSE,
           append = FALSE,
           force = FALSE,
           type = NULL,
           data = FALSE,
           query = FALSE,
           ...) {
    # TBD: add params to control location and other CREATE TABLE options
    
    assertthat::assert_that(
      assertthat::is.string(name),
      assertthat::is.flag(temporary),
      assertthat::is.flag(append),
      assertthat::is.flag(force),
      assertthat::is.flag(data),
      assertthat::is.flag(query)
    )
    if (temporary) {
      stop(
        "connectR does not support temporary tables. Set temporary = FALSE in db_save_query().",
        call. = FALSE
      )
    }
    
    prep <-
      paste0("CREATE ", if (is.null(type) ||
                            tolower(type) == "table") {
        sql("TABLE ")
      } else if (tolower(type) == "view") {
        sql("VIEW ")
      } else {
        stop("Check type, connectR currently only supports table or view",
             call. = FALSE)
      },
      name,
      " ",
      "AS (")
    
    dat <- if (data == TRUE) {
      ""
    } else {
      "NO"
    }
    
    sqlend <-
      paste0(if (is.null(type) ||
                 (tolower(type) == "table") && is.null(primary)) {
        sql(paste0("WITH ", dat , " DATA;"))
      } else if (tolower(type) == "view") {
        sql(";")
      } else {
        pri(primary, data)
      })
    
    tt_sql <- sql(paste0(if (append == T) {
      paste0("INSERT INTO ", name, " ")
    } else {
      prep
    },
    sql, if (append == FALSE) {
      paste0(") ", sqlend)
    } else{
      ""
    }))
    
    if (query == TRUE) {
      return(tt_sql)
    }
    
    DBI::dbExecute(conn$con, tt_sql)
    name
  }

pri <-
  function(primary, data) {
    x <- paste0(primary, collapse = "\', \'")
    if (data == TRUE) {
      pri <- paste0("WITH DATA \n PRIMARY INDEX (\'", x, "\');")
    } else {
      pri <- paste0("WITH NO DATA \n PRIMARY INDEX (\'", x, "\');")
    }
    pri
  }

#---- db_data_type ----

db_data_type.src_connectR <-
  function(con, fields, ...) {
    char_type <- function(x) {
      n <- max(nchar(as.character(x), "bytes"), 0L, na.rm = TRUE)
      if (n <= 65535) {
        paste0("varchar(", n, ")")
      } else {
        "mediumtext"
      }
    }
    
    data_type <-
      function(x) {
        switch(
          class(x)[1],
          logical =   "boolean",
          integer =   "integer",
          numeric =   "float",
          factor =    char_type(x),
          character = char_type(x),
          Date =      "date",
          POSIXct =   "datetime",
          stop("Unknown class ", paste(class(x), collapse = "/"), call. = FALSE)
        )
      }
    vapply(fields, data_type, character(1))
  }
DyfanJones/connectR documentation built on May 23, 2019, 10:32 p.m.