#' R6 Class for Kafka Admin
#'
#' @import data.table
#' @import jsonlite
#' @import rJava R6
#'
#' @export
#'
kafka_admin_class <-
R6::R6Class(
#### options ###############################################################
classname = "kafka_admin_class",
portable = TRUE,
parent_env = asNamespace("kafkaesque"),
#### public ################################################################
public =
list(
## data ################################################################
#' @field java_admin reference to the underlying Java object
java_admin = list(),
## methods #############################################################
#' Initialize
#'
#' @description
#' Create a new admin object.
#' Instead of \code{kafka_admin_class$new()} one can use \code{kafka_admin()}
#'
#' @return \code{self} for method chaining
#'
initialize =
function() {
self$java_admin <- rJava::.jnew("kafkaesque/Kafka_admin")
},
#'
#' @description
#' Code run when object is removed from session
#'
finalize =
function() {
self$java_admin$end()
},
#'
#' @description
#' Spin up admin and connect it to Kafka cluster
#'
#' @return \code{self} for method chaining
#'
start =
function() {
self$java_admin$start()
# return for method chaining
invisible(self)
},
#'
#' @description
#' Disconnect admin from Kafka cluster
#'
#' @return \code{self} for method chaining
#'
end =
function() {
self$java_admin$end()
# return for method chaining
invisible(self)
},
#'
#' @description
#' Disconnect and reconnect admin from Kafka cluster
#'
#' @return \code{self} for method chaining
#'
restart =
function() {
self$java_admin$restart()
# return for method chaining
invisible(self)
},
#'
#' @description
#' Whether or not admin is active (has been started or not)
#'
#' @return TRUE/FALSE
#'
running =
function() {
self$java_admin$running
},
#'
#' @param ... a series of properties provided as \code{key = "values"}
#' @param .properties a list of properties provided as \code{.properties = list(key = "values", ...)}
#'
#' @description
#' Retrieving current current set of properties.
#' If properties are supplied via props parameter thos properties will
#' be set.
#'
#' @return returns a list of properties
#'
#'
props =
function(..., .properties = NULL) {
# ? set properties
if ( !is.null(.properties) ){
self$java_admin$props_set(
.jcastToArray(names(.properties)),
.jcastToArray(format(.properties, scientific = FALSE))
)
} else if ( length(list(...)) > 0 ){
.properties <- list(...)
self$java_admin$props_set(
.jcastToArray(names(.properties)),
.jcastToArray(format(.properties, scientific = FALSE))
)
}
# return properties
jsonlite::fromJSON(
iconv(
x = self$java_admin$props$to_json(),
to = "UTF-8"
)
)
},
#'
#' @description
#' Query and return list of topics.
#'
#'
#' @return returns a character vector of topics
#'
#'
topics_list =
function() {
self$java_admin$topics_list()
},
#'
#' @param topic the topic names to create
#' @param partition the topics number of partitions
#' @param replication_factor the topics replication factor
#'
#' @description
#' Create new topics
#'
#'
#' @return returns a character vector of topics
#'
#'
topics_create =
function(topic, partition = 1, replication_factor = 1) {
self$java_admin$topics_create(
topic = topic,
partitions = as.integer(partition),
replication_factor = as.integer(replication_factor)
)
},
#'
#' @param topic
#'
#' @description
#' Delete topics
#'
#'
#' @return returns a character vector of topics
#'
#'
topics_delete =
function(topic) {
self$java_admin$topics_delete(topic)
}
),
#### private ###############################################################
private = NULL
)
Add the following code to your website.
For more information on customizing the embed code, read Embedding Snippets.