# R/process.R
#
# Defines functions: calc_count_by_site_inst, calc_active_instruments, pre_process, calc_n_active_rolling, create_serial_list_cols, get_site_info, add_negative, co_detection, split_instrument_version, path_count_by_site
#
# Documented in: calc_active_instruments, calc_count_by_site_inst, co_detection, get_site_info, pre_process

# Script started 2/20/20


# Custom R functions used in the analysis of respiratory panel (and related) data

# package to import
#'@import dplyr
#'@import tidyr


# daily pathogen count by site --------------------------------------------

path_count_by_site <- function(df) {
  # Daily count of each non-control pathogen, computed separately per site.
  #
  # args:
  #   df--dataframe with at least "RunDataID", "SiteID", "TargetName", "date"
  # returns:
  #   list with one element per unique SiteID (in order of first appearance
  #     in df); each element is a df with columns date, SiteID, daily_count
  #     and TargetName. Only dates on which a pathogen occurs at the site are
  #     present--no filling in of missing dates is done here, so callers that
  #     need complete pathogen/date combinations must add them back.

  # input validation (helper defined elsewhere in the package)
  check_path_count_input(df)

  SiteIDs <- unique(df$SiteID)

  # unique TargetNames, excluding anything labelled as a control
  all_pathogens <- unique(df$TargetName)
  is_control <- stringr::str_detect(all_pathogens, "[Cc]ontrol")
  all_pathogens <- all_pathogens[!is_control]

  # outer "loop" over sites
  # NOTE: `id` and `path` are deliberately kept as the names used inside the
  # dplyr verbs below, so that name resolution against the columns of df is
  # unchanged
  purrr::map(SiteIDs, function(id) {
    site_rows <- filter(df, SiteID == id)

    # inner "loop" over pathogens; a pathogen never seen at this site simply
    # contributes a zero-row data frame
    per_pathogen <- purrr::map(all_pathogens, function(path) {
      site_rows %>%
        filter(TargetName == path) %>%
        # SiteID is constant here; it is in the grouping only so that the
        # column survives summarize()
        group_by(date, SiteID) %>%
        # lu() is a package helper (presumably length(unique(x)))--i.e. the
        # number of runs with this pathogen per day
        summarize(daily_count = lu(RunDataID)) %>%
        # drop grouping so it can't surprise downstream code
        ungroup() %>%
        arrange(date) %>%
        mutate(TargetName = path,
               SiteID = id)
    })

    # one long data frame for the site (pathogens stacked in order)
    bind_rows(per_pathogen)
  })
}


# split by instrument type ------------------------------------------------

split_instrument_version <- function(df) {
  # Split a data frame into one data frame per instrument version.
  #
  # args:
  #   df--dataframe that includes an InstrumentVersion column; every value
  #     must be one of "FA1.5", "FA 1.5", "FA2.0", "FA 2.0" or "Torch"
  # returns:
  #   named list with one element (a df) per instrument version present,
  #     followed by an "all" element holding the original, unsplit df
  stopifnot(
    is.data.frame(df),
    "InstrumentVersion" %in% names(df),
    df$InstrumentVersion %in% c("FA1.5", "FA 1.5", "FA2.0", "FA 2.0", "Torch")
  )
  by_version <- split(df, df$InstrumentVersion)

  # append the complete data frame as the last element
  c(by_version, list(all = df))
}

# co detections -----------------------------------------------------------


#'  Replace co-detections
#'
#'  When more than one TargetName with ResultType == "organism" occurs for a
#'  single RunDataID (within the PouchTitle of interest), the TargetName of
#'  every row belonging to that RunDataID is replaced with "co-detection".
#'
#' @param df dataframe with at least columns of "RunDataID", "ResultType",
#'   "PouchTitle", "TargetName"
#' @param target_PouchTitle Name of the PouchTitle of interest (a single
#'   string that must occur in \code{df$PouchTitle})
#' @return dataframe with same dimensions as input
#' @note Because all rows of a co-detection run receive the same TargetName,
#'   the output contains duplicated "co-detection" rows; downstream functions
#'   are expected to remove them.
#' @examples
#' df <- pre_process(rp_raw)
#' co_detection(df)
#' @export

co_detection <- function(df, target_PouchTitle = "Respiratory_Panel") {

  needed_cols <- c("RunDataID", "ResultType", "PouchTitle", "TargetName")
  check_cols(df, needed_cols)

  stopifnot(
    is.character(target_PouchTitle),
    length(target_PouchTitle) == 1,
    target_PouchTitle %in% df$PouchTitle,
    "organism" %in% df$ResultType
  )

  # organism rows of the panel of interest
  organism_rows <- filter(df,
                          PouchTitle == target_PouchTitle,
                          ResultType == "organism")

  # number of organisms per run (lu() is a package helper), keeping only
  # the runs with more than one
  organisms_per_run <- summarize(group_by(organism_rows, RunDataID),
                                 num_co = lu(TargetName))
  co_detection_df <- filter(organisms_per_run, num_co > 1)

  # attach the count to every row of the input; runs that are not
  # co-detections get NA and keep their TargetName
  joined <- left_join(df, co_detection_df, by = "RunDataID")
  relabelled <- mutate(joined,
                       TargetName = ifelse(!is.na(num_co)  & num_co > 1,
                                           "co-detection",
                                           TargetName))

  # helper column no longer needed
  select(relabelled, -num_co)
}


# add negatives -----------------------------------------------------------

# does not need to be exported
add_negative <- function(df, target_PouchTitle) {
  # Append a "negative" row for every run in which no organism was detected.
  #
  # args:
  #   df--dataframe with "RunDataID", "date", "SiteID", "InstrumentSerialNumber",
  #    "InstrumentVersion", "PouchTitle", "ResultType", "TargetResult" and
  #    "TargetName" columns
  #   target_PouchTitle--single string, the PouchTitle of interest (must occur
  #    in df$PouchTitle)
  # returns:
  #   df with one new row appended for each RunDataID (of the target
  #   PouchTitle) that has no rows with ResultType == "organism".
  #   The new rows have TargetName "negative", ResultType "organism" and
  #   TargetResult "Positive"
  needed_cols <- c("RunDataID", "date", "SiteID", "InstrumentSerialNumber",
                   "InstrumentVersion", "PouchTitle", "ResultType", "TargetResult",
                   "TargetName")

  check_cols(df, needed_cols)

  stopifnot(
    is.character(target_PouchTitle),
    length(target_PouchTitle) == 1,
    target_PouchTitle %in% df$PouchTitle,
    "organism" %in% df$ResultType
  )

  target_rows <- filter(df, PouchTitle == target_PouchTitle)

  # number of organism rows for each run (the other grouping columns are
  # included so that they are carried along into the new rows)
  organisms_per_run <- target_rows %>%
    group_by(RunDataID, date, SiteID, InstrumentSerialNumber,
             InstrumentVersion, PouchTitle) %>%
    summarize(num_organism = sum(ResultType == "organism"))

  # runs without any organism become the new "negative" rows
  negatives <- organisms_per_run %>%
    filter(num_organism == 0) %>%
    select(-num_organism) %>%
    # the negative is labelled as a (positive) organism so that downstream
    # code treats it like any other pathogen
    mutate(ResultType = "organism",
           TargetResult = "Positive",
           TargetName = "negative")

  bind_rows(df, negatives)
}


# get site info -----------------------------------------------------------


#'  Extract info for each site
#'
#'  Returns the info associated with each SiteID, keeping only the first row
#'  in which each SiteID occurs.
#'
#' @param df dataframe that includes SiteID, and other site specific columns
#' @param cols character vector that contains the names of columns to return.
#'   Must include "SiteID" and every name must be a column of \code{df}.
#' @return dataframe with columns of SiteID, Region, Country, ZipCode (default)
#'   and one row per unique SiteID. A data frame is returned even when
#'   \code{cols} names a single column.
#' @examples
#'  get_site_info(rp_raw)
#' @export
get_site_info <- function(df, cols = c("SiteID", "Region", "Country", "ZipCode")) {
  stopifnot(
    is.data.frame(df),
    is.character(cols),
    "SiteID" %in% cols,
    cols %in% names(df)
  )

  # first occurrence of each SiteID
  first_of_site <- !duplicated(df[["SiteID"]])

  # drop = FALSE: without it a base data.frame collapses to a bare vector
  # when only one column is requested
  df[first_of_site, cols, drop = FALSE]
}


# create list cols of unique serial numbers by site -----------------------

# df <- all2

create_serial_list_cols <- function(df, target_PouchTitle) {
  # Daily test counts and instrument serial numbers per site, for all
  # instruments and for each instrument version separately.
  #
  # args:
  #   df--dataframe containing columns of "RunDataID", "SiteID", "date",
  #         "InstrumentSerialNumber", "InstrumentVersion", "PouchTitle"
  #   target_PouchTitle--name of PouchTitle of interest
  # returns:
  #   list of data frames as produced by split_instrument_version(): one
  #   element per instrument version plus "all" for all tests. Each data frame
  #   has one row per date/SiteID with columns
  #     daily_TUR_rp--number of tests of the target PouchTitle (NA if none)
  #     daily_TUR_other--number of tests of any other PouchTitle (NA if none)
  #     unique_serial_all--list column, each element is the vector of unique
  #       instrument serial numbers (target and other tests combined) used on
  #       that date at that site
  required_cols <- c("RunDataID", "SiteID", "date",
                     "InstrumentSerialNumber", "InstrumentVersion",
                     "PouchTitle")

  check_cols(df, required_cols)

  stopifnot(
    any(df$PouchTitle == target_PouchTitle)
  )

  # na.rm so that a missing PouchTitle does not turn the if() condition into NA
  if (all(df$PouchTitle == target_PouchTitle, na.rm = TRUE)) {
    warning("all PouchTitles are ", target_PouchTitle, " --include non ",
            target_PouchTitle, " tests also")
  }

  # only interested in unique tests--not interested in what pathogens
  # detected
  # NOTE to generalize naming down the road switch from rp and other to
  # something else
  df_a <- df[!duplicated(df$RunDataID), ] %>%
    # exact comparison, consistent with the check above and with the other
    # functions in this file (a regex match would also count PouchTitles that
    # merely contain target_PouchTitle)
    mutate(is_rp = PouchTitle == target_PouchTitle,
           panel = ifelse(is_rp, "rp", "other")) %>%
    select(RunDataID, SiteID, date, panel, InstrumentSerialNumber, InstrumentVersion)

  # list of dfs: one per instrument version plus one ("all") with all
  # instruments
  df_b <- split_instrument_version(df_a)

  # create nested df for each instrument version
  df_c <- purrr::map(df_b, function(df){
    df %>%
      select(-InstrumentVersion) %>% # column not needed here, added back later
      group_by(date, SiteID, panel) %>%
      # creates list column where each element of list is a df of the
      # RunDataID and instrument serial nums for that date/site for rp and non rp
      nest() %>%
      arrange(date, SiteID)
  })


  out <- purrr::map(df_c, function(df){
    df_out <- df %>%
      summarize(
        # number of unique tests/day/site
        daily_TUR = purrr::map_dbl(data, function(df) {
          lu(df$RunDataID)
        }),
        # vector of unique serial numbers for that date/site
        unique_serial = purrr::map(data, function(df) unique(df$InstrumentSerialNumber))
      ) %>%
      # separate list cols of vectors of serial numbers other and rp
      tidyr::pivot_wider(names_from = "panel",
                  values_from = c(daily_TUR, unique_serial))

    # a panel ("rp" or "other") that never occurs for this instrument version
    # produces no columns in pivot_wider(); add placeholders so the code
    # below (and callers) always find both sets of columns
    for (panel_name in c("rp", "other")) {
      tur_col <- paste0("daily_TUR_", panel_name)
      serial_col <- paste0("unique_serial_", panel_name)

      if (is.null(df_out[[tur_col]])) {
        df_out[[tur_col]] <- NA
      }
      if (is.null(df_out[[serial_col]])) {
        # list of NULLs (not NA) so that no spurious NA "serial number" ends
        # up in unique_serial_all
        df_out[[serial_col]] <- vector("list", nrow(df_out))
      }
    }

    # list col of vectors of unique serial numbers (both rp and non)
    df_out <- df_out %>%
      mutate(unique_serial_all = purrr::map2(unique_serial_other, unique_serial_rp,
                                      function(x, y) {
                                        unique(c(x, y))
                                      })) %>%
      select(-unique_serial_rp, -unique_serial_other) %>%
      ungroup()

    df_out
  })

  out

}


# calc n serial in last 3 months ------------------------------------------

calc_n_active_rolling <- function(list, window = 91) {
  # Total daily tests and rolling count of active instruments.
  #
  # args:
  #   list--list of data frames (one for all instruments and one per
  #       instrument version); each is daily data with columns of "SiteID",
  #       "date", "daily_TUR_rp", "daily_TUR_other", "unique_serial_all",
  #       where the latter is a list column of vectors of unique serial
  #       numbers for the date
  #   window--window size passed on to roll_list() (default 91, ie ~3 months)
  # returns:
  #   list, same as input, except that each df gains daily_TUR_all (num all
  #     tests that date) and n_active (the number of unique serial numbers
  #     within the window), and loses daily_TUR_other and unique_serial_all
  stopifnot(is.list(list))

  needed_cols <- c("SiteID", "date", "daily_TUR_rp", "daily_TUR_other",
                   "unique_serial_all")

  purrr::map(list, function(df) {

    check_cols(df, needed_cols)

    # total tests per day; the "other" count is not needed afterwards
    with_total <- df %>%
      mutate(daily_TUR_all = daily_TUR_other + daily_TUR_rp) %>%
      select(-daily_TUR_other)

    # the rolling window must run over dates in order, within each site
    by_site <- with_total %>%
      group_by(SiteID) %>%
      arrange(date, .by_group = TRUE)

    by_site %>%
      mutate(
        # serial numbers falling within the window of each date
        # (roll_list is a custom function defined elsewhere in the package)
        serial_3mo = roll_list(unique_serial_all, window = window, align = "right"),
        # number of unique serial numbers within the window
        n_active = purrr::map_dbl(serial_3mo, function(x) lu(unlist(x)))
      ) %>%
      # list columns no longer needed
      select(-unique_serial_all, -serial_3mo) %>%
      ungroup()
  })
}


# pre process  ------------------------------------------------------------

#'  Pre process raw data from database
#'
#'  To be run after \code{initial_check()} check has passed.
#'  Parses columns, removes undesired TargetNames, replaces synonyms
#'  of TargetNames, and when only controls are present for a run adds a new
#'  row with TargetName of 'negative'. ie adds new 'negative' pathogen when no
#'  ResultType == organism present.
#'
#' @param df dataframe of raw RP panel (and non RP panel) data with at least
#'   the following columns:
#'   "RunDataID", "SiteID", "InstrumentSerialNumber", "InstrumentVersion",
#'   "PouchTitle", "ResultType", "FlaggedAsValidation", "TargetName",
#'   "TargetResult", "StartTime", "AssayName", "AssayResult", "ZipCode",
#'   "Region", "Country"
#' @param target_PouchTitle Name of the PouchTitle of interest
#' @param  remove_targets character vector of TargetNames you want to remove
#'   (or NULL)
#' @param synonyms put NULL if don't want to use. Otherwise named list
#'       where each element of the list are TargetNames that are synonyms
#'       and should be combined, the name of the element is the name to replace
#'       the synonyms with.
#' @note This function may take several minutes to run when large dataframe is
#'    used.
#' @return   Dataframe (tibble) in which only rows with
#'    FlaggedAsValidation == 0 are kept, \code{remove_targets} rows are
#'    removed, synonyms are replaced, a \code{date} column (parsed from
#'    StartTime) is added, spaces are removed from InstrumentVersion,
#'    unnecessary columns (AssayName, AssayResult, StartTime, ZipCode, Region,
#'    Country) are removed, and 'negative' rows are added.
#' @examples
#' pre_process(rp_raw)
#' @export
pre_process <- function(df,
                        target_PouchTitle = "Respiratory_Panel",
                        remove_targets = c("Bocavirus", "Bordetella parapertussis (IS1001)"),
                        synonyms = list(
                          "Coronavirus OC43" = c("Coronavirus OC43", "Coronavirus OC43 (RP)"),
                          "Bordetella pertussis" = c("Bordetella pertussis", "Bordetella pertussis (ptxP)"))
                        ) {


  # every column that is used below (including by add_negative()), so that a
  # missing column is reported up front instead of slipping through the
  # value checks (NULL %in% x is logical(0), which stopifnot() accepts)
  required_cols <- c("RunDataID", "SiteID", "InstrumentSerialNumber",
                     "InstrumentVersion", "PouchTitle", "ResultType",
                     "FlaggedAsValidation", "TargetName", "TargetResult",
                     "StartTime", "AssayName", "AssayResult", "ZipCode",
                     "Region", "Country")
  check_cols(df, required_cols)

  stopifnot(
    # this code only works with following values
    df$TargetResult %in% c("Positive", "Pass"),
    df$AssayResult %in% c("Negative", "Positive"),
    df$FlaggedAsValidation %in% c(0, 1),
    is.character(target_PouchTitle),
    length(target_PouchTitle) == 1,
    target_PouchTitle %in% df$PouchTitle
  )

  # keep only runs that are not flagged as validation runs
  df1 <- filter(df, FlaggedAsValidation == 0)

  # remove certain Target Names
  if (!is.null(remove_targets)) {
    df1 <- filter(df1, !TargetName %in% remove_targets)
  }

  # replace synonyms
  if (!is.null(synonyms)) {

    stopifnot(is.list(synonyms),
              !is.null(names(synonyms))
    )
    # making named lookup vector: names are the synonyms, values are the
    # name each synonym should be replaced with
    lookup_list <- purrr::map2(synonyms, names(synonyms), function(x, name) {
      out <- rep(name, length(x))
      names(out) <- x
      out
    })
    lookup_vector <- purrr::reduce(lookup_list, c) # combine to one vector
    df1 <- mutate(df1,
                  # unname() so the element names of the lookup result are
                  # not carried over into the TargetName column
                  new_name = unname(lookup_vector[TargetName]),
                  # replace with new name if one was given in the list
                  TargetName = ifelse(is.na(new_name), TargetName, new_name)) %>%
      select(-new_name)
  }

  out <- mutate(df1,
                StartTime = lubridate::ymd_hms(StartTime),
                date = lubridate::date(StartTime),
                # convert eg FA 2.0 to FA2.0
                InstrumentVersion = stringr::str_replace_all(InstrumentVersion, " ", "")) %>%
    select(-AssayName, -AssayResult, -StartTime, -ZipCode, -Region, -Country) %>%
    as_tibble()

  # add a 'negative' row for runs in which no organism was detected
  out <- add_negative(out, target_PouchTitle = target_PouchTitle)
  out
}


# calculate active instruments --------------------------------------------


#'  Calculate active instruments and number of daily tests
#'
#'  Calculates the number of instruments with at least 1 test
#'  in the prior 3 months (calculated for each instrument version as well
#'  as all instruments combined). Additionally calculates the number of
#'  respiratory panel (or other as supplied by target_PouchTitle)
#'  tests on a given date, as well as the number of all
#'  tests on that date.
#'
#'
#' @param df dataframe of daily site level testing data. Columns must include
#'   "RunDataID", "SiteID", "date",  "InstrumentSerialNumber",
#'   "InstrumentVersion", and "PouchTitle"
#' @param window number of days to look back when calculating the number of
#'   active instruments (default is 3 months (91 days))
#' @param target_PouchTitle Name of the PouchTitle of interest
#' @note This function may take at least several minutes to run when large
#'   dataframe is used.
#' @return   Dataframe with columns "SiteID", "date",
#'   "daily_TUR_rp" (number of tests that day--for target_PouchTitle),
#'   "daily_TUR_all" (number of all tests for that day),
#'   "n_active" (number of active instruments in past 3 months (default
#'      is 3 months)),
#'   and "InstrumentVersion" (either the name of the instrument, or 'all', for
#'   all instrument versions combined)
#'@examples
#' df <- pre_process(rp_raw)
#' calc_active_instruments(df)
#' @export
calc_active_instruments <- function(df, window = 91,
                                    target_PouchTitle = "Respiratory_Panel") {

  needed_cols <- c("RunDataID", "SiteID", "date",
                   "InstrumentSerialNumber", "InstrumentVersion",
                   "PouchTitle")

  check_cols(df, needed_cols)

  stopifnot(
    is.character(target_PouchTitle),
    length(target_PouchTitle) == 1,
    target_PouchTitle %in% df$PouchTitle
  )

  # first and last date with a target_PouchTitle test, for each site
  site_date_range <- df %>%
    filter(PouchTitle == target_PouchTitle) %>%
    select(SiteID, date) %>%
    group_by(SiteID) %>%
    summarize(min = min(date),
              max = max(date))

  # every date between first and last, per site--used in the join below so
  # that there are no gaps in dates for a site (using the per-site range
  # rather than the overall range keeps the objects smaller)
  dates_SiteID <- site_date_range %>%
    group_by(SiteID) %>%
    nest() %>%
    mutate(date = purrr::map(data, function(df) {
      seq(from = df$min,
          to = df$max, by = 1)
    })) %>%
    select(-data) %>%
    unnest(cols = "date")

  # daily counts and list column of unique instrument serial numbers,
  # one df per instrument version (plus "all")
  serial_by_version <- create_serial_list_cols(df,
                                               target_PouchTitle = target_PouchTitle)

  # add rows for the site/date combinations that have no tests--the rolling
  # window calculation requires all dates to be present
  fill_dates <- function(daily) {
    filled <- right_join(ungroup(daily), dates_SiteID, by = c("date", "SiteID"))

    # NA (ie no tests that day) is called 0
    # [consider not doing this]
    filled[is.na(filled)] <- 0
    filled
  }
  filled_by_version <- purrr::map(serial_by_version, fill_dates)

  # total daily tests and num active devices within the window
  rolling_by_version <- calc_n_active_rolling(filled_by_version, window = window)

  # label each df with its instrument version (including "all") and stack
  labelled <- purrr::map2(rolling_by_version, names(rolling_by_version),
                          function(df, inst) {
                            df$InstrumentVersion <- inst
                            df
                          })

  bind_rows(labelled)
}


# calculate pathogen count by site ----------------------------------------

#'  Calculate pathogen count by site and instrument version
#'
#'  Calculates the number of each pathogen found on a given date
#'  for each site and instrument version (also for all instrument versions
#'  combined). To reduce the size of the output, pathogens are not listed
#'  on dates they were not detected.
#'
#'
#' @param df Data frame that has been processed by \code{pre_process()}
#'   and has at least "RunDataID", "TargetName", "PouchTitle"
#'   "date", "InstrumentVersion", "SiteID" columns
#' @param target_PouchTitle Name of the PouchTitle of interest. Rows with any
#'   other (or missing) PouchTitle are ignored.
#' @note This function is slow and can take at least several minutes to run
#'   when large dataframe is used.
#' @return   Dataframe with columns "SiteID", "date",
#'   "daily_count" (number of tests positive for that pathogen that day),
#'   "TargetName" (name of pathogen),
#'   and "InstrumentVersion" (either the name of the instrument, or 'all', for
#'   all instrument versions combined)
#' @examples
#' df <- pre_process(rp_raw)
#' calc_count_by_site_inst(df)
#' @export
#'
calc_count_by_site_inst <- function(df,
                                    target_PouchTitle = "Respiratory_Panel") {

  required_cols <- c("RunDataID", "TargetName", "PouchTitle",
                     "date", "InstrumentVersion", "SiteID")

  check_cols(df, required_cols)

  stopifnot(
    is.character(target_PouchTitle),
    length(target_PouchTitle) == 1,
    target_PouchTitle %in% df$PouchTitle
  )

  # %in% (rather than ==) so that a missing PouchTitle gives FALSE instead of
  # NA; an NA in a logical row index would otherwise add an all-NA row
  is_target <- df$PouchTitle %in% target_PouchTitle

  # removing duplicated (i.e assay) rows--just using these
  # two columns to calculate duplication b/ leads to same
  # result as using all columns and is 2.5 x faster.
  is_first <- !duplicated(df[, c("RunDataID", "TargetName")])

  df2 <- df[is_target & is_first, ]


  # split by instrument version--so can calculate count for each version
  df4a <- split_instrument_version(df2)

  # pathogen count (daily) by site and pathogen; for each instrument version
  # this is a list with one df per site
  count_by_site_path1 <- purrr::map(df4a, path_count_by_site)

  # combine into one df for output
  count_by_site_path2 <- purrr::map2(count_by_site_path1, names(count_by_site_path1),
                              function(x, name) {
                                df <- bind_rows(x)
                                df$InstrumentVersion <- name
                                df
                              }) %>%
    bind_rows()


  # removing NA's means site/pathogen/date combinations are missing when
  # the pathogen not detected--but makes output file much smaller

  out <- count_by_site_path2[!is.na(count_by_site_path2$daily_count), ]
  out
}
# MartinHoldrege/turnr documentation built on May 16, 2020, 10:39 a.m.