dplyr SQL for the rquery example
Win-Vector LLC
12/11/2017

Notice the irrelevant columns live a few steps into the query sequence. Also notice the dplyr SQL does have less nesting than the rquery SQL.
suppressPackageStartupMessages(library("dplyr"))
## Warning: package 'dplyr' was built under R version 3.5.1
packageVersion("dplyr")
## [1] '0.7.6'
my_db <- sparklyr::spark_connect(version='2.2.0',
master = "local")
d_local <- wrapr::build_frame(
'subjectID', 'surveyCategory' , 'assessmentTotal', 'irrelevantCol1', 'irrelevantCol2' |
1 , 'withdrawal behavior', 5 , 'irrel1' , 'irrel2' |
1 , 'positive re-framing', 2 , 'irrel1' , 'irrel2' |
2 , 'withdrawal behavior', 3 , 'irrel1' , 'irrel2' |
2 , 'positive re-framing', 4 , 'irrel1' , 'irrel2' )
d <- dplyr::copy_to(my_db,
d_local,
name = 'd',
temporary = TRUE,
overwrite = FALSE)
scale <- 0.237
dplyr_pipeline <- d %>%
group_by(subjectID) %>%
mutate(probability =
exp(assessmentTotal * scale)/
sum(exp(assessmentTotal * scale), na.rm=TRUE)) %>%
arrange(probability, surveyCategory) %>%
filter(row_number() == n()) %>%
ungroup() %>%
rename(diagnosis = surveyCategory) %>%
select(subjectID, diagnosis, probability) %>%
arrange(subjectID)
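As a quick sanity check (my arithmetic, not part of the original article): the probability column is a per-subject softmax of assessmentTotal, so subjectID 1's 'withdrawal behavior' probability can be computed by hand from the data above.

# per-subject softmax: exp(5 * scale) / (exp(5 * scale) + exp(2 * scale))
# evaluates to about 0.6706, matching the 0.671 the pipeline reports below
exp(5 * scale) / (exp(5 * scale) + exp(2 * scale))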
# directly prints; cannot easily and reliably capture the SQL
show_query(dplyr_pipeline)
## <SQL>
## SELECT `subjectID`, `diagnosis`, `probability`
## FROM (SELECT `subjectID`, `surveyCategory` AS `diagnosis`, `assessmentTotal`, `irrelevantCol1`, `irrelevantCol2`, `probability`
## FROM (SELECT `subjectID`, `surveyCategory`, `assessmentTotal`, `irrelevantCol1`, `irrelevantCol2`, `probability`
## FROM (SELECT `subjectID`, `surveyCategory`, `assessmentTotal`, `irrelevantCol1`, `irrelevantCol2`, `probability`, row_number() OVER (PARTITION BY `subjectID` ORDER BY `probability`, `surveyCategory`) AS `zzz2`, COUNT(*) OVER (PARTITION BY `subjectID`) AS `zzz3`
## FROM (SELECT *
## FROM (SELECT `subjectID`, `surveyCategory`, `assessmentTotal`, `irrelevantCol1`, `irrelevantCol2`, EXP(`assessmentTotal` * 0.237) / sum(EXP(`assessmentTotal` * 0.237)) OVER (PARTITION BY `subjectID`) AS `probability`
## FROM `d`) `yjryiymakx`
## ORDER BY `probability`, `surveyCategory`) `ktopcuqwie`) `efcaihutre`
## WHERE (`zzz2` = `zzz3`)) `ykxoyqwoon`) `xkcntyvcwj`
## ORDER BY `subjectID`
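A small aside (not in the original article): unlike show_query(), dbplyr::remote_query() (used below to retrieve the plan) returns the generated SQL as a character value, so it can be captured and inspected programmatically. For example, counting sub-SELECTs gives a rough nesting measure.

# capture the generated SQL as a string
dplyr_sql <- dbplyr::remote_query(dplyr_pipeline)
# rough nesting measure: number of SELECT keywords in the query text
length(gregexpr("SELECT", dplyr_sql, fixed = TRUE)[[1]])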
# directly prints; cannot easily and reliably capture the SQL
explain(dplyr_pipeline)
## <SQL>
## SELECT `subjectID`, `diagnosis`, `probability`
## FROM (SELECT `subjectID`, `surveyCategory` AS `diagnosis`, `assessmentTotal`, `irrelevantCol1`, `irrelevantCol2`, `probability`
## FROM (SELECT `subjectID`, `surveyCategory`, `assessmentTotal`, `irrelevantCol1`, `irrelevantCol2`, `probability`
## FROM (SELECT `subjectID`, `surveyCategory`, `assessmentTotal`, `irrelevantCol1`, `irrelevantCol2`, `probability`, row_number() OVER (PARTITION BY `subjectID` ORDER BY `probability`, `surveyCategory`) AS `zzz4`, COUNT(*) OVER (PARTITION BY `subjectID`) AS `zzz5`
## FROM (SELECT *
## FROM (SELECT `subjectID`, `surveyCategory`, `assessmentTotal`, `irrelevantCol1`, `irrelevantCol2`, EXP(`assessmentTotal` * 0.237) / sum(EXP(`assessmentTotal` * 0.237)) OVER (PARTITION BY `subjectID`) AS `probability`
## FROM `d`) `pghuhkzufg`
## ORDER BY `probability`, `surveyCategory`) `utjrlsrkrs`) `hghlzewqvr`
## WHERE (`zzz4` = `zzz5`)) `ebewmmjmnf`) `lqrqugaelb`
## ORDER BY `subjectID`
##
## <PLAN>
# get the plan by hand
dplyr_plan <- DBI::dbGetQuery(my_db, paste("EXPLAIN ",
dbplyr::remote_query(dplyr_pipeline)))
cat(dplyr_plan[1, 1])
## == Physical Plan ==
## *Sort [subjectID#15 ASC NULLS FIRST], true, 0
## +- Exchange rangepartitioning(subjectID#15 ASC NULLS FIRST, 4)
## +- *Project [subjectID#15, surveyCategory#16 AS diagnosis#142, probability#139]
## +- *Filter (isnotnull(zzz6#140) && (cast(zzz6#140 as bigint) = zzz7#141L))
## +- Window [count(1) windowspecdefinition(subjectID#15, ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) AS zzz7#141L], [subjectID#15]
## +- Window [row_number() windowspecdefinition(subjectID#15, probability#139 ASC NULLS FIRST, surveyCategory#16 ASC NULLS FIRST, ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS zzz6#140], [subjectID#15], [probability#139 ASC NULLS FIRST, surveyCategory#16 ASC NULLS FIRST]
## +- *Sort [subjectID#15 ASC NULLS FIRST, probability#139 ASC NULLS FIRST, surveyCategory#16 ASC NULLS FIRST], false, 0
## +- Exchange hashpartitioning(subjectID#15, 4)
## +- *Sort [probability#139 ASC NULLS FIRST, surveyCategory#16 ASC NULLS FIRST], true, 0
## +- Exchange rangepartitioning(probability#139 ASC NULLS FIRST, surveyCategory#16 ASC NULLS FIRST, 4)
## +- *Project [subjectID#15, surveyCategory#16, (EXP((assessmentTotal#17 * 0.237)) / _we0#149) AS probability#139]
## +- Window [sum(_w0#148) windowspecdefinition(subjectID#15, ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) AS _we0#149], [subjectID#15]
## +- *Sort [subjectID#15 ASC NULLS FIRST], false, 0
## +- Exchange hashpartitioning(subjectID#15, 4)
## +- *Project [subjectID#15, surveyCategory#16, assessmentTotal#17, EXP((assessmentTotal#17 * 0.237)) AS _w0#148]
## +- InMemoryTableScan [subjectID#15, surveyCategory#16, assessmentTotal#17]
## +- InMemoryRelation [subjectID#15, surveyCategory#16, assessmentTotal#17, irrelevantCol1#18, irrelevantCol2#19], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas), `d`
## +- Scan ExistingRDD[subjectID#15,surveyCategory#16,assessmentTotal#17,irrelevantCol1#18,irrelevantCol2#19]
# run
dplyr_pipeline
## # Source: lazy query [?? x 3]
## # Database: spark_connection
## # Ordered by: probability, surveyCategory, subjectID
## subjectID diagnosis probability
## <dbl> <chr> <dbl>
## 1 1 withdrawal behavior 0.671
## 2 2 positive re-framing 0.559
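To bring the result back as a local data.frame one would use standard dplyr collection (a minimal sketch, not shown in the original):

# pull the remote result into local R memory
local_res <- dplyr_pipeline %>% collect()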
library("rquery")
scale <- 0.237
rquery_pipeline <- db_td(my_db, "d") %.>%
extend_nse(.,
probability %:=%
exp(assessmentTotal * scale)) %.>%
normalize_cols(.,
"probability",
partitionby = 'subjectID') %.>%
pick_top_k(.,
partitionby = 'subjectID',
orderby = c('probability', 'surveyCategory'),
reverse = c('probability')) %.>%
rename_columns(., 'diagnosis' %:=% 'surveyCategory') %.>%
select_columns(., c('subjectID',
'diagnosis',
'probability')) %.>%
orderby(., cols = 'subjectID')
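Note (my addition): rquery's to_sql(), used below to fetch the plan, returns the generated SQL as a string, so printing it for inspection is direct:

# print the rquery-generated SQL for the pipeline
cat(to_sql(rquery_pipeline, my_db))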
rquery_plan <- DBI::dbGetQuery(my_db, paste("EXPLAIN ",
to_sql(rquery_pipeline, my_db)))
cat(rquery_plan[1, 1])
## == Physical Plan ==
## *Sort [subjectID#15 ASC NULLS FIRST], true, 0
## +- Exchange rangepartitioning(subjectID#15 ASC NULLS FIRST, 4)
## +- *Project [subjectID#15, surveyCategory#16 AS diagnosis#275, probability#271]
## +- *Filter (isnotnull(row_number#272) && (row_number#272 <= 1))
## +- Window [row_number() windowspecdefinition(subjectID#15, probability#271 DESC NULLS LAST, surveyCategory#16 ASC NULLS FIRST, ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS row_number#272], [subjectID#15], [probability#271 DESC NULLS LAST, surveyCategory#16 ASC NULLS FIRST]
## +- *Sort [subjectID#15 ASC NULLS FIRST, probability#271 DESC NULLS LAST, surveyCategory#16 ASC NULLS FIRST], false, 0
## +- *Project [(probability#270 / _we0#280) AS probability#271, subjectID#15, surveyCategory#16]
## +- Window [sum(probability#270) windowspecdefinition(subjectID#15, ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) AS _we0#280], [subjectID#15]
## +- *Sort [subjectID#15 ASC NULLS FIRST], false, 0
## +- Exchange hashpartitioning(subjectID#15, 4)
## +- *Project [subjectID#15, surveyCategory#16, EXP((assessmentTotal#17 * 0.237)) AS probability#270]
## +- InMemoryTableScan [subjectID#15, surveyCategory#16, assessmentTotal#17]
## +- InMemoryRelation [subjectID#15, surveyCategory#16, assessmentTotal#17, irrelevantCol1#18, irrelevantCol2#19], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas), `d`
## +- Scan ExistingRDD[subjectID#15,surveyCategory#16,assessmentTotal#17,irrelevantCol1#18,irrelevantCol2#19]
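To actually run the rquery pipeline one can send its SQL through DBI, just as the plan query above does (a minimal sketch; the results should match the dplyr results shown earlier):

# execute the rquery pipeline and fetch the result
rquery_res <- DBI::dbGetQuery(my_db, to_sql(rquery_pipeline, my_db))
rquery_res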
sparklyr::spark_disconnect(my_db)