R/shard.r

Defines functions distribute shard

Documented in distribute shard

#' Shard a data.frame/data.table or disk.frame into chunks and save it as a disk.frame
#'
#' For a data.frame/data.table, a chunk id is computed for every row by hashing
#' the `shardby` column(s) with `hashstr2i()`, so rows with identical `shardby`
#' values receive the same chunk id. The data is then written with
#' `write_disk.frame()`. For a disk.frame, the work is delegated to `rechunk()`.
#'
#' @param df A data.frame/data.table or disk.frame. If disk.frame, then rechunk(df, ...) is run
#' @param shardby The column(s) to shard the data by, as a character vector of
#'   column names.
#' @param nchunks The number of chunks
#' @param outdir The output directory of the disk.frame
#' @param overwrite If TRUE then the chunks are overwritten
#' @param ... not used
#' @return The value returned by `write_disk.frame()` (data.frame/data.table
#'   input) or by `rechunk()` (disk.frame input).
#' @note data.frame input is converted with `data.table::setDT()` and a helper
#'   column `.out.disk.frame.id` is added by reference, so the object passed in
#'   by the caller may be altered as a side effect.
#' @importFrom data.table setDT
#' @importFrom glue glue
#' @export
#' @examples
#' 
#' # shard the iris data.frame by Species so that rows with the same Species
#' # are in the same chunk
#' iris.df = shard(iris, "Species")
#'
#' # clean up iris.df
#' delete(iris.df)
shard <- function(df, shardby, outdir = tempfile(fileext = ".df"), ..., nchunks = recommend_nchunks(df), overwrite = FALSE) {
  # Evaluate the default now, before `df` is modified by reference below.
  force(nchunks)

  is_data_frame <- inherits(df, "data.frame")
  is_disk_frame <- inherits(df, "disk.frame")

  # Reject unsupported input before touching `outdir`, instead of silently
  # returning NULL after the overwrite check has already run.
  if (!is_data_frame && !is_disk_frame) {
    stop(
      "`df` must be a data.frame, data.table or disk.frame, not an object of class '",
      paste(class(df), collapse = "/"), "'",
      call. = FALSE
    )
  }

  overwrite_check(outdir, overwrite)

  if (is_data_frame) {
    if (!is.character(shardby) || length(shardby) == 0L) {
      stop("`shardby` must be a non-empty character vector of column names", call. = FALSE)
    }
    missing_cols <- setdiff(shardby, names(df))
    if (length(missing_cols) > 0L) {
      stop(
        "`shardby` column(s) not found in `df`: ",
        paste(missing_cols, collapse = ", "),
        call. = FALSE
      )
    }

    data.table::setDT(df)

    # Build the string key that gets hashed. Columns are looked up by name
    # rather than by evaluating generated code, so non-syntactic column names
    # work too.
    if (length(shardby) == 1L) {
      shard_key <- as.character(df[[shardby]])
    } else {
      # Sort the column names so the key does not depend on the order in which
      # the caller listed them; unname() keeps a column called e.g. "collapse"
      # from being matched to a paste0() argument.
      key_cols <- unname(lapply(sort(shardby), function(col) df[[col]]))
      shard_key <- do.call(paste0, key_cols)
    }

    # Surface the underlying error message instead of swallowing it.
    tryCatch(
      data.table::set(df, j = ".out.disk.frame.id", value = hashstr2i(shard_key, nchunks)),
      error = function(e) {
        stop("error occurred in shard: ", conditionMessage(e), call. = FALSE)
      }
    )

    # `overwrite = TRUE` here: overwrite_check() above has already been run on
    # `outdir` with the caller's `overwrite` setting.
    write_disk.frame(df, outdir = outdir, nchunks = nchunks, overwrite = TRUE, shardby = shardby, shardchunks = nchunks)
  } else {
    # disk.frame input: delegate to rechunk().
    rechunk(df, shardby = shardby, nchunks = nchunks, outdir = outdir, overwrite = TRUE)
  }
}

#' `distribute` is an alias for `shard`
#'
#' All arguments are forwarded unchanged to `shard()`, and its result is
#' returned.
#' @export
#' @rdname shard
distribute <- function(...) {
  # `shard` as defined in this file is a plain function, not an S3 generic with
  # `shard.<class>` methods, so UseMethod("shard") would fail with
  # "no applicable method"; forward the call directly instead.
  shard(...)
}

Try the disk.frame package in your browser

Any scripts or data that you put into this service are public.

disk.frame documentation built on Aug. 24, 2023, 5:09 p.m.