R/methods-epi_archive.R


#' Generate a snapshot from an `epi_archive` object
#'
#' Generates a snapshot in `epi_df` format from an `epi_archive` object, as of a
#' given version. See the [archive
#' vignette](https://cmu-delphi.github.io/epiprocess/articles/archive.html) for
#' examples.
#'
#' @param x An `epi_archive` object
#' @param version Time value specifying the max version to permit in the
#'   snapshot. That is, the snapshot will comprise the unique rows of the
#'   current archive data that represent the most up-to-date signal values, as
#'   of the specified `version` (and whose time values are at least
#'   `min_time_value`).
#' @param min_time_value Time value specifying the min time value to permit in
#'   the snapshot. Default is `-Inf`, which effectively means that there is no
#'   minimum considered.
#' @param all_versions If `all_versions = TRUE`, then the output will be in
#'   `epi_archive` format, and contain rows in the specified `time_value` range
#'   whose `version` is at most the specified `version`. The resulting object
#'   will cover a potentially narrower `version` and `time_value` range than
#'   `x`, depending on user-provided arguments. Otherwise, there will be one
#'   row in the output for the most recent `version` of each `time_value`.
#'   Default is `FALSE`.
#' @param max_version `r lifecycle::badge("deprecated")` please use `version`
#'   argument instead.
#' @return An `epi_df` object, or an `epi_archive` if `all_versions = TRUE`.
#'
#' @examples
#' epix_as_of(
#'   archive_cases_dv_subset,
#'   version = max(archive_cases_dv_subset$DT$version)
#' )
#'
#' range(archive_cases_dv_subset$DT$version) # 2020-06-02 -- 2021-12-01
#'
#' epix_as_of(archive_cases_dv_subset, as.Date("2020-06-12"))
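#'
#' # A sketch of the remaining arguments, using the same bundled dataset:
#' # `min_time_value` trims early `time_value`s from the snapshot, and
#' # `all_versions = TRUE` returns a filtered `epi_archive` instead of an
#' # `epi_df`:
#' epix_as_of(archive_cases_dv_subset, as.Date("2020-06-12"),
#'   min_time_value = as.Date("2020-06-08")
#' )
#' epix_as_of(archive_cases_dv_subset, as.Date("2020-06-12"), all_versions = TRUE)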
#'
#' # --- Advanced: ---
#'
#' # When requesting recent versions of a data set, there can be some
#' # reproducibility issues. For example, requesting data as of the current date
#' # may return different values based on whether today's data is available yet
#' # or not. Other factors include the time it takes between data becoming
#' # available and when you download the data, and whether the data provider
#' # will overwrite ("clobber") version data rather than just publishing new
#' # versions. You can include information about these factors by setting the
#' # `clobberable_versions_start` and `versions_end` of an `epi_archive`, in
#' # which case you will get warnings about potential reproducibility issues:
#'
#' archive_cases_dv_subset2 <- as_epi_archive(
#'   archive_cases_dv_subset$DT,
#'   # Suppose last version with an update could potentially be rewritten
#'   # (a.k.a. "hotfixed", "clobbered", etc.):
#'   clobberable_versions_start = max(archive_cases_dv_subset$DT$version),
#'   # Suppose today is the following day, and there are no updates out yet:
#'   versions_end = max(archive_cases_dv_subset$DT$version) + 1L,
#'   compactify = TRUE
#' )
#'
#' epix_as_of(archive_cases_dv_subset2, max(archive_cases_dv_subset$DT$version))
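#'
#' # A sketch of one way to muffle that warning once you have accepted the
#' # reproducibility caveats, using the condition class raised above:
#' withCallingHandlers(
#'   epix_as_of(archive_cases_dv_subset2, max(archive_cases_dv_subset$DT$version)),
#'   epiprocess__snapshot_as_of_clobberable_version = function(wrn) {
#'     invokeRestart("muffleWarning")
#'   }
#' )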
#'
#' @importFrom data.table between key
#' @export
epix_as_of <- function(x, version, min_time_value = -Inf, all_versions = FALSE,
                       max_version = deprecated()) {
  assert_class(x, "epi_archive")

  if (lifecycle::is_present(max_version)) {
    lifecycle::deprecate_warn("0.8.1", "epix_as_of(max_version =)", "epix_as_of(version =)")
    version <- max_version
  }

  other_keys <- setdiff(
    key(x$DT),
    c("geo_value", "time_value", "version")
  )

  # Check a few things on version
  if (!identical(class(version), class(x$DT$version))) {
    cli_abort(
      "`version` must have the same `class` vector as `epi_archive$DT$version`."
    )
  }
  if (!identical(typeof(version), typeof(x$DT$version))) {
    cli_abort(
      "`version` must have the same `typeof` as `epi_archive$DT$version`."
    )
  }
  assert_scalar(version, na.ok = FALSE)
  if (version > x$versions_end) {
    cli_abort("`version` must be at most `epi_archive$versions_end`.")
  }
  assert_logical(all_versions, len = 1)
  if (!is.na(x$clobberable_versions_start) && version >= x$clobberable_versions_start) {
    cli_warn(
      'Getting data as of some recent version which could still be
      overwritten (under routine circumstances) without assigning a new
      version number (a.k.a. "clobbered").  Thus, the snapshot that we
      produce here should not be expected to be reproducible later. See
      `?epi_archive` for more info and `?epix_as_of` on how to muffle.',
      class = "epiprocess__snapshot_as_of_clobberable_version"
    )
  }

  # We can't disable nonstandard evaluation nor use the `..` feature in the `i`
  # argument of `[.data.table` below; try to avoid problematic names and abort
  # if we fail to do so:
  .min_time_value <- min_time_value
  .version <- version
  if (any(c(".min_time_value", ".version") %in% names(x$DT))) {
    cli_abort("epi_archives can't contain a `.min_time_value` or `.version` column")
  }

  # Filter by version and return
  if (all_versions) {
    # epi_archive is copied into result, so we can modify result directly
    result <- epix_truncate_versions_after(x, version)
    result$DT <- result$DT[time_value >= .min_time_value, ] # nolint: object_usage_linter
    return(result)
  }

  # Make sure to use data.table ways of filtering and selecting
  as_of_epi_df <- x$DT[time_value >= .min_time_value & version <= .version, ] %>% # nolint: object_usage_linter
    unique(
      by = c("geo_value", "time_value", other_keys),
      fromLast = TRUE
    ) %>%
    tibble::as_tibble() %>%
    dplyr::select(-"version") %>%
    as_epi_df(
      as_of = version,
      other_keys = other_keys
    )

  return(as_of_epi_df)
}


#' Fill `epi_archive` unobserved history
#'
#' @description
#' This function fills in missing version history in an `epi_archive` object up
#' to a specified version, updating the `versions_end` field as necessary. Note
#' that the filling is done in a compactified way; see Details.
#'
#' @param x An `epi_archive`
#' @param fill_versions_end a scalar of the same class and type as
#'   `x$DT$version`: the version through which to fill in missing version
#'   history; the `epi_archive`'s `versions_end` attribute will be set to this,
#'   unless it already had a later `versions_end`.
#' @param how Optional; `"na"` or `"locf"`: `"na"` fills missing version history
#'   with `NA`s, `"locf"` fills missing version history with the last version of
#'   each observation carried forward (LOCF). Default is `"na"`.
#' @return An `epi_archive`
#' @details
#' Note that we generally store `epi_archive`s in a compacted form, meaning
#' that, implicitly, if a version does not exist but the `versions_end`
#' attribute is greater, then it is understood that all the versions in between
#' had the same value as the last observed version. This affects the behavior
#' of this function in the following ways:
#'
#' - if `how = "na"`, then the function will fill in at most one missing
#'   version with `NA` and the rest will be implicit.
#' - if `how = "locf"`, then the function will not fill in any values.
#'
#' @importFrom data.table copy ":="
#' @importFrom rlang arg_match
#' @export
#' @examples
#' test_date <- as.Date("2020-01-01")
#' ea_orig <- as_epi_archive(data.table::data.table(
#'   geo_value = "ak",
#'   time_value = test_date + c(rep(0L, 5L), 1L),
#'   version = test_date + c(1:5, 2L),
#'   value = 1:6
#' ))
#' epix_fill_through_version(ea_orig, test_date + 8, "na")
#' epix_fill_through_version(ea_orig, test_date + 8, "locf")
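#'
#' # Under `how = "na"`, only the single version right after `versions_end` is
#' # materialized as `NA` rows; under `how = "locf"`, no rows are added at all.
#' # A sketch inspecting the results above:
#' nrow(epix_fill_through_version(ea_orig, test_date + 8, "na")$DT) - nrow(ea_orig$DT)
#' epix_fill_through_version(ea_orig, test_date + 8, "locf")$versions_end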
epix_fill_through_version <- function(x, fill_versions_end, how = c("na", "locf")) {
  assert_class(x, "epi_archive")

  validate_version_bound(fill_versions_end, x$DT, na_ok = FALSE)
  how <- arg_match(how)
  if (x$versions_end < fill_versions_end) {
    new_dt <- switch(how,
      "na" = {
        # old DT + a version consisting of all NA observations
        # immediately after the last currently/actually-observed
        # version. Note that this NA-observation version must only be
        # added if `epi_archive` is outdated.
        nonversion_key_cols <- setdiff(key(x$DT), "version")
        nonkey_cols <- setdiff(names(x$DT), key(x$DT))
        next_version_tag <- next_after(x$versions_end)
        if (next_version_tag > fill_versions_end) {
          cli_abort(paste(
            "Apparent problem with {.code next_after} method:",
            "archive contained observations through version {x$versions_end}",
            "and the next possible version was supposed to be {next_version_tag},",
            "but this appeared to jump from a version < {fill_versions_end}",
            "to one > {fill_versions_end}, implying at least one version in between."
          ))
        }
        nonversion_key_vals_ever_recorded <- unique(x$DT, by = nonversion_key_cols)
        # In edge cases, the `unique` result can alias the original
        # DT; detect and copy if necessary:
        if (identical(address(x$DT), address(nonversion_key_vals_ever_recorded))) {
          nonversion_key_vals_ever_recorded <- data.table::copy(nonversion_key_vals_ever_recorded)
        }
        next_version_dt <- nonversion_key_vals_ever_recorded[
          , version := next_version_tag
        ][
          # this makes the class of these columns logical (`NA` is a
          # logical NA; we're relying on the rbind below to convert to
          # the proper class&typeof)
          , (nonkey_cols) := NA
        ]
        # full result DT:
        setkeyv(rbind(x$DT, next_version_dt), key(x$DT))[]
      },
      "locf" = {
        # just the old DT; LOCF is built into other methods:
        x$DT
      }
    )
    new_versions_end <- fill_versions_end
    # Update `epi_archive` all at once with simple, error-free operations +
    # return below:
    x$DT <- new_dt
    x$versions_end <- new_versions_end
  } else {
    # Already sufficiently up to date; nothing to do.
  }
  return(x)
}


#' Merge two `epi_archive` objects
#'
#' Merges two `epi_archive`s that share a common `geo_value`, `time_value`, and
#' set of key columns. When they also share a common `versions_end`, using
#' `epix_as_of` on the result should be the same as using `epix_as_of` on `x`
#' and `y` individually, then performing a full join of the `DT`s on the
#' non-version key columns (potentially consolidating multiple warnings about
#' clobberable versions). If the `versions_end` values differ, the `sync`
#' parameter controls what is done.
#'
#' @param x,y Two `epi_archive` objects to join together.
#' @param sync Optional; character. How to handle the situation when one
#'   signal has a more recent revision than the other for a key that they have
#'   both already observed. The options are:
#'
#'   - `"forbid"`: the default and strictest option; throws an error. This is
#'   likely not what you want, but the strictness forces the user to confront
#'   the issue,
#'   - `"locf"`: carry forward the last observed version of the missing signal
#'   to the new version and use `max(x$versions_end, y$versions_end)` as the
#'   result's `versions_end`,
#'   - `"na"`: fill the unobserved values with `NA`'s (this can be handy when
#'   you know that source data is truly missing upstream and you want to
#'   represent the lack of information accurately, for instance) and use
#'   `max(x$versions_end, y$versions_end)` as the result's `versions_end`,
#'   - `"truncate"`: discard any rows containing update rows for later versions
#'   and use `min(x$versions_end, y$versions_end)` as the result's
#'   `versions_end`.
#'
#' @param compactify Optional; `TRUE` (default), `FALSE`, or `NULL`; should the
#'   result be compactified? See `as_epi_archive()` for details.
#' @details
#' When merging archives, unless the archives have identical data release
#' patterns, we often have to handle the situation when one signal has a more
#' recent observation for a key than another signal. In this case, we have two
#' options:
#'
#' - if the other signal has never observed that key, we need to introduce
#' `NA`s in the non-key variables for the missing signal,
#' - if the other signal has observed that key previously, but at an earlier
#' revision date, then we need to decide how to handle the missing value in the
#' more recent signal; the `sync` argument controls this behavior.
#'
#' @return the resulting `epi_archive`
#'
#' @details In all cases, `clobberable_versions_start` will be set to the
#'   earliest version that could be clobbered in either input archive.
#'
#' @examples
#' # Example 1
#' # The s1 signal at August 1st gets revised from 10 to 11 on August 2nd
#' s1 <- tibble::tibble(
#'   geo_value = c("ca", "ca", "ca"),
#'   time_value = as.Date(c("2024-08-01", "2024-08-01", "2024-08-02")),
#'   version = as.Date(c("2024-08-01", "2024-08-02", "2024-08-02")),
#'   signal1 = c(10, 11, 7)
#' )
#' s2 <- tibble::tibble(
#'   geo_value = c("ca", "ca"),
#'   time_value = as.Date(c("2024-08-01", "2024-08-02")),
#'   version = as.Date(c("2024-08-03", "2024-08-03")),
#'   signal2 = c(2, 3)
#' )
#' s1 <- s1 %>% as_epi_archive()
#' s2 <- s2 %>% as_epi_archive()
#' merged <- epix_merge(s1, s2, sync = "locf")
#' merged[["DT"]]
#'
#' # Example 2
#' # The s1 signal at August 1st gets revised from 12 to 13 on August 3rd
#' s1 <- tibble::tibble(
#'   geo_value = c("ca", "ca", "ca", "ca"),
#'   time_value = as.Date(c("2024-08-01", "2024-08-01", "2024-08-02", "2024-08-03")),
#'   version = as.Date(c("2024-08-01", "2024-08-03", "2024-08-03", "2024-08-03")),
#'   signal1 = c(12, 13, 22, 19)
#' )
#' s2 <- tibble::tibble(
#'   geo_value = c("ca", "ca"),
#'   time_value = as.Date(c("2024-08-01", "2024-08-02")),
#'   version = as.Date(c("2024-08-02", "2024-08-02")),
#'   signal2 = c(4, 5),
#' )
#' s1 <- s1 %>% as_epi_archive()
#' s2 <- s2 %>% as_epi_archive()
#' merged <- epix_merge(s1, s2, sync = "locf")
#' merged[["DT"]]
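#'
#' # Here `s2` stops at version 2024-08-02 while `s1` continues through
#' # 2024-08-03, so the default `sync = "forbid"` would error; a sketch of
#' # `sync = "truncate"`, which instead drops the 2024-08-03 update rows and
#' # reports the earlier `versions_end`:
#' epix_merge(s1, s2, sync = "truncate")[["DT"]]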
#'
#'
#' # Example 3:
#' s1 <- tibble::tibble(
#'   geo_value = c("ca", "ca", "ca"),
#'   time_value = as.Date(c("2024-08-01", "2024-08-02", "2024-08-03")),
#'   version = as.Date(c("2024-08-01", "2024-08-02", "2024-08-03")),
#'   signal1 = c(14, 11, 9)
#' )
#' # The s2 signal at August 1st gets revised from 3 to 5 on August 3rd
#' s2 <- tibble::tibble(
#'   geo_value = c("ca", "ca", "ca"),
#'   time_value = as.Date(c("2024-08-01", "2024-08-01", "2024-08-02")),
#'   version = as.Date(c("2024-08-02", "2024-08-03", "2024-08-03")),
#'   signal2 = c(3, 5, 2),
#' )
#' s1 <- s1 %>% as_epi_archive()
#' s2 <- s2 %>% as_epi_archive()
#' merged <- epix_merge(s1, s2, sync = "locf")
#' merged[["DT"]]
#' @importFrom data.table key set setkeyv
#' @export
epix_merge <- function(x, y,
                       sync = c("forbid", "na", "locf", "truncate"),
                       compactify = TRUE) {
  assert_class(x, "epi_archive")
  assert_class(y, "epi_archive")
  sync <- rlang::arg_match(sync)

  if (!identical(x$geo_type, y$geo_type)) {
    cli_abort("`x` and `y` must have the same `$geo_type`")
  }

  if (!identical(x$time_type, y$time_type)) {
    cli_abort("`x` and `y` must share data type on their `time_value` column.")
  }

  result_clobberable_versions_start <-
    if (all(is.na(c(x$clobberable_versions_start, y$clobberable_versions_start)))) {
      NA # (any type of NA is fine here)
    } else {
      min(c(x$clobberable_versions_start, y$clobberable_versions_start), na.rm = TRUE)
    }

  if (sync == "forbid") {
    if (!identical(x$versions_end, y$versions_end)) {
      cli_abort(paste(
        "`x` and `y` were not equally up to date version-wise:",
        "`x$versions_end` was not identical to `y$versions_end`;",
        "either ensure that `x` and `y` are equally up to date before merging,",
        "or specify how to deal with this using `sync`"
      ), class = "epiprocess__epix_merge_unresolved_sync")
    } else {
      new_versions_end <- x$versions_end
      x_dt <- x$DT
      y_dt <- y$DT
    }
  } else if (sync %in% c("na", "locf")) {
    new_versions_end <- max(c(x$versions_end, y$versions_end))
    x_dt <- epix_fill_through_version(x, new_versions_end, sync)$DT
    y_dt <- epix_fill_through_version(y, new_versions_end, sync)$DT
  } else if (sync == "truncate") {
    new_versions_end <- min(c(x$versions_end, y$versions_end))
    x_dt <- x$DT[x[["DT"]][["version"]] <= new_versions_end, names(x$DT), with = FALSE]
    y_dt <- y$DT[y[["DT"]][["version"]] <= new_versions_end, names(y$DT), with = FALSE]
  } else {
    cli_abort("unimplemented")
  }

  # key(x_dt) should be the same as key(x$DT) and key(y_dt) should be the same
  # as key(y$DT). Below, we only use {x,y}_DT in the code (making it easier to
  # split the code into separate functions if we wish), but still refer to
  # {x,y}$DT in the error messages (further relying on this assumption).
  #
  # Check & ensure that the above assumption holds; if it didn't already, we likely
  # have a bug in the preprocessing, a weird/invalid archive as input, and/or a
  # data.table version with different semantics (which may break other parts of
  # our code).
  x_dt_key_as_expected <- identical(key(x$DT), key(x_dt))
  y_dt_key_as_expected <- identical(key(y$DT), key(y_dt))
  if (!x_dt_key_as_expected || !y_dt_key_as_expected) {
    cli_warn("
      `epiprocess` internal warning (please report): pre-processing for
      epix_merge unexpectedly resulted in an intermediate data table (or
      tables) with a different key than the corresponding input archive.
      Manually setting intermediate data table keys to the expected values.
    ", internal = TRUE)
    setkeyv(x_dt, key(x$DT))
    setkeyv(y_dt, key(y$DT))
  }
  # Without some sort of annotations of what various columns represent, we can't
  # do something that makes sense when merging archives with mismatched keys.
  # E.g., even if we assume extra keys represent demographic breakdowns, a
  # sensible default treatment of count-type and rate-type value columns would
  # differ.
  if (!identical(sort(key(x_dt)), sort(key(y_dt)))) {
    cli_abort("
            The archives must have the same set of key column names; if the
            key columns represent the same things, just with different
            names, please retry after manually renaming to match; if they
            represent different things (e.g., x has an age breakdown
            but y does not), please retry after processing them to share
            the same key (e.g., by summarizing x to remove the age breakdown,
            or by applying a static age breakdown to y).
          ", class = "epiprocess__epix_merge_x_y_must_have_same_key_set")
  }
  # `by` cols = result (and each input's) `key` cols; these determine the
  # row set, via a full join (`merge`)
  #
  # non-`by` cols = "value"-ish cols, which are looked up with last
  #                 version carried forward via rolling joins
  by <- key(x_dt) # = some perm of key(y_dt)
  if (!all(c("geo_value", "time_value", "version") %in% key(x_dt))) {
    cli_abort('Invalid `by`; `by` is currently set to the common `key` of
           the two archives, and is expected to contain
           "geo_value", "time_value", and "version".',
      class = "epiprocess__epi_archive_must_have_required_key_cols"
    )
  }
  if (length(by) < 1L || utils::tail(by, 1L) != "version") {
    cli_abort('Invalid `by`; `by` is currently set to the common `key` of
           the two archives, and is expected to have a "version" as
           the last key col.',
      class = "epiprocess__epi_archive_must_have_version_at_end_of_key"
    )
  }
  x_nonby_colnames <- setdiff(names(x_dt), by)
  y_nonby_colnames <- setdiff(names(y_dt), by)
  if (length(intersect(x_nonby_colnames, y_nonby_colnames)) != 0L) {
    cli_abort("
            `x` and `y` DTs have overlapping non-by column names;
            this is currently not supported; please manually fix up first:
            any overlapping columns that are key-like should be
            incorporated into the key, and other columns should be renamed.
          ", class = "epiprocess__epix_merge_x_y_must_not_have_overlapping_nonby_colnames")
  }
  x_by_vals <- x_dt[, by, with = FALSE]
  if (anyDuplicated(x_by_vals) != 0L) {
    cli_abort("
            The `by` columns must uniquely determine rows of `x$DT`;
            the `by` is currently set to the common `key` of the two
            archives, so this can be resolved by adding key-like columns
            to `x`'s key (to get a unique key).
          ", class = "epiprocess__epix_merge_by_cols_must_act_as_unique_key")
  }
  y_by_vals <- y_dt[, by, with = FALSE]
  if (anyDuplicated(y_by_vals) != 0L) {
    cli_abort("
            The `by` columns must uniquely determine rows of `y$DT`;
            the `by` is currently set to the common `key` of the two
            archives, so this can be resolved by adding key-like columns
            to `y`'s key (to get a unique key).
          ", class = "epiprocess__epix_merge_by_cols_must_act_as_unique_key")
  }
  result_dt <- merge(x_by_vals, y_by_vals,
    by = by,
    # We must have `all=TRUE` or we may skip updates
    # from x and/or y and corrupt the history
    all = TRUE,
    # We don't want Cartesian products, but the
    # by-is-unique-key check above already ensures
    # this. (Note that `allow.cartesian=FALSE` doesn't
    # actually catch all Cartesian products anyway.)
    # Disable superfluous check:
    allow.cartesian = TRUE
  )
  set(
    result_dt, , x_nonby_colnames,
    x_dt[result_dt[, by, with = FALSE], x_nonby_colnames,
      with = FALSE,
      # It's good practice to specify `on`, and we must
      # explicitly specify `on` if there's a potential key vs.
      # by order mismatch (not possible currently for x
      # with by = key(x$DT), but possible for y):
      on = by,
      # last version carried forward:
      roll = TRUE,
      # requesting non-version key that doesn't exist in the other archive,
      # or before its first version, should result in NA
      nomatch = NA,
      # see note on `allow.cartesian` above; currently have a
      # similar story here.
      allow.cartesian = TRUE
    ]
  )
  set(
    result_dt, , y_nonby_colnames,
    y_dt[result_dt[, by, with = FALSE], y_nonby_colnames,
      with = FALSE,
      on = by,
      roll = TRUE,
      nomatch = NA,
      allow.cartesian = TRUE
    ]
  )
  # The key could be unset in case of a key vs. by order mismatch as
  # noted above. Ensure that we keep it:
  setkeyv(result_dt, by)

  return(as_epi_archive(
    result_dt[], # clear data.table internal invisibility flag if set
    other_keys = setdiff(key(result_dt), c("geo_value", "time_value", "version")),
    # It'd probably be better to pre-compactify before the merge, and might be
    # guaranteed not to be necessary to compactify the merge result if the
    # inputs are already compactified, but at time of writing we don't have
    # compactify in its own method or field, and it seems like it should be
    # pretty fast anyway.
    compactify = compactify,
    clobberable_versions_start = result_clobberable_versions_start,
    versions_end = new_versions_end
  ))
}


#' A more detailed but restricted `mutate` for use in `group_by.epi_archive`
#'
#' More detailed: provides the names of the "requested" columns in addition to
#' the output expected from a regular `mutate` method.
#'
#' Restricted: doesn't allow replacing or removing key cols, where a sort is
#' potentially required at best and what the output key should be is unclear at
#' worst. (The originally expected restriction was that the `mutate` parameters
#' not present in `group_by` would not be recognized, but the current
#' implementation just lets `mutate` handle them anyway, even if they're not
#' part of the regular `group_by` parameters; these arguments would have to be
#' passed by names with dot prefixes, so we just hope that the user means to
#' use them here if provided.)
#'
#' This can introduce column-level aliasing in `data.table`s, which isn't really
#' intended in the `data.table` user model but we can make it part of our user
#' model (see
#' https://stackoverflow.com/questions/45925482/make-a-shallow-copy-in-data-table
#' and links).
#'
#' Don't export this without cleaning up language of "mutate" as in side effects
#' vs. "mutate" as in `dplyr::mutate`.
#' @noRd
epix_detailed_restricted_mutate <- function(.data, ...) {
  # We don't want to directly use `dplyr::mutate` on the `$DT`, as:
  # - `mutate` behavior, including the output class, changes depending on
  #   whether `dtplyr` < 1.3.0 is loaded and would require post-processing
  # - behavior with `dtplyr` isn't fully compatible
  # - it doesn't give the desired details, and `rlang::exprs_auto_name` does not
  #   appropriately handle the `= NULL` and `= <data.frame>` tidyeval cases
  # Instead:
  # - Use `as.list` to get a shallow copy (undocumented, but apparently
  #   intended, behavior), then `as_tibble` (also shallow, given a list) to get
  #   back to something that will use `dplyr`'s included `mutate` method(s),
  #   then convert this using shallow operations into a `data.table`.
  # - Use `col_modify_recorder_df` to get the desired details.
  in_tbl <- tibble::as_tibble(as.list(.data$DT), .name_repair = "minimal")
  col_modify_cols <-
    destructure_col_modify_recorder_df(
      mutate(new_col_modify_recorder_df(in_tbl), ...)
    )[["cols"]]
  invalidated_key_col_is <-
    which(purrr::map_lgl(key(.data$DT), function(key_colname) {
      key_colname %in% names(col_modify_cols) &&
        !rlang::is_reference(in_tbl[[key_colname]], col_modify_cols[[key_colname]])
    }))
  if (length(invalidated_key_col_is) != 0L) {
    rlang::abort(paste_lines(c(
      "Key columns must not be replaced or removed.",
      wrap_varnames(key(.data$DT)[invalidated_key_col_is],
        initial = "Flagged key cols: "
      )
    )))
  } else {
    # Have `dplyr` do the `dplyr_col_modify`, keeping the column-level-aliasing
    # and must-copy-on-write-if-refcount-more-than-1 model, obtaining a tibble,
    # then convert it into a `data.table`. The key should still be valid
    # (assuming that the user did not explicitly alter `key(.data$DT)` or the
    # columns by reference somehow within `...` tidyeval-style computations, or
    # trigger refcount-1 alterations due to still having >1 refcounts on the
    # columns), set the "sorted" attribute accordingly to prevent attempted
    # sorting (including potential extra copies) or sortedness checking, then
    # `setDT` (rather than `as.data.table`, in order to prevent column copying
    # to establish ownership according to `data.table`'s memory model).
    out_dt <- dplyr::dplyr_col_modify(in_tbl, col_modify_cols) %>%
      data.table::setattr("sorted", data.table::key(.data$DT)) %>%
      data.table::setDT(key = key(.data$DT))
    out_archive <- .data
    out_archive$DT <- out_dt
    request_names <- names(col_modify_cols)
    return(list(
      archive = out_archive,
      request_names = request_names
    ))
    # (We might also consider special-casing when `mutate` hands back something
    # equivalent (in some sense) to the input (probably only encountered when
    # we're dealing with `group_by`), and using just `$DT`, not a shallow copy,
    # in the result, primarily in order to hedge against `as.list` or `setDT`
    # changing their behavior and generating deep copies somehow. This could
    # also prevent storage, and perhaps also generation, of shallow copies, but
    # this seems unlikely to be a major gain unless it helps enable some
    # in-place modifications of refcount-1 columns (although detecting this case
    # seems to be common across `group_by` implementations; maybe there is
    # something there).)
  }
}
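

# A minimal sketch (hypothetical toy data, left commented out since this is
# package code) of the column-level aliasing discussed above: `as.list` plus
# `as_tibble` produce a shallow copy whose column vectors are shared with the
# original `data.table` until one of them is modified.
#   dt <- data.table::data.table(a = 1:3, b = 4:6)
#   tbl <- tibble::as_tibble(as.list(dt), .name_repair = "minimal")
#   rlang::is_reference(dt$a, tbl$a) # TRUE: same underlying vector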


#' Slide a function over variables in an `epi_archive` or `grouped_epi_archive`
#'
#' Slides a given function over variables in an `epi_archive` object. This
#' behaves similarly to `epi_slide()`, with the key exception that it is
#' version-aware: the sliding computation at any given reference time t is
#' performed on **data that would have been available as of t**. This function
#' is intended for use in accurate backtesting of models; see
#' `vignette("backtesting", package="epipredict")` for a walkthrough.
#'
#' @param .x An [`epi_archive`] or [`grouped_epi_archive`] object. If ungrouped,
#'   all data in `x` will be treated as part of a single data group.
#' @param .f Function, formula, or missing; together with `...` specifies the
#'   computation to slide. To "slide" means to apply a computation over a
#'   sliding (a.k.a. "rolling") time window for each data group. The window is
#'   determined by the `.before` parameter (see details for more). If a
#'   function, `.f` must have the form `function(x, g, t, ...)`, where
#'
#'   - "x" is an epi_df with the same column names as the archive's `DT`, minus
#'     the `version` column
#'   - "g" is a one-row tibble containing the values of the grouping variables
#'   for the associated group
#'   - "t" is the ref_time_value for the current window
#'   - "..." are additional arguments
#'
#'   If a formula, `.f` can operate directly on columns accessed via `.x$var` or
#'   `.$var`, as in `~ mean(.x$var)` to compute a mean of a column `var` for
#'   each group-`ref_time_value` combination. The group key can be accessed via
#'   `.y` or `.group_key`, and the reference time value can be accessed via `.z`
#'   or `.ref_time_value`. If `.f` is missing, then `...` will specify the
#'   computation.
#' @param ... Additional arguments to pass to the function or formula specified
#'   via `f`. Alternatively, if `.f` is missing, then the `...` is interpreted
#'   as a ["data-masking"][rlang::args_data_masking] expression or expressions
#'   for tidy evaluation; in addition to referring columns directly by name, the
#'   expressions have access to `.data` and `.env` pronouns as in `dplyr` verbs,
#'   and can also refer to `.x` (not the same as the input epi_archive),
#'   `.group_key`, and `.ref_time_value`. See details for more.
#' @param .before How many time values before the `.ref_time_value`
#'   should each snapshot handed to the function `.f` contain? If provided, it
#'   should be a single value that is compatible with the time_type of the
#'   time_value column (more below), but most commonly an integer. This window
#'   endpoint is inclusive. For example, if `.before = 7`, `time_type`
#'   in the archive is "day", and the `.ref_time_value` is January 8, then the
#'   smallest time_value in the snapshot will be January 1. If missing, then the
#'   default is no limit on the time values, so the full snapshot is given.
#' @param .versions Reference time values / versions for sliding
#'   computations; each element of this vector serves both as the anchor point
#'   for the `time_value` window for the computation and as the maximum
#'   `version` of the `epix_as_of` with which we fetch data in this window. If
#'   missing, this will be set to a regularly-spaced sequence of values
#'   covering the range of `version`s in the `DT` plus the `versions_end`; the
#'   spacing of values will be guessed (using the GCD of the skips between
#'   values).
#' @param .new_col_name Either `NULL` or a string indicating the name of the new
#'   column that will contain the derived values. The default, `NULL`, will use
#'   the name "slide_value" unless your slide computations output data frames,
#'   in which case they will be unpacked into the constituent columns and those
#'   names used. If the resulting column name(s) overlap with the column names
#'   used for labeling the computations, which are `group_vars(x)` and
#'   `"version"`, then the values for these columns must be identical to the
#'   labels we assign.
#' @param .all_versions (Not the same as `.all_rows` parameter of `epi_slide`.)
#'   If `.all_versions = TRUE`, then the slide computation will be passed the
#'   version history (all `version <= .version` where `.version` is one of the
#'   requested `.versions`) for rows having a `time_value` of at least `.version
#'   - .before`. Otherwise, the slide computation will be passed only the most
#'   recent `version` for every unique `time_value`. Default is `FALSE`.
#' @return A tibble whose columns are: the grouping variables (if any),
#'   `version`, containing the reference time values for the slide computation,
#'   and a column named according to the `.new_col_name` argument, containing
#'   the slide values.
#'
#' @details A few key distinctions between the current function and `epi_slide()`:
#'   1. In `.f` functions for `epix_slide`, one should not assume that the input
#'   data contains any rows with `time_value` matching the computation's
#'   `.ref_time_value` (accessible via `attributes(<data>)$metadata$as_of`); for
#'   typical epidemiological surveillance data, observations pertaining to a
#'   particular time period (`time_value`) are first reported `as_of` some
#'   instant after that time period has ended.
#'   2. The input class and columns are similar but different: `epix_slide`
#'   (with the default `.all_versions=FALSE`) keeps all columns and the
#'   `epi_df`-ness of the first argument to each computation; `epi_slide` only
#'   provides the grouping variables in the second input, and will convert the
#'   first input into a regular tibble if the grouping variables include the
#'   essential `geo_value` column. (With `.all_versions = TRUE`, `epix_slide`
#'   will provide an `epi_archive` rather than an `epi_df` to each
#'   computation.)
#'   3. The output class and columns are similar but different: `epix_slide()`
#'   returns a tibble containing only the grouping variables, `time_value`, and
#'   the new column(s) from the slide computations, whereas `epi_slide()`
#'   returns an `epi_df` with all original variables plus the new columns from
#'   the slide computations. (Both will mirror the grouping or ungroupedness of
#'   their input, with one exception: `epi_archive`s can have trivial
#'   (zero-variable) groupings, but these will be dropped in `epix_slide`
#'   results as they are not supported by tibbles.)
#'   4. There are no size stability checks or element/row recycling to maintain
#'   size stability in `epix_slide`, unlike in `epi_slide`. (`epix_slide` is
#'   roughly analogous to [`dplyr::group_modify`], while `epi_slide` is roughly
#'   analogous to `dplyr::mutate` followed by `dplyr::arrange`.) This is
#'   detailed in the "advanced" vignette.
#'   5. `.all_rows` is not supported in `epix_slide`; since the slide
#'   computations are allowed more flexibility in their outputs than in
#'   `epi_slide`, we can't guess a good representation for missing computations
#'   for excluded group-`.ref_time_value` pairs.
#'   6. The `.versions` default for `epix_slide` is based on making an
#'   evenly-spaced sequence out of the `version`s in the `DT` plus the
#'   `versions_end`, rather than the `time_value`s.
#'
#' Apart from the above distinctions, the interfaces between `epix_slide()` and
#' `epi_slide()` are the same.
#'
#' Furthermore, the current function can be considerably slower than
#'   `epi_slide()`, for two reasons: (1) it must repeatedly fetch
#'   properly-versioned snapshots from the data archive (via `epix_as_of()`),
#'   and (2) it performs a "manual" sliding of sorts, and does not benefit from
#'   the highly efficient `slider` package. For this reason, it should never be
#'   used in place of `epi_slide()`, and only used when version-aware sliding is
#'   necessary (as is its purpose).
#'
#' @examples
#' library(dplyr)
#'
#' # Reference time points for which we want to compute slide values:
#' versions <- seq(as.Date("2020-06-01"),
#'   as.Date("2020-06-15"),
#'   by = "1 day"
#' )
#'
#' # A simple (but not very useful) example (see the archive vignette for a more
#' # realistic one):
#' archive_cases_dv_subset %>%
#'   group_by(geo_value) %>%
#'   epix_slide(
#'     .f = ~ mean(.x$case_rate_7d_av),
#'     .before = 2,
#'     .versions = versions,
#'     .new_col_name = "case_rate_7d_av_recent_av"
#'   ) %>%
#'   ungroup()
#' # We requested time windows that started 2 days before the corresponding time
#' # values. The actual number of `time_value`s in each computation depends on
#' # the reporting latency of the signal and `time_value` range covered by the
#' # archive (2020-06-01 -- 2021-11-30 in this example).  In this case, we have
#' # * 0 `time_value`s, for ref time 2020-06-01 --> the result is automatically
#' #                                                discarded
#' # * 1 `time_value`, for ref time 2020-06-02
#' # * 2 `time_value`s, for the rest of the results
#' # * never the 3 `time_value`s we would get from `epi_slide`, since, because
#' #   of data latency, we'll never have an observation
#' #   `time_value == .ref_time_value` as of `.ref_time_value`.
#' # The example below shows this type of behavior in more detail.
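#'
#' # The same computation written in data-masking form (leaving `.f` missing);
#' # a sketch assuming the tidy evaluation interface described above, where
#' # naming the expression sets the output column name:
#' archive_cases_dv_subset %>%
#'   group_by(geo_value) %>%
#'   epix_slide(
#'     case_rate_7d_av_recent_av = mean(case_rate_7d_av),
#'     .before = 2,
#'     .versions = versions
#'   ) %>%
#'   ungroup()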
#'
#' # Examining characteristics of the data passed to each computation with
#' # `.all_versions = FALSE`.
#' archive_cases_dv_subset %>%
#'   group_by(geo_value) %>%
#'   epix_slide(
#'     function(x, gk, rtv) {
#'       tibble(
#'         time_range = if (nrow(x) == 0L) {
#'           "0 `time_value`s"
#'         } else {
#'           sprintf("%s -- %s", min(x$time_value), max(x$time_value))
#'         },
#'         n = nrow(x),
#'         class1 = class(x)[[1L]]
#'       )
#'     },
#'     .before = 5, .all_versions = FALSE,
#'     .versions = versions
#'   ) %>%
#'   ungroup() %>%
#'   arrange(geo_value, version)
#'
#' # --- Advanced: ---
#'
#' # `epix_slide` with `.all_versions = FALSE` (the default) applies a
#' # version-unaware computation to several versions of the data. We can also
#' # use `.all_versions=TRUE` to apply a version-*aware* computation to several
#' # versions of the data, again looking at characteristics of the data passed
#' # to each computation. In this case, each computation should expect an
#' # `epi_archive` containing the relevant version data:
#'
#' archive_cases_dv_subset %>%
#'   group_by(geo_value) %>%
#'   epix_slide(
#'     function(x, gk, rtv) {
#'       tibble(
#'         versions_start = if (nrow(x$DT) == 0L) {
#'           "NA (0 rows)"
#'         } else {
#'           toString(min(x$DT$version))
#'         },
#'         versions_end = x$versions_end,
#'         time_range = if (nrow(x$DT) == 0L) {
#'           "0 `time_value`s"
#'         } else {
#'           sprintf("%s -- %s", min(x$DT$time_value), max(x$DT$time_value))
#'         },
#'         n = nrow(x$DT),
#'         class1 = class(x)[[1L]]
#'       )
#'     },
#'     .before = 5, .all_versions = TRUE,
#'     .versions = versions
#'   ) %>%
#'   ungroup() %>%
#'   # Focus on one geo_value so we can better see the columns above:
#'   filter(geo_value == "ca") %>%
#'   select(-geo_value)
#'
#' @export
epix_slide <- function(
    .x,
    .f,
    ...,
    .before = Inf,
    .versions = NULL,
    .new_col_name = NULL,
    .all_versions = FALSE) {
  UseMethod("epix_slide")
}


#' @rdname epix_slide
#' @export
epix_slide.epi_archive <- function(
    .x,
    .f,
    ...,
    .before = Inf,
    .versions = NULL,
    .new_col_name = NULL,
    .all_versions = FALSE) {
  # For an "ungrouped" slide, treat all rows as belonging to one big
  # group (group by 0 vars), like `dplyr::summarize`, and let the
  # resulting `grouped_epi_archive` handle the slide:
  epix_slide(
    group_by(.x),
    .f,
    ...,
    .before = .before, .versions = .versions,
    .new_col_name = .new_col_name, .all_versions = .all_versions
  ) %>%
    # We want a slide on ungrouped archives to output something
    # ungrouped, rather than retaining the trivial (0-variable)
    # grouping applied above. So we `ungroup()`. However, the current
    # `dplyr` implementation automatically ignores/drops trivial
    # groupings, so this is just a no-op for now.
    ungroup()
}


#' Default value for `.versions` in an `epix_slide`
#'
#' @noRd
epix_slide_versions_default <- function(ea) {
  versions_with_updates <- c(ea$DT$version, ea$versions_end)
  ref_time_values <- tidyr::full_seq(versions_with_updates, guess_period(versions_with_updates))
  return(ref_time_values)
}
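
# For instance (a sketch, not run), for an archive `ea` with daily versions,
# the default above is equivalent to:
#   seq(min(ea$DT$version), ea$versions_end, by = "1 day")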


#' Filter an `epi_archive` object to keep only older versions
#'
#' Generates a filtered `epi_archive` from an `epi_archive` object, keeping
#' only rows with `version` falling on or before a specified date.
#'
#' @param x An `epi_archive` object.
#' @param max_version The latest version to include in the archive.
#' @return An `epi_archive` object
#'
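#' @examples
#' # Keep only versions on or before 2020-06-12 (a minimal sketch using the
#' # bundled `archive_cases_dv_subset` dataset, as in the examples above):
#' epix_truncate_versions_after(archive_cases_dv_subset, as.Date("2020-06-12"))
#'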
#' @export
epix_truncate_versions_after <- function(x, max_version) {
  UseMethod("epix_truncate_versions_after")
}


#' @rdname epix_truncate_versions_after
#' @export
epix_truncate_versions_after.epi_archive <- function(x, max_version) {
  if (!identical(class(max_version), class(x$DT$version))) {
    cli_abort("`max_version` must have the same `class` as `epi_archive$DT$version`.")
  }
  if (!identical(typeof(max_version), typeof(x$DT$version))) {
    cli_abort("`max_version` must have the same `typeof` as `epi_archive$DT$version`.")
  }
  assert_scalar(max_version, na.ok = FALSE)
  if (max_version > x$versions_end) {
    cli_abort("`max_version` must be at most `epi_archive$versions_end`.")
  }
  x$DT <- x$DT[x$DT$version <= max_version, colnames(x$DT), with = FALSE]
  # (^ this filter operation seems to always copy the DT, even if it
  # keeps every entry; we don't guarantee this behavior in
  # documentation, though, so we could change to alias in this case)
  if (!is.na(x$clobberable_versions_start) && x$clobberable_versions_start > max_version) {
    x$clobberable_versions_start <- NA
  }
  x$versions_end <- max_version
  return(x)
}


# Helpers for `group_by`:

#' Make non-testing mock to get [`dplyr::dplyr_col_modify`] input
#'
#' A workaround for `dplyr:::mutate_cols` not being exported, and for directly
#' applying test mock libraries likely being impossible (since that would
#' require mocking another package's S3 generic or method).
#'
#' Use solely with a single call to the [`dplyr::mutate`] function and then
#' `destructure_col_modify_recorder_df`; other applicable operations from
#' [dplyr::dplyr_extending] have not been implemented.
#'
#' @param parent_df the "parent class" data frame to wrap
#' @return a `col_modify_recorder_df`
#'
#' @noRd
new_col_modify_recorder_df <- function(parent_df) {
  assert_class(parent_df, "data.frame")
  `class<-`(parent_df, c("col_modify_recorder_df", class(parent_df)))
}


#' Extract unchanged parent-class data frame from a `new_col_modify_recorder_df`
#'
#' @param col_modify_recorder_df an instance of a `col_modify_recorder_df`
#' @return named list with elements `unchanged_parent_df`, `cols`; `cols` is the
#'   input to [`dplyr::dplyr_col_modify`] that this class was designed to record
#'
#' @noRd
destructure_col_modify_recorder_df <- function(col_modify_recorder_df) {
  assert_class(col_modify_recorder_df, "col_modify_recorder_df")
  list(
    unchanged_parent_df = col_modify_recorder_df %>%
      `attr<-`("epiprocess::col_modify_recorder_df::cols", NULL) %>%
      `class<-`(setdiff(class(col_modify_recorder_df), "col_modify_recorder_df")),
    cols = attr(col_modify_recorder_df,
      "epiprocess::col_modify_recorder_df::cols",
      exact = TRUE
    )
  )
}
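
# A minimal sketch (hypothetical data, left commented out since these helpers
# are internal) of the intended record-then-destructure round trip:
#   in_tbl <- tibble::tibble(a = 1:3)
#   recorded <- dplyr::mutate(new_col_modify_recorder_df(in_tbl), b = a + 1L)
#   destructure_col_modify_recorder_df(recorded)$cols # the recorded `cols`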


#' `dplyr_col_modify` method that simply records the `cols` argument
#'
#' Must export S3 methods in R >= 4.0, even if they're only designed to be
#' package internals, and must import any corresponding upstream S3 generic
#' functions:
#' @importFrom dplyr dplyr_col_modify
#' @export
#' @noRd
dplyr_col_modify.col_modify_recorder_df <- function(data, cols) {
  if (!is.null(attr(data, "epiprocess::col_modify_recorder_df::cols", exact = TRUE))) {
    cli_abort("`col_modify_recorder_df` can only record `cols` once",
      internal = TRUE
    )
  }
  attr(data, "epiprocess::col_modify_recorder_df::cols") <- cols
  data
}