Connect to the Apache Spark cluster and copy in three tables.
library("dplyr")
## Warning: package 'dplyr' was built under R version 3.5.1
##
## Attaching package: 'dplyr'
## The following objects are masked from 'package:stats':
##
## filter, lag
## The following objects are masked from 'package:base':
##
## intersect, setdiff, setequal, union
library("sparklyr")
db <- sparklyr::spark_connect(version = "2.2.0",
                              master = "local")
d <- data.frame(key = 1,
                val = "a",
                stringsAsFactors = FALSE)
d1 <- dplyr::copy_to(db, d, "d1", overwrite = TRUE)
d2 <- dplyr::copy_to(db, d, "d2", overwrite = TRUE)
d3 <- dplyr::copy_to(db, d, "d3", overwrite = TRUE)
Try to use sparklyr/dplyr to join the tables. The query dies due to poor naming of the duplicate columns.
# works: explicit suffixes keep dots out of the generated column names
d1 %>%
  left_join(., d2, by = "key", suffix = c("_x", "_y")) %>%
  left_join(., d3, by = "key", suffix = c("_x", "_y"))
## # Source: lazy query [?? x 4]
## # Database: spark_connection
## key val_x val_y val
## <dbl> <chr> <chr> <chr>
## 1 1 a a a
# bad query: the default ".x"/".y" suffixes put dots into the column names
d1 %>%
  left_join(., d2, by = "key") %>%
  left_join(., d3, by = "key") %>%
  dbplyr::remote_query(.)
## <SQL> SELECT `TBL_LEFT`.`key` AS `key`, `TBL_LEFT`.`val`.`x` AS `val.x`, `TBL_LEFT`.`val`.`y` AS `val.y`, `TBL_RIGHT`.`val` AS `val`
## FROM (SELECT `TBL_LEFT`.`key` AS `key`, `TBL_LEFT`.`val` AS `val.x`, `TBL_RIGHT`.`val` AS `val.y`
## FROM `d1` AS `TBL_LEFT`
## LEFT JOIN `d2` AS `TBL_RIGHT`
## ON (`TBL_LEFT`.`key` = `TBL_RIGHT`.`key`)
## ) `TBL_LEFT`
## LEFT JOIN `d3` AS `TBL_RIGHT`
## ON (`TBL_LEFT`.`key` = `TBL_RIGHT`.`key`)
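Note the fragment `TBL_LEFT`.`val`.`x` in the generated SQL: the default suffixes produce dot-containing column names such as val.x, and the quoting splits each dot-separated piece into its own backtick-quoted identifier, which Spark then parses as a (non-existent) nested field reference.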
# fails: Spark cannot resolve the dot-named columns
d1 %>%
  left_join(., d2, by = "key") %>%
  left_join(., d3, by = "key")
## Error: org.apache.spark.sql.AnalysisException: cannot resolve '`TBL_LEFT.val.x`' given input columns: [val.y, key, val, val.x, key]; line 1 pos 34;
## 'Project [key#273 AS key#276, 'TBL_LEFT.val.x AS val.x#277, 'TBL_LEFT.val.y AS val.y#278, val#159 AS val#279]
## +- Join LeftOuter, (key#273 = key#158)
## :- SubqueryAlias TBL_LEFT
## : +- Project [key#12 AS key#273, val#13 AS val.x#274, val#86 AS val.y#275]
## : +- Join LeftOuter, (key#12 = key#85)
## : :- SubqueryAlias TBL_LEFT
## : : +- SubqueryAlias d1
## : : +- LogicalRDD [key#12, val#13]
## : +- SubqueryAlias TBL_RIGHT
## : +- SubqueryAlias d2
## : +- LogicalRDD [key#85, val#86]
## +- SubqueryAlias TBL_RIGHT
## +- SubqueryAlias d3
## +- LogicalRDD [key#158, val#159]
##
## at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42)
## at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$2.applyOrElse(CheckAnalysis.scala:88)
## at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$2.applyOrElse(CheckAnalysis.scala:85)
## at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:289)
## at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:289)
## at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
## at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:288)
## at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:286)
## at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:286)
## at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:306)
## at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
## at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:304)
## at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:286)
## at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$transformExpressionsUp$1.apply(QueryPlan.scala:268)
## at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$transformExpressionsUp$1.apply(QueryPlan.scala:268)
## at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpression$1(QueryPlan.scala:279)
## at org.apache.spark.sql.catalyst.plans.QueryPlan.org$apache$spark$sql$catalyst$plans$QueryPlan$$recursiveTransform$1(QueryPlan.scala:289)
## at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$org$apache$spark$sql$catalyst$plans$QueryPlan$$recursiveTransform$1$1.apply(QueryPlan.scala:293)
## at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
## at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
## at scala.collection.immutable.List.foreach(List.scala:381)
## at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
## at scala.collection.immutable.List.map(List.scala:285)
## at org.apache.spark.sql.catalyst.plans.QueryPlan.org$apache$spark$sql$catalyst$plans$QueryPlan$$recursiveTransform$1(QueryPlan.scala:293)
## at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$6.apply(QueryPlan.scala:298)
## at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
## at org.apache.spark.sql.catalyst.plans.QueryPlan.mapExpressions(QueryPlan.scala:298)
## at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpressionsUp(QueryPlan.scala:268)
## at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:85)
## at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:78)
## at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:127)
## at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.checkAnalysis(CheckAnalysis.scala:78)
## at org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis(Analyzer.scala:91)
## at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:52)
## at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:66)
## at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:623)
## at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
## at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
## at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
## at java.lang.reflect.Method.invoke(Method.java:497)
## at sparklyr.Invoke.invoke(invoke.scala:137)
## at sparklyr.StreamHandler.handleMethodCall(stream.scala:123)
## at sparklyr.StreamHandler.read(stream.scala:66)
## at sparklyr.BackendHandler.channelRead0(handler.scala:51)
## at sparklyr.BackendHandler.channelRead0(handler.scala:4)
## at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
## at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:357)
## at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:343)
## at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:336)
## at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:102)
## at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:357)
## at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:343)
## at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:336)
## at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:293)
## at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:267)
## at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:357)
## at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:343)
## at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:336)
## at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1294)
## at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:357)
## at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:343)
## at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:911)
## at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
## at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:643)
## at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:566)
## at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:480)
## at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:442)
## at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:131)
## at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:144)
## at java.lang.Thread.run(Thread.java:745)
Similar task with rquery. Note that rquery's natural join does not rename duplicate columns (it coalesces them), so we need a helper function to rename the columns before joining.
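First, a quick local illustration of the coalescing behavior (a sketch, not part of the Spark session: it assumes the rqdatatable package for in-memory execution, and the frames a and b are toy data made up for this example).
library("rquery")
library("rqdatatable")

a <- data.frame(key = c(1, 2), val = c("a", NA), stringsAsFactors = FALSE)
b <- data.frame(key = c(1, 2), val = c("x", "y"), stringsAsFactors = FALSE)

# natural_join() coalesces the shared val column instead of renaming it:
# left values win where present, right values fill in the missing ones
ops <- natural_join(local_td(a), local_td(b),
                    by = "key", jointype = "LEFT")
ex_data_table(ops, tables = list(a = a, b = b))
# expected: key = 1, 2 with val = "a" (kept from a), "y" (filled from b)
Now the Spark version.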
library("rquery")
dbopts <- rq_connection_tests(db)
print(dbopts)
## $rquery.DBIConnection_spark_connection_spark_shell_connection.use_DBI_dbListFields
## [1] FALSE
##
## $rquery.DBIConnection_spark_connection_spark_shell_connection.use_DBI_dbRemoveTable
## [1] FALSE
##
## $rquery.DBIConnection_spark_connection_spark_shell_connection.use_DBI_dbExecute
## [1] TRUE
##
## $rquery.DBIConnection_spark_connection_spark_shell_connection.create_temporary
## [1] FALSE
##
## $rquery.DBIConnection_spark_connection_spark_shell_connection.control_temporary
## [1] TRUE
##
## $rquery.DBIConnection_spark_connection_spark_shell_connection.control_rownames
## [1] FALSE
##
## $rquery.DBIConnection_spark_connection_spark_shell_connection.use_DBI_dbExistsTable
## [1] TRUE
##
## $rquery.DBIConnection_spark_connection_spark_shell_connection.check_logical_column_types
## [1] FALSE
##
## $rquery.DBIConnection_spark_connection_spark_shell_connection.fn_name_map
## mean
## "avg"
##
## $rquery.DBIConnection_spark_connection_spark_shell_connection.zero_arg_fn_map
## random
## "rand"
options(dbopts)
print(getDBOption(db, "control_rownames"))
## [1] FALSE
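(rq_connection_tests() probes the live connection for capabilities, for example whether temporary tables or DBI::dbListFields() work there, and returns per-connection options; once set with options(), rquery consults them through getDBOption() when generating and executing SQL.)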
d1d <- db_td(db, "d1")
d2d <- db_td(db, "d2")
d3d <- db_td(db, "d3")
# disambiguate columns
key <- "key"
col_table <- lapply(
  list(d1d, d2d, d3d),
  function(di) {
    data.frame(table = di$table_name,
               cols = column_names(di),
               stringsAsFactors = FALSE)
  })
col_table <- do.call(rbind, col_table)
col_table$is_key <- col_table$cols %in% key
col_table$new_cols <- col_table$cols
# make.names(unique = TRUE) suffixes repeated names to disambiguate them
col_table$new_cols[!col_table$is_key] <- make.names(col_table$cols[!col_table$is_key],
                                                    unique = TRUE)
# replace the dots with underscores so the names stay SQL-friendly
col_table$new_cols <- gsub(".", "_", col_table$new_cols, fixed = TRUE)
knitr::kable(col_table)
| table | cols | is_key | new_cols |
|:------|:-----|:-------|:---------|
| d1    | key  | TRUE   | key      |
| d1    | val  | FALSE  | val      |
| d2    | key  | TRUE   | key      |
| d2    | val  | FALSE  | val_1    |
| d3    | key  | TRUE   | key      |
| d3    | val  | FALSE  | val_2    |
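The disambiguation itself is plain base R; a quick illustration of what the make.names()/gsub() pair produces:
# repeated names get ".1", ".2", ... suffixes
make.names(c("val", "val", "val"), unique = TRUE)
## [1] "val"   "val.1" "val.2"
# dots are then replaced with underscores to keep the names SQL-safe
gsub(".", "_", c("val", "val.1", "val.2"), fixed = TRUE)
## [1] "val"   "val_1" "val_2"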
rename_it <- function(dd, col_table) {
  # restrict to this table's rows where a rename is actually needed
  ct <- col_table[(col_table$table == dd$table_name) &
                    (col_table$cols != col_table$new_cols), , drop = FALSE]
  if(nrow(ct) <= 0) {
    return(dd)
  }
  # rename_columns() takes a map with new names as names and old names as values
  mp <- ct$cols
  names(mp) <- ct$new_cols
  rename_columns(dd, mp)
}
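With the helper defined, rename the non-key columns of each table description to the unique names computed above, and then compose the left joins.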
d1r <- rename_it(d1d, col_table)
d2r <- rename_it(d2d, col_table)
d3r <- rename_it(d3d, col_table)
optree <- d1r %.>%
  natural_join(., d2r, by = key, jointype = "LEFT") %.>%
  natural_join(., d3r, by = key, jointype = "LEFT")
# cat(format(optree))
optree %.>%
  op_diagram(.) %.>%
  DiagrammeR::DiagrammeR(diagram = ., type = "grViz") %.>%
  DiagrammeRsvg::export_svg(.) %.>%
  charToRaw(.) %.>%
  rsvg::rsvg_png(., file = "NestedJoin_diagram.png")
cat(to_sql(optree, db))
## SELECT
## COALESCE(`tsql_31891855505549758549_0000000004`.`key`, `tsql_31891855505549758549_0000000005`.`key`) AS `key`,
## `tsql_31891855505549758549_0000000004`.`val` AS `val`,
## `tsql_31891855505549758549_0000000004`.`val_1` AS `val_1`,
## `tsql_31891855505549758549_0000000005`.`val_2` AS `val_2`
## FROM (
## SELECT
## COALESCE(`tsql_31891855505549758549_0000000001`.`key`, `tsql_31891855505549758549_0000000002`.`key`) AS `key`,
## `tsql_31891855505549758549_0000000001`.`val` AS `val`,
## `tsql_31891855505549758549_0000000002`.`val_1` AS `val_1`
## FROM (
## SELECT
## `key`,
## `val`
## FROM
## `d1`
## ) `tsql_31891855505549758549_0000000001`
## LEFT JOIN (
## SELECT
## `key` AS `key`,
## `val` AS `val_1`
## FROM (
## SELECT
## `key`,
## `val`
## FROM
## `d2`
## ) tsql_31891855505549758549_0000000000
## ) `tsql_31891855505549758549_0000000002`
## ON
## `tsql_31891855505549758549_0000000001`.`key` = `tsql_31891855505549758549_0000000002`.`key`
## ) `tsql_31891855505549758549_0000000004`
## LEFT JOIN (
## SELECT
## `key` AS `key`,
## `val` AS `val_2`
## FROM (
## SELECT
## `key`,
## `val`
## FROM
## `d3`
## ) tsql_31891855505549758549_0000000003
## ) `tsql_31891855505549758549_0000000005`
## ON
## `tsql_31891855505549758549_0000000004`.`key` = `tsql_31891855505549758549_0000000005`.`key`
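Notice the generated SQL renames val to val_1 and val_2 inside the sub-queries, before each join, so no ambiguous or dot-containing column names ever reach Spark.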
execute(db, optree) %.>%
  knitr::kable(.)
| key | val | val_1 | val_2 |
|----:|:----|:------|:------|
|   1 | a   | a     | a     |
Clean up.
sparklyr::spark_disconnect(db)