# R/rxp_populate.R
#
# Defines functions: get_need_jl get_need_py get_need_r generate_libraries_from_nix gen_pipeline escape_regex gen_flat_pipeline parse_nix_envs rxp_populate
#
# Documented in: rxp_populate

#' Generate Nix Pipeline Code
#'
#' @family pipeline functions
#' @param derivs A list of derivation objects, where each object is a list of
#'   five elements:
#'   - *name*, name of the derivation,
#'   - *snippet*, the nix code snippet to build this derivation,
#'   - *type*, can be R, Python or Quarto,
#'   - *additional_files*, character vector of paths to files to make available to build sandbox,
#'   - *nix_env*, path to Nix environment to build this derivation.
#'   A single deriv is the output of `rxp_r()`, `rxp_qmd()` or `rxp_py()`
#'   function.
#'
#' @param project_path Path to root of project, defaults to ".".
#'
#' @param build Logical, defaults to FALSE. Should the pipeline get built right
#'   after being generated? When FALSE, use `rxp_make()` to build the pipeline
#'   at a later stage.
#'
#' @param py_imports Named character vector of Python import rewrites. Names are
#'   the base modules that rixpress auto-imports as "import name", and values
#'   are the desired import lines. For example: c(numpy = "import numpy as np",
#'   xgboost = "from xgboost import XGBClassifier"). Each entry is applied by
#'   replacing "import name" with the provided string across generated
#'   _rixpress Python library files.
#'
#' @param ... Further arguments passed down to methods. Use `max-jobs` and
#'   `cores` to set parallelism during build. See the documentation of
#'   `rxp_make()` for more details.
#'
#' @return Nothing, writes a file called `pipeline.nix` with the Nix code to
#'   build the pipeline, as well as a folder called `_rixpress` with the
#'   required internal files.
#'
#' @details
#' This function generates a `pipeline.nix` file based on a list of derivation
#' objects. Each derivation defines a build step, and `rxp_populate()` chains these
#' steps and handles the serialization and conversion of Python objects into R
#' objects (or vice-versa). Derivations are created with `rxp_r()`, `rxp_py()`
#' and so on. By default (`build = FALSE`), the pipeline is only generated and
#' can be built at a later stage using `rxp_make()`. Set `build` to TRUE to
#' build the pipeline immediately after it has been generated.
#' The generated `pipeline.nix` expression includes:
#' - the required imports of environments, typically `default.nix` files generated by
#'   the `rix` package;
#' - correct handling of interdependencies of the different derivations;
#' - serialization and deserialization of both R and Python objects, and conversion
#'   between them when objects are passed from one language to another;
#' - correct loading of R and Python packages, or extra functions needed to build
#'   specific targets
#'
#' The `_rixpress` folder contains:
#' - R, Python or Julia scripts to load the required packages that need to be
#' available to the pipeline.
#' - a JSON file with the DAG of the pipeline, used for visualisation, and to
#' allow `rxp_populate()` to generate the right dependencies between derivations.
#' - `.rds` files with build logs, required for `rxp_inspect()` and `rxp_gc()`.
#' See `vignette("debugging")` for more details.
#'
#' Inline Python import adjustments
#' In some cases, due to the automatic handling of Python packages, users might
#' want to change import statements. By default if, say, `pandas` is needed to
#' build a derivation, it will be imported with `import pandas`. However, Python
#' programmers typically use `import pandas as pd`. You can either:
#' - use `py_imports` to rewrite these automatically during population, or
#' - use `adjust_import()` and `add_import()` for advanced/manual control.
#' See `vignette("polyglot")` for more details.
#'
#' @examples
#' \dontrun{
#' # Create derivation objects
#' d1 <- rxp_r(mtcars_am, filter(mtcars, am == 1))
#' d2 <- rxp_r(mtcars_head, head(mtcars_am))
#' list_derivs <- list(d1, d2)
#'
#' # Generate and build in one go
#' rxp_populate(derivs = list_derivs, project_path = ".", build = TRUE)
#'
#' # Or only populate, with inline Python import adjustments
#' rxp_populate(
#'   derivs = list_derivs,
#'   project_path = ".",
#'   build = FALSE,
#'   py_imports = c(pandas = "import pandas as pd")
#' )
#' # Then later:
#' rxp_make()
#' }
#' @export
rxp_populate <- function(
  derivs,
  project_path = ".",
  build = FALSE,
  py_imports = NULL,
  ...
) {
  # Flatten any rxp_pipeline objects to get a flat list of derivations.
  # This supports hierarchical organization while preserving metadata.
  derivs <- flatten_derivations(derivs)

  # Single source of truth for the DAG location: it is written here, read back
  # below, and handed to gen_pipeline() at the end.
  dag_file <- file.path(project_path, "_rixpress", "dag.json")
  rxp_write_dag(derivs, output_file = dag_file)

  # Read back the DAG to get the final no-op flags
  dag <- jsonlite::read_json(dag_file)

  # Update derivation snippets based on final no-op flags
  for (i in seq_along(derivs)) {
    dag_entry <- Find(
      function(x) x$deriv_name[1] == derivs[[i]]$name,
      dag$derivations
    )
    cascaded_noop <- !is.null(dag_entry) && isTRUE(dag_entry$noop_build[[1]])

    # Only rewrite derivations that were not already declared as no-ops
    if (cascaded_noop && !isTRUE(derivs[[i]]$noop_build)) {
      derivs[[i]]$noop_build <- TRUE
      derivs[[i]]$snippet <- sprintf(
        "  %s = defaultPkgs.runCommand \"%s\" {} \"\n    mkdir -p $out\n    echo 'Build skipped for %s (cascading no-op)' > $out/NOOPBUILD\n  \";",
        derivs[[i]]$name,
        derivs[[i]]$name,
        derivs[[i]]$name
      )
    }
  }

  # Collect, per derivation, its Nix environment and its additional files.
  # vapply() guarantees one environment string per derivation.
  nix_env_all <- vapply(
    derivs,
    function(d) d$nix_env,
    character(1),
    USE.NAMES = FALSE
  )

  # Keep the additional files as a list with one entry per derivation.
  # sapply() would simplify equal-length file vectors into a matrix, after
  # which indexing by derivation position selects the wrong cells and drops
  # files. Quarto/R Markdown derivations are handled separately, so their
  # additional files are blanked out here.
  add_files_all <- lapply(derivs, function(d) {
    if (d$type %in% c("rxp_qmd", "rxp_rmd")) {
      return("")
    }
    d$additional_files
  })

  # For every distinct environment, gather the unique, non-empty additional
  # files of all derivations that use it ("" when there are none).
  unique_env <- unique(nix_env_all)
  additional_files_combined <- lapply(
    unique_env,
    function(env) {
      files <- unlist(add_files_all[nix_env_all == env])
      files <- files[!is.na(files) & files != ""]
      if (length(files) == 0) {
        return("")
      }
      unique(files)
    }
  )

  # Generate the library-loading scripts, once per environment. Warnings
  # raised by the generators are deliberately silenced, as in the original
  # implementation.
  suppressWarnings(
    for (i in seq_along(unique_env)) {
      generate_libraries_from_nix(
        unique_env[i],
        additional_files_combined[[i]],
        project_path = project_path
      )
    }
  )

  # Apply inline Python import adjustments, if provided.
  if (!is.null(py_imports)) {
    import_names <- names(py_imports)
    # Every entry needs a usable name: an empty or NA name would produce the
    # bare search pattern "import " (or "import NA") instead of a module
    # import line.
    if (
      !is.character(py_imports) ||
        is.null(import_names) ||
        anyNA(import_names) ||
        !all(nzchar(import_names))
    ) {
      stop(
        "py_imports must be a named character vector, e.g. c(numpy = 'import numpy as np')."
      )
    }
    for (mod in import_names) {
      adjust_import(
        old_import = paste0("import ", mod),
        new_import = unname(py_imports[[mod]]),
        project_path = project_path
      )
    }
  }

  # Finalize pipeline: generate the flat Nix expression, then let
  # gen_pipeline() process it together with the DAG.
  flat_pipeline <- gen_flat_pipeline(derivs)

  pipeline <- gen_pipeline(
    dag_file = dag_file,
    flat_pipeline = flat_pipeline
  )

  writeLines(pipeline, file.path(project_path, "pipeline.nix"))

  if (build) {
    rxp_make(...)
  }
}


#' Parse Nix Environment Element of a Derivation Object
#' @param derivs A list of derivation objects, where each object is a list of
#'   five elements:
#'     - `$name`, character, name of the derivation
#'     - `$snippet`, character, the nix code snippet to build this derivation
#'     - `$type`, character, can be R, Python or Quarto
#'     - `$additional_files`, character vector of paths to files to make
#'        available to build sandbox
#'     - `$nix_env`, character, path to Nix environment to build this derivation
#'   Typically, these objects are created by a function like `rxp_r`.
#' @noRd
parse_nix_envs <- function(derivs) {
  # Files currently present in the `_rixpress` folder of the working
  # directory. Listed once, since every derivation is matched against the
  # same directory.
  rixpress_files <- list.files("_rixpress")

  # Describe the environment of one derivation: its Nix file, the identifier
  # derived from the file name ("default.nix" -> "default"), the library
  # scripts belonging to it and the names those scripts get in the sandbox.
  describe_env <- function(d) {
    base_name <- sub(
      "_nix$",
      "",
      gsub("[^a-zA-Z0-9]", "_", basename(d$nix_env))
    )
    # Library scripts are recognised by the literal "<base_name>_" prefix.
    # An unanchored pattern match on base_name would also pick up the files
    # of any environment whose name merely contains it (e.g. "default" would
    # match "default2_..."), producing wrong `cp` lines.
    # NOTE(review): an environment whose base name extends another one with
    # an underscore ("default" vs "default_py") still shares the prefix.
    prefix <- paste0(base_name, "_")
    library_files <- rixpress_files[startsWith(rixpress_files, prefix)]
    list(
      "nix_env" = d$nix_env,
      "base_name" = base_name,
      "library" = library_files,
      # Name inside the sandbox: the file name with the prefix stripped
      "library_in_sandbox" = substring(library_files, nchar(prefix) + 1L)
    )
  }

  # Several derivations usually share one environment; emit each only once.
  envs <- unique(lapply(derivs, describe_env))

  # Render the Nix bindings for a single environment: the import of the Nix
  # file, its pkgs/shell/buildInputs aliases and the configure phase that
  # copies the library scripts into the build directory.
  render_env <- function(env) {
    copy_commands <- paste0(
      "cp ${./_rixpress/",
      env$library,
      "} ",
      env$library_in_sandbox,
      collapse = "\n    "
    )
    configure_phase <- paste0(
      env$base_name,
      "ConfigurePhase = ''\n    ",
      copy_commands,
      "\n    mkdir -p $out  ",
      "\n    mkdir -p .julia_depot  ",
      "\n    export JULIA_DEPOT_PATH=$PWD/.julia_depot  ",
      "\n    export HOME_PATH=$PWD\n  ",
      "'';\n  "
    )

    binding_lines <- c(
      paste0(env$base_name, " = import ./", env$nix_env, ";"),
      paste0(env$base_name, "Pkgs = ", env$base_name, ".pkgs;"),
      paste0(env$base_name, "Shell = ", env$base_name, ".shell;"),
      paste0(
        env$base_name,
        "BuildInputs = ",
        env$base_name,
        "Shell.buildInputs;"
      ),
      configure_phase
    )

    paste(binding_lines, collapse = "\n  ")
  }

  # One block per environment, separated by a blank line
  paste(vapply(envs, render_env, character(1)), collapse = "\n\n  ")
}

#' Generate Flat Pipeline Boilerplate for pipeline.nix
#' @param derivs A list of derivation objects, where each object is a list of
#'   five elements:
#'     - `$name`, character, name of the derivation
#'     - `$snippet`, character, the nix code snippet to build this derivation
#'     - `$type`, character, can be R, Python or Quarto
#'     - `$additional_files`, character vector of paths to files to make
#'        available to build sandbox
#'     - `$nix_env`, character, path to the Nix environment to build this derivation
#'   A single deriv is the output of `rxp_r()`, `rxp_qmd()` or `rxp_py()`
#'   function.
#' @noRd
gen_flat_pipeline <- function(derivs) {
  # Nix snippets of all derivations, separated by a blank line
  snippets <- vapply(derivs, function(d) d$snippet, FUN.VALUE = character(1))
  all_snippets <- paste(snippets, collapse = "\n\n")

  # Space-separated derivation names, used in both `inherit` statements
  target_names <- paste(
    vapply(derivs, function(d) d$name, character(1)),
    collapse = " "
  )

  # Bindings for every distinct Nix environment
  env_block <- parse_nix_envs(derivs)

  deriv_types <- vapply(derivs, function(d) d$type, character(1))

  # Nix helper functions, one per language
  r_builder <- "\n  # Function to create R derivations
  makeRDerivation = { name, buildInputs, configurePhase, buildPhase, src ? null }:
    defaultPkgs.stdenv.mkDerivation {
      inherit name src;
      dontUnpack = true;
      inherit buildInputs configurePhase buildPhase;
      installPhase = ''
        cp ${name} $out/
      '';
    };"

  py_builder <- "\n  # Function to create Python derivations
  makePyDerivation = { name, buildInputs, configurePhase, buildPhase, src ? null }:
    let
      pickleFile = \"${name}\";
    in
      defaultPkgs.stdenv.mkDerivation {
        inherit name src;
        dontUnpack = true;
        buildInputs = buildInputs;
        inherit configurePhase buildPhase;
        installPhase = ''
          cp ${pickleFile} $out
        '';
      };"

  jl_builder <- "\n  # Function to create Julia derivations
  makeJlDerivation = { name, buildInputs, configurePhase, buildPhase, src ? null }:
    defaultPkgs.stdenv.mkDerivation {
      inherit name src;
      dontUnpack = true;
      buildInputs = buildInputs;
      inherit configurePhase buildPhase;
      installPhase = ''
        cp ${name} $out/
      '';
    };"

  # Only include the helpers the pipeline actually needs, in R, Python, Julia
  # order; an empty selection collapses to "".
  builders <- paste0(
    c(
      if (get_need_r(deriv_types)) r_builder,
      if (get_need_py(deriv_types)) py_builder,
      if (get_need_jl(deriv_types)) jl_builder
    ),
    collapse = ""
  )

  # Skeleton of the Nix expression; the placeholders are, in order: environment
  # bindings, helper functions, derivation snippets, and twice the names.
  template <- 'let
  %s%s

  # Define all derivations
%s

  # Generic default target that builds all derivations
  allDerivations = defaultPkgs.symlinkJoin {
    name = "all-derivations";
    paths = with builtins; attrValues { inherit %s; };
  };

in
{
  inherit %s;
  default = allDerivations;
}
'

  nix_code <- sprintf(
    template,
    env_block,
    builders,
    paste0("  ", all_snippets),
    target_names,
    target_names
  )

  # Return the expression as a character vector with one element per line
  strsplit(nix_code, split = "\n")[[1]]
}

# Escape regex special characters in a literal string
#' @noRd
escape_regex <- function(x) {
  # Capture any single regex metacharacter ...
  metachar_class <- "([][{}()+*^$|\\\\.?])"
  # ... and put a backslash in front of the captured character
  backslash_prefixed <- "\\\\\\1"
  gsub(pattern = metachar_class, replacement = backslash_prefixed, x = x)
}

#' Finalise a Flat Pipeline
#' @param dag_file A json file giving the names and relationships between derivations.
#' @param flat_pipeline A flat pipeline, output of `gen_flat_pipeline()`.
#' @noRd
gen_pipeline <- function(dag_file, flat_pipeline) {
  dag <- jsonlite::read_json(dag_file)
  pipeline_str <- paste(flat_pipeline, collapse = "\n")

  # Derivation types that never get load lines injected here
  skipped_types <- c("rxp_qmd", "rxp_rmd", "rxp_py2r", "rxp_r2py")

  # Default deserialisation function for a derivation type; unknown types
  # fall back to readRDS.
  default_decoder <- function(type) {
    switch(
      type,
      "rxp_r" = "readRDS",
      "rxp_py" = "pickle.load",
      "rxp_jl" = "Serialization.deserialize",
      "readRDS"
    )
  }

  # Resolve the function used to load one dependency. `decoder` may be
  # missing (use the type default), a single value applied to every
  # dependency, or a named list mapping dependency names to functions;
  # dependencies absent from a named list use the type default.
  decoder_for_dep <- function(dep_name, decoder, type) {
    if (is.null(decoder) || length(decoder) == 0) {
      return(default_decoder(type))
    }
    if (!is.list(decoder)) {
      return(as.character(decoder[1]))
    }
    func_names <- names(decoder)
    if (is.null(func_names) || length(func_names) == 0) {
      return(as.character(decoder[[1]]))
    }
    if (dep_name %in% func_names) {
      return(as.character(decoder[[dep_name]]))
    }
    default_decoder(type)
  }

  for (d in dag$derivations) {
    # read_json() returns JSON arrays as lists, so normalise both fields to
    # plain character values. A list-wrapped type would not match any branch
    # of switch() in default_decoder(), which then yields NULL.
    deps <- as.character(unlist(d$depends))
    type <- as.character(unlist(d$type))[1]

    if (length(deps) == 0 || is.na(type) || type %in% skipped_types) {
      next
    }
    deriv_name <- as.character(d$deriv_name[1])
    decoder <- d$decoder

    # Build load lines per type
    if (type == "rxp_r") {
      base_placeholder <- "# RIXPRESS_LOAD_DEPENDENCIES_HERE"
      load_lines <- vapply(
        deps,
        function(dep) {
          func <- decoder_for_dep(dep, decoder, type)
          sprintf("%s <- %s('${%s}/%s')", dep, func, dep, dep)
        },
        character(1),
        USE.NAMES = FALSE
      )
    } else if (type == "rxp_py") {
      base_placeholder <- "# RIXPRESS_PY_LOAD_DEPENDENCIES_HERE"
      load_lines <- vapply(
        deps,
        function(dep) {
          func <- decoder_for_dep(dep, decoder, type)
          sprintf(
            "with open('${%s}/%s', 'rb') as f: %s = %s(f)",
            dep,
            dep,
            dep,
            func
          )
        },
        character(1),
        USE.NAMES = FALSE
      )
    } else if (type == "rxp_jl") {
      base_placeholder <- "# RIXPRESS_JL_LOAD_DEPENDENCIES_HERE"
      load_lines <- vapply(
        deps,
        function(dep) {
          func <- decoder_for_dep(dep, decoder, type)
          # The backslashes are doubled because this text is later used as a
          # sub() replacement, where a backslash is itself an escape.
          sprintf(
            "%s = open(\\\\\\\"%s\\\\\\\", \\\\\\\"r\\\\\\\") do io; %s(io); end",
            dep,
            paste0("${", dep, "}/", dep),
            func
          )
        },
        character(1),
        USE.NAMES = FALSE
      )
    } else {
      next
    }

    # Name-scoped placeholder pattern; the leading whitespace of the
    # placeholder line is captured so it can be preserved.
    specific_placeholder <- paste0(base_placeholder, ":", deriv_name)
    pattern <- paste0(
      "(?m)^([ \\t]*)",
      escape_regex(specific_placeholder),
      "\\s*$"
    )

    # Prefix every injected line with the captured indentation
    replacement_block <- paste(load_lines, collapse = "\n")
    replacement_block <- gsub("\n", "\n\\1", replacement_block, fixed = TRUE)
    replacement <- paste0("\\1", replacement_block)

    pipeline_str <- sub(pattern, replacement, pipeline_str, perl = TRUE)
  }

  # Back to one element per line
  strsplit(pipeline_str, "\n")[[1]]
}


#' Generate an R or Py Script with Library Calls from a default.nix File
#'
#' @param nix_env Nix environment where the derivation runs
#' @param additional_files Character vector, additional files to include. These
#'   are the files that contain custom functions required for this derivation.
#' @param project_path Path to root of project, typically "."
#' @return A script to load the libraries inside of derivations.
#' @noRd
generate_libraries_from_nix <- function(
  nix_env,
  additional_files = "",
  project_path
) {
  # Run the R and Python generators first, with the same three arguments ...
  leading_generators <- list(
    generate_r_libraries_from_nix,
    generate_py_libraries_from_nix
  )
  for (generate in leading_generators) {
    generate(nix_env, additional_files, project_path)
  }

  # ... then the Julia generator, whose value is this function's result.
  generate_jl_libraries_from_nix(nix_env, additional_files, project_path)
}

#' @noRd
get_need_r <- function(types) {
  # Derivation types that require the R derivation helper
  r_backed_types <- c(
    "rxp_r",
    "rxp_r_file",
    "rxp_rmd",
    "rxp_qmd",
    "rxp_py2r",
    "rxp_r2py"
  )
  # TRUE as soon as one of the supplied types is in that set
  any(match(types, r_backed_types, nomatch = 0L) > 0L)
}

#' @noRd
get_need_py <- function(types) {
  # Derivation types that require the Python derivation helper
  py_backed_types <- c("rxp_py", "rxp_py_file")
  any(is.element(types, py_backed_types))
}

#' @noRd
get_need_jl <- function(types) {
  # The Julia derivation helper is needed when any type is "rxp_jl"
  "rxp_jl" %in% types
}

# Try the rixpress package in your browser
#
# Any scripts or data that you put into this service are public.
#
# rixpress documentation built on Feb. 19, 2026, 9:06 a.m.