tests/testthat/test-rxp_pipeline.R

test_that("rxp_pipeline creates valid pipeline object", {
  testthat::skip_on_cran()
  testthat::skip_if_not_installed("rix")

  # Create derivations
  d1 <- rxp_r(test1, 1 + 1)
  d2 <- rxp_r(test2, 2 + 2)

  # Create a pipeline
  pipe <- rxp_pipeline("TestPipe", list(d1, d2), color = "#FF0000")

  # Check structure
  expect_s3_class(pipe, "rxp_pipeline")
  expect_equal(pipe$name, "TestPipe")
  expect_equal(pipe$color, "#FF0000")
  expect_length(pipe$derivs, 2)

  # Check that metadata is attached to derivations
  expect_equal(pipe$derivs[[1]]$pipeline_group, "TestPipe")
  expect_equal(pipe$derivs[[1]]$pipeline_color, "#FF0000")
  expect_equal(pipe$derivs[[2]]$pipeline_group, "TestPipe")
  expect_equal(pipe$derivs[[2]]$pipeline_color, "#FF0000")
})

test_that("rxp_pipeline validates inputs", {
  testthat::skip_on_cran()
  testthat::skip_if_not_installed("rix")

  d1 <- rxp_r(test1, 1 + 1)

  # Empty name should error
  expect_error(rxp_pipeline("", list(d1)), "non-empty character string")

  # Single derivation (not in list) should error with helpful message
  expect_error(
    rxp_pipeline("Pipe", d1),
    "'path' must be a list of derivation objects or a file path"
  )

  # Non-derivation in list should error
  expect_error(
    rxp_pipeline("Pipe", list(d1, "not a deriv")),
    "not an rxp_derivation object"
  )
})

test_that("rxp_pipeline sources from file", {
  testthat::skip_on_cran()
  testthat::skip_if_not_installed("rix")

  # Create a temporary R script defining a pipeline
  temp_script <- tempfile(fileext = ".R")
  writeLines(
    'library(rixpress)
     list(
       rxp_r(name = d1, expr = 1 + 1),
       rxp_r(name = d2, expr = 2 + 2)
     )',
    temp_script
  )
  on.exit(unlink(temp_script))

  # Create pipeline from file
  pipe <- rxp_pipeline("FilePipe", temp_script, color = "green")

  expect_s3_class(pipe, "rxp_pipeline")
  expect_equal(pipe$name, "FilePipe")
  expect_length(pipe$derivs, 2)
  expect_equal(pipe$derivs[[1]]$name, "d1")
})

test_that("flatten_derivations works correctly", {
  testthat::skip_on_cran()
  testthat::skip_if_not_installed("rix")

  # Create derivations
  d1 <- rxp_r(etl1, 1)
  d2 <- rxp_r(etl2, 2)
  d3 <- rxp_r(model1, 3)
  d4 <- rxp_r(model2, 4)

  # Create pipelines
  pipe_etl <- rxp_pipeline("ETL", list(d1, d2), color = "orange")
  pipe_model <- rxp_pipeline("Model", list(d3, d4), color = "blue")

  # Flatten
  result <- flatten_derivations(list(pipe_etl, pipe_model))

  # Check result
  expect_length(result, 4)
  expect_equal(result[[1]]$name, "etl1")
  expect_equal(result[[1]]$pipeline_group, "ETL")
  expect_equal(result[[1]]$pipeline_color, "orange")
  expect_equal(result[[3]]$name, "model1")
  expect_equal(result[[3]]$pipeline_group, "Model")
  expect_equal(result[[3]]$pipeline_color, "blue")
})

test_that("flatten_derivations handles flat lists with defaults", {
  testthat::skip_on_cran()
  testthat::skip_if_not_installed("rix")

  # Create plain derivations (no pipeline)
  d1 <- rxp_r(plain1, 1)
  d2 <- rxp_r(plain2, 2)

  # Flatten
  result <- flatten_derivations(list(d1, d2))

  # Check that defaults are applied
  expect_length(result, 2)
  expect_equal(result[[1]]$pipeline_group, "default")
  expect_null(result[[1]]$pipeline_color)
})

test_that("rxp_populate handles rxp_pipeline objects", {
  testthat::skip_on_cran()
  testthat::skip_if_not_installed("rix")

  # Create temporary directory
  temp_dir <- tempfile()
  dir.create(temp_dir)
  on.exit(unlink(temp_dir, recursive = TRUE), add = TRUE)

  rix::rix(
    date = "2025-04-11",
    r_pkgs = "dplyr",
    git_pkgs = list(
      package_name = "rixpress",
      repo_url = "https://github.com/ropensci/rixpress",
      commit = "HEAD"
    ),
    ide = "none",
    project_path = temp_dir,
    overwrite = TRUE
  )

  setwd(temp_dir)

  # Create derivations and pipelines
  d1 <- rxp_r(etl_clean, 1 + 1)
  d2 <- rxp_r(model_train, etl_clean + 1)

  pipe_etl <- rxp_pipeline("Data Prep", list(d1), color = "#E69F00")
  pipe_model <- rxp_pipeline("Modeling", list(d2), color = "#56B4E9")

  # Populate
  rxp_populate(list(pipe_etl, pipe_model), project_path = ".", build = FALSE)

  # Check that dag.json contains pipeline metadata
  dag_json <- jsonlite::fromJSON("_rixpress/dag.json")

  expect_true("pipeline_group" %in% names(dag_json$derivations))
  expect_true("pipeline_color" %in% names(dag_json$derivations))

  # Check specific values (handle both list and vector formats from jsonlite)
  etl_idx <- which(dag_json$derivations$deriv_name == "etl_clean")
  model_idx <- which(dag_json$derivations$deriv_name == "model_train")

  # Use unlist to handle potential list format
  etl_group <- unlist(dag_json$derivations$pipeline_group[etl_idx])
  etl_color <- unlist(dag_json$derivations$pipeline_color[etl_idx])
  model_group <- unlist(dag_json$derivations$pipeline_group[model_idx])
  model_color <- unlist(dag_json$derivations$pipeline_color[model_idx])

  expect_equal(etl_group, "Data Prep")
  expect_equal(etl_color, "#E69F00")
  expect_equal(model_group, "Modeling")
  expect_equal(model_color, "#56B4E9")
})

test_that("get_nodes_edges reads pipeline metadata from dag.json", {
  testthat::skip_on_cran()
  testthat::skip_if_not_installed("rix")

  # Create a temporary dag.json with pipeline metadata
  temp_dir <- tempfile()
  dir.create(temp_dir, recursive = TRUE)
  on.exit(unlink(temp_dir, recursive = TRUE), add = TRUE)

  dag_content <- list(
    derivations = data.frame(
      deriv_name = c("node1", "node2"),
      depends = I(list(character(0), "node1")),
      decoder = c("rds", "rds"),
      type = c("rxp_r", "rxp_r"),
      noop_build = c(FALSE, FALSE),
      pipeline_group = c("Pipeline A", "Pipeline B"),
      pipeline_color = c("#FF0000", "#00FF00"),
      stringsAsFactors = FALSE
    )
  )

  dag_path <- file.path(temp_dir, "dag.json")
  jsonlite::write_json(dag_content, dag_path, auto_unbox = TRUE)

  # Read with get_nodes_edges
  result <- get_nodes_edges(path_dag = dag_path)

  # Check that pipeline metadata is present
  expect_true("pipeline_group" %in% names(result$nodes))
  expect_true("pipeline_color" %in% names(result$nodes))

  expect_equal(result$nodes$pipeline_group[1], "Pipeline A")
  expect_equal(result$nodes$pipeline_group[2], "Pipeline B")
  expect_equal(result$nodes$pipeline_color[1], "#FF0000")
  expect_equal(result$nodes$pipeline_color[2], "#00FF00")
})

test_that("print.rxp_pipeline works", {
  testthat::skip_on_cran()
  testthat::skip_if_not_installed("rix")

  d1 <- rxp_r(test1, 1)
  pipe <- rxp_pipeline("MyPipe", list(d1), color = "red")

  # Capture print output
  output <- capture.output(print(pipe))

  expect_true(any(grepl("MyPipe", output)))
  expect_true(any(grepl("red", output)))
  expect_true(any(grepl("test1", output)))
})

Try the rixpress package in your browser

Any scripts or data that you put into this service are public.

rixpress documentation built on Feb. 19, 2026, 9:06 a.m.