Nothing
# Helpers for testing.
#region General test helpers.
# Helper for extracting the message associated with errors and warnings.
as_text <- function(expression) {
# Capture message.
message <- tryCatch(expression,
# Capture warning message.
warning = function(w) w$message,
# Capture error message.
error = function(e) e$message
)
return(message)
}
# Select a cluster type, with some variability.
pick_cluster_type <- function(types) {
# Decide what type of cluster to create.
if (.Platform$OS.type == "unix") {
# Randomly pick a cluster type.
cluster_type <- sample(types, 1)
} else {
# Fix the cluster type to "psock" on Windows.
cluster_type <- "psock"
}
return(cluster_type)
}
# Select backend type.
pick_backend_type <- function() {
# Randomly pick a backend type.
backend_type <- sample(c("sync", "async"), 1)
return(backend_type)
}
# Define the test task to use.
test_task <- function(x, y, z, sleep = 0) {
# Sleep a bit or not.
Sys.sleep(sleep)
# Compute something.
output <- (x + y) / z
# Return the result.
return(output)
}
# Check if a task is running on an asynchronous backend, or context.
task_is_running <- function(backend) {
# If a context is passed.
if (Helper$get_class_name(backend) == "Context") {
# Get the status via the context.
status <- backend$backend$task_state$task_is_running
} else {
# Get the status from the backend directly.
status <- backend$task_state$task_is_running
}
return(status)
}
# Check if the body of a decorated tasks contains a specific pattern.
body_contains <- function(task, pattern, position = 2) {
# Get the body of the decorated task at a specific position.
extraction <- as.list(body(task))[position]
# Check if the body at position contains the pattern.
contains <- grepl(pattern, extraction)
return(contains)
}
#endregion
#region Tests sets applicable to all backends types.
# Set of tests for unimplemented service methods.
tests_set_for_unimplemented_service_methods <- function(service) {
# Expect an error when calling the `start` method.
expect_error(service$start(), as_text(Exception$method_not_implemented()))
# Expect an error when calling the `stop` method.
expect_error(service$stop(), as_text(Exception$method_not_implemented()))
# Expect an error when calling the `clear` method.
expect_error(service$clear(), as_text(Exception$method_not_implemented()))
# Expect an error when calling the `peek` method.
expect_error(service$peek(), as_text(Exception$method_not_implemented()))
# Expect an error when calling the `export` method.
expect_error(service$export(), as_text(Exception$method_not_implemented()))
# Expect an error when calling the `evaluate` method.
expect_error(service$evaluate(), as_text(Exception$method_not_implemented()))
# Expect an error when calling the `sapply` method.
expect_error(service$sapply(), as_text(Exception$method_not_implemented()))
# Expect an error when calling the `lapply` method.
expect_error(service$lapply(), as_text(Exception$method_not_implemented()))
# Expect an error when calling the `apply` method.
expect_error(service$apply(), as_text(Exception$method_not_implemented()))
# Expect an error when calling the `get_output` method.
expect_error(service$get_output(), as_text(Exception$method_not_implemented()))
}
# Set of tests for exporting to the backend (i.e,. regardless of type).
tests_set_for_backend_exporting <- function(service) {
# Create a variable in a different environment.
env <- new.env()
env$test_variable <- rnorm(1)
# Export the variable from the specific environment to the backend.
service$export("test_variable", env)
# Expect that the variable is on the backend.
expect_true(all(service$peek() == "test_variable"))
# Expect the cluster to hold the correct value for the exported variable.
expect_true(all(service$evaluate(test_variable) == env$test_variable))
# Assign a variable to the current environment.
assign("test_variable", rnorm(1), envir = environment())
# Export the variable using the current environment (i.e., parent of `export`).
service$export("test_variable")
# Expect that the variable is on the backend.
expect_true(all(service$peek() == "test_variable"))
# Expect the cluster to hold the correct value for the exported variable.
expect_true(all(service$evaluate(test_variable) == get("test_variable", envir = environment())))
}
# Set of tests for starting and stopping backends.
tests_set_for_backend_states <- function(backend, specification) {
# Expect an error if an attempt is made to start a cluster while one is already active.
expect_error(backend$start(specification), as_text(Exception$cluster_active()))
# Stop the backend.
backend$stop()
# Expect that stopping the cluster marks it as inactive.
expect_false(backend$active)
# Expect the cluster field has been cleared.
expect_null(backend$cluster)
# Start a new cluster on the same backend instance.
backend$start(specification)
# Expect the cluster is active.
expect_true(backend$active)
# Stop the cluster.
backend$stop()
# Expect that trying to stop a cluster that is not active throws an error.
expect_error(backend$stop(), as_text(Exception$cluster_not_active()))
}
#endregion
#region Tests sets for synchronous backends.
# Set of tests for the synchronous backend task execution via a specified operation.
tests_set_for_synchronous_backend_task_execution <- function(operation, service, expected_output) {
# Run the task in parallel via the requested operation (e.g., `sapply`).
eval(operation)
# Expect the that output is correct.
expect_equal(service$get_output(), expected_output)
# Expect that subsequent calls to `get_output` return `NULL`.
expect_null(service$get_output())
# Remain silent.
invisible(NULL)
}
# Set of tests for synchronous backend operations.
tests_set_for_synchronous_backend_operations <- function(service, specification, task) {
# Start the cluster on the backend.
service$start(specification)
# Always stop on exit.
on.exit({
# Stop the backend.
service$stop()
})
# Expect that the cluster is empty upon creation.
expect_true(all(sapply(service$peek(), length) == 0))
# Tests for exporting to the backend.
tests_set_for_backend_exporting(service)
# Clear the backend.
service$clear()
# Expect that clearing the cluster leaves it empty.
expect_true(all(sapply(service$peek(), length) == 0))
# Select task arguments.
x <- sample(1:100, 100)
y <- sample(1:100, 1)
z <- sample(1:100, 1)
sleep = sample(c(0, 0.001, 0.002), 1)
# Created the expect output.
expected_output <- task(x, y, z)
# Define the `sapply` operation.
operation <- bquote(service$sapply(.(x), .(task), y = .(y), z = .(z), sleep = .(sleep)))
# Tests for the `sapply` operation.
tests_set_for_synchronous_backend_task_execution(operation, service, expected_output)
# Expect that errors in the task executed via `sapply` are propagated.
expect_error(
service$sapply(1:10, function(x) stop("Task error.")),
"Task error."
)
# Created the expect output.
expected_output <- as.list(expected_output)
# Define the `lapply` operation.
operation <- bquote(service$lapply(.(x), .(task), y = .(y), z = .(z), sleep = .(sleep)))
# Tests for the `lapply` operation.
tests_set_for_synchronous_backend_task_execution(operation, service, expected_output)
# Expect that errors in the task executed via `lapply` are propagated.
expect_error(
service$lapply(1:10, function(x) stop("Task error.")),
"Task error."
)
# Redefine `x` as a matrix for the `apply` operation.
x <- matrix(rnorm(100^2), nrow = 100, ncol = 100)
# Compute the expected output for the `apply` operation applied over rows.
expected_output <- base::apply(x, 1, task, y = y, z = z)
# Define the `apply` operation over rows.
operation <- bquote(service$apply(.(x), 1, .(task), y = .(y), z = .(z), sleep = .(sleep)))
# Tests for the `apply` operation over rows.
tests_set_for_synchronous_backend_task_execution(operation, service, expected_output)
# Compute the expected output for the `apply` operation applied over columns.
expected_output <- base::apply(x, 2, task, y = y, z = z)
# Define the `apply` operation over columns.
operation <- bquote(service$apply(.(x), 2, .(task), y = .(y), z = .(z), sleep = .(sleep)))
# Tests for the `apply` operation over columns.
tests_set_for_synchronous_backend_task_execution(operation, service, expected_output)
# Redefine a smaller `x` matrix for the `apply` operation applied element-wise.
x <- matrix(rnorm(10^2), nrow = 10, ncol = 10)
# Compute the expected output for the `apply` operation applied element-wise.
expected_output <- base::apply(x, c(1, 2), task, y = y, z = z)
# Define the `apply` operation element-wise.
operation <- bquote(service$apply(.(x), c(1, 2), .(task), y = .(y), z = .(z), sleep = .(sleep)))
# Tests for the `apply` operation element-wise.
tests_set_for_synchronous_backend_task_execution(operation, service, expected_output)
# Expect error when margins higher than the array dimensions are provided.
expect_error(
service$apply(x, 3, task, y = y, z = z),
as_text(Exception$array_margins_not_compatible(3, dim(x))),
fixed = TRUE
)
# Expect error when duplicate margins are provided.
expect_error(
service$apply(x, c(1, 1), task, y = y, z = z),
as_text(Exception$array_margins_not_compatible(c(1, 1), dim(x))),
fixed = TRUE
)
# Expect errors when the margins are not compatible with the array dimensions.
expect_error(
service$apply(x, c(1, 2, 3, 1), task, y = y, z = z),
as_text(Exception$array_margins_not_compatible(c(1, 2, 3, 1), dim(x))),
fixed = TRUE
)
# Expect that errors in the task executed via `apply` are propagated.
expect_error(
service$apply(matrix(1:100, 10, 10), 1, function(x) stop("Task error.")),
"Task error."
)
# Expect that the cluster is empty after performing operations on it.
expect_true(all(sapply(service$peek(), length) == 0))
# Remain silent.
invisible(NULL)
}
#endregion
#region Tests sets for asynchronous backends.
# Set of tests for the asynchronous backend task execution via a specified operation.
tests_set_for_asynchronous_backend_task_execution <- function(operation, service, expected_output) {
# Run the task in parallel.
eval(operation)
# Expect the that output is correct.
expect_equal(service$get_output(wait = TRUE), expected_output)
# Expect that subsequent calls to `get_output` will throw an error.
expect_error(service$get_output(), as_text(Exception$async_task_not_started()))
# Run the task in parallel, with a bit of overhead.
eval(operation)
# Expect that trying to run a task while another is running fails.
expect_error(eval(operation), as_text(Exception$async_task_running()))
# Expect the that output is correct.
expect_equal(service$get_output(wait = TRUE), expected_output)
# Run the task in parallel, with a bit of overhead.
eval(operation)
# Expect that trying to get the output of a task that is still running fails.
expect_error(service$get_output(), as_text(Exception$async_task_running()))
# Block the main thread until the task is finished.
while(task_is_running(service)) {
# Sleep a bit.
Sys.sleep(0.001)
}
# Expect that trying to run a task without reading the previous output fails.
expect_error(eval(operation), as_text(Exception$async_task_completed()))
# Expect the that output is correct.
expect_equal(service$get_output(), expected_output)
}
# Set of tests for synchronous backend operations.
tests_set_for_asynchronous_backend_operations <- function(service, specification, task) {
# Start the cluster on the backend.
service$start(specification)
# Always stop on exit.
on.exit({
# Stop the backend.
service$stop()
})
# Expect that the cluster is empty upon creation.
expect_true(all(sapply(service$peek(), length) == 0))
# Tests for the `export` operation.
tests_set_for_backend_exporting(service)
# Clear the backend.
service$clear()
# Expect that clearing the cluster leaves it empty.
expect_true(all(sapply(service$peek(), length) == 0))
# Expect error waiting to fetch the output when no task is running.
expect_error(service$get_output(wait = TRUE), as_text(Exception$async_task_not_started()))
# Select task arguments.
x <- sample(1:100, 100)
y <- sample(1:100, 1)
z <- sample(1:100, 1)
sleep = sample(c(0, 0.001, 0.002), 1)
# Compute the expected output for the `sapply` operation.
expected_output <- task(x, y, z)
# Define the `sapply` operation.
operation <- bquote(service$sapply(.(x), .(task), y = .(y), z = .(z), sleep = .(sleep)))
# Tests for the `sapply` operation.
tests_set_for_asynchronous_backend_task_execution(operation, service, expected_output)
# Expect that errors in the task executed via `sapply` are propagated.
expect_error(
{
service$sapply(1:10, function(x) stop("Task error."))
service$get_output(wait = TRUE)
},
"Task error."
)
# Compute the expected output for the `lapply` operation.
expected_output <- as.list(expected_output)
# Define the `lapply` operation.
operation <- bquote(service$lapply(.(x), .(task), y = .(y), z = .(z), sleep = .(sleep)))
# Tests for the `lapply` operation.
tests_set_for_asynchronous_backend_task_execution(operation, service, expected_output)
# Expect that errors in the task executed via `lapply` are propagated.
expect_error(
{
service$lapply(1:10, function(x) stop("Task error."))
service$get_output(wait = TRUE)
},
"Task error."
)
# Redefine `x` as a matrix for the `apply` operation.
x <- matrix(rnorm(100^2), nrow = 100, ncol = 100)
# Compute the expected output for the `apply` operation applied over rows.
expected_output <- base::apply(x, 1, task, y = y, z = z)
# Define the `apply` operation over rows.
operation <- bquote(service$apply(.(x), 1, .(task), y = .(y), z = .(z), sleep = .(sleep)))
# Tests for the `apply` operation over rows.
tests_set_for_asynchronous_backend_task_execution(operation, service, expected_output)
# Compute the expected output for the `apply` operation applied over columns.
expected_output <- base::apply(x, 2, task, y = y, z = z)
# Define the `apply` operation over columns.
operation <- bquote(service$apply(.(x), 2, .(task), y = .(y), z = .(z), sleep = .(sleep)))
# Tests for the `apply` operation over columns.
tests_set_for_asynchronous_backend_task_execution(operation, service, expected_output)
# Redefine a smaller `x` matrix for the `apply` operation applied element-wise.
x <- matrix(rnorm(10^2), nrow = 10, ncol = 10)
# Compute the expected output for the `apply` operation applied element-wise.
expected_output <- base::apply(x, c(1, 2), task, y = y, z = z)
# Define the `apply` operation element-wise.
operation <- bquote(service$apply(.(x), c(1, 2), .(task), y = .(y), z = .(z), sleep = .(sleep)))
# Tests for the `apply` operation element-wise.
tests_set_for_asynchronous_backend_task_execution(operation, service, expected_output)
# Expect error when margins higher than the array dimensions are provided.
expect_error(
service$apply(x, 3, task, y = y, z = z),
as_text(Exception$array_margins_not_compatible(3, dim(x))),
fixed = TRUE
)
# Expect error when duplicate margins are provided.
expect_error(
service$apply(x, c(1, 1), task, y = y, z = z),
as_text(Exception$array_margins_not_compatible(c(1, 1), dim(x))),
fixed = TRUE
)
# Expect errors when the margins are not compatible with the array dimensions.
expect_error(
service$apply(x, c(1, 2, 3, 1), task, y = y, z = z),
as_text(Exception$array_margins_not_compatible(c(1, 2, 3, 1), dim(x))),
fixed = TRUE
)
# Expect that errors in the task executed via `apply` are propagated.
expect_error(
{
service$apply(matrix(1:100, 10, 10), 1, function(x) stop("Task error."))
service$get_output(wait = TRUE)
},
"Task error."
)
# Expect that the cluster is empty after performing operations on it.
expect_true(all(sapply(service$peek(), length) == 0))
# Remain silent.
invisible(NULL)
}
#endregion
#region Tests sets for progress tracking.
# Set of tests for executing tasks in a progress tracking context with output.
tests_set_for_task_execution_with_progress_tracking <- function(operation, context, expected_output) {
# Clear the progress output on exit.
on.exit({
# Clear the output.
context$progress_bar_output <- NULL
})
# Create a bar factory.
bar_factory <- BarFactory$new()
# Get a basic bar instance.
bar <- bar_factory$get("basic")
# Register the bar with the context object.
context$set_bar(bar)
# Configure the bar.
context$configure_bar(
style = 3
)
# Run the task in parallel.
eval(operation)
# Expect that the task output is correct.
expect_equal(context$get_output(wait = TRUE), expected_output)
# Expect the progress bar was shown correctly.
expect_true(any(grepl("=\\| 100%", context$progress_bar_output)))
# Get a modern bar instance.
bar <- bar_factory$get("modern")
# Register the bar with the same context object.
context$set_bar(bar)
# Configure the bar.
context$configure_bar(
show_after = 0,
format = ":bar| :percent",
clear = FALSE,
force = TRUE
)
# Run the task in parallel.
eval(operation)
# Expect that the task output is correct.
expect_equal(context$get_output(wait = TRUE), expected_output)
# Expect the progress bar was shown correctly.
expect_true(any(grepl("=\\| 100%", context$progress_bar_output)))
}
# Set of tests for progress tracking context.
tests_set_for_progress_tracking_context <- function(context, task) {
# Check the type.
Helper$check_object_type(context, "ProgressTrackingContextTester")
# Select task arguments.
x <- sample(1:100, 100)
y <- sample(1:100, 1)
z <- sample(1:100, 1)
sleep <- sample(c(0, 0.001, 0.002), 1)
# Create the expected output for the `sapply` operation.
expected_output <- task(x, y, z)
# Create the `sapply` operation.
operation <- bquote(context$sapply(.(x), .(task), y = .(y), z = .(z), sleep = .(sleep)))
# Tests for the `sapply` operation in a progress tracking context.
tests_set_for_task_execution_with_progress_tracking(operation, context, expected_output)
# Create the expected output for the `lapply` operation.
expected_output <- as.list(expected_output)
# Create the `lapply` operation.
operation <- bquote(context$lapply(.(x), .(task), y = .(y), z = .(z), sleep = .(sleep)))
# Tests for the `lapply` operation in a progress tracking context.
tests_set_for_task_execution_with_progress_tracking(operation, context, expected_output)
# Redefine `x` as a matrix for the `apply` operation.
x <- matrix(rnorm(100^2), nrow = 100, ncol = 100)
# Compute the expected output for the `apply` operation applied over rows.
expected_output <- base::apply(x, 1, task, y = y, z = z)
# Define the `apply` operation over rows.
operation <- bquote(context$apply(.(x), 1, .(task), y = .(y), z = .(z), sleep = .(sleep)))
# Tests for the `apply` operation over rows in a progress tracking context.
tests_set_for_task_execution_with_progress_tracking(operation, context, expected_output)
# Compute the expected output for the `apply` operation applied over columns.
expected_output <- base::apply(x, 2, task, y = y, z = z)
# Define the `apply` operation over columns.
operation <- bquote(context$apply(.(x), 2, .(task), y = .(y), z = .(z), sleep = .(sleep)))
# Tests for the `apply` operation over columns in a progress tracking context.
tests_set_for_task_execution_with_progress_tracking(operation, context, expected_output)
# Redefine a smaller `x` matrix for the `apply` operation applied element-wise.
x <- matrix(rnorm(10^2), nrow = 10, ncol = 10)
# Compute the expected output for the `apply` operation applied element-wise.
expected_output <- base::apply(x, c(1, 2), task, y = y, z = z)
# Define the `apply` operation element-wise.
operation <- bquote(context$apply(.(x), c(1, 2), .(task), y = .(y), z = .(z), sleep = .(sleep)))
# Tests for the `apply` operation over all elements in a progress tracking context.
tests_set_for_task_execution_with_progress_tracking(operation, context, expected_output)
}
# Set of tests for executing tasks that throw errors in a progress tracking context.
tests_set_for_task_execution_with_progress_tracking_and_error <- function(operation, context, expected_error) {
# Clear the progress output on exit.
on.exit({
# Clear the output.
context$progress_bar_output <- NULL
})
# Create a bar factory.
bar_factory <- BarFactory$new()
# Get a basic bar instance.
bar <- bar_factory$get("basic")
# Register the bar with the context object.
context$set_bar(bar)
# Configure the bar.
context$configure_bar(
style = 3
)
# Run the task that throws an error in parallel.
eval(operation)
# Expect that the task interrupted the progress bar with an error message.
expect_error(context$get_output(wait = TRUE), expected_error)
# Get a modern bar instance.
bar <- bar_factory$get("modern")
# Register the bar with the same context object.
context$set_bar(bar)
# Configure the bar.
context$configure_bar(
show_after = 0,
format = ":bar| :percent",
clear = FALSE,
force = TRUE
)
# Run the task in parallel.
eval(operation)
# Expect that the task interrupted the progress bar with an error message.
expect_error(context$get_output(wait = TRUE), expected_error)
}
# Set of tests for progress bar interruptions in a tracking context.
tests_set_for_progress_tracking_context_with_error <- function(context) {
# Check the type.
Helper$check_object_type(context, "ProgressTrackingContextTester")
# Clean-up.
on.exit({
# Set default values for package options.
set_default_options()
})
# Reduce waiting time between progress bar updates.
set_option("progress_wait", 0.01)
# Define the task.
task <- function(x, error_x = 1, sleep = 0) {
# Sleep a bit or not.
Sys.sleep(sleep)
# Randomly sample when to throw an error.
if(any(x == error_x)) {
# Throw an error.
stop("Intentional task error.")
}
# Compute something.
output <- x + 1
# Return the result.
return(output)
}
# Construct the expected error message.
expected_error <- as_text(task(x = 1, error_x = 1))
# Select task arguments.
x <- sample(1:100, 100)
error_x <- sample(length(x), 1)
sleep <- sample(c(0, 0.001, 0.002), 1)
# Create the `sapply` operation.
operation <- bquote(context$sapply(.(x), .(task), error_x = .(error_x), sleep = .(sleep)))
# Tests for the `sapply` operation in a progress tracking context with error in the task.
tests_set_for_task_execution_with_progress_tracking_and_error(operation, context, expected_error)
# Create the `lapply` operation.
operation <- bquote(context$lapply(.(x), .(task), error_x = .(error_x), sleep = .(sleep)))
# Tests for the `lapply` operation in a progress tracking context with error in the task.
tests_set_for_task_execution_with_progress_tracking_and_error(operation, context, expected_error)
# Redefine `x` as a matrix for the `apply` operation.
x <- matrix(rnorm(100^2), nrow = 100, ncol = 100)
# Sample new error `x`.
error_x <- sample(x, 1)
# Define the `apply` operation over rows.
operation <- bquote(context$apply(.(x), 1, .(task), error_x = .(error_x), sleep = .(sleep)))
# Tests for the `apply` operation over rows in a progress tracking context with error in the task.
tests_set_for_task_execution_with_progress_tracking_and_error(operation, context, expected_error)
# Define the `apply` operation over columns.
operation <- bquote(context$apply(.(x), 2, .(task), error_x = .(error_x), sleep = .(sleep)))
# Tests for the `apply` operation over columns in a progress tracking context with error in the task.
tests_set_for_task_execution_with_progress_tracking_and_error(operation, context, expected_error)
# Redefine a smaller `x` matrix for the `apply` operation applied element-wise.
x <- matrix(rnorm(10^2), nrow = 10, ncol = 10)
# Sample new error `x`.
error_x <- sample(x, 1)
# Define the `apply` operation element-wise.
operation <- bquote(context$apply(.(x), c(1, 2), .(task), error_x = .(error_x), sleep = .(sleep)))
# Tests for the `apply` operation over all elements in a progress tracking context with error in the task.
tests_set_for_task_execution_with_progress_tracking_and_error(operation, context, expected_error)
}
#endregion
#region Tests sets for the user API.
# Set of tests for creating backends via the user API.
tests_set_for_backend_creation_via_user_api <- function(cluster_type, backend_type) {
# Create a backend.
backend <- start_backend(
cores = 2,
cluster_type = cluster_type,
backend_type = backend_type
)
# Expect the backend to be active.
expect_true(backend$active)
# Expect the backend to be of the correct type.
expect_equal(
Helper$get_class_name(backend),
Helper$get_class_name(BackendFactory$new()$get(backend_type))
)
# Stop the backend
stop_backend(backend)
# Expect the backend to be inactive.
expect_false(backend$active)
}
# Set of tests for task execution via the user API.
tests_set_for_user_api_task_execution <- function(parallel, sequential, expected_output) {
# Clean-up.
on.exit({
# Set default values for package options.
set_default_options()
})
# Select a cluster type.
cluster_type <- pick_cluster_type(Specification$new()$types)
# Disable progress tracking.
set_option("progress_track", FALSE)
# Create a synchronous backend.
backend <- start_backend(
cores = 2,
cluster_type = cluster_type,
backend_type = "sync"
)
# Expect the output of the task ran in parallel to be correct.
expect_equal(eval(parallel), expected_output)
# Enable progress tracking.
set_option("progress_track", TRUE)
# Expect warning for requesting progress tracking with incompatible backend.
expect_warning(
eval(parallel),
as_text(Warning$progress_not_supported_for_backend(backend))
)
# Stop the synchronous backend.
stop_backend(backend)
# Create an asynchronous backend.
backend <- start_backend(
cores = 2,
cluster_type = cluster_type,
backend_type = "async"
)
# Expect the output to be correct.
expect_equal(eval(parallel), expected_output)
# Disable progress tracking.
set_option("progress_track", FALSE)
# Expect the output to be correct.
expect_equal(eval(parallel), expected_output)
# Stop the asynchronous backend.
stop_backend(backend)
# Expect the task to produce correct output when ran sequentially.
expect_equal(eval(sequential), expected_output)
}
# Set of tests for progress tracking via the user API.
tests_set_for_user_api_progress_tracking <- function(operation) {
# Pick a cluster type.
cluster_type <- pick_cluster_type(Specification$new()$types)
# Create an asynchronous backend.
backend <- start_backend(
cores = 2,
cluster_type = cluster_type,
backend_type = "async"
)
# Clean-up on exit.
on.exit({
# Stop the backend.
stop_backend(backend)
# Restore the default options.
set_default_options()
})
# Configure modern bar.
configure_bar(
type = "modern",
force = TRUE,
clear = FALSE
)
# Redirect output.
sink("/dev/null", type = "output")
# Run the task and capture the progress bar output.
output <- capture.output({ eval(operation) }, type = "message")
# Remove output redirection.
sink(NULL)
# Expect the progress bar to be shown correctly.
expect_true(grepl("tasks \\[100%\\]", paste0(output, collapse = ""), perl = TRUE))
# Configure the basic bar.
configure_bar(
type = "basic",
style = 3
)
# Run the task and capture the progress bar output.
output <- capture.output({ eval(operation) }, type = "output")
# Expect the progress bar to be shown correctly.
expect_true(grepl("=\\| 100%", paste0(output, collapse = ""), perl = TRUE))
# Disable progress tracking.
set_option("progress_track", FALSE)
# Run the task and capture the output without the progress bar.
output <- capture.output({ eval(operation) }, type = "output")
# Expect the progress bar to be missing from the output.
expect_false(grepl("=\\| 100%", paste0(output, collapse = ""), perl = TRUE))
}
#endregion
#region Helper `R6` classes for testing.
# Helper for testing private methods of `Specification` class.
SpecificationTester <- R6::R6Class("SpecificationTester",
inherit = Specification,
private = list(
# Overwrite the private `.get_available_cores()` method.
.get_available_cores = function() {
return(self$available_cores)
}
),
public = list(
available_cores = NULL
),
active = list(
# Expose the private `.determine_usable_cores` method.
usable_cores = function() {
# Compute and return the number of the usable cores.
return(private$.determine_usable_cores(self$available_cores))
}
)
)
# Helper for testing method implementations of `Service` interface.
ServiceImplementation <- R6::R6Class("ServiceImplementation",
inherit = Service,
public = list(
# Allow instantiation.
initialize = function() {}
)
)
# Helper for testing method implementations of `Backend` class.
BackendImplementation <- R6::R6Class("BackendImplementation",
inherit = Backend,
public = list(
# Allow instantiation.
initialize = function() {}
)
)
# Helper for testing the `ProgressTrackingContext` class.
ProgressTrackingContextTester <- R6::R6Class("ProgressTrackingContextTester",
inherit = ProgressTrackingContext,
private = list(
# Wrapper for executing task operations with progress output capturing.
.execute_and_capture_progress = function(operation) {
# Create a text connection.
connection <- textConnection("output", open = "w", local = TRUE)
# Capture the output.
sink(connection, type = "output")
sink(connection, type = "message")
# Close the connection and reset the sink on exit.
on.exit({
# Reset the sink.
sink(NULL, type = "message")
sink(NULL, type = "output")
# Close the connection.
close(connection)
})
# Execute the task.
eval(operation)
# Store the progress bar output on the instance.
self$progress_bar_output <- output
}
),
public = list(
# The progress bar output used for testing.
progress_bar_output = NULL,
# Implementation for the `sapply` method capturing the progress output.
sapply = function(x, fun, ...) {
# Define the operation.
operation <- bquote(
do.call(
super$sapply, c(list(.(x), .(fun)), .(list(...)))
)
)
# Execute the task via the operation and capture the progress output.
private$.execute_and_capture_progress(operation)
},
# Implementation for the `lapply` method capturing the progress output.
lapply = function(x, fun, ...) {
# Define the operation.
operation <- bquote(
do.call(
super$lapply, c(list(.(x), .(fun)), .(list(...)))
)
)
# Execute the task via the operation and capture the progress output.
private$.execute_and_capture_progress(operation)
},
# Implementation for the `apply` method capturing the progress output.
apply = function(x, margin, fun, ...) {
# Define the operation.
operation <- bquote(
do.call(
super$apply, c(list(.(x), .(margin), .(fun)), .(list(...)))
)
)
# Execute the task via the operation and capture the progress output.
private$.execute_and_capture_progress(operation)
},
# Wrapper to expose `.make_log` for testing.
make_log = function() {
private$.make_log()
},
# Wrapper to expose the `.decorate` for testing.
decorate = function(task, log) {
private$.decorate(task, log)
}
),
active = list(
# Expose the bar configuration.
bar_config = function() { return(private$.bar_config) }
)
)
# Helper for testing method implementations of `Bar` class.
BarImplementation <- R6::R6Class("BarImplementation",
inherit = Bar,
public = list(
# Allow instantiation.
initialize = function() {}
)
)
#endregion
Any scripts or data that you put into this service are public.
Add the following code to your website.
For more information on customizing the embed code, read Embedding Snippets.