kinesis_consumer: Run Kinesis Consumer application

View source: R/consumer.R

kinesis_consumer R Documentation

Run Kinesis Consumer application

Description

Run Kinesis Consumer application

Usage

kinesis_consumer(
  initialize,
  processRecords,
  shutdown,
  checkpointing = TRUE,
  updater,
  logfile = tempfile()
)

Arguments

initialize

optional function to be run on startup. Note that variables created inside this function will not be available to the other callbacks (e.g. processRecords), so store any shared variables in the parent or global environment
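The scoping caveat above can be illustrated with plain R, outside of any Kinesis setup. This is a minimal sketch: the names `lookup` and `scratch` are hypothetical, and `<<-` is only one way of storing a value outside the function body.

```r
# Sketch only: `lookup` and `scratch` are hypothetical names.
initialize <- function() {
  scratch <- "only visible inside initialize()"  # discarded on return
  lookup <<- c(a = 1, b = 2)                     # assigned outside the function
}

processRecords <- function(records) {
  # `lookup` is found in the enclosing (here: global) environment
  sum(lookup)
}

initialize()
total <- processRecords(NULL)
```

After `initialize()` has run, `lookup` is visible to `processRecords`, while `scratch` no longer exists.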

processRecords

function to process records. It receives a data.frame with partitionKey, sequenceNumber and data columns as its records argument. In most cases only the data column is needed
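A processRecords function can be tried out locally against a hand-built data.frame with the three documented columns. The sketch below assumes the data column holds character payloads. The sample values and the upper-casing step are purely illustrative.

```r
# Mock input with the documented columns; all values are made up.
records <- data.frame(
  partitionKey     = c("user-1", "user-2"),
  sequenceNumber   = c("1001", "1002"),
  data             = c("hello", "world"),
  stringsAsFactors = FALSE
)

processRecords <- function(records) {
  # Only the payload column is used in this sketch
  toupper(records$data)
}

result <- processRecords(records)
```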

shutdown

optional function to be run when finished processing all records in a shard

checkpointing

if set to TRUE (default), kinesis_consumer will checkpoint after each processRecords call. To disable checkpointing altogether, set this to FALSE. To checkpoint periodically, set this to the interval in minutes, given as an integer.
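The three accepted forms of checkpointing can be summarised as a small decision rule. The helper below, `should_checkpoint`, is hypothetical and is not part of AWR.Kinesis. It only restates the documented behaviour under the assumption that the elapsed time is measured since the last checkpoint.

```r
# Hypothetical helper, not the package's implementation.
should_checkpoint <- function(checkpointing, last_checkpoint, now = Sys.time()) {
  if (isTRUE(checkpointing)) return(TRUE)    # checkpoint after every call
  if (isFALSE(checkpointing)) return(FALSE)  # checkpointing disabled
  # Otherwise treat the value as an interval in minutes
  elapsed <- as.numeric(difftime(now, last_checkpoint, units = "mins"))
  elapsed >= checkpointing
}

t0 <- as.POSIXct("2023-01-01 00:00:00", tz = "UTC")
after_six_minutes <- should_checkpoint(5L, t0, now = t0 + 6 * 60)
after_one_minute  <- should_checkpoint(5L, t0, now = t0 + 60)
```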

updater

optional list of list(s), each including a frequency (in minutes) and a function to be run, most likely to update some objects in the parent or global environment that were first populated in the initialize call. If the frequency is shorter than the run time of a processRecords call, the function will be triggered once after each processRecords call
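The expected shape of updater is a list whose elements are themselves two-element lists: a frequency in minutes followed by a function. The following sketch builds such a list and inspects it without starting a consumer. The function names and return values are hypothetical.

```r
# Hypothetical updater functions; the return values are only for inspection.
refresh_lookup <- function() "lookup refreshed"
heartbeat      <- function() "heartbeat"

updater <- list(
  list(1,      refresh_lookup),  # roughly once a minute
  list(1 / 60, heartbeat)        # roughly once a second
)

# Pull out the frequencies (first element of each inner list)
frequencies <- vapply(updater, function(u) u[[1]], numeric(1))
```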

logfile

file path of the log file. To disable logging, set the log threshold of the AWR.Kinesis namespace to a high level, e.g. log_threshold(FATAL, namespace = 'AWR.Kinesis')

Note

Don't run this function directly: it is meant to be called by the MultiLangDaemon. See the package README for more details.

References

https://github.com/awslabs/amazon-kinesis-client/blob/v1.x/src/main/java/com/amazonaws/services/kinesis/multilang/package-info.java

Examples

## Not run: 
library(logger)  # provides log_threshold() and log_info()
log_threshold(FATAL, namespace = 'AWR.Kinesis')
AWR.Kinesis::kinesis_consumer(
  initialize = function() log_info('Loading some data'),
  processRecords = function(records) log_info('Received some records from Kinesis'),
  updater = list(list(1, function() log_info('Updating some data every minute')),
                 list(1/60, function() log_info('This is a high frequency updater call')))
)

## End(Not run)

AWR.Kinesis documentation built on Aug. 19, 2023, 1:07 a.m.