# tests/testthat/test_pipeline.R

# Pipeline

# Constructor behaviour: class of the returned object, validation of the
# pipeline name, the automatically added 'data' step and custom loggers.
describe("initialize",
{
    test_that("returns a pipeline object",
    {
        expect_true(methods::is(Pipeline$new("pipe"), "Pipeline"))
    })

    test_that("pipeline name must be a non-empty string",
    {
        expect_no_error(Pipeline$new("foo"))

        expect_error(
            Pipeline$new(name = 1),
            "name must be a string"
        )

        expect_error(
            Pipeline$new(name = ""),
            "name must not be empty"
        )
    })

    test_that("data is added as first step to pipeline",
    {
        pip <- Pipeline$new("pipe1", data = 1)

        expect_equal(pip$get_step_names(), "data")

        # Only the stored output of the 'data' step is inspected, so the
        # result of the run does not have to be collected.
        pip$run()
        expect_equal(pip$get_out("data"), 1)
    })

    test_that("the logger can be customized",
    {
        my_logger <- \(level, msg, ...) {
            message("My Logger: ", msg)
        }

        pip <- Pipeline$new("pipe", logger = my_logger)

        # The custom logger writes via message(), hence the message stream
        # is captured instead of stdout.
        out <- capture.output(
            pip$run(),
            type = "message"
        )
        expect_equal(
            out,
            c(
                "My Logger: Start run of 'pipe' pipeline:",
                "My Logger: Step 1/1 data",
                "My Logger: Finished execution of steps.",
                "My Logger: Done."
            )
        )
    })

    test_that("bad definition of the custom logger is signalled",
    {
        # Every malformed logger below must be rejected with this message.
        expected_error_msg <- paste(
            "logger function must have the following signature:",
            "function(level, msg, ...)"
        )

        logger_with_missing_level_arg <- \(msg, ...) {
            message("My Logger: ", msg)
        }
        expect_error(
            Pipeline$new("pipe1", logger = logger_with_missing_level_arg),
            expected_error_msg,
            fixed = TRUE
        )

        logger_with_missing_msg_arg <- \(level, ...) {
            message("My Logger: ", ...)
        }
        expect_error(
            Pipeline$new("pipe1", logger = logger_with_missing_msg_arg),
            expected_error_msg,
            fixed = TRUE
        )

        # Also has 'msg' and 'level' in swapped order.
        logger_with_missing_dots <- \(msg, level) {
            message("My Logger: ", msg)
        }
        expect_error(
            Pipeline$new("pipe1", logger = logger_with_missing_dots),
            expected_error_msg,
            fixed = TRUE
        )

        logger_with_additional_arg <- \(level, msg, foo, ...) {
            message("My Logger: ", msg)
        }
        expect_error(
            Pipeline$new("pipe1", logger = logger_with_additional_arg),
            expected_error_msg,
            fixed = TRUE
        )
    })
})


# Tests for Pipeline$add(): argument validation, dependency resolution (by
# step name and by relative position), parameter handling and how the
# function name of a step is recorded.
describe("add",
{
    test_that("step must be non-empty string",
    {
        pip <- Pipeline$new("pipe1")

        foo <- \(a = 0) a

        expect_error(pip$add("", foo))
        expect_error(pip$add(c("a", "b"), foo))
    })

    test_that("fun must be passed as a function or string",
    {
        pip <- Pipeline$new("pipe")

        expect_error(
            pip$add("step1", fun = 1),
            "is.function(fun) || is_string(fun) is not TRUE",
            fixed = TRUE
        )
    })

    test_that("params must be a list",
    {
        pip <- Pipeline$new("pipe")

        expect_error(
            pip$add("step1", fun = \() 1, params = 1),
            "is.list(params)",
            fixed = TRUE
        )
    })

    test_that("description must be (possibly empty) string",
    {
        pip <- Pipeline$new("pipe")

        expect_error(
            pip$add("step1", fun = \() 1, description = 1),
            "is_string(description)",
            fixed = TRUE
        )
        expect_no_error(pip$add("step1", fun = \() 1, description = ""))
    })

    test_that("group must be non-empty string",
    {
        pip <- Pipeline$new("pipe")

        expect_error(
            pip$add("step1", fun = \() 1, group = 1),
            "is_string(group) && nzchar(group) is not TRUE",
            fixed = TRUE
        )
        expect_error(
            pip$add("step1", fun = \() 1, group = ""),
            "is_string(group) && nzchar(group) is not TRUE",
            fixed = TRUE
        )
    })

    test_that("keepOut must be logical",
    {
        pip <- Pipeline$new("pipe")

        expect_error(
            pip$add("step1", fun = \() 1, keepOut = 1),
            "is.logical(keepOut) is not TRUE",
            fixed = TRUE
        )
    })

    test_that("duplicated step names are signaled",
    {
        pip <- Pipeline$new("pipe1")

        foo <- \(a = 0) a

        pip$add("f1", foo)
        # The clash is reported regardless of which function is passed.
        expect_error(pip$add("f1", foo), "step 'f1' already exists")
        expect_error(pip$add("f1", \(x) x), "step 'f1' already exists")
    })

    test_that("missing dependencies are signaled",
    {
        pip <- Pipeline$new("pipe1")

        foo <- \(a = 0) a

        pip$add("f1", foo)
        expect_error(
            pip$add("f2", foo, params = list(a = ~undefined)),
            "dependency 'undefined' not found"
        )
    })

    test_that("step can refer to previous step by relative number",
    {
        pip <- Pipeline$new("pipe1")
        pip$add("f1", \(a = 5) a)
        # ~-1 points to the step added directly before (f1).
        pip$add("f2", \(x = ~-1) 2*x, keepOut = TRUE)

        out <- pip$run()$collect_out()
        expect_equal(out[["f2"]][[1]], 10)

        # Here ~-1 resolves to f2 and ~-2 to f1.
        pip$add("f3", \(x = ~-1, a = ~-2) x + a, keepOut = TRUE)
        out <- pip$run()$collect_out()
        expect_equal(out[["f3"]][[1]], 10 + 5)
    })

    test_that("a bad relative step referral is signalled",
    {
        pip <- Pipeline$new("pipe1")
        expect_error(
            pip$add("f1", \(x = ~-10) x),
            paste(
                "step 'f1': relative dependency x=-10",
                "points to outside the pipeline"
            ),
            fixed = TRUE
        )
    })

    test_that("added step can use lambda functions",
    {
        data <- 9
        pip <- Pipeline$new("pipe1", data = data)

        pip$add("f1", \(data = ~data) data, keepOut = TRUE)
        a <- 1
        pip$add("f2", \(a, b) a + b,
            params = list(a = a, b = ~f1),
            keepOut = TRUE
        )

        # Only formula arguments are recorded as dependencies.
        expect_equal(unlist(pip$get_step("f1")[["depends"]]), c(data = "data"))
        expect_equal(unlist(pip$get_step("f2")[["depends"]]), c(b = "f1"))

        out <- pip$run()$collect_out()
        expect_equal(out[["f1"]][[1]], data)
        expect_equal(out[["f2"]][[1]], a + data)
    })

    test_that(
        "supports functions with wildcard arguments",
    {
        my_mean <- \(x, na.rm = FALSE) {
            mean(x, na.rm = na.rm)
        }
        # 'na.rm' is not a formal of foo; it is only reachable through '...'.
        foo <- \(x, ...) {
            my_mean(x, ...)
        }
        v <- c(1, 2, NA, 3, 4)
        pip <- Pipeline$new("pipe", data = v)

        params <- list(x = ~data, na.rm = TRUE)
        pip$add("mean", fun = foo, params = params, keepOut = TRUE)

        out <- pip$run()$collect_out()
        expect_equal(out[["mean"]], mean(v, na.rm = TRUE))

        pip$set_params_at_step("mean", list(na.rm = FALSE))
        out <- pip$run()$collect_out()
        expect_equal(out[["mean"]], as.numeric(NA))
    })

    test_that("can have a variable defined outside as parameter default",
    {
        x <- 1

        pip <- Pipeline$new("pipe")$
            add("f1", \(a) a, params = list(a = x))

        expect_equal(pip$get_params_at_step("f1")$a, x)

        out <- pip$run()$collect_out(all = TRUE)
        expect_equal(out[["f1"]], x)
    })

    test_that("handles Param object args",
    {
        pip <- Pipeline$new("pipe")$
            add("f1", \(a = new("NumericParam", "a", value = 1)) a)

        out <- pip$run()$collect_out(all = TRUE)
        expect_equal(out[["f1"]], 1)
    })

    test_that(
        "can have a Param object defined outside as parameter default",
    {
        x <- 1
        p <- new("NumericParam", "a", value = x)

        pip <- Pipeline$new("pipe")$
            add("f1", \(a) a, params = list(a = p))

        # The Param object itself is stored, while the step receives its
        # plain value when run.
        expect_equal(pip$get_params_at_step("f1")$a, p)

        out <- pip$run()$collect_out(all = TRUE)
        expect_equal(out[["f1"]], x)
    })

    test_that(
        "function can be passed as a string",
    {
        pip <- Pipeline$new("pipe")$
            add("f1", fun = "mean", params = list(x = 1:5))

        out <- pip$run()$collect_out(all = TRUE)
        expect_equal(out[["f1"]], mean(1:5))

        expect_equal(pip$get_step("f1")[["funcName"]], "mean")
    })

    test_that(
        "if passed as a function, name is derived from the function",
    {
        # Checked for a separate add() call ...
        pip <- Pipeline$new("pipe")
        pip$add("f1", fun = mean, params = list(x = 1:5))
        expect_equal(pip$get_step("f1")[["funcName"]], "mean")

        # ... and for add() chained across lines onto the constructor.
        pip <- Pipeline$new("pipe")$
            add("f1", fun = mean, params = list(x = 1:5))
        expect_equal(pip$get_step("f1")[["funcName"]], "mean")
    })

    test_that(
        "lambda functions are named 'function'",
    {
        # Checked for add() chained on one line ...
        pip <- Pipeline$new("pipe")$add("f1", fun = \(x = 1) x)
        expect_equal(pip$get_step("f1")[["funcName"]], "function")

        # ... and for add() chained across lines.
        pip <- Pipeline$new("pipe")$
            add("f1", fun = \(x = 1) x)
        expect_equal(pip$get_step("f1")[["funcName"]], "function")
    })
})


# Tests for Pipeline$append(): combining two pipelines, automatic renaming
# of clashing step names and wiring the first pipeline's output into the
# appended one.
describe("append",
{
    # The label is assembled with paste() so that it contains neither a line
    # break nor source indentation when shown by a test reporter.
    test_that(
        paste(
            "pipelines can be combined even if their steps share names,",
            "unless tryAutofixNames is FALSE"
        ),
    {
        pip1 <- Pipeline$new("pipe1", data = 1)$
            add("f1", \(a = 1) a, keepOut = TRUE)$
            add("f2", \(b = ~f1) b)

        pip2 <- Pipeline$new("pipe2")$
            add("f1", \(a = 10) a)$
            add("f2", \(b = ~f1) b, keepOut = TRUE)

        expect_error(
            pip1$append(pip2, tryAutofixNames = FALSE),
            paste(
                "combined pipeline would have duplicated step names:",
                "'data', 'f1', 'f2',"
            )
        )

        pp <- pip1$append(pip2)
        expect_equal(pp$length(), pip1$length() + pip2$length())

        out1 <- pip1$run()$collect_out()
        out2 <- pip2$run()$collect_out()

        out <- pp$run()$collect_out()

        # Names differ after the auto-fix, so only the values are compared.
        expect_equivalent(out, c(out1, out2))
    })

    test_that("auto-fixes only the names that need auto-fix",
    {
        pip1 <- Pipeline$new("pipe1", data = 1)$
            add("f1", \(a = 1) a, keepOut = TRUE)$
            add("f2", \(b = ~f1) b)

        pip2 <- Pipeline$new("pipe2")$
            add("f3", \(a = 10) a)$
            add("f4", \(b = ~f3) b, keepOut = TRUE)

        # Only the 'data' step exists in both pipelines and gets a suffix.
        pp <- pip1$append(pip2)
        expect_equal(
            pp$get_step_names(),
            c("data", "f1", "f2", "data.pipe2", "f3", "f4")
        )

        out1 <- pip1$run()$collect_out()
        out2 <- pip2$run()$collect_out()

        out <- pp$run()$collect_out()

        expect_equivalent(out, c(out1, out2))
    })

    test_that("the separator used for step names can be customized",
    {
        pip1 <- Pipeline$new("pipe1", data = 1)$
            add("f1", \(a = 1) a)$
            add("f2", \(b = 2) b)

        pip2 <- Pipeline$new("pipe2")$
            add("f1", \(a = 1) a)$
            add("f2", \(b = 2) b)

        pp <- pip1$append(pip2, sep = "_")

        expect_equal(
            pp$get_step_names(),
            c("data", "f1", "f2", "data_pipe2", "f1_pipe2", "f2_pipe2")
        )
    })

    test_that(
        "output of first pipeline can be set as input of appended pipeline",
    {
        pip1 <- Pipeline$new("pipe1", data = 1)
        pip1$add("f1", \(a = ~data) a * 2)

        pip2 <- Pipeline$new("pipe2", data = 99)
        pip2$add("f1", \(a = ~data) a * 3)
        pip2$add("f2", \(a = ~f1) a * 4)

        pp <- pip1$append(pip2, outAsIn = TRUE)

        # The data step of the appended pipeline now depends on the last
        # step of the first pipeline.
        depends <- pp$get_depends()
        expect_equal(depends[["data.pipe2"]], c(data = "f1"))

        out <- pp$run()$collect_out(all = TRUE)
        pipe1_out <- out[["f1"]][["f1"]]
        expect_equal(pipe1_out, 1 * 2)
        expect_equal(out[["data.pipe2"]], pipe1_out)
        expect_equal(out[["f1"]][["f1.pipe2"]], pipe1_out * 3)
        expect_equal(out[["f2"]], out[["f1"]][["f1.pipe2"]] * 4)
    })

    test_that("if duplicated step names would be created, an error is given",
    {
        # 'f1.pipe2' is the name the auto-fix would pick for pip2's 'f1'.
        pip1 <- Pipeline$new("pipe1")
        pip1$add("f1", \(a = ~data) a + 1)
        pip1$add("f1.pipe2", \(a = ~data) a + 1)

        pip2 <- Pipeline$new("pipe2")
        pip2$add("f1", \(a = ~data) a + 1)

        expect_error(
            pip1$append(pip2),
            "Cannot auto-fix name clash for step 'f1' in pipeline 'pipe2'",
            fixed = TRUE
        )
    })
})



# Renaming all steps with a postfix must update the step names as well as
# every dependency that refers to them.
describe("append_to_step_names", {
    test_that("postfix can be appended to step names", {
        renamed <- Pipeline$new("pipe", data = 1)
        renamed$add("f1", \(a = ~data) a + 1)
        renamed$add("f2", \(a = ~data, b = ~f1) a + b)

        renamed$append_to_step_names("foo")

        expect_equal(
            renamed$get_step_names(),
            c("data.foo", "f1.foo", "f2.foo")
        )

        expect_equal(
            renamed$get_depends(),
            list(
                data.foo = character(0),
                f1.foo = c(a = "data.foo"),
                f2.foo = c(a = "data.foo", b = "f1.foo")
            )
        )
    })
})



# Tests for collecting pipeline output: which steps are included, how the
# result is grouped and ordered, and validation of the groupBy argument.
describe("collect_out", {
    test_that("data is set as first step but not part of output by default", {
        frame <- data.frame(a = 1:2, b = 1:2)

        # A pipeline that only holds the data step collects nothing.
        bare <- Pipeline$new("pipe1", data = frame)
        expect_equal(bare$pipeline[["step"]], "data")
        expect_equal(bare$run()$collect_out(), list())

        # A kept step that passes the data through exposes it.
        passthrough <- Pipeline$new("pipe1", data = frame)
        passthrough$add("f1", \(x = ~data) x, keepOut = TRUE)
        collected <- passthrough$run()$collect_out()
        expect_equal(collected[["f1"]], frame)
    })

    test_that("at the end, pipeline can clean output that shall not be kept", {
        input <- 9
        offset <- 5
        foo <- \(a = 1) a
        bar <- \(a, b) a + b

        pip <- Pipeline$new("pipe1", data = input)$
            add("f1", foo, params = list(a = offset))$
            add("f2", bar, params = list(a = ~data, b = ~f1), keepOut = TRUE)

        pip$run(cleanUnkept = TRUE)

        # f1 was not marked with keepOut, f2 was.
        expect_equal(pip$get_out("f1"), NULL)
        expect_equal(pip$get_out("f2"), offset + input)
    })

    test_that("output is collected as expected", {
        pip <- Pipeline$new("pipe1")
        pip$add("f1", \(a = 1) a, keepOut = TRUE)
        pip$add("f2", \(a = 2, b = ~f1) a + b)
        pip$add("f3", \(a = 3, b = ~f2) a + b, keepOut = TRUE)

        kept <- pip$run()$collect_out()
        expect_equal(length(kept), 2)
        expect_equal(names(kept), c("f1", "f3"))
    })

    test_that("output can be grouped", {
        pip <- Pipeline$new("pipe")
        pip$add("f1", \(a = 1) a)
        pip$add("f2", \(a = 1, b = 2) a + b, group = "plus")
        pip$add("f3", \(a = 1, b = 2) a / b)
        pip$add("f4", \(a = 2, b = 2) a + b, group = "plus")

        everything <- pip$run()$collect_out(all = TRUE)

        expect_equal(everything[["plus"]], list(f2 = 3, f4 = 4))
        expect_equal(everything[["f1"]], 1)
        expect_equal(everything[["f3"]], 1/2)
    })

    test_that("output is ordered in the order of steps", {
        # Steps are deliberately added in non-alphabetical order.
        pip <- Pipeline$new("pipe")
        pip$add("f2", \(a = 1) a, keepOut = TRUE)
        pip$add("f1", \(b = 2) b, keepOut = TRUE)

        kept <- pip$run()$collect_out()
        expect_equal(names(kept), c("f2", "f1"))
    })

    test_that("grouped output is ordered in the order of group definitions", {
        pip <- Pipeline$new("pipe")
        pip$add("f1", \(x = 1) x, group = "g2")
        pip$add("f2", \(x = 2) x, group = "g1")
        pip$add("f3", \(x = 3) x, group = "g2")
        pip$add("f4", \(x = 4) x, group = "g1")

        everything <- pip$run()$collect_out(all = TRUE)

        expect_equal(names(everything), c("data", "g2", "g1"))
    })

    test_that(
        "if just one group the output name still will take the group name",
        {
            pip <- Pipeline$new("pipe")
            pip$add("f1", \(a = 1) a)
            pip$add("f2", \(a = 1, b = 2) a + b, group = "plus")
            pip$add("f3", \(a = 1, b = 2) a / b, group = "my f3")
            pip$add("f4", \(a = 2, b = 2) a + b, group = "plus")

            everything <- pip$run()$collect_out(all = TRUE)

            expect_equal(names(everything), c("data", "f1", "plus", "my f3"))
        }
    )

    describe("groupBy option", {
        # Shared by both tests below; the first one runs two of its steps.
        pip <- Pipeline$new("pipe")
        pip$add("f1", \(a = 1) a)
        pip$add("f2", \(a = 1, b = 2) a + b, group = "plus")
        pip$add("f3", \(a = 1, b = 2) a / b)

        test_that("column to groupBy can be customized", {
            # Run only f1 and f3 so that the steps end up in two states.
            pip$run_step("f1")
            pip$run_step("f3")

            by_state <- pip$collect_out(groupBy = "state", all = TRUE)
            expected <- list(
                New = list(data = NULL, f2 = NULL),
                Done = list(f1 = 1, f3 = 0.5)
            )

            expect_equal(by_state, expected)
        })

        test_that("signals bad  groupBy input", {
            expect_error(
                pip$collect_out(groupBy = c("not", "a", "string")),
                "groupBy must be a single string"
            )
            expect_error(
                pip$collect_out(groupBy = "foo"),
                "groupBy column does not exist"
            )
            expect_error(
                pip$collect_out(groupBy = "time"),
                "groupBy column must be character"
            )
        })
    })
})



# Tests for Pipeline$discard_steps(): removal by pattern, the no-match case
# and the refusal to remove steps other steps depend on.
describe("discard_steps",
{
    test_that("pipeline steps can be discarded by pattern",
    {
        pip <- Pipeline$new("pipe1")$
            add("calc", \(a = 1) a)$
            add("plot1", \(x = ~calc) x)$
            add("plot2", \(x = ~plot1) x)

        # Removal is reported on the message stream, last step first.
        out <- capture.output(
            pip$discard_steps("plot"),
            type = "message"
        )
        expect_equal(
            out,
            c(
                "step 'plot2' was removed",
                "step 'plot1' was removed"
            )
        )

        expect_equal(pip$pipeline[["step"]], c("data", "calc"))
    })

    test_that("if no pipeline step matches pattern, pipeline remains unchanged",
    {
        pip <- Pipeline$new("pipe1")$
            add("calc", \(a = 1) a)$
            add("plot1", \(x = ~calc) x)$
            add("plot2", \(x = ~plot1) x)

        steps_before <- pip$pipeline[["step"]]

        expect_silent(pip$discard_steps("bla"))
        expect_equal(pip$pipeline[["step"]], steps_before)
    })

    test_that("if step has downstream dependencies, an error is given",
    {
        pip <- Pipeline$new("pipe1")$
            add("f1", \(a = 1) a)$
            add("f2", \(b = ~f1) b)

        expect_error(
            pip$discard_steps("f1"),
            paste(
                "cannot remove step 'f1' because the following",
                "steps depend on it: 'f2'"
            )
        )

        # With a second dependent step, both are listed in the error.
        pip$add("f3", \(x = ~f1) x)
        expect_error(
            pip$discard_steps("f1"),
            paste(
                "cannot remove step 'f1' because the following",
                "steps depend on it: 'f2', 'f3'"
            )
        )
    })
})


# Tests for reading back the pipeline's data, including the case where the
# data step is gone.
describe("get_data", {
    test_that("data can be retrieved", {
        initial <- 1:2
        replaced <- 3:4

        pip <- Pipeline$new("pipe", data = initial)
        expect_equal(pip$get_data(), initial)

        pip$set_data(replaced)
        expect_equal(pip$get_data(), replaced)
    })

    test_that("signals missing data", {
        pip <- Pipeline$new("pipe", data = 1:2)

        # Removes the data step, which is the only step of this pipeline.
        pip$pop_step()

        expect_error(pip$get_data(), "no data step defined")
    })
})


# The dependency list has one entry per step, named after the step, mapping
# argument names to the steps they depend on.
describe("get_depends", {
    test_that("dependencies can be retrieved and are named after the steps", {
        pip <- Pipeline$new("pipe", data = 1)$
            add("f1", \(a = ~data) a + 1)$
            add("f2", \(b = ~f1) b + 1)

        actual <- pip$get_depends()

        expect_equal(
            actual,
            list(
                data = character(0),
                f1 = c(a = "data"),
                f2 = c(b = "f1")
            )
        )
        expect_equal(names(actual), pip$get_step_names())
    })
})



# Tests for looking up all steps that (directly or indirectly) depend on a
# given step.
describe("get_depends_down", {
    test_that("dependencies can be determined recursively for given step", {
        chain <- Pipeline$new("pipe")$
            add("f1", \(a = 1) a)$
            add("f2", \(a = ~f1) a)$
            add("f3", \(a = ~f1, b = ~f2) a + b)$
            add("f4", \(a = ~f1, b = ~f2, c = ~f3) a + b + c)

        expect_equal(chain$get_depends_down("f3"), c("f4"))
        expect_equal(chain$get_depends_down("f2"), c("f3", "f4"))
        expect_equal(chain$get_depends_down("f1"), c("f2", "f3", "f4"))
    })

    test_that("if no dependencies an empty character vector is returned", {
        single <- Pipeline$new("pipe")$
            add("f1", \(a = 1) a)

        expect_equal(single$get_depends_down("f1"), character(0))
    })

    test_that("step must exist", {
        empty <- Pipeline$new("pipe")

        expect_error(
            empty$get_depends_down("f1"),
            "step 'f1' does not exist"
        )
    })

    test_that("works with complex dependencies as created by data splits", {
        splits <- list(data.frame(x = 1:2), data.frame(y = 1:2))

        pip <- Pipeline$new("pipe")
        pip$add("f1", \(a = 1) a)
        pip$add("f2", \(a = ~f1, b = 2) b)
        pip$add("f3", \(x = ~f1, y = ~f2) x + y)

        # Splitting up to f2 yields one numbered copy of f1 and f2 per data
        # set, while f3 stays a single step.
        pip$set_data_split(splits, toStep = "f2")

        expect_equal(pip$get_depends_down("f1.1"), c("f2.1", "f3"))
        expect_equal(pip$get_depends_down("f1.2"), c("f2.2", "f3"))

        expect_equal(pip$get_depends_down("f2.1"), "f3")
        expect_equal(pip$get_depends_down("f2.2"), "f3")
    })
})



# Tests for looking up all steps a given step (directly or indirectly)
# depends on.
describe("get_depends_up", {
    test_that("dependencies can be determined recursively for given step", {
        chain <- Pipeline$new("pipe")$
            add("f1", \(a = 1) a)$
            add("f2", \(a = ~f1) a)$
            add("f3", \(a = ~f1, b = ~f2) a + b)$
            add("f4", \(a = ~f1, b = ~f2, c = ~f3) a + b + c)

        expect_equal(chain$get_depends_up("f2"), c("f1"))
        expect_equal(chain$get_depends_up("f3"), c("f1", "f2"))
        expect_equal(chain$get_depends_up("f4"), c("f1", "f2", "f3"))
    })

    test_that("if no dependencies an empty character vector is returned", {
        single <- Pipeline$new("pipe")$
            add("f1", \(a = 1) a)

        expect_equal(single$get_depends_up("f1"), character(0))
    })

    test_that("step must exist", {
        empty <- Pipeline$new("pipe")

        expect_error(
            empty$get_depends_up("f1"),
            "step 'f1' does not exist"
        )
    })

    test_that("works with complex dependencies as created by data splits", {
        splits <- list(data.frame(x = 1:2), data.frame(y = 1:2))

        pip <- Pipeline$new("pipe")
        pip$add("f1", \(a = 1) a)
        pip$add("f2", \(a = ~f1, b = 2) b)
        pip$add("f3", \(x = ~f1, y = ~f2) x + y)

        pip$set_data_split(splits, toStep = "f2")

        # Each numbered f2 copy depends on the f1 copy with the same number.
        expect_equal(pip$get_depends_up("f2.1"), c("f1.1"))
        expect_equal(pip$get_depends_up("f2.2"), c("f1.2"))

        # The unsplit f3 depends on the copies from both splits.
        upstream_of_f3 <- pip$get_depends_up("f3")
        expect_equivalent(
            upstream_of_f3,
            c("f1.1", "f2.1", "f1.2", "f2.2")
        )
    })
})


# Tests for Pipeline$get_graph(), which returns a list holding a 'nodes' and
# an 'edges' table.
describe("get_graph",
{
    # Pipeline and graph shared by the first three tests.
    pip <- Pipeline$new("pipe")$
        add("f1", \(a = 1) a)$
        add("f2", \(a = ~f1, b = ~data) a)

    res <- pip$get_graph()

    test_that("returns a node table with the expected columns",
    {
        tab <- res$nodes
        expect_true(is.data.frame(tab))
        expectedColumns <- c("id", "label", "group", "shape", "color", "title")

        expect_equal(colnames(tab), expectedColumns)
    })

    test_that("the node table contains all steps",
    {
        tab <- res$nodes
        expect_equal(tab$label, pip$get_step_names())
    })

    test_that("returns an edges table with the expected columns",
    {
        tab <- res$edges
        expect_true(is.data.frame(tab))
        expectedColumns <- c("from", "to", "arrows")

        expect_equal(colnames(tab), expectedColumns)
    })

    test_that("can be created for certain groups",
    {
        # Uses its own pipeline, which masks the shared one inside this test.
        pip <- Pipeline$new("pipe")
        pip$add("step2", \(a = ~data) a + 1, group = "add")
        pip$add("step3", \(a = ~step2) 2 * a, group = "mult")
        pip$add("step4", \(a = ~step2, b = ~step3) a + b, group = "add")
        pip$add("step5", \(a = ~data, b = ~step4) a * b, group = "mult")

        # Only the steps of the requested group show up as nodes.
        res_add <- pip$get_graph(groups = "add")
        expect_equal(res_add$nodes$label, c("step2", "step4"))

        res_mult <- pip$get_graph(groups = "mult")
        expect_equal(res_mult$nodes$label, c("step3", "step5"))
    })
})


# Tests for retrieving the stored output of a single step after a run.
describe("get_out",
{
    test_that("output at given step can be retrieved",
    {
        data <- airquality
        # No trailing comma after the function: it would pass an additional
        # empty argument to add().
        pip <- Pipeline$new("pipe", data = data)$
            add(
                "model",
                \(data = ~data) {
                    lm(Ozone ~ Wind, data = data)
                }
            )

        pip$run()

        expect_equal(pip$get_out("data"), data)
        # Compared without attributes against a model fitted directly.
        expect_equivalent(
            pip$get_out("model"),
            lm(Ozone ~ Wind, data = data)
        )
    })

    test_that("step of requested output must exist",
    {
        pip <- Pipeline$new("pipe")
        pip$run()
        expect_error(pip$get_out("foo"), "step 'foo' does not exist")
    })
})



# Tests for Pipeline$get_params(), which returns the unbound parameters of
# all steps as a list named by step.
describe("get_params",
{
    test_that("parameters can be retrieved",
    {
        pip <- Pipeline$new("pipe1")$
            add("f1", \(a = 1) a, keepOut = TRUE)$
            add("f2", \(a, b = ~f1) a + b,
                params = list(a = 8),
                keepOut = TRUE
            )$
            add("f3", \(a = ~f2, b = 3) a + b, keepOut = TRUE)

        # Arguments bound to other steps via formula are not listed.
        p <- pip$get_params()
        expect_equal(
            p, list(f1 = list(a = 1), f2 = list(a = 8), f3 = list(b = 3))
        )
    })

    test_that("empty pipeline gives empty list of parameters",
    {
        pip <- Pipeline$new("pipe1")
        expect_equivalent(pip$get_params(), list())

        pip$add("f1", \() 1)
        expect_equivalent(pip$get_params(), list())
    })

    test_that("hidden parameters are filtered out by default",
    {
        # The parameter regarded as hidden here starts with a dot.
        pip <- Pipeline$new("pipe1")$
            add("f1", \(a = 1, .hidden = 2) a)

        p <- pip$get_params()
        expect_equal(p, list(f1 = list(a = 1)))

        p <- pip$get_params(ignoreHidden = FALSE)
        expect_equal(p, list(f1 = list(a = 1, .hidden = 2)))
    })

    test_that("works with Param objects",
    {
        pip <- Pipeline$new("pipe1")$
            add(
                "f1",
                \(
                    x = new("NumericParam", "x", value = 1),
                    y = new("NumericParam", "y", value = 2)
                ) {
                    x + y
                }
            )$
            add(
                "f2",
                \(
                    s1 = new("StringParam", "s1", "Hello"),
                    s2 = new("StringParam", "s2", "World")
                ) {
                    paste(s1, s2)
                }
            )

        par <- pip$get_params()

        # vapply() guarantees a logical vector whatever the input length.
        is_numeric_param <- vapply(
            par$f1, methods::is, logical(1), "NumericParam"
        )
        expect_true(all(is_numeric_param))
        expect_equal(par$f1$x@value, 1)
        expect_equal(par$f1$y@value, 2)

        is_string_param <- vapply(
            par$f2, methods::is, logical(1), "StringParam"
        )
        expect_true(all(is_string_param))
        expect_equal(par$f2$s1@value, "Hello")
        expect_equal(par$f2$s2@value, "World")
    })
})



# Tests for Pipeline$get_params_at_step(), which returns the unbound
# parameters of one step.
describe("get_params_at_step",
{
    test_that("list of step parameters can be retrieved",
    {
        pip <- Pipeline$new("pipe1")$
            add("f1", \(a = 1, b = 2) a + b)$
            add("f2", \(x = 1, y = 2) x + y)

        expect_equal(
            pip$get_params_at_step("f1"), list(a = 1, b = 2)
        )

        expect_equal(
            pip$get_params_at_step("f2"), list(x = 1, y = 2)
        )
    })

    test_that("if no parameters empty list is returned",
    {
        pip <- Pipeline$new("pipe1")$
            add("f1", \() 1)

        expect_equal(
            pip$get_params_at_step("f1"), list()
        )
    })

    test_that(
        "hidden parameters are not returned, unless explicitly requested",
    {
        # The parameter regarded as hidden here starts with a dot.
        pip <- Pipeline$new("pipe1")$
            add("f1", \(a = 1, .b = 2) a + .b)

        expect_equal(
            pip$get_params_at_step("f1"), list(a = 1)
        )

        expect_equal(
            pip$get_params_at_step("f1", ignoreHidden = FALSE),
            list(a = 1, .b = 2)
        )
    })

    test_that("bound parameters are never returned",
    {
        # 'b' is bound to the data step via formula.
        pip <- Pipeline$new("pipe1")$
            add("f1", \(a = 1, b = ~data) a + b)

        expect_equal(
            pip$get_params_at_step("f1"), list(a = 1)
        )

        expect_equal(
            pip$get_params_at_step("f1", ignoreHidden = FALSE),
            list(a = 1)
        )
    })

    test_that("step must exist",
    {
        pip <- Pipeline$new("pipe1")$
            add("f1", \(a = 1, b = ~data) a + b)

        expect_error(
            pip$get_params_at_step("foo"),
            "step 'foo' does not exist"
        )
    })

    test_that("works with Param objects",
    {
        pip <- Pipeline$new("pipe1")$
            add(
                "f1",
                \(
                    x = new("NumericParam", "x", value = 1),
                    y = new("NumericParam", "y", value = 2)
                ) {
                    x + y
                }
            )$
            add(
                "f2",
                \(
                    s1 = new("StringParam", "s1", "Hello"),
                    s2 = new("StringParam", "s2", "World")
                ) {
                    paste(s1, s2)
                }
            )

        # vapply() guarantees a logical vector whatever the input length.
        par1 <- pip$get_params_at_step("f1")
        is_numeric_param <- vapply(
            par1, methods::is, logical(1), "NumericParam"
        )
        expect_true(all(is_numeric_param))
        expect_equal(par1$x@value, 1)
        expect_equal(par1$y@value, 2)

        par2 <- pip$get_params_at_step("f2")
        is_string_param <- vapply(
            par2, methods::is, logical(1), "StringParam"
        )
        expect_true(all(is_string_param))
        expect_equal(par2$s1@value, "Hello")
        expect_equal(par2$s2@value, "World")
    })
})


# Tests for Pipeline$get_params_unique(), which lists every parameter name
# once across all steps.
describe("get_params_unique",
{
    # The label is assembled with paste() so that it contains neither a line
    # break nor source indentation when shown by a test reporter.
    test_that(
        paste(
            "parameters can be retrieved uniquely and if occurring multiple",
            "times, the 1st default value is used"
        ),
    {
        pip <- Pipeline$new("pipe1")$
            add("f1", \(a = 1) a)$
            add("f2", \(a = 2, b = 3) a + b)$
            add("f3", \(a = 4, b = 5, c = 6) a + b)

        # 'a' and 'b' keep the value from the first step defining them.
        p <- pip$get_params_unique()
        expect_equivalent(p, list(a = 1, b = 3, c = 6))
    })

    test_that("empty pipeline gives empty list",
    {
        pip <- Pipeline$new("pipe")
        expect_equivalent(pip$get_params_unique(), list())
    })

    test_that("pipeline with no parameters gives empty list",
    {
        pip <- Pipeline$new("pipe")$
            add("f1", \() 1)
        expect_equivalent(pip$get_params_unique(), list())
    })

    test_that("works with Param objects",
    {
        # NOTE(review): 'z' in f2 is constructed with "y" as its second
        # argument, which looks like a copy-paste leftover; confirm whether
        # that is intended.
        pip <- Pipeline$new("pipe1")$
            add(
                "f1",
                \(
                    a = 0,
                    x = new("NumericParam", "x", value = 1),
                    y = new("NumericParam", "y", value = 2)
                ) {
                    a * (x + y)
                }
            )$
            add(
                "f2",
                \(
                    a = new("NumericParam", "a", value = 0),
                    x = new("NumericParam", "x", value = 1),
                    y = 2,
                    z = new("NumericParam", "y", value = 3)
                ) {
                    a * (x + y + z)
                }
            )

        # For names defined in both steps, the definition from f1 wins:
        # 'a' stays a plain value and 'y' stays a Param object.
        par <- pip$get_params_unique()
        expect_equal(names(par), c("a", "x", "y", "z"))
        expect_equal(par$a, 0)
        expect_equal(par$x@value, 1)
        expect_equal(par$y@value, 2)
        expect_equal(par$z@value, 3)
    })
})



# get_params_unique_json(): JSON export of the unique pipeline parameters.
describe("get_params_unique_json",
{
    test_that("the elements are not named",
    {
        # Case 1: a standard (non-Param) parameter
        pip <- Pipeline$new("pipe1")$
            add("f1", \(a = 1) a, keepOut = TRUE)

        p <- pip$get_params_unique_json()
        pl <- jsonlite::fromJSON(p, simplifyVector = FALSE)
        expect_null(names(pl))

        # Case 2: Param objects
        pip <- Pipeline$new("pipe1")$
            add("f1", \(x = new("StringParam", "my x", "some x")) x)$
            add("f2", \(y = new("StringParam", "my y", "some y")) y)

        p <- pip$get_params_unique_json()
        pl <- jsonlite::fromJSON(p, simplifyVector = FALSE)
        expect_null(names(pl))
    })


    test_that("standard parameters are returned as name-value pairs",
    {
        pip <- Pipeline$new("pipe1")$
            add("f1", \(a = 1) a)$
            add("f2", \(a = 1, b = 2) a)$
            add("f3", \(a = 1, b = 2, c = list(a = 1, b = 2)) a)

        p <- pip$get_params_unique_json()
        expect_true(methods::is(p, "json"))

        pl <- jsonlite::fromJSON(p, simplifyVector = FALSE)
        expect_equal(
            pl,
            list(
                list(name = "a", value = 1),
                list(name = "b", value = 2),
                list(name = "c", value = list(a = 1, b = 2))
            )
        )
    })

    test_that("Param objects are returned with full information",
    {
        pip <- Pipeline$new("pipe1")$
            add("f1", \(x = new("StringParam", "my x", "some x")) x)$
            add("f2", \(y = new("StringParam", "my y", "some y")) y)

        p <- pip$get_params_unique_json()
        pl <- jsonlite::fromJSON(p, simplifyVector = FALSE)

        expect_equal(
            pl,
            list(
                list(
                    value = "some x",
                    name = "x",
                    advanced = FALSE,
                    label = "my x",
                    description = "",
                    source = "internal",
                    domain = "",
                    class = "StringParam"
                ),
                list(
                    value = "some y",
                    name = "y",
                    advanced = FALSE,
                    label = "my y",
                    description = "",
                    source = "internal",
                    domain = "",
                    class = "StringParam"
                )
            )
        )
    })

    test_that("the name of the arg is set to the name of the Param object",
    {
        pip <- Pipeline$new("pipe1")$
            add("f1", \(x = new("StringParam", "my x", "some x")) x)$
            add("f2", \(y = new("StringParam", "my y", "some y")) y)

        p <- pip$get_params_unique_json()
        pl <- jsonlite::fromJSON(p)

        # The string given to the Param constructor ends up as 'label',
        # while 'name' carries the name of the function argument.
        expect_true(pl[["label"]][[1]] == "my x")
        expect_false(pl[["name"]][[1]] == "my x")
        hasArgName <- pl[["name"]][[1]] == "x"
        expect_true(hasArgName)

        expect_true(pl[["label"]][[2]] == "my y")
        expect_false(pl[["name"]][[2]] == "my y")
        hasArgName <- pl[["name"]][[2]] == "y"
        expect_true(hasArgName)
    })

    test_that("works with mixed, that is, standard and Param objects",
    {
        pip <- Pipeline$new("pipe1")$
            add("f1", \(x = 1) x)$
            add("f2", \(s = new("StringParam", "my s", "some s")) s)

        p <- pip$get_params_unique_json()
        pl <- jsonlite::fromJSON(p, simplifyVector = FALSE)

        # Standard parameter: plain name-value pair
        expect_equal(pl[[1]], list(name = "x", value = 1L))
        # Param object: full set of fields
        expect_equal(
            pl[[2]],
            list(
                value = "some s",
                name = "s",
                advanced = FALSE,
                label = "my s",
                description = "",
                source = "internal",
                domain = "",
                class = "StringParam"
            )
        )
    })
})



# get_step(): look up a single step of the pipeline by its name.
describe("get_step",
{
    test_that("single steps can be retrieved",
    {
        pip <- Pipeline$new("pipe1")
        pip$add("f1", identity, params = list(x = 1))

        # Each retrieved step is compared against the matching row of
        # pip$pipeline.
        first_row <- pip$pipeline[1, ]
        second_row <- pip$pipeline[2, ]
        pip$get_step("data") |> expect_equal(first_row)
        pip$get_step("f1") |> expect_equal(second_row)

        expect_error(pip$get_step("foo"), "step 'foo' does not exist")
    })

    test_that("dependencies are recorded as expected",
    {
        foo <- \(a = 0) a
        bar <- \(a = 1, b = 2) a + b

        pip <- Pipeline$new("pipe1", data = 9)
        pip$add("f1", foo)
        pip$add("f2", bar, params = list(a = ~data, b = ~f1))

        # 'f1' has no formula parameters, 'f2' refers to 'data' and 'f1'.
        deps_f1 <- unlist(pip$get_step("f1")[["depends"]])
        deps_f2 <- unlist(pip$get_step("f2")[["depends"]])

        expect_true(length(deps_f1) == 0)
        expect_equal(deps_f2, c("a" = "data", b = "f1"))
    })
})


# get_step_names(): names of all steps in pipeline order.
describe("get_step_names",
{
    test_that("step names can be retrieved",
    {
        pip <- Pipeline$new("pipe1")$
            add("f1", \() {})$
            add("f2", \(x = 1) x)

        # The names must match the 'step' column of pip$pipeline.
        expect_equal(pip$get_step_names(), pip$pipeline[["step"]])
    })
})



# get_step_number(): position of a step within the pipeline.
# The tests are siblings rather than nested inside an outer test_that(), so
# each one is reported on its own.
describe("get_step_number",
{
    test_that("get_step_number is a function",
    {
        expect_true(is.function(Pipeline$new("pipe")$get_step_number))
    })

    test_that("get_step_number works as expected",
    {
        pip <- expect_no_error(Pipeline$new("pipe"))
        pip$add("f1", \(a = 1) a)
        pip$add("f2", \(a = 1) a)

        # Numbering starts at 2 for the first added step because the
        # 'data' step occupies position 1.
        pip$get_step_number("f1") |> expect_equal(2)
        pip$get_step_number("f2") |> expect_equal(3)
    })

    test_that("signals non-existent step",
    {
        pip <- expect_no_error(Pipeline$new("pipe"))
        pip$add("f1", \(a = 1) a)

        expect_error(
            pip$get_step_number("non-existent"),
            "step 'non-existent' does not exist"
        )
    })
})


# has_step(): membership check for a step name.
describe("has_step",
{
    test_that("it can be checked if pipeline has a step",
    {
        pip <- Pipeline$new("pipe1")
        pip$add("f1", \(a = 1) a)

        # 'f1' was added above, 'f2' never was.
        pip$has_step("f1") |> expect_true()
        pip$has_step("f2") |> expect_false()
    })
})


# insert_after(): add a new step directly behind an existing one.
describe("insert_after",
{
    # All tests start from the same pipeline: data -> f1 -> f2.
    new_pipeline <- function() {
        Pipeline$new("pipe1")$
            add("f1", \(a = 1) a + 1)$
            add("f2", \(a = ~f1) a + 1)
    }

    test_that("can insert a step after another step",
    {
        pip <- new_pipeline()

        pip$insert_after("f1", step = "f3", fun = \(a = ~f1) a + 1)

        step_names <- pip$get_step_names()
        expect_equal(step_names, c("data", "f1", "f3", "f2"))
    })

    test_that("will not insert a step if the step already exists",
    {
        pip <- new_pipeline()

        pip$insert_after("f1", step = "f2") |>
            expect_error("step 'f2' already exists")
    })

    test_that(
        "will not insert a step if the reference step does not exist",
    {
        pip <- new_pipeline()

        pip$insert_after("non-existent", step = "f3") |>
            expect_error("step 'non-existent' does not exist")
    })

    test_that(
        "will not insert a step with bad parameter dependencies",
    {
        pip <- new_pipeline()

        # 'f2' comes after the insert position, so the new step must not
        # depend on it.
        pip$insert_after("f1", step = "f3", \(x = ~f2) x) |>
            expect_error("step 'f3': dependency 'f2' not found")
    })

    test_that("will work if insert happens at last position",
    {
        pip <- new_pipeline()

        pip$insert_after("f2", step = "f3", fun = \(a = ~f1) a + 1)

        step_names <- pip$get_step_names()
        expect_equal(step_names, c("data", "f1", "f2", "f3"))
    })
})


# insert_before(): add a new step directly in front of an existing one.
describe("insert_before",
{
    test_that("can insert a step before another step",
    {
        pip <- Pipeline$new("pipe1")$
            add("f1", \(a = 1) a + 1)$
            add("f2", \(a = ~f1) a + 1)

        pip$insert_before(
            "f2",
            step = "f3",
            fun = \(a = ~f1) a + 1
        )

        # The new step 'f3' lands between 'f1' and 'f2'.
        expect_equal(
            pip$get_step_names(),
            c("data", "f1", "f3", "f2")
        )
    })

    test_that("will not insert a step if the step already exists",
    {
        pip <- Pipeline$new("pipe1")$
            add("f1", \(a = 1) a + 1)$
            add("f2", \(a = ~f1) a + 1)

        expect_error(
            pip$insert_before("f1", step = "f2"),
            "step 'f2' already exists"
        )
    })

    test_that(
        "will not allow step to be inserted at first position",
    {
        pip <- Pipeline$new("pipe1")$
            add("f1", \(a = 1) a + 1)

        # 'data' is the first step of the pipeline.
        expect_error(
            pip$insert_before("data", step = "f2"),
            "cannot insert before first step"
        )
    })

    test_that("will not insert a step if the reference step does not exist",
    {
        pip <- Pipeline$new("pipe1")$
            add("f1", \(a = 1) a + 1)$
            add("f2", \(a = ~f1) a + 1)

        expect_error(
            pip$insert_before("non-existent", step = "f3"),
            "step 'non-existent' does not exist"
        )
    })

    test_that(
        "will not insert a step with bad parameter dependencies",
    {
        pip <- Pipeline$new("pipe1")$
            add("f1", \(a = 1) a + 1)$
            add("f2", \(a = ~f1) a + 1)

        # A step inserted before 'f2' must not depend on 'f2' itself.
        expect_error(
            pip$insert_before("f2", step = "f3", \(x = ~f2) x),
            "step 'f3': dependency 'f2' not found"
        )
    })
})


# length(): number of steps currently held by the pipeline.
describe("length",
{
    test_that("returns the number of steps",
    {
        pip <- Pipeline$new("pipe")
        # A freshly created pipeline already reports one step.
        pip$length() |> expect_equal(1)

        # Adding a step increases the count ...
        pip$add("f1", \(a = 1) a)
        pip$length() |> expect_equal(2)

        # ... and removing it brings the count back.
        pip$remove_step("f1")
        pip$length() |> expect_equal(1)
    })
})


# lock_step(): mark a step as locked.
describe("lock_step",
{
    test_that("sets state to 'Locked'",
    {
        pip <- Pipeline$new("pipe")$
            add("f1", \(a = 1) a)

        pip$lock_step("f1")
        expect_equal(pip$get_step("f1")[["state"]], "Locked")
    })
})


# print(): console representation of the pipeline.
describe("print",
{
    test_that("pipeline can be printed",
    {
        pip <- Pipeline$new("pipe1", data = 9)

        expect_output(pip$print())
    })

    # NOTE(review): this test exercises add(), not print(); it presumably
    # belongs to the 'add' tests. Kept here so no coverage is lost.
    test_that("missing function is signaled",
    {
        pip <- Pipeline$new("pipe1")

        expect_error(
            pip$add("f1", "non-existing-function"),
            "object 'non-existing-function' of mode 'function' was not found"
        )
    })

    test_that("if verbose is TRUE, all columns are printed",
    {
        # Widen the console so the header is not wrapped onto several lines;
        # the previous option value is restored when the test exits.
        op <- options(width = 1000L)
        on.exit(options(op), add = TRUE)
        pip <- Pipeline$new("pipe1", data = 9)

        out <- capture.output(pip$print(verbose = TRUE))
        # The first printed line holds the column names separated by blanks.
        header <- out[1] |> trimws() |> strsplit("\\s+") |> unlist()
        expected_header <- colnames(pip$pipeline)
        expect_equal(header, expected_header)
    })
})


# pop_step(): drop the last step and return its name.
describe("pop_step",
{
    test_that("last pipeline step can be popped",
    {
        pip <- Pipeline$new("pipe1")
        pip$add("f1", \(a = 1) a)

        # Snapshot of the pipeline taken before 'f2' is appended.
        snapshot <- pip$clone()

        pip$add("f2", \(b = 2) b)

        pip$length() |> expect_equal(3)
        snapshot$length() |> expect_equal(2)

        popped <- pip$pop_step()
        expect_equal(popped, "f2")

        # After popping, the pipeline equals the earlier snapshot again.
        pip$length() |> expect_equal(2)
        expect_equal(pip, snapshot)
    })
})



# pop_steps_after(): remove every step that follows a given step and return
# the names of the removed steps.
describe("pop_steps_after",
{
    test_that("all steps after a given step can be removed",
    {
        pip <- Pipeline$new("pipe1", data = 0)$
            add("f1", \(x = 1) x)$
            add("f2", \(x = ~f1) x)$
            add("f3", \(x = ~f2) x)

        steps <- pip$pop_steps_after("f1")
        expect_equal(steps, c("f2", "f3"))

        # Check against the actual step names. The former lookup of a
        # 'name' column in pip$pipeline made this assertion vacuous.
        remaining <- pip$get_step_names()
        hasAllStepsRemoved <- !any(steps %in% remaining)
        expect_true(hasAllStepsRemoved)
        expect_equal(remaining, c("data", "f1"))
    })

    test_that("if given step does not exist, an error is signalled",
    {
        pip <- Pipeline$new("pipe1", data = 0)$
            add("f1", \(x = 1) x)

        expect_error(
            pip$pop_steps_after("bad_step"),
            "step 'bad_step' does not exist"
        )
    })

    test_that("if given step is the last step, nothing gets removed",
    {
        pip <- Pipeline$new("pipe1", data = 0)$
            add("f1", \(x = 1) x)$
            add("f2", \(x = ~f1) x)

        length_before <- pip$length()

        res <- pip$pop_steps_after("f2")

        expect_equal(res, character(0))
        hasNothingRemoved <- pip$length() == length_before
        expect_true(hasNothingRemoved)
    })
})



# pop_steps_from(): remove a given step together with all steps following it
# and return the names of the removed steps.
describe("pop_steps_from",
{
    test_that("all steps from a given step can be removed",
    {
        pip <- Pipeline$new("pipe1", data = 0)$
            add("f1", \(x = 1) x)$
            add("f2", \(x = ~f1) x)$
            add("f3", \(x = ~f2) x)

        steps <- pip$pop_steps_from("f2")
        expect_equal(steps, c("f2", "f3"))

        # Check against the actual step names. The former lookup of a
        # 'name' column in pip$pipeline made this assertion vacuous.
        remaining <- pip$get_step_names()
        hasAllStepsRemoved <- !any(steps %in% remaining)
        expect_true(hasAllStepsRemoved)
        expect_equal(remaining, c("data", "f1"))
    })

    test_that("if given step does not exist, an error is signalled",
    {
        pip <- Pipeline$new("pipe1", data = 0)$
            add("f1", \(x = 1) x)

        expect_error(
            pip$pop_steps_from("bad_step"),
            "step 'bad_step' does not exist"
        )
    })

    test_that("if given step is the last step, one step removed",
    {
        pip <- Pipeline$new("pipe1", data = 0)$
            add("f1", \(x = 1) x)$
            add("f2", \(x = ~f1) x)

        length_before <- pip$length()

        res <- pip$pop_steps_from("f2")

        expect_equal(res, "f2")
        hasOneStepRemoved <- pip$length() == length_before - 1
        expect_true(hasOneStepRemoved)
    })
})



# remove_step(): delete a step, optionally with its downstream dependencies.
describe("remove_step",
{
    test_that("pipeline step can be removed",
    {
        pip <- Pipeline$new("pipe1")$
            add("f1", \(a = 1) a)$
            add("f2", \(b = 1) b)

        pip$remove_step("f1")

        expect_equal(pip$get_step_names(), c("data", "f2"))
    })

    test_that("step must exist",
    {
        pip <- Pipeline$new("pipe1")$
            add("f1", \(a = 1) a)

        expect_error(
            pip$remove_step("non-existent-step"),
            "step 'non-existent-step' does not exist"
        )
    })

    test_that("if step has downstream dependencies, an error is given",
    {
        pip <- Pipeline$new("pipe1")$
            add("f1", \(a = 1) a)$
            add("f2", \(b = ~f1) b)

        # One dependent step
        expect_error(
            pip$remove_step("f1"),
            paste(
                "cannot remove step 'f1' because the following",
                "steps depend on it: 'f2'"
            )
        )

        # Two dependent steps: both are listed in the message
        pip$add("f3", \(x = ~f1) x)
        expect_error(
            pip$remove_step("f1"),
            paste(
                "cannot remove step 'f1' because the following",
                "steps depend on it: 'f2', 'f3'"
            )
        )
    })

    test_that(
        "if error, only the direct downstream dependencies are reported",
    {
        # 'f3' depends on 'f1' only indirectly (via 'f2') and therefore is
        # not expected in the error message.
        pip <- Pipeline$new("pipe1")$
            add("f1", \(a = 1) a)$
            add("f2", \(b = ~f1) b)$
            add("f3", \(c = ~f2) c)$
            add("f4", \(d = ~f1) d)

        expect_error(
            pip$remove_step("f1"),
            paste(
                "cannot remove step 'f1' because the following",
                "steps depend on it: 'f2', 'f4'"
            )
        )
    })

    test_that(
        "step can be removed together with its downstream dependencies",
    {
        # 'f5' only depends on 'data' and must survive the recursive removal.
        pip <- Pipeline$new("pipe1")$
            add("f1", \(a = 1) a)$
            add("f2", \(b = ~f1) b)$
            add("f3", \(c = ~f2) c)$
            add("f4", \(d = ~f1) d)$
            add("f5", \(x = ~data) x)

        out <- utils::capture.output(
            pip$remove_step("f1", recursive = TRUE),
            type = "message"
        )

        expect_equal(pip$get_step_names(), c("data", "f5"))
        expect_equal(
            out,
            paste(
                "Removing step 'f1' and its downstream dependencies:",
                "'f2', 'f3', 'f4'"
            )
        )
    })
})


# rename_step(): change the name of a step and of all references to it.
describe("rename_step",
{
    test_that("pipeline step can be renamed",
    {
        pip <- Pipeline$new("pipe1")$
            add("f1", \(a = 1) a)$
            add("f2", \(b = 2) b)

        pip$rename_step(from = "f1", to = "first")

        pip$get_step_names() |> expect_equal(c("data", "first", "f2"))
    })

    test_that("signals name clash",
    {
        pip <- Pipeline$new("pipe1")$
            add("f1", \(a = 1) a)$
            add("f2", \(b = 2) b)

        expect_error(
            pip$rename_step(from = "f1", to = "f2"),
            "step 'f2' already exists"
        )
    })

    test_that("renames dependencies as well",
    {
        pip <- Pipeline$new("pipe1")$
            add("f1", \(a = 1) a)$
            add("f2", \(b = ~f1) b)$
            add("f3", \(a = ~f1, b = ~f2) a + b)

        pip$rename_step(from = "f1", to = "first")

        # Every former reference to 'f1' must now point to 'first'.
        expect_equal(
            pip$get_depends(),
            list(
                data = character(0),
                first = character(0),
                f2 = c(b = "first"),
                f3 = c(a = "first", b = "f2")
            )
        )
    })
})


# replace_step(): swap the function (and settings) of an existing step.
describe("replace_step",
{
    test_that("pipeline steps can be replaced",
    {
        pip <- Pipeline$new("pipe1")$
            add("f1", \(a = 1) a)$
            add("f2", \(b = 2) b)$
            add("f3", \(c = ~f2) c, keepOut = TRUE)

        out <- unname(unlist(pip$run()$collect_out()))
        expect_equal(out, 2)

        # 'f3' passes on the output of 'f2', so replacing 'f2' changes the
        # collected result.
        pip$replace_step("f2", \(z = 4) z)
        out <- unname(unlist(pip$run()$collect_out()))
        expect_equal(out, 4)
    })

    test_that("fun must be passed as a function or string",
    {
        pip <- Pipeline$new("pipe")$
            add("step1", \(a = 1) a)

        expect_error(
            pip$replace_step("step1", fun = 1),
            "is.function(fun) || is_string(fun) is not TRUE",
            fixed = TRUE
        )
    })

    test_that("params must be a list",
    {
        pip <- Pipeline$new("pipe")$
            add("step1", \(a = 1) a)

        expect_error(
            pip$replace_step("step1", fun = \() 1, params = 1),
            "is.list(params)",
            fixed = TRUE
        )
    })

    test_that("description must be (possibly empty) string",
    {
        pip <- Pipeline$new("pipe")$
            add("step1", \(a = 1) a)

        expect_error(
            pip$replace_step("step1", fun = \() 1, description = 1),
            "is_string(description)",
            fixed = TRUE
        )
        expect_no_error(
            pip$replace_step("step1", fun = \() 1, description = "")
        )
    })

    test_that("group must be non-empty string",
    {
        pip <- Pipeline$new("pipe")$
            add("step1", \(a = 1) a)

        expect_error(
            pip$replace_step("step1", fun = \() 1, group = 1),
            "is_string(group) && nzchar(group) is not TRUE",
            fixed = TRUE
        )
        expect_error(
            pip$replace_step("step1", fun = \() 1, group = ""),
            "is_string(group) && nzchar(group) is not TRUE",
            fixed = TRUE
        )
    })

    test_that("keepOut must be logical",
    {
        pip <- Pipeline$new("pipe")$
            add("step1", \(a = 1) a)

        expect_error(
            pip$replace_step("step1", fun = \() 1, keepOut = 1),
            "is.logical(keepOut) is not TRUE",
            fixed = TRUE
        )
    })

    test_that("the replacing function can be passed as a string",
    {
        pip <- Pipeline$new("pipe1")$
            add("f1", \(x = 3) x, keepOut = TRUE)

        out <- unname(unlist(pip$run()$collect_out()))
        expect_equal(out, 3)

        # The function is placed in the global environment for the duration
        # of the test and removed again on exit.
        .my_func <- \(x = 3) {
            2 * x
        }
        assign(".my_func", .my_func, envir = globalenv())
        on.exit(rm(".my_func", envir = globalenv()), add = TRUE)

        pip$replace_step("f1", fun = ".my_func", keepOut = TRUE)

        out <- unname(unlist(pip$run()$collect_out()))
        expect_equal(out, 6)
    })

    test_that(
        "when replacing function, default parameters can be overridden",
    {
        pip <- Pipeline$new("pipe1")$
            add("f1", \(x = 1:3) x, keepOut = TRUE)

        .my_func <- \(x = 3) {
            2 * x
        }
        assign(".my_func", .my_func, envir = globalenv())
        on.exit(rm(".my_func", envir = globalenv()), add = TRUE)

        # 'params' takes precedence over the default x = 3 of .my_func.
        pip$replace_step(
            "f1",
            fun = ".my_func",
            params = list(x = 10),
            keepOut = TRUE
        )

        out <- unname(unlist(pip$run()$collect_out()))
        expect_equal(out, 20)
    })

    test_that("the pipeline step that is being replaced must exist",
    {
        pip <- Pipeline$new("pipe1")$
            add("f1", \(a = 1) a)$
            add("f2", \(b = 2) b)$
            add("f3", \(c = ~f2) c, keepOut = TRUE)

        # NOTE(review): no message pattern is given here, so any error
        # satisfies this expectation - consider pinning the message.
        expect_error(pip$replace_step("non-existent", \(z = 4) z))
    })


    test_that(
        "if replacing a pipeline step, dependencies are verified correctly",
    {
        pip <- Pipeline$new("pipe1")$
            add("f1", \(a = 1) a)$
            add("f2", \(b = 2) b)$
            add("f3", \(c = ~f2) c, keepOut = TRUE)

        # Unknown step
        expect_error(
            pip$replace_step("f2", \(z = ~foo) z),
            "dependency 'foo' not found up to step 'f1'"
        )

        # The replaced step itself
        expect_error(
            pip$replace_step("f2", \(z = ~f2) z),
            "dependency 'f2' not found up to step 'f1'"
        )

        # A step located after the replaced step
        expect_error(
            pip$replace_step("f2", \(z = ~f3) z),
            "dependency 'f3' not found up to step 'f1'"
        )
    })

    test_that(
        "states are updated correctly",
    {
        pip <- Pipeline$new("pipe1")$
            add("f1", \(a = 1) a)$
            add("f2", \(a = 2) a)$
            add("f3", \(a = ~f2) a, keepOut = TRUE)$
            add("f4", \(a = ~f3) a, keepOut = TRUE)

        pip$run()

        # The step before the replaced one stays 'Done', the replaced step
        # becomes 'New' and the steps depending on it become 'Outdated'.
        pip$replace_step("f2", \(a = 2) 2 * a)
        expect_equal(pip$get_step("f1")$state, "Done")
        expect_equal(pip$get_step("f2")$state, "New")
        expect_equal(pip$get_step("f3")$state, "Outdated")
        expect_equal(pip$get_step("f4")$state, "Outdated")
    })

    it("can have a variable defined outside as parameter default",
    {
        x <- 3
        pip <- Pipeline$new("pipe")$add("f1", \(x = 1) x)
        pip$replace_step("f1", fun = \(a) a, params = list(a = x))

        out <- pip$run()$get_out("f1")
        expect_equal(out, x)
    })

    it("handles Param object args",
    {
        pip <- Pipeline$new("pipe")$add("f1", \(x = 1) x)
        pip$replace_step(
            "f1",
            fun = \(a = new("NumericParam", "a", value = 3)) a
        )

        out <- pip$run()$get_out("f1")
        expect_equal(out, 3)
    })

    it("can have a Param object defined outside as parameter default",
    {
        x <- 3
        pip <- Pipeline$new("pipe")$add("f1", \(x = 1) x)
        p <- new("NumericParam", "a", value = x)

        # The assertions below run on the pipeline whose step was replaced;
        # the pipeline is not rebuilt via add(), so replace_step() itself
        # is what gets verified.
        pip$replace_step("f1", fun = \(a) a, params = list(a = p))

        expect_equal(pip$get_params_at_step("f1")$a, p)
        out <- pip$run()$get_out("f1")
        expect_equal(out, x)
    })

    it("function can be passed as a string",
    {
        pip <- Pipeline$new("pipe")$add("f1", \(x = 1) x)
        pip$replace_step("f1", fun = "mean", params = list(x = 1:5))

        out <- pip$run()$get_out("f1")
        expect_equal(out, mean(1:5))
        expect_equal(pip$get_step("f1")[["funcName"]], "mean")
    })

    it("if passed as a function, name is derived from the function",
    {
        pip <- Pipeline$new("pipe")$add("f1", \(x = 1) x)
        pip$replace_step("f1", fun = mean, params = list(x = 1:5))

        out <- pip$run()$get_out("f1")
        expect_equal(out, mean(1:5))
        expect_equal(pip$get_step("f1")[["funcName"]], "mean")
    })

    it("lambda functions are named 'function'",
    {
        pip <- Pipeline$new("pipe")$add("f1", \(x = 1) x)
        pip$replace_step("f1", fun = \(x = 1) x)
        expect_equal(pip$get_step("f1")[["funcName"]], "function")
    })
})


# reset(): discard all outputs and set every step back to state 'New'.
describe("reset",
{
    test_that(
        "after reset pipeline is the same as before the run",
    {
        p <- Pipeline$new("pipe", data = 1:2)$
            add("f1", \(x = 1) x)$
            add("f2", \(y = 1) y)

        # After the run: every step has output and is 'Done'
        p$run()
        p$collect_out(all = TRUE) |>
            expect_equal(list(data = 1:2, f1 = 1, f2 = 1))
        expect_true(all(p$pipeline[["state"]] == "Done"))

        # After the reset: outputs are gone and every step is 'New'
        p$reset()
        p$collect_out(all = TRUE) |>
            expect_equal(list(data = NULL, f1 = NULL, f2 = NULL))
        expect_true(all(p$pipeline[["state"]] == "New"))
    })
})


describe("run",
{
    test_that("empty pipeline can be run",
    {
        expect_no_error(Pipeline$new("pipe1")$run())
    })

    test_that("returns the pipeline object",
    {
        pip <- Pipeline$new("pipe1")$run()
        expect_equal(pip$name, "pipe1")
    })

    test_that("if function result is a list, all names are preserved",
    {
        # Result list length == 1 - the critical case
        resultList = list(foo = 1)

        pip <- Pipeline$new("pipe")$
            add("f1", \() resultList, keepOut = TRUE)

        out = pip$run()$collect_out()
        expect_equal(out[["f1"]], resultList)

        # Result list length > 1
        resultList = list(foo = 1, bar = 2)

        pip <- Pipeline$new("pipe")$
            add("f1", \() resultList, keepOut = TRUE)

        out = pip$run()$collect_out()
        expect_equal(out[["f1"]], resultList)
    })

    test_that("pipeline execution is correct",
    {
        data <- 9
        pip <- Pipeline$new("pipe1", data = data)

        foo <-\(a = 1) a
        bar <-\(a, b) a + b

        a <- 5
        pip$add("f1", foo, params = list(a = a), keepOut = TRUE)
        pip$add("f2", bar, params = list(a = ~data, b = ~f1), keepOut = TRUE)

        pip$run()

        expect_equal(pip$pipeline[["out"]][[2]], a)
        expect_equal(pip$pipeline[["out"]][[3]], a + data)
    })


    test_that("pipeline execution can cope with void functions",
    {
        pip <- Pipeline$new("pipe1")$
            add("f1", \(a = 1) {}, keepOut = TRUE)$
            add("f2", \(b = 2) b, keepOut = TRUE)

        out <- pip$run()$collect_out()
        expect_equal(out, list(f1 = NULL, f2 = 2))
    })

    test_that(
        "if pipeline execution fails, the error message is returned as error",
    {
        pip <- Pipeline$new("pipe1")$
            add("f1", \(a = 1) a)$
            add("f2", \(b = ~f1) stop("something went wrong"))

        expect_error(pip$run(), "something went wrong")
    })

    test_that(
        "can handle 'NULL' results",
    {
        pip <- Pipeline$new("pipe", data = 1)$
            add("f1", \(x = ~data) x, keepOut = TRUE)

        out <- pip$run()$collect_out()
        expect_equal(out[["f1"]], 1)

        pip$set_data(NULL)
        out <- pip$run()$collect_out()
        expect_equal(out[["f1"]], NULL)
    })

    test_that(
        "can be run recursively to dynamically create and run pipelines",
    {
        pip <- Pipeline$new("pipe", data = 1)$
            add(
                "f1",
                fun =\(data = 10) {
                    pip <- Pipeline$new("2nd pipe", data = data)$
                        add("step1", \(x = ~data) x)$
                        add("step2", \(x = ~step1) {
                            print(x)
                            2 * x
                        }, keepOut = TRUE)
                }
            )

        pip2 <- pip$run(recursive = TRUE)
        expect_equal(pip2$get_step_names(), c("data", "step1", "step2"))

        out <- pip2$collect_out()
        expect_equal(out[["step2"]], 20)
    })

    test_that("will not re-run steps that are already done unless forced",
    {
        pip <- Pipeline$new("pipe", data = 1)$
            add("f1", \(x = 2) x)$
            add("f2", \(y = ~f1) y + 1)

        pip$run()
        expect_equal(pip$get_step("f1")$state, "Done")
        expect_equal(pip$get_step("f2")$state, "Done")
        expect_equal(pip$pipeline[["out"]][[2]], 2)
        expect_equal(pip$pipeline[["out"]][[3]], 3)

        pip$pipeline[2, "out"] <- 0
        pip$pipeline[3, "out"] <- 0
        pip$run()
        expect_equal(pip$pipeline[["out"]][[2]], 0)
        expect_equal(pip$pipeline[["out"]][[3]], 0)

        # Set parameter, which outdates step and run again
        pip$run(force = TRUE)
        expect_equal(pip$pipeline[["out"]][[2]], 2)
        expect_equal(pip$pipeline[["out"]][[3]], 3)
    })

    test_that("will never re-run locked steps",
    {
        pip <- Pipeline$new("pipe", data = 1)$
            add("f1", \(x = 2) x)$
            add("f2", \(y = ~f1) y + 1)

        pip$run()
        expect_equal(pip$pipeline[["out"]][[2]], 2)
        expect_equal(pip$pipeline[["out"]][[3]], 3)

        pip$pipeline[2, "out"] <- 0
        pip$pipeline[3, "out"] <- 0
        pip$run()
        expect_equal(pip$pipeline[["out"]][[2]], 0)
        expect_equal(pip$pipeline[["out"]][[3]], 0)
        pip$lock_step("f1")
        pip$lock_step("f2")

        # Set parameter, which outdates step and run again
        pip$run(force = TRUE)
        expect_equal(pip$pipeline[["out"]][[2]], 0)
        expect_equal(pip$pipeline[["out"]][[3]], 0)
    })

    test_that("can clean unkept steps",
    {
        pip <- Pipeline$new("pipe", data = 1)$
            add("f1", \(x = 2) x)$
            add("f2", \(y = ~f1) y + 1)

        pip$run()
        expect_equal(pip$pipeline[["out"]][[2]], 2)
        expect_equal(pip$pipeline[["out"]][[3]], 3)

        pip$run(cleanUnkept = TRUE)
        expect_true(all(sapply(pip$pipeline[["out"]], is.null)))

        pip$set_keep_out("f1", TRUE)
        pip$run(cleanUnkept = TRUE)
        expect_equal(pip$pipeline[["out"]], list(NULL, 2, NULL))
    })

    test_that("logs warning without interrupting the run",
    {
        pip <- Pipeline$new("pipe", data = 1)$
            add("f1", \(x = 2) x)$
            add("f2", \(x = ~f1) {
                warning("something might be wrong")
                x
            })$
            add("f3", \(x = ~f2) x)

        log <- utils::capture.output(
            expect_warning(pip$run(), "something might be wrong")
        )

        Filter(log, f =\(x) x |>
            startsWith("WARN")) |>
            grepl(pattern = "something might be wrong") |>
            expect_true()

        wasRunTillEnd <- pip$get_out("f3") == 2
        expect_true(wasRunTillEnd)
    })

    test_that("logs error and stops at failed step",
    {
        pip <- Pipeline$new("pipe", data = 1)$
            add("f1", \(x = 2) x)$
            add("f2", \(x = ~f1) {
                stop("something went wrong")
                x
            })$
            add("f3", \(x = ~f2) x)

        log <- utils::capture.output(
            expect_error(pip$run(), "something went wrong")
        )

        Filter(log, f =\(x) x |>
            startsWith("ERROR")) |>
            grepl(pattern = "something went wrong") |>
            expect_true()

        wasRunTillEnd <- isTRUE(pip$get_out("f3") == 2)
        expect_false(wasRunTillEnd)
    })

    test_that("can show progress",
    {
        pip <- Pipeline$new("pipe", data = 1)
        pip$add("f1", function(x = 2) x)
        pip$add("f2", function(x = ~f1) x)

        progressMock <- mockery::mock()
        pip$run(progress = progressMock)

        calls <- mockery::mock_args(progressMock)

        # The progress function is called once per step ...
        expect_equal(length(progressMock), pip$length())

        # ... with the step number as first argument and the step name
        # passed as 'detail'.
        expectedSteps <- c("data", "f1", "f2")
        for (i in seq_along(expectedSteps)) {
            expect_equal(calls[[i]][[1]], as.numeric(i))
            expect_equal(calls[[i]][["detail"]], expectedSteps[i])
        }
    })

    test_that("works with Param objects",
    {
        # Both steps declare all of their defaults as Param objects.
        pip <- Pipeline$new("pipe1")
        pip$add(
            "f1",
            function(
                x = new("NumericParam", "x", value = 1),
                y = new("NumericParam", "y", value = 2)
            ) {
                x + y
            }
        )
        pip$add(
            "f2",
            function(
                s1 = new("StringParam", "s1", "Hello"),
                s2 = new("StringParam", "s2", "World")
            ) {
                paste(s1, s2)
            }
        )

        pip$run()

        # The step functions computed with the values held by the Params.
        expect_equal(pip$get_out("f1"), 3)
        expect_equal(pip$get_out("f2"), "Hello World")
    })
})



# run_step: runs a single step, optionally together with the steps it depends
# on (upstream) and/or the steps depending on it (downstream).
describe("run_step",
{
    test_that("pipeline can be run at given step",
    {
        pip <- Pipeline$new("pipe")$
            add("A", \(a = 1) a)

        expect_no_error(pip$run_step("A"))
    })


    test_that("upstream steps are by default run with given step",
    {
        pip <- Pipeline$new("pipe")$
            add("A", \(a = 1) a)$
            add("B", \(b = ~A) c(b, 2))$
            add("C", \(c = ~B) c(c, 3))

        pip$run_step("B")

        # A was run as upstream dependency of B, C was not touched
        expect_equal(pip$get_out("A"), 1)
        expect_equal(pip$get_out("B"), c(1, 2))
        expect_true(is.null(pip$get_out("C")))
    })

    test_that("runs upstream steps in correct order",
    {
        pip <- Pipeline$new("pipe")$
            add("A", \(a = 1) a)$
            add("B", \(b = ~A) c(b, 2))$
            add("C", \(c = ~B) c(c, 3))

        pip$run_step("C")
        expect_equal(pip$get_out("C"), 1:3)
    })

    test_that("runs downstream steps in correct order",
    {
        pip <- Pipeline$new("pipe")$
            add("A", \(a = 1) a)$
            add("B", \(b = ~A) c(b, 2))$
            add("C", \(c = ~B) c(c, 3))

        pip$run_step("A", downstream = TRUE)
        expect_equal(pip$get_out("C"), 1:3)
    })

    test_that("pipeline can be run at given step excluding
        all upstream dependencies",
    {
        pip <- Pipeline$new("pipe")$
            add("A", \(a = 1) a)$
            add("B", \(b = ~A) c(b, 2))$
            add("C", \(c = ~B) c(c, 3))

        # Without its upstream step, B only sees the NULL output of A
        pip$run_step("B", upstream = FALSE)
        expect_true(is.null(pip$get_out("A")))
        expect_equal(pip$get_out("B"), 2)
        expect_true(is.null(pip$get_out("C")))
    })

    test_that("pipeline can be run at given step excluding upstream
        but including downstream dependencies",
    {
        pip <- Pipeline$new("pipe")$
            add("A", \(a = 1) a)$
            add("B", \(b = ~A) c(b, 2))$
            add("C", \(c = ~B) c(c, 3))


        pip$run_step(
            "B",
            upstream = FALSE,
            downstream = TRUE
        )
        expect_true(is.null(pip$get_out("A")))
        expect_equal(pip$get_out("B"), 2)
        expect_equal(pip$get_out("C"), c(2, 3))
    })

    test_that("pipeline can be run at given step including
        up- and downstream dependencies",
    {
        pip <- Pipeline$new("pipe")$
            add("A", \(a = 1) a)$
            add("B", \(b = ~A) c(b, 2))$
            add("C", \(c = ~B) c(c, 3))


        pip$run_step(
            "B", upstream = TRUE, downstream = TRUE
        )
        expect_equal(pip$get_out("A"), 1)
        expect_equal(pip$get_out("B"), c(1, 2))
        expect_equal(pip$get_out("C"), c(1, 2, 3))
    })

    test_that("if not marked as keepOut, output of run steps can be cleaned",
    {
        pip <- Pipeline$new("pipe")$
            add("A", \(a = 1) a)


        pip$run_step("A", cleanUnkept = TRUE)
        expect_true(is.null(pip$get_out("A")))

        pip$set_keep_out("A", TRUE)$run_step("A", cleanUnkept = TRUE)
        expect_false(is.null(pip$get_out("A")))
    })

    test_that("up- and downstream steps are marked in log",
    {
        # Logging is enabled for this test only and suspended again on exit.
        # add = TRUE keeps any exit handler registered earlier in this frame.
        lgr::unsuspend_logging()
        on.exit(lgr::suspend_logging(), add = TRUE)

        pip <- Pipeline$new("pipe")$
            add("A", \(a = 1) a)$
            add("B", \(b = ~A) c(b, 2))$
            add("C", \(c = ~B) c(c, 3))

        logOut <- utils::capture.output(
            pip$run_step("B", upstream = TRUE, downstream = TRUE)
        )

        contains <-\(x, pattern) {
            grepl(pattern = pattern, x = x, fixed = TRUE)
        }

        # The first log line is skipped, the step lines start at the second
        expect_true(logOut[2] |> contains("Step 1/3 A (upstream)"))
        expect_true(logOut[3] |> contains("Step 2/3 B"))
        expect_true(logOut[4] |> contains("Step 3/3 C (downstream)"))
    })

    test_that(
        "updates the timestamp of the run steps",
    {
        pip <- Pipeline$new("pipe", data = 1)$
            add("f1", \(x = ~data) x, keepOut = TRUE)

        before <- pip$pipeline[["time"]]
        # Wait so that the timestamp of the re-run step is measurably later
        Sys.sleep(1)

        pip$run_step("f1", upstream = FALSE)
        after <- pip$pipeline[["time"]]

        # Only the step that was run gets a new timestamp
        expect_equal(before[1], after[1])
        expect_true(before[2] < after[2])
    })

    test_that(
        "updates the state of the run steps",
    {
        pip <- Pipeline$new("pipe", data = 1)$
            add("f1", \(x = ~data) x, keepOut = TRUE)

        before <- pip$pipeline[["state"]]
        pip$run_step("f1", upstream = FALSE)
        after <- pip$pipeline[["state"]]

        expect_equal(before, c("New", "New"))
        expect_equal(after, c("New", "Done"))
    })

    test_that("will never re-run locked step",
    {
        pip <- Pipeline$new("pipe", data = 1)$
            add("f1", \(x = 2) x)$
            add("f2", \(y = ~f1) y + 1)

        pip$run()
        expect_equal(pip$pipeline[["out"]][[2]], 2)
        expect_equal(pip$pipeline[["out"]][[3]], 3)

        # Overwrite the stored outputs to see which steps get recomputed
        pip$pipeline[2, "out"] <- 0
        pip$pipeline[3, "out"] <- 0
        pip$lock_step("f1")
        pip$run_step("f1", downstream = TRUE)

        # Locked f1 keeps the manipulated 0, f2 is recomputed as 0 + 1
        expect_equal(pip$pipeline[["out"]][[2]], 0)
        expect_equal(pip$pipeline[["out"]][[3]], 1)
    })

    test_that("can clean unkept steps",
    {
        pip <- Pipeline$new("pipe", data = 1)$
            add("f1", \(x = 2) x)$
            add("f2", \(y = ~f1) y + 1)

        pip$run_step("f1", downstream = TRUE, cleanUnkept = TRUE)
        # vapply guarantees a logical vector regardless of the list length
        isCleaned <- vapply(pip$pipeline[["out"]], is.null, logical(1))
        expect_true(all(isCleaned))

        pip$set_keep_out("f1", TRUE)
        pip$run_step("f1", downstream = TRUE, cleanUnkept = TRUE)
        expect_equal(pip$pipeline[["out"]], list(NULL, 2, NULL))
    })
})



# set_data: provides the data of the pipeline after it has been defined.
describe("set_data",
{
    test_that("data can be set later after pipeline definition",
    {
        dat <- data.frame(a = 1:2, b = 1:2)

        pip <- Pipeline$new("pipe1")
        pip$add("f1", function(x = ~data) x, keepOut = TRUE)

        # As long as no data was set, the pass-through step yields NULL
        outWithoutData <- pip$run()$collect_out()
        expect_equal(outWithoutData[["f1"]], NULL)

        pip$set_data(dat)

        outWithData <- pip$run()$collect_out()
        expect_equal(outWithData[["f1"]], dat)
    })

    test_that("if data is set, all dependent steps are set to outdated",
    {
        dat <- data.frame(a = 1:2, b = 1:2)

        pip <- Pipeline$new("pipe1")
        pip$add("f1", function(x = ~data) x, keepOut = TRUE)
        pip$add("f2", function(x = ~f1) x, keepOut = TRUE)
        pip$run()

        # f1 depends on the data directly, f2 via f1
        dependentSteps <- c("f1", "f2")

        for (step in dependentSteps) {
            expect_equal(pip$get_step(step)$state, "Done")
        }

        pip$set_data(dat)

        for (step in dependentSteps) {
            expect_equal(pip$get_step(step)$state, "Outdated")
        }
    })
})



# set_data_split: replicates pipeline steps for each element of a list of
# data sets, naming the new steps <step><sep><list name>.
describe("set_data_split",
{
    test_that("the new steps have the names of the list attached",
    {
        dataList <- list(A = 1, B = 2)

        pip <- Pipeline$new("pipe")$
            add("f1", \(a = 1) a)

        pip$set_data_split(dataList)

        pip$get_step_names() |>
            expect_equal(c("data.A", "f1.A", "data.B", "f1.B"))
    })

    test_that("the separator used in the creation of the new steps
    can be customized",
    {
        dataList <- list(A = 1, B = 2)

        pip <- Pipeline$new("pipe")$
            add("f1", \(a = 1) a)

        pip$set_data_split(dataList, sep = "_")

        pip$get_step_names() |>
            expect_equal(c("data_A", "f1_A", "data_B", "f1_B"))
    })


    test_that("simple split pipeline computes results as expected",
    {
        dataList <- list(A = 1, B = 2, C = 3)
        pip <- Pipeline$new("pipe")$
            add("f1", \(a = 1) a)$
            add("f2", \(a = ~f1, b = ~data) {
                b + a
            }, keepOut = TRUE)

        pip$set_data_split(dataList)

        # Each split adds the f1 output (1) to its own data element
        out <- pip$run()$collect_out()
        expect_equivalent(
            unlist(out),
            unlist(lapply(dataList, \(x) x + 1))
        )
    })


    test_that(
        "split pipeline by default overrides output groups according to split",
    {
        dataList <- list(A = 1, B = 2)

        pip <- Pipeline$new("pipe")$
            add("f0", \(a = 1) a, group = "id")$
            add("f1", \(a = 1) a, group = "id")$
            add("f2", \(a = 2) a)

        pip$set_data_split(dataList)

        out <- pip$run()$collect_out(all = TRUE)
        expect_equal(names(out), names(dataList))
    })

    test_that("the grouping override can be omitted",
    {
        dataList <- list(A = 1, B = 2)

        pip <- Pipeline$new("pipe")$
            add("f0", \(a = 1) a, group = "id")$
            add("f1", \(a = 1) a, group = "id")$
            add("f2", \(a = 2) a)

        pip$set_data_split(dataList, groupBySplit = FALSE)

        out <- pip$run()$collect_out(all = TRUE)

        # The original groups are kept, with the split name attached
        expect_equal(
            names(out),
            c("data.A", "id.A", "f2.A", "data.B", "id.B", "f2.B")
        )
    })

    test_that("the separator used in the creation of the groups
        can be customized",
    {
        dataList <- list(A = 1, B = 2)

        pip <- Pipeline$new("pipe")$
            add("f0", \(a = 1) a, group = "id")$
            add("f1", \(a = 1) a, group = "id")$
            add("f2", \(a = 2) a)

        pip$set_data_split(dataList, groupBySplit = FALSE, sep = "_")

        out <- pip$run()$collect_out(all = TRUE)

        expect_equal(
            names(out),
            c("data_A", "id_A", "f2_A", "data_B", "id_B", "f2_B")
        )
    })

    test_that("split pipeline works for list of data frames",
    {
        dat <- data.frame(x = 1:2, y = 1:2, z = 1:2)
        dataList <- list(A = dat, B = dat, C = dat)
        pip <- Pipeline$new("pipe")$
            add("f1", \(a = 1) a)$
            add("f2", \(a = ~f1, b = ~data) b, keepOut = TRUE)$
            add("f3", \(a = ~f1, b = ~data) b[, 2:3], keepOut = TRUE)

        pip$set_data_split(dataList)

        out <- pip$run()$collect_out()

        expect_equal(out[["A"]], c(f2.A = list(dat), f3.A = list(dat[, 2:3])))
        expect_equal(out[["B"]], c(f2.B = list(dat), f3.B = list(dat[, 2:3])))
        expect_equal(out[["C"]], c(f2.C = list(dat), f3.C = list(dat[, 2:3])))
    })


    test_that("if unnamed list of data frames, they are named with numbers",
    {
        dat <- data.frame(x = 1:2, y = 1:2, z = 1:2)
        dataList <- list(dat, dat)
        pip <- Pipeline$new("pipe")$
            add("f1", \(a = 1) a)$
            add("f2", \(a = ~f1, b = ~data) b, keepOut = TRUE)

        pip$set_data_split(dataList)

        out <- pip$run()$collect_out()

        expect_equal(out[["1"]], dat)
        expect_equal(out[["2"]], dat)
    })


    test_that(
        "depends are updated correctly, if data split on subset of pipeline",
    {
        dat1 <- data.frame(x = 1:2)
        dat2 <- data.frame(y = 1:2)
        dataList <- list(dat1, dat2)

        pip <- Pipeline$new("pipe")$
            add("f1", \(a = 1) a)$
            add("f2", \(a = ~f1, b = ~data) b, keepOut = TRUE)$
            add("f3", \(x = ~f1, y = ~f2) list(x, y), keepOut = TRUE)$
            add("f4", \(x = ~f3) x[[1]], keepOut = TRUE)

        # Only the steps up to and including f2 are split
        pip$set_data_split(dataList, toStep = "f2")

        depends <- pip$get_depends()

        expect_equal(depends[["f2.1"]], c(a = "f1.1", b = "data.1"))
        expect_equal(depends[["f2.2"]], c(a = "f1.2", b = "data.2"))

        # Pipeline was not split for f3, which therefore has parameters that
        # each depend on two steps
        expect_equal(
            depends[["f3"]],
            list(x = c("f1.1", "f1.2"), y = c("f2.1", "f2.2"))
        )

        # Pipeline was not split for f4, so just depends on f3
        expect_equivalent(depends[["f4"]], c(x = "f3"))


        out <- pip$run()$collect_out()

        expect_equal(out[["1"]], dat1)
        expect_equal(out[["2"]], dat2)
        expected_f3_res <- list(
            list("f1.1" = 1, "f1.2" = 1),
            list("f2.1" = dat1, "f2.2" = dat2)
        )
        expect_equal(out[["f3"]], expected_f3_res)
        expect_equal(out[["f4"]], expected_f3_res[[1]])
    })

    test_that("split data set can be created dynamically",
    {
        data <- data.frame(a = 1:10, group = c("a", "b"))

        # The first step receives the pipeline itself via '.self', removes
        # itself, applies the data split and returns the modified pipeline.
        pip <- Pipeline$new("pipe", data = data)$
            add("split_data_step",
               \(.self = NULL, data = ~data)
                {
                    splitData <- split(data, data[, "group"])

                    .self$remove_step("split_data_step")
                    .self$set_data_split(splitData)
                    .self$name <- paste(.self$name, "after data split")
                    .self
                }
            )$
            add("f1", \(data = ~data) {
                data
            }, keepOut = TRUE)

        pip$set_params(list(.self = pip))

        out <- pip$run(recursive = TRUE)$collect_out()

        expect_equivalent(out, split(data, data[, "group"]))
    })
})



# set_keep_out: marks whether the output of a step is part of the collected
# output.
describe("set_keep_out",
{
    test_that("keep-out state can be set",
    {
        pip <- Pipeline$new("pipe1", data = 0)$
            add("f1", \(a = 1) a)

        # By default the output of f1 is not collected
        out <- pip$run()$collect_out()
        expect_false("f1" %in% names(out))

        out <- pip$set_keep_out("f1", keepOut = TRUE)$collect_out()
        expect_true("f1" %in% names(out))

        out <- pip$set_keep_out("f1", keepOut = FALSE)$collect_out()
        expect_false("f1" %in% names(out))
    })

    test_that("step must be a string and exist",
    {
        pip <- Pipeline$new("pipe1", data = 0)$
            add("f1", \(a = 1) a)

        expect_error(
            pip$set_keep_out(1),
            "is_string(step)",
            fixed = TRUE
        )

        expect_error(
            pip$set_keep_out("f2"),
            "step 'f2' does not exist",
            fixed = TRUE
        )
    })

    test_that("state must be logical",
    {
        pip <- Pipeline$new("pipe1", data = 0)$
            add("f1", \(a = 1) a)

        expect_error(
            pip$set_keep_out("f1", keepOut = 1),
            "is.logical(keepOut)",
            fixed = TRUE
        )
    })
})




# set_params: sets parameter values by name across all steps of the pipeline.
describe("set_params",
{
    test_that("parameters can be set commonly on existing pipeline",
    {
        pip <- Pipeline$new("pipe1")$
            add("f1", \(a = 1) a)$
            add("f2", \(a = 2, b = 3) a)$
            add("f3", \(a = 4, b = 5) a)

        # Pin the initial state so the test proves that values were changed
        before <- pip$get_params()
        expect_equal(before, list(
            f1 = list(a = 1),
            f2 = list(a = 2, b = 3),
            f3 = list(a = 4, b = 5)
        ))

        after <- pip$set_params(list(a = 9, b = 99))$get_params()
        expect_equal(after, list(
            f1 = list(a = 9),
            f2 = list(a = 9, b = 99),
            f3 = list(a = 9, b = 99)
        ))
    })

    test_that(
        "parameters depending on other steps are protected
        from being overwritten",
    {
        pip <- Pipeline$new("pipe1")$
            add("f1", \(a = 1) a)$
            add("f2", \(a = 2, b = ~f1) a)$
            add("f3", \(a = ~f2, b = 5) a)

        # Parameters bound to other steps are not listed as parameters
        before <- pip$get_params()
        expect_equal(
            before,
            list(
                f1 = list(a = 1),
                f2 = list(a = 2),
                f3 = list(b = 5)
            )
        )

        after <- pip$set_params(list(a = 9, b = 99))$get_params()
        expect_equal(
            after,
            list(
                f1 = list(a = 9),
                f2 = list(a = 9),
                f3 = list(b = 99)
            )
        )
    })

    test_that("an error is given if params argument is not a list",
    {
        pip <- Pipeline$new("pipe1")$
            add("f1", \(a = 1) a)

        expect_error(
            pip$set_params(c(a = 9)),
            "params must be a list",
            fixed = TRUE
        )
    })

    test_that("trying to set undefined parameters is signaled with a warning",
    {
        pip <- Pipeline$new("pipe1")$
            add("f1", \(a = 1) a)

        expect_warning(
            pip$set_params(list(a = 9, b = 9, c = 9)),
            "Trying to set parameters not defined in the pipeline: b, c",
            fixed = TRUE
        )
    })

    test_that("warning for undefined parameters can be omitted",
    {
        pip <- Pipeline$new("pipe1")$
            add("f1", \(a = 1) a)

        expect_no_warning(
            pip$set_params(list(a = 9, b = 9, c = 9),
                warnUndefined = FALSE
            )
        )
    })

    test_that(
        "after setting a single parameter the params entry is still a list",
    {
        pip <- Pipeline$new("pipe1")$
            add("f1", \(a = 1) a)

        # Entry 1 belongs to the data step, entry 2 to f1
        expect_equal(pip$pipeline[["params"]][[2]], list(a = 1))

        pip$set_params(list(a = 9))
        expect_equal(pip$pipeline[["params"]][[2]], list(a = 9))
    })

    test_that(
        "hidden parameters can be set as well",
    {
        pip <- Pipeline$new("pipe1")$
            add("f1", \(a = 1, .b = 2) a)

        pip$set_params(list(a = 9, .b = 10))
        pp <- pip$get_params(ignoreHidden = FALSE)
        expect_equal(pp, list(f1 = list(a = 9, .b = 10)))
    })

    test_that(
        "trying to set locked parameters is ignored until they are unlocked",
    {
        pip <- Pipeline$new("pipe1")$
            add("f1", \(a = 1, b = 2) a + b)$
            add("f2", \(a = 1, b = 2) a + b)

        pip$lock_step("f1")
        expect_message(
            pip$set_params(list(a = 9, b = 99)),
            "skipping setting parameters a, b at locked step 'f1'"
        )

        # Only the unlocked step f2 received the new values
        pip$get_params_at_step("f1") |> expect_equal(list(a = 1, b = 2))
        pip$get_params_at_step("f2") |> expect_equal(list(a = 9, b = 99))

        pip$unlock_step("f1")
        pip$set_params(list(a = 9, b = 99))
        pip$get_params_at_step("f1") |> expect_equal(list(a = 9, b = 99))
    })
})


# set_params_at_step: sets parameter values of one specific step.
describe("set_params_at_step",
{
    test_that("parameters can be set at given step",
    {
        pip <- Pipeline$new("pipe1")$
            add("f1", \(a = 1, b = 2) a + b)$
            add("f2", \(x = 1) x)

        expect_equal(pip$get_params_at_step("f1"), list(a = 1, b = 2))

        pip$set_params_at_step("f1", list(a = 9, b = 99))
        expect_equal(pip$get_params_at_step("f1"), list(a = 9, b = 99))

        pip$set_params_at_step("f2", list(x = 9))
        expect_equal(pip$get_params_at_step("f2"), list(x = 9))
    })

    test_that("step must be passed as a string and params as a list",
    {
        pip <- Pipeline$new("pipe1")$
            add("f1", \(a = 1, b = 2) a + b)

        expect_error(
            pip$set_params_at_step(1, list(a = 9, b = 99)),
            "is_string(step) is not TRUE",
            fixed = TRUE
        )

        expect_error(
            pip$set_params_at_step("f1", params = c(a = 9, b = 99)),
            "is.list(params) is not TRUE",
            fixed = TRUE
        )
    })

    test_that("hidden parameters can be set as well",
    {
        # The body refers to the hidden parameter '.b' (not an undefined 'b')
        # so that the step function would also be valid if it were run.
        pip <- Pipeline$new("pipe1")$
            add("f1", \(a = 1, .b = 2) a + .b)

        pip$set_params_at_step("f1", list(a = 9, .b = 99))

        expect_equal(
            pip$get_params_at_step("f1", ignoreHidden = FALSE),
            list(a = 9, .b = 99)
        )
    })


    test_that("trying to set undefined parameter signals an error",
    {
        pip <- Pipeline$new("pipe1")$
            add("f1", \(a = 1, b = 2) a + b)

        expect_error(
            pip$set_params_at_step("f1", list(a = 9, z = 99)),
            "Unable to set parameter(s) z at step f1 - candidates are a, b",
            fixed = TRUE
        )
    })

    test_that("trying to set locked parameter is ignored until it is unlocked",
    {
        pip <- Pipeline$new("pipe1")$
            add("f1", \(a = 1, b = 2) a + b)

        pip$lock_step("f1")
        expect_message(
            pip$set_params_at_step("f1", list(a = 9, b = 99)),
            "skipping setting parameters a, b at locked step 'f1'"
        )

        pip$get_params_at_step("f1") |>
            expect_equal(list(a = 1, b = 2))

        pip$unlock_step("f1")
        pip$set_params_at_step("f1", list(a = 9, b = 99))
        pip$get_params_at_step("f1") |>
            expect_equal(list(a = 9, b = 99))
    })


    test_that("setting values for bound parameters is not allowed",
    {
        # 'y' of f2 is bound to the output of f1 and thus no candidate
        pip <- Pipeline$new("pipe1")$
            add("f1", \(a = 1, b = 2) a + b)$
            add("f2", \(x = 1, y = ~f1) x + y)

        expect_error(
            pip$set_params_at_step("f2", list(x = 9, y = 99)),
            "Unable to set parameter(s) y at step f2 - candidates are x",
            fixed = TRUE
        )
    })


    test_that(
        "states of affected steps are updated once the pipeline was run",
    {
        pip <- Pipeline$new("pipe")$
            add("f1", \(a = 1) a)$
            add("f2", \(a = ~f1) a)$
            add("f3", \(a = ~f2) a)$
            add("f4", \(a = ~data) a)

        # Before the first run, setting parameters leaves all states at "New"
        pip$set_params_at_step("f1", params = list(a = 2))
        expect_true(all(pip$pipeline[["state"]] == "New"))

        # After a run, f1 and everything downstream of it become outdated,
        # while the steps independent of f1 stay done
        pip$run()
        pip$set_params_at_step("f1", params = list(a = 3))
        expect_equal(pip$get_step("data")$state, "Done")
        expect_equal(pip$get_step("f1")$state, "Outdated")
        expect_equal(pip$get_step("f2")$state, "Outdated")
        expect_equal(pip$get_step("f3")$state, "Outdated")
        expect_equal(pip$get_step("f4")$state, "Done")

        pip$run()
        expect_true(all(pip$pipeline[["state"]] == "Done"))
    })


    test_that("parameters can be set to NULL",
    {
        pip <- Pipeline$new("pipe1")$
            add("f1", \(a = NULL, b = 1) a)

        pip$set_params_at_step("f1", list(a = 1, b = NULL))

        expect_equal(
            pip$get_params_at_step("f1"),
            list(a = 1, b = NULL)
        )
    })

    test_that("preserves Param objects",
    {
        pip <- Pipeline$new("pipe1")$
            add(
                "f1",
                \(
                    a = 1,
                    b = new("NumericParam", "num", 2)
                ) a + b
            )

        pip$set_params_at_step("f1", list(a = 3, b = 4))

        # 'b' keeps its Param class, only the wrapped value is replaced
        params <- pip$get_params_at_step("f1")
        expect_equal(params$a, 3)
        expect_true(params$b |> is("NumericParam"))
        expect_equal(params$b@value, 4)
    })
})


# split: divides a pipeline into a list of pipelines, which are named after
# the original pipeline with a running number attached.
describe("split",
{
    test_that("pipeline split of initial pipeline gives the expected result",
    {
        original <- Pipeline$new("pipe")
        parts <- original$split()

        expect_true(is.list(parts))

        # The only part holds the same steps as the original pipeline
        firstPart <- parts[[1]]
        expect_equal(firstPart$name, "pipe1")
        expect_equal(firstPart$pipeline, original$pipeline)
    })


    test_that("pipeline with two indepdendent groups is split correctly",
    {
        # Two chains without any connection: data -> f1 -> f4 and f2 -> f3
        pip <- Pipeline$new("pipe")$
            add("f1", function(a = ~data) a)$
            add("f2", function(a = 1) a)$
            add("f3", function(a = ~f2) a)$
            add("f4", function(a = ~f1) a)
        pip$run()

        parts <- pip$split()
        firstPart <- parts[[1]]
        secondPart <- parts[[2]]

        expect_equal(firstPart$name, "pipe1")
        expect_equal(secondPart$name, "pipe2")

        stepsOfFirst <- c("f2", "f3")
        stepsOfSecond <- c("data", "f1", "f4")
        expect_equal(firstPart$get_step_names(), stepsOfFirst)
        expect_equal(secondPart$get_step_names(), stepsOfSecond)

        # Each part carries over the output of its steps
        expect_equal(
            firstPart$collect_out(all = TRUE),
            pip$collect_out(all = TRUE)[stepsOfFirst]
        )
        expect_equal(
            secondPart$collect_out(all = TRUE),
            pip$collect_out(all = TRUE)[stepsOfSecond]
        )
    })

    test_that(
        "split is done correctly for complete data split",
    {
        dataList <- list(data.frame(x = 1:2), data.frame(y = 1:2))

        pip <- Pipeline$new("pipe")
        pip$add("f1", function(a = 1) a)
        pip$add("f2", function(a = ~f1, b = 2) b)
        pip$add("f3", function(x = ~f1, y = ~f2) x + y)

        pip$set_data_split(dataList)

        parts <- pip$split()
        stepNames <- lapply(parts, function(part) part$get_step_names())

        # No step uses the data, so each data step forms a part of its own
        expectedStepNames <- list(
            "data.1",
            c("f1.1", "f2.1", "f3.1"),
            "data.2",
            c("f1.2", "f2.2", "f3.2")
        )
        expect_equal(stepNames, expectedStepNames)
    })
})



# unlock_step: releases a step that was locked before.
describe("unlock_step",
{
    test_that("sets state to 'unlocked' if it was locked before",
    {
        pip <- Pipeline$new("pipe")$
            add("f1", \(a = 1) a)

        pip$lock_step("f1")
        expect_equal(pip$get_step("f1")[["state"]], "Locked")

        # Unlocking a step that was never locked leaves its state untouched
        pip$unlock_step("data")
        expect_equal(pip$get_step("data")[["state"]], "New")

        pip$unlock_step("f1")
        expect_equal(pip$get_step("f1")[["state"]], "Unlocked")
    })
})




# Logging of pipeline runs with the json log layout. The layout is switched
# for this group of tests and set back to "text" on exit.
describe("pipeline logging",
{
    expect_no_error(set_log_layout("json"))
    on.exit(set_log_layout("text"), add = TRUE)

    test_that("pipeline logging is done in json format",
    {
        dat <- data.frame(a = 1:2, b = 1:2)
        pip <- Pipeline$new("pipe1", data = dat)

        # NOTE(review): unlike the tests below, logging is not unsuspended
        # here. If it is suspended at this point, 'log' is empty and the
        # expectation passes vacuously - TODO confirm.
        log <- utils::capture.output(pip$run())
        # vapply always yields a logical vector, also for empty input
        isValidJSON <- vapply(log, jsonlite::validate, logical(1))
        expect_true(all(isValidJSON))
    })

    test_that("each step is logged with its name",
    {
        lgr::unsuspend_logging()
        on.exit(lgr::suspend_logging(), add = TRUE)

        pip <- Pipeline$new("pipe1")$
            add("f1", \(x = ~data) x)

        log <- utils::capture.output(pip$run())

        # The first log line is skipped, the step lines start at the second
        step1 <- jsonlite::fromJSON(log[2])
        step2 <- jsonlite::fromJSON(log[3])

        expect_equal(step1[["message"]], "Step 1/2 data")
        expect_equal(step2[["message"]], "Step 2/2 f1")
    })

    test_that(
        "upon warning during run, both context and warn msg are logged",
    {
        lgr::unsuspend_logging()
        on.exit(lgr::suspend_logging(), add = TRUE)

        pip <- Pipeline$new("pipe1")$
            add("f1", \(a = 1) a)$
            add("f2", \() warning("this is a warning"))

        expect_warning(log <- utils::capture.output(pip$run()))

        # All but the last captured line are parsed, then the first entry
        # with level "warn" is picked
        log_fields <- lapply(head(log, -1), jsonlite::fromJSON)
        warnEntry <- Filter(
            log_fields,
            f =\(x) x[["level"]] == "warn"
        )[[1]]

        expect_equal(warnEntry[["message"]], "this is a warning")
        expect_equal(warnEntry[["context"]], "Step 3 ('f2')")
    })


    test_that(
        "upon error during run, both context and error msg are logged",
    {
        lgr::unsuspend_logging()
        on.exit(lgr::suspend_logging(), add = TRUE)

        pip <- Pipeline$new("pipe1")$
            add("f1", \(a = 1) a)$
            add("f2", \() stop("this is an error"))

        log <- utils::capture.output({
            tryCatch(pip$run(), error = identity)
        })

        # The last captured line is dropped before parsing - presumably it
        # is the printed condition returned by tryCatch (TODO confirm)
        log_fields <- lapply(head(log, -1), jsonlite::fromJSON)
        last <- tail(log_fields, 1)[[1]]

        expect_equal(last[["message"]], "this is an error")
        expect_equal(last[["context"]], "Step 3 ('f2')")
    })
})


# ---------------
# private methods
# ---------------

describe("private methods",
{
    # Helper returning the 'private' entry found in the '.__enclos_env__'
    # element of the given object
    get_private <- function(x) {
        enclosure <- x[[".__enclos_env__"]]
        enclosure[["private"]]
    }

    # For a pipeline object the helper must yield an environment
    pip <- Pipeline$new("pipe")
    expect_true(is.environment(get_private(pip)))

    # .clean_out_not_kept: removes the stored output of all steps that are
    # not marked to be kept.
    describe(".clean_out_not_kept",
    {
        # The private method must be available on a fresh pipeline
        pip <- expect_no_error(Pipeline$new("pipe"))
        expect_true(is.function(get_private(pip)$.clean_out_not_kept))

        test_that("cleans all output if nothing is marked to be kept",
        {
            pip <- Pipeline$new("pipe1", data = 1)$
                add("A", \(a = 2) a)

            fclean <- get_private(pip)$.clean_out_not_kept

            # Run both steps via the private run method so that output exists
            fexe <- get_private(pip)$.run_step
            fexe(step = "data")
            fexe(step = "A")

            expect_equal(pip$get_out("data"), 1)
            expect_equal(pip$get_out("A"), 2)
            fclean()

            hasCleanedData <- is.null(pip$get_out("data"))
            expect_true(hasCleanedData)
            hasCleanedA <- is.null(pip$get_out("A"))
            expect_true(hasCleanedA)
        })

        test_that("does not clean if marked to be kept",
        {
            pip <- Pipeline$new("pipe1", data = 1)$
                add("A", \(a = 2) a, keepOut = TRUE)

            fclean <- get_private(pip)$.clean_out_not_kept

            fexe <- get_private(pip)$.run_step
            fexe(step = "data")
            fexe(step = "A")

            expect_equal(pip$get_out("data"), 1)
            expect_equal(pip$get_out("A"), 2)
            fclean()

            # Only the output of the step marked with keepOut survives
            hasCleanedData <- is.null(pip$get_out("data"))
            expect_true(hasCleanedData)
            hasCleanedA <- is.null(pip$get_out("A"))
            expect_false(hasCleanedA)
        })
    })


    describe(".create_edge_table",
    {
        # Smoke check that a pipeline can be created before the tests of this
        # group run.
        # NOTE(review): the tests visible below each fetch their own 'f';
        # whether later tests rely on this binding needs to be confirmed.
        pip <- expect_no_error(Pipeline$new("pipe"))
        f <- get_private(pip)$.create_edge_table

        test_that("returns a data frame with the expected columns",
        {
            pip <- Pipeline$new("pipe")
            createEdgeTable <- get_private(pip)$.create_edge_table

            edges <- createEdgeTable()

            expect_true(is.data.frame(edges))
            expect_equal(colnames(edges), c("from", "to", "arrows"))
        })

        test_that("if no dependencies are defined, edge table is empty",
        {
            pip <- Pipeline$new("pipe")
            createEdgeTable <- get_private(pip)$.create_edge_table
            isEmpty <- function(edges) nrow(edges) == 0

            # Pipeline consisting of the data step only
            expect_true(isEmpty(createEdgeTable()))

            # A step without dependencies does not add an edge
            pip$add("f1", function(a = 1) a)
            expect_true(isEmpty(createEdgeTable()))

            # The first dependency between steps adds the first edge
            pip$add("f2", function(a = ~f1) a)
            expect_false(isEmpty(createEdgeTable()))
        })

        test_that("adds one edge for each dependency",
        {
            pip <- Pipeline$new("pipe")
            f <- get_private(pip)$.create_edge_table

            # step1 is data
            pip$add("step2", \(a = 1) a)
            pip$add("step3", \(a = ~step2) a)
            pip$add("step4", \(a = ~step2, b = ~step3) a)
            pip$add("step5", \(a = ~data, b = ~step4) a)

            # One dependency for step3 and two each for step4 and step5
            res <- f()
            expect_equal(nrow(res), 5)

            # 'from' and 'to' hold the positions of the steps in the pipeline.
            # Rows are addressed by the dependent step, with the parameter
            # name attached where a step has more than one dependency.
            expect_equivalent(
                res["step3", c("from", "to")],      # from step2 to step3
                data.frame(from = 2, to = 3)
            )
            expect_equivalent(
                res["step4.a", c("from", "to")],    # from step2 to step4
                data.frame(from = 2, to = 4)
            )
            expect_equivalent(
                res["step4.b", c("from", "to")],    # from step3 to step4
                data.frame(from = 3, to = 4)
            )
            expect_equivalent(
                res["step5.a", c("from", "to")],    # from data to step5
                data.frame(from = 1, to = 5)
            )
            expect_equivalent(
                res["step5.b", c("from", "to")],    # from step4 to step5
                data.frame(from = 4, to = 5)
            )
        })

        test_that("can be created for certain groups",
        {
            # step1 is data; the other steps alternate between two groups
            pip <- Pipeline$new("pipe")
            pip$add("step2", function(a = ~data) a + 1, group = "add")
            pip$add("step3", function(a = ~step2) 2 * a, group = "mult")
            pip$add(
                "step4",
                function(a = ~step2, b = ~step3) a + b,
                group = "add"
            )
            pip$add(
                "step5",
                function(a = ~data, b = ~step4) a * b,
                group = "mult"
            )

            createEdgeTable <- get_private(pip)$.create_edge_table
            allEdges <- createEdgeTable()

            # Restricting to a group yields the edges leading to its steps
            edgesToAdd <- c("step2", "step4.a", "step4.b")
            expect_equal(
                createEdgeTable(groups = "add"),
                allEdges[edgesToAdd, ]
            )

            edgesToMult <- c("step5.a", "step3", "step5.b")
            expect_equal(
                createEdgeTable(groups = "mult"),
                allEdges[edgesToMult, ]
            )

            # Naming all groups is the same as not restricting at all
            everyGroup <- c("data", "add", "mult")
            expect_equal(
                createEdgeTable(groups = everyGroup),
                createEdgeTable()
            )
        })

        test_that("signals bad group specification",
        {
            pip <- Pipeline$new("pipe")
            create_edge_table <- get_private(pip)$.create_edge_table

            # A group name that does not occur in the pipeline
            unknown_group_msg <-
                "all(groups %in% self$pipeline[[\"group\"]]) is not TRUE"
            expect_error(
                create_edge_table(groups = "foo"),
                unknown_group_msg,
                fixed = TRUE
            )

            # Groups given as something other than a character vector
            non_character_msg <- "is.character(groups) is not TRUE"
            expect_error(
                create_edge_table(groups = 1),
                non_character_msg,
                fixed = TRUE
            )
        })

        test_that("can be created for parameters with multiple dependencies",
        {
            pip <- Pipeline$new("pipe")

            # The initial 'data' step is step 1
            pip$add("step2", \(x = ~data) x + 1)
            pip$add("step3", \(y = ~step2) sum(unlist(y)))

            data_parts <- list(data.frame(x = 1:2), data.frame(x = 3:4))
            pip$set_data_split(data_parts, toStep = "step2")

            # After the split, parameter y of step3 depends on both copies
            # of step2
            step3_depends <- pip$get_depends()$step3
            expect_equal(step3_depends$y, c("step2.1", "step2.2"))

            create_edge_table <- get_private(pip)$.create_edge_table
            edges <- create_edge_table()

            # One edge per dependency, i.e. two edges for parameter y
            expect_equivalent(
                edges,
                data.frame(
                    from = 1:4,
                    to = c(2, 5, 4, 5),
                    arrows = "to"
                )
            )

            expect_equal(
                rownames(edges),
                c("step2.1", "step3.y1", "step2.2", "step3.y2")
            )
        })
    })


    describe(".create_node_table",
    {
        # Describe-level smoke check; each test below sets up its own pipeline
        pip <- expect_no_error(Pipeline$new("pipe"))
        f <- get_private(pip)$.create_node_table

        test_that("returns a data frame with the expected columns",
        {
            pip <- Pipeline$new("pipe")
            create_node_table <- get_private(pip)$.create_node_table

            nodes <- create_node_table()
            expect_true(is.data.frame(nodes))
            expect_equal(
                colnames(nodes),
                c("id", "label", "group", "shape", "color", "title")
            )
        })

        test_that("adds one node for each step",
        {
            pip <- Pipeline$new("pipe")
            create_node_table <- get_private(pip)$.create_node_table

            # The node count must follow the pipeline length as steps
            # are added
            expect_equal(nrow(create_node_table()), pip$length())

            pip$add("f2", \(a = 1) a)
            expect_equal(nrow(create_node_table()), pip$length())

            pip$add("f3", \(a = ~f2) a)
            expect_equal(nrow(create_node_table()), pip$length())
        })

        test_that("description is shown in the title",
        {
            pip <- Pipeline$new("pipe")
            pip$add("step2", \(a = 1) a, description = "my 2nd step")

            create_node_table <- get_private(pip)$.create_node_table
            nodes <- create_node_table()

            # Row 2 belongs to 'step2'; its description is wrapped in <p>
            expect_equal(nodes[2, "title"], "<p>my 2nd step</p>")
        })

        test_that("can be created for certain groups",
        {
            pip <- Pipeline$new("pipe")

            # The initial 'data' step is followed by four steps that
            # alternate between the groups "add" and "mult"
            pip$add("step2", \(a = ~data) a + 1, group = "add")
            pip$add("step3", \(a = ~step2) 2 * a, group = "mult")
            pip$add("step4", \(a = ~step2, b = ~step3) a + b, group = "add")
            pip$add("step5", \(a = ~data, b = ~step4) a * b, group = "mult")

            create_node_table <- get_private(pip)$.create_node_table
            all_nodes <- create_node_table()

            # Steps 2 and 4 are in group "add", steps 3 and 5 in "mult"
            expect_equal(
                create_node_table(groups = "add"),
                all_nodes[c(2, 4), ]
            )
            expect_equal(
                create_node_table(groups = "mult"),
                all_nodes[c(3, 5), ]
            )

            # Listing every group is the same as not filtering at all
            every_group <- c("data", "add", "mult")
            expect_equal(
                create_node_table(groups = every_group),
                create_node_table()
            )
        })

        test_that("signals bad group specification",
        {
            pip <- Pipeline$new("pipe")
            create_node_table <- get_private(pip)$.create_node_table

            # A group name that does not occur in the pipeline
            unknown_group_msg <-
                "all(groups %in% self$pipeline[[\"group\"]]) is not TRUE"
            expect_error(
                create_node_table(groups = "foo"),
                unknown_group_msg,
                fixed = TRUE
            )

            # Groups given as something other than a character vector
            non_character_msg <- "is.character(groups) is not TRUE"
            expect_error(
                create_node_table(groups = 1),
                non_character_msg,
                fixed = TRUE
            )
        })
    })


    describe(".derive_dependencies",
    {
        # Pipeline holding only the initial step; used by the tests that
        # do not set up their own pipeline
        pip <- expect_no_error(Pipeline$new("pipe"))
        derive_deps <- get_private(pip)$.derive_dependencies

        test_that("if no params are defined, dependencies are empty",
        {
            no_deps <- character(0)

            expect_equal(
                derive_deps(params = list(), step = "step"),
                no_deps
            )
            # A plain value is no dependency either
            expect_equal(
                derive_deps(params = list(a = 1), step = "step"),
                no_deps
            )
        })

        test_that("signals bad input",
        {
            # step must be a string
            expect_error(
                derive_deps(params = list(), step = 1),
                "is_string(step) is not TRUE",
                fixed = TRUE
            )

            # toStep must be a string
            expect_error(
                derive_deps(params = list(), step = "step", toStep = 2),
                "is_string(toStep) is not TRUE",
                fixed = TRUE
            )
        })

        test_that("extracts all dependencies defined via step name",
        {
            pip <- Pipeline$new("pipe")
            derive_deps <- get_private(pip)$.derive_dependencies
            pip$add("f1", \(a = 1) a)
            pip$add("f2", \(b = 2) b)

            single <- derive_deps(params = list(x = ~f1), step = "step3")
            expect_equal(single, c(x = "f1"))

            both <- derive_deps(
                params = list(x = ~f1, y = ~f2),
                step = "step3"
            )
            expect_equal(both, c(x = "f1", y = "f2"))
        })

        test_that("extracts all dependencies defined relative",
        {
            pip <- Pipeline$new("pipe")
            derive_deps <- get_private(pip)$.derive_dependencies
            pip$add("f1", \(a = 1) a)
            pip$add("f2", \(b = 2) b)

            # ~-1 resolves to the last step, ~-2 to the one before it
            single <- derive_deps(params = list(x = ~-1), step = "step3")
            expect_equal(single, c(x = "f2"))

            both <- derive_deps(
                params = list(x = ~-1, y = ~-2),
                step = "step3"
            )
            expect_equal(both, c(x = "f2", y = "f1"))
        })

        test_that("extracts all dependencies if defined both ways",
        {
            pip <- Pipeline$new("pipe")
            derive_deps <- get_private(pip)$.derive_dependencies
            pip$add("f1", \(a = 1) a)
            pip$add("f2", \(b = 2) b)

            # Relative and named reference both resolve to 'f2'
            mixed <- derive_deps(
                params = list(x = ~-1, y = ~f2),
                step = "step3"
            )
            expect_equal(mixed, c(x = "f2", y = "f2"))
        })
    })


    describe(".extract_dependent_out",
    {
        pip <- expect_no_error(Pipeline$new("pipe"))
        extract_out <- get_private(pip)$.extract_dependent_out

        test_that("if no depends, NULL is returned",
        {
            res <- extract_out(depends = list(), out = list())
            expect_true(is.null(res))
        })

        test_that("signals badly typed input",
        {
            # depends must be a character vector or a list
            expect_error(
                extract_out(1:2, list()),
                "is.character(depends) || is.list(depends) is not TRUE",
                fixed = TRUE
            )

            # out must be a list
            expect_error(
                extract_out(list(), 1),
                "is.list(out) is not TRUE",
                fixed = TRUE
            )
        })

        test_that("signals if dependency is not found in out",
        {
            # Found
            expect_no_error(
                extract_out(depends = "foo", out = list("foo" = 1))
            )

            # Not found
            expect_error(
                extract_out(depends = "foo", out = list()),
                "all(unlist(depends) %in% names(out)) is not TRUE",
                fixed = TRUE
            )

            # A list entry can name several dependencies; here "b" has
            # no output
            several_deps <- list(x = c("a", "b"))
            partial_out <- list(a = 1)
            expect_error(extract_out(several_deps, partial_out))
        })

        test_that("output is extracted as expected",
        {
            outputs <- list(a = 1, b = 2, c = 3, d = 4)

            # Results are named after the parameter, not after the step
            expect_equal(
                extract_out(c(x = "a"), outputs),
                list(x = 1)
            )

            expect_equal(
                extract_out(c(x = "a", y = "c"), outputs),
                list(x = 1, y = 3)
            )

            # Several dependencies per parameter give a named sublist
            multi_deps <- list(x = c("a", "c"), y = c("b", "d"))
            expect_equal(
                extract_out(multi_deps, outputs),
                list(
                    x = list(a = 1, c = 3),
                    y = list(b = 2, d = 4)
                )
            )
        })
    })


    describe(".extract_depends_from_param_list",
    {
        pip <- expect_no_error(Pipeline$new("pipe"))
        extract_depends <- get_private(pip)$.extract_depends_from_param_list

        test_that("if no params are defined, dependencies are empty",
        {
            res <- extract_depends(params = list())
            expect_equal(res, character(0))
        })

        test_that("params must be a list or NULL",
        {
            res <- extract_depends(params = NULL)
            expect_equal(res, character(0))

            # A named numeric vector is not a list
            expect_error(
                extract_depends(params = c(a = 1)),
                "is.list(params) is not TRUE",
                fixed = TRUE
            )
        })

        test_that("if no dependencies are defined, nothing is extracted",
        {
            no_deps <- character(0)
            expect_equal(extract_depends(params = list(a = 1)), no_deps)
            expect_equal(extract_depends(params = list(a = 1, b = 2)), no_deps)
        })

        test_that("if dependencies are defined, they are extracted",
        {
            expect_equal(extract_depends(params = list(a = ~x)), c(a = "x"))

            # Relative references are returned as strings, unresolved
            with_relative <- c(a = "x", b = "-1")
            expect_equal(
                extract_depends(params = list(a = ~x, b = ~-1)),
                with_relative
            )

            # Plain values are skipped
            expect_equal(
                extract_depends(params = list(a = ~x, b = ~-1, c = 1)),
                with_relative
            )
        })
    })


    describe(".get_depends_grouped",
    {
        # Describe-level smoke check; each test below sets up its own pipeline
        pip <- expect_no_error(Pipeline$new("pipe"))
        f <- get_private(pip)$.get_depends_grouped

        test_that("grouped dependencies are obtained correctly",
        {
            pip <- Pipeline$new("pipe")
            depends_grouped <- get_private(pip)$.get_depends_grouped

            # A fresh pipeline consists of the 'data' step only
            expect_equal(depends_grouped(), list("data"))

            pip$add("f1", \(a = ~data) a)
            expect_equal(depends_grouped(), list(c("data", "f1")))

            # f2/f3 form a chain of their own, f4 extends the chain
            # starting at 'data'
            pip$add("f2", \(a = 1) a)
            pip$add("f3", \(a = ~f2) a)
            pip$add("f4", \(a = ~f1) a)

            expected_groups <- list(
                c("f2", "f3"),
                c("data", "f1", "f4")
            )
            expect_equal(depends_grouped(), expected_groups)
        })

        test_that("grouped dependencies are obtained correctly for
            complete data split",
        {
            data_parts <- list(data.frame(x = 1:2), data.frame(y = 1:2))

            pip <- Pipeline$new("pipe")
            pip$add("f1", \(a = 1) a)
            pip$add("f2", \(a = ~f1, b = 2) b)
            pip$add("f3", \(x = ~f1, y = ~f2) x + y)

            depends_grouped <- get_private(pip)$.get_depends_grouped

            # Split the pipeline up to and including its last step
            pip$set_data_split(data_parts, toStep = "f3")

            expected_groups <- list(
                "data.1",
                c("f1.1", "f2.1", "f3.1"),
                "data.2",
                c("f1.2", "f2.2", "f3.2")
            )
            expect_equal(depends_grouped(), expected_groups)
        })

        test_that("grouped dependencies are obtained correctly for
            partial data split",
        {
            data_parts <- list(data.frame(x = 1:2), data.frame(y = 1:2))

            pip <- Pipeline$new("pipe")
            pip$add("f1", \(a = 1) a)
            pip$add("f2", \(a = ~f1, b = 2) b)
            pip$add("f3", \(x = ~f1, y = ~f2) x + y)

            depends_grouped <- get_private(pip)$.get_depends_grouped

            # Split only up to 'f2'; 'f3' stays unsplit and ties both
            # branches into one group
            pip$set_data_split(data_parts, toStep = "f2")

            expected_groups <- list(
                "data.1",
                "data.2",
                c("f1.1", "f2.1", "f1.2", "f2.2", "f3")
            )
            expect_equal(depends_grouped(), expected_groups)
        })

        test_that(
            "if all steps somehow are dependent on each other,
                just one group is returned",
        {
            pip <- Pipeline$new("pipe")
            depends_grouped <- get_private(pip)$.get_depends_grouped

            # f4 connects the chain data -> f1 with the chain f2 -> f3
            pip$add("f1", \(a = ~data) a)
            pip$add("f2", \(a = 1) a)
            pip$add("f3", \(a = ~f2) a)
            pip$add("f4", \(a = ~f1, b = ~f3) a)

            all_grouped_steps <- unlist(depends_grouped())
            expect_equal(all_grouped_steps, pip$get_step_names())
        })
    })


    describe(".get_downstream_depends",
    {
        pip <- expect_no_error(Pipeline$new("pipe"))
        get_downstream <- get_private(pip)$.get_downstream_depends

        test_that("badly typed inputs are signalled",
        {
            # step must be a string
            expect_error(
                get_downstream(step = 1),
                "is_string(step) is not TRUE",
                fixed = TRUE
            )

            # depends must be a character vector or a list
            expect_error(
                get_downstream(step = "foo", depends = c(a = 1)),
                "is.character(depends) || is.list(depends) is not TRUE",
                fixed = TRUE
            )

            # recursive must be logical
            expect_error(
                get_downstream(step = "foo", depends = list(), recursive = 1),
                "is.logical(recursive)",
                fixed = TRUE
            )
        })

        test_that("if no dependencies, empty character vector is returned",
        {
            none <- character(0)

            expect_equal(
                get_downstream(step = "foo", depends = character()),
                none
            )
            expect_equal(
                get_downstream(step = "foo", depends = list()),
                none
            )
        })

        test_that(
            "if no depends, recursive should give same as non-recursiv call",
        {
            by_default <- get_downstream(step = "foo", depends = list())
            non_recursive <- get_downstream(
                step = "foo",
                depends = list(),
                recursive = FALSE
            )
            expect_equal(by_default, non_recursive)
        })


        test_that("dependencies by default are determined recursively",
        {
            # Linear chain f0 <- f1 <- f2 <- f3
            chain <- list(
                f1 = c(a = "f0"),
                f2 = c(a = "f1"),
                f3 = c(a = "f2")
            )

            expect_equal(get_downstream("f3", chain), character(0))
            expect_equal(get_downstream("f2", chain), c("f3"))
            expect_equal(get_downstream("f1", chain), c("f2", "f3"))
            expect_equal(get_downstream("f0", chain), c("f1", "f2", "f3"))
        })

        test_that("returned dependencies are unique",
        {
            # Each step depends on all of its predecessors, so the same
            # step is reachable via several paths
            dense <- list(
                f1 = c(a = "f0"),
                f2 = c(a = "f0", b = "f1"),
                f3 = c(a = "f0", b = "f1", c = "f2")
            )

            expect_equal(get_downstream("f3", dense), character(0))
            expect_equal(get_downstream("f2", dense), c("f3"))
            expect_equal(get_downstream("f1", dense), c("f2", "f3"))
            expect_equal(get_downstream("f0", dense), c("f1", "f2", "f3"))
        })

        test_that("dependencies can be determined non-recursively",
        {
            chain <- list(
                f1 = c(a = "f0"),
                f2 = c(a = "f1"),
                f3 = c(a = "f2")
            )

            # Only the directly dependent step is returned
            direct <- \(step) get_downstream(step, chain, recursive = FALSE)

            expect_equal(direct("f3"), character(0))
            expect_equal(direct("f2"), c("f3"))
            expect_equal(direct("f1"), c("f2"))
            expect_equal(direct("f0"), c("f1"))
        })

        test_that("works with multiple dependencies given in sublist",
        {
            # f3 uses parameters that each hold several dependencies
            nested <- list(
                f2 = c(a = "f1", b = "data"),
                f3 = list(
                    x = c("f0", "f1"),
                    y = c("f1", "f2")
                )
            )

            expect_equal(get_downstream("f1", nested), c("f2", "f3"))
        })
    })


    describe(".get_last_step",
    {
        pip <- expect_no_error(Pipeline$new("pipe"))
        f <- get_private(pip)$.get_last_step

        # The expectations are wrapped in a named test, as in the other
        # describe blocks of this file, so that they are reported as a test.
        test_that("returns the name of the last step of the pipeline",
        {
            # A new pipeline only holds the initial 'data' step
            expect_equal(f(), "data")

            # The last step follows each step that is appended ...
            pip$add("f1", \(a = 1) a)
            expect_equal(f(), "f1")

            pip$add("f2", \(a = 1) a)
            expect_equal(f(), "f2")

            # ... and falls back to the previous one after a removal
            pip$pop_step()
            expect_equal(f(), "f1")
        })
    })


    describe(".get_upstream_depends",
    {
        pip <- expect_no_error(Pipeline$new("pipe"))
        get_upstream <- get_private(pip)$.get_upstream_depends

        test_that("badly typed inputs are signalled",
        {
            # step must be a string
            expect_error(
                get_upstream(step = 1),
                "is_string(step) is not TRUE",
                fixed = TRUE
            )

            # depends must be a character vector or a list
            expect_error(
                get_upstream(step = "foo", depends = c(a = 1)),
                "is.character(depends) || is.list(depends) is not TRUE",
                fixed = TRUE
            )

            # recursive must be logical
            expect_error(
                get_upstream(step = "foo", depends = list(), recursive = 1),
                "is.logical(recursive)",
                fixed = TRUE
            )
        })

        test_that("if no dependencies, empty character vector is returned",
        {
            none <- character(0)

            expect_equal(
                get_upstream(step = "foo", depends = character()),
                none
            )
            expect_equal(
                get_upstream(step = "foo", depends = list()),
                none
            )
        })

        test_that("dependencies by default are determined recursively",
        {
            # Linear chain f0 <- f1 <- f2 <- f3
            chain <- list(
                f1 = c(a = "f0"),
                f2 = c(a = "f1"),
                f3 = c(a = "f2")
            )

            # Nearest dependency comes first
            expect_equal(get_upstream("f0", chain), character(0))
            expect_equal(get_upstream("f1", chain), c("f0"))
            expect_equal(get_upstream("f2", chain), c("f1", "f0"))
            expect_equal(get_upstream("f3", chain), c("f2", "f1", "f0"))
        })

        test_that("returned dependencies are unique",
        {
            # Each step depends on all of its predecessors, so the same
            # step is reachable via several paths
            dense <- list(
                f1 = c(a = "f0"),
                f2 = c(a = "f0", b = "f1"),
                f3 = c(a = "f0", b = "f1", c = "f2")
            )

            expect_equal(get_upstream("f0", dense), character(0))
            expect_equal(get_upstream("f1", dense), c("f0"))
            expect_equal(get_upstream("f2", dense), c("f0", "f1"))
            expect_equal(get_upstream("f3", dense), c("f0", "f1", "f2"))
        })

        test_that("dependencies can be determined non-recursively",
        {
            chain <- list(
                f1 = c(a = "f0"),
                f2 = c(a = "f1"),
                f3 = c(a = "f2")
            )

            # Only the direct dependency is returned
            direct <- \(step) get_upstream(step, chain, recursive = FALSE)

            expect_equal(direct("f0"), character(0))
            expect_equal(direct("f1"), c("f0"))
            expect_equal(direct("f2"), c("f1"))
            expect_equal(direct("f3"), c("f2"))
        })

        test_that("works with multiple dependencies given in sublist",
        {
            # f3 and f4 use parameters that each hold several dependencies
            nested <- list(
                f1 = c(a = "f0"),
                f2 = c(a = "f1", b = "data"),
                f3 = list(
                    x = c("f1"),
                    y = c("f1", "f2")
                ),
                f4 = list(x = c("f3", "data"))
            )

            expect_equal(
                get_upstream("f2", nested),
                c("f1", "data", "f0")
            )
            expect_equal(
                get_upstream("f3", nested),
                c("f1", "f2", "f0", "data")
            )
            expect_equal(
                get_upstream("f4", nested),
                c("f3", "data", "f1", "f2", "f0")
            )
        })
    })


    # The label matches the private method that is exercised below
    describe(".prepare_and_verify_params",
    {
        pip <- expect_no_error(Pipeline$new("pipe"))
        f <- get_private(pip)$.prepare_and_verify_params

        test_that("fun must be a function",
        {
            expect_error(
                f(fun = 1),
                "is.function(fun) is not TRUE",
                fixed = TRUE
            )
        })

        test_that("funcName must be a string",
        {
            expect_error(
                f(fun = identity, funcName = 1),
                "is_string(funcName) is not TRUE",
                fixed = TRUE
            )
        })

        test_that("params must be a list",
        {
            expect_error(
                f(fun = identity, funcName = "identity", params = 1),
                "is.list(params) is not TRUE",
                fixed = TRUE
            )
        })

        test_that("returns all function args in the parameter list",
        {
            # Without explicit params, the function defaults are returned
            foo <- \(a = 1, b = 2, c = 3) 1
            res <- f(foo, "foo")
            expect_equal(res, list(a = 1, b = 2, c = 3))
        })

        test_that("params given in the list override the function arg",
        {
            # 'c' is not overridden and keeps its default
            foo <- \(a = 1, b = 2, c = 3) 1
            res <- f(foo, "foo", params = list(a = 9, b = 99))
            expect_equal(res, list(a = 9, b = 99, c = 3))
        })

        test_that("signals undefined function args unless they are
            defined in the parameter list",
        {
            # 'a' has no default value
            foo <- \(a, b = 2, c = 3) 1
            expect_error(
                f(foo, "foo"),
                "'a' parameter(s) must have default values",
                fixed = TRUE
            )

            # Supplying 'a' via params is accepted
            res <- f(foo, "foo", params = list(a = 9))
            expect_equal(res, list(a = 9, b = 2, c = 3))
        })

        test_that("signals if parameter is not defined in function args,
            unless function is defined with ...",
        {
            foo <- \(a) 1

            # 'b' and 'c' are no arguments of foo
            expect_error(
                f(foo, "foo", params = list(a = 1, b = 2, c = 3)),
                "'b', 'c' are no function parameters of 'foo'",
                fixed = TRUE
            )

            # With `...` in the signature, additional params are accepted
            foo <- \(a, ...) 1
            res <- f(foo, "foo", params = list(a = 1, b = 2, c = 3))
            expect_equal(res, list(a = 1, b = 2, c = 3))
        })
    })



    describe(".relative_dependency_to_index",
    {
        pip <- expect_no_error(Pipeline$new("pipe"))
        f <- get_private(pip)$.relative_dependency_to_index

        test_that("signals bad input",
        {
            # relative_dep must be a negative number
            expect_error(
                f(relative_dep = "-1"),
                "is_number(relative_dep) is not TRUE",
                fixed = TRUE
            )
            expect_error(
                f(relative_dep = 1),
                "relative_dep < 0",
                fixed = TRUE
            )

            # dependencyName must be a string
            expect_error(
                f(-1, dependencyName = NULL),
                "is_string(dependencyName) is not TRUE",
                fixed = TRUE
            )

            # startIndex must be a positive number
            expect_error(
                f(-1, "dep-name", startIndex = "2"),
                "is_number(startIndex) is not TRUE",
                fixed = TRUE
            )
            expect_error(
                f(-1, "dep-name", startIndex = -2),
                "startIndex > 0 is not TRUE",
                fixed = TRUE
            )

            # step must be a string
            expect_error(
                f(-1, "dep-name", 2, step = 3),
                "is_string(step) is not TRUE",
                fixed = TRUE
            )

            # Well-formed input passes all checks
            expect_no_error(
                f(-1, "dep-name", 2, step = "foo")
            )
        })

        test_that("signals if relative dependency exceeds pipeline",
        {
            expect_error(
                f(
                    relative_dep = -10,
                    dependencyName = "dep-name",
                    startIndex = 1,
                    step = "step-name"
                ),
                paste(
                    "step 'step-name': relative dependency dep-name=-10",
                    "points to outside the pipeline"
                ),
                fixed = TRUE
            )

            # Going back as many steps as the start index is too far as well
            expect_error(f(-1, "dep-name", startIndex = 1, "step-name"))
            expect_error(f(-2, "dep-name", startIndex = 2, "step-name"))
        })

        test_that("returns correct index for relative dependency",
        {
            # The expected index is startIndex + relative_dep
            expect_equal(
                f(
                    relative_dep = -1,
                    dependencyName = "dep-name",
                    startIndex = 3,
                    step = "step-name"
                ),
                2
            )
            expect_equal(
                f(
                    relative_dep = -2,
                    dependencyName = "dep-name",
                    startIndex = 3,
                    step = "step-name"
                ),
                1
            )
        })
    })

    describe(".run_step",
    {
        # Describe-level smoke check; each test below sets up its own pipeline
        pip <- expect_no_error(Pipeline$new("pipe"))
        f <- get_private(pip)$.run_step

        test_that("returns the result of the function at the given step",
        {
            pip <- Pipeline$new("pipe")$
                add("A", \(a = 1) a)$
                add("B", \(b = 2) b)

            f <- get_private(pip)$.run_step

            expect_equal(f(step = "A"), 1)
            expect_equal(f(step = "B"), 2)
        })

        test_that("stores the result at the given step",
        {
            pip <- Pipeline$new("pipe")$
                add("A", \(a = 1) a)$
                add("B", \(b = 2) b)

            f <- get_private(pip)$.run_step

            # The returned value must also be available via get_out()
            expect_equal(f(step = "A"), 1)
            expect_equal(pip$get_out("A"), 1)
            expect_equal(f(step = "B"), 2)
            expect_equal(pip$get_out("B"), 2)
        })

        test_that("uses output of dependent steps",
        {
            pip <- Pipeline$new("pipe")$
                add("A", \(a = 1) a)$
                add("B", \(b = ~A) b)

            f <- get_private(pip)$.run_step

            # Step 'A' has not been run, so there is no output to pass on
            expect_equal(f(step = "B"), NULL)

            # Now set output for step 'A'
            i <- match("A", pip$get_step_names())
            pip$pipeline[i, "out"] <- 2

            expect_equal(f(step = "B"), 2)
        })

        test_that("accepts Param object args",
        {
            pip <- Pipeline$new("pipe")$
                add("A", \(a = new("NumericParam", "a", value = 1)) a)

            f <- get_private(pip)$.run_step
            expect_no_error(f("A"))
        })

        test_that("if error, the failing step is given in the log",
        {
            # Enable logging for this test only
            lgr::unsuspend_logging()
            on.exit(lgr::suspend_logging())

            pip <- Pipeline$new("pipe")$
                add("A", \(a = 1) stop("something went wrong"))

            f <- get_private(pip)$.run_step
            log <- capture.output(expect_error(f("A")))

            # The first log line must carry the error and the step context
            hasInfo <- grepl(
                log[1],
                pattern = "something went wrong {\"context\":\"Step 2 ('A')\"}",
                fixed = TRUE
            )
            expect_true(hasInfo)
        })

        test_that(
            "updates the state to 'done' if step was run successfully
            otherwise to 'failed'",
        {
            pip <- Pipeline$new("pipe")$
                add("ok", \(x = 1) x)$
                add("error", \(x = 2) stop("ups"))$
                add("warning", \(x = 3) warning("hm"))

            f <- get_private(pip)$.run_step

            expect_true(all(pip$pipeline[["state"]] == "New"))

            f(step = "ok")
            expect_equal(pip$get_step("ok")$state, "Done")

            expect_error(f(step = "error"))
            expect_equal(pip$get_step("error")$state, "Failed")

            # A warning does not prevent the step from being 'Done'
            expect_warning(f(step = "warning"))
            expect_equal(pip$get_step("warning")$state, "Done")
        })

        test_that("will re-compute if locked step does not keep output",
        {
            pip <- Pipeline$new("pipe")$
                add("A", \(a = 1) a)$
                add("B", \(b = ~A) b, keepOut = FALSE)

            f <- get_private(pip)$.run_step

            expect_equal(f(step = "A"), 1)
            expect_equal(f(step = "B"), 1)

            # Change the upstream parameter, lock 'B' and drop its output
            pip$set_params_at_step("A", list(a = 2))
            pip$lock_step("B")
            get_private(pip)$.clean_out_not_kept()
            expect_true(is.null(pip$get_out("B")))

            # 'B' is locked but has no output left, so it is computed again
            expect_equal(f(step = "A"), 2)
            expect_equal(f(step = "B"), 2)
        })
    })


    describe(".set_at_step",
    {
        # Describe-level smoke check; each test below sets up its own pipeline
        pip <- expect_no_error(Pipeline$new("pipe"))
        f <- get_private(pip)$.set_at_step

        test_that("field must be a string and exist",
        {
            pip <- Pipeline$new("pipe")
            # Bind the method to the pipeline created in this test
            f <- get_private(pip)$.set_at_step

            expect_error(
                f("data", field = 1),
                "is_string(field) is not TRUE",
                fixed = TRUE
            )

            expect_error(
                f("data", field = "foo"),
                "field %in% names(self$pipeline) is not TRUE",
                fixed = TRUE
            )
        })

        test_that("sets the value without change data class of the fields",
        {
            pip <- Pipeline$new("pipe")$
                add("f1", \(a = 1) a)

            f <- get_private(pip)$.set_at_step
            before <- vapply(pip$pipeline, data.class, character(1))

            # The first call renames the step, hence "f2" from there on
            f("f1", field = "step", value = "f2")
            f("f2", field = "fun", value = mean)
            f("f2", field = "funcName", value = "my fun")
            f("f2", field = "params", value = list(a = 1, b = 2))
            f("f2", field = "out", value = 1)
            f("f2", field = "keepOut", value = TRUE)
            f("f2", field = "group", value = "my group")
            f("f2", field = "description", value = "my description")
            f("f2", field = "time", value = Sys.time())
            f("f2", field = "state", value = "new-state")

            after <- vapply(pip$pipeline, data.class, character(1))

            expect_equal(before, after)
        })

        test_that("signals class mismatch unless field is of class list",
        {
            pip <- Pipeline$new("pipe")$
                add("f1", \(a = 1) a)

            f <- get_private(pip)$.set_at_step
            ee <- expect_error

            # NOTE(review): only "f1" is added to this pipeline and the
            # rename in the first call is expected to fail, so the calls
            # targeting "f2" may fail because the step is unknown rather
            # than because of a class mismatch - TODO confirm this is
            # intended.
            ee(f("f1", field = "step", value = 1))
            ee(f("f2", field = "fun", value = "mean"))
            ee(f("f2", field = "funcName", value = list("my fun")))
            ee(f("f2", field = "params", value = c(a = 1, b = 2)))
            ee(f("f2", field = "keepOut", value = 1))
            ee(f("f2", field = "group", value = list("my group")))
            ee(f("f2", field = "description", value = list("my description")))
            ee(f("f2", field = "time", value = as.character(Sys.time())))
            ee(f("f2", field = "state", value = list("new-state")))
        })
    })


    describe(".update_states_downstream",
    {
        # Describe-level smoke check; each test below sets up its own pipeline
        pip <- expect_no_error(Pipeline$new("pipe"))
        f <- get_private(pip)$.update_states_downstream

        test_that("state must be a string",
        {
            pip <- Pipeline$new("pipe")
            # Bind the method to the pipeline created in this test
            f <- get_private(pip)$.update_states_downstream

            expect_error(
                f("data", state = 1),
                "is_string(state) is not TRUE",
                fixed = TRUE
            )
        })

        test_that("states are updated according to dependencies",
        {
            pip <- Pipeline$new("pipe")
            f <- get_private(pip)$.update_states_downstream

            # Chain data -> f1 -> f2 -> f4; f3 is independent of it
            pip$add("f1", \(a = ~data) a)
            pip$add("f2", \(a = ~f1) a)
            pip$add("f3", \(a = 1) a)
            pip$add("f4", \(a = ~f2) a)

            # Current state of each step, named by step
            get_states <- function() {
                pip$pipeline[["state"]] |>
                    stats::setNames(pip$get_step_names())
            }

            expect_true(all(pip$pipeline[["state"]] == "New"))

            # Only steps downstream of 'f1' change, not 'f1' itself
            f("f1", state = "new-state")
            expect_equal(
                get_states(),
                c(
                    data = "New",
                    f1 = "New",
                    f2 = "new-state",
                    f3 = "New",
                    f4 = "new-state"
                )
            )

            # Starting at 'data', the whole chain except 'data' is updated
            f("data", state = "another-state")
            expect_equal(
                get_states(),
                c(
                    data = "New",
                    f1 = "another-state",
                    f2 = "another-state",
                    f3 = "New",
                    f4 = "another-state"
                )
            )
        })
    })


    # Tests for the private method `.verify_dependency`, exercised with the
    # arguments `dep`, `step` and `toStep` (the latter is omitted in several
    # calls below).
    describe(".verify_dependency",
    {
        # Describe-level fixture: used by tests that do not add steps.
        pip <- expect_no_error(Pipeline$new("pipe"))
        f <- get_private(pip)$.verify_dependency

        test_that("signals badly typed input",
        {
            # Each call passes exactly one non-string argument.
            expect_error(
                f(dep = 1),
                "is_string(dep) is not TRUE",
                fixed = TRUE
            )
            expect_error(
                f(dep = "dep", step = 1),
                "is_string(step) is not TRUE",
                fixed = TRUE
            )
            expect_error(
                f(dep = "dep", step = "step", toStep = 1),
                "is_string(toStep) is not TRUE",
                fixed = TRUE
            )
        })

        test_that("signals non existing toStep",
        {
            expect_error(
                f("dep", "step", toStep = "non-existent"),
                "step 'non-existent' does not exist",
                fixed = TRUE
            )
        })

        test_that("verifies valid dependencies",
        {
            # Fresh pipeline so the added steps stay local to this test.
            pip <- expect_no_error(Pipeline$new("pipe"))
            f <- get_private(pip)$.verify_dependency

            pip$add("f1", \(a = 1) a)
            pip$add("f2", \(a = 1) a)

            # Both existing steps are accepted as dependencies of a new step.
            expect_true(f(dep = "f1", "new-step"))
            expect_true(f(dep = "f2", "new-step"))
        })

        test_that("signals if dependency is not defined",
        {
            pip <- expect_no_error(Pipeline$new("pipe"))
            f <- get_private(pip)$.verify_dependency

            pip$add("f1", \(a = 1) a)
            pip$add("f2", \(a = 1) a)

            # 'f3' was never added to the pipeline.
            expect_error(
                f(dep = "f3", "new-step"),
                "dependency 'f3' not found",
                fixed = TRUE
            )

            # 'f2' exists, but was added after 'f1', so it is not found when
            # the lookup is limited via `toStep = "f1"`.
            expect_error(
                f(dep = "f2", "new-step", toStep = "f1"),
                "dependency 'f2' not found up to step 'f1'",
                fixed = TRUE
            )
        })
    })


    # Tests for the private method `.verify_from_to`, exercised with the
    # arguments `from` and `to`.
    describe(".verify_from_to",
    {
        # Describe-level fixture: used by tests that do not add steps.
        pip <- expect_no_error(Pipeline$new("pipe"))
        f <- get_private(pip)$.verify_from_to

        # Shorthand: expect an error whose message matches literally.
        eefix <-\(...) expect_error(..., fixed = TRUE)

        it("signals badly typed args",
        {
            pip <- expect_no_error(Pipeline$new("pipe"))
            pip$add("f1", \(a = 1) a)
            f <- get_private(pip)$.verify_from_to

            # A step name is passed where a number is required.
            eefix(
                f(from = "f1", to = 2),
                "is_number(from) is not TRUE"
            )
            eefix(
                f(from = 1, to = "f1"),
                "is_number(to) is not TRUE"
            )
        })

        it("returns true if from <= to",
        {
            pip <- expect_no_error(Pipeline$new("pipe"))
            pip$add("f1", \(a = 1) a)
            f <- get_private(pip)$.verify_from_to

            # `to = 2` is presumably still within the pipeline
            # (initial 'data' step plus 'f1').
            expect_true(f(from = 1, to = 2))
        })

        it("signals if from > to",
        {
            # Uses the describe-level `f`.
            eefix(
                f(from = 2, to = 1),
                "from <= to is not TRUE"
            )
        })

        it("signals if to > pipeline length",
        {
            pip <- expect_no_error(Pipeline$new("pipe"))
            pip$add("f1", \(a = 1) a)
            f <- get_private(pip)$.verify_from_to

            # One past the last step.
            eefix(
                f(from = 1, to = pip$length() + 1),
                "'to' must not be larger than pipeline length"
            )
        })
    })


    # Tests for the private method `.verify_fun_params`, exercised with the
    # arguments `fun`, `funcName` and `params`.
    describe(".verify_fun_params",
    {
        pip <- expect_no_error(Pipeline$new("pipe"))
        f <- get_private(pip)$.verify_fun_params

        test_that("returns TRUE if function has no args",
        {
            fun <-\() 1
            expect_true(f(fun, "funcName"))
        })

        test_that("returns TRUE if args are defined with default values",
        {
            fun <-\(x = 1, y = 2) x + y
            expect_true(f(fun, "funcName"))
        })

        # Description is assembled with paste() so that it stays a single
        # line in test reports while the source respects the line width.
        test_that(
            paste(
                "if not defined with default values,",
                "params can be passed in addition"
            ),
        {
            fun <-\(x, y) x + y
            expect_true(f(fun, "funcName", params = list(x = 1, y = 2)))
        })

        test_that("signals parameters with no default values",
        {
            # Arguments without defaults are reported together, regardless
            # of their position in the signature.
            fun <-\(x, y, z = 1) x + y

            expect_error(
                f(fun, "funcName"),
                "'x', 'y' parameter(s) must have default values",
                fixed = TRUE
            )

            fun <-\(x, y = 1, z) x + y
            expect_error(
                f(fun, "funcName"),
                "'x', 'z' parameter(s) must have default values",
                fixed = TRUE
            )
        })

        test_that("supports ...",
        {
            # `...` itself is not reported as a parameter lacking a default.
            fun <-\(x, ...) x
            expect_true(f(fun, "funcName", params = list(x = 1)))
        })

        test_that("signals undefined parameters",
        {
            fun <-\(x = 1, y = 1) x + y
            params <- list(x = 1, undef1 = 1, undef2 = 2)
            expect_error(
                f(fun, "funcName", params = params),
                "'undef1', 'undef2' are no function parameters of 'funcName'",
                fixed = TRUE
            )
        })

        test_that(
            "supports additional parameters if function is defined with ...",
        {
            # With `...` in the signature, unknown names in `params` pass.
            fun <-\(x, ...) x
            params <- list(x = 1, add1 = 1, add2 = 2)
            expect_true(f(fun, "funcName", params = params))
        })


        test_that("signals badly typed input",
        {
            # Function name given as a string instead of a function object.
            expect_error(
                f("mean"),
                "is.function(fun) is not TRUE",
                fixed = TRUE
            )

            expect_error(
                f(mean, funcName = 1),
                "is_string(funcName) is not TRUE",
                fixed = TRUE
            )

            expect_error(
                f(mean, funcName = "mean", params = 1),
                "is.list(params) is not TRUE",
                fixed = TRUE
            )
        })
    })
})

Try the pipeflow package in your browser

Any scripts or data that you put into this service are public.

pipeflow documentation built on April 3, 2025, 10:50 p.m.