R/database_imports.R

Defines functions: find_columns_in_all_new_db, get_colnames_duckdb, columns_to_find, add_zero, get_list_of_years, read_duckdb_for_new_old_db, read_cols, import_db_data

Documented in: get_list_of_years, import_db_data

#' Import crash, vehicle, or person data from the crash database
#'
#' This imports all data based on the crash db type, years selected, and columns
#' selected. It combines old and new crash data into a single dataframe,
#' renaming columns of the old db to match the new db and renaming some
#' variables, such as CRSHSVR, to match the new db. Note: if an old db is
#' imported, all of its columns will be imported.
#'
#' Note that DRVRFLAG is derived differently for the old and new db. The new db
#' flags drivers of any MV, while the old db also includes non-motorists.
#' @param db_loc Location of the crash database file, e.g.
#'   "C:/data/crashes_duck.duckdb"
#' @param db_type Type of database - one of "crash", "vehicle", or "person"
#' @param years Year(s) of db data, e.g. c("20", "21").
#' @param columns Columns to be imported. For the new db these columns will
#'   always be imported (if applicable): "CRSHNMBR", "CRSHDATE", "CNTYCODE",
#'   "CRSHSVR", "UNITNMBR", "UNITPRSN", "UNITTYPE", "DRVRFLAG", "ROLE",
#'   "VEHTYPE", "WISINJ". For columns with multiples, like DRVRPC and ANMLTY,
#'   input only the base name without the number. For the old db, all columns
#'   will be imported. If columns = "all", all columns will be selected.
#' @param filter_by Option to filter by county; separate counties with a comma
#'   and a space, e.g. "county:Dane, Rock"
#'
#' @return A data.table of either crash, vehicle, or person data, with a 'year'
#'   column added.
#' @export
#'
#' @examples
#' import_db_data(db_loc = "C:/data/crashes_duck.duckdb", db_type = "crash",
#'   years = c("16", "17","18"), columns = c("DRVRPC"))
import_db_data <-
  function(db_loc = "C:/data/crashes_duck.duckdb",
           db_type = "crash",
           years = c("21"),
           columns = c("CRSHSVR"),
           filter_by = NULL) {
    con <-
      duckdb::dbConnect(duckdb::duckdb(), dbdir = db_loc, read_only = TRUE)

    # Separate years into old and new db
    all_years <- as.integer(years)
    years_old = as.character(all_years[which(all_years < 17 |
                                               all_years > 85)])
    years_new = as.character(all_years[which(all_years >= 17 &
                                               all_years < 30)])

    # If new years were selected, open new db data and combine df
    if (length(years_new) != 0) {
      data_years = paste(db_type, years_new, sep = "")

      if (columns[1] == "all") {
        all_col_to_select = "all"
      } else {
        cols_to_keep = columns_to_find(col_to_find = columns)
        all_col_to_select <-
          find_columns_in_all_new_db(data_new = data_years,
                                     colsToKeep = cols_to_keep,
                                     con = con)
      }
      # Read each selected year's table and bind the rows into one dataframe
      df_new <- purrr::map_dfr(
        .x = data_years,
        .f = read_duckdb_for_new_old_db,
        col_to_select = all_col_to_select,
        con = con,
        filter_by = filter_by
      )
    }

    # If old years were selected, open old db data
    if (length(years_old) != 0 & !is.na(years_old[1])) {
      # use this function to add a 0 for 2000 - 2009
      years_old <- purrr::map_chr(years_old, add_zero)
      data_years_old = paste(db_type, years_old, sep = "") # combines crashes with years to select data

      df_old <- purrr::map_dfr(
        .x = data_years_old,
        .f = read_duckdb_for_new_old_db,
        col_to_select = "all",
        con = con,
        filter_by = filter_by
      ) |>
        dplyr::filter(.data[["CRSHSVR"]] != 'NON-REPORTABLE',
                      .data[["CRSHLOC"]] == 'INTERSECTION' |
                        .data[["CRSHLOC"]] == 'NON-INTERSECTION')

    }
    duckdb::dbDisconnect(con, shutdown = TRUE)

    # Return the final df depending on which dataframes exist
    if (exists("df_new") & exists("df_old")) {
      final_df <- dplyr::bind_rows(df_new, df_old)
    } else if (exists("df_new")) {
      final_df <- df_new
    } else if (exists("df_old")) {
      final_df <- df_old
    }

    return(
      final_df |>
        dplyr::mutate(
          CRSHDATE = lubridate::ymd(.data[["CRSHDATE"]]),
          year = factor(.data[["year"]], levels = sort(unique(.data[["year"]])))
        ) |>
        data.table::as.data.table()
    )
  }
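
# A minimal usage sketch, commented out because it assumes a crash database
# exists at the hypothetical path below. get_list_of_years() builds the
# two-digit year vector and filter_by limits the result to Dane and Rock counties:
# crashes <- import_db_data(
#   db_loc = "C:/data/crashes_duck.duckdb",
#   db_type = "crash",
#   years = get_list_of_years("15", "21"), # spans old (15, 16) and new (17-21) db years
#   columns = c("CRSHSVR", "DRVRPC"),      # DRVRPC expands to DRVRPC01..DRVRPC20
#   filter_by = "county:Dane, Rock"
# )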

# Regex of column prefixes whose values are stored across multiple numbered columns
db_columns_with_multiples <- "WTCOND|RDCOND|ENVPC|RDWYPC|ADDTL|CLSRSN|ANMLTY|DMGAR|VEHPC|HAZPLAC|HAZNMBR|HAZCLSS|HAZNAME|HAZFLAG|DRVRDS|DRUGTY|DRVRRS|DRVRPC|DNMFTR|STATNM|NMTACT|NMTSFQ|PROTGR|CITISS|CITNM|STATDS|STATSV|RSTRCT"
# Columns that are always kept from the new db (when present)
columns_to_always_keep = c(
  "CRSHDATE",
  "CNTYCODE",
  "CRSHSVR",
  "UNITNMBR",
  "UNITPRSN",
  "UNITTYPE",
  "DRVRFLAG",
  "ROLE",
  "VEHTYPE",
  "WISINJ"
)
# Read the first row to find which columns actually exist; returns the columns that exist.
# If colsToKeep is NULL, all column names are returned.
read_cols <- function(data_new, colsToKeep, con = con) {

  header = dplyr::tbl(con, data_new) |> utils::head(1)

  if (is.null(colsToKeep)) {
    return(colnames(header))
  } else {
    colsToKeep2 <-
      union(
        columns_to_always_keep,
        colsToKeep
      ) # Tack these on

    columns_not_in_db <-
      subset(colsToKeep, !colsToKeep %in% colnames(header))
    columns_not_in_db = subset(columns_not_in_db, !grepl('[[:digit:]]+', columns_not_in_db))
    # Returns warning if column not found
    if (length(columns_not_in_db) != 0)
      warning(paste0(
        " ",
        paste0(columns_not_in_db, collapse = ", "),
        ' not found in new db (ignore if variable is in multiple cols)'
      ), call. = FALSE)
    # Returns only columns found in the df
    return(subset(colsToKeep2, colsToKeep2 %in% colnames(header)))
  }
}

# Reads one db table (a single year of crash/vehicle/person data) from duckdb,
# selecting the requested columns (or all of them) and optionally filtering by county
read_duckdb_for_new_old_db <-
  function(data_new_old,
           col_to_select,
           con,
           filter_by) {

    if (col_to_select[1] != "all") {
      read_data_new_old = dplyr::tbl(con, data_new_old) |> dplyr::select(dplyr::any_of(col_to_select), "countyname", "year") |> dplyr::collect()
    } else {
      read_data_new_old = dplyr::tbl(con, data_new_old) |> dplyr::collect()
    }

    # Parse the county filter, e.g. "county:Dane, Rock", and keep only those counties
    if (!is.null(filter_by)) {
      counties = strsplit(sub(pattern = "^county:", replacement = "", filter_by), ", ") |> unlist()
      read_data_new_old <- dplyr::filter(read_data_new_old, .data[["countyname"]] %in% counties)
    }
    read_data_new_old
  }
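
# Standalone sketch of the county-filter parsing used above: the "county:" prefix
# is stripped and the remainder split on ", " (comma + space). The input string is
# just an illustrative value:
filter_by_example <- "county:Dane, Rock"
strsplit(sub(pattern = "^county:", replacement = "", filter_by_example), ", ") |> unlist()
#> [1] "Dane" "Rock"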

#' Get a list of all years in between two years
#'
#' Useful when importing data across many years. Years must be exactly two digits (e.g. 1992 would be "92").
#' @param start_year start year of list, lowest is 1985.
#' @param end_year end year of list, highest is 2030.
#'
#' @return character vector of two-digit years for data import
#' @export
#'
#' @examples
#' get_list_of_years("99", "14")
get_list_of_years <- function(start_year,
                              end_year) {
  # Coerce to integers so seq() and the comparisons below work on numbers
  start_year <- as.integer(start_year)
  end_year <- as.integer(end_year)

  # range spans the 1999/2000 boundary
  if (start_year > 85 & end_year < 30) {
    year1 = as.character(seq(start_year, 99, 1))
    year2 = formatC(
      seq(0, end_year, 1),
      digits = 0,
      width = 2,
      format = "f",
      flag = "0"
    )
    return(c(year1, year2))
    # both years in the 1900s
  } else if (start_year > 85 & end_year <= 99) {
    return(as.character(seq(start_year, end_year, 1)))
  } # both years in the 2000s
  else if (start_year >= 0 & end_year < 30) {
    return(formatC(seq(start_year, end_year, 1), digits = 0, width = 2, format = "f", flag = "0"))
  }
}
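
# For example, a request that crosses the 1999/2000 boundary:
get_list_of_years("98", "03")
#> [1] "98" "99" "00" "01" "02" "03"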

# Zero-pads a single-digit year string, e.g. "5" becomes "05"; two-digit years pass through unchanged
add_zero <- function(single_digit) {
  if (length(single_digit) == 1)
    formatC(
      as.integer(single_digit),
      digits = 0,
      width = 2,
      format = "f",
      flag = "0"
    )
}
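
# e.g. years 2000-2009 gain a leading zero; two-digit years are unchanged:
add_zero("5")  # "05"
add_zero("16") # "16"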

columns_to_find <-
  function(col_to_find) {
    # From the requested columns, find those whose values may be spread across
    # multiple numbered columns
    columns_with_multiples <-
      subset(col_to_find,
             grepl(db_columns_with_multiples, # columns that have multiples
                   col_to_find))

    # If columns with multiples were found, make a list of all numbered names and
    # tack them onto the end, i.e. DRVRPC01, DRVRPC02, etc.
    if (length(columns_with_multiples) != 0) {
      # This adds '01' to '20' to the end of each matching column
      get_all_names <-
        sapply(columns_with_multiples,
               paste0,
               formatC(seq(1, 20), width = 2, flag = "0")) |> as.character()

      col_to_find <- Reduce(union,
                            list(c("CRSHNMBR"),
                                 col_to_find,
                                 get_all_names))
    } else {
      col_to_find <-
        union(c("CRSHNMBR"), col_to_find)
    }
    return(col_to_find)
  }
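
# For instance, requesting DRVRPC (stored across numbered columns) expands it to
# DRVRPC01..DRVRPC20, with CRSHNMBR always added first (output abridged):
columns_to_find(c("CRSHSVR", "DRVRPC"))
#> [1] "CRSHNMBR" "CRSHSVR"  "DRVRPC"   "DRVRPC01" "DRVRPC02" ... "DRVRPC20"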

# Returns the column names of a single duckdb table by reading only its header row
get_colnames_duckdb <- function(file_to_read, con) {
  header <- dplyr::tbl(con, file_to_read) |> utils::head(1)
  colnames(header)
}

find_columns_in_all_new_db <-
  function(data_new,
           colsToKeep,
           con = con) {
    # Collect the column names that appear in any of the selected new-db tables
    all_col_names = unique(unlist(lapply(data_new, get_colnames_duckdb, con = con)))

    if (is.null(colsToKeep)) {
      # No specific columns requested: return every column found
      return(all_col_names)
    } else {
      colsToKeep2 <-
        union(columns_to_always_keep,
              colsToKeep) # Tack these on

      columns_not_in_db <-
        subset(colsToKeep, !colsToKeep %in% all_col_names)
      columns_not_in_db = subset(columns_not_in_db,
                                 !grepl('[[:digit:]]+', columns_not_in_db))
      # Returns warning if column not found
      if (length(columns_not_in_db) != 0)
        warning(
          paste0(
            " ",
            paste0(columns_not_in_db, collapse = ", "),
            ' not found in new db (ignore if variable is in multiple cols)'
          ),
          call. = FALSE
        )
      # Returns only columns found in the df
      return(subset(colsToKeep2, colsToKeep2 %in% all_col_names))
    }
  }
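
# A minimal, self-contained sketch of the two helpers above, using a throwaway
# in-memory duckdb with a hypothetical one-row "crash21" table (assumes the
# duckdb, DBI, dbplyr, and dplyr packages are installed):
con_demo <- duckdb::dbConnect(duckdb::duckdb())
DBI::dbWriteTable(
  con_demo, "crash21",
  data.frame(CRSHNMBR = 1L, CRSHSVR = "INJURY", countyname = "Dane", year = "21")
)

get_colnames_duckdb("crash21", con = con_demo)
#> [1] "CRSHNMBR"   "CRSHSVR"    "countyname" "year"

# Keeps only the requested/always-kept columns that exist; warns that "BADCOL"
# was not found:
find_columns_in_all_new_db(data_new = "crash21",
                           colsToKeep = c("CRSHNMBR", "BADCOL"),
                           con = con_demo)
#> [1] "CRSHSVR"  "CRSHNMBR"

duckdb::dbDisconnect(con_demo, shutdown = TRUE)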