R/get.R

Defines functions kinesis_get_records

Documented in kinesis_get_records

#' Get records from a Kinesis Stream
#' @param stream stream name (string)
#' @param region AWS region (string)
#' @param limit maximum number of records to fetch
#' @param shard_id optional shard id - a random open shard is picked if not provided
#' @param iterator_type shard iterator type
#' @param start_sequence_number required for \code{AT_SEQUENCE_NUMBER} and \code{AFTER_SEQUENCE_NUMBER} iterators
#' @param start_timestamp required for \code{AT_TIMESTAMP} iterator: a \code{POSIXct}/\code{Date} value or a \code{java.util.Date} object
#' @note Use this only for fetching sample data from a stream - it's not intended for production usage.
#' @references \url{https://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/services/kinesis/model/GetRecordsRequest.html}
#' @return character vector (empty if no records were returned) that you might want to post-process eg with \code{jsonlite::stream_in}
#' @export
kinesis_get_records <- function(stream, region = 'us-west-1', limit = 25,
                                shard_id,
                                iterator_type = c('TRIM_HORIZON', 'LATEST',
                                                  'AT_SEQUENCE_NUMBER', 'AFTER_SEQUENCE_NUMBER',
                                                  'AT_TIMESTAMP'),
                                start_sequence_number, start_timestamp) {

    iterator_type <- match.arg(iterator_type)

    ## fail early with a clear R error instead of an opaque AWS/Java exception
    if (iterator_type %in% c('AT_SEQUENCE_NUMBER', 'AFTER_SEQUENCE_NUMBER') &&
        missing(start_sequence_number)) {
        stop('start_sequence_number is required for ', iterator_type, ' iterators',
             call. = FALSE)
    }
    if (iterator_type == 'AT_TIMESTAMP' && missing(start_timestamp)) {
        stop('start_timestamp is required for AT_TIMESTAMP iterators', call. = FALSE)
    }

    ## prepare Kinesis client
    client <- .jnew('com.amazonaws.services.kinesis.AmazonKinesisClient')
    client$setEndpoint(sprintf('kinesis.%s.amazonaws.com', region))

    ## pick a random open shard if none was specified
    if (missing(shard_id)) {
        ## NOTE(review): describeStream may paginate (hasMoreShards) on very
        ## large streams; only the first page of shards is considered here
        shards <- as.list(
            client$describeStream(stream)$getStreamDescription()$getShards()$toArray())
        ## closed shards (e.g. parents left behind by resharding) carry an
        ## ending sequence number, open shards do not
        is_open <- vapply(shards, function(x) {
            ending <- x$getSequenceNumberRange()$getEndingSequenceNumber()
            is.null(ending) ||
                (is.character(ending) && all(is.na(ending))) ||
                (inherits(ending, 'jobjRef') && is.jnull(ending))
        }, logical(1))
        ## fall back to all shards rather than failing if none look open
        if (any(is_open)) {
            shards <- shards[is_open]
        }
        shard_ids <- vapply(shards, function(x) x$getShardId(), character(1))
        shard_ids <- sub('^shardId-', '', shard_ids)
        shard_id <- sample(shard_ids, 1)
    }

    ## prepare iterator
    req <- .jnew('com.amazonaws.services.kinesis.model.GetShardIteratorRequest')
    req$setStreamName(stream)
    ## coerce so that e.g. a numeric shard id does not break the String constructor
    req$setShardId(.jnew('java/lang/String', as.character(shard_id)))
    req$setShardIteratorType(iterator_type)
    if (!missing(start_sequence_number)) {
        req$setStartingSequenceNumber(start_sequence_number)
    }
    if (!missing(start_timestamp)) {
        ## setTimestamp expects a java.util.Date; convert R date-times,
        ## java.util.Date(long) takes milliseconds since the epoch
        if (inherits(start_timestamp, c('POSIXt', 'Date'))) {
            millis <- round(as.numeric(as.POSIXct(start_timestamp)) * 1000)
            start_timestamp <- .jnew('java/util/Date', .jlong(millis))
        }
        req$setTimestamp(start_timestamp)
    }
    iterator <- client$getShardIterator(req)$getShardIterator()

    ## get records
    req <- .jnew('com.amazonaws.services.kinesis.model.GetRecordsRequest')
    req$setLimit(.jnew('java/lang/Integer', as.integer(limit)))
    req$setShardIterator(iterator)
    res <- client$getRecords(req)$getRecords()

    ## transform from Java to R object; vapply guarantees a character vector
    ## (character(0) when the shard returned no records, where sapply gave list())
    vapply(res,
           function(x) rawToChar(x$getData()$array()),
           character(1))

}

Try the AWR.Kinesis package in your browser

Any scripts or data that you put into this service are public.

AWR.Kinesis documentation built on Aug. 19, 2023, 1:07 a.m.