suppressPackageStartupMessages(library(disk.frame))
library(fst)
library(magrittr)
library(nycflights13)
library(dplyr)
library(data.table)

# you need to run this for multi-worker support
# limit to 2 workers if not running interactively, e.g. on CRAN
# set up disk.frame to use multiple workers
if(interactive()) {
  setup_disk.frame()
  # highly recommended; it is put inside interactive() for CRAN because
  # changing user options is not allowed on CRAN
  options(future.globals.maxSize = Inf)  
} else {
  setup_disk.frame(2)
}


knitr::opts_chunk$set(
    eval = TRUE,
    message = FALSE,
    collapse = TRUE,
    comment = "#>",
    include = TRUE
)

Quick Start - replicating dplyr's tutorial on nycflights13

The disk.frame package aims to be the answer to the question: how do I manipulate structured tabular data that doesn't fit into Random Access Memory (RAM)?

In a nutshell, disk.frame makes use of two simple ideas:

1) split up a larger-than-RAM dataset into chunks and store each chunk in a separate file inside a folder, and
2) provide a convenient API to manipulate these chunks

disk.frame performs a similar role to distributed systems such as Apache Spark, Python's Dask, and Julia's JuliaDB.jl for medium data: datasets that are too large for RAM but not large enough to qualify as big data.

In this tutorial, we introduce disk.frame, address some common questions, and replicate the sparklyr data manipulation tutorial using disk.frame constructs.

Installation

Simply run

install.packages("disk.frame") # when CRAN ready

or

devtools::install_github("xiaodaigh/disk.frame")

Set-up disk.frame

disk.frame works best if it can process multiple data chunks in parallel. The best way to set up disk.frame so that each CPU core runs a background worker is by using

setup_disk.frame()

# this allows an unlimited amount of data to be passed from worker to worker
options(future.globals.maxSize = Inf)

By default, setup_disk.frame() sets up as many background workers as there are CPU cores; note that hyper-threaded cores are counted as one, not two.

Alternatively, one may specify the number of workers using setup_disk.frame(workers = n).
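For example, to cap disk.frame at two background workers (the same setting the setup chunk above applies in the non-interactive case):

setup_disk.frame(workers = 2)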

Basic Data Operations with disk.frame

The disk.frame package provides convenient functions to convert data.frames and CSVs to disk.frames.

Creating a disk.frame from data.frame

We convert a data.frame to a disk.frame using the as.disk.frame function.

library(nycflights13)
library(dplyr)
library(disk.frame)
library(data.table)

# convert the flights data to a disk.frame and store it in the folder
# "tmp_flights.df" inside tempdir(), overwriting any existing content
flights.df <- as.disk.frame(
  flights, 
  outdir = file.path(tempdir(), "tmp_flights.df"),
  overwrite = TRUE)
flights.df

You should now see a folder called tmp_flights.df inside tempdir() with some files in it, namely 1.fst, 2.fst, ..., where each .fst file is one chunk of the disk.frame.
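A quick, illustrative way to verify this is to list the files in the output folder:

list.files(file.path(tempdir(), "tmp_flights.df"))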

Creating a disk.frame from CSV

library(nycflights13)
# write a csv
csv_path = file.path(tempdir(), "tmp_flights.csv")
data.table::fwrite(flights, csv_path)

# load the csv into a disk.frame
df_path = file.path(tempdir(), "tmp_flights.df")
flights.df <- csv_to_disk.frame(
  csv_path, 
  outdir = df_path,
  overwrite = TRUE)

flights.df

If the CSV is too large to read in one go, we can use the in_chunk_size option to control how many rows are read at a time. For example, to read in the data 100,000 rows at a time:

library(nycflights13)
library(disk.frame)

# write a csv
csv_path = file.path(tempdir(), "tmp_flights.csv")

data.table::fwrite(flights, csv_path)

df_path = file.path(tempdir(), "tmp_flights.df")

flights.df <- csv_to_disk.frame(
  csv_path, 
  outdir = df_path, 
  in_chunk_size = 100000,
  overwrite = TRUE) # df_path already exists from the previous chunk

flights.df

disk.frame also has a function zip_to_disk.frame that can convert every CSV in a zip file to disk.frames.
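A minimal sketch, assuming zip_to_disk.frame takes the path of the zip archive and an output folder; check ?zip_to_disk.frame for the exact signature:

# zip the CSV written above, then convert every CSV inside the archive
zip_path = file.path(tempdir(), "tmp_flights.zip")
utils::zip(zip_path, csv_path, flags = "-j9X") # requires a zip utility on the system
flights_zip.df <- zip_to_disk.frame(zip_path, outdir = file.path(tempdir(), "tmp_flights_zip.df"))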

Simple dplyr verbs and lazy evaluation

flights.df1 <- select(flights.df, year:day, arr_delay, dep_delay)
flights.df1
class(flights.df1)

The class of flights.df1 is also disk.frame after the dplyr::select transformation. Also, disk.frame operations are by default (and where possible) lazy, meaning they are not performed right away; instead, disk.frame waits until you call collect. Exceptions to this rule are the *_join operations, which are evaluated eagerly under certain conditions; see the vignette Joins for disk.frame in-depth for details.

For lazily constructed disk.frames (e.g. flights.df1), the function collect can be used to bring the results from disk into R, e.g.

collect(flights.df1) %>% head(2)

Of course, for larger-than-RAM datasets, one wouldn't call collect on the whole disk.frame (otherwise, why would you need disk.frame?). More likely, one would call collect on a filtered dataset or one summarized with group_by.
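For example, in the sketch below only a small per-carrier summary, not the full dataset, is brought into RAM:

flights.df %>%
  group_by(carrier) %>%
  summarize(count = n()) %>%
  collect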

Some examples of other dplyr verbs applied:

filter(flights.df, dep_delay > 1000) %>% collect %>% head(2)
mutate(flights.df, speed = distance / air_time * 60) %>% collect %>% head(2)

Examples of NOT fully supported dplyr verbs

The chunk_arrange function arranges (sorts) each chunk but not the whole dataset, so use it with caution. Similarly, chunk_summarize creates summary variables within each chunk and hence also needs to be used with caution (see the global-sort sketch after the code below).

# this only sorts within each chunk
chunk_arrange(flights.df, dplyr::desc(dep_delay)) %>% collect %>% head(2)
chunk_summarize(flights.df, mean_dep_delay = mean(dep_delay, na.rm = TRUE)) %>% collect
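To obtain a true global sort, collect first and then arrange in RAM; this assumes the collected data (ideally filtered or summarized first) fits in memory:

# global sort: arrange in RAM after collect
flights.df %>% collect %>% arrange(dplyr::desc(dep_delay)) %>% head(2)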

Piping

One can chain dplyr verbs together just like with a data.frame:

c4 <- flights.df %>%
  filter(month == 5, day == 17, carrier %in% c('UA', 'WN', 'AA', 'DL')) %>%
  select(carrier, dep_delay, air_time, distance) %>%
  mutate(air_time_hours = air_time / 60) %>%
  collect %>%
  arrange(carrier) # arrange should occur after `collect`

c4  %>% head

List of supported dplyr verbs

select
rename
filter
chunk_arrange # within each chunk
chunk_group_by # within each chunk
chunk_summarize # within each chunk
group_by # limited functions
summarize # limited functions
mutate
transmute
left_join
inner_join
full_join # careful. Performance!
semi_join
anti_join

Sharding and distribution of chunks

Like other distributed data manipulation frameworks, disk.frame utilizes the sharding concept to distribute the data into chunks. For example, "to shard by cust_id" means that all rows with the same cust_id are stored in the same chunk. This enables chunk_group_by by cust_id to produce the same results as a group-by on the non-chunked data.

The by variables used to shard the dataset are called the shardkeys. Sharding is performed by computing a deterministic hash on the shardkeys (the by variables) for each row. The hash function produces an integer between 1 and n, where n is the number of chunks, and the row is assigned to that chunk.
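As an illustration, here is a sketch of sharding at creation time; the shardby argument of as.disk.frame is an assumption here, so consult its help page:

# all rows with the same carrier land in the same chunk
flights_sharded.df <- as.disk.frame(
  flights,
  outdir = file.path(tempdir(), "tmp_flights_sharded.df"),
  shardby = "carrier",
  overwrite = TRUE)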

Group-by

{disk.frame} implements the group_by operation with some caveats. In the {disk.frame} framework, only a set of functions is supported in summarize; however, the user can define additional custom group-by functions.

flights.df %>%
  group_by(carrier) %>% 
  summarize(count = n(), mean_dep_delay = mean(dep_delay, na.rm = TRUE)) %>%  # mean follows normal R rules
  collect %>% 
  arrange(carrier)

Restrict input columns for faster processing

One can restrict which input columns are loaded into memory for each chunk; this can significantly increase the speed of data processing. To restrict the input columns, use the srckeep function, which accepts the column names as a character vector.

flights.df %>%
  srckeep(c("carrier","dep_delay")) %>%
  group_by(carrier) %>% 
  summarize(count = n(), mean_dep_delay = mean(dep_delay, na.rm = TRUE)) %>%  # mean follows normal R rules
  collect

Input column restriction is one of the most critical efficiencies provided by disk.frame. Because the underlying fst format allows random access to columns (i.e. it can retrieve only the columns used for processing), one can drastically reduce the amount of data loaded into RAM by keeping only those columns that are directly used to produce the results.
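An illustrative (unbenchmarked) way to see the effect is to time the same aggregation with and without srckeep:

# with srckeep, only the carrier column is read from disk
system.time(flights.df %>% srckeep(c("carrier")) %>% group_by(carrier) %>% summarize(count = n()) %>% collect)
# without srckeep, every column is read
system.time(flights.df %>% group_by(carrier) %>% summarize(count = n()) %>% collect)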

Joins

disk.frame supports many dplyr joins including:

left_join
inner_join
semi_join
anti_join
full_join # requires rechunk on both left and right

In all cases, the left dataset (x) must be a disk.frame, and the right dataset (y) can be either a disk.frame or a data.frame. If the right dataset is a disk.frame and the shardkeys differ between the two disk.frames, then two expensive hard rechunk operations are performed eagerly, one on each disk.frame, so that the join can be performed correctly.

However, if the right dataset is a data.frame, then rechunks are only performed in the case of full_join.

Note that disk.frame does not support right_join; the user should use left_join with the datasets swapped instead.
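A sketch of the swap: x %>% right_join(y) keeps the rows of y, which is exactly what y %>% left_join(x) does. Here airlines is converted to a disk.frame so it can act as the left table; the folder name tmp_airlines.df is made up for this example:

airlines.df <- as.disk.frame(airlines, outdir = file.path(tempdir(), "tmp_airlines.df"), overwrite = TRUE)
# keep the right table as a data.frame; workable here because flights fits in RAM
airlines.df %>%
  left_join(collect(flights.df), by = "carrier") %>%
  collect %>%
  head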

The joins below are performed lazily because airlines.dt is a data.table, not a disk.frame:

# make airlines a data.table
airlines.dt <- data.table(airlines)
# flights %>% left_join(airlines, by = "carrier") #
flights.df %>% 
  left_join(airlines.dt, by = "carrier") %>% 
  collect %>% 
  head
flights.df %>% 
  left_join(airlines.dt, by = c("carrier")) %>% 
  collect %>% 
  tail

Window functions and arbitrary functions

{disk.frame} supports all data.frame operations, unlike Spark, which can only perform the operations that Spark has implemented. Hence window functions like min_rank and rank are supported out of the box.

For example:

# rank each flight's departure delay within each day
ranked <- flights.df %>%
  srckeep(c("year","month","day", "dep_delay")) %>%
  chunk_group_by(year, month, day) %>%
  select(dep_delay) %>%
  mutate(rank = rank(desc(dep_delay))) %>%
  collect

ranked %>% head

Arbitrary by-chunk processing

One can apply arbitrary transformations to each chunk of the disk.frame by using the delayed function, which evaluates lazily, or the cmap function with lazy = FALSE, which evaluates eagerly. For example, to return the number of rows in each chunk:

flights.df1 <- delayed(flights.df, ~nrow(.x))
collect_list(flights.df1) %>% head # returns the number of rows of each chunk in a list

and to do the same eagerly with cmap:

cmap(flights.df, ~nrow(.x), lazy = FALSE) %>% head

The cmap function can also output the results to another disk.frame folder, e.g.

# return the first 10 rows of each chunk
flights.df2 <- cmap(flights.df, ~.x[1:10,], lazy = FALSE, outdir = file.path(tempdir(), "tmp2"), overwrite = TRUE)

flights.df2 %>% head

Notice that {disk.frame} supports the purrr-style syntax for defining a function using ~.
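The formula is shorthand for an anonymous function with .x as the chunk; assuming cmap, like purrr::map, also accepts a plain function, the call above could be written as:

cmap(flights.df, function(chunk) nrow(chunk), lazy = FALSE) %>% head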

Sampling

In the disk.frame framework, sampling a proportion of rows within each chunk can be performed using sample_frac.

flights.df %>% sample_frac(0.01) %>% collect %>% head

Writing Data

One can output a disk.frame by using the write_disk.frame function, e.g.

write_disk.frame(flights.df, outdir = "out")

This will output a disk.frame to the folder "out".
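write_disk.frame can also reshard the data as it writes; the shardby argument below is an assumption, so check ?write_disk.frame:

# write out a copy of the data sharded by carrier
write_disk.frame(flights.df, outdir = file.path(tempdir(), "out_sharded"), shardby = "carrier", overwrite = TRUE)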

# fs::dir_delete(file.path(tempdir(), "tmp_flights.df"))
# fs::dir_delete(file.path(tempdir(), "tmp2"))
# fs::file_delete(file.path(tempdir(), "tmp_flights.csv"))

