#' @import rscala
#' @importFrom R6 R6Class
#' @export
#' @name KafkaConsumer
#' @title Creates KafkaConsumer object (R6 class)
#' @description Creates KafkaConsumer object (R6 class)
#' @section Usage:
#' For usage details see \bold{Methods, Arguments and Examples} sections.
#' \preformatted{
#' consumer$poll(interval = 100)
#' }
#' @section Methods:
#' \describe{
#' \item{\code{consumer = KafkaConsumer$new(topic_name, bootstrap_servers, group_id, config = list())}}{}
#' \item{\code{consumer$poll(interval = 100)}}{Fetch data for the topics or partitions specified
#' using one of the subscribe/assign APIs. On each poll, consumer will try to use the last consumed offset as the starting offset and fetch sequentially.
#' The last consumed offset can be manually set through #seek(TopicPartition, long) or automatically
#' set as the last committed offset for the subscribed list of partitions}
#' }
#' @section Arguments:
#' \describe{
#' \item{consumer}{A \code{KafkaConsumer} object}
#' \item{topic}{name of a Kafka topic to read messages from}
#' \item{broker_list}{bootstrap_servers for a Kafka cluster}
#' \item{config}{ConsumerConfig parameters (in \code{key = value} format) for kafka client -
#' see them here \url{https://kafka.apache.org/0100/javadoc/index.html?org/apache/kafka/clients/consumer/ConsumerConfig.html}}
#' \item{group_id}{group_id for a Kafka client - will be used to understand offsets}
#' \item{interval}{\bold{integer >= 0} scalar value. The time, in milliseconds, spent waiting in poll if data is not available in the buffer.
#' If 0, returns immediately with any records that are available currently in the buffer, else returns empty.}
#' }
KafkaConsumer = R6::R6Class(
classname = "KafkaConsumer",
public = list(
initialize = function(topic,
bootstrap_servers,
group_id,
config = list()) {
# ensure these parameters are strings
bootstrap_servers = as.character(bootstrap_servers)
topic = as.character(topic)
group_id = as.character(group_id)
config_keys = character(0)
config_values = character(0)
if(length(config) > 0) {
config_keys = names(config)
stopifnot(!is.null(config_keys))
config_values = unlist(config, use.names = FALSE, recursive = TRUE)
config_values = as.character(config_values)
stopifnot(length(config_keys) == length(config_values))
}
private$kafka_consumer =
s$do('org.dselivanov.rkafka.RscalaKafkaConsumer')$new(topic, bootstrap_servers,
group_id, config_keys, config_values,
length.one.as.vector = TRUE)
private$kafka_consumer$subscribe()
},
poll = function(interval = 0) {
interval = as.integer(interval)
private$kafka_consumer$poll(interval)
},
unsubsribe = function() {
private$kafka_consumer$unsubscribe()
},
# finalizer - implicitly unsubsribe
finalize = function() {
private$kafka_consumer$unsubscribe()
}
),
private = list(
kafka_consumer = NULL
)
)
Add the following code to your website.
For more information on customizing the embed code, read Embedding Snippets.