knitr::opts_chunk$set(
  collapse = TRUE,
  comment = "#>"
)
library(magrittr)
library(DBI)
library(data.table)
library(Rflow)
library(RETL)
library(RETLflow)

Setup environments

# R environment
.GlobalEnv[["RDATA"]] <- new.env(parent = .GlobalEnv)

# establish SQLite connection / environment
DB <-  DBI::dbConnect(RSQLite::SQLite(), ":memory:")

Defining workflow

RF <- rflow(path = params$rflow)

Rflows are initiated using new_rflow function. The path argument tells Rflow where to store cache and other necessary files.

Creating/adding nodes

The easiest way to create a node is to define it in a list.

list(env  = "RDATA", name = "node1") %>% as_node()

Every node has to have a unique id. This can be set in the definition or it is created automatically during its initialization as a combination of environment and name separated by dot.

The most convenient way is to define our workflow in a single named list. Names are parsed into env + name automatically.

obj_defs <- 
  list(

    "RDATA.table1" =
      list(
        desc = "This is an example of a simple R value target - a data.table with one key and one column of random values",
        tags = c("table"),
        type = "r_node", # r_node is expected by default so we can ommit that when defining R nodes
        r_expr =
          expression_r({
            N = 10; 
            data.table(id = 1:N, value = rnorm(N))
          })
      ), 

    "DB.table1" = 
      list(

        tags    = c("db", "table"),
        type    = "db_node",
        depends = "RDATA.table1",
        r_expr  = 
          expression_r({
            DBI::dbWriteTable(
              conn = self$connection, 
              name = self$name, 
              value = .DATA("RDATA.table1"), 
              overwrite = TRUE
            )
          })
      ), 

    "DB.table2" = 
      list(
        desc    = "Shows usage of ETL tools from RETL and RETLflow packages",
        tags    = c("db", "table"),
        type    = "db_node",
        depends = "RDATA.table2",
        r_expr  = 
          expression_r({
            RETL::etl_pipe(from = self$upstream[[1]], to = self)
          })
      ), 

    "DB.table3" = 
      list(
        tags = c("db", "table"),
        desc = "Table is a join of ...",
        type = "db_node",
        depends = c("DB.table1", "DB.table2"),

        # SQL code is passed as a character value
        sql_code = c("
CREATE TABLE table3 AS
SELECT
  table1.id     AS \"id\",
  table1.value  AS value1,
  table2.value  AS value2
FROM 
  table1, table2
WHERE 
  table1.id = table2.id")
      ),

    list( # unnamed objects has to have env + name combo defined inside
      env  = "RDATA",
      name = "table2",
      tags = c("table", "R"),
      type = "r_node",

      # R code can be passed as a character value as well:
      r_code = "
{
  N = 20
  data.table(id = 1:N, value = rnorm(N))
}"
    ),

"OUTPUT.publish_rmd" = list(
  type = "rmd_node",
  depends = c("DB.table2"),
  path     = file.path(params$rflow, "tmp.md"),
  path_rmd = file.path(params$rflow, "tmp.rmd")
)
  )

Nodes' definitions can now be preprocessed and added to the workflow.

obj_defs <- Rflow::process_obj_defs(obj_defs)

add_nodes(
  objs        = obj_defs,
  rflow       = RF,
  cache       = RF$.cache$path,
  verbose     = TRUE
)

Rflow has tools to detect missing/unused dependencies.

which(!verify_dependencies(RF), useNames = TRUE)

Inspection

Nodes can be printed.

RF$RDATA.table1

Visualisation

Workflows can be visualized in an interactive html widget.

visRflow(RF, includeIsolated = FALSE)

Note: HTML widget cannot be diplayed on github. Screenshots:

Screenshot Screenshot

Making/building the targets

We have multiple ways how to specify, which targets should be made.

We can select all of them:

make(RF)

Or choose them by tags:

make(RF, tags = "db")

Or choose one specific node

make(RF$DB.table1) # equivalent to RF$DB.table1$make()

Tests

DBI::dbReadTable(DB, "table1")
DBI::dbReadTable(DB, "table2")
DBI::dbReadTable(DB, "table3")

Cleanup

# disconnect database
DBI::dbDisconnect(DB)

# close logs
close_log(RF)

# cleanup the file system
clean_cache(RF)
clean_persistence(RF)
unlink(x = file.path(params$rflow, ".rflow"), recursive = TRUE)


vh-d/Rflow documentation built on May 11, 2022, 2:53 a.m.