```r
knitr::opts_chunk$set(
  collapse = TRUE,
  comment = "#>",
  eval = TRUE,
  message = FALSE,
  include = TRUE
)
```
The group-by framework of {disk.frame} has been overhauled in v0.3.0. It is now able to perform some group-by-summarize operations in one stage. In this chapter we will cover one-stage group-by with {disk.frame} and its implications. In the custom one-stage group-by chapter, we will cover how to define custom one-stage group-by functions.

A one-stage group-by is the same as group-by for data.frames. This would be unremarkable, if not for the limitations imposed by the disk-based nature of {disk.frame}. Before v0.3.0 of {disk.frame}, one-stage group-by was not possible, and the user had to rely on two-stage group-by even for simple operations like mean.
However, now that one-stage group-by is possible, there are still limitations and not all functions are supported out-of-the-box. Hence, in the next chapter we will describe how to define custom one-stage group-by functions.
An example of one-stage group-by:
```r
result_from_disk.frame = iris %>%
  as.disk.frame %>%
  group_by(Species) %>%
  summarize(
    mean(Petal.Length),
    sumx = sum(Petal.Length/Sepal.Width),
    sd(Sepal.Width/Petal.Length),
    var(Sepal.Width/Sepal.Width),
    l = length(Sepal.Width/Sepal.Width + 2),
    max(Sepal.Width),
    min(Sepal.Width),
    median(Sepal.Width)
  ) %>%
  collect
```
It is important to note that not all functions that can run in dplyr::summarize will work automatically. This is because of how {disk.frame} works. Please see the next chapter on defining custom one-stage group-by functions to learn more. If a function you need is missing, please make a feature request here. One limitation is that functions that depend on the order of a column (e.g. median and quantile) can only be obtained using estimation methods.
| Function     | Exact/Estimate | Notes                                          |
|--------------|----------------|------------------------------------------------|
| `min`        | Exact          |                                                |
| `max`        | Exact          |                                                |
| `mean`       | Exact          |                                                |
| `sum`        | Exact          |                                                |
| `length`     | Exact          |                                                |
| `n`          | Exact          |                                                |
| `n_distinct` | Exact          |                                                |
| `sd`         | Exact          |                                                |
| `var`        | Exact          | `var(x)` only; `cor` and `cov` support planned |
| `any`        | Exact          |                                                |
| `all`        | Exact          |                                                |
| `median`     | Estimate       |                                                |
| `quantile`   | Estimate       | One quantile only                              |
| `IQR`        | Estimate       |                                                |
The results should be exactly the same as if applying the same group-by operations on a data.frame. If not, then please report a bug.
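A minimal way to check this yourself is to run the same summarize on the in-memory `iris` and compare. Note that aggregations listed as "Estimate" in the table above (e.g. `median`) may differ slightly, so the comparison below is only a sketch:

```r
# assumption: result_from_disk.frame was computed as in the example above
result_from_data.frame = iris %>%
  group_by(Species) %>%
  summarize(
    mean(Petal.Length),
    sumx = sum(Petal.Length/Sepal.Width),
    sd(Sepal.Width/Petal.Length),
    var(Sepal.Width/Sepal.Width),
    l = length(Sepal.Width/Sepal.Width + 2),
    max(Sepal.Width),
    min(Sepal.Width),
    median(Sepal.Width)
  )

# compare the two results; estimated aggregations may not match exactly
all.equal(as.data.frame(result_from_disk.frame), as.data.frame(result_from_data.frame))
```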
{disk.frame} implements the chunk_group_by operation with a significant caveat. In the disk.frame framework, group-by happens WITHIN each chunk and not ACROSS chunks. To achieve a group-by across chunks, we need to put all rows with the same group keys into the same file chunk; this can be achieved with hard_group_by. However, the hard_group_by operation can be VERY TIME-CONSUMING computationally and should be avoided if possible.
The hard_group_by operation is best illustrated with an example. Suppose a disk.frame has three chunks:
```
# chunk1 = 1.fst
#  id n
#1  a 1
#2  a 2
#3  b 3
#4  d 4

# chunk2 = 2.fst
#  id n
#1  a 4
#2  a 5
#3  b 6
#4  d 7

# chunk3 = 3.fst
#  id n
#1  a 4
#2  b 5
#3  c 6
```
Notice that the id column contains four distinct values: "a", "b", "c", and "d". Performing hard_group_by(df, by = id) MAY give you the following disk.frame, where all the rows with the same id value end up in the same chunk:
```
# chunk1 = 1.fst
#  id n
#1  b 3
#2  b 6

# chunk2 = 2.fst
#  id n
#1  c 6
#2  d 4
#3  d 7

# chunk3 = 3.fst
#  id n
#1  a 1
#2  a 2
#3  a 4
#4  a 5
#5  a 4
```
Also, notice that there is no guaranteed order for the distribution of the ids to the chunks. The order is random, but each chunk is likely to have a similar number of rows, provided that id does not follow a skewed distribution, i.e. one where a few distinct values make up the majority of the rows.
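If you want to check how the rows were actually distributed, you can inspect the chunks directly. A short sketch, assuming `df_sharded` is the disk.frame produced by the hard_group_by above (the name is hypothetical):

```r
# df_sharded: hypothetical name for the result of hard_group_by(df, by = id)
nchunks(df_sharded)      # number of file chunks in the disk.frame
get_chunk(df_sharded, 1) # read the first chunk to see which id values landed in it
```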
Typically, chunk_group_by is performed WITHIN each chunk. This is not an issue if the chunks have already been sharded on the by variables beforehand; however, if this is not the case, then one may need a second-stage aggregation to obtain the correct result, see Two-stage group-by.
By forcing the user to choose between chunk_group_by (within each chunk) and hard_group_by (across all chunks), {disk.frame} ensures that the user is conscious of the choice they are making. In sparklyr, the equivalent of a hard_group_by is always performed, which we should avoid where possible, as it is time-consuming and expensive. Hence, disk.frame has chosen to explain the theory and allow the user to make a conscious choice when performing group_by.
```r
suppressMessages(library(disk.frame))

flights.df %>%
  hard_group_by(carrier) %>% # notice that hard_group_by needs to be used
  chunk_summarize(count = n(), mean_dep_delay = mean(dep_delay, na.rm = TRUE)) %>% # mean follows normal R rules
  collect %>%
  arrange(carrier)
```
Prior to {disk.frame} v0.3.0, there was no general support for one-stage group-by, hence a two-stage group-by was needed. The key is to understand chunk_group_by, which performs the group-by within each chunk.
For most group-by tasks, the user can achieve the desired result WITHOUT using hard_group_by by performing the group-by in two stages. For example, suppose you aim to count the number of rows grouped by carrier: you can use chunk_group_by to find the count within each chunk and then use a second group-by to summarize each chunk's results into the desired result. For example,
```r
flights.df %>%
  chunk_group_by(carrier) %>% # `chunk_group_by` aggregates within each chunk
  chunk_summarize(count = n()) %>% # count the rows within each chunk
  collect %>% # collect each chunk's result and row-bind into a data.table
  group_by(carrier) %>%
  summarize(count = sum(count)) %>%
  arrange(carrier)
```
Because this two-stage approach avoids the expensive hard_group_by operation, it is often significantly faster. However, it can be tedious to write, and this is a drawback of the disk.frame chunking mechanism.
Note: this two-stage approach is similar to a map-reduce operation.
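For aggregations that are not simple sums, the second stage must combine the right per-chunk pieces. For example, a grouped mean can be computed by carrying the per-chunk sum and non-missing count and combining them afterwards; this is a sketch following the same pattern as the count example above:

```r
flights.df %>%
  chunk_group_by(carrier) %>%
  chunk_summarize(
    sum_delay = sum(dep_delay, na.rm = TRUE), # per-chunk sum of departure delays
    n_delay   = sum(!is.na(dep_delay))        # per-chunk count of non-missing delays
  ) %>%
  collect %>%
  group_by(carrier) %>%
  summarize(mean_dep_delay = sum(sum_delay) / sum(n_delay)) %>%
  arrange(carrier)
```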
```r
suppressPackageStartupMessages(library(disk.frame))
setup_disk.frame()
```
```r
flights.df = as.disk.frame(nycflights13::flights)

flights.df %>%
  srckeep(c("year", "distance")) %>% # keep only the year and distance columns
  chunk_group_by(year) %>%
  chunk_summarise(sum_dist = sum(distance)) %>% # this sums the distance within each chunk
  collect
```
This is the two-stage group-by in action:
```r
# a 2nd stage is needed to finalise the summing
flights.df %>%
  srckeep(c("year", "distance")) %>% # keep only the year and distance columns
  chunk_group_by(year) %>%
  chunk_summarise(sum_dist = sum(distance)) %>% # this sums the distance within each chunk
  collect %>%
  group_by(year) %>%
  summarise(sum_dist = sum(sum_dist))
```
You can mix group-by with other dplyr verbs; below is an example of using filter.
```r
# filter
pt = proc.time()
df_filtered <- flights.df %>%
  filter(month == 1)
cat("filtering month == 1 took: ", data.table::timetaken(pt), "\n")
nrow(df_filtered)
```
Another way to perform a one-stage group_by is to perform a hard_group_by on a disk.frame. This will rechunk the disk.frame by the by-columns. This is not recommended for performance reasons, as it can be quite slow to rechunk the file chunks on disk.
```r
pt = proc.time()
res1 <- flights.df %>%
  srckeep(c("month", "dep_delay")) %>%
  filter(month <= 6) %>%
  mutate(qtr = ifelse(month <= 3, "Q1", "Q2")) %>%
  hard_group_by(qtr) %>% # hard_group_by is MUCH SLOWER but avoids a 2nd-stage aggregation
  chunk_summarise(avg_delay = mean(dep_delay, na.rm = TRUE)) %>%
  collect
cat("group-by took: ", data.table::timetaken(pt), "\n")
res1
```
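For comparison, the same quarterly average can be obtained without hard_group_by by carrying per-chunk sums and counts through a second-stage aggregation. A sketch; the result should agree with res1 up to floating-point differences:

```r
res2 <- flights.df %>%
  srckeep(c("month", "dep_delay")) %>%
  filter(month <= 6) %>%
  mutate(qtr = ifelse(month <= 3, "Q1", "Q2")) %>%
  chunk_group_by(qtr) %>%
  chunk_summarise(
    sum_delay = sum(dep_delay, na.rm = TRUE), # per-chunk sum of delays
    n_delay   = sum(!is.na(dep_delay))        # per-chunk count of non-missing delays
  ) %>%
  collect %>%
  group_by(qtr) %>%
  summarise(avg_delay = sum(sum_delay) / sum(n_delay))

res2
```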