knitr::opts_chunk$set(
  collapse = TRUE,
  comment = " # "
)
options(width =100)

replyr::gapply gives you the ability to apply a custom pipeline once per group of a data item with a user specified in-group order.

data.frame example.

library('dplyr')
library('replyr')
d <- data.frame(group=c(1,1,2,2,2),
                order=c(.1,.2,.3,.4,.5),
                values=c(10,20,2,4,8))

# User supplied window functions.  These depend on known column names and
# the data back-end matching function names (such as cumsum).  The idea
# the user supplies one of these to gapply, and gapply
# organizes the calcuation (spliting on gcolumn, and optionally ordering
# on ocolumn).
cumulative_sum <- . %>% arrange(order) %>% mutate(cv=cumsum(values))

# split version of sumgroup
sumgroupS <- . %>% summarize(group=min(group), # pseudo aggregation, as group constant in groups
                   minv=min(values),maxv=max(values))
# group version of sumgroup
sumgroupG <- . %>% summarize(minv=min(values),maxv=max(values))
sumgroup <- list(group_by=sumgroupG,split=sumgroupS,extract=sumgroupS)
sumgroup <- list('TRUE'=sumgroupG,'FALSE'=sumgroupS)

rank_in_group <- . %>% mutate(constcol=1) %>% mutate(rank=cumsum(constcol)) %>% select(-constcol)

In memory example.

for(partitionMethod in c('group_by','split','extract')) {
  print(partitionMethod)
  print('cumulative sum example')
  print(d %>% gapply('group',cumulative_sum,ocolumn='order',
                     partitionMethod=partitionMethod))
  print('summary example')
  print(d %>% gapply('group',sumgroup[[partitionMethod]],
                     partitionMethod=partitionMethod))
  print('ranking example')
  print(d %>% gapply('group',rank_in_group,ocolumn='order',
                     partitionMethod=partitionMethod))
  print('ranking example (decreasing)')
  print(d %>% gapply('group',rank_in_group,ocolumn='order',decreasing=TRUE,
                     partitionMethod=partitionMethod))
}

PostgreSQL example.

#below only works for services which have a cumsum operator
my_db <- dplyr::src_postgres(host = 'localhost',port = 5432,user = 'postgres',password = 'pg')
dR <- replyr_copy_to(my_db,d,'dR')

for(partitionMethod in c('group_by','extract')) {
  print(partitionMethod)
  print('cumulative sum example')
  print(dR %>% gapply('group',cumulative_sum,ocolumn='order',
                     partitionMethod=partitionMethod))
  print('summary example')
  print(dR %>% gapply('group',sumgroup[[partitionMethod]],
                     partitionMethod=partitionMethod))
  print('ranking example')
  print(dR %>% gapply('group',rank_in_group,ocolumn='order',
                     partitionMethod=partitionMethod))
  print('ranking example (decreasing)')
  print(dR %>% gapply('group',rank_in_group,ocolumn='order',decreasing=TRUE,
                     partitionMethod=partitionMethod))
}

my_db <- NULL; gc();

Spark example.

#below only works for services which have a cumsum operator
my_db <- sparklyr::spark_connect(version='2.2.0', 
                                 master = "local")
class(my_db)
my_db$spark_home
dS <- replyr_copy_to(my_db,d,'dS')

for(partitionMethod in c('group_by','extract')) {
  print(partitionMethod)
  print('cumulative sum example')
  print(dS %>% gapply('group',cumulative_sum,ocolumn='order',
                     partitionMethod=partitionMethod))
  print('summary example')
  print(dS %>% gapply('group',sumgroup[[partitionMethod]],
                     partitionMethod=partitionMethod))
  print('ranking example')
  print(dS %>% gapply('group',rank_in_group,ocolumn='order',
                     partitionMethod=partitionMethod))
  print('ranking example (decreasing)')
  print(dS %>% gapply('group',rank_in_group,ocolumn='order',decreasing=TRUE,
                     partitionMethod=partitionMethod))
}
my_db <- NULL; gc();


WinVector/replyr documentation built on Oct. 22, 2020, 8:07 p.m.