# Nothing
#' Generate Nix Pipeline Code
#'
#' @family pipeline functions
#' @param derivs A list of derivation objects, where each object is a list of
#'   five elements:
#'   - *name*, name of the derivation,
#'   - *snippet*, the nix code snippet to build this derivation,
#'   - *type*, can be R, Python or Quarto,
#'   - *additional_files*, character vector of paths to files to make
#'     available to build sandbox,
#'   - *nix_env*, path to Nix environment to build this derivation.
#'   A single deriv is the output of `rxp_r()`, `rxp_qmd()` or `rxp_py()`
#'   function.
#'
#' @param project_path Path to root of project, defaults to ".".
#'
#' @param build Logical, defaults to FALSE. Should the pipeline get built right
#'   after being generated? When FALSE, use `rxp_make()` to build the pipeline
#'   at a later stage.
#'
#' @param py_imports Named character vector of Python import rewrites. Names are
#'   the base modules that rixpress auto-imports as "import name", and values
#'   are the desired import lines. For example: c(numpy = "import numpy as np",
#'   xgboost = "from xgboost import XGBClassifier"). Each entry is applied by
#'   replacing "import name" with the provided string across generated
#'   _rixpress Python library files. Every element must have a non-empty name;
#'   this is checked before any file is written.
#'
#' @param ... Further arguments passed down to methods. Use `max-jobs` and
#'   `cores` to set parallelism during build. See the documentation of
#'   `rxp_make()` for more details.
#'
#' @return Called for its side effects: writes a file called `pipeline.nix`
#'   with the Nix code to build the pipeline, as well as a folder called
#'   _rixpress with required internal files. When `build = TRUE` the value of
#'   `rxp_make()` is returned, otherwise `NULL`, invisibly.
#'
#' @details
#' This function generates a `pipeline.nix` file based on a list of derivation
#' objects. Each derivation defines a build step, and `rxp_populate()` chains these
#' steps and handles the serialization and conversion of Python objects into R
#' objects (or vice-versa). Derivations are created with `rxp_r()`, `rxp_py()`
#' and so on. By default (`build = FALSE`) the pipeline is only generated, not
#' built; it can then be built using `rxp_make()` at a later stage. Set
#' `build = TRUE` to build the pipeline immediately after it is generated.
#' The generated `pipeline.nix` expression includes:
#' - the required imports of environments, typically `default.nix` files generated by
#'   the `rix` package;
#' - correct handling of interdependencies of the different derivations;
#' - serialization and deserialization of both R and Python objects, and conversion
#'   between them when objects are passed from one language to another;
#' - correct loading of R and Python packages, or extra functions needed to build
#'   specific targets
#'
#' The `_rixpress` folder contains:
#' - R, Python or Julia scripts to load the required packages that need to be
#'   available to the pipeline.
#' - a JSON file with the DAG of the pipeline, used for visualisation, and to
#'   allow `rxp_populate()` to generate the right dependencies between derivations.
#' - `.rds` files with build logs, required for `rxp_inspect()` and `rxp_gc()`.
#' See `vignette("debugging")` for more details.
#'
#' Inline Python import adjustments
#' In some cases, due to the automatic handling of Python packages, users might
#' want to change import statements. By default if, say, `pandas` is needed to
#' build a derivation, it will be imported with `import pandas`. However, Python
#' programmers typically use `import pandas as pd`. You can either:
#' - use `py_imports` to rewrite these automatically during population, or
#' - use `adjust_import()` and `add_import()` for advanced/manual control.
#' See `vignette("polyglot")` for more details.
#'
#' @examples
#' \dontrun{
#' # Create derivation objects
#' d1 <- rxp_r(mtcars_am, filter(mtcars, am == 1))
#' d2 <- rxp_r(mtcars_head, head(mtcars_am))
#' list_derivs <- list(d1, d2)
#'
#' # Generate and build in one go
#' rxp_populate(derivs = list_derivs, project_path = ".", build = TRUE)
#'
#' # Or only populate, with inline Python import adjustments
#' rxp_populate(
#'   derivs = list_derivs,
#'   project_path = ".",
#'   build = FALSE,
#'   py_imports = c(pandas = "import pandas as pd")
#' )
#' # Then later:
#' rxp_make()
#' }
#' @export
rxp_populate <- function(
  derivs,
  project_path = ".",
  build = FALSE,
  py_imports = NULL,
  ...
) {
  # Validate py_imports before anything is written to disk. An empty or NA
  # name would turn the search string into a bare "import ", which would
  # rewrite every import line of the generated Python library files.
  if (!is.null(py_imports)) {
    import_names <- names(py_imports)
    if (
      !is.character(py_imports) ||
        is.null(import_names) ||
        anyNA(import_names) ||
        !all(nzchar(import_names))
    ) {
      stop(
        "py_imports must be a named character vector, e.g. c(numpy = 'import numpy as np')."
      )
    }
  }

  # Single definition of the DAG location, used for writing and reading
  dag_file <- file.path(project_path, "_rixpress", "dag.json")

  # Flatten any rxp_pipeline objects to get a flat list of derivations
  # This supports hierarchical organization while preserving metadata
  derivs <- flatten_derivations(derivs)

  rxp_write_dag(derivs, output_file = dag_file)

  # Read back the DAG to get the final no-op flags
  dag <- jsonlite::read_json(dag_file)

  # Update derivation snippets based on final no-op flags
  for (i in seq_along(derivs)) {
    dag_entry <- Find(
      function(x) x$deriv_name[1] == derivs[[i]]$name,
      dag$derivations
    )
    dag_says_noop <- !is.null(dag_entry) && isTRUE(dag_entry$noop_build[[1]])
    # Only rewrite derivations that were not already a no-op
    if (dag_says_noop && !isTRUE(derivs[[i]]$noop_build)) {
      derivs[[i]]$noop_build <- TRUE
      derivs[[i]]$snippet <- sprintf(
        " %s = defaultPkgs.runCommand \"%s\" {} \"\n mkdir -p $out\n echo 'Build skipped for %s (cascading no-op)' > $out/NOOPBUILD\n \";",
        derivs[[i]]$name,
        derivs[[i]]$name,
        derivs[[i]]$name
      )
    }
  }

  # Collect, for every unique Nix environment, the union of the additional
  # files of the derivations that use it.
  nix_env_all <- vapply(
    derivs,
    function(d) d$nix_env,
    character(1),
    USE.NAMES = FALSE
  )
  # Kept as a list on purpose: sapply() would simplify equal-length (> 1)
  # entries into a matrix, and indexing that matrix by derivation position
  # returns the wrong files.
  add_files_all <- lapply(derivs, function(d) {
    # Quarto / R Markdown documents are handled separately
    if (d$type == "rxp_qmd" || d$type == "rxp_rmd") {
      return("")
    }
    d$additional_files
  })

  unique_env <- unique(nix_env_all)
  additional_files_combined <- lapply(
    unique_env,
    function(env) {
      idx <- which(nix_env_all == env)
      files <- unlist(add_files_all[idx])
      files <- files[!is.na(files) & files != ""]
      if (length(files) == 0) {
        return("")
      }
      unique(files)
    }
  )

  # NOTE(review): blanket warning suppression is kept from the original
  # implementation; the warnings being silenced are not visible from here.
  suppressWarnings(
    for (i in seq_along(unique_env)) {
      generate_libraries_from_nix(
        unique_env[i],
        additional_files_combined[[i]],
        project_path = project_path
      )
    }
  )

  # Apply inline Python import adjustments, if provided. Iterating by
  # position keeps entries with duplicated names distinct.
  for (j in seq_along(py_imports)) {
    adjust_import(
      old_import = paste0("import ", names(py_imports)[[j]]),
      new_import = unname(py_imports[[j]]),
      project_path = project_path
    )
  }

  # Finalize pipeline
  flat_pipeline <- gen_flat_pipeline(derivs)
  pipeline <- gen_pipeline(
    dag_file = dag_file,
    flat_pipeline = flat_pipeline
  )
  writeLines(pipeline, file.path(project_path, "pipeline.nix"))

  if (build) {
    rxp_make(...)
  }
}
#' Parse Nix Environment Element of a Derivation Object
#'
#' Builds the Nix boilerplate (environment import, package set, shell, build
#' inputs and configure phase) for every distinct environment used by the
#' derivations. Library files are looked up in the `_rixpress` folder of the
#' current working directory.
#'
#' @param derivs A list of derivation objects, where each object is a list of
#'   five elements:
#'     - `$name`, character, name of the derivation
#'     - `$snippet`, character, the nix code snippet to build this derivation
#'     - `$type`, character, can be R, Python or Quarto
#'     - `$additional_files`, character vector of paths to files to make
#'       available to build sandbox
#'     - `$nix_env`, character, path to Nix environment to build this derivation
#'   Typically, these objects are created by a function like `rxp_r`.
#' @return A single string with one block of Nix code per distinct environment.
#' @noRd
parse_nix_envs <- function(derivs) {
  # One description per derivation: environment path, sanitised base name,
  # matching library files and the names they get inside the sandbox
  env_specs <- lapply(derivs, function(d) {
    env_id <- sub(
      "_nix$",
      "",
      gsub("[^a-zA-Z0-9]", "_", basename(d$nix_env))
    )
    lib_files <- list.files("_rixpress", pattern = env_id)
    list(
      "nix_env" = d$nix_env,
      "base_name" = env_id,
      "library" = lib_files,
      "library_in_sandbox" = gsub(paste0(env_id, "_"), "", lib_files)
    )
  })

  # Derivations sharing an environment yield identical descriptions
  env_specs <- unique(env_specs)

  # Render the Nix lines for a single environment description
  render_env <- function(spec) {
    id <- spec$base_name

    copy_commands <- paste0(
      "cp ${./_rixpress/",
      spec$library,
      "} ",
      spec$library_in_sandbox,
      collapse = "\n "
    )

    configure_phase <- paste0(
      id,
      "ConfigurePhase = ''\n ",
      copy_commands,
      "\n mkdir -p $out ",
      "\n mkdir -p .julia_depot ",
      "\n export JULIA_DEPOT_PATH=$PWD/.julia_depot ",
      "\n export HOME_PATH=$PWD\n ",
      "'';\n "
    )

    paste(
      c(
        paste0(id, " = import ./", spec$nix_env, ";"),
        paste0(id, "Pkgs = ", id, ".pkgs;"),
        paste0(id, "Shell = ", id, ".shell;"),
        paste0(id, "BuildInputs = ", id, "Shell.buildInputs;"),
        configure_phase
      ),
      collapse = "\n "
    )
  }

  rendered <- vapply(env_specs, render_env, character(1))
  paste(rendered, collapse = "\n\n ")
}
#' Generate Flat Pipeline Boilerplate for pipeline.nix
#'
#' Assembles the environment imports, the per-language derivation builder
#' functions that the derivation types require, and the derivation snippets
#' into one Nix `let ... in { ... }` expression.
#'
#' @param derivs A list of derivation objects, where each object is a list of
#'   five elements:
#'     - `$name`, character, name of the derivation
#'     - `$snippet`, character, the nix code snippet to build this derivation
#'     - `$type`, character, can be R, Python or Quarto
#'     - `$additional_files`, character vector of paths to files to make
#'       available to build sandbox
#'     - `$nix_env`, character, path to the Nix environment to build this derivation
#'   A single deriv is the output of `rxp_r()`, `rxp_qmd()` or `rxp_py()`
#'   function.
#' @return Character vector, the generated Nix expression split into lines.
#' @noRd
gen_flat_pipeline <- function(derivs) {
  snippets <- vapply(derivs, function(d) d$snippet, FUN.VALUE = character(1))
  all_snippets <- paste(snippets, collapse = "\n\n")

  target_names <- vapply(derivs, function(d) d$name, character(1))
  inherit_names <- paste(target_names, collapse = " ")

  env_block <- parse_nix_envs(derivs)

  # Only emit the builder functions that some derivation actually needs
  kinds <- vapply(derivs, function(d) d$type, character(1))
  builder_defs <- c(
    if (get_need_r(kinds)) {
      "\n # Function to create R derivations
makeRDerivation = { name, buildInputs, configurePhase, buildPhase, src ? null }:
defaultPkgs.stdenv.mkDerivation {
inherit name src;
dontUnpack = true;
inherit buildInputs configurePhase buildPhase;
installPhase = ''
cp ${name} $out/
'';
};"
    },
    if (get_need_py(kinds)) {
      "\n # Function to create Python derivations
makePyDerivation = { name, buildInputs, configurePhase, buildPhase, src ? null }:
let
pickleFile = \"${name}\";
in
defaultPkgs.stdenv.mkDerivation {
inherit name src;
dontUnpack = true;
buildInputs = buildInputs;
inherit configurePhase buildPhase;
installPhase = ''
cp ${pickleFile} $out
'';
};"
    },
    if (get_need_jl(kinds)) {
      "\n # Function to create Julia derivations
makeJlDerivation = { name, buildInputs, configurePhase, buildPhase, src ? null }:
defaultPkgs.stdenv.mkDerivation {
inherit name src;
dontUnpack = true;
buildInputs = buildInputs;
inherit configurePhase buildPhase;
installPhase = ''
cp ${name} $out/
'';
};"
    }
  )
  # Skipped builders are NULL and vanish in c(); the leading "" keeps the
  # result an empty string when no builder is needed
  function_defs <- paste0(c("", builder_defs), collapse = "")

  # Fill the Nix template
  nix_template <- 'let
%s%s
# Define all derivations
%s
# Generic default target that builds all derivations
allDerivations = defaultPkgs.symlinkJoin {
name = "all-derivations";
paths = with builtins; attrValues { inherit %s; };
};
in
{
inherit %s;
default = allDerivations;
}
'
  pipeline_nix <- sprintf(
    nix_template,
    env_block,
    function_defs,
    paste0(" ", all_snippets),
    inherit_names,
    inherit_names
  )

  strsplit(pipeline_nix, split = "\n")[[1]]
}
#' Escape regex special characters in a literal string
#'
#' Prefixes each of the characters `] [ { } ( ) + * ^ $ | \ . ?` with a
#' backslash so the string can be embedded in a regular expression and be
#' matched literally.
#'
#' @param x Character vector to escape.
#' @return Character vector of the same length as `x`.
#' @noRd
escape_regex <- function(x) {
  # Bracket expression listing the metacharacters, captured as group 1
  metachar_class <- "([][{}()+*^$|\\\\.?])"
  # A literal backslash followed by the captured character
  escaped_form <- "\\\\\\1"
  gsub(pattern = metachar_class, replacement = escaped_form, x = x)
}
#' Finalise a Flat Pipeline
#'
#' For every derivation of the DAG that has dependencies, replaces its
#' name-scoped placeholder comment in the flat pipeline with the statements
#' that load those dependencies, written in R, Python or Julia syntax
#' according to the derivation type. Derivations of type `rxp_qmd`, `rxp_rmd`,
#' `rxp_py2r` and `rxp_r2py` are left untouched.
#'
#' @param dag_file A json file giving the names and relationships between derivations.
#' @param flat_pipeline A flat pipeline, output of `gen_flat_pipeline()`.
#' @return Character vector, the lines of the finalised pipeline.
#' @noRd
gen_pipeline <- function(dag_file, flat_pipeline) {
  # Derivation types that never get load statements injected
  skipped_types <- c("rxp_qmd", "rxp_rmd", "rxp_py2r", "rxp_r2py")

  # Default unserialize function for a derivation type
  default_reader <- function(type) {
    switch(
      type,
      "rxp_r" = "readRDS",
      "rxp_py" = "pickle.load",
      "rxp_jl" = "Serialization.deserialize",
      "readRDS"
    )
  }

  # Unserialize function to use for one dependency. `decoder` may be absent,
  # a single value, or a named list keyed by dependency name.
  reader_for <- function(dep_name, decoder, type) {
    if (is.null(decoder) || length(decoder) == 0) {
      return(default_reader(type))
    }
    if (!is.list(decoder)) {
      # A single string value
      return(as.character(decoder[1]))
    }
    keys <- names(decoder)
    if (is.null(keys) || length(keys) == 0) {
      # A single value wrapped in a list (from JSON)
      return(as.character(decoder[[1]]))
    }
    # Named list: use the entry of this dependency, else the default
    if (dep_name %in% keys) {
      as.character(decoder[[dep_name]])
    } else {
      default_reader(type)
    }
  }

  dag <- jsonlite::read_json(dag_file)
  pipeline_str <- paste(flat_pipeline, collapse = "\n")

  for (node in dag$derivations) {
    if (length(node$depends) == 0 || node$type %in% skipped_types) {
      next
    }

    deriv_name <- as.character(node$deriv_name[1])
    deps <- node$depends
    type <- node$type[1]
    decoder <- node$decoder

    # Pick the placeholder and the statement template for this language
    if (type == "rxp_r") {
      base_placeholder <- "# RIXPRESS_LOAD_DEPENDENCIES_HERE"
      load_stmt <- function(dep, reader) {
        sprintf("%s <- %s('${%s}/%s')", dep, reader, dep, dep)
      }
    } else if (type == "rxp_py") {
      base_placeholder <- "# RIXPRESS_PY_LOAD_DEPENDENCIES_HERE"
      load_stmt <- function(dep, reader) {
        sprintf(
          "with open('${%s}/%s', 'rb') as f: %s = %s(f)",
          dep,
          dep,
          dep,
          reader
        )
      }
    } else if (type == "rxp_jl") {
      base_placeholder <- "# RIXPRESS_JL_LOAD_DEPENDENCIES_HERE"
      load_stmt <- function(dep, reader) {
        # The backslashes are doubled because sub() below interprets
        # backslash escapes in its replacement string
        sprintf(
          "%s = open(\\\\\\\"%s\\\\\\\", \\\\\\\"r\\\\\\\") do io; %s(io); end",
          dep,
          paste0("${", dep, "}/", dep),
          reader
        )
      }
    } else {
      next
    }

    load_lines <- vapply(
      deps,
      function(dep) load_stmt(dep, reader_for(dep, decoder, type)),
      character(1)
    )

    # Name-scoped placeholder pattern; group 1 captures the indentation
    pattern <- paste0(
      "(?m)^([ \\t]*)",
      escape_regex(paste0(base_placeholder, ":", deriv_name)),
      "\\s*$"
    )

    # Prefix every injected line with the captured indentation
    injected <- gsub(
      "\n",
      "\n\\1",
      paste(load_lines, collapse = "\n"),
      fixed = TRUE
    )
    pipeline_str <- sub(
      pattern,
      paste0("\\1", injected),
      pipeline_str,
      perl = TRUE
    )
  }

  strsplit(pipeline_str, "\n")[[1]]
}
#' Generate R, Python and Julia Library Scripts from a default.nix File
#'
#' Thin dispatcher: forwards the same three arguments, positionally and in
#' this order, to `generate_r_libraries_from_nix()`,
#' `generate_py_libraries_from_nix()` and `generate_jl_libraries_from_nix()`.
#'
#' @param nix_env Nix environment where the derivation runs
#' @param additional_files Character vector, additional files to include. These
#' are the files that contain custom functions required for this derivation.
#' Defaults to "".
#' @param project_path Path to root of project, typically "."
#' @return The value of the final `generate_jl_libraries_from_nix()` call.
#' NOTE(review): the three generators are presumably called for the scripts
#' they write rather than for their return values - confirm against their
#' definitions.
#' @noRd
generate_libraries_from_nix <- function(
nix_env,
additional_files = "",
project_path
) {
# R library script
generate_r_libraries_from_nix(
nix_env,
additional_files,
project_path
)
# Python library script
generate_py_libraries_from_nix(
nix_env,
additional_files,
project_path
)
# Julia library script; being the last expression, its value is returned
generate_jl_libraries_from_nix(
nix_env,
additional_files,
project_path
)
}
#' Does any derivation type require the R derivation builder?
#'
#' @param types Character vector of derivation types.
#' @return `TRUE` if at least one element of `types` is `rxp_r`, `rxp_r_file`,
#'   `rxp_rmd`, `rxp_qmd`, `rxp_py2r` or `rxp_r2py`, otherwise `FALSE`.
#' @noRd
get_need_r <- function(types) {
  r_backed_types <- c(
    "rxp_r",
    "rxp_r_file",
    "rxp_rmd",
    "rxp_qmd",
    "rxp_py2r",
    "rxp_r2py"
  )
  any(match(types, r_backed_types, nomatch = 0L) > 0L)
}
#' Does any derivation type require the Python derivation builder?
#'
#' @param types Character vector of derivation types.
#' @return `TRUE` if at least one element of `types` is `rxp_py` or
#'   `rxp_py_file`, otherwise `FALSE`.
#' @noRd
get_need_py <- function(types) {
  python_types <- c("rxp_py", "rxp_py_file")
  any(is.element(types, python_types))
}
#' Does any derivation type require the Julia derivation builder?
#'
#' @param types Character vector of derivation types.
#' @return `TRUE` if at least one element of `types` is `rxp_jl`, otherwise
#'   `FALSE`.
#' @noRd
get_need_jl <- function(types) {
  "rxp_jl" %in% types
}
# Any scripts or data that you put into this service are public.
# Add the following code to your website.
# For more information on customizing the embed code, read Embedding Snippets.