#' @include execution.r help.r comm_manager.r logging.r utils.r
# Reference class bundling the kernel's connection settings, its ZMQ context
# and sockets, and the Executor / CommManager helpers.
# NOTE(review): no class-name argument to setRefClass() is visible here —
# confirm against the full file.
Kernel <- setRefClass(
fields = list(
# connection settings; assigned from fromJSON(connection_file) in initialize
connection_info = 'list',
# ZMQ context passed to every zmq.socket() call in initialize
zmqctx = 'externalptr',
# named list of sockets: hb, iopub, control, stdin, shell
sockets = 'list',
# created in initialize via Executor$new(...)
executor = 'Executor',
# created in initialize via CommManager$new(...)
comm_manager = 'CommManager'),
methods = list(
# Heartbeat handler: receive one message on the hb socket without
# unserializing it, and send the same data back on that socket unchanged.
hb_reply = function() {
data <- zmq.msg.recv(sockets$hb, unserialize = FALSE)
zmq.msg.send(data, sockets$hb, serialize = FALSE)
# Compute the signature over a list of message frames.
#   msg_lst: list of frames to sign (callers in this class pass raw vectors).
# The frames are flattened into one vector and passed to hmac() together with
# connection_info$key and the algorithm name 'sha256'.
sign_msg = function(msg_lst) {
"Sign messages"
# flatten all frames into a single vector before hashing
concat <- unlist(msg_lst)
hmac(connection_info$key, concat, 'sha256')
# Turn the frames of a multipart message into a parsed message.
#   parts: list of raw vectors as received from a socket.
# Frames before the '<IDS|MSG>' delimiter are kept as routing identities; the
# frame right after it is the signature, followed by header, parent_header,
# metadata and content, each parsed with fromRawJSON().
wire_to_msg = function(parts) {
"Deserialize a message"
# Scan forward for the delimiter frame.
# NOTE(review): nothing visible bounds `i`, so a message without the delimiter
# would index past the end of `parts` — confirm this is acceptable.
i <- 1
while (!identical(parts[[i]], charToRaw('<IDS|MSG>'))) {
i <- i + 1
# The signature is only verified when a non-empty key is configured.
if (!identical(connection_info$key, '')) {
signature <- rawToChar(parts[[i + 1]])
# the signature covers the four frames following the signature frame
expected_signature <- sign_msg(parts[(i + 2):(i + 5)])
# stopifnot() raises an error on a mismatch
stopifnot(identical(signature, expected_signature))
# Convert the four key parts of the message to strings and parse the JSON
header <- fromRawJSON(parts[[i + 2]])
parent_header <- fromRawJSON(parts[[i + 3]])
metadata <- fromRawJSON(parts[[i + 4]])
content <- fromRawJSON(parts[[i + 5]])
# ZMQ routing bits
if (i > 1) {
# everything before the delimiter
identities <- parts[1:(i - 1)]
} else {
identities <- NULL
# Named fields of the resulting message.
# NOTE(review): the call these arguments belong to is not visible here.
header = header,
parent_header = parent_header,
metadata = metadata,
content = content,
identities = identities)
# Prepare a message for sending.
#   msg: list with header, parent_header, metadata and content entries.
# Each of the four entries is encoded with toJSON(auto_unbox = TRUE) and
# converted to a raw vector; the resulting frames are then signed.
# NOTE(review): the code that assembles the final frame list (identities,
# delimiter, signature, body) is not visible here.
msg_to_wire = function(msg) {
"Serialize a message"
# frame order: header, parent_header, metadata, content
bodyparts <- list(
charToRaw(toJSON(msg$header, auto_unbox = TRUE)),
charToRaw(toJSON(msg$parent_header, auto_unbox = TRUE)),
charToRaw(toJSON(msg$metadata, auto_unbox = TRUE)),
charToRaw(toJSON(msg$content, auto_unbox = TRUE)))
signature <- sign_msg(bodyparts)
# Build the skeleton of a reply to `parent_msg`.
#   msg_type:   message type stored in the new header.
#   parent_msg: parsed message being answered; its header becomes the
#               parent_header and its identities are reused.
new_reply = function(msg_type, parent_msg) {
"Prepare a reply"
header <- list(
# fresh id for this message
msg_id = UUIDgenerate(),
# session and username are copied from the parent message
session = parent_msg$header$session,
username = parent_msg$header$username,
# ISO 8601, 6 is the maximum number of decimal digits supported by R
date = strftime(as.POSIXlt(Sys.time(), 'UTC'), '%Y-%m-%dT%H:%M:%OS6Z'),
msg_type = msg_type,
version = PROTOCOL_VER)
# Named fields of the reply.
# NOTE(review): the call these arguments belong to is not visible here.
header = header,
parent_header = parent_msg$header,
identities = parent_msg$identities,
# Ensure this is {} in JSON, not []
metadata = namedlist())
# Build a reply to `parent_msg`, attach `content`, and send it.
#   msg_type:    type of the outgoing message.
#   parent_msg:  parsed message this one responds to.
#   socket_name: name used to look up the socket in `sockets`.
#   content:     list stored as the message content.
send_response = function(msg_type, parent_msg, socket_name, content) {
"Send a response"
msg <- new_reply(msg_type, parent_msg)
# message types ending in '_reply' get status 'ok' when none was supplied
if (grepl('_reply$', msg_type) && is.null(content$status)) {
content$status <- 'ok'
msg$content <- content
socket <- sockets[[socket_name]]
zmq.send.multipart(socket, msg_to_wire(msg), serialize = FALSE)
# logged after the send call
log_debug('Sending msg %s.%s', socket_name, msg$header$msg_type)
# Receive one message from the shell socket, publish a 'busy' status on iopub,
# run the handler matching the message type, then publish 'idle'.
handle_shell = function() {
"React to a shell message coming in"
parts <- zmq.recv.multipart(sockets$shell, unserialize = FALSE)
msg <- wire_to_msg(parts)
# protocol 5.0: send busy/idle around all of these
send_response('status', msg, 'iopub', list(
execution_state = 'busy'))
# Message type -> handler pairs; the final unnamed entry logs unhandled types.
# NOTE(review): the opening of this dispatch call is not visible here
# (presumably a switch() on msg$header$msg_type — confirm).
comm_info_request = comm_manager$on_comm_info_request(msg),
comm_open = comm_manager$on_comm_open(msg),
comm_msg = comm_manager$on_comm_msg(msg),
comm_close = comm_manager$on_comm_close(msg),
execute_request = executor$execute(msg),
kernel_info_request = kernel_info(msg),
history_request = history(msg),
complete_request = complete(msg),
is_complete_request = is_complete(msg),
inspect_request = inspect(msg),
shutdown_request = shutdown(msg),
log_debug(c('Got unhandled msg_type:', msg$header$msg_type)))
send_response('status', msg, 'iopub', list(
execution_state = 'idle'))
# Receive one message from the shell socket and answer it on the shell socket
# with status 'aborted' instead of processing it.
abort_shell_msg = function() {
"Send an abort message for an incoming shell request"
# See (the reference that followed is missing in this view — TODO restore it)
parts <- zmq.recv.multipart(sockets$shell, unserialize = FALSE)
msg <- wire_to_msg(parts)
log_debug('Aborting msg of type %s', msg$header$msg_type)
# Reply type = text before the first '_' of the request type + '_reply',
# e.g. 'execute_request' -> 'execute_reply'.
# NOTE(review): for types with more than one '_' this yields e.g.
# 'is_complete_request' -> 'is_reply' — confirm this is intended.
reply_type <- paste0(unlist(strsplit(msg$header$msg_type, '_'))[1], '_reply')
reply_content <- list(status = 'aborted')
send_response(reply_type, msg, 'shell', reply_content)
log_debug('Aborted msg')
# Repeatedly poll the shell socket with a zero timeout and react to whatever
# is already queued, logging each step.
# NOTE(review): the per-message action and the loop exit are not visible here
# (presumably abort_shell_msg() and break — confirm).
abort_queued_messages = function() {
"Abort all already queued shell messages after an error"
log_debug('abort loop: aborted all outstanding msg')
while (TRUE) {
log_debug('abort loop: before poll')
ret <- zmq.poll(
c(sockets$shell), # only shell channel
c(ZMQ.PO()$POLLIN), # type
0) # zero timeout, only what's already there
log_debug('abort loop: after poll')
# revents(1) refers to the single polled socket (shell)
if (bitwAnd(zmq.poll.get.revents(1), ZMQ.PO()$POLLIN)) {
log_debug('abort loop: found msg')
} else {
# no more messages...
log_debug('abort loop: breaking')
log_debug('abort loop: end')
# Poll the stdin socket in a loop and, when a message is readable, receive and
# parse it.
# NOTE(review): what is done with the parsed message (and how the loop ends)
# is not visible here.
handle_stdin = function() {
"React to a stdin message coming in"
# wait for 'input_reply' response message
while (TRUE) {
log_debug('stdin loop: beginning')
# no timeout argument is passed (presumably blocks until a message
# arrives — confirm against the zmq.poll default)
zmq.poll(c(sockets$stdin), # only stdin channel
c(ZMQ.PO()$POLLIN)) # type
# revents(1) refers to the single polled socket (stdin)
if (bitwAnd(zmq.poll.get.revents(1), ZMQ.PO()$POLLIN)) {
log_debug('stdin loop: found msg')
parts <- zmq.recv.multipart(sockets$stdin, unserialize = FALSE)
msg <- wire_to_msg(parts)
} else {
# else shouldn't be possible
log_error('stdin loop: zmq.poll returned but no message found?')
# Answer an is_complete request: classify request$content$code and send an
# 'is_complete_reply' with the resulting status on the shell socket.
#   request: parsed message; the code is read from request$content$code.
# NOTE(review): the expression evaluated inside tryCatch() and the value of
# each status branch are not visible here.
is_complete = function(request) {
"Checks whether the code in the rest is complete"
code <- request$content$code
# on error, `message` becomes the error's message text
message <- tryCatch({
# the code compiles, so we are complete (either no code at all / only
# comments or syntactically correct code)
}, error = function(e) e$message)
# One of 'complete', 'incomplete', 'invalid', 'unknown'
status <- if (message == 'complete') {
# syntactically complete code
} else if (grepl(gettext('unexpected end of input', domain = 'R'), message, fixed = TRUE)) {
# missing closing parenthesis
} else if (grepl(gettextf('unexpected %s', 'INCOMPLETE_STRING', domain = 'R'), message, fixed = TRUE)) {
# missing closing quotes
} else {
# all else
content <- list(status = status)
if (status == 'incomplete') {
# we don't try to guess the indentation level and just return zero indentation
# That's fine because R has braces... :-)
# TODO: do some guessing?
content <- c(content, indent = '')
send_response('is_complete_reply', request, 'shell', content)
# Answer a completion request: pass the code and cursor position to
# completions() and send the result as 'complete_reply' on the shell socket.
#   request: parsed message; reads request$content$code and
#            request$content$cursor_pos.
complete = function(request) {
# 5.0 protocol:
code <- request$content$code
cursor_pos <- request$content$cursor_pos
# the result is read via its $comps, $start and $end entries below
comps <- completions(code, cursor_pos)
send_response('complete_reply', request, 'shell', list(
matches = as.list(comps$comps), # make single strings not explode into letters
metadata = namedlist(),
cursor_start = comps$start,
cursor_end = comps$end))
# Answer an inspect request: find the token at the cursor, gather its class,
# printed form and help, and send them as 'inspect_reply' on the shell socket.
#   request: parsed message; reads request$content$code and
#            request$content$cursor_pos.
inspect = function(request) {
# 5.0 protocol:
code <- request$content$code
cursor_pos <- request$content$cursor_pos
# per-MIME-type format strings for section titles
title_templates <- list(
'text/plain' = '# %s:\n',
'text/html' = '<h1>%s:</h1>\n')
# Function to add a section to content.
add_new_section <- function(data, section_name, new_data) {
for (mime in names(title_templates)) {
new_content <- new_data[[mime]]
# skip MIME types the new data does not provide
if (is.null(new_content)) next
title <- sprintf(title_templates[[mime]], section_name)
# use paste0 since sprintf cannot deal with format strings > 8192 bytes
# NOTE(review): paste0() has no `sep` parameter, so the `sep = '\n'` value
# appears to be concatenated as one more piece — confirm this is intended.
data[[mime]] <- paste0(data[[mime]], title, new_content, '\n', sep = '\n')
# Get token under the `cursor_pos`.
# Since `.guessTokenFromLine()` does not check the characters after `cursor_pos`
# check them by a loop. Use get since R CMD check does not like :::
token <- ''
# NOTE(review): seq() counts downwards when cursor_pos > nchar(code) —
# confirm that case cannot occur or is harmless.
for (i in seq(cursor_pos, nchar(code))) {
token_candidate <- get('.guessTokenFromLine', asNamespace('utils'))(code, i)
# stop at the first position that yields no token
if (nchar(token_candidate) == 0) break
token <- token_candidate
data <- namedlist()
if (nchar(token) != 0) {
# In many cases `get(token)` works, but it does not
# in the cases such as `token` is a numeric constant or a reserved word.
# Therefore `eval()` is used here.
# evaluation errors yield NULL instead of propagating
obj <- tryCatch(eval(parse(text = token), envir = .GlobalEnv), error = function(e) NULL)
# both stay NULL when `obj` is NULL (an `if` without `else`)
class_data <- if (!is.null(obj)) IRdisplay::prepare_mimebundle(class(obj))$data
print_data <- if (!is.null(obj)) IRdisplay::prepare_mimebundle(obj)$data
# `help(token)` is not used here because it does not work
# in the cases `token` is in `pkg::topic` or `pkg:::topic` form.
# NOTE(review): how help_obj is turned into help_data is not visible here.
help_data <- tryCatch({
help_obj <- eval(parse(text = paste0('?', token)))
}, error = function(e) NULL)
# only show help if we have a function
if ('function' %in% class(obj) && !is.null(help_data)) {
data <- help_data
} else {# any of those that are NULL are automatically skipped
data <- add_new_section(data, 'Class attribute', class_data)
data <- add_new_section(data, 'Printed form', print_data)
data <- add_new_section(data, 'Help document', help_data)
# report "found" only when at least one entry was collected
found <- length(data) != 0
send_response('inspect_reply', request, 'shell', list(
found = found,
data = data,
metadata = namedlist()))
# Answer a history request with an empty history list on the shell socket.
#   request: parsed message being answered.
history = function(request) {
send_response('history_reply', request, 'shell', list(history = list()))
# Answer a kernel info request on the shell socket with protocol version,
# implementation name/version, language info and banner.
#   request: parsed message being answered.
kernel_info = function(request) {
# "<major>.<minor>" built from R's `version` object
rversion <- paste0(version$major, '.', version$minor)
send_response('kernel_info_reply', request, 'shell', list(
protocol_version = PROTOCOL_VER,
implementation = 'IRkernel',
# version of the installed IRkernel package, as a string
implementation_version = as.character(packageVersion('IRkernel')),
language_info = list(
name = 'R',
codemirror_mode = 'r',
pygments_lexer = 'r',
mimetype = 'text/x-r-source',
file_extension = '.r',
version = rversion),
banner = version$version.string))
# Receive one message from the control socket and branch on its type:
# 'shutdown_request' versus everything else (logged as unhandled).
# NOTE(review): the action taken for a shutdown request beyond logging is not
# visible here (presumably a call to shutdown(msg) — confirm).
handle_control = function() {
log_debug('Control: beginning')
parts <- zmq.recv.multipart(sockets$control, unserialize = FALSE)
msg <- wire_to_msg(parts)
log_debug('Control: recv msg of type %s', msg$header$msg_type)
if (msg$header$msg_type == 'shutdown_request') {
log_debug('Control: shutdown...')
} else {
log_debug(paste('Unhandled control message, msg_type:', msg$header$msg_type))
# Send a 'shutdown_reply' on the control socket, echoing the request's
# `restart` value, then call the saved base quit function with 'no'.
#   request: parsed shutdown request; reads request$content$restart.
shutdown = function(request) {
send_response('shutdown_reply', request, 'control', list(
restart = request$content$restart))
# Always call the base quit() during shutdown since execution shadows it.
backup_env$base_quit('no') # bound during startup in .onLoad
# Set up the kernel from a connection file: read the connection settings,
# create and bind the five sockets, and create the Executor and CommManager.
#   connection_file: a path (character) or something fromJSON() accepts
#                    directly; a character value is wrapped in file() first.
initialize = function(connection_file) {
if (is.character(connection_file)) connection_file <- file(connection_file)
connection_info <<- fromJSON(connection_file)
# only these two transports are accepted
stopifnot(connection_info$transport %in% c('tcp', 'ipc'))
url <- paste0(connection_info$transport, '://', connection_info$ip)
# Builds the bind address for the port stored under `port_name` in
# connection_info; the separator is ':' for tcp and '-' for ipc.
url_with_port <- function(port_name) {
sep <- switch(connection_info$transport, tcp = ':', ipc = '-')
paste0(url, sep, connection_info[[port_name]])
# ZMQ Socket setup
# NOTE(review): the right-hand side of this assignment is not visible here.
zmqctx <<-
# socket types: hb = REP, iopub = PUB, control/stdin/shell = ROUTER
sockets <<- list(
hb = zmq.socket(zmqctx, ZMQ.ST()$REP),
iopub = zmq.socket(zmqctx, ZMQ.ST()$PUB),
control = zmq.socket(zmqctx, ZMQ.ST()$ROUTER),
stdin = zmq.socket(zmqctx, ZMQ.ST()$ROUTER),
shell = zmq.socket(zmqctx, ZMQ.ST()$ROUTER))
# Enable handover:
for (router in sockets[c('control', 'stdin', 'shell')]) {
zmq.setsockopt(router, ZMQ.SO()$ROUTER_HANDOVER, 1L)
# bind every socket to its port from the connection settings
zmq.bind(sockets$hb, url_with_port('hb_port'))
zmq.bind(sockets$iopub, url_with_port('iopub_port'))
zmq.bind(sockets$control, url_with_port('control_port'))
zmq.bind(sockets$stdin, url_with_port('stdin_port'))
zmq.bind(sockets$shell, url_with_port('shell_port'))
# the executor gets this object's methods as callbacks
executor <<- Executor$new(
send_response = .self$send_response,
handle_stdin = .self$handle_stdin,
abort_queued_messages = .self$abort_queued_messages)
comm_manager <<- CommManager$new(send_response = .self$send_response)
# also stored in runtime_env
runtime_env$comm_manager <- comm_manager
# Main loop: set the jupyter.in_kernel option, then repeatedly poll the hb,
# shell and control sockets and branch on which one has input.
# NOTE(review): the opening of the poll call inside tryCatch() and the handler
# invoked in each branch are not visible here.
run = function() {
options(jupyter.in_kernel = TRUE)
while (TRUE) {
log_debug('main loop: beginning')
# an interrupt condition is turned into list(0L, 'SIGINT')
r <- tryCatch(
# polled in this order: 1 = hb, 2 = shell, 3 = control
c(sockets$hb, sockets$shell, sockets$control),
rep(ZMQ.PO()$POLLIN, 3),
MC = ZMQ.MC(check.eintr = TRUE)),
interrupt = function(e) list(0L, 'SIGINT'))
log_debug('main loop: after poll. ZMQ code: %s; Errno: %s', r[[1L]], r[[2L]])
if (identical(r[[2L]], 'SIGINT')) {
log_info('main loop: keyboard interrupt caught')
# It's important that these messages are handled one by one in each
# loop. The problem is that during the handler, a new zmq.poll could be
# done (and is done in case of errors in an execution request) and this
# invalidates the zmq.poll.get.revents call leading to "funny" results
# with found control message even if there are no control messages. So
# the easiest seems to be to handle this in a big if .. else if .. else
# clause...
if (bitwAnd(zmq.poll.get.revents(1), ZMQ.PO()$POLLIN)) {
log_debug('main loop: hb')
} else if (bitwAnd(zmq.poll.get.revents(2), ZMQ.PO()$POLLIN)) {
log_debug('main loop: shell')
} else if (bitwAnd(zmq.poll.get.revents(3), ZMQ.PO()$POLLIN)) {
log_debug('main loop: control')
} else {
# else shouldn't be possible
log_debug('main loop: zmq.poll returned but no message found?')
log_debug('main loop: end')
