spark_apply: Apply an R UDF in Spark


Description

Applies an R User-Defined Function (UDF) to each partition of a spark_tbl.

Usage

spark_apply(.data, .f, schema)

Arguments

.data

a spark_tbl

.f

a function or formula to be applied to each partition of the spark_tbl. Can be an anonymous function, e.g. ~ head(., 10) (see the formula example at the end of the Examples section). .f should take a single parameter, to which an R data.frame corresponding to a partition will be passed. The output of .f should be an R data.frame.

schema

The schema of the resulting SparkDataFrame after the function is applied. It must match the output of .f. Since Spark 2.3, a DDL-formatted string is also supported for the schema (see the final example below).

Details

spark_apply is a re-implementation of SparkR::dapply. Importantly, spark_apply (and SparkR::dapply) will scan the function being passed and automatically broadcast any values from the .GlobalEnv that it references. Functions from dplyr are always available by default.
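
For example, a helper function defined in the global environment can be referenced directly inside the UDF, much like a plain value. A minimal sketch, assuming an active Spark session; double_sepal is a hypothetical helper, not part of the package:

iris_tbl <- spark_tbl(iris)
double_sepal <- function(df) dplyr::mutate(df, Sepal.Length = Sepal.Length * 2)

iris_tbl %>%
  spark_apply(function(.df) double_sepal(.df), schema(iris_tbl)) %>%
  collect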

Value

a spark_tbl
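
Because the result is itself a spark_tbl, further dplyr verbs can be chained before collecting. A sketch, assuming the usual tidyspark verbs are in scope:

iris_tbl <- spark_tbl(iris)
iris_tbl %>%
  spark_apply(function(.df) head(.df, 5), schema(iris_tbl)) %>%
  filter(Species == "setosa") %>%
  collect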

Examples

## Not run: 
iris_tbl <- spark_tbl(iris)

# note, my_var will be broadcasted if we include it in the function
my_var <- 1

iris_tbl %>%
  spark_apply(function(.df) head(.df, my_var),
            schema(iris_tbl)) %>%
  collect

# but if you want to use a library (other than dplyr), you need to load it
# in the UDF
iris_tbl %>%
  spark_apply(function(.df) {
    require(purrr)
    .df %>%
      map_df(first)
  }, schema(iris_tbl)) %>%
  collect

# filter and add a column:
df <- spark_tbl(
  data.frame(a = c(1L, 2L, 3L),
             b = c(1, 2, 3),
             c = c("1","2","3"))
)

schema <- StructType(StructField("a", "integer"),
                     StructField("b", "double"),
                     StructField("c", "string"),
                     StructField("add", "integer"))

df %>%
  spark_apply(function(x) {
    x %>%
      filter(a > 1) %>%
      mutate(add = a + 1L)
  },
  schema) %>%
  collect

# The schema can also be specified as a DDL-formatted string.
schema <- "a INT, b DOUBLE, c STRING, add INT"
df %>%
  spark_apply(function(x) {
    x %>%
      filter(a > 1) %>%
      mutate(add = a + 1L)
  },
  schema) %>%
  collect
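
# .f can also be given as a formula; a sketch assuming the formula
# interface binds `.` to the partition's data.frame:
iris_tbl %>%
  spark_apply(~ head(., 1), schema(iris_tbl)) %>%
  collect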

## End(Not run)
