John Mount October 13, 2019
This
is a tutorial on how to use window functions in either the R
rquery package, or in the
Python data_algebra
package (R example
here,
Python example
here).
(Note: these examples require at least rqdatatable 1.2.3, and
rquery 1.3.9 which may not be up on CRAN yet.)
The rquery package provides a
simplified (though verbose) unified interface to data.table and SQL data
transforms, including window functions. (Note: for a Python version of this
please see
here.)
Let’s work an example. First bring in our packages.
library(wrapr)
library(rquery)
library(rqdatatable)
Now some example data.
# Example data: `g` is the grouping key, `x` the ordering column, and `v`
# the value column the window functions are computed over.
d <- data.frame(
  g = rep(c("a", "b", "c"), times = c(1, 2, 3)),
  x = c(1, 4, 5, 7, 8, 9),
  v = c(10, 40, 50, 70, 80, 90),
  stringsAsFactors = FALSE
)
knitr::kable(d)
| g | x | v | | :- | -: | -: | | a | 1 | 10 | | b | 4 | 40 | | b | 5 | 50 | | c | 7 | 70 | | c | 8 | 80 | | c | 9 | 90 |
And we can run a number of ordered and un-ordered window functions (the
distinction is whether an orderby argument is present).
# Table description for `d`; this is the source node of the pipelines below.
table_description <- local_td(d)
# Make data.table's shift() available under the bare name used in the pipeline.
shift <- data.table::shift

# First extend(): ordered window functions (partitioned by `g`, ordered by `x`).
# Second extend(): un-ordered window functions (partitioned by `g` only).
ops <- table_description %.>%
  extend(.,
         row_number := row_number(),
         v_shift := shift(v),
         cumsum_v := cumsum(v),
         partitionby = "g",
         orderby = "x") %.>%
  extend(.,
         ngroup := ngroup(),
         size := n(),
         max_v := max(v),
         min_v := min(v),
         sum_v := sum(v),
         mean_v := mean(v),
         partitionby = "g")

# Apply the stored pipeline to the local data frame and tabulate the result.
knitr::kable(d %.>% ops)
| g | x | v | row_number | v_shift | cumsum_v | ngroup | size | max_v | min_v | sum_v | mean_v | | :- | -: | -: | ----------: | -------: | --------: | -----: | ---: | -----: | -----: | -----: | ------: | | a | 1 | 10 | 1 | NA | 10 | 1 | 1 | 10 | 10 | 10 | 10 | | b | 4 | 40 | 1 | NA | 40 | 2 | 2 | 50 | 40 | 90 | 45 | | b | 5 | 50 | 2 | 40 | 90 | 2 | 2 | 50 | 40 | 90 | 45 | | c | 7 | 70 | 1 | NA | 70 | 3 | 3 | 90 | 70 | 240 | 80 | | c | 8 | 80 | 2 | 70 | 150 | 3 | 3 | 90 | 70 | 240 | 80 | | c | 9 | 90 | 3 | 80 | 240 | 3 | 3 | 90 | 70 | 240 | 80 |
Note: we are taking care to separate operations between the ordered block and the un-ordered block. In databases, the presence of an order constraint in the window function often switches the operation to a cumulative mode.
One of the benefits of rquery is that the commands are saved in an object.
# Print the text form of the stored operator pipeline.
cat(format(ops))
## mk_td("d", c(
##   "g",
##   "x",
##   "v")) %.>%
##  extend(.,
##   row_number := row_number(),
##   v_shift := shift(v),
##   cumsum_v := cumsum(v),
##   partitionby = c('g'),
##   orderby = c('x'),
##   reverse = c()) %.>%
##  extend(.,
##   ngroup := ngroup(),
##   size := n(),
##   max_v := max(v),
##   min_v := min(v),
##   sum_v := sum(v),
##   mean_v := mean(v),
##   partitionby = c('g'),
##   orderby = c(),
##   reverse = c())
We can also present a diagram of the operator chain.
# Draw the operator chain: build the diagram from `ops`, then render it.
DiagrammeR::grViz(op_diagram(ops))

And these commands can be re-used and even exported to SQL (including large scale SQL such as PostgreSQL, Apache Spark, or Google Big Query).
For a simple demonstration we will use small-scale SQL as realized in SQLite.
# Open an in-memory SQLite database and initialise RSQLite's extension on it.
raw_connection <- DBI::dbConnect(RSQLite::SQLite(), dbname = ":memory:")
RSQLite::initExtension(raw_connection)
# Wrap the raw DBI connection in the rquery database handle used below.
db <- rquery_db_info(
  connection = raw_connection,
  is_dbi = TRUE,
  connection_options = rq_connection_tests(raw_connection)
)
# Database variant of the pipeline: the same ordered and un-ordered steps,
# but without the ngroup() column.
ops_db <- table_description %.>%
  extend(.,
         row_number := row_number(),
         v_shift := shift(v),
         cumsum_v := cumsum(v),
         partitionby = "g",
         orderby = "x") %.>%
  extend(.,
         size := n(),
         max_v := max(v),
         min_v := min(v),
         sum_v := sum(v),
         mean_v := mean(v),
         partitionby = "g")
# Copy the local data frame into the database as temporary table "d",
# replacing any existing table of that name.
rq_copy_to(db, "d", d, temporary = TRUE, overwrite = TRUE)
## [1] "mk_td(\"d\", c( \"g\", \"x\", \"v\"))"
# Translate the stored pipeline into SQL text for this database and print it.
sql1 <- to_sql(ops_db, db)
cat(sql1)
## SELECT
##  `g`,
##  `x`,
##  `v`,
##  `row_number`,
##  `v_shift`,
##  `cumsum_v`,
##  COUNT ( 1 ) OVER (  PARTITION BY `g` ) AS `size`,
##  max ( `v` ) OVER (  PARTITION BY `g` ) AS `max_v`,
##  min ( `v` ) OVER (  PARTITION BY `g` ) AS `min_v`,
##  sum ( `v` ) OVER (  PARTITION BY `g` ) AS `sum_v`,
##  AVG ( `v` ) OVER (  PARTITION BY `g` ) AS `mean_v`
## FROM (
##  SELECT
##   `g`,
##   `x`,
##   `v`,
##   row_number ( ) OVER (  PARTITION BY `g` ORDER BY `x` ) AS `row_number`,
##   LAG ( `v` ) OVER (  PARTITION BY `g` ORDER BY `x` ) AS `v_shift`,
##   SUM ( `v` ) OVER (  PARTITION BY `g` ORDER BY `x` ) AS `cumsum_v`
##  FROM (
##   SELECT
##    `g`,
##    `x`,
##    `v`
##   FROM
##    `d`
##   ) tsql_14658958129420030500_0000000000
##  ) tsql_14658958129420030500_0000000001
And we can execute this SQL either to materialize a remote result (which involves no data motion, as we send the SQL commands to the database, not move the data to/from R), or to bring a result back from the database to R.
# Run the pipeline in the database and bring the result back to R for display.
res1_db <- execute(db, ops_db)
knitr::kable(res1_db)
| g | x | v | row_number | v_shift | cumsum_v | size | max_v | min_v | sum_v | mean_v | | :- | -: | -: | ----------: | -------: | --------: | ---: | -----: | -----: | -----: | ------: | | a | 1 | 10 | 1 | NA | 10 | 1 | 10 | 10 | 10 | 10 | | b | 4 | 40 | 1 | NA | 40 | 2 | 50 | 40 | 90 | 45 | | b | 5 | 50 | 2 | 40 | 90 | 2 | 50 | 40 | 90 | 45 | | c | 7 | 70 | 1 | NA | 70 | 3 | 90 | 70 | 240 | 80 | | c | 8 | 80 | 2 | 70 | 150 | 3 | 90 | 70 | 240 | 80 | | c | 9 | 90 | 3 | 80 | 240 | 3 | 90 | 70 | 240 | 80 |
Notice we didn’t calculate the group-id ngroup in the SQL version.
This is because this is a much less common window function (and not
often used in applications). It is also only interesting when we are
using a composite key (else the single key column is already the
per-group id). So not all rquery pipelines can run in all
environments. However, we can compute (arbitrary) group IDs in a
domain-independent manner as follows.
# Group-id table: project down to the grouping column `g`, then number the
# rows in `g` order to get `ngroup`.
id_ops_a <- table_description %.>%
  project(., groupby = "g") %.>%
  extend(., ngroup := row_number(), orderby = "g")
# Attach the group id to the original rows with a left join on `g`.
id_ops_b <- table_description %.>%
  natural_join(., id_ops_a, jointype = "LEFT", by = "g")
cat(format(id_ops_b))
## mk_td("d", c(
##   "g",
##   "x",
##   "v")) %.>%
##  natural_join(.,
##   mk_td("d", c(
##     "g",
##     "x",
##     "v")) %.>%
##    project(., ,
##     groupby = c('g')) %.>%
##    extend(.,
##     ngroup := row_number()),
##   jointype = "LEFT", by = c('g'))
Here we land the result in the database, without moving data through R.
# Run the group-id pipeline inside the database, storing the output as the
# table 'remote_result'; `table_2` is the returned description of that table.
table_2 <- materialize(db, id_ops_b, 'remote_result')
table_2
## [1] "mk_td(\"remote_result\", c( \"g\", \"x\", \"v\", \"ngroup\"))"
And we later copy it over to look at.
# Copy the materialized remote table back into R and display it.
res2_db <- execute(db, table_2)
knitr::kable(res2_db)
| g | x | v | ngroup | | :- | -: | -: | -----: | | a | 1 | 10 | 1 | | b | 4 | 40 | 2 | | b | 5 | 50 | 2 | | c | 7 | 70 | 3 | | c | 8 | 80 | 3 | | c | 9 | 90 | 3 |
And we can use the same pipeline in R.
# Apply the same group-id pipeline to the local data frame and tabulate it.
knitr::kable(d %.>% id_ops_b)
| g | v | x | ngroup | | :- | -: | -: | -----: | | a | 10 | 1 | 1 | | b | 40 | 4 | 2 | | b | 50 | 5 | 2 | | c | 70 | 7 | 3 | | c | 80 | 8 | 3 | | c | 90 | 9 | 3 |
And we can diagram the group labeling operation.
# Diagram of the group-labeling pipeline, built with merge_tables = TRUE.
DiagrammeR::grViz(op_diagram(id_ops_b, merge_tables = TRUE))

Or all the steps in one sequence.
# Full sequence: start from the group-id pipeline, then add the ordered
# window functions followed by the un-ordered per-group summaries.
all_ops <- id_ops_b %.>%
  extend(.,
         row_number := row_number(),
         v_shift := shift(v),
         cumsum_v := cumsum(v),
         partitionby = "g",
         orderby = "x") %.>%
  extend(.,
         size := n(),
         max_v := max(v),
         min_v := min(v),
         sum_v := sum(v),
         mean_v := mean(v),
         partitionby = "g")

# Diagram of the combined pipeline.
DiagrammeR::grViz(op_diagram(all_ops, merge_tables = TRUE))

And we can run this whole sequence with data.table.
# Run the combined pipeline on the local data frame and tabulate the result.
knitr::kable(d %.>% all_ops)
| g | v | x | ngroup | row_number | v_shift | cumsum_v | size | max_v | min_v | sum_v | mean_v | | :- | -: | -: | -----: | ----------: | -------: | --------: | ---: | -----: | -----: | -----: | ------: | | a | 10 | 1 | 1 | 1 | NA | 10 | 1 | 10 | 10 | 10 | 10 | | b | 40 | 4 | 2 | 1 | NA | 40 | 2 | 50 | 40 | 90 | 45 | | b | 50 | 5 | 2 | 2 | 40 | 90 | 2 | 50 | 40 | 90 | 45 | | c | 70 | 7 | 3 | 1 | NA | 70 | 3 | 90 | 70 | 240 | 80 | | c | 80 | 8 | 3 | 2 | 70 | 150 | 3 | 90 | 70 | 240 | 80 | | c | 90 | 9 | 3 | 3 | 80 | 240 | 3 | 90 | 70 | 240 | 80 |
Or in the database (via automatic SQL generation).
# Run the combined pipeline in the database and tabulate the returned rows.
knitr::kable(execute(db, all_ops))
| g | x | v | ngroup | row_number | v_shift | cumsum_v | size | max_v | min_v | sum_v | mean_v | | :- | -: | -: | -----: | ----------: | -------: | --------: | ---: | -----: | -----: | -----: | ------: | | a | 1 | 10 | 1 | 1 | NA | 10 | 1 | 10 | 10 | 10 | 10 | | b | 4 | 40 | 2 | 1 | NA | 40 | 2 | 50 | 40 | 90 | 45 | | b | 5 | 50 | 2 | 2 | 40 | 90 | 2 | 50 | 40 | 90 | 45 | | c | 7 | 70 | 3 | 1 | NA | 70 | 3 | 90 | 70 | 240 | 80 | | c | 8 | 80 | 3 | 2 | 70 | 150 | 3 | 90 | 70 | 240 | 80 | | c | 9 | 90 | 3 | 3 | 80 | 240 | 3 | 90 | 70 | 240 | 80 |
# Clean up: close the SQLite connection opened for the database examples.
DBI::dbDisconnect(raw_connection)
Add the following code to your website.
For more information on customizing the embed code, read Embedding Snippets.