R/utils.R


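# make_query: sends a single GET request to the given endpoint with the supplied
# query parameters, retrying on HTTP 429 (rate limit) and 503 (service unavailable)
# and aborting after max_error failed attempts; returns the parsed JSON response.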
make_query <- function(url, params, bearer_token, max_error = 4, verbose = TRUE) {
  bearer_token <- check_bearer(bearer_token)
  count <- 0
  while (TRUE) {
    if (count >= max_error) {
      stop("Too many errors.")
    }
    pre_time <- Sys.time()
    r <- httr::GET(url, httr::add_headers(Authorization = bearer_token), query = params)
    time_diff <- as.numeric(Sys.time() - pre_time)
    if (time_diff < 1) { ## To prevent #231
      Sys.sleep(1)
    }
    status_code <- httr::status_code(r)
    if (!status_code %in% c(200, 429, 503)) {
      stop(paste("Something went wrong. Status code:", status_code))
    }
    if (.check_header_rate_limit(r, verbose = verbose)) {
      count <- count + 1
    }
    if (status_code == 200) {
      break
    }
    if (status_code == 503) {
      count <- count + 1
      Sys.sleep(count * 5)
    }
    if (status_code == 429) {
      .trigger_sleep(r, verbose = verbose)
      count <- count + 1
    }
  }
  jsonlite::fromJSON(httr::content(r, "text"))
}

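# get_tweets: pagination loop around make_query. Follows next_token until no pages
# remain or n tweets have been collected, optionally writing each page to JSON via
# df_to_json and/or binding pages (and, if requested, id-level errors) into one data.frame.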
get_tweets <- function(params, endpoint_url, page_token_name = "next_token", n, file, bearer_token, data_path, export_query, bind_tweets, verbose, errors = FALSE) {
  # The errors parameter optionally retrieves tweets that throw an error, together with the
  # corresponding error description. Currently, only id-level errors are added to the bound
  # data.frame; all errors are contained in the errors_ JSON files when a data_path is used.
  
  # Check file storage conditions
  create_storage_dir(data_path = data_path, export_query = export_query, built_query = params[["query"]], start_tweets = params[["start_time"]], end_tweets = params[["end_time"]], verbose = verbose)
  
  # Start Data Collection Loop
  next_token <- ""
  df.all <- data.frame()
  toknum <- 0
  ntweets <- 0
  while (!is.null(next_token)) {
    df <- make_query(url = endpoint_url, params = params, bearer_token = bearer_token, verbose = verbose)
    if (!is.null(data_path)) {
      df_to_json(df, data_path, errors) # pass the errors option through to df_to_json
    }
    if (!is.null(file) | bind_tweets | is.null(data_path)) {
      ## build the data.frame in memory when it will be returned, bound, or saved to file
      df.all <- dplyr::bind_rows(df.all, df$data)
      if (errors) {  # error catcher
        if ("errors" %in% names(df)) {
          df.errors <- df$errors %>%
            dplyr::filter(.data$parameter == "ids") %>%
            dplyr::select(.data$resource_id, .data$title) %>%
            dplyr::rename(id = .data$resource_id, error = .data$title)
        } else {
          df.errors <- data.frame(id = character(), error = character()) ## empty
        }
        df.all <- dplyr::bind_rows(df.all, df.errors)
      }
    }
    next_token <- df$meta$next_token # this is NULL if there are no pages left
    if (!is.null(next_token)) {
      params[[page_token_name]] <- next_token
    }
    toknum <- toknum + 1
    if (is.null(df$data)) {
      n_newtweets <- 0
    } else {
      n_newtweets <- nrow(df$data)
    }
    ntweets <- ntweets + n_newtweets
    .vcat(verbose, "Total pages queried: ", toknum, " (tweets captured this page: ", n_newtweets, ").\n",
          sep = ""
    )
    if (ntweets >= n) { # Check n
      df.all <- df.all[seq_len(n), ] # remove extra rows
      .vcat(verbose, "Total tweets captured now reach ", n, ": finishing collection.\n", sep = "")
      break
    }
    if (is.null(next_token)) {
      .vcat(verbose, "This is the last page for ", params[["query"]], ": finishing collection.\n", sep = "")
      break
    }
  }
  
  if (is.null(data_path) & is.null(file)) {
    return(df.all) # return the data.frame
  }
  if (!is.null(file)) {
    saveRDS(df.all, file = file) # save as RDS
    return(df.all) # return data.frame
  }
  
  if (!is.null(data_path) & bind_tweets) {
    return(df.all) # return data.frame
  }
  
  if (!is.null(data_path) & is.null(file) & !bind_tweets) {
    .vcat(verbose, "Data stored as JSONs: use the bind_tweets function to bundle them into a data.frame.\n")
  }
}

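# check_bearer: ensures a bearer token was supplied and prepends the "Bearer " prefix
# if it is missing.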
check_bearer <- function(bearer_token){
  if (missing(bearer_token)) {
    stop("Bearer token must be specified.")
  }
  if (substr(bearer_token, 1, 7) == "Bearer ") {
    bearer <- bearer_token
  } else {
    bearer <- paste0("Bearer ", bearer_token)
  }
  return(bearer)
}

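# check_data_path: warns about, or stops on, inconsistent combinations of data_path,
# file and bind_tweets before data collection begins.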
check_data_path <- function(data_path, file, bind_tweets, verbose = TRUE){
  #warning re data storage recommendations if no data path set
  if (is.null(data_path)) {
    .vwarn(verbose, "Recommended to specify a data path in order to mitigate data loss when ingesting large amounts of data.")
  }
  #warning re data.frame object and necessity of assignment
  if (is.null(data_path) & is.null(file)) {
    .vwarn(verbose, "Tweets will not be stored as JSONs or as a .rds file and will only be available in local memory if assigned to an object.")
  }
  #stop clause for if user sets bind_tweets to FALSE but sets no data path
  if (is.null(data_path) & !bind_tweets) {
    stop("Argument (bind_tweets = FALSE) only valid when a data_path is specified.")
  }
  #warning re binding of tweets when a data path and file path have been set but bind_tweets is set to FALSE
  if (!is.null(data_path) & !is.null(file) & !bind_tweets) {
    .vwarn(verbose, "Tweets will still be bound in local memory to generate .rds file. Argument (bind_tweets = FALSE) only valid when just a data path has been specified.")
  }
  #warning re data storage and memory limits when setting bind_tweets to TRUE 
  if (!is.null(data_path) & is.null(file) & bind_tweets) {
    .vwarn(verbose, "Tweets will be bound in local memory as well as stored as JSONs.")
  }
}

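# create_data_dir: creates the storage directory (recursively) if it does not yet exist,
# warning when it already does; returns data_path invisibly.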
create_data_dir <- function(data_path, verbose = TRUE){
  # create folders for storage; return early if the directory already exists
  if (dir.exists(file.path(data_path))) {
    .vwarn(verbose, "Directory already exists. Existing JSON files may be parsed and returned, choose a new path if this is not intended.")
    return(invisible(data_path))
  }
  dir.create(file.path(data_path), showWarnings = FALSE, recursive = TRUE)
  invisible(data_path)
}

df_to_json <- function(df, data_path, errors = FALSE){
  # write this page's data, includes and (optionally) errors to disk as JSON files,
  # named after the id of the last tweet on the page
  jsonlite::write_json(df$data,
                       file.path(data_path, paste0("data_", df$data$id[nrow(df$data)], ".json")))
  jsonlite::write_json(df$includes,
                       file.path(data_path, paste0("users_", df$data$id[nrow(df$data)], ".json")))
  if (errors) { # error catcher
    jsonlite::write_json(df$errors,
                         file.path(data_path, paste0("errors_", df$data$id[nrow(df$data)], ".json")))
  }
}

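# create_storage_dir: creates the data directory and, when export_query is TRUE, writes
# the built query and its time range to a plain-text "query" file so that an interrupted
# collection can be resumed.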
create_storage_dir <- function(data_path, export_query, built_query, start_tweets, end_tweets, verbose){
  if (!is.null(data_path)){
    create_data_dir(data_path, verbose)
    if (isTRUE(export_query) & !is.null(built_query)) { # Note: the query is exported only when a data_path is supplied
      # Write the query and its time range to file (for resuming)
      filecon <- file(file.path(data_path, "query"))
      writeLines(c(built_query, start_tweets, end_tweets), filecon)
      close(filecon)
    }
  }
}


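# .gen_random_dir: generates a random 20-letter directory name under tempdir().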
.gen_random_dir <- function() {
  file.path(tempdir(), paste0(sample(letters, 20), collapse = ""))
}

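# .vcat: prints via cat() only when the verbosity flag is TRUE.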
.vcat <- function(bool, ...) {
  if (bool) {
    cat(...)
  }
}

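# .vwarn: issues a warning (without the call) only when the verbosity flag is TRUE.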
.vwarn <- function(bool, ...) {
  if (bool) {
    warning(..., call. = FALSE)
  }
}

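# .check_reset: converts the x-rate-limit-reset response header (Unix epoch seconds)
# to a datetime in the requested time zone.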
.check_reset <- function(r, tzone = "") {
  lubridate::with_tz(lubridate::as_datetime(as.numeric(httr::headers(r)$`x-rate-limit-reset`), tz = tzone), tzone)
}

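# .trigger_sleep: sleeps until the rate limit resets (plus a one-second buffer), falling
# back to a fixed 900-second sleep when the reset time cannot be estimated (see #213);
# shows a progress bar when verbose.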
.trigger_sleep <- function(r, verbose = TRUE, really_sleep = TRUE, ref_time = Sys.time(), tzone = "") {
  reset_time <- .check_reset(r, tzone = tzone)
  ## add 1s as buffer
  sleep_period <- ceiling(as.numeric(reset_time - ref_time, units = "secs")) + 1
  if (sleep_period < 0) {
    ## issue #213
    .vcat(verbose, "Rate limit reached. Cannot estimate adaptive sleep time. Sleeping for 900 seconds. \n")
    sleep_period <- 900
  } else {
    .vcat(verbose, "Rate limit reached. Rate limit will reset at", as.character(reset_time) ,"\nSleeping for", sleep_period ,"seconds. \n")
  }
  if (verbose) {
    pb <- utils::txtProgressBar(min = 0, max = sleep_period, initial = 0)
    for (i in seq_len(sleep_period)) {
      utils::setTxtProgressBar(pb, i)
      if (really_sleep) {
        Sys.sleep(1)
      }
    }
  } else {
    if (really_sleep) {
      Sys.sleep(sleep_period)
    }
  }
  invisible(r)
}

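# .process_qparam: appends param_str to the query when param is TRUE, or its negation
# ("-param_str") when FALSE, and leaves the query unchanged when param is NULL.
# For example, .process_qparam(TRUE, "is:retweet", "banana") returns "banana is:retweet".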
.process_qparam <- function(param, param_str, query) {
  if (!is.null(param)) {
    if (isTRUE(param)) {
      query <- paste(query, param_str)
    } else if (isFALSE(param)) {
      query <- paste(query, paste0("-", param_str))
    }
  }
  return(query)
}

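# add_query_prefix: prefixes each element of x, joins the results with OR and wraps them
# in parentheses, e.g. add_query_prefix(c("jack", "jill"), "from:") gives
# "(from:jack OR from:jill)".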
add_query_prefix <- function(x, prefix){
  q <- paste0(prefix, x)
  q <- paste(q, collapse = " OR ")
  q <- paste0("(", q, ")")
  return(q)
}

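# add_context_annotations: requests context_annotations in tweet.fields and caps
# max_results at 100, the page size the Twitter API allows when context annotations
# are requested.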
add_context_annotations <- function(params, verbose){
  if (params[["max_results"]] > 100) {
    params[["max_results"]] <- 100
    .vcat(verbose, "page_n is limited to 100 due to the restriction imposed by the Twitter API\n")
  }
  params[["tweet.fields"]] <- "attachments,author_id,context_annotations,conversation_id,created_at,entities,geo,id,in_reply_to_user_id,lang,public_metrics,possibly_sensitive,referenced_tweets,source,text,withheld"
  params
}


## prophylactic solution to #192
.check_header_rate_limit <- function(r, verbose) {
  if (is.null(httr::headers(r)$`x-rate-limit-remaining`)) {
    return(FALSE)
  }
  if (httr::headers(r)$`x-rate-limit-remaining` == "1") {
    .vwarn(verbose, paste("x-rate-limit-remaining=1. Resets at", .check_reset(r)))
    return(TRUE)
  } else {
    return(FALSE)
  }
}
