R/rechunk.r

Defines functions rechunk

Documented in rechunk

#' Increase or decrease the number of chunks in the disk.frame
#'
#' Rewrites the rows of \code{df} into (up to) \code{nchunks} chunks stored in
#' \code{outdir}. When \code{outdir} is the folder of \code{df} itself, the
#' existing files are first moved to a temporary back-up directory and the
#' rechunk reads from that back-up.
#'
#' One of three strategies is used:
#' \itemize{
#'   \item \code{shardby} supplied by the caller: every chunk is sharded by
#'     \code{shardby} and the shards are combined by chunk id.
#'   \item \code{shardby = NULL} and no shardkey recorded for \code{df}: rows
#'     are re-split sequentially into chunks of
#'     \code{ceiling(nrow(df) / nchunks)} rows. No empty chunk is written, so
#'     fewer than \code{nchunks} chunks can result when there are not enough
#'     rows to fill them.
#'   \item \code{shardby = NULL} but a shardkey is recorded for \code{df}: the
#'     recorded shardkey is reused and a warning is raised.
#' }
#'
#' @param df the disk.frame to rechunk
#' @param nchunks number of chunks; a single number that is at least 1
#' @param shardby the shardkeys; \code{NULL} reuses the shardkey recorded for
#'   \code{df}, if any
#' @param outdir the output directory; defaults to the folder of \code{df}
#' @param overwrite overwrite the output directory
#' @return the rechunked disk.frame stored in \code{outdir}
#' @export
#' @examples
#' # create a disk.frame with 2 chunks in tempdir()
#' cars.df = as.disk.frame(cars, nchunks = 2)
#'
#' # re-chunking cars.df to 3 chunks, done "in-place" to the same folder as cars.df
#' rechunk(cars.df, 3)
#'
#' new_path = tempfile(fileext = ".df")
#' # re-chunking cars.df to 4 chunks, shard by speed, and done "out-of-place" to a new directory
#' cars2.df = rechunk(cars.df, 4, outdir=new_path, shardby = "speed")
#'
#' # clean up cars.df
#' delete(cars.df)
#' delete(cars2.df)
rechunk <- function(df, nchunks = disk.frame::nchunks(df), outdir = attr(df, "path", exact=TRUE), shardby = NULL, overwrite = TRUE) {
  stopifnot(inherits(df, "disk.frame"))

  # nchunks defaults to the chunk count of df, so it must be evaluated now:
  # the in-place branch below empties the folder and a late evaluation would
  # see no chunks
  force(nchunks)

  if (!is.numeric(nchunks) || length(nchunks) != 1L || is.na(nchunks) || nchunks < 1) {
    stop("nchunks must be at least 1")
  }

  user_had_set_shard_by <- !is.null(shardby)

  # back up the files if writing to the same directory
  if (outdir == attr(df, "path", exact = TRUE)) {
    back_up_tmp_dir <- tempfile("back_up_tmp_dir")
    fs::dir_create(back_up_tmp_dir)

    # only copy the metadata folder when there is one to copy
    metadata_dir <- file.path(outdir, ".metadata")
    if (fs::dir_exists(metadata_dir)) {
      fs::dir_copy(
        metadata_dir, # from
        file.path(back_up_tmp_dir, ".metadata") # to
      )
    }

    # move all files to the back up folder; dir() does not list hidden
    # entries, so .metadata is handled separately (copied above, deleted below)
    for (file in dir(outdir, full.names = TRUE)) {
      fs::file_move(file, back_up_tmp_dir)
    }

    if (fs::dir_exists(metadata_dir)) {
      fs::dir_delete(metadata_dir)
    }

    # TODO check for validity
    message(glue::glue("files have been backed up to temporary dir {back_up_tmp_dir}. You can recover there files until you restart your R session"))

    # from here on the back-up is the source of the rechunk
    df <- disk.frame(back_up_tmp_dir)
  }

  overwrite_check(outdir, overwrite)

  # by default, if shardkey is defined then rechunk will continue to reuse it
  if (is.null(shardby)) {
    shardby <- shardkey(df)[[1]]
  }

  # --- strategy 1: the caller asked for a specific shardkey -----------------
  if (user_had_set_shard_by) {
    sharded_chunks <- cmap(df, ~{
      shard(.x, shardby, nchunks = nchunks, overwrite = FALSE)
    }) %>% collect_list

    # write the combined result to the requested outdir and record the
    # shardkey, mirroring what the shardkey-preserving strategy does below
    combined <- rbindlist.disk.frame(
      sharded_chunks,
      outdir = outdir, by_chunk_id = TRUE, overwrite = overwrite
    )
    return(
      add_meta(
        combined,
        nchunks = disk.frame::nchunks(combined),
        shardkey = shardby, shardchunks = nchunks
      )
    )
  }

  # --- strategy 2: no shardkey at all, split the rows sequentially ----------
  if (identical(shardby, "") || is.null(shardby)) {
    nr <- nrow(df)
    nr_per_chunk <- ceiling(nr / nchunks)
    res <- disk.frame(outdir)

    # nothing to distribute; also avoids looping on a zero-row chunk size
    if (nr == 0) {
      return(res)
    }

    n_source_chunks <- disk.frame::nchunks(df)
    chunks_read <- 1
    chunks_written <- 0
    buffer <- get_chunk(df, 1)

    # stream the source: flush a full chunk whenever the buffer holds enough
    # rows, otherwise append the next source chunk to the buffer
    while (chunks_read < n_source_chunks) {
      if (nr_per_chunk <= nrow(buffer)) {
        rows <- seq_len(nr_per_chunk)
        disk.frame::add_chunk(res, buffer[rows, ])
        buffer <- buffer[-rows, ]
        chunks_written <- chunks_written + 1
      } else {
        chunks_read <- chunks_read + 1
        buffer <- rbindlist(list(buffer, get_chunk(df, chunks_read)))
      }
    }

    # flush what is left; stop once the buffer is empty so that no chunk is
    # written from an out-of-range row index
    while (chunks_written < nchunks && nrow(buffer) > 0) {
      rows <- seq_len(min(nr_per_chunk, nrow(buffer)))
      disk.frame::add_chunk(res, buffer[rows, ])
      buffer <- buffer[-rows, ]
      chunks_written <- chunks_written + 1
    }

    return(res)
  }

  # --- strategy 3: reuse the shardkey already recorded for df ---------------
  warning("shardby = NULL; but there are already shardkey's defined for this disk.frame. Therefore a rechunk algorithm that preserves the shardkey's has been applied and this algorithm is slower than an algorithm that doesn't use a shardkey.")

  # using some maths we can cut down on the number of operations
  nc <- disk.frame::nchunks(df)

  # TODO there is bug here! If the chunks are in numbers form!
  # if the number of possible new chunk ids is one then no need to perform anything. just merge those
  possibles_new_chunk_id <- purrr::map(
    seq_len(nc),
    ~unique((.x - 1 + (0:(nchunks - 1)) * nc) %% nchunks) + 1
  )
  lp <- purrr::map_int(possibles_new_chunk_id, length)

  # chunks whose rows can end up in several new chunks have to be sharded
  resharded <- future.apply::future_lapply(which(lp != 1), function(chunk_id) {
    df1 <- disk.frame::get_chunk(df, chunk_id)
    disk.frame::shard(df1, shardby, nchunks = nchunks, overwrite = TRUE)
  }, future.seed = TRUE)

  # chunks with a single possible new id only need to be renamed. Each one
  # gets its own folder: several source chunks can share the same new id, so
  # a shared folder would make them overwrite each other, and returning the
  # same folder for every chunk would put it in the list more than once.
  # The file is copied rather than moved so the source (or the back-up the
  # user was told about) stays intact.
  tmp_fdlr <- tempfile("rechunk_shard")
  fs::dir_create(tmp_fdlr)
  source_dir <- attr(df, "path", exact = TRUE)

  relocated <- future.apply::future_lapply(which(lp == 1), function(i) {
    chunk_dir <- file.path(tmp_fdlr, as.character(i))
    fs::dir_create(chunk_dir)
    fs::file_copy(
      file.path(source_dir, paste0(i, ".fst")),
      file.path(chunk_dir, paste0(possibles_new_chunk_id[[i]], ".fst"))
    )
    disk.frame(chunk_dir)
  })

  new_one <- rbindlist.disk.frame(
    c(resharded, relocated),
    outdir = outdir, by_chunk_id = TRUE, overwrite = overwrite
  )

  add_meta(
    new_one,
    nchunks = disk.frame::nchunks(new_one),
    shardkey = shardby, shardchunks = nchunks
  )
}

Try the disk.frame package in your browser

Any scripts or data that you put into this service are public.

disk.frame documentation built on Aug. 24, 2023, 5:09 p.m.