kafka_consumer_class | R Documentation
R6 Class for Kafka Consumer
java_consumer
reference to the underlying Java object
records
If poll() fetched any messages, they are stored here until the next call to poll().
Note: Consider using the consume methods instead of managing records yourself.
new()
Create a new consumer object.
Instead of kafka_consumer_class$new(), one can use kafka_consumer().
kafka_consumer_class$new()
self
for method chaining
finalize()
Code run when the object is removed from the session (i.e., garbage collected)
kafka_consumer_class$finalize()
start()
Spin up the consumer and connect it to the Kafka cluster
kafka_consumer_class$start()
self
for method chaining
end()
Disconnect the consumer from the Kafka cluster
kafka_consumer_class$end()
self
for method chaining
running()
Whether or not the consumer is active (i.e., has been started)
kafka_consumer_class$running()
TRUE/FALSE
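The lifecycle methods above (start(), running(), end()) can be combined so that a consumer is always disconnected again, even when processing fails. The following is a minimal sketch: with_running_consumer() is a hypothetical helper, not part of kafkaesque, and it assumes consumer is an object created by kafka_consumer(). The helper itself only calls the documented methods.

```r
# Hypothetical helper (not part of kafkaesque): run `f` with a started consumer
# and always disconnect afterwards, even if `f` raises an error.
# `consumer` is assumed to be an object created by kafkaesque::kafka_consumer().
with_running_consumer <- function(consumer, f) {
  # start() only if the consumer is not active yet
  if (!isTRUE(consumer$running())) {
    consumer$start()
  }
  # registered before f() runs, so end() is called on normal exit and on error
  on.exit(consumer$end(), add = TRUE)
  f(consumer)
}
```

With a reachable Kafka cluster, a call might look like `with_running_consumer(kafka_consumer(), function(cns) cns$topics_subscribe("test")$consume_next(timeout_ms = 5000))`, relying on topics_subscribe() returning self for method chaining; the topic name "test" is only an example.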
poll()
Poll Kafka for messages; fetched messages are stored in the records field
kafka_consumer_class$poll(timeout_ms = Inf)
timeout_ms
number of milliseconds to wait for polling to return messages; defaults to Inf
the number of records retrieved by the last poll
commit()
Commit the offsets returned by the last poll() for all subscribed topics and partitions.
kafka_consumer_class$commit(sync = TRUE)
sync
whether to commit synchronously (TRUE, the default) or asynchronously (FALSE)
self
for method chaining
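If records are managed by hand instead of through the consume methods, poll(), records and commit() can be combined so that offsets are only committed after a batch was handled. This is a minimal sketch: poll_process_commit() and the handle callback are hypothetical names, and the consumer is assumed to be started and subscribed. If automatic commits are enabled in the consumer's properties (Kafka's enable.auto.commit setting), offsets may be committed independently of this helper.

```r
# Hypothetical helper (not part of kafkaesque): poll once, hand the fetched
# records to `handle`, and commit offsets only if records were processed.
# Assumes `consumer` has been started and subscribed to at least one topic.
poll_process_commit <- function(consumer, handle, timeout_ms = 1000) {
  # poll() returns the number of records retrieved and stores them in `records`
  n <- consumer$poll(timeout_ms = timeout_ms)
  if (n > 0) {
    # if handle() raises an error, commit() is never reached
    handle(consumer$records)
    # synchronous commit of the offsets returned by this poll
    consumer$commit(sync = TRUE)
  }
  n
}
```

Committing only after handle() returns means a failure leaves the offsets uncommitted, so the same records can be fetched again later rather than silently skipped.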
consume_next()
Consume one message, either from the records already fetched by the last poll or, if none are left, by initiating a new poll.
kafka_consumer_class$consume_next(timeout_ms = Inf)
timeout_ms
defaults to Inf. Number of milliseconds poll will wait for data; passed through to poll()
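Because consume_next() returns a single message per call, reading a fixed number of messages is a simple loop. The sketch below assumes each call returns a one-row data.frame (or data.table); consume_n() is a hypothetical helper, not part of kafkaesque. With the default timeout_ms = Inf, each call may block until a message arrives.

```r
# Hypothetical helper (not part of kafkaesque): consume exactly `n` messages,
# one at a time, and bind them into a single table.
consume_n <- function(consumer, n, timeout_ms = Inf) {
  msgs <- vector("list", n)
  for (i in seq_len(n)) {
    # each call returns one message, polling again when no fetched records are left
    msgs[[i]] <- consumer$consume_next(timeout_ms = timeout_ms)
  }
  # stack the one-row results into one table
  do.call(rbind, msgs)
}
```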
consume_loop()
Run a loop that calls the supplied function f once per iteration and keeps going until the check function returns FALSE.
The following objects are available to f and check through loop_env:
- loop_env$messages: a data.frame/data.table with one or more rows (see the batch parameter)
- loop_env$meta$loop_counter: a single number equal to the current loop count
- loop_env$meta$message_counter: a single number equal to the number of messages already processed
- loop_env$meta$start_time: the result of a call to Sys.time() when the method started
kafka_consumer_class$consume_loop( f = function(loop_env) { print(loop_env$messages) }, check = function(loop_env) { loop_env$meta$loop_counter < 1 }, loop_env = new.env(), batch = FALSE, timeout_ms = Inf )
f
loop execution function accepting one argument, namely loop_env
check
function accepting one argument, namely loop_env, that returns TRUE to continue or FALSE to stop processing
loop_env
Environment in which meta information is stored and which is passed to the loop execution function (f) and the check function. Stored information:
'loop_env$meta$start_time' - the result of a call to Sys.time() when consume loop execution started;
'loop_env$meta$loop_counter' - counter that counts the current loop iteration;
'loop_env$meta$message_counter' - counter that counts the number of messages already processed
batch
defaults to FALSE. By default, Kafka's poll returns as much data as it can, given the consumer's limits on the number and size of messages as well as the chosen timeout. If batch is FALSE, the method processes a single message per loop iteration, no matter how many messages a poll returned. If batch is TRUE, loop_env$messages contains all messages retrieved by the last poll that have not been consumed yet.
timeout_ms
defaults to Inf. Number of milliseconds poll will wait for data; passed through to poll()
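Custom f and check functions only need to read and write fields of loop_env. The following sketch collects all messages and stops once a message limit is reached; collect_messages(), make_check() and the loop_env$collected field are hypothetical names chosen for illustration, and only loop_env$messages and loop_env$meta$message_counter come from the description above.

```r
# Hypothetical loop functions for consume_loop(); they only rely on the
# documented loop_env fields `messages` and `meta$message_counter`.

# f: append the messages of each iteration to loop_env$collected
collect_messages <- function(loop_env) {
  # rbind(NULL, df) returns df, so the first iteration needs no special case
  loop_env$collected <- rbind(loop_env$collected, loop_env$messages)
}

# check: keep looping while fewer than `limit` messages have been processed
make_check <- function(limit) {
  function(loop_env) {
    loop_env$meta$message_counter < limit
  }
}
```

Since environments are passed by reference, a call such as `env <- new.env(); consumer$consume_loop(f = collect_messages, check = make_check(100), loop_env = env, batch = TRUE)` should leave the results in `env$collected`. With batch = TRUE a whole poll is processed per iteration, so slightly more than 100 messages may be collected.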
props()
Retrieve the current set of properties. If properties are supplied via ... or .properties, those properties will be set.
kafka_consumer_class$props(..., .properties = NULL)
...
a series of properties provided as key = "values"
.properties
a list of properties provided as .properties = list(key = "values", ...)
returns a list of properties
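Properties are passed either as individual arguments, e.g. `consumer$props(group.id = "my-group")`, or as a list via .properties. The sketch below uses the list form to prepare manual offset commits; configure_manual_commit() is a hypothetical helper, while group.id and enable.auto.commit are standard Kafka consumer configuration keys. Kafka consumer settings generally take effect when the underlying Java consumer is created, so setting them before start() is presumably the safe choice; this page does not say so explicitly.

```r
# Hypothetical helper (not part of kafkaesque): configure a consumer for manual
# offset commits. Keys are standard Kafka consumer configuration names; values
# are given as strings, following the key = "values" convention of props().
configure_manual_commit <- function(consumer, group_id) {
  consumer$props(
    .properties = list(
      group.id = group_id,
      enable.auto.commit = "false"
    )
  )
}
```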
topics_subscribe()
Subscribe to one or more topics
kafka_consumer_class$topics_subscribe(topics)
topics
character vector defining topics or topic regex to subscribe to
self
for method chaining
topics_subscription()
List current subscription
kafka_consumer_class$topics_subscription()
topics_list()
List topics available to consumer
kafka_consumer_class$topics_list(full = FALSE)
full
defaults to FALSE; whether to return all data returned from the Java object (TRUE) or only a character vector with the names of the topics available for consumption (FALSE)
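topics_list(), topics_subscribe() and topics_subscription() can be combined to subscribe only to topics that actually exist. This is a minimal sketch with a hypothetical helper name; it treats every entry of topics as a literal topic name, so it is not suitable for the regex form that topics_subscribe() also accepts.

```r
# Hypothetical helper (not part of kafkaesque): subscribe only to those requested
# topics that the consumer can see, warning about the rest.
subscribe_existing <- function(consumer, topics) {
  # with full = FALSE, topics_list() returns a character vector of topic names
  available <- consumer$topics_list(full = FALSE)
  missing <- setdiff(topics, available)
  if (length(missing) > 0) {
    warning("Topics not available: ", paste(missing, collapse = ", "))
  }
  present <- intersect(topics, available)
  if (length(present) > 0) {
    consumer$topics_subscribe(present)
  }
  # return the resulting subscription without printing it
  invisible(consumer$topics_subscription())
}
```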
topics_seek_to_beginning()
Seek to the beginning of all partitions of all subscribed topics
kafka_consumer_class$topics_seek_to_beginning()
topics_seek_to_end()
Seek to the end of all partitions of all subscribed topics
kafka_consumer_class$topics_seek_to_end()
topics_offsets()
kafka_consumer_class$topics_offsets()
clone()
The objects of this class are cloneable with this method.
kafka_consumer_class$clone(deep = FALSE)
deep
Whether to make a deep clone.