R/appenders.R

Defines functions appender_async appender_kinesis appender_syslognet appender_syslog appender_telegram appender_pushbullet appender_slack appender_tee appender_file

Documented in appender_async appender_file appender_kinesis appender_pushbullet appender_slack appender_syslog appender_syslognet appender_tee appender_telegram

#' Append log record to stderr
#' @param lines character vector
#' @export
#' @aliases appender_stderr
#' @usage
#' appender_console(lines)
#'
#' appender_stderr(lines)
#' @seealso This is a \code{\link{log_appender}}, for alternatives, see eg \code{\link{appender_stdout}}, \code{\link{appender_file}}, \code{\link{appender_tee}}, \code{\link{appender_slack}}, \code{\link{appender_pushbullet}}, \code{\link{appender_telegram}}, \code{\link{appender_syslog}}, \code{\link{appender_kinesis}} and \code{\link{appender_async}} for evaluating any \code{\link{log_appender}} function in a background process.
appender_console <- function(lines) {
    ## one log record per line, written to the standard error stream
    cat(lines, sep = '\n', file = stderr())
}
## tag the appender with the call that recreates it
attr(appender_console, 'generator') <- quote(appender_console())


#' @export
appender_stderr <- appender_console # plain alias: the very same function object, including its 'generator' attribute


#' Append log record to stdout
#' @param lines character vector
#' @export
#' @seealso This is a \code{\link{log_appender}}, for alternatives, see eg \code{\link{appender_console}}, \code{\link{appender_file}}, \code{\link{appender_tee}}, \code{\link{appender_slack}}, \code{\link{appender_pushbullet}}
appender_stdout <- function(lines) {
    ## one log record per line, written to the standard output
    cat(lines, sep = '\n')
}
## tag the appender with the call that recreates it
attr(appender_stdout, 'generator') <- quote(appender_stdout())


#' Append log messages to a file
#'
#' Log messages are written to a file with basic log rotation: when max number of lines or bytes is defined to be other than \code{Inf}, then the log file is renamed with a \code{.1} suffix and a new log file is created. The renaming happens recursively (eg \code{logfile.1} renamed to \code{logfile.2}) until the specified \code{max_files}, then the oldest file (\code{logfile.{max_files-1}}) is deleted.
#' @param file path
#' @param append boolean passed to \code{cat} defining if the file should be overwritten with the most recent log message instead of appending
#' @param max_lines numeric specifying the maximum number of lines allowed in a file before rotating
#' @param max_bytes numeric specifying the maximum number of bytes allowed in a file before rotating
#' @param max_files integer specifying the maximum number of files to be used in rotation
#' @export
#' @return function taking \code{lines} argument
#' @seealso This is a generator function for \code{\link{log_appender}}, for alternatives, see eg \code{\link{appender_console}}, \code{\link{appender_tee}}, \code{\link{appender_slack}}, \code{\link{appender_pushbullet}}, \code{\link{appender_telegram}}, \code{\link{appender_syslog}}, \code{\link{appender_kinesis}} and \code{\link{appender_async}} for evaluating any \code{\link{log_appender}} function in a background process.
#' @examples \dontrun{
#' ## ##########################################################################
#' ## simple example logging to a file
#' t <- tempfile()
#' log_appender(appender_file(t))
#' for (i in 1:25) log_info(i)
#' readLines(t)
#'
#' ## ##########################################################################
#' ## more complex example of logging to file
#' ## rotated after every 3rd line up to max 5 files
#'
#' ## create a folder storing the log files
#' t <- tempfile(); dir.create(t)
#' f <- file.path(t, 'log')
#'
#' ## define the file logger with log rotation enabled
#' log_appender(appender_file(f, max_lines = 3, max_files = 5L))
#'
#' ## log 25 messages
#' for (i in 1:25) log_info(i)
#'
#' ## see what was logged
#' lapply(list.files(t, full.names = TRUE), function(t) {
#'   cat('\n##', t, '\n')
#'   cat(readLines(t), sep = '\n')
#' })
#'
#' ## enable internal logging to see what's actually happening in the logrotate steps
#' log_threshold(TRACE, namespace = '.logger')
#' ## run the above commands again
#' }
appender_file <- function(file, append = TRUE, max_lines = Inf, max_bytes = Inf, max_files = 1L) {

    ## evaluate the promises right away, so that later changes to the
    ## variables passed by the caller cannot affect the returned appender
    force(file)
    force(append)
    force(max_lines)
    force(max_bytes)
    force(max_files)

    ## reject non-integer, non-scalar, missing and non-positive values with
    ## the same, meaningful error message
    if (!is.integer(max_files) || length(max_files) != 1L || is.na(max_files) || max_files < 1L) {
        stop('max_files must be a positive integer')
    }

    structure(
        function(lines) {

            ## log rotation is only considered when at least one limit is set
            if (is.finite(max_lines) || is.finite(max_bytes)) {

                fail_on_missing_package('R.utils')

                ## a file that cannot be counted (eg not existing yet) has 0 lines
                n_lines <- tryCatch(
                    suppressWarnings(R.utils::countLines(file)),
                    error = function(e) 0)
                n_bytes <- if (file.exists(file)) file.info(file)$size else 0

                limit_reached <- n_lines >= max_lines || n_bytes >= max_bytes
                if (limit_reached) {
                    log_trace(
                        'lines: %s, max_lines: %s, bytes: %s, max_bytes: %s',
                        n_lines, max_lines, n_bytes, max_bytes,
                        namespace = '.logger')
                    log_trace(
                        'lines >= max_lines || bytes >= max_bytes: %s',
                        limit_reached,
                        namespace = '.logger')

                    ## walk from the oldest rotated file towards the main file,
                    ## shifting every existing file by one suffix
                    for (i in max_files:1) {

                        if (i == 1) {

                            ## just kill the old file (a no-op when it was
                            ## already renamed to the ".1" suffix above)
                            log_trace('killing the main file: %s', file, namespace = '.logger')
                            unlink(file)

                        } else {

                            ## rotate the old file: file.{i-2} becomes
                            ## file.{i-1}, where the main file has no suffix
                            new <- paste(file, i - 1, sep = '.')
                            old <- if (i == 2) file else paste(file, i - 2, sep = '.')

                            ## the oldest rotated file (file.{max_files-1})
                            ## is replaced by this rename, so no separate
                            ## cleanup of files beyond max_files is needed
                            if (file.exists(old)) {
                                log_trace('renaming %s to %s', old, new, namespace = '.logger')
                                file.rename(old, new)
                            }

                        }
                    }
                }
            }
            log_trace('logging %s to %s', shQuote(lines), file, namespace = '.logger')
            cat(lines, sep = '\n', file = file, append = append)
        }, generator = deparse(match.call()))
}


#' Append log messages to a file and the console (stderr) as well
#'
#' This appends log messages to both console and a file. The same rotation options are available as in \code{\link{appender_file}}.
#' @inheritParams appender_file
#' @export
#' @return function taking \code{lines} argument
#' @seealso This is a generator function for \code{\link{log_appender}}, for alternatives, see eg \code{\link{appender_console}}, \code{\link{appender_file}}, \code{\link{appender_slack}}, \code{\link{appender_pushbullet}}, \code{\link{appender_telegram}}, \code{\link{appender_syslog}}, \code{\link{appender_kinesis}} and \code{\link{appender_async}} for evaluating any \code{\link{log_appender}} function in a background process.
appender_tee <- function(file, append = TRUE, max_lines = Inf, max_bytes = Inf, max_files = 1L) {

    ## evaluate all promises right away (including the file path), so that
    ## later changes to the variables passed by the caller cannot redirect
    ## or reconfigure the already created appender
    force(file)
    force(append)
    force(max_lines)
    force(max_bytes)
    force(max_files)

    structure(
        function(lines) {
            ## console first, then the (optionally rotated) file
            appender_console(lines)
            appender_file(
                file = file, append = append, max_lines = max_lines,
                max_bytes = max_bytes, max_files = max_files)(lines)
        }, generator = deparse(match.call()))
}


#' Send log messages to a Slack channel
#' @param channel Slack channel name with a hashtag prefix for public channel and no prefix for private channels
#' @param username Slack (bot) username
#' @param icon_emoji optional override for the bot icon
#' @param api_token Slack API token
#' @param preformatted use code tags around the message?
#' @return function taking \code{lines} argument
#' @export
#' @note This functionality depends on the \pkg{slackr} package.
#' @seealso This is a generator function for \code{\link{log_appender}}, for alternatives, see eg \code{\link{appender_console}}, \code{\link{appender_file}}, \code{\link{appender_tee}}, \code{\link{appender_pushbullet}}, \code{\link{appender_telegram}}, \code{\link{appender_syslog}}, \code{\link{appender_kinesis}} and \code{\link{appender_async}} for evaluating any \code{\link{log_appender}} function in a background process.
appender_slack <- function(channel      = Sys.getenv('SLACK_CHANNEL'),
                           username     = Sys.getenv('SLACK_USERNAME'),
                           icon_emoji   = Sys.getenv('SLACK_ICON_EMOJI'),
                           api_token    = Sys.getenv('SLACK_API_TOKEN'),
                           preformatted = TRUE) {

    ## NOTE(review): the second argument is presumably the minimum required
    ## slackr version -- confirm against fail_on_missing_package
    fail_on_missing_package('slackr', '1.4.1')

    ## resolve the (by default environment variable based) settings now
    ## instead of at the time of posting the first message
    force(channel)
    force(username)
    force(icon_emoji)
    force(api_token)
    force(preformatted)

    ## the actual appender, posting the log records via slackr
    post_to_slack <- function(lines) {
        slackr::slackr_msg(
            text         = lines,
            channel      = channel,
            username     = username,
            icon_emoji   = icon_emoji,
            token        = api_token,
            preformatted = preformatted)
    }

    structure(post_to_slack, generator = deparse(match.call()))

}


#' Send log messages to Pushbullet
#' @param ... parameters passed to \code{pbPost}, such as \code{recipients} or \code{apikey}, although it's probably much better to set all these in the \code{~/.rpushbullet.json} as per package docs at \url{http://dirk.eddelbuettel.com/code/rpushbullet.html}
#' @return function taking \code{lines} argument
#' @export
#' @note This functionality depends on the \pkg{RPushbullet} package.
#' @seealso This is a generator function for \code{\link{log_appender}}, for alternatives, see eg \code{\link{appender_console}}, \code{\link{appender_file}}, \code{\link{appender_tee}}, \code{\link{appender_slack}}, \code{\link{appender_telegram}}, \code{\link{appender_syslog}}, \code{\link{appender_kinesis}} and \code{\link{appender_async}} for evaluating any \code{\link{log_appender}} function in a background process.
appender_pushbullet <- function(...) {

    fail_on_missing_package('RPushbullet')

    structure(
        function(lines) {
            ## join multiple log records into a single note body: this needs
            ## "collapse", as "sep" only separates the arguments of paste and
            ## so leaves a single character vector as is
            RPushbullet::pbPost(type = 'note', body = paste(lines, collapse = '\n'), ...)
        }, generator = deparse(match.call()))

}


#' Send log messages to a Telegram chat
#' @param chat_id Unique identifier for the target chat or username of the target channel (in the format @channelusername)
#' @param bot_token Telegram Authorization token
#' @param parse_mode Message parse mode. Allowed values: Markdown or HTML
#' @return function taking \code{lines} argument
#' @export
#' @note This functionality depends on the \pkg{telegram} package.
#' @seealso This is a generator function for \code{\link{log_appender}}, for alternatives, see eg \code{\link{appender_console}}, \code{\link{appender_file}}, \code{\link{appender_tee}}, \code{\link{appender_pushbullet}}, \code{\link{appender_syslog}}, \code{\link{appender_kinesis}} and \code{\link{appender_async}} for evaluating any \code{\link{log_appender}} function in a background process.
appender_telegram <- function(chat_id      = Sys.getenv('TELEGRAM_CHAT_ID'),
                              bot_token    = Sys.getenv('TELEGRAM_BOT_TOKEN'),
                              parse_mode   = NULL) {

    fail_on_missing_package('telegram')

    ## resolve the (by default environment variable based) settings now
    force(chat_id)
    force(bot_token)
    force(parse_mode)

    ## the bot object is created once and shared by all appender calls
    bot <- telegram::TGBot$new(token = bot_token)

    ## the actual appender, sending the log records to the chat
    send_to_chat <- function(lines) {
        bot$sendMessage(text = lines, parse_mode = parse_mode, chat_id = chat_id)
    }

    structure(send_to_chat, generator = deparse(match.call()))

}


#' Send log messages to the POSIX system log
#' @param identifier A string identifying the process.
#' @param ... Further arguments passed on to \code{\link[rsyslog]{open_syslog}}.
#' @return function taking \code{lines} argument
#' @export
#' @note This functionality depends on the \pkg{rsyslog} package.
#' @seealso This is a generator function for \code{\link{log_appender}}, for alternatives, see eg \code{\link{appender_console}}, \code{\link{appender_file}}, \code{\link{appender_tee}}, \code{\link{appender_pushbullet}}, \code{\link{appender_telegram}}, \code{\link{appender_kinesis}} and \code{\link{appender_async}} for evaluating any \code{\link{log_appender}} function in a background process.
#' @examples \dontrun{
#' if (requireNamespace("rsyslog", quietly = TRUE)) {
#'   log_appender(appender_syslog("test"))
#'   log_info("Test message.")
#' }
#' }
appender_syslog <- function(identifier, ...) {

    fail_on_missing_package('rsyslog')

    ## the syslog connection is opened once, when the appender is created
    rsyslog::open_syslog(identifier = identifier, ...)

    ## the actual appender, writing one syslog entry per log record
    write_to_syslog <- function(lines) {
        for (record in lines) {
            rsyslog::syslog(record)
        }
    }

    structure(write_to_syslog, generator = deparse(match.call()))
}


#nocov start
#' Send log messages to a network syslog server
#' @param identifier program/function identification (string).
#' @param server machine where syslog daemon runs (string).
#' @param port port where syslog daemon listens (integer).
#'
#' @return A function taking a \code{lines} argument.
#' @export
#' @note This functionality depends on the \pkg{syslognet} package.
#' @examples \dontrun{
#' if (requireNamespace("syslognet", quietly = TRUE)) {
#'   log_appender(appender_syslognet("test_app", 'remoteserver'))
#'   log_info("Test message.")
#' }
#' }
appender_syslognet <- function(identifier, server, port = 601L) {

  fail_on_missing_package('syslognet')

  ## evaluate the settings now instead of at the first log call
  force(identifier)
  force(server)
  force(port)

  ## the actual appender, sending one syslog message per log record
  send_to_server <- function(lines) {
    ## the severity is read from an attribute of the log records
    severity <- attr(lines, 'severity', exact = TRUE)
    for (record in lines) {
      syslognet::syslog(record, severity, app_name = identifier, server = server, port = port)
    }
  }

  structure(send_to_server, generator = deparse(match.call()))
}
#nocov end


#' Send log messages to an Amazon Kinesis stream
#' @param stream name of the Kinesis stream
#' @return function taking \code{lines} and optional \code{partition_key} argument
#' @export
#' @note This functionality depends on the \pkg{botor} package.
#' @seealso This is a generator function for \code{\link{log_appender}}, for alternatives, see eg \code{\link{appender_console}}, \code{\link{appender_file}}, \code{\link{appender_tee}}, \code{\link{appender_pushbullet}}, \code{\link{appender_telegram}}, \code{\link{appender_syslog}} and \code{\link{appender_async}} for evaluating any \code{\link{log_appender}} function in a background process.
appender_kinesis <- function(stream) {

    fail_on_missing_package('botor')

    ## evaluate the stream name now instead of at the first log call
    force(stream)

    ## the actual appender, putting one Kinesis record per log record
    put_records <- function(lines, partition_key = NA_character_) {
        for (record in lines) {
            botor::kinesis()$put_record(StreamName = stream, Data = record, PartitionKey = partition_key)
        }
    }

    structure(put_records, generator = deparse(match.call()))
}


#' Delays executing the actual appender function to the future in a background process to avoid blocking the main R session
#' @param appender a  \code{\link{log_appender}} function with a \code{generator} attribute (TODO note not required, all fn will be passed if not)
#' @param batch number of records to process from the queue at once
#' @param namespace \code{logger} namespace to use for logging messages on starting up the background process
#' @param init optional function to run in the background process that is useful to set up the environment required for logging, eg if the \code{appender} function requires some extra packages to be loaded or some environment variables to be set etc
#' @return function taking \code{lines} argument
#' @export
#' @note This functionality depends on the \pkg{txtq} and \pkg{callr} packages. The R session's temp folder is used for staging files (message queue and other forms of communication between the parent and child processes).
#' @seealso This function is to be used with an actual \code{\link{log_appender}}, for example \code{\link{appender_console}}, \code{\link{appender_file}}, \code{\link{appender_tee}}, \code{\link{appender_pushbullet}}, \code{\link{appender_telegram}}, \code{\link{appender_syslog}} or \code{\link{appender_kinesis}}.
#' @examples \dontrun{
#' appender_file_slow <- function(file) {
#'   force(file)
#'   function(lines) {
#'     Sys.sleep(1)
#'     cat(lines, sep = '\n', file = file, append = TRUE)
#'   }
#' }
#'
#' ## log what's happening in the background
#' log_threshold(TRACE, namespace = 'async_logger')
#' log_appender(appender_console, namespace = 'async_logger')
#'
#' ## start async appender
#' t <- tempfile()
#' log_info('Logging in the background to {t}')
#' my_appender <- appender_async(appender_file_slow(file = t))
#'
#' ## use async appender
#' log_appender(my_appender)
#' log_info('Was this slow?')
#' system.time(for (i in 1:25) log_info(i))
#'
#' readLines(t)
#' Sys.sleep(10)
#' readLines(t)
#'
#' ## check on the async appender (debugging, you will probably never need this)
#' attr(my_appender, 'async_writer_queue')$count()
#' attr(my_appender, 'async_writer_queue')$log()
#'
#' attr(my_appender, 'async_writer_process')$get_pid()
#' attr(my_appender, 'async_writer_process')$get_state()
#' attr(my_appender, 'async_writer_process')$poll_process(1)
#' attr(my_appender, 'async_writer_process')$read()
#'
#' attr(my_appender, 'async_writer_process')$is_alive()
#' attr(my_appender, 'async_writer_process')$read_error()
#' }
appender_async <- function(appender, batch = 1, namespace = 'async_logger',
                           init = function() log_info('Background process started')) {

    fail_on_missing_package('txtq')
    fail_on_missing_package('callr')

    force(appender)
    force(batch)
    ## the namespace is used for all startup messages logged below
    force(namespace)

    ## create a storage for the message queue
    async_writer_storage <- tempfile()
    log_trace(paste('Async writer storage:', async_writer_storage), namespace = namespace)

    ## initialize the message queue
    async_writer_queue <- txtq::txtq(async_writer_storage)

    ## start a background process for the async execution of the message queue
    ## TODO make it easy to create multiple/parallel background processes?
    async_writer_process <- callr::r_session$new()
    log_trace(paste('Async writer PID:', async_writer_process$get_pid()), namespace = namespace)

    ## load minimum required packages
    async_writer_process$run(function() source(system.file(
        'load-packages-in-background-process.R',
        package = 'logger')))
    async_writer_process$run(init)

    ## connect to the message queue
    async_writer_process$run(assign, args = list(x = 'async_writer_storage', value = async_writer_storage))
    async_writer_process$run(function() async_writer_queue <<- txtq::txtq(async_writer_storage))

    ## pass arguments
    async_writer_process$run(assign, args = list(x = 'batch', value = batch))

    ## pass appender
    async_writer_tempfile <- tempfile()
    saveRDS(appender, async_writer_tempfile)
    log_trace(paste('Async appender cached at:', async_writer_tempfile), namespace = namespace)
    async_writer_process$run(assign, args = list(x = 'async_writer_tempfile', value = async_writer_tempfile))
    async_writer_process$run(assign, args = list(x = 'appender', value = readRDS(async_writer_tempfile)))

    ## start infinite loop processing log records
    ## (started with $call instead of $run, as this loop never returns)
    async_writer_process$call(function() {
        while (TRUE) {

            items <- async_writer_queue$pop(batch)

            if (nrow(items) == 0) {

                ## avoid burning CPU
                Sys.sleep(.1)

            } else {

                ## execute the actual appender for each log item
                for (i in seq_len(nrow(items))) {
                    appender(items$message[i])
                }

                ## remove processed log records
                async_writer_queue$clean()

            }
        }
    })

    structure(

        function(lines) {

            ## check if background process still works
            if (!isTRUE(async_writer_process$is_alive())) {
                stop('FATAL: Async writer process not found')
            }
            remote_error <- async_writer_process$read_error()
            if (remote_error != '') {
                stop(paste('FATAL: Async writer failed with', shQuote(remote_error)))
            }
            remote_event <- async_writer_process$read()
            if (!is.null(remote_event) && !is.null(remote_event$error)) {
                stop(paste(
                    'FATAL: Async writer error of',
                    shQuote(remote_event$error$message),
                    'in',
                    shQuote(paste(deparse(remote_event$error$call), collapse = ' '))))
            }

            ## write to message queue, using the current time as the title
            for (line in lines) {
                async_writer_queue$push(title = as.character(as.numeric(Sys.time())), message = line)
            }

        },

        generator = deparse(match.call()),
        ## share remote process and queue with parent for debugging purposes
        async_writer_storage = async_writer_storage,
        async_writer_queue = async_writer_queue,
        async_writer_process = async_writer_process)

    ## NOTE no need to clean up, all will go away with the current R session's temp folder

}

## TODO other appenders: graylog, datadog, cloudwatch, email via sendmailR, ES etc

Try the logger package in your browser

Any scripts or data that you put into this service are public.

logger documentation built on Oct. 19, 2021, 9:07 a.m.