R/class_store.R

Defines functions store_get_packages.default store_get_packages store_validate_packages store_validate.default store_validate store_unmarshal_value.default store_unmarshal_value store_marshal_value.default store_marshal_value store_unmarshal_object.default store_unmarshal_object store_marshal_object.default store_marshal_object store_unload.default store_unload store_sync_file_meta.default store_sync_file_meta store_has_correct_hash.default store_has_correct_hash store_wait_correct_hash store_ensure_correct_hash.default store_ensure_correct_hash store_hash_late.default store_hash_late store_hash_early.default store_hash_early store_assert_format.default store_assert_format store_copy_object.default store_copy_object store_convert_object.default store_convert_object store_produce_stage.default store_produce_stage store_update_stage_early.default store_update_stage_late.default store_update_stage_late store_update_stage_early.default store_update_stage_early store_tar_path.default store_tar_path store_path_from_record.default store_path_from_record store_row_path.default store_row_path store_produce_path.default store_produce_path store_update_path store_upload_object.default store_upload_object store_delete_objects.default store_delete_objects store_delete_object.default store_delete_object store_exist_object.default store_exist_object store_cache_path.default store_cache_path store_write_path store_write_object.default store_write_object store_read_path store_read_object.default store_read_object store_set_timestamp_trust.default store_set_timestamp_trust store_assert_repository_setting.local store_assert_repository_setting.default store_assert_repository_setting store_assert_format_setting.default store_assert_format_setting store_format_dispatch store_class_repository.default store_class_repository store_class_format store_new_default store_new.default store_new store_mock store_init

store_init <- function(
  format = "rds",
  repository = "local",
  resources = list()
) {
  format_dispatch <- store_format_dispatch(format)
  repository_dispatch <- enclass(repository, repository)
  store <- store_new(
    format = format_dispatch,
    file = file_init(),
    resources = resources
  )
  class(store) <- store_class_format(format_dispatch)
  class(store) <- store_class_repository(
    repository = repository_dispatch,
    store = store,
    format = format
  )
  store_set_timestamp_trust(store)
  store
}

store_mock <- function(
  format = "rds",
  repository = "local"
) {
  mock <- enclass(
    list(),
    store_class_format(store_format_dispatch(format))
  )
  class(mock) <- store_class_repository(
    repository = enclass(repository, repository),
    store = mock,
    format = format
  )
  mock
}

store_new <- function(format, file = NULL, resources = NULL) {
  UseMethod("store_new")
}

#' @export
store_new.default <- function(format, file = NULL, resources = NULL) {
  store_new_default(file = file, resources = resources)
}

store_new_default <- function(file = NULL, resources = NULL) {
  force(file)
  force(resources)
  environment()
}

store_class_format <- function(format) {
  UseMethod("store_class_format")
}

store_class_repository <- function(repository, store, format) {
  UseMethod("store_class_repository")
}

#' @export
store_class_repository.default <- function(repository, store, format) {
  class(store)
}

# A format should not be a full class like the store
# because the responsibilities of store and format
# would overlap too much.
store_format_dispatch <- function(format) {
  class <- if_any(
    grepl(pattern = "format_custom", x = format, fixed = TRUE),
    "format_custom",
    format
  )
  enclass(format, class)
}

store_assert_format_setting <- function(format) {
  UseMethod("store_assert_format_setting")
}

#' @export
store_assert_format_setting.default <- function(format) {
  tar_throw_validate(paste("unsupported format:", class(format)[1]))
}

store_assert_repository_setting <- function(repository) {
  UseMethod("store_assert_repository_setting")
}

#' @export
store_assert_repository_setting.default <- function(repository) {
  tar_throw_validate(paste("unsupported repository:", repository))
}

#' @export
store_assert_repository_setting.local <- function(repository) {
}

store_set_timestamp_trust <- function(store) {
  UseMethod("store_set_timestamp_trust")
}

#' @export
store_set_timestamp_trust.default <- function(store) {
  store$file$trust_timestamps <- tar_options$get_trust_object_timestamps()
}

store_read_object <- function(store) {
  UseMethod("store_read_object")
}

#' @export
store_read_object.default <- function(store) {
  store_convert_object(store, store_read_path(store, store$file$path))
}

store_read_path <- function(store, path) {
  UseMethod("store_read_path")
}

store_write_object <- function(store, object) {
  UseMethod("store_write_object")
}

#' @export
store_write_object.default <- function(store, object) {
  path <- store$file$path
  stage <- store$file$stage
  dir_create_runtime(dirname(path))
  dir_create_runtime(dirname(stage))
  store_write_path(store, store_convert_object(store, object), stage)
  file_move(from = stage, to = path)
}

store_write_path <- function(store, object, path) {
  UseMethod("store_write_path")
}

store_cache_path <- function(store, path) {
  UseMethod("store_cache_path")
}

#' @export
store_cache_path.default <- function(store, path) {
  cache <- tar_runtime$file_exist
  if (!is.null(cache)) {
    counter_set_names(counter = cache, names = path)
  }
}

store_exist_object <- function(store, name = NULL) {
  UseMethod("store_exist_object")
}

#' @export
store_exist_object.default <- function(store, name = NULL) {
  all(file.exists(store$file$path))
}

store_delete_object <- function(store, name = NULL) {
  UseMethod("store_delete_object")
}

#' @export
store_delete_object.default <- function(store, name = NULL) {
  unlink(store$file$path)
  unlink(store$file$stage)
}

store_delete_objects <- function(store, meta, batch_size, verbose) {
  UseMethod("store_delete_objects")
}

#' @export
store_delete_objects.default <- function(store, meta, batch_size, verbose) {
  tar_throw_validate(
    "store_delete_objects() is for supported cloud objects only."
  )
}

store_upload_object <- function(store) {
  UseMethod("store_upload_object")
}

store_upload_object.default <- function(store) {
}

store_update_path <- function(store, name, object, path_store) {
  store$file$path <- store_produce_path(store, name, object, path_store)
}

store_produce_path <- function(store, name, object, path_store) {
  UseMethod("store_produce_path")
}

#' @export
store_produce_path.default <- function(store, name, object, path_store) {
  path_objects(path_store = path_store, name = name)
}

store_row_path <- function(store) {
  UseMethod("store_row_path")
}

#' @export
store_row_path.default <- function(store) {
  NA_character_
}

store_path_from_record <- function(store, record, path_store) {
  UseMethod("store_path_from_record")
}

#' @export
store_path_from_record.default <- function(store, record, path_store) {
  path_objects(path_store = path_store, name = record$name)
}

store_tar_path <- function(store, target, path_store) {
  UseMethod("store_tar_path")
}

#' @export
store_tar_path.default <- function(store, target, path_store) {
  path_objects(path_store = path_store, name = target_get_name(target))
}

store_update_stage_early <- function(store, name, path_store) {
  UseMethod("store_update_stage_early")
}

#' @export
store_update_stage_early.default <- function(store, name, path_store) {
  store$file$stage <- store_produce_stage(
    store = store,
    name = name,
    object = NULL,
    path_store = path_store
  )
}

store_update_stage_late <- function(store, name, object, path_store) {
  UseMethod("store_update_stage_late")
}

#' @export
store_update_stage_late.default <- function(store, name, object, path_store) {
}

#' @export
store_update_stage_early.default <- function(store, name, path_store) {
  store$file$stage <- store_produce_stage(
    store = store,
    name = name,
    object = NULL,
    path_store = path_store
  )
}

store_produce_stage <- function(store, name, object, path_store) {
  UseMethod("store_produce_stage")
}

#' @export
store_produce_stage.default <- function(store, name, object, path_store) {
  path_scratch(path_store = path_store, pattern = name)
}

store_convert_object <- function(store, object) {
  UseMethod("store_convert_object")
}

store_convert_object.default <- function(store, object) {
  object
}

store_copy_object <- function(store, object) {
  UseMethod("store_copy_object")
}

store_copy_object.default <- function(store, object) {
  object
}

store_assert_format <- function(store, object, name) {
  UseMethod("store_assert_format")
}

#' @export
store_assert_format.default <- function(store, object, name) {
}

store_hash_early <- function(store) {
  UseMethod("store_hash_early")
}

#' @export
store_hash_early.default <- function(store) {
}

store_hash_late <- function(store) {
  UseMethod("store_hash_late")
}

#' @export
store_hash_late.default <- function(store) {
  tar_assert_path(store$file$path)
  file_update_hash(store$file)
}

store_ensure_correct_hash <- function(
  store,
  storage,
  deployment
) {
  UseMethod("store_ensure_correct_hash")
}

#' @export
store_ensure_correct_hash.default <- function(store, storage, deployment) {
  if (identical(storage, "worker") && identical(deployment, "worker")) {
    store_wait_correct_hash(store)
  }
}

store_wait_correct_hash <- function(store) {
  seconds_interval <- store$resources$network$seconds_interval %|||% 0.25
  seconds_timeout <- store$resources$network$seconds_timeout %|||% 60
  max_tries <- store$resources$network$max_tries %|||% Inf
  verbose <- store$resources$network$verbose %|||% TRUE
  retry_until_true(
    fun = ~store_has_correct_hash(store),
    seconds_interval = seconds_interval,
    seconds_timeout = seconds_timeout,
    max_tries = max_tries,
    catch_error = FALSE,
    message = paste(
      "Path",
      paste(store$file$path, collapse = " "),
      "does not exist or has incorrect hash.",
      "File sync timed out."
    ),
    verbose = verbose
  )
}

store_has_correct_hash <- function(store) {
  UseMethod("store_has_correct_hash")
}

#' @export
store_has_correct_hash.default <- function(store) {
  (all(is.na(store$file$path)) || file_exists_path(store$file)) &&
    file_has_correct_hash(store$file)
}

store_sync_file_meta <- function(store, target, meta) {
  UseMethod("store_sync_file_meta")
}

#' @export
store_sync_file_meta.default <- function(store, target, meta) {
  cue <- target$cue
  if (identical(cue$mode, "never") || identical(cue$file, FALSE)) {
    return()
  }
  name <- target_get_name(target)
  record <- meta$get_record(name)
  file <- file_init(
    path = record$path,
    time = record$time,
    size = record$size,
    bytes = record$bytes
  )
  info <- file_info_runtime(target$store$file$path)
  time <- file_time(info)
  bytes <- file_bytes(info)
  size <- file_size(bytes)
  # Fully automated tests do no use big files.
  # Tested in tests/interactive/test-file.R. # nolint
  # nocov start
  if (!identical(time, file$time) || !identical(size, file$size)) {
    record$time <- time
    record$size <- size
    record$bytes <- bytes
    meta$insert_record(record)
  }
  # nocov end
}

store_unload <- function(store, target) {
  UseMethod("store_unload")
}

#' @export
store_unload.default <- function(store, target) {
}

store_marshal_object <- function(store, object) {
  UseMethod("store_marshal_object")
}

#' @export
store_marshal_object.default <- function(store, object) {
  object
}

store_unmarshal_object <- function(store, object) {
  UseMethod("store_unmarshal_object")
}

#' @export
store_unmarshal_object.default <- function(store, object) {
  object
}

store_marshal_value <- function(store, target) {
  UseMethod("store_marshal_value")
}

#' @export
store_marshal_value.default <- function(store, target) {
}

store_unmarshal_value <- function(store, target) {
  UseMethod("store_unmarshal_value")
}

#' @export
store_unmarshal_value.default <- function(store, target) {
}

store_validate <- function(store) {
  UseMethod("store_validate")
}

#' @export
store_validate.default <- function(store) {
  tar_assert_correct_fields(store, store_new_default)
  store_validate_packages(store)
  tar_assert_list(store$resources)
}

store_validate_packages <- function(store) {
  tar_assert_package(store_get_packages(store))
}

store_get_packages <- function(store) {
  UseMethod("store_get_packages")
}

#' @export
store_get_packages.default <- function(store) {
  character(0)
}
wlandau/targets documentation built on May 1, 2024, 7:27 p.m.