R/tv.R

Defines functions check_tv_specs check_tv_exposure check_tv_data time_varying

Documented in check_tv_data check_tv_exposure check_tv_specs time_varying

#' Create a time-varying dataset
#'
#' @param x A data.frame with four columns: <id>, "feature", "datetime", "value"
#' @param specs a data.frame with four columns: "feature", "use_for_grid", "lookback_start", "lookback_end", "aggregation". See details below.
#' @param exposure a data.frame with (at least) three columns: <id>, "exposure_start", "exposure_stop"
#' @param grid.only Should just the grid be computed and returned? Useful only for debugging
#' @param time_units What time units should be used? Seconds or days
#' @param n_cores Number of cores to use. If slurm is being used, it checks the \code{SLURM_CPUS_PER_TASK} variable.
#'   Else it defaults to 1, for no parallelization.
#' @param ... Other arguments. Currently just passes \code{check_overlap}.
#' @param expected_features A vector of expected features based on the data.
#' @param expected_ids A vector of expected ids based on the data.
#' @param id The id to use. Default is "pat_id"
#' @param sort Logical, indicating whether to sort the data before performing the analysis. By default (\code{NA}),
#'   sorting is only done when useful (that is: \code{x$datetime} is a POSIXct and \code{time_units == "days"}).
#'   A warning is issued when \code{x$datetime} is a Date to make the user aware that the input ought to be sorted to
#'   get the right answer.
#' @param check_overlap Should overlap be checked among exposure rows? A potentially costly operation,
#'   so you can opt out of it if you're really sure.
#' @return A data.frame, with one row per grid value and one column per feature specification (plus grid columns).
#' @details
#'   The defaults for specs are to use everything for the grid creation, and to set \code{lookback_start=0}, with a message in both cases.
#'   Currently supported aggregation functions include counting ("count" or "n"), last-value-carried forward ("last value" or "lvcf"),
#'   any/none ("any" or "binary"), time since ("time since" or "ts"), min/max/mean, and the special "event" (for which look backs are ignored).
#'
#'   The look back window begins at \code{row_start - lookback_end} and ends at \code{row_start - lookback_start}. Passing NA to either look back
#'   changes the corresponding window boundary to \code{exposure_start}.
#' @examples
#'   data(tv_example)
#'   time_varying(tv_example$data, tv_example$specs, tv_example$exposure,
#'                time_units = "days", id = "mcn")
#' @name tv
NULL

#' @rdname tv
#' @export
time_varying <- function(x, specs, exposure, ...,
                         grid.only = FALSE,
                         time_units = c("days", "seconds"), id = "pat_id", sort = NA,
                         n_cores = as.numeric(Sys.getenv("SLURM_CPUS_PER_TASK", 1))) {
  opts <- options(warn = 1)
  on.exit(options(opts))
  time_units <- match.arg(time_units)
  x <- check_tv_data(x, time_units = time_units, id = id, sort = sort)
  specs <- check_tv_specs(specs, unique(x$feature))
  exposure <- check_tv_exposure(exposure, expected_ids = unique(x[[id]]), time_units = time_units, id = id, ...)

  grid <- x %>%
    dplyr::filter(.data$feature %in% specs$feature[specs$use_for_grid]) %>%
    dplyr::bind_rows(dplyr::mutate(dplyr::select(exposure, dplyr::all_of(id), datetime = "exposure_start"), feature = "exposure_start")) %>%
    dplyr::distinct(.data[[id]], row_start = .data$datetime) %>%  # could use unique here, but distinct() is faster
    dplyr::left_join(x = exposure, by = id, relationship = "many-to-many") %>%
    dplyr::filter(.data$exposure_start <= .data$row_start, .data$row_start < .data$exposure_stop) %>%
    dplyr::arrange(.data[[id]], .data$row_start) %>%
    dplyr::group_by(.data[[id]]) %>%
    dplyr::mutate(
      row_stop = pmin(dplyr::lead(.data$row_start, 1), .data$exposure_stop, na.rm = TRUE)
    ) %>%
    dplyr::ungroup()

  if(grid.only) return(grid)

  FN <- if(n_cores > 1) parallel::mcMap else Map

  g2 <- grid %>%
    dplyr::mutate(
      dplyr::across(c("row_start", "row_stop", "exposure_start", "exposure_stop"), as.numeric),
      ..tv.row.. = dplyr::row_number()
    ) %>%
    (function(xxx) split(xxx, xxx[[id]]))

  revert <- order(do.call(c, lapply(g2, "[[", "..tv.row..")))

  x3 <- x %>%
    dplyr::mutate(
      datetime = as.numeric(.data$datetime)
    ) %>%
    (function(xxx) split(xxx, xxx[[id]]))

  stopifnot(
    length(g2) == length(x3),
    names(g2) == names(x3)
  )

  specs3 <- split(specs, seq_len(nrow(specs)))

  out <- FN(x3, g2, mc.cores = n_cores, f = function(xx, gg, ...) {
    gg <- as.matrix(gg[c("exposure_start", "row_start", "row_stop")])
    xfeat <- xx$feature
    yy <- as.matrix(xx[c("value", "datetime")])

    tmp <- vapply(specs3, FUN.VALUE = numeric(nrow(gg)), USE.NAMES = FALSE, function(curr_spec) {
      FUN <- switch(
        curr_spec$aggregation,
        count = tv_count,
        lvcf = tv_lvcf,
        ts = tv_ts,
        any = tv_any,
        max = tv_max,
        min = tv_min,
        mean = tv_mean,
        median = tv_median,
        sum = tv_sum,
        event = tv_count
      )
      curr_feat <- curr_spec$feature
      y <- yy[xfeat == curr_feat, , drop = FALSE]
      val <- y[, 1]
      dttm <- y[, 2]
      if(nrow(y) == 0) {
        return(rep.int(FUN(value = val, datetime = dttm, current_time = stop("Shouldn't need this")), nrow(gg)))
      }

      curr_lookback_start <- curr_spec$lookback_start
      curr_lookback_end <- curr_spec$lookback_end
      curr_aggregation <- curr_spec$aggregation
      idx.true <- rep.int(TRUE, nrow(y))
      apply(gg, 1, function(curr_grid) {
        curr_row_start <- curr_grid[2]
        idx <- idx.true
        if(curr_aggregation == "event") {
          idx <- idx & dttm > curr_row_start & dttm <= curr_grid[3]
        } else {
          if(is.na(curr_lookback_start)) {
            idx <- idx & dttm <= curr_grid[1]
          } else {
            idx <- idx & dttm <= curr_row_start - curr_lookback_start
          }

          if(is.na(curr_lookback_end)) {
            idx <- idx & dttm >= curr_grid[1]
          } else {
            idx <- idx & dttm >= curr_row_start - curr_lookback_end
          }
        }
        FUN(value = val[idx], datetime = dttm[idx], current_time = curr_row_start)
      })
    })
    tmp
  })
  out <- do.call(rbind, out)
  dimnames(out) <- list(NULL, paste0(specs$feature, "_", specs$aggregation))
  out2 <- as.data.frame(out[revert, , drop = FALSE])
  stopifnot(ncol(out2) == nrow(specs))
  out3 <- cbind(grid, out2)
  attr(out3, "coltype") <- c(rep_len("grid", ncol(grid)), dplyr::if_else(specs$aggregation == "event", "event", "feature"))
  out3
}


#' @rdname tv
#' @export
check_tv_data <- function(x, time_units, id, sort) {
  cols <- c(id, "feature", "datetime", "value")
  if(!all(cols %in% names(x))) {
    stop("The data is missing some columns: ", paste0(setdiff(cols, names(x)), collapse = ", "))
  }
  if(time_units == "days") {
    if(!lubridate::is.POSIXct(x$datetime) && !lubridate::is.Date(x$datetime)) {
      stop("x$datetime needs to be a Date or POSIXct.")
    } else if(lubridate::is.Date(x$datetime) && is.na(sort)) {
      message("x$datetime is a Date; as such, be sure that your data is sorted in descending datetime order, so that `lvcf` picks the most recent row correctly",
              " (it picks the first row it finds).\n\n",
              "To silence this message, please specify `sort=TRUE` or `sort=FALSE`. Defaulting to `sort=FALSE`.")
      sort <- FALSE
    } else if(is.na(sort)) {
      # datetime
      sort <- TRUE
    }
  } else {
    if(!lubridate::is.POSIXct(x$datetime)) stop("You specified time_units = 'seconds', but x$datetime is not a POSIXct.")
    if(is.na(sort)) sort <- FALSE
  }

  idx <- is.na(x$datetime)
  if(any(idx)) {
    warning("There are ", sum(idx), " missing times being removed from the data.")
    x <- x[!idx, ]
  }
  if(sort) {
    x <- dplyr::arrange(x, .data[[id]], .data$feature, dplyr::desc(.data$datetime))
  }
  if(time_units == "days" && lubridate::is.POSIXct(x$datetime)) {
    x$datetime <- lubridate::as_date(x$datetime)
  }

  x
}

#' @rdname tv
#' @export
check_tv_exposure <- function(x, expected_ids, time_units, id, ..., check_overlap = TRUE) {
  cols <- c(id, "exposure_start", "exposure_stop")
  if(!all(cols %in% names(x))) {
    stop("'exposure' is missing some columns: ", paste0(setdiff(cols, names(x)), collapse = ", "))
  }
  if(!all(expected_ids %in% x[[id]])) {
    stop("Some ids are in the data but not 'exposure'")
  }
  if(!all(x[[id]] %in% expected_ids)) {
    stop("Some exposure ids are not in the data")
  }
  if(time_units == "days") {
    if(!lubridate::is.Date(x$exposure_start) || !lubridate::is.Date(x$exposure_stop)) stop("You specified time_units = 'days', but exposure start or stop are not a Date")
  } else {
    if(!lubridate::is.POSIXct(x$exposure_start) || !lubridate::is.POSIXct(x$exposure_stop)) stop("You specified time_units = 'seconds', but exposure start or stop is not a POSIXct.")
  }

  idx <- is.na(x$exposure_start) | is.na(x$exposure_stop)
  if(any(idx)) {
    warning("There are ", sum(idx), " missing times being removed from the exposures.")
    x <- x[!idx, ]
  }

  if(any(x$exposure_start >= x$exposure_stop)) {
    stop("There are zero- or negative-length exposures.")
  }


  if(check_overlap) {
    y <- dplyr::mutate(x, .exposure.row = dplyr::row_number())
    z <- dplyr::inner_join(y, y, by = id, relationship = "many-to-many") %>%
      dplyr::filter(
        .data$.exposure.row.x < .data$.exposure.row.y,
        .data$exposure_start.y < .data$exposure_stop.x,
        .data$exposure_stop.y > .data$exposure_start.x
      )
    if(nrow(z) > 0) {
      print(z)
      stop("There are overlaps in the exposure times. `time_varying()` will not return what you expect. ",
           "If you are really sure you want to proceed, use the `check_overlap=FALSE` option.")
    }
  }

  x
}



#' @rdname tv
#' @export
check_tv_specs <- function(specs, expected_features = NULL) {
  `%nin%` <- Negate(`%in%`)
  if(any(specs$feature %nin% expected_features)) {
    warning("Some features in 'specs' do not appear in the data:\n", paste0(setdiff(specs$feature, expected_features), collapse = ", "))
  }
  if(any(expected_features %nin% specs$feature)) {
    stop("Some features in the data do not appear in 'specs':\n", paste0(setdiff(expected_features, specs$feature), collapse = ", "))
  }

  if("lookback_end" %nin% names(specs)) stop("Please specify a 'lookback_end' column in 'specs'")
  if("lookback_start" %nin% names(specs)) {
    message("Setting 'lookback_start' to 0")
    specs$lookback_start <- 0
  }
  if(any(specs$lookback_end < specs$lookback_start, na.rm = TRUE)) stop("Some lookback_end's are smaller than lookback_start's")
  if("aggregation" %nin% names(specs)) stop("Please specify an aggregation column in 'specs'")
  specs_map <- c(
    count = "count",
    n = "count",
    `last value` = "lvcf",
    lvcf = "lvcf",
    `time since` = "ts",
    ts = "ts",
    any = "any",
    binary = "any",
    min = "min",
    max = "max",
    mean = "mean",
    median = "median",
    sum = "sum",
    event = "event"
  )

  bad <- setdiff(specs$aggregation, names(specs_map))
  if(length(bad)) stop("Some aggregations you supplied aren't supported: ", paste0(bad, collapse = ", "))
  specs$aggregation <- specs_map[specs$aggregation]

  if("use_for_grid" %nin% names(specs)) {
    message("Using all features to construct the grid...\n")
    specs$use_for_grid <- TRUE
  } else if(!is.logical(specs$use_for_grid)) stop("specs$use_for_grid must be a logical")

  specs
}

Try the tv package in your browser

Any scripts or data that you put into this service are public.

tv documentation built on May 29, 2024, 5:30 a.m.