```r
knitr::opts_chunk$set(
  collapse = TRUE,
  comment = "#>"
)
```
```r
library(magrittr)
library(DBI)
library(data.table)
library(Rflow)
library(RETL)
library(RETLflow)
```
```r
# R environment
.GlobalEnv[["RDATA"]] <- new.env(parent = .GlobalEnv)

# establish SQLite connection / environment
DB <- DBI::dbConnect(RSQLite::SQLite(), ":memory:")
```
```r
RF <- new_rflow(path = params$rflow)
```
Rflows are initiated using the `new_rflow()` function. The `path` argument tells Rflow where to store cache and other necessary files.
The easiest way to create a node is to define it in a list.
```r
list(env = "RDATA", name = "node1") %>% as_node()
```
Every node has to have a unique id. It can either be set explicitly in the definition, or it is created automatically during initialization as a combination of the environment and the name, separated by a dot.
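For instance, the node defined above gets the id `"RDATA.node1"`. A minimal sketch of setting the id explicitly (the `id` field name here is an assumption based on the description above, not confirmed by the package documentation):

```r
# explicit id instead of the automatic "<env>.<name>" combination (assumed field name)
list(env = "RDATA", name = "node1", id = "my_node_id") %>% as_node()
```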
The most convenient way is to define our workflow in a single named list. Names are parsed into env + name automatically.
```r
obj_defs <- list(

  "RDATA.table1" = list(
    desc = "This is an example of a simple R value target - a data.table with one key and one column of random values",
    tags = c("table"),
    type = "r_node", # r_node is expected by default so we can omit that when defining R nodes
    r_expr = expression_r({
      N = 10
      data.table(id = 1:N, value = rnorm(N))
    })
  ),

  "DB.table1" = list(
    tags = c("db", "table"),
    type = "db_node",
    depends = "RDATA.table1",
    r_expr = expression_r({
      DBI::dbWriteTable(
        conn      = self$connection,
        name      = self$name,
        value     = .DATA("RDATA.table1"),
        overwrite = TRUE
      )
    })
  ),

  "DB.table2" = list(
    desc = "Shows usage of ETL tools from RETL and RETLflow packages",
    tags = c("db", "table"),
    type = "db_node",
    depends = "RDATA.table2",
    r_expr = expression_r({
      RETL::etl_pipe(from = self$upstream[[1]], to = self)
    })
  ),

  "DB.table3" = list(
    tags = c("db", "table"),
    desc = "Table is a join of ...",
    type = "db_node",
    depends = c("DB.table1", "DB.table2"),
    # SQL code is passed as a character value
    sql_code = c("
      CREATE TABLE table3 AS
      SELECT
        table1.id    AS \"id\",
        table1.value AS value1,
        table2.value AS value2
      FROM table1, table2
      WHERE table1.id = table2.id")
  ),

  list(
    # unnamed objects have to have the env + name combo defined inside
    env  = "RDATA",
    name = "table2",
    tags = c("table", "R"),
    type = "r_node",
    # R code can be passed as a character value as well:
    r_code = "
    {
      N = 20
      data.table(id = 1:N, value = rnorm(N))
    }"
  ),

  "OUTPUT.publish_rmd" = list(
    type = "rmd_node",
    depends = c("DB.table2"),
    path     = file.path(params$rflow, "tmp.md"),
    path_rmd = file.path(params$rflow, "tmp.rmd")
  )
)
```
Node definitions can now be preprocessed and added to the workflow.
```r
obj_defs <- Rflow::process_obj_defs(obj_defs)

add_nodes(
  objs    = obj_defs,
  rflow   = RF,
  cache   = RF$.cache$path,
  verbose = TRUE
)
```
Rflow has tools to detect missing/unused dependencies.
```r
which(!verify_dependencies(RF), useNames = TRUE)
```
Nodes can be printed.
```r
RF$RDATA.table1
```
Workflows can be visualized in an interactive HTML widget.
```r
visRflow(RF, includeIsolated = FALSE)
```
Note: the HTML widget cannot be displayed on GitHub. Screenshots:
There are multiple ways to specify which targets should be made.
We can select all of them:
```r
make(RF)
```
Or choose them by tags:
```r
make(RF, tags = "db")
```
Or choose one specific node:
```r
make(RF$DB.table1) # equivalent to RF$DB.table1$make()
```
```r
DBI::dbReadTable(DB, "table1")
DBI::dbReadTable(DB, "table2")
DBI::dbReadTable(DB, "table3")
```
```r
# disconnect database
DBI::dbDisconnect(DB)

# close logs
close_log(RF)

# cleanup the file system
clean_cache(RF)
clean_persistence(RF)
unlink(x = file.path(params$rflow, ".rflow"), recursive = TRUE)
```