A set of functions intended to save time and cost (import/compute):
db_map_csv - Uses a sample CSV file to create a Hive table, or to build a 'columns' specification to pass to spark_read_csv. This technique is meant to cut down the time it takes to read CSV files into the Spark context, either by passing the column names and types to spark_read_csv or by using SQL to create the table.
db_histogram - Computes the histogram bins inside the server. The function uses the first column in the data set, so adding a dplyr::select statement before calling db_histogram makes it easy to add or remove a plotting step. Passing 'data' to the 'output' argument returns a data frame instead of a plot.
db_raster - Computes the frequency of the x/y intersections and aggregates them based on the resolution. The function uses the first two columns in the data set, so adding a dplyr::select statement before calling db_raster makes it easy to add or remove a plotting step. Passing 'data' to the 'output' argument returns a data frame instead of a plot.
You can install dbutilities from GitHub:
devtools::install_github("edgararuiz/dbutilities")
We will review how to use dbutilities with the following example. We'll start by loading the needed libraries, using the nycflights13 package as the data source for this example.
# Required
library(tidyverse)
library(sparklyr)
library(dbutilities)

# Only for the example
library(nycflights13)
Calling the flights table into R memory
data("flights") flights
We will use a local Spark context
conf <- spark_config()
conf$`sparklyr.shell.driver-memory` <- "16G"
sc <- spark_connect(master = "local", version = "2.1.0", config = conf)
We will create a CSV file based on the flights table to use as our source. The file is then placed in a folder that should contain only CSV files with the exact same layout.
if(!dir.exists("csv")) dir.create("csv")
write_csv(flights, "csv/flights.csv")
list.files("csv/")
The following db_map_csv call will return a character variable containing the SQL statement that can be used to create the table.
create_sql <- db_map_csv(sample_file = "csv/flights.csv",
                         db = "hive",
                         dir_location = file.path(getwd(), "csv"),
                         table_name = "sql_flights")
create_sql
Using the DBI package, we will run the SQL statement contained in create_sql.
DBI::dbGetQuery(sc, create_sql)
tbl(sc, "sql_flights")
The following db_map_csv command will return a list that contains all of the field names. Because spark_read_csv currently does not handle NAs in numeric fields, all fields are mapped as character. They can then be converted to the proper type using dplyr, as shown in the sketch after the import below.
flights_columns <- db_map_csv(sample_file = "csv/flights.csv")
flights_columns
The resulting list can then be passed to the columns argument. Also, make sure to pass FALSE to the infer_schema argument. This technique appeared to be the fastest way of importing a CSV file into Spark; see: http://spark.rstudio.com/example-s3.html
flights_noinfer <- spark_read_csv(sc,
                                  name = "noinfer_flights",
                                  path = "csv/",
                                  infer_schema = FALSE,
                                  columns = flights_columns)
tbl(sc, "noinfer_flights")
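Since every field was imported as character, a follow-up dplyr step can cast the columns that will actually be analyzed. A minimal sketch, assuming the flights layout used above (the specific casts are illustrative and are not part of dbutilities):

# Cast only the fields needed for the analysis; the rest stay character
noinfer_typed <- tbl(sc, "noinfer_flights") %>%
  mutate(arr_delay = as.numeric(arr_delay),
         dep_delay = as.numeric(dep_delay),
         distance  = as.numeric(distance))
noinfer_typed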
Copying the flights data frame into Spark
flights <- copy_to(sc, flights, "spark_flights")
We will filter out the larger delays, select the arr_delay field, and then pass the result to db_histogram using all of the defaults. This will return a ggplot with a 30-bin histogram.
flights %>%
  filter(arr_delay < 100) %>%
  select(arr_delay) %>%
  db_histogram()
We can also control the number of bins returned.
flights %>%
  filter(arr_delay < 100) %>%
  select(arr_delay) %>%
  db_histogram(bins = 10)
Passing the "data" in the output argument will return a data frame with the results. This is intended for the user to plot with a different package.
flights %>%
  filter(arr_delay < 100) %>%
  select(arr_delay) %>%
  db_histogram(output = "data")
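For instance, the collected data frame can be handed to base graphics instead of ggplot2. A minimal sketch, assuming the first column holds the bin value and the second the count (the column order is an assumption, not documented output):

hist_df <- flights %>%
  filter(arr_delay < 100) %>%
  select(arr_delay) %>%
  db_histogram(output = "data")

# Plot the pre-computed bins with base graphics
barplot(height = hist_df[[2]], names.arg = round(hist_df[[1]], 0))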
We will select the arr_delay and dep_delay fields to create a raster plot. The defaults return a ggplot with a resolution of 300.
flights %>%
  select(arr_delay, dep_delay) %>%
  db_raster()
Reducing the resolution returns larger squares and, most importantly, brings fewer records into R memory.
flights %>%
  select(arr_delay, dep_delay) %>%
  db_raster(resolution = 20)
Just like db_histogram, passing "data" to the output argument returns a data frame.
flights %>%
  select(arr_delay, dep_delay) %>%
  db_raster(output = "data")
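As with the histogram output, the collected raster data frame can be plotted outside of ggplot2. A minimal sketch, assuming the first two columns are the x/y intersections and the third is the frequency (an assumption about the output layout):

raster_df <- flights %>%
  select(arr_delay, dep_delay) %>%
  db_raster(output = "data")

# Size each point by its relative frequency using base graphics
plot(raster_df[[1]], raster_df[[2]],
     cex = 2 * sqrt(raster_df[[3]] / max(raster_df[[3]], na.rm = TRUE)))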
spark_disconnect(sc)