spark_grouped_apply: Apply an R UDF in Spark on Grouped Data


Description

Groups the SparkDataFrame using the specified columns and applies the R function to each group.

Usage

spark_grouped_apply(.data, .f, schema, cols = NULL)

Arguments

.data

a spark_tbl

.f

a function or formula to be applied to each group partition defined by the grouping columns of the spark_tbl. The function .f takes two arguments: a key (the values of the grouping columns for that group) and a local R data.frame containing the group's rows. It must return a local R data.frame.

schema

the schema of the resulting spark_tbl after the function is applied. The schema must match the output of .f: it defines each output column with its name and corresponding data type. Since Spark 2.3, a DDL-formatted string is also supported as the schema.

cols

(optional) character vector of grouping columns. If NULL, the groups are taken from the incoming spark_tbl; columns specified here override any existing grouping.

Value

a spark_tbl with schema as specified
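The (key, data.frame) contract of .f can be illustrated locally, without a Spark session, using base R. This is only an analogy to what Spark does per group, not tidyspark code; the names (groups, key, avg) are illustrative:

```r
# Local analogy: a data frame like the one in the first example below.
df <- data.frame(a = c(1L, 1L, 3L),
                 b = c(1, 2, 3),
                 c = c("1", "1", "3"),
                 stringsAsFactors = FALSE)

# Split rows by the grouping columns, as Spark does across partitions ...
groups <- split(df, interaction(df$a, df$c, drop = TRUE))

# ... then apply a function of (key, data.frame) to each group and bind
# the per-group data.frames back together, mirroring spark_grouped_apply.
result <- do.call(rbind, lapply(groups, function(.df) {
  key <- .df[1, c("a", "c"), drop = FALSE]  # the group's key values
  data.frame(key, avg = mean(.df$b), stringsAsFactors = FALSE)
}))
```

In Spark the split, apply, and bind steps run distributed across executors, and the declared schema tells Spark the column names and types of each returned data.frame.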

Examples

## Not run: 
# Computes the arithmetic mean of the second column by grouping
# on the first and third columns. Output the grouping values and the average.

df <- spark_tbl(tibble(a = c(1L, 1L, 3L),
                       b = c(1, 2, 3),
                       c = c("1", "1", "3"),
                       d = c(0.1, 0.2, 0.3)))

# Here our output contains three columns, the key which is a combination of two
# columns with data types integer and string and the mean which is a double.
schema <- StructType(
  StructField("a", IntegerType, TRUE),
  StructField("c", StringType, TRUE),
  StructField("avg", DoubleType, TRUE)
)

result <- df %>%
  group_by(a, c) %>%
  spark_grouped_apply(function(key, .df) {
    data.frame(key, mean(.df$b), stringsAsFactors = FALSE)
  }, schema) %>%
  collect

# The schema also can be specified in a DDL-formatted string and the
# function can be specified as a formula
schema <- "a INT, c STRING, avg DOUBLE"
result <- df %>%
  group_by(a, c) %>%
  spark_grouped_apply(~ data.frame(..1, mean(..2$b), stringsAsFactors = FALSE),
                      schema) %>%
  collect

result
# # A tibble: 2 x 3
#       a c       avg
#   <int> <chr> <dbl>
# 1     3 3       3
# 2     1 1       1.5

# Fits linear models on iris dataset by grouping on the 'Species' column and
# using 'Sepal_Length' as a target variable, 'Sepal_Width', 'Petal_Length'
# and 'Petal_Width' as training features.

iris_tbl <- spark_tbl(iris)
schema <- StructType(StructField("(Intercept)", "double"),
                     StructField("Sepal_Width", "double"),
                     StructField("Petal_Length", "double"),
                     StructField("Petal_Width", "double"))
iris_tbl %>%
  group_by(Species) %>%
  spark_grouped_apply(function(key, x) {
    m <- suppressWarnings(lm(Sepal_Length ~
                               Sepal_Width + Petal_Length + Petal_Width, x))
    data.frame(t(coef(m)))
  }, schema) %>%
  collect

# # A tibble: 3 x 4
#   `(Intercept)` Sepal_Width Petal_Length Petal_Width
#           <dbl>       <dbl>        <dbl>       <dbl>
# 1         0.700       0.330        0.946      -0.170
# 2         1.90        0.387        0.908      -0.679
# 3         2.35        0.655        0.238       0.252


## End(Not run)

danzafar/tidyspark documentation built on Sept. 30, 2020, 12:19 p.m.