
#' R6 Class for Kafka Consumer
#' @import data.table
#' @import jsonlite
#' @import rJava
#' @export
kafka_consumer_class <-

    #### options ###############################################################
    classname = "kafka_class_consumer",

    portable = TRUE,

    parent_env = asNamespace("kafkaesque"),

    #### public ################################################################

    public =

        ## data ################################################################

        #' @field java_consumer reference to the underlying Java object
        java_consumer = list(),

        ## methods #############################################################

        #' Initialize
        #' @description
        #' Create a new consumer object.
        #' Instead of \code{kafka_class_consumer$new()} one can use \code{kafka_consumer()}
        #' @return \code{self} for method chaining
        initialize =
          function() {
            self$java_consumer <- rJava::.jnew("kafkaesque/Kafka_consumer")
            self$records       <- kafka_records(parent = self)

        #' @description
        #' Code run when object is removed from session
        finalize =
          function() {

        #' @description
        #' Spin up consumer and connect it to Kafka cluster
        #' @return \code{self} for method chaining
        start =
          function() {

            # return for method chaining

        #' @description
        #' Disconnect consumer from Kafka cluster
        #' @return \code{self} for method chaining
        end =
          function() {

            # return for method chaining

        #' @description
        #' Whether or not consumer is active (has been started or not)
        #' @return TRUE/FALSE
        running =
          function() {

        #' @param timeout_ms number of miliseconds to wait for polling to return
        #'  messages, defaults to Inf
        #' @description
        #' Polling for messages
        #' @return the number of records retrieved by last poll
        poll =
          function(timeout_ms = Inf) {
            stopifnot( self$running() == TRUE )

            if (  is.infinite(timeout_ms) ){
            } else {
                obj       = self$java_consumer,
                returnSig = "I",
                method    = "poll",

            # return for method chaining

        #' @param sync synchronous or asynchronous commit
        #' @description
        #' Commit offsets returned on the last poll() for all the subscribed
        #' list of topics and partitions.
        #' @return \code{self} for method chaining
        commit =
          function(sync = TRUE) {

            # execute commit
            if ( sync == TRUE ){
            } else {

            # return for method chaining

        #' @field records
        #' If poll() did fetch any messages, they are stored here until the
        #' next call to poll().
        #' Note: Consider using consume methods instead of managing records on your own.
        records = list(),

        #' @param timeout_ms defaults to `Inf`.
        #'   Time for which poll will wait for data
        #'   Passed through to kafka_consumer$poll()
        #' @description
        #' Consume one message either from the records already fetched from last poll or via initiating a new poll.
        consume_next =
          function ( timeout_ms = Inf ) {
            stopifnot( self$running() == TRUE )
            self$records$next_record( timeout_ms = Inf )

        #' @param check function that will exept one argument namely loop_env
        #'   and will evaluate to TRUE or FALSE to either
        #'   continue or stop processing
        #' @param batch defaults to FALSE, Kafka's the default is to poll for as much
        #'   data as one can get given the consumers limits on the number and
        #'   size of messages as well as the chosen timeout. No matter how many
        #'   data is returned from a poll the method process - if batch is set to
        #'   FALSE - return only a single message at a time. If batch is set to
        #'   TRUE however the msgs data.frame/data.table will contain all messages
        #'   that were retrieved by the last poll unless consumed already.
        #' @param timeout_ms defaults to `Inf`.
        #'   Time for which poll will wait for data
        #'   Passed through to kafka_consumer$poll()
        #' @param f loop execution function exepting one argument namely loop_env
        #' @param loop_env Environment to store meta info in and pass to loop
        #'   execution function and check function. Stored information:
        #'   `loop_env$meta$start_time` -
        #'   the result of a call to Sys.time()
        #'   when consume loop execution started;
        #'   `loop_env$meta$loop_counter` -
        #'   counter that counts the current loop iteration;
        #'   `loop_env$meta$message_counter` -
        #'   counter that counts the number of messages already processed
        #' @description
        #' Method that is basically an infinite loop (until the check expression
        #' evaluates to FALSE) that will evaluate the supplied expression for
        #' each loop.
        #' There are several objects available to the expression supplied:
        #' - messages: a data.frame/data.table with one or more rows - see batch parameter
        #' - loop_counter: single number equal the current loop count.
        #' - message_counter: single number equal to the number of messages already processed.
        #' - start_time: the result of a call to Sys.time() when first the method started
        consume_loop =
          function (
            f          = function(loop_env){print(loop_env$messages)},
            check      = function(loop_env){loop_env$meta$loop_counter < 1},
            loop_env   = new.env(),
            batch      = FALSE,
            timeout_ms = Inf
          ) {

            # set up environments to pass around/share data in check() and f()
            loop_env$meta                 <- new.env()
            loop_env$meta$start_time      <- Sys.time()
            loop_env$meta$loop_counter    <- 0
            loop_env$meta$message_counter <- 0

            # loop while check evaluates to TRUE
            while ( check(loop_env = loop_env) ){

              loop_env$meta$loop_counter <-
                loop_env$meta$loop_counter + 1

              if ( batch == TRUE ){

                loop_env$messages <-
                  self$records$next_record_batch(timeout_ms = timeout_ms)

              } else {

                loop_env$messages <-
                  self$records$next_record(timeout_ms = timeout_ms)


              f(loop_env = loop_env)

              loop_env$meta$message_counter <-
                loop_env$meta$message_counter + nrow(loop_env$messages)

            # last steps
            loop_env$meta$end_time <- Sys.time()
            loop_env$meta <- as.list(loop_env$meta)
            loop_env      <- as.list(loop_env)

            # return

        #' @param ... a series of properties provided as \code{key = "values"}
        #' @param .properties a list of properties provided as  \code{.properties = list(key = "values", ...)}
        #' @description
        #' Retrieving current current set of properties.
        #' If properties are supplied via props parameter thos properties will
        #' be set.
        #' @return returns a list of properties
        props =
          function(..., .properties = NULL) {

            # ? set properties
            if ( !is.null(.properties) ){
                .jcastToArray(format(.properties, scientific = FALSE))
            } else if ( length(list(...)) > 0 ){
              .properties <- list(...)
                .jcastToArray(format(.properties, scientific = FALSE))

            # return properties
                x  = self$java_consumer$props$to_json(),
                to = "UTF-8"

        #' @description
        #' Subscribe to one or more topics
        #' @param topics character vector defining topics or topic regex to subscribe to
        #' @return \code{self} for method chaining
        topics_subscribe =
          function(topics) {

            stopifnot( self$running() == TRUE )

            topics_string_array <- .jarray(topics)

            # return for method chaining

        #' @description
        #' List current subscription
        topics_subscription =
          function() {

            stopifnot( self$running() == TRUE )


        #' @param full defaults to FALSE, whether or not to return all data
        #'   returned fro Java object (TRUE) or only a simple character vector
        #'   listing the names of the data topics available for consumption
        #'   (FALSE)
        #' @description
        #' List topics available to consumer
        topics_list =
          function(full=FALSE) {

            stopifnot( self$running() == TRUE )

            tmp <- jsonlite::fromJSON(self$java_consumer$topics_list())
            if ( full == FALSE ) {
              topics <- unique(unlist(lapply(tmp, `[[`, "topic")))
              return(topics[!grepl("^__", topics)])
            } else {
              return ( tmp)

        #' @description
        #' Seek to beginning of all topics subscribed and all partitions
        topics_seek_to_beginning =
            # execute seeking

        #' @description
        #' Seek to end of all topics subscribed and all partitions
        topics_seek_to_end =
            # execute seeking

        #' @description
        topics_offsets =
          function() {
            obj <- self$java_consumer$topics_offsets()

            # return
              topic     = obj$topics,
              partition = obj$partitions,
              offset    = obj$offsets


    #### private ###############################################################

    private = NULL
