#' @include json-rpc.R
#' @include logger.R
#'
#' @title Server
#'
#' @description
#' A server is responsible for receiving requests to call an executor
#' method. This is an abstract base class for concrete classes of servers that will
#' each communicate with clients over alternative transports (e.g. stdio, HTTP).
#'
#' R implementation of Executa's
#' [Server](https://github.com/stencila/executa/blob/v1.6.0/src/base/Server.ts)
#' interface.
Server <- R6::R6Class(
"Server",
private = list(
executor = NULL,
log = NULL,
error_file = NULL
),
public = list(
#' @field running A boolean indicating if this server is running
running = FALSE,
#' @description Initialize the server.
#'
#' @param executor The executor to serve
initialize = function(executor = NULL) {
private$executor <- executor
private$log <- logger("rasta:server")
self$running <- FALSE
},
#' @description Get the addresses of the server.
#'
#' A server will usually on have one address type (e.g. `stdio` or `http`)
#' but may have more than one address for each type.
#'
#' @return A list of address types, with one or more addresses for each type.
addresses = function() {
list()
},
#' @description Get the URL of the server.
#'
#' The URL is derived from the first address.
#' Intended mainly for easily adding the address of the
#' server to log messages
#'
#' @return A URL string of the server
url = function() {
addresses <- self$addresses()
if (length(addresses) > 0) {
first <- addresses[1]
paste0(names(first), "://", first)
}
},
#' @description Receive a request.
#'
#' @param request The `JsonRpcRequest` (or JSON/list representation of it) to receive.
#' @param then A function to call with the response.
receive = function(request, then) {
# See the equivalent implementation in Javascript:
# https://github.com/stencila/executa/blob/v1.6.0/src/base/Server.ts#L70
private$log$debug("Received request")
request <- tryCatch(JsonRpcRequest$create(request), error = identity)
# Local function to make the following a little more terse
respond <- function(...) {
response <- JsonRpcResponse$new(...)
private$log$debug("Sending response")
then(response)
}
if (inherits(request, "JsonRpcError"))
respond(error = request)
else if (inherits(request, "error"))
respond(error = JsonRpcError$new(
JsonRpcErrorCode$InternalError,
error$message
))
else if (is.null(private$executor))
respond(error = JsonRpcError$new(
JsonRpcErrorCode$ServerError,
"No executor configured yet for this server"
))
else {
# Handle the request by dispatching to the executor
private$executor$dispatch(
method = request$method,
params = request$params,
then = function(result) {
respond(
id = request$id,
result = result
)
},
catch = function(error) {
# Log error and transform into a JSON-RPC error
message <- if (!is.null(error$message)) error$message else as.character(error)
private$log$error(message)
respond(
id = request$id,
error = JsonRpcError$new(JsonRpcErrorCode$ServerError, message)
)
}
)
}
},
#' @description Read a message.
#'
#' @note
#' Derived classes will usually want to override this method
#' to read a message from a specific transport.
#'
#' @param blocking Should the read be a blocking operation?
#' @returns A message string. An empty string if no message was read.
read = function(blocking = TRUE) return(""),
#' @description Write a message.
#'
#' @note
#' Derived classes will usually want to override this method
#' to write a message to a specific transport.
#'
#' @param message The message to write.
write = function(message) {
},
#' @description Start the server.
#'
#' @note
#' When overriding this method, derived classes will usually want to
#' call this method, i.e `super$start(executor)`
#'
#' @param executor The executor to dispatch to
#' @param background Run the server in the background with this duration (s).
#' between checks for new messages.
start = function(executor, background = -1) {
private$log$debug(paste("Starting server:", self$url()))
private$executor <- executor
# Print error file path in case it is needed
error_file_name <- tempfile(pattern = "rasta-stream-server-", fileext = ".txt")
private$error_file <- file(error_file_name, open = "w")
private$log$debug(paste("Error file path:", error_file_name))
self$running <- TRUE
if (background >= 0) {
private$log$debug(paste("Running server in background with interval (s):", background))
check_later <- function() {
if (!self$running) return()
later::later(function() {
self$check(blocking = FALSE)
check_later()
}, background)
}
check_later()
} else {
while (TRUE) {
stop <- self$check(blocking = TRUE)
if (length(stop) && stop) {
self$stop()
break
}
}
}
invisible(NULL)
},
#' Check the incoming message stream and handle any messages
#'
#' @param blocking Should the read be a blocking operation?
check = function(blocking = TRUE) {
tryCatch({
message <- self$read(blocking)
if (message != "") {
self$receive(message, then = function(response) {
json <- response$serialize()
self$write(json)
})
} else {
# Empty message blocking read indicates stop
if (blocking) return(TRUE)
}
},
warning = function(warning) {
private$log$warn(warning$message)
},
error = function(error) {
# Without the following line, when the parent process is killed,
# the R process would hang, consuming 100% CPU on the call to `log$error`,
# and become a zombie process. This may be due to stderr being
# closed, but attempts to detect that more intelligently did not work
# and this seemingly useless writing of the error to a files
# was the only way found to avoid the hanging.
writeLines(error$message, private$error_file)
# Errors which are caught via `try` or `tryCatch`` do not generate a traceback
# so unfortunately it is not possible to use the `.traceback` function here to add a stack
private$log$error(error$message)
}
)
},
#' @description Stop the server
#'
#' Derived classes may override this method to gracefully
#' shutdown the server e.g. close client connections.
stop = function() {
private$log$debug(paste("Stopping server:", self$url()))
self$running <- FALSE
if (inherits(private$error_file, "connection")) {
try(close(private$error_file))
private$error_file <- NULL
}
}
)
)
Add the following code to your website.
For more information on customizing the embed code, read Embedding Snippets.