The goal of arkdb is to provide a convenient way to move data from large compressed text files (tsv, csv, etc.) into any DBI-compliant database connection (e.g. MySQL, Postgres, SQLite; see DBI), and to move tables out of such databases into text files. The key feature of arkdb is that files are moved between databases and text files in chunks of a fixed size, allowing the package functions to work with tables that would be much too large to read into memory all at once. There is also functionality for filtering and applying transformations to data as it is extracted from the database.
The arkdb package is easily extended with custom read and write methods, allowing you to dictate your own output formats. See R/streamable_table.R for examples, which include using the readr package for csv/tsv.
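As a rough sketch, a custom method is built by passing a read function, a write function, and a file extension to streamable_table(). The example below uses base R's read.csv() and write.table(); the argument conventions shown (how the header is omitted when appending chunks, the read signature) are assumptions here, so check R/streamable_table.R for the signatures the package actually expects.

library(arkdb)

## Sketch only: a csv method built from base R, loosely modeled on the
## package's built-in methods. The read/write argument conventions
## (file vs. connection, the omit_header flag) are assumptions --
## see R/streamable_table.R for the real ones.
streamable_base_csv_sketch <- function() {
  read <- function(file, ...) {
    utils::read.csv(file, stringsAsFactors = FALSE, ...)
  }
  write <- function(x, path, omit_header = FALSE) {
    utils::write.table(x, file = path, sep = ",",
                       row.names = FALSE,
                       col.names = !omit_header,  # drop the header when appending later chunks
                       append = omit_header)
  }
  streamable_table(read, write, "csv")
}

## Such a method could then be supplied to ark()/unark(), e.g.
## ark(db, dir, streamable_table = streamable_base_csv_sketch(), lines = 50000)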
You can install arkdb from GitHub with:
# install.packages("devtools")
devtools::install_github("cboettig/arkdb")
library(arkdb)

# additional libraries just for this demo
library(dbplyr)
library(dplyr)
library(fs)
Consider the nycflights13 database in SQLite:
tmp <- tempdir()   # or can be your working directory, "."
db <- dbplyr::nycflights13_sqlite(tmp)
Create an archive of the database:
dir <- fs::dir_create(fs::path(tmp, "nycflights"))
ark(db, dir, lines = 50000)
Import a list of compressed tabular files (e.g. *.csv.bz2) into a local SQLite database:
files <- fs::dir_ls(dir)
new_db <- DBI::dbConnect(RSQLite::SQLite(), fs::path(tmp, "local.sqlite"))
unark(files, new_db, lines = 50000)
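As a quick optional check, the standard DBI interface can confirm that the tables were created in the new database:

## optional sanity check: list the tables unark() created
DBI::dbListTables(new_db)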
When you are done, disconnect from both databases:

## Cleanup
DBI::dbDisconnect(db)
DBI::dbDisconnect(new_db)
This package can also be used to generate slices of data that are required for analytical or operational purposes. In the example below we archive to disk only the flight data that occurred in the month of December. It is recommended to use filters on a single table at a time.
ark(db, dir, lines = 50000, tables = "flights", filter_statement = "WHERE month = 12")
It is possible to use a callback to perform just-in-time data transformations before ark writes your data to disk in your preferred format. In the example below, we write a simple transformation that converts the flights data's arr_delay field from minutes to hours. It is recommended to use callbacks on a single table at a time. A callback function can be anything you can imagine, so long as it returns a data.frame that can be written to disk.
mins_to_hours <- function(data) {
  data$arr_delay <- data$arr_delay / 60
  data
}

ark(db, dir, lines = 50000, tables = "flights", callback = mins_to_hours)
There are two strategies for using ark in parallel. One is to loop over the tables, running ark on one table per parallel worker. The other, introduced in 0.0.15, is the "window-parallel" method, which parallelizes over chunks of a single table; this is particularly useful when your tables are very large and can speed up the process significantly. The two strategies can also be combined (Strategy 3 below).
Note: window-parallel currently only works in conjunction with streamable_parquet.
# Strategy 1: parallel over tables
library(arkdb)
library(future.apply)
plan(multisession)

# Any streamable_table method is acceptable
future_lapply(vector_of_tables, function(x) ark(db, dir, lines = 50000, tables = x))

# Strategy 2: parallel over chunks of a single table
library(arkdb)
library(future.apply)
plan(multisession)

ark(
  db, dir,
  streamable_table = streamable_parquet(), # required for window-parallel
  lines = 50000,
  tables = "flights",
  method = "window-parallel"
)

# Strategy 3: parallel over tables and over chunks of each table
library(arkdb)
library(future.apply)

# 16-core machine, for example: 4 tables at a time, 4 workers per table
plan(list(tweak(multisession, workers = 4), tweak(multisession, workers = 4)))

future_lapply(vector_of_tables, function(x) {
  ark(
    db, dir,
    streamable_table = streamable_parquet(), # required for window-parallel
    lines = 50000,
    tables = x,
    method = "window-parallel"
  )
})
The arkdb package can also be used to create ETL pipelines involving text archives or databases, given its ability to filter and to apply callbacks. In the example below, we leverage duckdb to read a fictional folder of files split by US state, filter by var_filtered, apply a callback transformation transform_fun to var_transformed, save the results as parquet, and then load the folder of parquet files for analysis with Apache Arrow.
library(arrow)
library(duckdb)

db <- dbConnect(duckdb::duckdb())

transform_fun <- function(data) {
  data$var_transformed <- sqrt(data$var_transformed)
  data
}

for (state in c("DC", state.abb)) {
  path <- paste0("path/to/archives/", state, ".gz")
  ark(
    db,
    dir = paste0("output/", state),
    streamable_table = streamable_parquet(), # parquet files of `lines` rows each
    lines = 100000,
    # See: https://duckdb.org/docs/data/csv
    tables = sprintf("read_csv_auto('%s')", path),
    compress = "none",        # compression is meaningless for parquet, which is already compressed
    overwrite = TRUE,
    filenames = state,        # overload the table name
    filter_statement = "WHERE var_filtered = 1",
    callback = transform_fun
  )
}

# The result is trivial to read in with arrow
ds <- open_dataset("output", partitioning = "state")
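From there, the partitioned dataset can be queried lazily with dplyr verbs. The snippet below is only a sketch: state is the partition column added by open_dataset(), and var_transformed comes from the fictional data above.

## Sketch: lazily query the partitioned parquet dataset, collecting only at the end
library(dplyr)
ds %>%
  filter(state == "DC") %>%
  summarise(mean_var = mean(var_transformed, na.rm = TRUE)) %>%
  collect()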
Please note that this project is released with a Contributor Code of Conduct. By participating in this project you agree to abide by its terms.