R/stream.R

Defines functions stream_tweets2 parse_stream is_user_ids stream_params stream_prep whole_lines quiet_interrupt download_from_stream stream_tweets

Documented in parse_stream stream_tweets stream_tweets2

#' Collect a live stream of Twitter data
#' 
#' @description 
#' Streams public statuses to a file via one of the following four methods:
#' 
#' 1. Sampling a small random sample of all publicly available tweets
#' 2. Filtering via a search-like query (up to 400 keywords)
#' 3. Tracking via vector of user ids (up to 5000 user_ids)
#' 4. Location via geo coordinates (1-360 degree location boxes)
#' 
#' Learn more in `vignette("stream", package = "rtweet")`
#' 
#' @inheritParams lookup_users
#' @param q Query used to select and customize streaming collection
#'   method.  There are four possible methods:
#'   
#'   1. The default, `q = ""`, returns a small random sample of all 
#'      publicly available Twitter statuses. 
#'   2. To filter by keyword, provide a comma separated character string with 
#'      the desired phrase(s) and keyword(s). 
#'   3. Track users by providing a comma separated list of user IDs or 
#'      screen names. 
#'   4. Use four latitude/longitude bounding box points to stream by geo 
#'      location. This must be provided via a vector of length 4, e.g., 
#'      `c(-125, 26, -65, 49)`.
#' @param timeout Integer specifying number of seconds to stream tweets for.
#'   Stream indefinitely with `timeout = Inf`.
#'    
#'   The stream can be interrupted at any time, and `file_name` will still be
#'   a valid file.
#' @param file_name Character with name of file. If not specified,
#'   will write to `stream_tweets.json` in the current working directory.
#' @param append If `TRUE`, will append to the end of `file_name`; if
#'   `FALSE`, will overwrite.
#' @param verbose If `TRUE`, display a progress bar.
#' @param parse Use `FALSE` to opt-out of parsing the tweets.
#' @param ... Other arguments passed in to query parameters.
#' @references <https://developer.twitter.com/en/docs/twitter-api/v1/tweets/sample-realtime/api-reference/get-statuses-sample>,
#'  <https://developer.twitter.com/en/docs/twitter-api/v1/tweets/filter-realtime/overview>
#' @examples
#' \dontrun{
#' # stream tweets mentioning "election" for 10 seconds
#' e <- stream_tweets("election", timeout = 10)
#' e
#'
#' # Download another 10s worth of data to the same file
#' e <- stream_tweets("election", timeout = 10)
#'
#' # stream tweets about continental USA for 5 minutes
#' usa <- stream_tweets(lookup_coords("usa"), file_name = "usa.json", timeout = 300)
#' 
#' }
#' @return A tibble with one row per tweet
#' @export
#' @references 
#' Stream: <https://developer.twitter.com/en/docs/twitter-api/v1/tweets/sample-realtime/api-reference/get-statuses-sample>
#' Filter: <https://developer.twitter.com/en/docs/twitter-api/v1/tweets/filter-realtime/api-reference/post-statuses-filter>
stream_tweets <- function(q = "",
                          timeout = 30,
                          parse = TRUE,
                          token = NULL,
                          file_name = NULL,
                          verbose = TRUE,
                          append = TRUE,
                          ...) {
  if (is.null(file_name)) {
    file_name <- "stream_tweets.json"
    inform(paste0("Writing to '", file_name, "'"))
  }

  # Prepare the signed request first so that a failure in stream_prep()
  # happens before any connection object has been created.
  prep <- stream_prep(token, q, ...)
  stream <- curl::curl(prep$url, handle = prep$handle)
  output <- file(file_name)

  # `append` must be forwarded explicitly; otherwise download_from_stream()
  # always falls back to its own default and `append = FALSE` is ignored.
  # A user interrupt ends the download quietly so that whatever has been
  # written so far can still be parsed below.
  quiet_interrupt(download_from_stream(stream, output,
    append = append,
    timeout = timeout,
    verbose = verbose
  ))

  if (parse) {
    # Re-read the file from disk: one JSON document per line.
    df <- jsonlite::stream_in(file(file_name), verbose = FALSE)
    tibble::as_tibble(df)
  } else {
    invisible(NULL)
  }
}

# Copy tweets from `stream` to `output` until the stream ends or `timeout`
# seconds have elapsed.
#
# Raw chunks are read from `stream`, split into complete lines with
# whole_lines(), parsed as JSON, and only the lines whose JSON has a
# `created_at` field are written to `output`.
#
# @param stream Connection to read from; opened here in "rb" mode and closed
#   when this function exits.
# @param output Connection to write to; opened here and closed when this
#   function exits.
# @param append If `TRUE`, open `output` in append mode ("ab"); otherwise
#   open it in write mode ("wb"), which truncates existing content.
# @param timeout Number of seconds to stream for. A value that is falsy
#   (`0`, `FALSE`) is replaced by `Inf`.
# @param verbose If `TRUE`, display a progress bar.
# @return `NULL`, invisibly.
download_from_stream <- function(stream, output, append = TRUE, timeout = 10, verbose = TRUE) {
  if (!timeout) {
    timeout <- Inf
  }
  stopifnot(is.numeric(timeout), timeout > 0)
  stop_time <- Sys.time() + timeout

  n_seen <- 0
  if (verbose) {
    pb <- progress::progress_bar$new(
      total = NA,
      show_after = 0,
      format = "Streaming tweets: :n tweets written / :bytes / :rate / :elapsedfull"
    )
  }

  # NOTE(review): the zero-length-read branch in the loop below only makes
  # sense if a read can return without data; confirm whether this open()
  # should request a non-blocking connection.
  open(stream, "rb")
  withr::defer(close(stream))

  # "wb" (not "b") is the binary write mode used to overwrite the file.
  open(output, if (append) "ab" else "wb")
  withr::defer(close(output))

  lines <- list(lines = character(), fragment = "")
  while (isIncomplete(stream) && Sys.time() < stop_time) {
    buf <- readBin(stream, raw(), 64 * 1024)
    if (length(buf) == 0) {
      # Nothing received: refresh the bar without counting any bytes, and
      # keep supplying the `:n` token so the format string stays filled in.
      if (verbose) {
        pb$tick(0, tokens = list(n = n_seen))
      }
      Sys.sleep(0.25)
      next
    }

    text <- rawToChar(buf)
    # Carry the trailing partial line over to the next chunk.
    lines <- whole_lines(text, lines$fragment)

    # only keep tweets
    # TODO: process more messages from
    # https://developer.twitter.com/en/docs/twitter-api/v1/tweets/filter-realtime/overview
    json <- lapply(lines$lines, jsonlite::fromJSON)
    is_tweet <- vapply(json, function(x) has_name(x, "created_at"), logical(1))

    n_seen <- n_seen + sum(is_tweet)
    if (verbose) {
      pb$tick(length(buf), tokens = list(n = n_seen))
    }

    # Write the original JSON text, not the parsed objects.
    writeLines(lines$lines[is_tweet], output, useBytes = TRUE)
  }

  if (verbose) {
    cat("\n")
  }

  invisible()
}

# Evaluate `code`, turning a user interrupt into a quiet `NULL` result.
# Any other condition (errors, warnings, ...) propagates unchanged.
quiet_interrupt <- function(code) {
  swallow <- function(cnd) {
    NULL
  }
  tryCatch(
    code,
    interrupt = swallow
  )
}

# Split a chunk of streamed text into complete "\r\n"-terminated lines.
#
# @param text Character string holding the newly received chunk.
# @param fragment Incomplete trailing line left over from the previous chunk.
# @return A list with `lines`, the complete non-empty lines, and `fragment`,
#   the trailing text that has not yet been terminated by "\r\n" (or "" if
#   the chunk ended exactly on a line boundary).
whole_lines <- function(text, fragment = "") {
  # Join the leftover fragment to the new text *before* splitting, so that a
  # "\r\n" terminator divided across two chunks ("...\r" then "\n...") is
  # still recognised as a line break.
  text <- paste0(fragment, text)
  lines <- strsplit(text, "\r\n", fixed = TRUE)[[1]]

  n <- length(lines)
  # `n` is 0 when there is nothing at all to split; there is then no
  # trailing piece to carry over.
  if (n > 0 && !endsWith(text, "\r\n")) {
    fragment <- lines[[n]]
    lines <- lines[-n]
  } else {
    fragment <- ""
  }

  # Drop empty keep-alive lines
  lines <- lines[lines != ""]

  list(lines = lines, fragment = fragment)
}

# Build the URL and signed curl handle for a streaming request.
#
# An empty query (`q = ""`) selects the sample endpoint with no parameters;
# anything else selects the filter endpoint with parameters produced by
# stream_params(). Returns a list with elements `url` and `handle`.
stream_prep <- function(token, q = "", ..., filter_level = "none") {
  token <- check_token(token)
  stopifnot(is.atomic(q) || inherits(q, "coords"))

  want_sample <- identical(q, "")
  if (want_sample) {
    endpoint <- "1.1/statuses/sample.json"
    query <- NULL
  } else {
    endpoint <- "1.1/statuses/filter.json"
    query <- stream_params(q, ..., filter_level = filter_level)
  }

  url <- httr::modify_url(
    "https://stream.twitter.com",
    path = endpoint,
    query = query
  )

  # Sign the final URL and attach the resulting headers to a fresh handle.
  signed <- token$sign("GET", url)
  handle <- curl::new_handle()
  curl::handle_setheaders(handle, .list = signed$headers)

  list(url = url, handle = handle)
}

# Translate a stream query into query parameters for the filter endpoint.
#
# @param stream One of: a "coords" object (its `box` element is used), a
#   numeric vector whose length is a multiple of 4 (bounding boxes), a set of
#   user ids as judged by is_user_ids(), or anything else, which is treated
#   as keywords to track.
# @param ... Extra query parameters, appended in every case.
# @return A named list whose first element is `locations`, `follow` or
#   `track`, holding a single comma-separated string.
stream_params <- function(stream, ...) {
  # Every branch forwards `...`, so extra parameters such as `filter_level`
  # are not silently dropped for location queries. Values are collapsed so
  # that a vector of ids/keywords becomes one comma-separated parameter.
  if (inherits(stream, "coords")) {
    params <- list(locations = paste(stream$box, collapse = ","), ...)
  } else if ((length(stream) %% 4 == 0) && is.numeric(stream)) {
    params <- list(locations = paste(stream, collapse = ","), ...)
  } else if (is_user_ids(stream)) {
    params <- list(follow = paste(stream, collapse = ","), ...)
  } else {
    params <- list(track = paste(stream, collapse = ","), ...)
  }

  params
}

# Does `x` look like a set of user ids, i.e. does every element convert to a
# number? A single string containing commas is first split on the commas.
is_user_ids <- function(x) {
  is_csv_string <- length(x) == 1L && grepl(",", x)
  ids <- if (is_csv_string) strsplit(x, "\\,")[[1]] else x

  # Non-numeric entries become NA (the coercion warning is expected).
  as_numbers <- suppressWarnings(as.numeric(ids))
  isTRUE(!anyNA(as_numbers))
}


# Deprecated -----------------------------------------------------------------

#' Converts Twitter stream data (JSON file) into parsed data frame.
#'
#' @description 
#' `r lifecycle::badge("deprecated")`
#' Please use `jsonlite::stream_in()` instead.
#'
#' @param path Character, name of JSON file with data collected by
#'   [stream_tweets()].
#' @param ... Other arguments passed on to internal data_from_stream
#'   function.
#' @export
#' @keywords internal
parse_stream <- function(path, ...) {
  # Defunct: always signals a deprecation error pointing at the replacement.
  lifecycle::deprecate_stop(
    when = "1.0.0",
    what = "parse_stream()",
    with = "jsonlite::stream_in()"
  )
}

#' A more robust version of stream_tweets
#'
#' @description 
#' `r lifecycle::badge("deprecated")`
#' Please use [stream_tweets()] instead.
#'
#' @param dir Name of directory in which json files should be written.
#'   The default, NULL, will create a timestamped "stream" folder in the
#'   current working directory. If a dir name is provided that does not
#'   already exist, one will be created.
#' @param append Logical indicating whether to append or overwrite
#'   file_name if the file already exists. Defaults to FALSE, meaning
#'   this function will overwrite the preexisting file_name (in other
#'   words, it will delete any old file with the same name as
#'   file_name). If TRUE, the data will be added as new lines to the
#'   file if pre-existing.
#' @return Returns data as expected using original search_tweets
#'   function.
#' @export
#' @keywords internal
stream_tweets2 <- function(..., dir = NULL, append = FALSE) {
  # Defunct: always signals a deprecation error pointing at the replacement.
  lifecycle::deprecate_stop(
    when = "1.0.0",
    what = "stream_tweets2()",
    with = "stream_tweets()"
  )
}
mkearney/rtweet documentation built on Sept. 29, 2021, 12:01 p.m.