tests/testthat/test_aliases.R

test_that("an alias function is defined for each member function
    with the correct body",
{
    skip_if(Sys.getenv("R_CODECOV_ENV") == "GITHUB_ACTION")

    pip <- Pipeline$new("pipe")
    funs2check <- sapply(names(pip), \(x) is.function(pip[[x]])) |>
        Filter(f = isTRUE) |>
        names() |>
        setdiff("initialize")

    for (fun in funs2check) {
        alias_fun <- paste0("pipe_", fun)
        expect_true(exists(alias_fun), info = fun)
    }
})


describe("pipe_new",
{
    test_that("returns a pipeline object",
    {
        expect_true(methods::is(pipe_new("pipe"), "Pipeline"))
    })

    test_that("pipeline name must be a non-empty string",
    {
        expect_no_error(pipe_new("foo"))

        expect_error(
            pipe_new(name = 1),
            "name must be a string"
        )

        expect_error(
            pipe_new(name = ""),
            "name must not be empty"
        )
    })

    test_that("data is added as first step to pipeline",
    {
        pip <- pipe_new("pipe1", data = 1)

        expect_equal(pipe_get_step_names(pip), "data")

        out <- pipe_run(pip) |> pipe_collect_out(all = TRUE)
        expect_equal(pipe_get_out(pip, "data"), 1)
    })

    test_that("the logger can be customized",
    {
        my_logger <- \(level, msg, ...) {
            message("My Logger: ", msg)
        }

        pip <- pipe_new("pipe", logger = my_logger)

        out <- capture.output(
            pipe_run(pip),
            type = "message"
        )
        expect_equal(
            out,
            c(
                "My Logger: Start run of 'pipe' pipeline:",
                "My Logger: Step 1/1 data",
                "My Logger: Finished execution of steps.",
                "My Logger: Done."
            )
        )
    })

    test_that("bad definition of the custom logger is signalled",
    {
        expected_error_msg <- paste(
            "logger function must have the following signature:",
            "function(level, msg, ...)"
        )


        logger_with_missing_level_arg <- \(msg, ...) {
            message("My Logger: ", msg)
        }
        expect_error(
            pipe_new("pipe1", logger = logger_with_missing_level_arg),
            expected_error_msg,
            fixed = TRUE
        )


        logger_with_missing_msg_arg <- \(level, ...) {
            message("My Logger: ", ...)
        }
        expect_error(
            pipe_new("pipe1", logger = logger_with_missing_msg_arg),
            expected_error_msg,
            fixed = TRUE
        )


        logger_with_missing_dots <- \(msg, level) {
            message("My Logger: ", msg)
        }
        expect_error(
            pipe_new("pipe1", logger = logger_with_missing_dots),
            expected_error_msg,
            fixed = TRUE
        )


        logger_with_additional_arg <- \(level, msg, foo, ...) {
            message("My Logger: ", msg)
        }
        expect_error(
            pipe_new("pipe1", logger = logger_with_additional_arg),
            expected_error_msg,
            fixed = TRUE
        )
    })
})


describe("pipe_add",
{
    test_that("step must be non-empty string",
    {
        pip <- pipe_new("pipe1")

        foo <- \(a = 0) a

        expect_error(pipe_add(pip, "", foo))
        expect_error(pipe_add(pip, c("a", "b"), foo))
    })

    test_that("fun must be passed as a function or string",
    {
        pip <- pipe_new("pipe")

        expect_error(
            pipe_add(pip, "step1", fun = 1),
            "is.function(fun) || is_string(fun) is not TRUE",
            fixed = TRUE
        )
    })

    test_that("params must be a list",
    {
        pip <- pipe_new("pipe")

        expect_error(
            pipe_add(pip, "step1", fun = \() 1, params = 1),
            "is.list(params)",
            fixed = TRUE
        )
    })

    test_that("description must be (possibly empty) string",
    {
        pip <- pipe_new("pipe")

        expect_error(
            pipe_add(pip, "step1", fun = \() 1, description = 1),
            "is_string(description)",
            fixed = TRUE
        )
        expect_no_error(
            pipe_add(pip, "step1", fun = \() 1, description = "")
        )
    })

    test_that("group must be non-empty string",
    {
        pip <- pipe_new("pipe")

        expect_error(
            pipe_add(pip, "step1", fun = \() 1, group = 1),
            "is_string(group) && nzchar(group) is not TRUE",
            fixed = TRUE
        )
        expect_error(
            pipe_add(pip, "step1", fun = \() 1, group = ""),
            "is_string(group) && nzchar(group) is not TRUE",
            fixed = TRUE
        )
    })

    test_that("keepOut must be logical",
    {
        pip <- pipe_new("pipe")

        expect_error(
            pipe_add(pip, "step1", fun = \() 1, keepOut = 1),
            "is.logical(keepOut) is not TRUE",
            fixed = TRUE
        )
    })

    test_that("duplicated step names are signaled",
    {
        pip <- pipe_new("pipe1")

        foo <- \(a = 0) a

        pipe_add(pip, "f1", foo)
        expect_error(pipe_add(pip, "f1", foo), "step 'f1' already exists")
        expect_error(
            pipe_add(pip, "f1", \(x) x),
            "step 'f1' already exists"
        )
    })

    test_that("missing dependencies are signaled",
    {
        pip <- pipe_new("pipe1")

        foo <- \(a = 0) a

        pipe_add(pip, "f1", foo)
        expect_error(
            pipe_add(pip, "f2", foo, params = list(a = ~undefined)),
            "dependency 'undefined' not found"
        )
    })

    test_that("step can refer to previous step by relative number",
    {
        pip <- pipe_new("pipe1")
        pipe_add(pip, "f1", \(a = 5) a)
        pipe_add(pip, "f2", \(x = ~-1) 2*x, keepOut = TRUE)

        out = pipe_run(pip, ) |> pipe_collect_out()
        expect_equal(out[["f2"]][[1]], 10)

        pipe_add(pip, "f3", \(x = ~-1, a = ~-2) x + a, keepOut = TRUE)
        out = pipe_run(pip) |> pipe_collect_out()
        expect_equal(out[["f3"]][[1]], 10 + 5)
    })

    test_that("a bad relative step referal is signalled",
    {
        pip <- pipe_new("pipe1")
        expect_error(
            pipe_add(pip, "f1", \(x = ~-10) x),
            paste(
                "step 'f1': relative dependency x=-10",
                "points to outside the pipeline"
            ),
            fixed = TRUE
        )
    })

    test_that("added step can use lambda functions",
    {
        data <- 9
        pip <- pipe_new("pipe1", data = data)

        pipe_add(pip, "f1", \(data = ~data) data, keepOut = TRUE)
        a <- 1
        pipe_add(pip, "f2", \(a, b) a + b,
            params = list(a = a, b = ~f1),
            keepOut = TRUE
        )

        expect_equal(
            unlist(pipe_get_step(pip, "f1")[["depends"]]),
            c(data = "data")
        )
        expect_equal(
            unlist(pipe_get_step(pip, "f2")[["depends"]]),
            c(b = "f1")
        )

        out <- pipe_run(pip) |> pipe_collect_out()
        expect_equal(out[["f1"]][[1]], data)
        expect_equal(out[["f2"]][[1]], a + data)
    })


    test_that(
        "supports functions with wildcard arguments",
    {
        my_mean <- \(x, na.rm = FALSE) {
            mean(x, na.rm = na.rm)
        }
        foo <- \(x, ...) {
            my_mean(x, ...)
        }
        v <- c(1, 2, NA, 3, 4)
        pip <- pipe_new("pipe", data = v)

        params <- list(x = ~data, na.rm = TRUE)
        pipe_add(pip, "mean", fun = foo, params = params, keepOut = TRUE)

        out <- pipe_run(pip) |> pipe_collect_out()
        expect_equal(out[["mean"]], mean(v, na.rm = TRUE))

        pipe_set_params_at_step(pip, "mean", list(na.rm = FALSE))
        out <- pipe_run(pip) |> pipe_collect_out()
        expect_equal(out[["mean"]], as.numeric(NA))
    })

    test_that("can have a variable defined outside as parameter default",
    {
        x <- 1

        pip <- pipe_new("pipe") |>
            pipe_add("f1", \(a) a, params = list(a = x))

        expect_equal(pipe_get_params_at_step(pip, "f1")$a, x)

        out <- pipe_run(pip) |> pipe_collect_out(all = TRUE)
        expect_equal(out[["f1"]], x)
    })

    test_that("handles Param object args",
    {
        pip <- pipe_new("pipe") |>
            pipe_add("f1", \(a = new("NumericParam", "a", value = 1)) a)

        out <- pipe_run(pip) |> pipe_collect_out(all = TRUE)
        expect_equal(out[["f1"]], 1)
    })

    test_that(
        "can have a Param object defined outside as parameter default",
    {
        x <- 1
        p <- new("NumericParam", "a", value = x)

        pip <- pipe_new("pipe") |>
            pipe_add("f1", \(a) a, params = list(a = p))

        expect_equal(pipe_get_params_at_step(pip, "f1")$a, p)

        out <- pipe_run(pip) |> pipe_collect_out(all = TRUE)
        expect_equal(out[["f1"]], x)
    })

    test_that(
        "function can be passed as a string",
    {
        pip <- pipe_new("pipe") |>
            pipe_add("f1", fun = "mean", params = list(x = 1:5))

        out <- pipe_run(pip) |> pipe_collect_out(all = TRUE)
        expect_equal(out[["f1"]], mean(1:5))

        expect_equal(pipe_get_step(pip, "f1")[["funcName"]], "mean")
    })

    test_that(
        "if passed as a function, name is derived from the function",
    {
        pip <- pipe_new("pipe")
        pipe_add(pip, "f1", fun = mean, params = list(x = 1:5))
        expect_equal(pipe_get_step(pip, "f1")[["funcName"]], "mean")

        pip <- pipe_new("pipe") |>
            pipe_add("f1", fun = mean, params = list(x = 1:5))
        expect_equal(pipe_get_step(pip, "f1")[["funcName"]], "mean")
    })

    test_that(
        "lampda functions, are named 'function'",
    {
        pip <- pipe_new("pipe") |> pipe_add("f1", fun = \(x = 1) x)
        expect_equal(pipe_get_step(pip, "f1")[["funcName"]], "function")

        pip <- pipe_new("pipe") |>
            pipe_add("f1", fun = \(x = 1) x)
        expect_equal(pipe_get_step(pip, "f1")[["funcName"]], "function")
    })
})


describe("pipe_append",
{
    test_that("pipelines can be combined even if their steps share names,
        unless tryAutofixNames is FALSE",
    {
        pip1 <- pipe_new("pipe1", data = 1) |>
            pipe_add("f1", \(a = 1) a, keepOut = TRUE) |>
            pipe_add("f2", \(b = ~f1) b)

        pip2 <- pipe_new("pipe2") |>
            pipe_add("f1", \(a = 10) a) |>
            pipe_add("f2", \(b = ~f1) b, keepOut = TRUE)

        expect_error(
            pip1 |> pipe_append(pip2, tryAutofixNames = FALSE),
            paste(
                "combined pipeline would have duplicated step names:",
                "'data', 'f1', 'f2',"
            )
        )

        pp <- pip1 |> pipe_append(pip2)
        expect_equal(
            pp |> pipe_length(),
            pip1 |> pipe_length() + pip2 |> pipe_length()
        )

        out1 <- pip1 |> pipe_run() |> pipe_collect_out()
        out2 <- pip2 |> pipe_run() |> pipe_collect_out()

        out <- pp |> pipe_run() |> pipe_collect_out()

        expect_equivalent(out, c(out1, out2))
    })

    test_that("auto-fixes only the names that need auto-fix",
    {
        pip1 <- pipe_new("pipe1", data = 1) |>
            pipe_add("f1", \(a = 1) a, keepOut = TRUE) |>
            pipe_add("f2", \(b = ~f1) b)

        pip2 <- pipe_new("pipe2") |>
            pipe_add("f3", \(a = 10) a) |>
            pipe_add("f4", \(b = ~f3) b, keepOut = TRUE)

        pp <- pip1 |> pipe_append(pip2)
        expect_equal(
            pp |> pipe_get_step_names(),
            c("data", "f1", "f2", "data.pipe2", "f3", "f4")
        )

        out1 <- pip1 |> pipe_run() |> pipe_collect_out()
        out2 <- pip2 |> pipe_run() |> pipe_collect_out()

        out <- pp |> pipe_run() |> pipe_collect_out()

        expect_equivalent(out, c(out1, out2))
    })

    test_that("the separator used for step names can be customized",
    {
        pip1 <- pipe_new("pipe1", data = 1) |>
            pipe_add("f1", \(a = 1) a) |>
            pipe_add("f2", \(b = 2) b)

        pip2 <- pipe_new("pipe2") |>
            pipe_add("f1", \(a = 1) a) |>
            pipe_add("f2", \(b = 2) b)

        pp <- pip1 |> pipe_append(pip2, sep = "_")

        expect_equal(
            pp |> pipe_get_step_names(),
            c("data", "f1", "f2", "data_pipe2", "f1_pipe2", "f2_pipe2")
        )
    })


    test_that(
        "output of first pipeline can be set as input of appended pipeline",
    {
        pip1 <- pipe_new("pipe1", data = 1)
        pip1 |> pipe_add("f1", \(a = ~data) a * 2)

        pip2 <- pipe_new("pipe2", data = 99) |>
            pipe_add("f1", \(a = ~data) a * 3) |>
            pipe_add("f2", \(a = ~f1) a * 4)

        pp <- pip1 |> pipe_append(pip2, outAsIn = TRUE)

        depends <- pipe_get_depends(pp)
        expect_equal(depends[["data.pipe2"]], c(data = "f1"))

        out <- pp |> pipe_run() |> pipe_collect_out(all = TRUE)
        pipe1_out <- out[["f1"]][["f1"]]
        expect_equal(pipe1_out, 1 * 2)
        expect_equal(out[["data.pipe2"]], pipe1_out)
        expect_equal(out[["f1"]][["f1.pipe2"]], pipe1_out * 3)
        expect_equal(out[["f2"]], out[["f1"]][["f1.pipe2"]] * 4)
    })

    test_that("if duplicated step names would be created, an error is given",
    {
        pip1 <- pipe_new("pipe1")
        pip1 |> pipe_add("f1", \(a = ~data) a + 1)
        pip1 |> pipe_add("f1.pipe2", \(a = ~data) a + 1)

        pip2 <- pipe_new("pipe2")
        pip2 |> pipe_add("f1", \(a = ~data) a + 1)

        expect_error(
            pip1 |> pipe_append(pip2),
            "Cannot auto-fix name clash for step 'f1' in pipeline 'pipe2'",
            fixed = TRUE
        )
    })
})



describe("append_to_step_names",
{
    test_that("postfix can be appended to step names",
    {
        pip <- pipe_new("pipe", data = 1)

        pipe_add(pip, "f1", \(a = ~data) a + 1)
        pipe_add(pip, "f2", \(a = ~data, b = ~f1) a + b)
        pipe_append_to_step_names(pip, "foo")

        expected_names <- c("data.foo", "f1.foo", "f2.foo")
        expect_equal(pipe_get_step_names(pip), expected_names)

        expected_depends <- list(
            data.foo = character(0),
            f1.foo = c(a = "data.foo"),
            f2.foo = c(a = "data.foo", b = "f1.foo")
        )

        expect_equal(pipe_get_depends(pip), expected_depends)
    })
})



describe("pipe_collect_out",
{
    test_that("data is set as first step but not part of output by default",
    {
        dat <- data.frame(a = 1:2, b = 1:2)
        pip <- pipe_new("pipe1", data = dat)
        expect_equal(pip$pipeline[["step"]], "data")

        out <- pipe_run(pip) |> pipe_collect_out()
        expect_equal(out, list())

        pip <- pipe_new("pipe1", data = dat) |>
            pipe_add("f1", \(x = ~data) x, keepOut = TRUE)

        out <- pipe_run(pip) |> pipe_collect_out()
        expect_equal(out[["f1"]], dat)
    })

    test_that("at the end, pipeline can clean output that shall not be kept",
    {
        data <- 9
        pip <- pipe_new("pipe1", data = data)

        foo <-\(a = 1) a
        bar <-\(a, b) a + b

        a <- 5
        pipe_add(pip, "f1", foo, params = list(a = a))
        pipe_add(
            pip, "f2", bar, params = list(a = ~data, b = ~f1), keepOut = TRUE
        )

        pipe_run(pip, cleanUnkept = TRUE)
        expect_equal(pipe_get_out(pip, "f1"), NULL)
        expect_equal(pipe_get_out(pip, "f2"), a + data)
    })

    test_that("output is collected as expected",
    {
        pip <- pipe_new("pipe1") |>
            pipe_add("f1", \(a = 1) a, keepOut = TRUE) |>
            pipe_add("f2", \(a = 2, b = ~f1) a + b) |>
            pipe_add("f3", \(a = 3, b = ~f2) a + b, keepOut = TRUE)

        out <- pipe_run(pip) |> pipe_collect_out()
        expect_equal(length(out), 2)
        expect_equal(names(out), c("f1", "f3"))
    })

    test_that("output can be grouped",
    {
        pip <- pipe_new("pipe") |>
            pipe_add("f1", \(a = 1) a) |>
            pipe_add("f2", \(a = 1, b = 2) a + b, group = "plus") |>
            pipe_add("f3", \(a = 1, b = 2) a / b) |>
            pipe_add("f4", \(a = 2, b = 2) a + b, group = "plus")

        out <- pipe_run(pip) |> pipe_collect_out(all = TRUE)

        expect_equal(out[["plus"]], list(f2 = 3, f4 = 4))
        expect_equal(out[["f1"]], 1)
        expect_equal(out[["f3"]], 1/2)
    })

    test_that("output is ordered in the order of steps",
    {
        pip <- pipe_new("pipe") |>
            pipe_add("f2", \(a = 1) a, keepOut = TRUE) |>
            pipe_add("f1", \(b = 2) b, keepOut = TRUE)

        out <- pipe_run(pip) |> pipe_collect_out()
        expect_equal(names(out), c("f2", "f1"))
    })

    test_that(
        "grouped output is ordered in the order of group definitions",
    {
        pip <- pipe_new("pipe") |>
            pipe_add("f1", \(x = 1) x, group = "g2") |>
            pipe_add("f2", \(x = 2) x, group = "g1") |>
            pipe_add("f3", \(x = 3) x, group = "g2") |>
            pipe_add("f4", \(x = 4) x, group = "g1")

        out <- pipe_run(pip) |> pipe_collect_out(all = TRUE)

        expect_equal(names(out), c("data", "g2", "g1"))
    })

    test_that(
        "if just one group the output name still will take the group name",
    {
        pip <- pipe_new("pipe") |>
            pipe_add("f1", \(a = 1) a) |>
            pipe_add("f2", \(a = 1, b = 2) a + b, group = "plus") |>
            pipe_add("f3", \(a = 1, b = 2) a / b, group = "my f3") |>
            pipe_add("f4", \(a = 2, b = 2) a + b, group = "plus")

        out <- pipe_run(pip) |> pipe_collect_out(all = TRUE)

        expect_equal(names(out), c("data", "f1", "plus", "my f3"))
    })

    describe("groupBy option",
    {
        pip <- pipe_new("pipe") |>
            pipe_add("f1", \(a = 1) a) |>
            pipe_add("f2", \(a = 1, b = 2) a + b, group = "plus") |>
            pipe_add("f3", \(a = 1, b = 2) a / b)

        test_that("column to groupBy can be customized",
        {
            pipe_run_step(pip, "f1")
            pipe_run_step(pip, "f3")
            out <- pipe_collect_out(pip, groupBy = "state", all = TRUE)

            expect_equal(
                out,
                list(
                    New = list(data = NULL, f2 = NULL),
                    Done = list(f1 = 1, f3 = 0.5)
                )
            )
        })

        test_that("signals bad  groupBy input",
        {
            expect_error(
                pipe_collect_out(pip, groupBy = c("not", "a", "string")),
                "groupBy must be a single string"
            )

            expect_error(
                pipe_collect_out(pip, groupBy = "foo"),
                "groupBy column does not exist"
            )

            expect_error(
                pipe_collect_out(pip, groupBy = "time"),
                "groupBy column must be character"
            )
        })
    })
})



describe("pipe_discard_steps",
{
    test_that("pipeline steps can be discarded by pattern",
    {
        pip <- pipe_new("pipe1") |>
            pipe_add("calc", \(a = 1) a) |>
            pipe_add("plot1", \(x = ~calc) x) |>
            pipe_add("plot2", \(x = ~plot1) x)

        out <- capture.output(
            pipe_discard_steps(pip, "plot"),
            type = "message"
        )
        expect_equal(
            out,
            c(
                "step 'plot2' was removed",
                "step 'plot1' was removed"
            )
        )

        expect_equal(pip$pipeline[["step"]], c("data", "calc"))
    })

    test_that("if no pipeline step matches pattern, pipeline remains unchanged",
    {
        pip <- pipe_new("pipe1") |>
            pipe_add("calc", \(a = 1) a) |>
            pipe_add("plot1", \(x = ~calc) x) |>
            pipe_add("plot2", \(x = ~plot1) x)

        steps_before = pip$pipeline[["step"]]

        expect_silent(pipe_discard_steps(pip, "bla"))
        expect_equal(pip$pipeline[["step"]], steps_before)
    })


    test_that("if step has downstream dependencies, an error is given",
    {
        pip <- pipe_new("pipe1") |>
            pipe_add("f1", \(a = 1) a) |>
            pipe_add("f2", \(b = ~f1) b)

        expect_error(
            pipe_discard_steps(pip, "f1"),
            paste(
                "cannot remove step 'f1' because the following",
                "steps depend on it: 'f2'"
            )
        )

        pipe_add(pip, "f3", \(x = ~f1) x)
        expect_error(
            pipe_discard_steps(pip, "f1"),
            paste(
                "cannot remove step 'f1' because the following",
                "steps depend on it: 'f2', 'f3'"
            )
        )
    })
})


describe("pipe_get_data",
{
    test_that(
        "data can be retrieved",
    {
        p <- pipe_new("pipe", data = 1:2)
        expect_equal(p |> pipe_get_data(), 1:2)

        p |> pipe_set_data(3:4)
        expect_equal(p |> pipe_get_data(), 3:4)
    })

    test_that(
        "signals missing data",
    {
        p <- pipe_new("pipe", data = 1:2)
        p |> pipe_pop_step()    # remove data step

        expect_error(
            p |> pipe_get_data(),
            "no data step defined"
        )
    })
})


describe("pipe_get_depends",
{
    test_that(
        "dependencies can be retrieved and are named after the steps",
    {
        pip <- pipe_new("pipe", data = 1)

        pipe_add(pip, "f1", \(a = ~data) a + 1)
        pipe_add(pip, "f2", \(b = ~f1) b + 1)

        depends <- pipe_get_depends(pip)
        expected_depends <- list(
            data = character(0),
            f1 = c(a = "data"),
            f2 = c(b = "f1")
        )

        expect_equal(depends, expected_depends)
        expect_equal(names(depends), pipe_get_step_names(pip))
    })
})



describe("pipe_get_depends_down",
{
    test_that("dependencies can be determined recursively for given step",
    {
        pip <- pipe_new("pipe")

        pipe_add(pip, "f1", \(a = 1) a)
        pipe_add(pip, "f2", \(a = ~f1) a)
        pipe_add(pip, "f3", \(a = ~f1, b = ~f2) a + b)
        pipe_add(pip, "f4", \(a = ~f1, b = ~f2, c = ~f3) a + b + c)

        expect_equal(pipe_get_depends_down(pip, "f3"), c("f4"))
        expect_equal(pipe_get_depends_down(pip, "f2"), c("f3", "f4"))
        expect_equal(pipe_get_depends_down(pip, "f1"), c("f2", "f3", "f4"))
    })

    test_that("if no dependencies an empty character vector is returned",
    {
        pip <- pipe_new("pipe")

        pipe_add(pip, "f1", \(a = 1) a)
        expect_equal(pipe_get_depends_down(pip, "f1"), character(0))
    })

    test_that("step must exist",
    {
        pip <- pipe_new("pipe")

        expect_error(
            pipe_get_depends_down(pip, "f1"),
            "step 'f1' does not exist"
        )
    })

    test_that(
        "works with complex dependencies as created by data splits",
    {
        dat1 <- data.frame(x = 1:2)
        dat2 <- data.frame(y = 1:2)
        dataList <- list(dat1, dat2)

        pip <- pipe_new("pipe") |>
            pipe_add("f1", \(a = 1) a) |>
            pipe_add("f2", \(a = ~f1, b = 2) b) |>
            pipe_add("f3", \(x = ~f1, y = ~f2) x + y)

        pipe_set_data_split(pip, dataList, toStep = "f2")

        expect_equal(
            pipe_get_depends_down(pip, "f1.1"),
            c("f2.1", "f3")
        )

        expect_equal(
            pipe_get_depends_down(pip, "f1.2"),
            c("f2.2", "f3")
        )

        expect_equal(pipe_get_depends_down(pip, "f2.1"), "f3")
        expect_equal(pipe_get_depends_down(pip, "f2.2"), "f3")
    })
})



describe("pipe_get_depends_up",
{
    test_that("dependencies can be determined recursively for given step",
    {
        pip <- pipe_new("pipe")

        pipe_add(pip, "f1", \(a = 1) a)
        pipe_add(pip, "f2", \(a = ~f1) a)
        pipe_add(pip, "f3", \(a = ~f1, b = ~f2) a + b)
        pipe_add(pip, "f4", \(a = ~f1, b = ~f2, c = ~f3) a + b + c)

        expect_equal(pipe_get_depends_up(pip, "f2"), c("f1"))
        expect_equal(pipe_get_depends_up(pip, "f3"), c("f1", "f2"))
        expect_equal(pipe_get_depends_up(pip, "f4"), c("f1", "f2", "f3"))
    })

    test_that("if no dependencies an empty character vector is returned",
    {
        pip <- pipe_new("pipe")

        pipe_add(pip, "f1", \(a = 1) a)
        expect_equal(pipe_get_depends_up(pip, "f1"), character(0))
    })

    test_that("step must exist",
    {
        pip <- pipe_new("pipe")

        expect_error(
            pipe_get_depends_up(pip, "f1"),
            "step 'f1' does not exist"
        )
    })

    test_that("works with complex dependencies as created by data splits",
    {
        dat1 <- data.frame(x = 1:2)
        dat2 <- data.frame(y = 1:2)
        dataList <- list(dat1, dat2)

        pip <- pipe_new("pipe") |>
            pipe_add("f1", \(a = 1) a) |>
            pipe_add("f2", \(a = ~f1, b = 2) b) |>
            pipe_add("f3", \(x = ~f1, y = ~f2) x + y)

        pipe_set_data_split(pip, dataList, toStep = "f2")

        expect_equal(pipe_get_depends_up(pip, "f2.1"), c("f1.1"))
        expect_equal(pipe_get_depends_up(pip, "f2.2"), c("f1.2"))

        expect_equivalent(
            pipe_get_depends_up(pip, "f3"),
            c("f1.1", "f2.1", "f1.2", "f2.2")
        )
    })
})


describe("pipe_get_graph",
{
    pip <- pipe_new("pipe") |>
        pipe_add("f1", \(a = 1) a) |>
        pipe_add("f2", \(a = ~f1, b = ~data) a)

    res <- pipe_get_graph(pip)

    test_that("returns a node table with the expected columns",
    {
        tab <- res$nodes
        expect_true(is.data.frame(tab))
        expectedColumns <- c("id", "label", "group", "shape", "color", "title")

        expect_equal(colnames(tab), expectedColumns)
    })

    test_that("the node table contains all steps",
    {
        tab <- res$nodes
        expect_equal(tab$label, pipe_get_step_names(pip))
    })

    test_that("returns an edges table with the expected columns",
    {
        tab <- res$edges
        expect_true(is.data.frame(tab))
        expectedColumns <- c("from", "to", "arrows")

        expect_equal(colnames(tab), expectedColumns)
    })

    test_that("can be printed created for certain groups",
    {
        pip <- pipe_new("pipe")
        pipe_add(pip, "step2", \(a = ~data) a + 1, group = "add")
        pipe_add(pip, "step3", \(a = ~step2) 2 * a, group = "mult")
        pipe_add(pip, "step4", \(a = ~step2, b = ~step3) a + b, group = "add")
        pipe_add(pip, "step5", \(a = ~data, b = ~step4) a * b, group = "mult")

        res.add <- pipe_get_graph(pip, groups = "add")
        expect_equal(res.add$nodes$label, c("step2", "step4"))

        res.mult <- pipe_get_graph(pip, groups = "mult")
        expect_equal(res.mult$nodes$label, c("step3", "step5"))
    })
})


describe("pipe_get_out",
{
    test_that("output at given step can be retrieved",
    {
        data <- airquality
        pip <- pipe_new("pipe", data = data) |>
            pipe_add("model",
               \(data = ~data) {
                    lm(Ozone ~ Wind, data = data)
                },
            )

        pipe_run(pip)

        expect_equal(pipe_get_out(pip, "data"), data)
        expect_equivalent(
            pipe_get_out(pip, "model"),
            lm(Ozone ~ Wind, data = data)
        )
    })

    test_that("step of requested output must exist",
    {
        pip <- pipe_new("pipe")
        pipe_run(pip)
        expect_error(pipe_get_out(pip, "foo"), "step 'foo' does not exist")
    })
})



describe("pipe_get_params",
{
    test_that("parameters can be retrieved",
    {
        pip <- pipe_new("pipe1") |>
            pipe_add("f1", \(a = 1) a, keepOut = TRUE) |>
            pipe_add("f2", \(a, b = ~f1) a + b,
                params = list(a = 8),
                keepOut = TRUE
            ) |>
            pipe_add("f3", \(a = ~f2, b = 3) a + b, keepOut = TRUE)

        p <- pipe_get_params(pip)
        expect_equal(
            p, list(f1 = list(a = 1), f2 = list(a = 8), f3 = list(b = 3))
        )
    })


    test_that("empty pipeline gives empty list of parameters",
    {
        pip <- pipe_new("pipe1")
        expect_equivalent(pipe_get_params(pip), list())

        pipe_add(pip, "f1", \() 1)
        expect_equivalent(pipe_get_params(pip), list())
    })

    test_that("hidden parameters are filtered out by default",
    {

        pip <- pipe_new("pipe1") |>
            pipe_add("f1", \(a = 1, .hidden = 2) a)

        p <- pipe_get_params(pip)
        expect_equal(p, list(f1 = list(a = 1)))

        p <- pipe_get_params(pip, ignoreHidden = FALSE)
        expect_equal(p, list(f1 = list(a = 1, .hidden = 2)))
    })

    test_that("works with Param objects",
    {
        pip <- pipe_new("pipe1") |>
            pipe_add(
                "f1",
               \(
                    x = new("NumericParam", "x", value = 1),
                    y = new("NumericParam", "y", value = 2)
                ) {
                    x + y
                }
            ) |>
            pipe_add(
                "f2",
               \(
                    s1 = new("StringParam", "s1", "Hello"),
                    s2 = new("StringParam", "s2", "World")
                ) {
                    paste(s1, s2)
                }
            )

        par <- pipe_get_params(pip)
        expect_true(all(par$f1 |> sapply(is, "NumericParam")))
        expect_equal(par$f1$x@value, 1)
        expect_equal(par$f1$y@value, 2)

        expect_true(all(par$f2 |> sapply(is, "StringParam")))
        expect_equal(par$f2$s1@value, "Hello")
        expect_equal(par$f2$s2@value, "World")
    })
})



describe("pipe_get_params_at_step",
{
    test_that("list of step parameters can be retrieved",
    {
        pip <- pipe_new("pipe1") |>
            pipe_add("f1", \(a = 1, b = 2) a + b) |>
            pipe_add("f2", \(x = 1, y = 2) x + y)

        expect_equal(
            pipe_get_params_at_step(pip, "f1"), list(a = 1, b = 2)
        )

        expect_equal(
            pipe_get_params_at_step(pip, "f2"), list(x = 1, y = 2)
        )
    })

    test_that("if no parameters empty list is returned",
    {
        pip <- pipe_new("pipe1") |>
            pipe_add("f1", \() 1)

        expect_equal(
            pipe_get_params_at_step(pip, "f1"), list()
        )
    })

    test_that(
        "hidden parameters are not returned, unless explicitly requested",
    {
        pip <- pipe_new("pipe1") |>
            pipe_add("f1", \(a = 1, .b = 2) a + .b)

        expect_equal(
            pipe_get_params_at_step(pip, "f1"), list(a = 1)
        )

        expect_equal(
            pipe_get_params_at_step(pip, "f1", ignoreHidden = FALSE),
            list(a = 1, .b = 2)
        )
    })

    test_that("bound parameters are never returned",
    {
        pip <- pipe_new("pipe1") |>
            pipe_add("f1", \(a = 1, b = ~data) a + b)

        expect_equal(
            pipe_get_params_at_step(pip, "f1"), list(a = 1)
        )

        expect_equal(
            pipe_get_params_at_step(pip, "f1", ignoreHidden = FALSE),
            list(a = 1)
        )
    })

    test_that("step must exist",
    {
        pip <- pipe_new("pipe1") |>
            pipe_add("f1", \(a = 1, b = ~data) a + b)

        expect_error(
            pipe_get_params_at_step(pip, "foo"),
            "step 'foo' does not exist"
        )
    })

    test_that("works with Param objects",
    {
        pip <- pipe_new("pipe1") |>
            pipe_add(
                "f1",
               \(
                    x = new("NumericParam", "x", value = 1),
                    y = new("NumericParam", "y", value = 2)
                ) {
                    x + y
                }
            ) |>
            pipe_add(
                "f2",
               \(
                    s1 = new("StringParam", "s1", "Hello"),
                    s2 = new("StringParam", "s2", "World")
                ) {
                    paste(s1, s2)
                }
            )

        par1 <- pipe_get_params_at_step(pip, "f1")
        expect_true(all(par1 |> sapply(is, "NumericParam")))
        expect_equal(par1$x@value, 1)
        expect_equal(par1$y@value, 2)

        par2 <- pipe_get_params_at_step(pip, "f2")
        expect_true(all(par2 |> sapply(is, "StringParam")))
        expect_equal(par2$s1@value, "Hello")
        expect_equal(par2$s2@value, "World")
    })
})


describe("pipe_get_params_unique",
{
    test_that("parameters can be retrieved uniquely and if occuring multiple
        times, the 1st default value is used",
    {
        pip <- pipe_new("pipe1") |>
            pipe_add("f1", \(a = 1) a) |>
            pipe_add("f2", \(a = 2, b = 3) a + b) |>
            pipe_add("f3", \(a = 4, b = 5, c = 6) a + b)

        p <- pipe_get_params_unique(pip)
        expect_equivalent(p, list(a = 1, b = 3, c = 6))
    })

    test_that("empty pipeline gives empty list",
    {
        pip <- pipe_new("pipe")
        expect_equivalent(pipe_get_params_unique(pip), list())
    })

    test_that("pipeline with no parameters gives empty list",
    {
        pip <- pipe_new("pipe") |>
            pipe_add("f1", \() 1)
        expect_equivalent(pipe_get_params_unique(pip), list())
    })

    test_that("works with Param objects",
    {
        pip <- pipe_new("pipe1") |>
            pipe_add(
                "f1",
               \(
                    a = 0,
                    x = new("NumericParam", "x", value = 1),
                    y = new("NumericParam", "y", value = 2)
                ) {
                    a * (x + y)
                }
            ) |>
            pipe_add(
                "f2",
               \(
                    a = new("NumericParam", "a", value = 0),
                    x = new("NumericParam", "x", value = 1),
                    y = 2,
                    z = new("NumericParam", "y", value = 3)
                ) {
                    a * (x + y + z)
                }
            )

        par <- pipe_get_params_unique(pip)
        expect_equal(names(par), c("a", "x", "y", "z"))
        expect_equal(par$a, 0)
        expect_equal(par$x@value, 1)
        expect_equal(par$y@value, 2)
        expect_equal(par$z@value, 3)
    })
})



describe("pipe_get_params_unique_json",
{
    test_that("the elements are not named",
    {
        pip <- pipe_new("pipe1") |>
            pipe_add("f1", \(a = 1) a, keepOut = TRUE)

        p <- pipe_get_params_unique_json(pip)
        pl <- jsonlite::fromJSON(p, simplifyVector = FALSE)
        expect_equal(names(pl), NULL)


        pip <- pipe_new("pipe1") |>
            pipe_add("f1", \(x = new("StringParam", "my x", "some x")) x) |>
            pipe_add("f2", \(y = new("StringParam", "my y", "some y")) y)

        p <- pipe_get_params_unique_json(pip)
        pl <- jsonlite::fromJSON(p, simplifyVector = FALSE)
        expect_equal(names(pl), NULL)
    })


    test_that("standard parameters are returned as name-value pairs",
    {
        pip <- pipe_new("pipe1") |>
            pipe_add("f1", \(a = 1) a) |>
            pipe_add("f2", \(a = 1, b = 2) a) |>
            pipe_add("f3", \(a = 1, b = 2, c = list(a = 1, b = 2)) a)

        p <- pipe_get_params_unique_json(pip)
        expect_true(methods::is(p, "json"))

        pl <- jsonlite::fromJSON(p, simplifyVector = FALSE)
        expect_equal(
            pl,
            list(
                list(name = "a", value = 1),
                list(name = "b", value = 2),
                list(name = "c", value = list(a = 1, b = 2))
            )
        )
    })

    test_that("Param objects are returned with full information",
    {
        pip <- pipe_new("pipe1") |>
            pipe_add("f1", \(x = new("StringParam", "my x", "some x")) x) |>
            pipe_add("f2", \(y = new("StringParam", "my y", "some y")) y)

        p <- pipe_get_params_unique_json(pip)
        pl <- jsonlite::fromJSON(p, simplifyVector = FALSE)

        expect_equal(
            pl,
            list(
                list(
                    value = "some x",
                    name = "x",
                    advanced = FALSE,
                    label = "my x",
                    description = "",
                    source = "internal",
                    domain = "",
                    class = "StringParam"
                ),
                list(
                    value = "some y",
                    name = "y",
                    advanced = FALSE,
                    label = "my y",
                    description = "",
                    source = "internal",
                    domain = "",
                    class = "StringParam"
                )
            )
        )
    })

    test_that("the name of the arg is set to the name of the Param object",
    {
        pip <- pipe_new("pipe1") |>
            pipe_add("f1", \(x = new("StringParam", "my x", "some x")) x) |>
            pipe_add("f2", \(y = new("StringParam", "my y", "some y")) y)

        p <- pipe_get_params_unique_json(pip)
        pl <- jsonlite::fromJSON(p)

        expect_true(pl[["label"]][[1]] == "my x")
        expect_false(pl[["name"]][[1]] == "my x")
        hasArgName <- pl[["name"]][[1]] == "x"

        expect_true(pl[["label"]][[2]] == "my y")
        expect_false(pl[["name"]][[2]] == "my y")
        hasArgName <- pl[["name"]][[2]] == "y"
    })

    test_that("works with mixed, that is, standard and Param objects",
    {
        pip <- pipe_new("pipe1") |>
            pipe_add("f1", \(x = 1) x) |>
            pipe_add("f2", \(s = new("StringParam", "my s", "some s")) s)

        p <- pipe_get_params_unique_json(pip)
        pl <- jsonlite::fromJSON(p, simplifyVector = FALSE)

        expect_equal(pl[[1]], list(name = "x", value = 1L))
        expect_equal(
            pl[[2]],
            list(
                value = "some s",
                name = "s",
                advanced = FALSE,
                label = "my s",
                description = "",
                source = "internal",
                domain = "",
                class = "StringParam"
            )
        )
    })
})



describe("pipe_get_step",
{
    test_that("single steps can be retrieved",
    {
        pip <- pipe_new("pipe1") |>
            pipe_add("f1", identity, params = list(x = 1))

        expect_equal(pipe_get_step(pip, "data"), pip$pipeline[1, ])
        expect_equal(pipe_get_step(pip, "f1"), pip$pipeline[2, ])

        expect_error(pipe_get_step(pip, "foo"), "step 'foo' does not exist")
    })

    test_that("dependencies are recorded as expected",
    {
        pip <- pipe_new("pipe1", data = 9)

        foo <-\(a = 0) a
        bar <-\(a = 1, b = 2) a + b

        pipe_add(pip, "f1", foo)
        pipe_add(pip, "f2", bar, params = list(a = ~data, b = ~f1))

        expect_true(length(unlist(pipe_get_step(pip, "f1")[["depends"]])) == 0)
        expect_equal(
            unlist(pipe_get_step(pip, "f2")[["depends"]]),
            c("a" = "data", b = "f1")
        )
    })
})


describe("pipe_get_step_names",
{
    test_that("step names be retrieved",
    {
        pip <- pipe_new("pipe1") |>
            pipe_add("f1", \() {}) |>
            pipe_add("f2", \(x = 1) x)

        expect_equal(pipe_get_step_names(pip), pip$pipeline[["step"]])
    })
})



describe("pipe_get_step_number",
{
    test_that("get_step_number",
    {
        test_that("get_step_number works as expected",
        {
            pip <- expect_no_error(pipe_new("pipe"))
            pipe_add(pip, "f1", \(a = 1) a)
            pipe_add(pip, "f2", \(a = 1) a)

            pipe_get_step_number(pip, "f1") |> expect_equal(2)
            pipe_get_step_number(pip, "f2") |> expect_equal(3)
        })

        test_that("signals non-existent step",
        {
            pip <- expect_no_error(pipe_new("pipe"))
            pipe_add(pip, "f1", \(a = 1) a)

            expect_error(
                pipe_get_step_number(pip, "non-existent"),
                "step 'non-existent' does not exist"
            )
        })
    })
})


describe("pipe_has_step",
{
    test_that("it can be checked if pipeline has a step",
    {
        pip <- pipe_new("pipe1") |>
            pipe_add("f1", \(a = 1) a)

        expect_true(pipe_has_step(pip, "f1"))
        expect_false(pipe_has_step(pip, "f2"))
    })
})


describe("pipe_insert_after",
{
    test_that("can insert a step after another step",
    {
        pip <- pipe_new("pipe1") |>
            pipe_add("f1", \(a = 1) a + 1) |>
            pipe_add("f2", \(a = ~f1) a + 1)

        pipe_insert_after(
            pip,
            "f1",
            step = "f3",
            fun =\(a = ~f1) a + 1
        )

        expect_equal(
            pipe_get_step_names(pip),
            c("data", "f1", "f3", "f2")
        )
    })

    test_that("will not insert a step if the step already exists",
    {
        pip <- pipe_new("pipe1") |>
            pipe_add("f1", \(a = 1) a + 1) |>
            pipe_add("f2", \(a = ~f1) a + 1)

        expect_error(
            pipe_insert_after(pip, "f1", step = "f2"),
            "step 'f2' already exists"
        )
    })

    test_that(
        "will not insert a step if the reference step does not exist",
    {
        pip <- pipe_new("pipe1") |>
            pipe_add("f1", \(a = 1) a + 1) |>
            pipe_add("f2", \(a = ~f1) a + 1)

        expect_error(
            pipe_insert_after(pip, "non-existent", step = "f3"),
            "step 'non-existent' does not exist"
        )
    })

    test_that(
        "will not insert a step with bad parameter dependencies",
    {
        pip <- pipe_new("pipe1") |>
            pipe_add("f1", \(a = 1) a + 1) |>
            pipe_add("f2", \(a = ~f1) a + 1)

        expect_error(
            pipe_insert_after(pip, "f1", step = "f3", \(x = ~f2) x),
            "step 'f3': dependency 'f2' not found"
        )
    })

    test_that("will work if insert happens at last position",
    {
        pip <- pipe_new("pipe1") |>
            pipe_add("f1", \(a = 1) a + 1) |>
            pipe_add("f2", \(a = ~f1) a + 1)

        pipe_insert_after(
            pip,
            "f2",
            step = "f3",
            fun =\(a = ~f1) a + 1
        )

        expect_equal(
            pipe_get_step_names(pip),
            c("data", "f1", "f2", "f3")
        )
    })
})


describe("pipe_insert_before",
{
    test_that("can insert a step after another step",
    {
        pip <- pipe_new("pipe1") |>
            pipe_add("f1", \(a = 1) a + 1) |>
            pipe_add("f2", \(a = ~f1) a + 1)

        pipe_insert_before(
            pip,
            "f2",
            step = "f3",
            fun =\(a = ~f1) a + 1
        )

        expect_equal(
            pipe_get_step_names(pip),
            c("data", "f1", "f3", "f2")
        )
    })

    test_that("will not insert a step if the step already exists",
    {
        pip <- pipe_new("pipe1") |>
            pipe_add("f1", \(a = 1) a + 1) |>
            pipe_add("f2", \(a = ~f1) a + 1)

        expect_error(
            pipe_insert_before(pip, "f1", step = "f2"),
            "step 'f2' already exists"
        )
    })

    test_that(
        "will not allow step to be inserted at first position",
    {
        pip <- pipe_new("pipe1") |>
            pipe_add("f1", \(a = 1) a + 1)

        expect_error(
            pipe_insert_before(pip, "data", step = "f2"),
            "cannot insert before first step"
        )
    })

    test_that("will not insert a step if the reference step does not exist",
    {
        pip <- pipe_new("pipe1") |>
            pipe_add("f1", \(a = 1) a + 1) |>
            pipe_add("f2", \(a = ~f1) a + 1)

        expect_error(
            pipe_insert_before(pip, "non-existent", step = "f3"),
            "step 'non-existent' does not exist"
        )
    })

    test_that(
        "will not insert a step with bad parameter dependencies",
    {
        pip <- pipe_new("pipe1") |>
            pipe_add("f1", \(a = 1) a + 1) |>
            pipe_add("f2", \(a = ~f1) a + 1)

        expect_error(
            pipe_insert_before(pip, "f2", step = "f3", \(x = ~f2) x),
            "step 'f3': dependency 'f2' not found"
        )
    })
})


describe("pipe_length",
{
    test_that("returns the number of steps",
    {
        pip <- pipe_new("pipe")
        expect_equal(pipe_length(pip), 1)

        pipe_add(pip, "f1", \(a = 1) a)
        expect_equal(pipe_length(pip), 2)

        pipe_remove_step(pip, "f1")
        expect_equal(pipe_length(pip), 1)
    })
})


describe("pipe_lock_step",
{
    test_that("sets state to 'locked'",
    {
        pip <- pipe_new("pipe") |>
            pipe_add("f1", \(a = 1) a)

        pipe_lock_step(pip, "f1")
        expect_equal(pipe_get_step(pip, "f1")[["state"]], "Locked")

        pip
    })
})


describe("pipe_print",
{
    test_that("pipeline can be printed",
    {
        pip <- pipe_new("pipe1", data = 9)

        expect_output(pipe_print(pip))
    })

    test_that("missing function is signaled",
    {
        pip <- pipe_new("pipe1")

        expect_error(
            pipe_add(pip, "f1", "non-existing-function"),
            "object 'non-existing-function' of mode 'function' was not found"
        )
    })

    test_that("if verbose is TRUE, all columns are printed",
    {
        op <- options(width = 1000L)
        on.exit(options(op))
        pip <- pipe_new("pipe1", data = 9)

        out <- capture.output(pipe_print(pip, verbose = TRUE))
        header <- out[1] |> trimws() |> strsplit("\\s+") |> unlist()
        expected_header <- colnames(pip$pipeline)
        expect_equal(header, expected_header)
    })
})


describe("pipe_pop_step",
{
    test_that("last pipeline step can be popped",
    {
        pip <- pipe_new("pipe1") |>
            pipe_add("f1", \(a = 1) a)

        pip_copy = pipe_clone(pip)

        pipe_add(pip, "f2", \(b = 2) b)

        expect_equal(pipe_length(pip), 3)
        expect_equal(pip_copy$length(), 2)

        res = pipe_pop_step(pip)
        expect_equal(res, "f2")

        expect_equal(pipe_length(pip), 2)
        expect_equal(pip, pip_copy)
    })
})



describe("pipe_pop_steps_after",
{
    test_that("all steps after a given step can be removed",
    {

        pip <- pipe_new("pipe1", data = 0) |>
            pipe_add("f1", \(x = 1) x) |>
            pipe_add("f2", \(x = ~f1) x) |>
            pipe_add("f3", \(x = ~f2) x)

        steps = pipe_pop_steps_after(pip, "f1")
        expect_equal(steps, c("f2", "f3"))

        hasAllStepsRemoved = !any(steps %in% pip$pipeline[["name"]])
        expect_true(hasAllStepsRemoved)
    })

    test_that("if given step does not exist, an error is signalled",
    {

        pip <- pipe_new("pipe1", data = 0) |>
            pipe_add("f1", \(x = 1) x)

        expect_error(
            pipe_pop_steps_after(pip, "bad_step"),
            "step 'bad_step' does not exist"
        )
    })

    test_that("if given step is the last step, nothing gets removed",
    {

        pip <- pipe_new("pipe1", data = 0) |>
            pipe_add("f1", \(x = 1) x) |>
            pipe_add("f2", \(x = ~f1) x)

        length_before = pipe_length(pip)

        res = pipe_pop_steps_after(pip, "f2")

        expect_equal(res, character(0))
        hasNothingRemoved = pipe_length(pip) == length_before
        expect_true(hasNothingRemoved)
    })
})



describe("pipe_pop_steps_from",
{
    test_that("all steps from a given step can be removed",
    {

        pip <- pipe_new("pipe1", data = 0) |>
            pipe_add("f1", \(x = 1) x) |>
            pipe_add("f2", \(x = ~f1) x) |>
            pipe_add("f3", \(x = ~f2) x)

        steps = pipe_pop_steps_from(pip, "f2")
        expect_equal(steps, c("f2", "f3"))

        hasAllStepsRemoved = !any(steps %in% pip$pipeline[["name"]])
        expect_true(hasAllStepsRemoved)
    })

    test_that("if given step does not exist, an error is signalled",
    {

        pip <- pipe_new("pipe1", data = 0) |>
            pipe_add("f1", \(x = 1) x)

        expect_error(
            pipe_pop_steps_from(pip, "bad_step"),
            "step 'bad_step' does not exist"
        )
    })

    test_that("if given step is the last step, one step removed",
    {

        pip <- pipe_new("pipe1", data = 0) |>
            pipe_add("f1", \(x = 1) x) |>
            pipe_add("f2", \(x = ~f1) x)

        length_before = pipe_length(pip)

        res = pipe_pop_steps_from(pip, "f2")

        expect_equal(res, "f2")
        hasOneStepRemoved = pipe_length(pip) == length_before - 1
        expect_true(hasOneStepRemoved)
    })
})



describe("pipe_remove_step",
{
    test_that("pipeline step can be removed",
    {
        pip <- pipe_new("pipe1") |>
            pipe_add("f1", \(a = 1) a) |>
            pipe_add("f2", \(b = 1) b)

        pipe_remove_step(pip, "f1")

        expect_equal(pipe_get_step_names(pip), c("data", "f2"))
    })

    test_that("step must exist",
    {
        pip <- pipe_new("pipe1") |>
            pipe_add("f1", \(a = 1) a)

        expect_error(
            pipe_remove_step(pip, "non-existent-step"),
            "step 'non-existent-step' does not exist"
        )
    })

    test_that("if step has downstream dependencies, an error is given",
    {
        pip <- pipe_new("pipe1") |>
            pipe_add("f1", \(a = 1) a) |>
            pipe_add("f2", \(b = ~f1) b)

        expect_error(
            pipe_remove_step(pip, "f1"),
            paste(
                "cannot remove step 'f1' because the following",
                "steps depend on it: 'f2'"
            )
        )

        pipe_add(pip, "f3", \(x = ~f1) x)
        expect_error(
            pipe_remove_step(pip, "f1"),
            paste(
                "cannot remove step 'f1' because the following",
                "steps depend on it: 'f2', 'f3'"
            )
        )
    })

    test_that(
        "if error, only the direct downstream dependencies are reported",
    {
        pip <- pipe_new("pipe1") |>
            pipe_add("f1", \(a = 1) a) |>
            pipe_add("f2", \(b = ~f1) b) |>
            pipe_add("f3", \(c = ~f2) c) |>
            pipe_add("f4", \(d = ~f1) d)

        expect_error(
            pipe_remove_step(pip, "f1"),
            paste(
                "cannot remove step 'f1' because the following",
                "steps depend on it: 'f2', 'f4'"
            )
        )
    })

    test_that(
        "step can be removed together with is downstream dependencies",
    {
        pip <- pipe_new("pipe1") |>
            pipe_add("f1", \(a = 1) a) |>
            pipe_add("f2", \(b = ~f1) b) |>
            pipe_add("f3", \(c = ~f2) c) |>
            pipe_add("f4", \(d = ~f1) d) |>
            pipe_add("f5", \(x = ~data) x)

        out <- utils::capture.output(
            pipe_remove_step(pip, "f1", recursive = TRUE),
            type = "message"
        )

        expect_equal(pipe_get_step_names(pip), c("data", "f5"))
        expect_equal(
            out,
            paste(
                "Removing step 'f1' and its downstream dependencies:",
                "'f2', 'f3', 'f4'"
            )
        )
    })
})


describe("pipe_remove_step",
{
    test_that("pipeline step can be renamed",
    {
        pip <- pipe_new("pipe1") |>
            pipe_add("f1", \(a = 1) a) |>
            pipe_add("f2", \(b = 2) b)

        pipe_rename_step(pip, from = "f1", to = "first")

        pipe_get_step_names(pip) |> expect_equal(c("data", "first", "f2"))
    })

    test_that("signals name clash",
    {
        pip <- pipe_new("pipe1") |>
            pipe_add("f1", \(a = 1) a) |>
            pipe_add("f2", \(b = 2) b)

        expect_error(
            pipe_rename_step(pip, from = "f1", to = "f2"),
            "step 'f2' already exists"
        )
    })

    test_that("renames dependencies as well",
    {
        pip <- pipe_new("pipe1") |>
            pipe_add("f1", \(a = 1) a) |>
            pipe_add("f2", \(b = ~f1) b) |>
            pipe_add("f3", \(a = ~f1, b = ~f2) a + b)

        pipe_rename_step(pip, from = "f1", to = "first")

        expect_equal(
            pipe_get_depends(pip),
            list(
                data = character(0),
                first = character(0),
                f2 = c(b = "first"),
                f3 = c(a = "first", b = "f2")
            )
        )
    })
})


describe("pipe_replace_step",
{
    test_that("pipeline steps can be replaced",
    {
        pip <- pipe_new("pipe1") |>
            pipe_add("f1", \(a = 1) a) |>
            pipe_add("f2", \(b = 2) b) |>
            pipe_add("f3", \(c = ~f2) c, keepOut = TRUE)

        out = unname(unlist(pipe_run(pip) |> pipe_collect_out()))
        expect_equal(out, 2)

        pipe_replace_step(pip, "f2", \(z = 4) z)
        out = unname(unlist(pipe_run(pip) |> pipe_collect_out()))
        expect_equal(out, 4)
    })

    test_that("fun must be passed as a function or string",
    {
        pip <- pipe_new("pipe") |>
            pipe_add("step1", \(a = 1) a)

        expect_error(
            pipe_replace_step(pip, "step1", fun = 1),
            "is.function(fun) || is_string(fun) is not TRUE",
            fixed = TRUE
        )
    })

    test_that("params must be a list",
    {
        pip <- pipe_new("pipe") |>
            pipe_add("step1", \(a = 1) a)

        expect_error(
            pipe_replace_step(pip, "step1", fun =\() 1, params = 1),
            "is.list(params)",
            fixed = TRUE
        )
    })

    test_that("description must be (possibly empty) string",
    {
        pip <- pipe_new("pipe") |>
            pipe_add("step1", \(a = 1) a)

        expect_error(
            pipe_replace_step(
                pip, "step1", fun =\() 1, description = 1
            ),
            "is_string(description)",
            fixed = TRUE
        )
        expect_no_error(
            pipe_replace_step(
                pip, "step1", fun =\() 1, description = ""
            )
        )
    })

    test_that("group must be non-empty string",
    {
        pip <- pipe_new("pipe") |>
            pipe_add("step1", \(a = 1) a)

        expect_error(
            pipe_replace_step(pip, "step1", fun =\() 1, group = 1),
            "is_string(group) && nzchar(group) is not TRUE",
            fixed = TRUE
        )
        expect_error(
            pipe_replace_step(pip, "step1", fun =\() 1, group = ""),
            "is_string(group) && nzchar(group) is not TRUE",
            fixed = TRUE
        )
    })

    test_that("keepOut must be logical",
    {
        pip <- pipe_new("pipe") |>
            pipe_add("step1", \(a = 1) a)

        expect_error(
            pipe_replace_step(pip, "step1", fun =\() 1, keepOut = 1),
            "is.logical(keepOut) is not TRUE",
            fixed = TRUE
        )
    })

    test_that("the replacing function can be passed as a string",
    {

        pip <- pipe_new("pipe1") |>
            pipe_add("f1", \(x = 3) x, keepOut = TRUE)

        out = unname(unlist(pipe_run(pip) |> pipe_collect_out()))
        expect_equal(out, 3)

        .my_func <-\(x = 3) {
            2 * x
        }
        assign(".my_func", .my_func, envir = globalenv())
        on.exit(rm(".my_func", envir = globalenv()))

        pipe_replace_step(pip, "f1", fun = ".my_func", keepOut = TRUE)

        out = unname(unlist(pipe_run(pip) |> pipe_collect_out()))
        expect_equal(out, 6)
    })

    test_that(
        "when replacing function, default parameters can be overridden",
    {

        pip <- pipe_new("pipe1") |>
            pipe_add("f1", \(x = 1:3) x, keepOut = TRUE)

        .my_func <-\(x = 3) {
            2 * x
        }
        assign(".my_func", .my_func, envir = globalenv())
        on.exit(rm(".my_func", envir = globalenv()))

        pipe_replace_step(
            pip,
            "f1",
            fun = ".my_func",
            params = list(x = 10),
            keepOut = TRUE
        )

        out = unname(unlist(pipe_run(pip) |> pipe_collect_out()))
        expect_equal(out, 20)
    })

    test_that("the pipeline step that is being replaced must exist",
    {
        pip <- pipe_new("pipe1") |>
            pipe_add("f1", \(a = 1) a) |>
            pipe_add("f2", \(b = 2) b) |>
            pipe_add("f3", \(c = ~f2) c, keepOut = TRUE)

        expect_error(pipe_replace_step(pip, "non-existent", \(z = 4) z))
    })


    test_that(
        "if replacing a pipeline step, dependencies are verified correctly",
    {
        pip <- pipe_new("pipe1") |>
            pipe_add("f1", \(a = 1) a) |>
            pipe_add("f2", \(b = 2) b) |>
            pipe_add("f3", \(c = ~f2) c, keepOut = TRUE)

        expect_error(
            pipe_replace_step(pip, "f2", \(z = ~foo) z),
            "dependency 'foo' not found up to step 'f1'"
        )

        expect_error(
            pipe_replace_step(pip, "f2", \(z = ~f2) z),
            "dependency 'f2' not found up to step 'f1'"
        )

        expect_error(
            pipe_replace_step(pip, "f2", \(z = ~f3) z),
            "dependency 'f3' not found up to step 'f1'"
        )
    })

    test_that(
        "states are updated correctly",
    {
        pip <- pipe_new("pipe1") |>
            pipe_add("f1", \(a = 1) a) |>
            pipe_add("f2", \(a = 2) a) |>
            pipe_add("f3", \(a = ~f2) a, keepOut = TRUE) |>
            pipe_add("f4", \(a = ~f3) a, keepOut = TRUE)

        pipe_run(pip)

        pipe_replace_step(pip, "f2", \(a = 2) 2* a)
        expect_equal(pipe_get_step(pip, "f1")$state, "Done")
        expect_equal(pipe_get_step(pip, "f2")$state, "New")
        expect_equal(pipe_get_step(pip, "f3")$state, "Outdated")
        expect_equal(pipe_get_step(pip, "f4")$state, "Outdated")
    })


    it("can have a variable defined outside as parameter default",
    {
        x <- 3
        pip <- pipe_new("pipe") |> pipe_add("f1", \(x = 1) x)
        pip |> pipe_replace_step("f1", fun = \(a) a, params = list(a = x))

        out <- pipe_run(pip) |> pipe_get_out("f1")
        expect_equal(out, x)
    })

    it("handles Param object args",
    {
        pip <- pipe_new("pipe") |> pipe_add("f1", \(x = 1) x)
        pip |> pipe_replace_step(
            "f1",
            fun = \(a = new("NumericParam", "a", value = 3)) a
        )

        out <- pipe_run(pip) |> pipe_get_out("f1")
        expect_equal(out, 3)
    })

    it("can have a Param object defined outside as parameter default",
    {
        x <- 3
        pip <- pipe_new("pipe") |> pipe_add("f1", \(x = 1) x)
        p <- new("NumericParam", "a", value = x)

        pip |> pipe_replace_step("f1", fun = \(a) a, params = list(a = p))

        pip <- pipe_new("pipe") |>
            pipe_add("f1", \(a) a, params = list(a = p))

        expect_equal(pipe_get_params_at_step(pip, "f1")$a, p)
        out <- pipe_run(pip) |> pipe_get_out("f1")
        expect_equal(out, x)
    })

    it("function can be passed as a string",
    {
        pip <- pipe_new("pipe") |> pipe_add("f1", \(x = 1) x)
        pip |> pipe_replace_step("f1", fun = "mean", params = list(x = 1:5))

        out <- pipe_run(pip) |> pipe_get_out("f1")
        expect_equal(out, mean(1:5))
        expect_equal(pipe_get_step(pip, "f1")[["funcName"]], "mean")
    })

    it("if passed as a function, name is derived from the function",
    {
        pip <- pipe_new("pipe") |> pipe_add("f1", \(x = 1) x)
        pip |> pipe_replace_step("f1", fun = mean, params = list(x = 1:5))

        out <- pipe_run(pip) |> pipe_get_out("f1")
        expect_equal(out, mean(1:5))
        expect_equal(pipe_get_step(pip, "f1")[["funcName"]], "mean")
    })

    it("lampda functions, are named 'function'",
    {
        pip <- pipe_new("pipe") |> pipe_add("f1", \(x = 1) x)
        pip |> pipe_replace_step("f1", fun = \(x = 1) x)
        expect_equal(pipe_get_step(pip, "f1")[["funcName"]], "function")
    })
})


describe("pipe_reset",
{
    test_that(
        "after reset pipeline is the same as before the run",
    {
        p <- pipe_new("pipe", data = 1:2)
        p |> pipe_add("f1", \(x = 1) x)
        p |> pipe_add("f2", \(y = 1) y)

        p |> pipe_run()
        expect_equal(
            p |> pipe_collect_out(all = TRUE),
            list(data = 1:2, f1 = 1, f2 = 1)
        )
        expect_true(all(p$pipeline[["state"]] == "Done"))


        p |> pipe_reset()
        expect_equal(
            p |> pipe_collect_out(all = TRUE),
            list(data = NULL, f1 = NULL, f2 = NULL)
        )
        expect_true(all(p$pipeline[["state"]] == "New"))
    })
})


describe("pipe_run",
{
    test_that("empty pipeline can be run",
    {
        expect_no_error(pipe_new("pipe1") |> pipe_run())
    })

    test_that("returns the pipeline object",
    {
        pip <- pipe_new("pipe1") |> pipe_run()
        expect_equal(pip$name, "pipe1")
    })

    test_that("if function result is a list, all names are preserved",
    {
        # Result list length == 1 - the critical case
        resultList = list(foo = 1)

        pip <- pipe_new("pipe") |>
            pipe_add("f1", \() resultList, keepOut = TRUE)

        out = pipe_run(pip) |> pipe_collect_out()
        expect_equal(out[["f1"]], resultList)

        # Result list length > 1
        resultList = list(foo = 1, bar = 2)

        pip <- pipe_new("pipe") |>
            pipe_add("f1", \() resultList, keepOut = TRUE)

        out = pipe_run(pip) |> pipe_collect_out()
        expect_equal(out[["f1"]], resultList)
    })

    test_that("pipeline execution is correct",
    {
        data <- 9
        pip <- pipe_new("pipe1", data = data)

        foo <-\(a = 1) a
        bar <-\(a, b) a + b

        a <- 5
        pipe_add(pip, "f1", foo, params = list(a = a), keepOut = TRUE)
        pipe_add(
            pip, "f2", bar, params = list(a = ~data, b = ~f1), keepOut = TRUE
        )

        pipe_run(pip)

        expect_equal(pip$pipeline[["out"]][[2]], a)
        expect_equal(pip$pipeline[["out"]][[3]], a + data)
    })


    test_that("pipeline execution can cope with void functions",
    {
        pip <- pipe_new("pipe1") |>
            pipe_add("f1", \(a = 1) {}, keepOut = TRUE) |>
            pipe_add("f2", \(b = 2) b, keepOut = TRUE)

        out <- pipe_run(pip) |> pipe_collect_out()
        expect_equal(out, list(f1 = NULL, f2 = 2))
    })

    test_that(
        "if pipeline execution fails, the error message is returned as error",
    {
        pip <- pipe_new("pipe1") |>
            pipe_add("f1", \(a = 1) a) |>
            pipe_add("f2", \(b = ~f1) stop("something went wrong"))

        expect_error(pipe_run(pip), "something went wrong")
    })

    test_that(
        "can handle 'NULL' results",
    {
        pip <- pipe_new("pipe", data = 1) |>
            pipe_add("f1", \(x = ~data) x, keepOut = TRUE)

        out <- pipe_run(pip) |> pipe_collect_out()
        expect_equal(out[["f1"]], 1)

        pipe_set_data(pip, NULL)
        out <- pipe_run(pip) |> pipe_collect_out()
        expect_equal(out[["f1"]], NULL)
    })

    test_that(
        "can be run recursively to dynamically create and run pipelines",
    {
        pip <- pipe_new("pipe", data = 1) |>
            pipe_add(
                "f1",
                fun =\(data = 10) {
                    pip <- pipe_new("2nd pipe", data = data) |>
                        pipe_add("step1", \(x = ~data) x) |>
                        pipe_add("step2", \(x = ~step1) {
                            print(x)
                            2 * x
                        }, keepOut = TRUE)
                }
            )

        pip2 <- pipe_run(pip, recursive = TRUE)
        expect_equal(pip2 |> pipe_get_step_names(), c("data", "step1", "step2"))

        out <- pip2 |> pipe_collect_out()
        expect_equal(out[["step2"]], 20)
    })

    test_that("will not re-run steps that are already done unless forced",
    {
        pip <- pipe_new("pipe", data = 1) |>
            pipe_add("f1", \(x = 2) x) |>
            pipe_add("f2", \(y = ~f1) y + 1)

        pipe_run(pip)
        expect_equal(pipe_get_step(pip, "f1")$state, "Done")
        expect_equal(pipe_get_step(pip, "f2")$state, "Done")
        expect_equal(pip$pipeline[["out"]][[2]], 2)
        expect_equal(pip$pipeline[["out"]][[3]], 3)

        pip$pipeline[2, "out"] <- 0
        pip$pipeline[3, "out"] <- 0
        pipe_run(pip)
        expect_equal(pip$pipeline[["out"]][[2]], 0)
        expect_equal(pip$pipeline[["out"]][[3]], 0)

        # Set parameter, which outdates step and run again
        pipe_run(pip, force = TRUE)
        expect_equal(pip$pipeline[["out"]][[2]], 2)
        expect_equal(pip$pipeline[["out"]][[3]], 3)
    })

    test_that("will never re-run locked steps",
    {
        pip <- pipe_new("pipe", data = 1) |>
            pipe_add("f1", \(x = 2) x) |>
            pipe_add("f2", \(y = ~f1) y + 1)

        pipe_run(pip)
        expect_equal(pip$pipeline[["out"]][[2]], 2)
        expect_equal(pip$pipeline[["out"]][[3]], 3)

        pip$pipeline[2, "out"] <- 0
        pip$pipeline[3, "out"] <- 0
        pipe_run(pip)
        expect_equal(pip$pipeline[["out"]][[2]], 0)
        expect_equal(pip$pipeline[["out"]][[3]], 0)
        pipe_lock_step(pip, "f1")
        pipe_lock_step(pip, "f2")

        # Set parameter, which outdates step and run again
        pipe_run(pip, force = TRUE)
        expect_equal(pip$pipeline[["out"]][[2]], 0)
        expect_equal(pip$pipeline[["out"]][[3]], 0)
    })

    test_that("can clean unkept steps",
    {
        pip <- pipe_new("pipe", data = 1) |>
            pipe_add("f1", \(x = 2) x) |>
            pipe_add("f2", \(y = ~f1) y + 1)

        pipe_run(pip)
        expect_equal(pip$pipeline[["out"]][[2]], 2)
        expect_equal(pip$pipeline[["out"]][[3]], 3)

        pipe_run(pip, cleanUnkept = TRUE)
        expect_true(all(sapply(pip$pipeline[["out"]], is.null)))

        pipe_set_keep_out(pip, "f1", TRUE)
        pipe_run(pip, cleanUnkept = TRUE)
        expect_equal(pip$pipeline[["out"]], list(NULL, 2, NULL))
    })

    test_that("logs warning without interrupting the run",
    {
        pip <- pipe_new("pipe", data = 1) |>
            pipe_add("f1", \(x = 2) x) |>
            pipe_add("f2", \(x = ~f1) {
                warning("something might be wrong")
                x
            }) |>
            pipe_add("f3", \(x = ~f2) x)

        log <- utils::capture.output(
            expect_warning(pipe_run(pip), "something might be wrong")
        )

        Filter(log, f =\(x) x |>
            startsWith("WARN")) |>
            grepl(pattern = "something might be wrong") |>
            expect_true()

        wasRunTillEnd <- pipe_get_out(pip, "f3") == 2
        expect_true(wasRunTillEnd)
    })

    test_that("logs error and stops at failed step",
    {
        pip <- pipe_new("pipe", data = 1) |>
            pipe_add("f1", \(x = 2) x) |>
            pipe_add("f2", \(x = ~f1) {
                stop("something went wrong")
                x
            }) |>
            pipe_add("f3", \(x = ~f2) x)

        log <- utils::capture.output(
            expect_error(pipe_run(pip), "something went wrong")
        )

        Filter(log, f =\(x) x |>
            startsWith("ERROR")) |>
            grepl(pattern = "something went wrong") |>
            expect_true()

        wasRunTillEnd <- isTRUE(pipe_get_out(pip, "f3") == 2)
        expect_false(wasRunTillEnd)
    })

    test_that("can show progress",
    {
        pip <- pipe_new("pipe", data = 1) |>
            pipe_add("f1", \(x = 2) x) |>
            pipe_add("f2", \(x = ~f1) x)

        m <- mockery::mock()
        pipe_run(pip, progress = m)

        args <- mockery::mock_args(m)

        expect_equal(length(m), pipe_length(pip))
        expect_equal(args[[1]][[1]], 1)
        expect_equal(args[[1]][["detail"]], "data")
        expect_equal(args[[2]][[1]], 2)
        expect_equal(args[[2]][["detail"]], "f1")
        expect_equal(args[[3]][[1]], 3)
        expect_equal(args[[3]][["detail"]], "f2")
    })

    test_that("works with Param objects",
    {
        pip <- pipe_new("pipe1") |>
            pipe_add(
                "f1",
               \(
                    x = new("NumericParam", "x", value = 1),
                    y = new("NumericParam", "y", value = 2)
                ) {
                    x + y
                }
            ) |>
            pipe_add(
                "f2",
               \(
                    s1 = new("StringParam", "s1", "Hello"),
                    s2 = new("StringParam", "s2", "World")
                ) {
                    paste(s1, s2)
                }
            )

        pipe_run(pip)
        pipe_get_out(pip, "f1") |> expect_equal(3)
        pipe_get_out(pip, "f2") |> expect_equal("Hello World")
    })
})



describe("pipe_run_step",
{
    test_that("pipeline can be run at given step",
    {
        pip <- pipe_new("pipe") |>
            pipe_add("A", \(a = 1) a)

        expect_no_error(pipe_run_step(pip, "A"))
    })


    test_that("upstream steps are by default run with given step",
    {
        pip <- pipe_new("pipe") |>
            pipe_add("A", \(a = 1) a) |>
            pipe_add("B", \(b = ~A) c(b, 2)) |>
            pipe_add("C", \(c = ~B) c(c, 3))

        pipe_run_step(pip, "B")

        expect_equal(pipe_get_out(pip, "A"), 1)
        expect_equal(pipe_get_out(pip, "B"), c(1, 2))
        expect_true(is.null(pipe_get_out(pip, "C")))
    })

    test_that("runs upstream steps in correct order",
    {
        pip <- pipe_new("pipe") |>
            pipe_add("A", \(a = 1) a) |>
            pipe_add("B", \(b = ~A) c(b, 2)) |>
            pipe_add("C", \(c = ~B) c(c, 3))

        pipe_run_step(pip, "C")
        expect_equal(pipe_get_out(pip, "C"), 1:3)
    })

    test_that("runs downstream steps in correct order",
    {
        pip <- pipe_new("pipe") |>
            pipe_add("A", \(a = 1) a) |>
            pipe_add("B", \(b = ~A) c(b, 2)) |>
            pipe_add("C", \(c = ~B) c(c, 3))

        pipe_run_step(pip, "A", downstream = TRUE)
        expect_equal(pipe_get_out(pip, "C"), 1:3)
    })

    test_that("pipeline can be run at given step excluding
        all upstream dependencies",
    {
        pip <- pipe_new("pipe") |>
            pipe_add("A", \(a = 1) a) |>
            pipe_add("B", \(b = ~A) c(b, 2)) |>
            pipe_add("C", \(c = ~B) c(c, 3))

        pipe_run_step(pip, "B", upstream = FALSE)
        expect_true(is.null(pipe_get_out(pip, "A")))
        expect_equal(pipe_get_out(pip, "B"), 2)
        expect_true(is.null(pipe_get_out(pip, "C")))
    })

    test_that("pipeline can be run at given step excluding upstream
        but including downstream dependencies",
    {
        pip <- pipe_new("pipe") |>
            pipe_add("A", \(a = 1) a) |>
            pipe_add("B", \(b = ~A) c(b, 2)) |>
            pipe_add("C", \(c = ~B) c(c, 3))


        pipe_run_step(
            pip,
            "B",
            upstream = FALSE,
            downstream = TRUE
        )
        expect_true(is.null(pipe_get_out(pip, "A")))
        expect_equal(pipe_get_out(pip, "B"), 2)
        expect_equal(pipe_get_out(pip, "C"), c(2, 3))
    })

    test_that("pipeline can be run at given step including
        up- and downstream dependencies",
    {
        pip <- pipe_new("pipe") |>
            pipe_add("A", \(a = 1) a) |>
            pipe_add("B", \(b = ~A) c(b, 2)) |>
            pipe_add("C", \(c = ~B) c(c, 3))


        pipe_run_step(
            pip,
            "B",
            upstream = TRUE,
            downstream = TRUE
        )
        expect_equal(pipe_get_out(pip, "A"), 1)
        expect_equal(pipe_get_out(pip, "B"), c(1, 2))
        expect_equal(pipe_get_out(pip, "C"), c(1, 2, 3))
    })

    test_that("if not marked as keepOut, output of run steps can be cleaned",
    {
        pip <- pipe_new("pipe") |>
            pipe_add("A", \(a = 1) a)


        pipe_run_step(pip, "A", cleanUnkept = TRUE)
        expect_true(is.null(pipe_get_out(pip, "A")))

        pipe_set_keep_out(pip, "A", TRUE) |>
            pipe_run_step("A", cleanUnkept = TRUE)
        expect_false(is.null(pipe_get_out(pip, "A")))
    })

    test_that("up- and downstream steps are marked in log",
    {
        lgr::unsuspend_logging()
        on.exit(lgr::suspend_logging())

        pip <- pipe_new("pipe") |>
            pipe_add("A", \(a = 1) a) |>
            pipe_add("B", \(b = ~A) c(b, 2)) |>
            pipe_add("C", \(c = ~B) c(c, 3))

        logOut <- utils::capture.output(
            pipe_run_step(pip, "B", upstream = TRUE, downstream = TRUE)
        )

        contains <-\(x, pattern) {
            grepl(pattern = pattern, x = x, fixed = TRUE)
        }

        expect_true(logOut[2] |> contains("Step 1/3 A (upstream)"))
        expect_true(logOut[3] |> contains("Step 2/3 B"))
        expect_true(logOut[4] |> contains("Step 3/3 C (downstream)"))
    })

    test_that(
        "updates the timestamp of the run steps",
    {
        pip <- pipe_new("pipe", data = 1) |>
            pipe_add("f1", \(x = ~data) x, keepOut = TRUE)

        before <- pip$pipeline[["time"]]
        Sys.sleep(1)

        pipe_run_step(pip, "f1", upstream = FALSE)
        after <- pip$pipeline[["time"]]

        expect_equal(before[1], after[1])
        expect_true(before[2] < after[2])
    })

    test_that(
        "updates the state of the run steps",
    {
        pip <- pipe_new("pipe", data = 1) |>
            pipe_add("f1", \(x = ~data) x, keepOut = TRUE)

        before <- pip$pipeline[["state"]]
        pipe_run_step(pip, "f1", upstream = FALSE)
        after <- pip$pipeline[["state"]]

        expect_equal(before, c("New", "New"))
        expect_equal(after, c("New", "Done"))
    })

    test_that("will never re-run locked step",
    {
        pip <- pipe_new("pipe", data = 1) |>
            pipe_add("f1", \(x = 2) x) |>
            pipe_add("f2", \(y = ~f1) y + 1)

        pipe_run(pip)
        expect_equal(pip$pipeline[["out"]][[2]], 2)
        expect_equal(pip$pipeline[["out"]][[3]], 3)

        pip$pipeline[2, "out"] <- 0
        pip$pipeline[3, "out"] <- 0
        pipe_lock_step(pip, "f1")
        pipe_run_step(pip, "f1", downstream = TRUE)

        expect_equal(pip$pipeline[["out"]][[2]], 0)
        expect_equal(pip$pipeline[["out"]][[3]], 1)
    })

    test_that("can clean unkept steps",
    {
        pip <- pipe_new("pipe", data = 1) |>
            pipe_add("f1", \(x = 2) x) |>
            pipe_add("f2", \(y = ~f1) y + 1)

        pipe_run_step(pip, "f1", downstream = TRUE, cleanUnkept = TRUE)
        expect_true(all(sapply(pip$pipeline[["out"]], is.null)))

        pipe_set_keep_out(pip, "f1", TRUE)
        pipe_run_step(pip, "f1", downstream = TRUE, cleanUnkept = TRUE)
        expect_equal(pip$pipeline[["out"]], list(NULL, 2, NULL))
    })
})



describe("pipe_set_data",
{
    test_that("data can be set later after pipeline definition",
    {
        dat <- data.frame(a = 1:2, b = 1:2)

        pip <- pipe_new("pipe1") |>
            pipe_add("f1", \(x = ~data) x, keepOut = TRUE)

        out <- pipe_run(pip) |> pipe_collect_out()
        expect_equal(out[["f1"]], NULL)

        pipe_set_data(pip, dat)

        out <- pipe_run(pip) |> pipe_collect_out()
        expect_equal(out[["f1"]], dat)
    })

    test_that("if data is set, all dependent steps are set to outdated",
    {
        dat <- data.frame(a = 1:2, b = 1:2)

        pip <- pipe_new("pipe1") |>
            pipe_add("f1", \(x = ~data) x, keepOut = TRUE) |>
            pipe_add("f2", \(x = ~f1) x, keepOut = TRUE)

        pipe_run(pip)

        expect_equal(pipe_get_step(pip, "f1")$state, "Done")
        expect_equal(pipe_get_step(pip, "f2")$state, "Done")
        pipe_set_data(pip, dat)
        expect_equal(pipe_get_step(pip, "f1")$state, "Outdated")
        expect_equal(pipe_get_step(pip, "f2")$state, "Outdated")
    })
})



describe("pipe_set_data_split",
{
    test_that("the new steps have the names of the list attached",
    {
        dataList <- list(A = 1, B = 2)

        pip <- pipe_new("pipe") |>
            pipe_add("f1", \(a = 1) a)

        pipe_set_data_split(pip, dataList)

        pipe_get_step_names(pip) |>
            expect_equal(c("data.A", "f1.A", "data.B", "f1.B"))
    })

    test_that("the separator used in the creation of the new steps
    can be customized",
    {
        dataList <- list(A = 1, B = 2)

        pip <- pipe_new("pipe") |>
            pipe_add("f1", \(a = 1) a)

        pipe_set_data_split(pip, dataList, sep = "_")

        pipe_get_step_names(pip) |>
            expect_equal(c("data_A", "f1_A", "data_B", "f1_B"))
    })


    test_that("simple split pipeline computes results as expected",
    {
        dataList <- list(A = 1, B = 2, C = 3)
        pip <- pipe_new("pipe") |>
            pipe_add("f1", \(a = 1) a) |>
            pipe_add("f2", \(a = ~f1, b = ~data) {
                b + a
            }, keepOut = TRUE)

        pipe_set_data_split(pip, dataList)

        out <- pipe_run(pip) |> pipe_collect_out()
        expect_equivalent(
            unlist(out),
            unlist(lapply(dataList, \(x) x + 1))
        )
    })


    test_that(
        "split pipeline by default overrides output groups according to split",
    {
        dataList <- list(A = 1, B = 2)

        pip <- pipe_new("pipe") |>
            pipe_add("f0", \(a = 1) a, group = "id") |>
            pipe_add("f1", \(a = 1) a, group = "id") |>
            pipe_add("f2", \(a = 2) a)

        pipe_set_data_split(pip, dataList)

        out <- pipe_run(pip) |> pipe_collect_out(all = TRUE)
        expect_equal(names(out), names(dataList))
    })

    test_that("the grouping override can be omitted",
    {
        dataList <- list(A = 1, B = 2)

        pip <- pipe_new("pipe") |>
            pipe_add("f0", \(a = 1) a, group = "id") |>
            pipe_add("f1", \(a = 1) a, group = "id") |>
            pipe_add("f2", \(a = 2) a)

        pipe_set_data_split(pip, dataList, groupBySplit = FALSE)

        out <- pipe_run(pip) |> pipe_collect_out(all = TRUE)

        expect_equal(
            names(out),
            c("data.A", "id.A", "f2.A", "data.B", "id.B", "f2.B")
        )
    })

    test_that("the separator used in the creation of the groups
        can be customized",
    {
        dataList <- list(A = 1, B = 2)

        pip <- pipe_new("pipe") |>
            pipe_add("f0", \(a = 1) a, group = "id") |>
            pipe_add("f1", \(a = 1) a, group = "id") |>
            pipe_add("f2", \(a = 2) a)

        pipe_set_data_split(pip, dataList, groupBySplit = FALSE, sep = "_")

        out <- pipe_run(pip) |> pipe_collect_out(all = TRUE)

        expect_equal(
            names(out),
            c("data_A", "id_A", "f2_A", "data_B", "id_B", "f2_B")
        )
    })

    test_that("split pipeline works for list of data frames",
    {
        dat <- data.frame(x = 1:2, y = 1:2, z = 1:2)
        dataList <- list(A = dat, B = dat, C = dat)
        pip <- pipe_new("pipe") |>
            pipe_add("f1", \(a = 1) a) |>
            pipe_add("f2", \(a = ~f1, b = ~data) b, keepOut = TRUE) |>
            pipe_add("f3", \(a = ~f1, b = ~data) b[, 2:3], keepOut = TRUE)

        pipe_set_data_split(pip, dataList)

        out <- pipe_run(pip) |> pipe_collect_out()

        expect_equal(out[["A"]], c(f2.A = list(dat), f3.A = list(dat[, 2:3])))
        expect_equal(out[["B"]], c(f2.B = list(dat), f3.B = list(dat[, 2:3])))
        expect_equal(out[["C"]], c(f2.C = list(dat), f3.C = list(dat[, 2:3])))
    })


    test_that("if unnamed list of data frames, they are named with numbers",
    {
        dat <- data.frame(x = 1:2, y = 1:2, z = 1:2)
        dataList <- list(dat, dat)
        pip <- pipe_new("pipe") |>
            pipe_add("f1", \(a = 1) a) |>
            pipe_add("f2", \(a = ~f1, b = ~data) b, keepOut = TRUE)

        pipe_set_data_split(pip, dataList)

        out <- pipe_run(pip) |> pipe_collect_out()

        expect_equal(out[["1"]], dat)
        expect_equal(out[["2"]], dat)
    })


    test_that(
        "depends are updated correctly, if data split on subset of pipeline",
    {
        dat1 <- data.frame(x = 1:2)
        dat2 <- data.frame(y = 1:2)
        dataList <- list(dat1, dat2)

        pip <- pipe_new("pipe") |>
            pipe_add("f1", \(a = 1) a) |>
            pipe_add("f2", \(a = ~f1, b = ~data) b, keepOut = TRUE) |>
            pipe_add("f3", \(x = ~f1, y = ~f2) list(x, y), keepOut = TRUE) |>
            pipe_add("f4", \(x = ~f3) x[[1]], keepOut = TRUE)

        pipe_set_data_split(pip, dataList, toStep = "f2")

        ee = expect_equivalent
        pp = pip$pipeline

        depends <- pipe_get_depends(pip)

        expect_equal(depends[["f2.1"]], c(a = "f1.1", b = "data.1"))
        expect_equal(depends[["f2.2"]], c(a = "f1.2", b = "data.2"))

        # Pipeline was not split for f3, which therefore has parameters that
        # each depend on two steps
        expect_equal(
            depends[["f3"]],
            list(x = c("f1.1", "f1.2"), y = c("f2.1", "f2.2"))
        )

        # Pipeline was not split for f4, so just depdends on f3
        ee(depends[["f4"]], c(x = "f3"))


        out <- pipe_run(pip) |> pipe_collect_out()

        expect_equal(out[["1"]], dat1)
        expect_equal(out[["2"]], dat2)
        expected_f3_res = list(
            list("f1.1" = 1, "f1.2" = 1),
            list("f2.1" = dat1, "f2.2" = dat2)
        )
        expect_equal(out[["f3"]], expected_f3_res)
        expect_equal(out[["f4"]], expected_f3_res[[1]])
    })

    test_that("split data set can be created dynamically",
    {
        data = data.frame(a = 1:10, group = c("a", "b"))

        pip <- pipe_new("pipe", data = data) |>
            pipe_add("split_data_step",
               \(.self = NULL, data = ~data)
                {
                    splitData = split(data, data[, "group"])

                    .self$remove_step("split_data_step")
                    .self$set_data_split(splitData)
                    .self$name = paste(.self$name, "after data split")
                    .self
                }
            ) |>
            pipe_add("f1", \(data = ~data) {
                data
            }, keepOut = TRUE)

        pipe_set_params(pip, list(.self = pip))

        out <- pipe_run(pip, recursive = TRUE) |> pipe_collect_out()

        expect_equivalent(out, split(data, data[, "group"]))
    })
})



describe("pipe_set_keep_out",
{
    test_that("keep-out state can be set",
    {
        pip <- pipe_new("pipe1", data = 0) |>
            pipe_add("f1", \(a = 1) a)

        out <- pipe_run(pip) |> pipe_collect_out()
        expect_false("f1" %in% names(out))

        out <- pipe_set_keep_out(pip, "f1", keepOut = TRUE) |>
            pipe_collect_out()
        expect_true("f1" %in% names(out))

        out <- pipe_set_keep_out(pip, "f1", keepOut = FALSE) |>
            pipe_collect_out()
        expect_false("f1" %in% names(out))
    })

    test_that("step must be a string and exist",
    {
        pip <- pipe_new("pipe1")
        pip <- pipe_new("pipe1", data = 0) |>
            pipe_add("f1", \(a = 1) a)

        expect_error(
            pipe_set_keep_out(pip, 1),
            "is_string(step)",
            fixed = TRUE
        )

        expect_error(
            pipe_set_keep_out(pip, "f2"),
            "step 'f2' does not exist",
            fixed = TRUE
        )
    })

    test_that("state must be logical",
    {
        pip <- pipe_new("pipe1", data = 0) |>
            pipe_add("f1", \(a = 1) a)

        expect_error(
            pipe_set_keep_out(pip, "f1", keepOut = 1),
            "is.logical(keepOut)",
            fixed = TRUE
        )
    })
})




describe("pipe_set_params",
{
    test_that("parameters can be set commonly on existing pipeline",
    {
        pip <- pipe_new("pipe1") |>
            pipe_add("f1", \(a = 1) a) |>
            pipe_add("f2", \(a = 2, b = 3) a) |>
            pipe_add("f3", \(a = 4, b = 5) a)

        before <- pipe_get_params(pip)

        after <- pipe_set_params(pip, list(a = 9, b = 99)) |> pipe_get_params()
        expect_equal(after, list(
            f1 = list(a = 9),
            f2 = list(a = 9, b = 99),
            f3 = list(a = 9, b = 99)
        ))
    })

    test_that(
        "parameters depending on other steps are protected
        from being overwritten",
    {
        pip <- pipe_new("pipe1") |>
            pipe_add("f1", \(a = 1) a) |>
            pipe_add("f2", \(a = 2, b = ~f1) a) |>
            pipe_add("f3", \(a = ~f2, b = 5) a)

        before <- pipe_get_params(pip)
        expect_equal(
            before,
            list(
                f1 = list(a = 1),
                f2 = list(a = 2),
                f3 = list(b = 5)
            )
        )

        after <- pipe_set_params(pip, list(a = 9, b = 99)) |> pipe_get_params()
        expect_equal(
            after,
            list(
                f1 = list(a = 9),
                f2 = list(a = 9),
                f3 = list(b = 99)
            )
        )
    })

    test_that("an error is given if params argument is not a list",
    {
        pip <- pipe_new("pipe1") |>
            pipe_add("f1", \(a = 1) a)

        expect_error(
            pipe_set_params(pip, c(a = 9)),
            "params must be a list",
            fixed = TRUE
        )
    })

    test_that("trying to set undefined parameters is signaled with a warning",
    {
        pip <- pipe_new("pipe1") |>
            pipe_add("f1", \(a = 1) a)

        expect_warning(
            pipe_set_params(pip, list(a = 9, b = 9, c = 9)),
            "Trying to set parameters not defined in the pipeline: b, c",
            fixed = TRUE
        )
    })

    test_that("warning for undefined parameters can be omitted",
    {
        pip <- pipe_new("pipe1") |>
            pipe_add("f1", \(a = 1) a)

        expect_no_warning(
            pipe_set_params(pip, list(a = 9, b = 9, c = 9),
                warnUndefined = FALSE
            )
        )
    })

    test_that(
        "after setting a single parameter the params entry is still a list",
    {
        pip <- pipe_new("pipe1") |>
            pipe_add("f1", \(a = 1) a)

        expect_equal(pip$pipeline[["params"]][[2]], list(a = 1))

        pipe_set_params(pip, list(a = 9))
        expect_equal(pip$pipeline[["params"]][[2]], list(a = 9))
    })

    test_that(
        "hidden parameters can be set as well",
    {
        pip <- pipe_new("pipe1") |>
            pipe_add("f1", \(a = 1, .b = 2) a)

        pipe_set_params(pip, list(a = 9, .b = 10))
        pp <- pipe_get_params(pip, ignoreHidden = FALSE)
        expect_equal(pp, list(f1 = list(a = 9, .b = 10)))
    })

    test_that(
        "trying to set locked parameters is ignored until they are unlocked",
    {
        pip <- pipe_new("pipe1") |>
            pipe_add("f1", \(a = 1, b = 2) a + b) |>
            pipe_add("f2", \(a = 1, b = 2) a + b)

        pipe_lock_step(pip, "f1")
        expect_message(
            pipe_set_params(pip, list(a = 9, b = 99)),
            "skipping setting parameters a, b at locked step 'f1'"
        )

        pipe_get_params_at_step(pip, "f1") |> expect_equal(list(a = 1, b = 2))
        pipe_get_params_at_step(pip, "f2") |> expect_equal(list(a = 9, b = 99))

        pipe_unlock_step(pip, "f1")
        pipe_set_params(pip, list(a = 9, b = 99))
        pipe_get_params_at_step(pip, "f1") |> expect_equal(list(a = 9, b = 99))
    })
})


describe("pipe_set_params_at_step",
{
    test_that("parameters can be set at given step",
    {
        pip <- pipe_new("pipe1") |>
            pipe_add("f1", \(a = 1, b = 2) a + b) |>
            pipe_add("f2", \(x = 1) x)

        expect_equal(pipe_get_params_at_step(pip, "f1"), list(a = 1, b = 2))

        pipe_set_params_at_step(pip, "f1", list(a = 9, b = 99))
        expect_equal(pipe_get_params_at_step(pip, "f1"), list(a = 9, b = 99))

        pipe_set_params_at_step(pip, "f2", list(x = 9))
        expect_equal(pipe_get_params_at_step(pip, "f2"), list(x = 9))
    })

    test_that("step must be passed as a string and params as a list",
    {
        pip <- pipe_new("pipe1") |>
            pipe_add("f1", \(a = 1, b = 2) a + b)

        expect_error(
            pipe_set_params_at_step(pip, 1, list(a = 9, b = 99)),
            "is_string(step) is not TRUE",
            fixed = TRUE
        )

        expect_error(
            pipe_set_params_at_step(pip, "f1", params = c(a = 9, b = 99)),
            "is.list(params) is not TRUE",
            fixed = TRUE
        )
    })

    test_that("hidden parameters can be set as well",
    {
        pip <- pipe_new("pipe1") |>
            pipe_add("f1", \(a = 1, .b = 2) a + b)

        pipe_set_params_at_step(pip, "f1", list(a = 9, .b = 99))

        expect_equal(
            pipe_get_params_at_step(pip, "f1", ignoreHidden = FALSE),
            list(a = 9, .b = 99)
        )
    })


    test_that("trying to set undefined parameter signals an error",
    {
        pip <- pipe_new("pipe1") |>
            pipe_add("f1", \(a = 1, b = 2) a + b)

        expect_error(
            pipe_set_params_at_step(pip, "f1", list(a = 9, z = 99)),
            "Unable to set parameter(s) z at step f1 - candidates are a, b",
            fixed = TRUE
        )
    })

    test_that("trying to set locked parameter is ignored until it is unlocked",
    {
        pip <- pipe_new("pipe1") |>
            pipe_add("f1", \(a = 1, b = 2) a + b)

        pipe_lock_step(pip, "f1")
        expect_message(
            pipe_set_params_at_step(pip, "f1", list(a = 9, b = 99)),
            "skipping setting parameters a, b at locked step 'f1'"
        )

        pipe_get_params_at_step(pip, "f1") |>
            expect_equal(list(a = 1, b = 2))

        pipe_unlock_step(pip, "f1")
        pipe_set_params_at_step(pip, "f1", list(a = 9, b = 99))
        pipe_get_params_at_step(pip, "f1") |>
            expect_equal(list(a = 9, b = 99))
    })


    test_that("setting values for bound parameters is not allowed",
    {
        pip <- pipe_new("pipe1") |>
            pipe_add("f1", \(a = 1, b = 2) a + b) |>
            pipe_add("f2", \(x = 1, y = ~f1) x + y)

        expect_error(
            pipe_set_params_at_step(pip, "f2", list(x = 9, y = 99)),
            "Unable to set parameter(s) y at step f2 - candidates are x",
            fixed = TRUE
        )
    })


    test_that(
        "states of affected steps are updated once the pipeline was run",
    {
        pip <- pipe_new("pipe") |>
            pipe_add("f1", \(a = 1) a) |>
            pipe_add("f2", \(a = ~f1) a) |>
            pipe_add("f3", \(a = ~f2) a) |>
            pipe_add("f4", \(a = ~data) a)

        pipe_set_params_at_step(pip, "f1", params = list(a = 2))
        expect_true(all(pip$pipeline[["state"]] == "New"))

        pipe_run(pip)
        pipe_set_params_at_step(pip, "f1", params = list(a = 3))
        expect_equal(pipe_get_step(pip, "data")$state, "Done")
        expect_equal(pipe_get_step(pip, "f1")$state, "Outdated")
        expect_equal(pipe_get_step(pip, "f2")$state, "Outdated")
        expect_equal(pipe_get_step(pip, "f3")$state, "Outdated")
        expect_equal(pipe_get_step(pip, "f4")$state, "Done")

        pipe_run(pip)
        expect_true(all(pip$pipeline[["state"]] == "Done"))
    })


    test_that("parameters can be set to NULL",
    {
        pip <- pipe_new("pipe1") |>
        pipe_add("f1", \(a = NULL, b = 1) a)

        pipe_set_params_at_step(pip, "f1", list(a = 1, b = NULL))

        expect_equal(
            pipe_get_params_at_step(pip, "f1"),
            list(a = 1, b = NULL)
        )
    })

    test_that("preserves Param objects",
    {
        pip <- pipe_new("pipe1") |>
        pipe_add("f1", \(
            a = 1,
            b = new("NumericParam", "num", 2)) a + b
        )

        pipe_set_params_at_step(pip, "f1", list(a = 3, b = 4))

        params <- pipe_get_params_at_step(pip, "f1")
        expect_equal(params$a, 3)
        expect_true(params$b |> is("NumericParam"))
        expect_equal(params$b@value, 4)
    })
})


describe("pipe_split",
{
    test_that("pipeline split of initial pipeline gives the expected result",
    {
        pip <- pipe_new("pipe")
        res <- pipe_split(pip)

        expect_true(is.list(res))
        expect_equal(res[[1]]$name, "pipe1")
        expect_equal(res[[1]]$pipeline, pip$pipeline)
    })


    test_that("pipeline with two indepdendent groups is split correctly",
    {
        pip <- pipe_new("pipe")

        pipe_add(pip, "f1", \(a = ~data) a)
        pipe_add(pip, "f2", \(a = 1) a)
        pipe_add(pip, "f3", \(a = ~f2) a)
        pipe_add(pip, "f4", \(a = ~f1) a)
        pipe_run(pip)

        res <- pipe_split(pip)
        pip1 <- res[[1]]
        pip2 <- res[[2]]
        expect_equal(pip1$name, "pipe1")
        expect_equal(pip2$name, "pipe2")

        expect_equal(pip1$get_step_names(), c("f2", "f3"))
        expect_equal(pip2$get_step_names(), c("data", "f1", "f4"))

        expect_equal(
            pip1$collect_out(all = TRUE),
            pipe_collect_out(pip, all = TRUE)[c("f2", "f3")]
        )
        expect_equal(
            pip2$collect_out(all = TRUE),
            pipe_collect_out(pip, all = TRUE)[c("data", "f1", "f4")]
        )
    })

    test_that(
        "split is done correctly for complete data split",
    {
        dat1 <- data.frame(x = 1:2)
        dat2 <- data.frame(y = 1:2)
        dataList <- list(dat1, dat2)

        pip <- pipe_new("pipe") |>
            pipe_add("f1", \(a = 1) a) |>
            pipe_add("f2", \(a = ~f1, b = 2) b) |>
            pipe_add("f3", \(x = ~f1, y = ~f2) x + y)

        pipe_set_data_split(pip, dataList)

        res <- pipe_split(pip)
        steps <- lapply(res, \(x) x$get_step_names())

        expect_equal(
            steps,
            list(
                "data.1",
                c("f1.1", "f2.1", "f3.1"),
                "data.2",
                c("f1.2", "f2.2", "f3.2")
            )
        )
    })
})



describe("pipe_unlock_step",
{
    test_that("sets state to 'unlocked' if it was locked before",
    {
        pip <- pipe_new("pipe") |>
            pipe_add("f1", \(a = 1) a)

        pipe_lock_step(pip, "f1")
        expect_equal(pipe_get_step(pip, "f1")[["state"]], "Locked")

        pipe_unlock_step(pip, "data")
        expect_equal(pipe_get_step(pip, "data")[["state"]], "New")

        pipe_unlock_step(pip, "f1")
        expect_equal(pipe_get_step(pip, "f1")[["state"]], "Unlocked")

        pip
    })
})

Try the pipeflow package in your browser

Any scripts or data that you put into this service are public.

pipeflow documentation built on April 3, 2025, 10:50 p.m.