kafka_consumer_class: R6 Class for Kafka Consumer

kafka_consumer_class R Documentation

R6 Class for Kafka Consumer

Description

R6 Class for Kafka Consumer

Public fields

java_consumer

Reference to the underlying Java object.

records

If poll() fetched any messages, they are stored here until the next call to poll().

Note: Consider using consume methods instead of managing records on your own.

Methods

Public methods


Method new()

Create a new consumer object. Instead of kafka_consumer_class$new(), one can use kafka_consumer().

Usage
kafka_consumer_class$new()
Returns

self for method chaining


Method finalize()

Code that runs when the object is removed from the session.

Usage
kafka_consumer_class$finalize()

Method start()

Spin up consumer and connect it to Kafka cluster

Usage
kafka_consumer_class$start()
Returns

self for method chaining


Method end()

Disconnect consumer from Kafka cluster

Usage
kafka_consumer_class$end()
Returns

self for method chaining


Method running()

Whether or not the consumer is active (has been started or not).

Usage
kafka_consumer_class$running()
Returns

TRUE/FALSE
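
The lifecycle methods start(), running(), and end() can be combined as in the following minimal sketch. The helper name with_running_consumer and its work argument are hypothetical and not part of the package; only the three methods come from this page. The sketch assumes the consumer was created with kafka_consumer() and that a Kafka cluster is reachable.

```r
# Hypothetical helper (not part of kafkaesque): run `work(consumer)` on a
# started consumer and disconnect afterwards if this helper started it.
with_running_consumer <- function(consumer, work) {
  started_here <- !isTRUE(consumer$running())
  if (started_here) {
    consumer$start()
    # Disconnect again when the helper exits, even if `work` fails.
    on.exit(consumer$end(), add = TRUE)
  }
  work(consumer)
}

# Usage (requires the kafkaesque package and a reachable Kafka cluster):
# consumer <- kafkaesque::kafka_consumer()
# with_running_consumer(consumer, function(con) con$running())
```

A consumer that was already running when the helper was called is left running, so the caller stays in charge of ending it.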


Method poll()

Polling for messages

Usage
kafka_consumer_class$poll(timeout_ms = Inf)
Arguments
timeout_ms

number of milliseconds to wait for polling to return messages, defaults to Inf

Returns

the number of records retrieved by last poll


Method commit()

Commit the offsets returned by the last poll() for all subscribed topics and partitions.

Usage
kafka_consumer_class$commit(sync = TRUE)
Arguments
sync

synchronous or asynchronous commit

Returns

self for method chaining
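
For cases where records are managed by hand, poll() and commit() might be combined roughly as follows. This is a sketch under assumptions: the helper poll_and_commit and its handler argument are hypothetical, the 1000 ms timeout is an arbitrary placeholder, and the structure of the records field is treated as opaque because this page does not describe it. The consume methods are usually the simpler choice.

```r
# Hypothetical helper (not part of kafkaesque): poll once, pass the fetched
# records to `handler`, then commit the offsets of that poll.
poll_and_commit <- function(consumer, handler, timeout_ms = 1000) {
  # poll() returns the number of records retrieved by this poll.
  n <- consumer$poll(timeout_ms = timeout_ms)
  if (n > 0) {
    # Fetched messages stay in the `records` field until the next poll().
    handler(consumer$records)
    # Commit only after the handler finished without error.
    consumer$commit(sync = TRUE)
  }
  n
}

# Usage (requires a started, subscribed consumer):
# poll_and_commit(consumer, handler = print)
```

Committing after the handler has returned means that a failure in the handler leaves the offsets uncommitted.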


Method consume_next()

Consume one message either from the records already fetched from last poll or via initiating a new poll.

Usage
kafka_consumer_class$consume_next(timeout_ms = Inf)
Arguments
timeout_ms

Defaults to 'Inf'. Time for which poll will wait for data. Passed through to kafka_consumer$poll().


Method consume_loop()

Runs a loop that evaluates the supplied function f on each iteration and keeps going until the check function evaluates to FALSE. Unless check returns FALSE at some point, the loop is infinite.

There are several objects available to the expression supplied:

- messages: a data.frame/data.table with one or more rows (see the batch parameter)
- loop_counter: a single number equal to the current loop count
- message_counter: a single number equal to the number of messages already processed
- start_time: the result of a call to Sys.time() when the method first started

Usage
kafka_consumer_class$consume_loop(
  f = function(loop_env) {
    print(loop_env$messages)
  },
  check = function(loop_env) {
    loop_env$meta$loop_counter < 1
  },
  loop_env = new.env(),
  batch = FALSE,
  timeout_ms = Inf
)
Arguments
f

loop execution function accepting one argument, namely loop_env

check

function that accepts one argument, namely loop_env, and evaluates to TRUE or FALSE to either continue or stop processing

loop_env

Environment to store meta info in and pass to loop execution function and check function. Stored information:

'loop_env$meta$start_time' - the result of a call to Sys.time() when consume loop execution started;

'loop_env$meta$loop_counter' - counter that counts the current loop iteration;

'loop_env$meta$message_counter' - counter that counts the number of messages already processed

batch

Defaults to FALSE. Kafka's default is to poll for as much data as it can get, given the consumer's limits on the number and size of messages and the chosen timeout. No matter how much data a poll returns, the method passes on only a single message at a time if batch is set to FALSE. If batch is set to TRUE, however, the messages data.frame/data.table will contain all messages retrieved by the last poll that have not been consumed already.

timeout_ms

Defaults to 'Inf'. Time for which poll will wait for data. Passed through to kafka_consumer$poll().
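
The interplay of f, check, and loop_env can be illustrated with a small sketch. The factory make_collector and the collected entry it adds to loop_env are hypothetical names chosen for this example; the sketch assumes that consume_loop() leaves entries in loop_env other than messages and meta untouched, which this page does not state.

```r
# Hypothetical factory (not part of kafkaesque): build f, check, and
# loop_env so that consume_loop() collects messages until a limit is hit.
make_collector <- function(max_messages) {
  loop_env <- new.env()
  loop_env$collected <- list()

  # f: append the messages of the current iteration to loop_env$collected.
  f <- function(loop_env) {
    loop_env$collected[[length(loop_env$collected) + 1L]] <- loop_env$messages
  }

  # check: TRUE keeps the loop going, FALSE stops it.
  check <- function(loop_env) {
    loop_env$meta$message_counter < max_messages
  }

  list(f = f, check = check, loop_env = loop_env)
}

# Usage (requires a started, subscribed consumer; timeout is a placeholder):
# collector <- make_collector(max_messages = 10)
# consumer$consume_loop(
#   f = collector$f,
#   check = collector$check,
#   loop_env = collector$loop_env,
#   batch = TRUE,
#   timeout_ms = 1000
# )
# length(collector$loop_env$collected)
```

Because loop_env is an environment, anything f stores in it is still available to the caller after the loop has ended.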


Method props()

Retrieve the current set of properties. If properties are supplied via ... or the .properties parameter, those properties will be set.

Usage
kafka_consumer_class$props(..., .properties = NULL)
Arguments
...

a series of properties provided as key = "values"

.properties

a list of properties provided as .properties = list(key = "values", ...)

Returns

returns a list of properties
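
As an illustration of the .properties form, the following sketch sets two properties and returns the resulting list. The helper configure_consumer is hypothetical; bootstrap.servers and group.id are standard Kafka consumer configuration keys, and the values in the usage comment are placeholders.

```r
# Hypothetical helper (not part of kafkaesque): set connection properties
# via `.properties` and return the list of properties reported by props().
configure_consumer <- function(consumer, servers, group_id) {
  consumer$props(
    .properties = list(
      "bootstrap.servers" = servers,
      "group.id" = group_id
    )
  )
}

# Usage (placeholder values):
# consumer <- kafkaesque::kafka_consumer()
# configure_consumer(consumer, "localhost:9092", "demo-group")
```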


Method topics_subscribe()

Subscribe to one or more topics

Usage
kafka_consumer_class$topics_subscribe(topics)
Arguments
topics

character vector defining topics or topic regex to subscribe to

Returns

self for method chaining


Method topics_subscription()

List current subscription

Usage
kafka_consumer_class$topics_subscription()

Method topics_list()

List topics available to consumer

Usage
kafka_consumer_class$topics_list(full = FALSE)
Arguments
full

Defaults to FALSE. Whether to return all data returned from the Java object (TRUE) or only a simple character vector listing the names of the topics available for consumption (FALSE).


Method topics_seek_to_beginning()

Seek to beginning of all topics subscribed and all partitions

Usage
kafka_consumer_class$topics_seek_to_beginning()

Method topics_seek_to_end()

Seek to end of all topics subscribed and all partitions

Usage
kafka_consumer_class$topics_seek_to_end()
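
Subscribing, seeking, and consuming might be chained together as in the sketch below. The helper read_first_message is hypothetical and the topic name in the usage comment is a placeholder. The return value is whatever consume_next() returns, which this page does not document, and whether a seek takes effect before the first poll depends on the underlying client.

```r
# Hypothetical helper (not part of kafkaesque): subscribe to topics, rewind
# to the beginning, and read a single message.
read_first_message <- function(consumer, topics, timeout_ms = 1000) {
  consumer$topics_subscribe(topics)
  # Move to the beginning of all subscribed topics and partitions.
  consumer$topics_seek_to_beginning()
  # consume_next() takes a message from the records already fetched
  # or initiates a new poll.
  consumer$consume_next(timeout_ms = timeout_ms)
}

# Usage (requires a started consumer; "demo-topic" is a placeholder):
# read_first_message(consumer, "demo-topic")
```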

Method topics_offsets()

Usage
kafka_consumer_class$topics_offsets()

Method clone()

The objects of this class are cloneable with this method.

Usage
kafka_consumer_class$clone(deep = FALSE)
Arguments
deep

Whether to make a deep clone.


petermeissner/kafkaesque documentation built on Sept. 28, 2022, 4:30 a.m.