# ---- README setup ----------------------------------------------------------

# One account name is reused for GitHub and for the CI / coverage services.
github_user_name   <- "petermeissner"
appveyor_user_name <- github_user_name
travis_user_name   <- github_user_name
codecov_user_name  <- github_user_name

# Prefix for the paths of figure files produced by chunks.
knitr::opts_chunk$set(fig.path = "man/figures/README-")

# Output width and scientific-notation penalty used while rendering.
options(width = 110, scipen = 20)

# Metadata of the installed kafkaesque package (its DESCRIPTION fields).
tmp          <- packageDescription("kafkaesque")
package_name <- tmp$Package

# Package title, prefixed with the markdown level-one heading marker.
cat("#", tmp$Title)
# Print a (possibly nested) named list as a markdown bullet list using cat().
#
# x           : list or vector; the name of each element is printed as a bold
#               bullet, its content one level deeper
# item_string : bullet markers, one per nesting level
# item_ident  : indentation strings, one per nesting level
#
# Named sub-lists are rendered recursively; anything else is printed element
# by element as plain bullets. Returns NULL invisibly.
md_list <- 
  function(x, item_string = c("-", "*", "+"), item_ident = c("", "  ", "    ")){

    # markers / indents for the next level; the last one is re-used once the
    # vectors are exhausted so deep nesting never prints "NA"
    deeper     <- function(v) if (length(v) > 1) v[-1] else v
    sub_string <- deeper(item_string)
    sub_ident  <- deeper(item_ident)

    for ( i in seq_along(x)){

      cat(item_ident[1], item_string[1], " **", names(x)[[i]], "**\n", sep = "")

      # scalar condition -> short-circuit && instead of element-wise &
      if ( is.list(x[[i]]) && !is.null(names(x[[i]])) ){

        # recurse into the sub-list itself (not into each of its elements),
        # so that the names of the nested entries are kept
        md_list(x[[i]], item_string = sub_string, item_ident = sub_ident)

      } else {

        for (k in seq_along(x[[i]])) {
          cat(sub_ident[1], sub_string[1], " ", x[[i]][[k]], "\n", sep = "")
        }

      }
    }

    invisible(NULL)
  }

Status

codecovCheck, Test, Lint

This package is - so far - feature complete and should be functional. This is a structured extract and rewrite from client work. Though core parts of the package are used in production and are 'battle tested' the package presented here is not - so far.

Package has not been published to CRAN. Attempts have been made. CRAN has a policy on package size (<= 5MB) and publishing to CRAN would mean only publishing the R code without any Java dependencies. One can add an install function that downloads the necessary JAR files after installing the package from CRAN. So far no decision has been made if I should pursue this way or not.

# Recursively list the files below `dir` whose name matches `pattern`
# (case-insensitive), returned with their path prefix.
list_source_files <- function(dir, pattern) {
  list.files(
    dir,
    pattern     = pattern,
    recursive   = TRUE,
    ignore.case = TRUE,
    full.names  = TRUE
  )
}

# Number of entries in `lines` that do NOT match the `skip` pattern.
count_code_lines <- function(lines, skip) {
  kept <- grep(skip, lines, value = TRUE, invert = TRUE)
  length(kept)
}

# Source files per language / purpose.
filelist.R     <- list_source_files("R", "\\.R$")
filelist.Java  <- list_source_files("java/kafkaesque/src/main/java/kafkaesque", "\\.java$")
filelist.tests <- list_source_files("tests", "\\.R$")
filelist.cpp   <- list_source_files("src", "\\.cpp$")

# All lines of those files, concatenated per group
# (incomplete-final-line warnings are silenced for the Java sources only).
lines.R     <- unlist(lapply(filelist.R, readLines))
lines.Java  <- unlist(lapply(filelist.Java, readLines, warn=FALSE))
lines.tests <- unlist(lapply(filelist.tests, readLines))
lines.cpp   <- unlist(lapply(filelist.cpp, readLines))

# Lines left after dropping blank lines and lines starting with # or //;
# for Java, lines starting with * or /* are dropped as well.
skip_default <- "(^\\s*$)|(^\\s*#)|(^\\s*//)"
skip_java    <- "(^\\s*$)|(^\\s*/*\\*)|(^\\s*#)|(^\\s*//)"

length.R     <- count_code_lines(lines.R, skip_default)
length.Java  <- count_code_lines(lines.Java, skip_java)
length.tests <- count_code_lines(lines.tests, skip_default)
length.cpp   <- count_code_lines(lines.cpp, skip_default)

lines of R code: `r length.R`, lines of Java code: `r length.Java`, lines of test code: `r length.tests`

Version

# Every file below the working directory that lives in an R/, src/ or tests/ folder.
all_files    <- list.files(recursive = TRUE, full.names = TRUE)
source_files <- grep("/R/|/src/|/tests/", all_files, value = TRUE)

# Latest modification time among those files, formatted in UTC.
newest_mtime <- max(file.info(source_files)$mtime)
last_change  <- as.character(format(newest_mtime, tz="UTC"))

# Version line: package version followed by the time of the last change.
cat(tmp$Version, "(", last_change, "UTC )")

Description

# Emit the Description field of the package metadata as README text.
description_text <- tmp$Description
cat(description_text)

License

# License field followed by an HTML line break, then the Author field.
license_text <- tmp$License
author_text  <- tmp$Author
cat(license_text, "<br>")
cat(author_text)

Citation

# Fenced R snippet showing the citation() call for this package.
citation_call <- paste0("citation(\"", package_name, "\")")
cat("```r\n", citation_call, "\n```\n", sep = "")

# Fenced block with the citation printed in "text" style, underscores removed.
cat("```r\n")
print_text <- capture.output(print(citation(package_name), style = "text"))
citation_text <- gsub("_", "", print_text)
cat(citation_text)
cat("\n```\n")

BibTex for citing

# Fenced R snippet showing the toBibtex() call for this package.
bibtex_call <- paste0("toBibtex(citation(\"", package_name, "\"))")
cat("```r\n", bibtex_call, "\n```\n", sep = "")

# Fenced block with the BibTeX entry, one element per line.
cat("```\n")
bibtex_lines <- as.character(toBibtex(citation(package_name)))
cat(bibtex_lines, sep = "\n")
cat("\n```\n")

Installation

Latest development version from Github:

# Fenced R snippet with the install command. The repository owner is taken
# from github_user_name instead of being repeated as a literal, so the
# snippet cannot drift from the account name the README is configured with.
cat("```r\n")
cat("devtools::install_github(\"", github_user_name, "/", package_name, "\")", sep = "")
cat("\n```\n")

Prerequisites

For the package to work (more precisely do any significant work) it needs a running Kafka that can be reached over network.

A simple way to get a test version and the same version used throughout the README is to run the following docker command (Link to Dockerfile).

# run the petermeissner/kafkatest image; container ports 2181 and 9092 are
# published on 127.0.0.1 only
docker run -p 127.0.0.1:2181:2181 -p 127.0.0.1:9092:9092 petermeissner/kafkatest

Content

# attach the package and list the objects it makes available once attached
library(kafkaesque)
ls("package:kafkaesque")

Alternatives

There are no viable alternatives at the moment that I know of.

Usage

Consumer

Start Consumer (... Stop, Status)

library("kafkaesque")

# new consumer
consumer <- kafka_consumer()

# starting/connecting - + status
consumer$start()

# running() is called on its own and chained directly onto end() and start()
consumer$running()
consumer$end()$running()
consumer$start()$running()

Properties aka Config

See here for list of consumer properties: https://kafka.apache.org/documentation/#consumerconfigs.

# props() without arguments, then with a property passed as name = value
# (presumably: show the current properties, then set max.poll.records - confirm in package docs)
consumer$props()
consumer$props(max.poll.records = 200)

Topics and Subscriptions

# list topics available to consumer
consumer$topics_list()

# subscribe to topic "test500000", then call topics_subscription()
# (presumably reports what the consumer is subscribed to)
consumer$topics_subscribe("test500000")
consumer$topics_subscription()

Retrieving a Message

# retrieve next message via the consumer's consume_next() method
consumer$consume_next()

Looping over Messages and Executing Code

# loop over single messages and execute code
#   f     : callback receiving the loop environment; here it prints loop_env$messages
#   check : predicate on the loop environment; here TRUE while loop_env$meta$loop_counter < 4
#   batch : FALSE, because this example is the per-message variant
#           (the batch variant uses batch = TRUE)
res <- 
  consumer$consume_loop(
    f     = function(loop_env){ print(loop_env$messages)},
    check = function(loop_env){loop_env$meta$loop_counter < 4},
    batch = FALSE
  )

# having a look at the statistics
res

Looping over Batches of Messages and Executing Code

# loop over batches of messages and execute code
#   f     : prints loop_env$messages followed by an empty line
#   check : TRUE while loop_env$meta$message_counter is below 1000
#   batch : TRUE is passed for this batch-wise example
res <- 
  consumer$consume_loop(
    f     = function(loop_env){ print(loop_env$messages); cat("\n")},
    check = function(loop_env){loop_env$meta$message_counter < 1000},
    batch = TRUE
  )

# inspect the value returned by consume_loop()
res

Offsets and Seeking

# get current offsets from Kafka
consumer$topics_offsets()

# seek to end of topics, then query the offsets again
consumer$topics_seek_to_end()
consumer$topics_offsets()

# seek to beginning of topics, then query the offsets again
consumer$topics_seek_to_beginning()
consumer$topics_offsets()

Producer

Start Producer (... Stop, Status)

library("kafkaesque")

# new producer
producer <- kafka_producer()

# starting/connecting - + status
producer$start()

# running() is called on its own and chained directly onto end(), start() and restart()
producer$running()
producer$end()$running()
producer$start()$running()
producer$restart()$running()

Properties aka Config

See here for list of producer properties: https://kafka.apache.org/documentation/#producerconfigs.

# props() without arguments, then with a property passed as name = value;
# whatever.you.may.want.to.set is a placeholder name, not a real Kafka property
producer$props()
producer$props(whatever.you.may.want.to.set = "true")

Sending Messages

# send three text messages to the topic "test"
producer$send(topic = "test", msg = "Die Kuh macht muh.")
producer$send(topic = "test", msg = "Das Schaf macht mäh.")
producer$send(topic = "test", msg = "Das Pferd macht wihiiiiiiiii-pffffff.")

Setting and Getting Java Log Levels

# query the current log level
kafka_get_log_level()

# one of off, fatal, error, warn, info, debug, trace, all
kafka_set_log_level("info")
# start the producer while the level is set to "info"
producer$start()


# set back to normal
kafka_set_log_level("error")

Admin

Properties aka Config

# new admin object, started before use
admin <- kafka_admin()
admin$start()

# props() without arguments, then with a property passed as name = value;
# whatever.you.may.want.to.set is a placeholder name, not a real Kafka property
admin$props()
admin$props(whatever.you.may.want.to.set = "true")

Get List of Topics

# call the admin object's topics_list() method
admin$topics_list()

Create Topics

# list the topics before creating new ones
admin$topics_list()

# create two topics; partition and replication_factor each carry two values
# (presumably one per topic name - confirm in package docs)
topics <- c("chuckle", "chit_chat")
admin$topics_create(
  topic              = topics, 
  partition         = c(1L, 1L), 
  replication_factor = c(1L, 1L)
)

Delete Topics

# delete the topics named in the `topics` vector
admin$topics_delete(topics)

Development Notes

For R development Rstudio was used. For Java development Visual Studio Code lent a helping hand with Maven as build tooling.

For development one package is needed:

Java sources are in ./java/kafkaesque/ folder - so your Java project should take this as root folder. Building Java sources can be done via Maven: mvn install will compile everything and copy the kafkaesque.jar into the package's ./inst/java/ folder with all its java dependencies.

After Java compilation, the R package has to be (re-)built and (re-)installed (most likely after re-starting the R session first (Ctrl-Shift-F10 in Rstudio)).

If developing Java in VScode - as I did here - pressing Ctrl-Shift-B should allow to select the two most important tasks: resolving dependencies and compiling the Java code and distributing it to the right places as described above.



petermeissner/kafkaesque documentation built on Sept. 28, 2022, 4:30 a.m.