eval_pipe <- FALSE
if(Sys.getenv("GLOBAL_EVAL") != "") eval_pipe <- Sys.getenv("GLOBAL_EVAL")

Spark Pipelines

library(sparklyr)
library(dplyr)

Build an Estimator (plan)

Create a simple estimator that transforms data and fits a model

  1. Use the spark_lineitems variable to create a new aggregation by order_id. Summarize the total sales and number of items r spark_lineitems %>% mutate(price = as.double(price)) %>% group_by(order_id) %>% summarise(total_sales = sum(price, na.rm = TRUE), no_items = n())

  2. Assign the code to a new variable called orders r orders <- spark_lineitems %>% mutate(price = as.double(price)) %>% group_by(order_id) %>% summarise(total_sales = sum(price, na.rm = TRUE), no_items = n())

  3. Start a new code chunk, with calling ml_pipeline(sc) r ml_pipeline(sc)

  4. Pipe the ml_pipeline() code into a ft_dplyr_transfomer() call. Use the orders variable for its argument r ml_pipeline(sc) %>% ft_dplyr_transformer(orders)

  5. Add an ft_binarizer() step that determines if the total sale is above $50. Name the new variable above_50 r ml_pipeline(sc) %>% ft_dplyr_transformer(orders) %>% ft_binarizer("total_sales", "above_50", 50)

  6. Using the ft_r_formula, add a step that sets the model's formula to: above_50 ~ no_items r ml_pipeline(sc) %>% ft_dplyr_transformer(orders) %>% ft_binarizer("total_sales", "above_50", 50) %>% ft_r_formula(above_50 ~ no_items)

  7. Finalize the pipeline by adding a ml_logistic_regression() step, no arguments are needed r ml_pipeline(sc) %>% ft_dplyr_transformer(orders) %>% ft_binarizer("total_sales", "above_50", 50) %>% ft_r_formula(above_50 ~ no_items) %>% ml_logistic_regression()

  8. Assign the code to a new variable called orders_plan r orders_plan <- ml_pipeline(sc) %>% ft_dplyr_transformer(orders) %>% ft_binarizer("total_sales", "above_50", 50) %>% ft_r_formula(above_50 ~ no_items) %>% ml_logistic_regression()

  9. Call orders_plan to confirm that all of the steps are present r orders_plan

Build a Transformer (fit)

Execute the planned changes to obtain a new model

  1. Use ml_fit() to execute the changes in order_plan using the spark_lineitems data. Assign to a new variable called orders_fit r orders_fit <- ml_fit(orders_plan, spark_lineitems)

  2. Call orders_fit to see the print-out of the newly fitted model r orders_fit

Predictions using Spark Pipelines

Overview of how to use a fitted pipeline to run predictions

  1. Use ml_transform() in order to use the orders_fit model to run predictions over spark_lineitems r orders_preds <- ml_transform(orders_fit, spark_lineitems)

  2. With count(), compare the results from above_50 against the predictions, the variable created by ml_transform() is called prediction
    r orders_preds %>% count(above_50, prediction)

Save the pipeline objects

Overview of how to save the Estimator and the Transformer

  1. Use ml_save() to save order_plan in a new folder called "saved_model" r ml_save(orders_plan, "saved_model", overwrite = TRUE)

  2. Navigate to the "saved_model" folder to inspect its contents

  3. Use ml_save() to save orders_fit in a new folder called "saved_pipeline" r ml_save(orders_fit, "saved_pipeline", overwrite = TRUE)

  4. Navigate to the "saved_pipeline" folder to inspect its contents



rstudio-conf-2020/big-data documentation built on Feb. 4, 2020, 5:24 p.m.