knitr::opts_chunk$set( collapse = TRUE, comment = "#>", eval=TRUE, include=TRUE )
Group-by is one of the most useful and commonly used operations in data manipulation. The most common way to perform a group-by with dplyr syntax looks something like this:
dataf %>% group_by(grpA) %>% summarize(mean(X))
However, this nice and concise group-by syntax creates a challenge in {disk.frame}, because {disk.frame} organises the data in chunks, and these chunks are often manipulated independently of each other. This makes it difficult to compute the mean of any column when the rows belonging to one group are stored in different chunks. In this chapter, I will explain how {disk.frame} achieves the one-stage group-by syntax that you see, and how you can define custom one-stage group-by functions for use with {disk.frame}.
{disk.frame} allows the user to create custom one-stage group-by functions. To make a function fn one-stage, one needs to define two functions:
1. fn_df.chunk_agg.disk.frame, which is applied to each chunk
2. fn_df.collected_agg.disk.frame, which accepts a list of the values returned by fn_df.chunk_agg.disk.frame and finalizes the computation
For example, to make mean a one-stage group-by function, {disk.frame} has defined mean_df.chunk_agg.disk.frame and mean_df.collected_agg.disk.frame, which we will illustrate with examples below.
But first, we shall explain some theory behind {disk.frame} to help you better understand "why does {disk.frame} do it like that?".
How does {disk.frame} work
One may ask: how come only a few functions are supported for one-stage group-by? And why do some functions like median only produce estimates instead of the exact figure? To answer these questions, we need to understand how {disk.frame} works.
A disk.frame is organized as chunks stored on disk. Each chunk is a file stored in fst format. The {future} package is used to apply the same function to each chunk, and each of these operations is carried out in a separate R session. These R sessions cannot communicate with each other while the operations are executing.
Once the operations have been performed on all chunks, the results are brought back to the session from which the operations were called. This is the only point of inter-process communication.
To summarize, the two phases of a df %>% some_fn %>% collect operation are:
1. some_fn is applied to each chunk, and the result is assumed to be a data.frame
2. collect then row-binds (rbind/bind_rows/rbindlist) the results together to form a data.frame in the main session
Except for passing the result back to the main session, communication between worker sessions is not allowed. This limits how group-by operations can be performed, which is why group-by had to be written in two stages prior to {disk.frame} v0.3.0. However, R's meta-programming abilities allow us to rewrite one-stage group-by code into the equivalent two-stage group-by code. For example, consider:
df %>% group_by(grp1) %>% summarize(sum(x)) %>% collect
We can use meta-programming to transform that into
df %>% chunk_group_by(grp1) %>% chunk_summarize(__tmp1__= sum(x)) %>% collect() %>% group_by(grp1) %>% summarize(x = sum(__tmp1__))
Basically, we are "compiling" one-stage group-by code to two-stage group-by code, and then executing it.
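The two-stage idea can be sketched in base R without {disk.frame} at all. The block below is only an illustration under stated assumptions: the chunks are simulated as a plain list of data.frames (real chunks are fst files processed in separate sessions), and aggregate() stands in for the group-by and summarize verbs.

```r
# Simulated chunks: an assumption for illustration only.
chunks <- list(
  data.frame(grp1 = c("a", "a", "b"), x = c(1, 2, 3)),
  data.frame(grp1 = c("a", "b", "b"), x = c(4, 5, 6))
)

# Stage 1: sum x by group within each chunk, independently of other chunks.
stage1 <- lapply(chunks, function(chunk) {
  aggregate(x ~ grp1, data = chunk, FUN = sum)
})

# "collect": row-bind the per-chunk results in the main session.
collected <- do.call(rbind, stage1)

# Stage 2: sum the partial sums by group.
two_stage <- aggregate(x ~ grp1, data = collected, FUN = sum)

# Reference: the same group-by computed on all rows at once.
one_stage <- aggregate(x ~ grp1, data = do.call(rbind, chunks), FUN = sum)

print(two_stage)
```

Summing partial sums works because sum is associative; the next example in the text, mean, does not have this property and needs more bookkeeping.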
For mean, it's trickier, as one needs to keep track of the numerator and the denominator separately when computing mean(x) = sum(x)/length(x).
Therefore, {disk.frame} compiles
df %>% group_by(grp1) %>% summarize(meanx = mean(x)) %>% collect
to
df %>% chunk_group_by(grp1) %>% chunk_summarize(__tmp1__ = list(mean_df.chunk_agg.disk.frame(x))) %>% collect %>% group_by(grp1) %>% summarize(meanx = mean_df.collected_agg.disk.frame(__tmp1__))
where mean_df.chunk_agg.disk.frame defines what needs to be done to each chunk. Its return value is a vector whose elements are named sumx and lengthx. Because the return value is not a scalar, it has to be stored in a list column, which is why the chunk-level call is wrapped in list(...).
Here is an example implementation of mean_df.chunk_agg.disk.frame:
mean_df.chunk_agg.disk.frame <- function(x, na.rm = FALSE, ...) {
  # numerator: sum of the values in this chunk
  sumx = sum(x, na.rm = na.rm)
  # denominator: number of values counted (NAs excluded when na.rm = TRUE)
  lengthx = length(x) - ifelse(na.rm, sum(is.na(x)), 0)
  c(sumx = sumx, lengthx = lengthx)
}
mean_df.collected_agg.disk.frame receives a list of outputs from mean_df.chunk_agg.disk.frame. Recall that mean_df.chunk_agg.disk.frame returns a vector for each chunk, so the input to mean_df.collected_agg.disk.frame is a list of vectors:
mean_df.collected_agg.disk.frame <- function(listx) { sum(sapply(listx, function(x) x["sumx"]))/sum(sapply(listx, function(x) x["lengthx"])) }
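To see why the numerator and denominator must travel separately, the sketch below applies the two functions to the values of a single group that are split unevenly across two simulated chunks. The chunk contents are made up for illustration, and the function definitions are repeated only so that the block runs on its own.

```r
# Definitions repeated from the text so this block is self-contained.
mean_df.chunk_agg.disk.frame <- function(x, na.rm = FALSE, ...) {
  sumx <- sum(x, na.rm = na.rm)
  lengthx <- length(x) - ifelse(na.rm, sum(is.na(x)), 0)
  c(sumx = sumx, lengthx = lengthx)
}
mean_df.collected_agg.disk.frame <- function(listx) {
  sum(sapply(listx, function(x) x["sumx"])) /
    sum(sapply(listx, function(x) x["lengthx"]))
}

# Values of x for one group, spread over two chunks of different sizes.
chunk_values <- list(c(1, 2, 3, 4), c(10))

# Chunk phase: one named vector (sumx, lengthx) per chunk.
per_chunk <- lapply(chunk_values, mean_df.chunk_agg.disk.frame)

# Collected phase: total sum divided by total count.
two_stage_mean <- mean_df.collected_agg.disk.frame(per_chunk)

exact_mean <- mean(unlist(chunk_values))           # 20 / 5
mean_of_means <- mean(sapply(chunk_values, mean))  # (2.5 + 10) / 2

print(c(two_stage = two_stage_mean, exact = exact_mean, naive = mean_of_means))
```

The two-stage result matches the exact mean, while naively averaging the per-chunk means does not, because the chunks contribute different numbers of rows.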
Now that we have seen two examples, namely sum and mean, we are ready to summarize how group-by functions are implemented.
Given the below
df %>% group_by(grp1) %>% summarize(namex = fn(x)) %>% collect
{disk.frame} compiles it to
df %>% chunk_group_by(grp1) %>% chunk_summarize(__tmp1__ = list(fn_df.chunk_agg.disk.frame(x))) %>% collect %>% group_by(grp1) %>% summarize(namex = fn_df.collected_agg.disk.frame(__tmp1__))
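The compiled pipeline can be emulated in base R to make the roles of the two functions concrete. Everything in the block below is a sketch under stated assumptions: emulate_one_stage is a hypothetical helper that is not part of {disk.frame}, the chunks are a plain list of data.frames with columns grp1 and x, and the rangewidth pair is an invented custom function (max minus min) that merely follows the naming convention.

```r
# Hypothetical helper, NOT part of {disk.frame}: emulates the two phases
# for a single summary of column x grouped by column grp1.
emulate_one_stage <- function(chunks, chunk_fn, collected_fn) {
  # Chunk phase: per chunk and per group, call the chunk-level function.
  partials <- lapply(chunks, function(chunk) {
    lapply(split(chunk$x, chunk$grp1), chunk_fn)
  })
  # Collected phase: per group, gather the partial results into a list
  # and pass that list to the collected-level function.
  groups <- sort(unique(unlist(lapply(partials, names))))
  sapply(groups, function(g) {
    group_partials <- lapply(partials, function(p) p[[g]])
    # Drop chunks in which this group did not appear.
    group_partials <- Filter(Negate(is.null), group_partials)
    collected_fn(group_partials)
  })
}

# Invented custom function pair (hypothetical names): range width of x.
rangewidth_df.chunk_agg.disk.frame <- function(x) {
  c(minx = min(x), maxx = max(x))
}
rangewidth_df.collected_agg.disk.frame <- function(listx) {
  max(sapply(listx, function(v) v["maxx"])) -
    min(sapply(listx, function(v) v["minx"]))
}

# Simulated chunks; group "a" is absent from the third chunk.
chunks <- list(
  data.frame(grp1 = c("a", "a", "b"), x = c(1, 2, 3)),
  data.frame(grp1 = c("a", "b", "b"), x = c(4, 5, 6)),
  data.frame(grp1 = "b", x = 10)
)

result <- emulate_one_stage(
  chunks,
  rangewidth_df.chunk_agg.disk.frame,
  rangewidth_df.collected_agg.disk.frame
)
print(result)
```

The chunk-level function only ever sees one chunk's values for a group, and the collected-level function only ever sees the list of chunk-level results, which mirrors the division of labour in the compiled code.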
Based on the above information, to make fn a one-stage group-by function, the user has to
1. define fn_df.chunk_agg.disk.frame, which is a function to be applied to each chunk
2. define fn_df.collected_agg.disk.frame, which is a function to be applied to a list containing the values returned by fn_df.chunk_agg.disk.frame for each chunk
Example of implementing sum:
1. Define sum_df.chunk_agg.disk.frame:
sum_df.chunk_agg.disk.frame <- function(x, na.rm = FALSE) { sum(x, na.rm=na.rm) }
2. Define sum_df.collected_agg.disk.frame, which needs to accept a list of sum(x, na.rm) results. Since each sum(x, na.rm) is just a numeric, this is simply:
sum_df.collected_agg.disk.frame <- function(list_sum) { sum(unlist(list_sum)) }
Example of implementing n_distinct:
The dplyr::n_distinct function counts the number of distinct values in a vector x.
1. Define n_distinct_df.chunk_agg.disk.frame to return the unique values in a chunk. Because the same value can appear in multiple chunks, counting within each chunk would double count, so we simply return all the unique values from each chunk, which are then de-duplicated in the next phase:
n_distinct_df.chunk_agg.disk.frame <- function(x, na.rm = FALSE) { if(na.rm) { setdiff(unique(x), NA) } else { unique(x) } }
2. Define n_distinct_df.collected_agg.disk.frame, which de-duplicates the unique values and counts them:
n_distinct_df.collected_agg.disk.frame <- function(list_of_chunkwise_uniques) { dplyr::n_distinct(unlist(list_of_chunkwise_uniques)) }
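A small simulation shows the double counting that this design avoids. The chunk contents below are made up, and the collected-level function uses base R's length(unique(...)) as a stand-in for dplyr::n_distinct so that the block has no package dependency; that substitution is an assumption of this sketch, not how the text defines it.

```r
n_distinct_df.chunk_agg.disk.frame <- function(x, na.rm = FALSE) {
  if (na.rm) setdiff(unique(x), NA) else unique(x)
}

# Stand-in for the version in the text: base R instead of dplyr::n_distinct.
n_distinct_df.collected_agg.disk.frame <- function(list_of_chunkwise_uniques) {
  length(unique(unlist(list_of_chunkwise_uniques)))
}

# Values of x for one group, spread over three simulated chunks.
chunk_values <- list(c("a", "b", "b"), c("b", "c"), c("a", "c", "d"))

# Chunk phase: keep the unique values themselves, not their count.
per_chunk <- lapply(chunk_values, n_distinct_df.chunk_agg.disk.frame)

# Collected phase: de-duplicate across chunks, then count.
two_stage_count <- n_distinct_df.collected_agg.disk.frame(per_chunk)

# Naive alternative: count per chunk and add up, which double counts.
naive_count <- sum(sapply(per_chunk, length))

print(c(two_stage = two_stage_count, naive = naive_count))
```

The trade-off is that the chunk phase may return many values per group, so the data passed back to the main session can be much larger than for sum or mean.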
We have seen that {disk.frame} performs operations in two phases, and there is no communication between the sessions that apply the functions at chunk level.
Hence, it is generally difficult to compute rank-based summarizations like median exactly, and most rank-based calculations are estimates only. The same constraint applies to distributed data systems like Spark, which also commonly rely on approximate algorithms for medians and other quantiles.
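The difficulty with median can be seen with a few numbers. The sketch below uses the median of per-chunk medians as one simple estimator; this is an assumption chosen for illustration and is not claimed to be the estimator {disk.frame} actually uses.

```r
# Values of x for one group, spread over two simulated chunks.
chunk_values <- list(c(1, 2, 3), c(4, 5, 6, 7, 100))

# Chunk phase: each chunk can only report its own median.
chunk_medians <- sapply(chunk_values, median)

# Collected phase: combine the per-chunk medians (illustrative estimator).
estimated_median <- median(chunk_medians)

# Exact median, which needs all values in one place.
exact_median <- median(unlist(chunk_values))

print(c(estimate = estimated_median, exact = exact_median))
```

Unlike sum or mean, no small fixed-size summary of a chunk is enough to recover the exact median; doing so in general requires passing the chunk's values, or a sketch of their distribution, back to the main session.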
Another limitation for now is that a summarization more complex than f(x) is not supported. E.g. sum(x) + 1, sum(x + mean(x)), sum(x) + mean(x), and fn(sum(x)) are not yet supported as arguments in the summarize function.