R/process.R

Defines functions ps_method process_as_ps_handle process_get_result process_supervise process_is_supervised process_get_pid process_get_start_time process_kill_tree process_kill process_interrupt process_signal process_get_exit_status process_is_alive process_wait

#' @useDynLib processx, .registration = TRUE, .fixes = "c_"
NULL

#' External process
#'
#' Managing external processes from R is not trivial, and this
#' class aims to help with this deficiency. It is essentially a small
#' wrapper around the `system` base R function, to return the process
#' id of the started process, and set its standard output and error
#' streams. The process id is then used to manage the process.
#'
#' @section Usage:
#' ```
#' p <- process$new(command = NULL, args,
#'                  stdin = NULL, stdout = NULL, stderr = NULL,
#'                  connections = list(), poll_connection = NULL,
#'                  env = NULL, cleanup = TRUE, cleanup_tree = FALSE,
#'                  wd = NULL, echo_cmd = FALSE, supervise = FALSE,
#'                  windows_verbatim_args = FALSE,
#'                  windows_hide_window = FALSE,
#'                  encoding = "", post_process = NULL)
#'
#' p$is_alive()
#' p$signal(signal)
#' p$interrupt()
#' p$kill(grace = 0.1)
#' p$kill_tree(grace = 0.1)
#' p$wait(timeout = -1)
#' p$get_pid()
#' p$get_exit_status()
#' p$get_start_time()
#'
#' p$read_output(n = -1)
#' p$read_error(n = -1)
#' p$read_output_lines(n = -1)
#' p$read_error_lines(n = -1)
#' p$has_input_connection()
#' p$has_output_connection()
#' p$has_error_connection()
#' p$has_poll_connection()
#' p$get_input_connection()
#' p$get_output_connection()
#' p$get_error_connection()
#' p$get_poll_connection()
#' p$is_incomplete_output()
#' p$is_incomplete_error()
#' p$read_all_output()
#' p$read_all_error()
#' p$read_all_output_lines()
#' p$read_all_error_lines()
#' p$write_input(str, sep = "\n")
#' p$get_input_file()
#' p$get_output_file()
#' p$get_error_file()
#'
#' p$poll_io(timeout)
#'
#' p$get_result()
#'
#' p$as_ps_handle()
#' p$get_name()
#' p$get_exe()
#' p$get_cmdline()
#' p$get_status()
#' p$get_username()
#' p$get_wd()
#' p$get_cpu_times()
#' p$get_memory_info()
#' p$suspend()
#' p$resume()
#'
#' format(p)
#' print(p)
#' ```
#'
#' @section Arguments:
#' * `p`: `process` object.
#' * `command`: Character scalar, the command to run.
#'     Note that this argument is not passed to a shell, so no
#'     tilde-expansion or variable substitution is performed on it.
#'     It should not be quoted with [base::shQuote()]. See
#'     [base::normalizePath()] for tilde-expansion.
#' * `args`: Character vector, arguments to the command. They will be
#'     used as is, without a shell. They don't need to be escaped.
#' * `stdin`: What to do with the standard input. Possible values:
#'     `NULL`: set to the _null device_, i.e. no standard input is
#'     provided; a string, supply the specified string as standard input;
#'     `"|"`: create a (writeable) connection for stdin.
#' * `stdout`: What to do with the standard output. Possible values:
#'     `NULL`: discard it; a string, redirect it to this file;
#'     `"|"`: create a connection for it.
#' * `stderr`: What to do with the standard error. Possible values:
#'     `NULL`: discard it; a string, redirect it to this file;
#'     `"|"`: create a connection for it.
#' * `connections`: A list of connections to pass to the child process.
#'     This is an experimental feature currently.
#' * `poll_connection`: Whether to create an extra connection to the process
#'     that allows polling, even if the standard input and standard output
#'     are not pipes. If this is `NULL` (the default), then this connection
#'     will be only created if standard output and standard error are not
#'     pipes, and `connections` is an empty list. If the poll connection is
#'     created, you can query it via `p$get_poll_connection()` and it is
#'     also included in the response to `p$poll_io()` and [poll()]. The
#'     numeric file descriptor of the poll connection comes right after
#'     `stderr` (2), and the connections listed in `connections`.
#' * `env`: Environment variables of the child process. If `NULL`, the
#'     parent's environment is inherited. On Windows, many programs cannot
#'     function correctly if some environment variables are not set, so we
#'     always set `HOMEDRIVE`, `HOMEPATH`, `LOGONSERVER`, `PATH`,
#'     `SYSTEMDRIVE`, `SYSTEMROOT`, `TEMP`, `USERDOMAIN`, `USERNAME`,
#'     `USERPROFILE` and `WINDIR`.
#' * `cleanup`: Whether to kill the process when the `process` object is
#'     garbage collected.
#' * `cleanup_tree`: Whether to kill the process and its child process tree
#'     when the `process` object is garbage collected.
#' * `wd`: working directory of the process. It must exist. If `NULL`, then
#'     the current working directory is used.
#' * `echo_cmd`: Whether to print the command to the screen before
#'     running it.
#' * `supervise`: Whether to register the process with a supervisor.
#'     If `TRUE`, the supervisor will ensure that the process is
#'     killed when the R process exits.
#' * `windows_verbatim_args`: Whether to omit quoting the arguments
#'     on Windows. It is ignored on other platforms.
#' * `windows_hide_window`: Whether to hide the application's window
#'     on Windows. It is ignored on other platforms.
#' * `signal`: An integer scalar, the id of the signal to send to
#'     the process. See [tools::pskill()] for the list of signals.
#' * `grace`: Currently not used.
#' * `timeout`: Timeout in milliseconds, for the wait or the I/O
#'     polling.
#' * `n`: Number of characters or lines to read.
#' * `str`: Character or raw vector to write to the standard input of the
#'     process. If a character vector with a marked encoding, it will be
#'     converted to `encoding`.
#' * `sep`: Separator to add between `str` elements if it is a character
#'     vector. It is ignored if `str` is a raw vector.
#' * `encoding`: The encoding to assume for `stdin`, `stdout` and
#'     `stderr`. By default the encoding of the current locale is
#'     used. Note that `processx` always reencodes the output of the
#'     `stdout `and `stderr`  streams in UTF-8 currently.
#'     If you want to read them without any conversion, on all platforms,
#'     specify `"UTF-8"` as encoding.
#' * `post_process`: An optional function to run when the process has
#'     finished. Currently it only runs if `$get_result()` is called.
#'     It is only run once.
#'
#' @section Details:
#' `$new()` starts a new process in the background, and then returns
#' immediately.
#'
#' `$is_alive()` checks if the process is alive. Returns a logical
#' scalar.
#'
#' `$signal()` sends a signal to the process. On Windows only the
#' `SIGINT`, `SIGTERM` and `SIGKILL` signals are interpreted,
#' and the special 0 signal, The first three all kill the process. The 0
#' signal return `TRUE` if the process is alive, and `FALSE`
#' otherwise. On Unix all signals are supported that the OS supports, and
#' the 0 signal as well.
#'
#' `$interrupt()` sends an interrupt to the process. On Unix this is a
#' `SIGINT` signal, and it is usually equivalent to pressing CTRL+C at the
#' terminal prompt. On Windows, it is a CTRL+BREAK keypress. Applications
#' may catch these events. By default they will quit.
#'
#' `$kill()` kills the process. It also kills all of its child
#' processes, except if they have created a new process group (on Unix),
#' or job object (on Windows). It returns `TRUE` if the process
#' was killed, and `FALSE` if it was no killed (because it was
#' already finished/dead when `processx` tried to kill it).
#'
#' `$kill_tree()` performs process tree cleanup, it kills the process
#' (if still alive), together with any child (or grandchild, etc.)
#' processes. It uses the _ps_ package, so that needs to be installed,
#' and _ps_ needs to support the current platform as well. Process tree
#' cleanup works by marking the process with an environment variable,
#' which is inherited in all child processes. This allows finding
#' descendents, even if they are orphaned, i.e. they are not connected
#' to the root of the tree cleanup in the process tree any more.
#' `$kill_tree()` returns a named integer vector of the process ids that
#' were killed, the names are the names of the processes (e.g. `"sleep"`,
#' `"notepad.exe"`, `"Rterm.exe"`, etc.).
#'
#' `$wait()` waits until the process finishes, or a timeout happens.
#' Note that if the process never finishes, and the timeout is infinite
#' (the default), then R will never regain control. It returns
#' the process itself, invisibly. In some rare cases, `$wait()` might take
#' a bit longer than specified to time out. This happens on Unix, when
#' another package overwrites the processx SIGCHLD signal handler, after the
#' processx process has started. One such package is parallel, if used
#' with fork clusters, e.g. through `parallel::mcparallel()`.
#'
#' `$get_pid()` returns the process id of the process.
#'
#' `$get_exit_status` returns the exit code of the process if it has
#' finished and `NULL` otherwise. On Unix, in some rare cases, the exit
#' status might be `NA`. This happens if another package (or R itself)
#' overwrites the processx SIGCHLD handler, after the processx process
#' has started. In these cases processx cannot determine the real exit
#' status of the process. One such package is parallel, if used with
#' fork clusters, e.g. through the `parallel::mcparallel()` function.
#'
#' `$get_start_time()` returns the time when the process was
#' started.
#'
#' `$is_supervised()` returns whether the process is being tracked by
#' supervisor process.
#'
#' `$supervise()` if passed `TRUE`, tells the supervisor to start
#' tracking the process. If `FALSE`, tells the supervisor to stop
#' tracking the process. Note that even if the supervisor is disabled for a
#' process, if it was started with `cleanup = TRUE`, the process will
#' still be killed when the object is garbage collected.
#'
#' `$read_output()` reads from the standard output connection of the
#' process. If the standard output connection was not requested, then
#' then it returns an error. It uses a non-blocking text connection. This
#' will work only if `stdout="|"` was used. Otherwise, it will throw an
#' error.
#'
#' `$read_error()` is similar to `$read_output`, but it reads
#' from the standard error stream.
#'
#' `$read_output_lines()` reads lines from standard output connection
#' of the process. If the standard output connection was not requested, then
#' then it returns an error. It uses a non-blocking text connection. This
#' will work only if `stdout="|"` was used. Otherwise, it will throw an
#' error.
#'
#' `$read_error_lines()` is similar to `$read_output_lines`, but
#' it reads from the standard error stream.
#'
#' `$has_input_connection()` return `TRUE` if there is a connection
#' object for standard input; in other words, if `stdout="|"`. It returns
#' `FALSE` otherwise.
#'
#' `$has_output_connection()` returns `TRUE` if there is a connection
#' object for standard output; in other words, if `stdout="|"`. It returns
#' `FALSE` otherwise.
#'
#' `$has_error_connection()` returns `TRUE` if there is a connection
#' object for standard error; in other words, if `stderr="|"`. It returns
#' `FALSE` otherwise.
#'
#' `$has_poll_connection()` return `TRUE` if there is a poll connection,
#' `FALSE` otherwise.
#'
#' `$get_input_connection()` returns a connection object, to the
#' standard input stream of the process.
#'
#' `$get_output_connection()` returns a connection object, to the
#' standard output stream of the process.
#'
#' `$get_error_conneciton()` returns a connection object, to the
#' standard error stream of the process.
#'
#' `$get_poll_connetion()` returns the poll connection, if the process has
#' one.
#'
#' `$is_incomplete_output()` return `FALSE` if the other end of
#' the standard output connection was closed (most probably because the
#' process exited). It return `TRUE` otherwise.
#'
#' `$is_incomplete_error()` return `FALSE` if the other end of
#' the standard error connection was closed (most probably because the
#' process exited). It return `TRUE` otherwise.
#'
#' `$read_all_output()` waits for all standard output from the process.
#' It does not return until the process has finished.
#' Note that this process involves waiting for the process to finish,
#' polling for I/O and potentically several `readLines()` calls.
#' It returns a character scalar. This will return content only if
#' `stdout="|"` was used. Otherwise, it will throw an error.
#'
#' `$read_all_error()` waits for all standard error from the process.
#' It does not return until the process has finished.
#' Note that this process involves waiting for the process to finish,
#' polling for I/O and potentically several `readLines()` calls.
#' It returns a character scalar. This will return content only if
#' `stderr="|"` was used. Otherwise, it will throw an error.
#'
#' `$read_all_output_lines()` waits for all standard output lines
#' from a process. It does not return until the process has finished.
#' Note that this process involves waiting for the process to finish,
#' polling for I/O and potentically several `readLines()` calls.
#' It returns a character vector. This will return content only if
#' `stdout="|"` was used. Otherwise, it will throw an error.
#'
#' `$read_all_error_lines()` waits for all standard error lines from
#' a process. It does not return until the process has finished.
#' Note that this process involves waiting for the process to finish,
#' polling for I/O and potentically several `readLines()` calls.
#' It returns a character vector. This will return content only if
#' `stderr="|"` was used. Otherwise, it will throw an error.
#'
#' `$write_input()` writes the character vector (separated by `sep`) to
#' the standard input of the process. It will be converted to the specified
#' encoding. This operation is non-blocking, and it will return, even if
#' the write fails (because the write buffer is full), or if it suceeds
#' partially (i.e. not the full string is written). It returns with a raw
#' vector, that contains the bytes that were not written. You can supply
#' this raw vector to `$write_input()` again, until it is fully written,
#' and then the return value will be `raw(0)` (invisibly).
#'
#' `$get_input_file()` if the `stdin` argument was a filename,
#' this returns the absolute path to the file. If `stdin` was `"|"` or
#' `NULL`, this simply returns that value.
#'
#' `$get_output_file()` if the `stdout` argument was a filename,
#' this returns the absolute path to the file. If `stdout` was `"|"` or
#' `NULL`, this simply returns that value.
#'
#' `$get_error_file()` if the `stderr` argument was a filename,
#' this returns the absolute path to the file. If `stderr` was `"|"` or
#' `NULL`, this simply returns that value.
#'
#' `$poll_io()` polls the process's connections for I/O. See more in
#' the _Polling_ section, and see also the [poll()] function
#' to poll on multiple processes.
#'
#' `$get_result()` returns the result of the post processesing function.
#' It can only be called once the process has finished. If the process has
#' no post-processing function, then `NULL` is returned.
#'
#' `$as_ps_handle()` returns a [ps::ps_handle] object, corresponding to
#' the process. The following methods use the ps package on a `ps_handle`
#' object, created automatically:
#' - `p$get_name()` calls [ps::ps_name()] to get the process name.
#' - `p$get_exe()` calls [ps::ps_exe()] to get the path of the executable.
#' - `p$get_cmdline()` calls [ps::ps_cmdline()] to get the command line.
#' - `p$get_status()` calls [ps::ps_status()] to get the process status.
#' - `p$get_username()` calls [ps::ps_username()] to get the username.
#' - `p$get_wd()` calls [ps::ps_cwd()] to get the current working
#'   directory.
#' - `p$get_cpu_times()` calls [ps::ps_cpu_times()] to get CPU usage data.
#' - `p$get_memory_info()` calls [ps::ps_memory_info()] to get memory usage
#'    data.
#' - `p$suspend()` calls [ps::ps_suspend()] to suspend the process.
#' - `p$resume()` calls [ps::ps_resume()] to resume a suspended process.
#'
#' `format(p)` or `p$format()` creates a string representation of the
#' process, usually for printing.
#'
#' `print(p)` or `p$print()` shows some information about the
#' process on the screen, whether it is running and it's process id, etc.
#'
#' @section Polling:
#' The `poll_io()` function polls the standard output and standard
#' error connections of a process, with a timeout. If there is output
#' in either of them, or they are closed (e.g. because the process exits)
#' `poll_io()` returns immediately.
#'
#' In addition to polling a single process, the [poll()] function
#' can poll the output of several processes, and returns as soon as any
#' of them has generated output (or exited).
#'
#' @importFrom R6 R6Class
#' @name process
#' @examples
#' # CRAN does not like long-running examples
#' \dontrun{
#' p <- process$new("sleep", "2")
#' p$is_alive()
#' p
#' p$kill()
#' p$is_alive()
#'
#' p <- process$new("sleep", "2")
#' p$is_alive()
#' Sys.sleep(3)
#' p$is_alive()
#' }
#'
NULL

#' @export

process <- R6Class(
  "process",
  cloneable = FALSE,
  public = list(

    initialize = function(command = NULL, args = character(),
      stdin = NULL, stdout = NULL, stderr = NULL, connections = list(),
      poll_connection = NULL, env = NULL, cleanup = TRUE,
      cleanup_tree = FALSE, wd = NULL, echo_cmd = FALSE, supervise = FALSE,
      windows_verbatim_args = FALSE, windows_hide_window = FALSE,
      encoding = "",  post_process = NULL)

      process_initialize(self, private, command, args, stdin,
                         stdout, stderr, connections, poll_connection,
                         env, cleanup, cleanup_tree, wd, echo_cmd,
                         supervise, windows_verbatim_args,
                         windows_hide_window, encoding, post_process),

    finalize = function() {
      if (!is.null(private$tree_id) && private$cleanup_tree &&
          ps::ps_is_supported()) self$kill_tree()
    },

    kill = function(grace = 0.1)
      process_kill(self, private, grace),

    kill_tree = function(grace = 0.1)
      process_kill_tree(self, private, grace),

    signal = function(signal)
      process_signal(self, private, signal),

    interrupt = function()
      process_interrupt(self, private),

    get_pid = function()
      process_get_pid(self, private),

    is_alive = function()
      process_is_alive(self, private),

    wait = function(timeout = -1)
      process_wait(self, private, timeout),

    get_exit_status = function()
      process_get_exit_status(self, private),

    format = function()
      process_format(self, private),

    print = function()
      process_print(self, private),

    get_start_time = function()
      process_get_start_time(self, private),

    is_supervised = function()
      process_is_supervised(self, private),

    supervise = function(status)
      process_supervise(self, private, status),

    ## Output

    read_output = function(n = -1)
      process_read_output(self, private, n),

    read_error = function(n = -1)
      process_read_error(self, private, n),

    read_output_lines = function(n = -1)
      process_read_output_lines(self, private, n),

    read_error_lines = function(n = -1)
      process_read_error_lines(self, private, n),

    is_incomplete_output = function()
      process_is_incompelete_output(self, private),

    is_incomplete_error = function()
      process_is_incompelete_error(self, private),

    has_input_connection = function()
      process_has_input_connection(self, private),

    has_output_connection = function()
      process_has_output_connection(self, private),

    has_error_connection = function()
      process_has_error_connection(self, private),

    has_poll_connection = function()
      process_has_poll_connection(self, private),

    get_input_connection =  function()
      process_get_input_connection(self, private),

    get_output_connection = function()
      process_get_output_connection(self, private),

    get_error_connection = function()
      process_get_error_connection(self, private),

    read_all_output = function()
      process_read_all_output(self, private),

    read_all_error = function()
      process_read_all_error(self, private),

    read_all_output_lines = function()
      process_read_all_output_lines(self, private),

    read_all_error_lines = function()
      process_read_all_error_lines(self, private),

    write_input = function(str, sep = "\n")
      process_write_input(self, private, str, sep),

    get_input_file = function()
      process_get_input_file(self, private),

    get_output_file = function()
      process_get_output_file(self, private),

    get_error_file = function()
      process_get_error_file(self, private),

    poll_io = function(timeout)
      process_poll_io(self, private, timeout),

    get_poll_connection = function()
      process_get_poll_connection(self, private),

    get_result = function()
      process_get_result(self, private),

    as_ps_handle = function()
      process_as_ps_handle(self, private),

    get_name = function()
      ps_method(ps::ps_name, self),

    get_exe = function()
      ps_method(ps::ps_exe, self),

    get_cmdline = function()
      ps_method(ps::ps_cmdline, self),

    get_status = function()
      ps_method(ps::ps_status, self),

    get_username = function()
      ps_method(ps::ps_username, self),

    get_wd = function()
      ps_method(ps::ps_cwd, self),

    get_cpu_times = function()
      ps_method(ps::ps_cpu_times, self),

    get_memory_info = function()
      ps_method(ps::ps_memory_info, self),

    suspend = function()
      ps_method(ps::ps_suspend, self),

    resume = function()
      ps_method(ps::ps_resume, self)
  ),

  private = list(

    command = NULL,       # Save 'command' argument here
    args = NULL,          # Save 'args' argument here
    cleanup = NULL,       # cleanup argument
    cleanup_tree = NULL,  # cleanup_tree argument
    stdin = NULL,         # stdin argument or stream
    stdout = NULL,        # stdout argument or stream
    stderr = NULL,        # stderr argument or stream
    pstdin = NULL,        # the original stdin argument
    pstdout = NULL,       # the original stdout argument
    pstderr = NULL,       # the original stderr argument
    cleanfiles = NULL,    # which temp stdout/stderr file(s) to clean up
    wd = NULL,            # working directory (or NULL for current)
    starttime = NULL,     # timestamp of start
    echo_cmd = NULL,      # whether to echo the command
    windows_verbatim_args = NULL,
    windows_hide_window = NULL,

    status = NULL,        # C file handle

    supervised = FALSE,   # Whether process is tracked by supervisor

    stdin_pipe = NULL,
    stdout_pipe = NULL,
    stderr_pipe = NULL,
    poll_pipe = NULL,

    encoding = "",

    env = NULL,

    connections = list(),

    post_process = NULL,
    post_process_result = NULL,
    post_process_done = FALSE,

    tree_id = NULL,

    get_short_name = function()
      process_get_short_name(self, private)
  )
)

## See the C source code for a discussion about the implementation
## of these methods

process_wait <- function(self, private, timeout) {
  "!DEBUG process_wait `private$get_short_name()`"
  .Call(c_processx_wait, private$status, as.integer(timeout))
  invisible(self)
}

process_is_alive <- function(self, private) {
  "!DEBUG process_is_alive `private$get_short_name()`"
  .Call(c_processx_is_alive, private$status)
}

process_get_exit_status <- function(self, private) {
  "!DEBUG process_get_exit_status `private$get_short_name()`"
  .Call(c_processx_get_exit_status, private$status)
}

process_signal <- function(self, private, signal) {
  "!DEBUG process_signal `private$get_short_name()` `signal`"
  .Call(c_processx_signal, private$status, as.integer(signal))
}

process_interrupt <- function(self, private) {
  "!DEBUG process_interrupt `private$get_short_name()`"
  if (os_type() == "windows") {
    pid <- as.character(self$get_pid())
    st <- run(get_tool("interrupt"), c(pid, "c"), error_on_status = FALSE)
    if (st$status == 0) TRUE else FALSE
  } else {
    .Call(c_processx_interrupt, private$status)
  }
}

process_kill <- function(self, private, grace) {
  "!DEBUG process_kill '`private$get_short_name()`', pid `self$get_pid()`"
  ret <- .Call(c_processx_kill, private$status, as.numeric(grace))
  if (!is.null(p <- private$poll_pipe)) .Call(c_processx_connection_close, p)
  ret
}

process_kill_tree <- function(self, private, grace) {
  "!DEBUG process_kill_tree '`private$get_short_name()`', pid `self$get_pid()`"
  if (!ps::ps_is_supported()) {
    stop(structure(
      list(message = "kill_tree is not supported on this platform"),
      class = c("not_implemented", "error", "condition")))
  }

  ret <- get("ps_kill_tree", asNamespace("ps"))(private$tree_id)
  if (!is.null(p <- private$poll_pipe)) .Call(c_processx_connection_close, p)
  ret
}

process_get_start_time <- function(self, private) {
  format_unix_time(private$starttime)
}

process_get_pid <- function(self, private) {
  .Call(c_processx_get_pid, private$status)
}

process_is_supervised <- function(self, private) {
  private$supervised
}

process_supervise <- function(self, private, status) {
  if (status && !self$is_supervised()) {
    supervisor_watch_pid(self$get_pid())
    private$supervised <- TRUE

  } else if (!status && self$is_supervised()) {
    supervisor_unwatch_pid(self$get_pid())
    private$supervised <- FALSE
  }
}

process_get_result <- function(self, private) {
  if (self$is_alive()) stop("Process is still alive")
  if (!private$post_process_done && is.function(private$post_process)) {
    private$post_process_result <- private$post_process()
    private$post_process_done <- TRUE
  }
  private$post_process_result
}

process_as_ps_handle <- function(self, private) {
  ps::ps_handle(self$get_pid(), self$get_start_time())
}

ps_method <- function(fun, self) {
  fun(ps::ps_handle(self$get_pid(), self$get_start_time()))
}
r-pkgs/processx documentation built on Oct. 7, 2018, 9:09 a.m.