Description Usage Arguments Details Value Examples
Performs chunkwise processing or split-apply-combine on the data in a delimited file (for example, a CSV).
fileply(file, groupby, fun, collect, temploc, nbins, chunk, spill,
  cores, buffer, keepddf, ...)
file: (string) Path to the input delimited file.
groupby: (character vector) Column names used to split the data. If missing, chunkwise processing is performed instead (see Details).
fun: (object of class function) Function to apply to each subset after the split.
collect: (string) How to collect the result: 'list', 'dataframe', or 'none' (the default, see Details and Value).
temploc: (string) Path where intermediate files are kept.
nbins: (positive integer) Number of directories into which the distributed dataframe (ddf) or distributed data object (ddo) is distributed.
chunk: (positive integer) Number of rows of the file to read at a time.
spill: (positive integer) Maximum number of rows of any subset resulting from the split.
cores: (positive integer) Number of cores to use in parallel.
buffer: (positive integer) Size of the batches of key-value pairs passed to the map, flushed to intermediate storage from the map output, or sent to the reduce.
keepddf: (flag) Whether to keep the distributed dataframe on disk.
...: Additional arguments passed to the data.table reader (see Details).
Reading Stage: The delimited file (for example, a CSV) is read in smaller chunks to create a distributed dataframe (ddf) on disk. The chunk argument specifies the number of lines read at a time, and nbins specifies the number of sub-directories over which the data is distributed. The ... arguments are additional inputs passed to the data.table function that reads the delimited file.
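To illustrate the idea of the reading stage, the following base-R sketch reads a CSV a fixed number of rows at a time from an open connection. This is an illustration only: fileply itself uses data.table's reader and writes the chunks to a distributed dataframe on disk rather than keeping them in a list.

```r
# Illustration only: chunked reading of a CSV with base R.
# fileply uses data.table's reader internally; this sketch just
# shows reading 'chunk' rows at a time from an open connection.
write.csv(mtcars, "mtcars.csv", row.names = FALSE)

chunk <- 10
con   <- file("mtcars.csv", open = "r")
hdr   <- read.csv(con, nrows = 1, header = TRUE)  # header plus first row
nms   <- names(hdr)
chunks <- list(hdr)
repeat {
  piece <- tryCatch(
    read.csv(con, nrows = chunk, header = FALSE, col.names = nms),
    error = function(e) NULL                      # end of file reached
  )
  if (is.null(piece) || nrow(piece) == 0) break
  chunks[[length(chunks) + 1]] <- piece
}
close(con)
unlink("mtcars.csv")
sum(vapply(chunks, nrow, integer(1)))             # all 32 rows accounted for
```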
Split and Apply Stage: The variables in groupby are used to split the data, loading only the relevant subset (possibly several at once when multiple cores are in action) into memory. If groupby is missing, chunkwise processing is performed on each subset of the distributed dataframe. The user-defined fun is applied to each subset and the results are written to a distributed data object (a list or key-value pairs) on disk.
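In in-memory terms, this stage is equivalent to base R's split() followed by lapply(); the sketch below shows that equivalent, with the caveat that fileply streams each subset from disk rather than holding them all in memory at once.

```r
# In-memory sketch of the split-and-apply stage using base R.
# fileply streams each subset from disk instead of holding all
# subsets in memory simultaneously.
groupby <- c("carb", "gear")
subsets <- split(mtcars, mtcars[groupby], drop = TRUE)
result  <- lapply(subsets, function(x) nrow(x))  # 'fun' applied per subset
length(result)                                   # one entry per (carb, gear) combination
```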
Combine Stage: The distributed data object (ddo) is read back into memory depending on the collect argument. The default is 'none', which does not read the data back into memory.
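For intuition, collecting per-subset results as a dataframe is akin to binding them row-wise, as in this base-R illustration (an analogy, not fileply's internal code):

```r
# Illustration of the combine stage: binding per-subset results
# into a single dataframe, as collect = 'dataframe' would yield.
subsets  <- split(mtcars, mtcars["gear"])
pieces   <- lapply(subsets, function(x) data.frame(gear = x$gear[1], n = nrow(x)))
combined <- do.call(rbind, pieces)
combined                                  # one row per gear level
```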
Memory usage: While processing heavy files (many times the RAM size), each core might hold a maximum of 800 MB to 1 GB of memory overhead, not accounting for the memory used by the user-defined function. Memory usage depends on the size of each subset, how many times it is copied by the user function, and how frequently gc is called. Using an appropriate number of cores keeps memory utilization in check. Setting a smaller buffer value keeps memory usage low (see localDiskControl), but makes execution slower.
A list, a dataframe, or TRUE (when collect is 'none').
# split-apply-combine
write.table(mtcars, "mtcars.csv", row.names = FALSE, sep = ",")
temp <- fileply(file = "mtcars.csv"
, groupby = c("carb", "gear")
, fun = identity
, collect = "list"
, sep = ","
, header = TRUE
)
temp
unlink("mtcars.csv")
# chunkwise processing
write.table(mtcars, "mtcars.csv", row.names = FALSE, sep = ",")
temp <- fileply(file = "mtcars.csv"
, chunk = 10
, fun = function(x){list(nrow(x))}
, collect = "dataframe"
, sep = ","
, header = TRUE
)
temp
unlink("mtcars.csv")
# example for collect='none'
write.table(mtcars, "mtcars.csv", row.names = FALSE, sep = ",")
outdir <- utils::capture.output(temp <- fileply(file = "mtcars.csv"
, groupby = c("carb", "gear")
, fun = identity
, sep = ","
, header = TRUE
)
, file = NULL
, type = "message"
)
outdir <- gsub("Output Directory: ", "", outdir[5])
diskKV <- datadr::ddo(datadr::localDiskConn(outdir))
diskKV
diskKV[[1]]
unlink(outdir, recursive = TRUE)
unlink("mtcars.csv")