R/collect-gdax-data.R

Defines functions collect_gdax_data

Documented in collect_gdax_data

# This file contains a function to collect GDAX market data
#' @include load-configurations.R

#' @title Collect GDAX market data
#' 
#' @param market_data_table_name The name of the table configured to store ticker market data
#' @param ticker_to_pull The ticker name to pull from the GDAX API
#' @param max_allowed_consecutive_failures The maximum number of consecutive
#' failures to allow before stopping the function
#' @param collection_resolution Frequency to pull data, in seconds
#' @param conn Connection to the database. When left at its default
#' (\code{default_conn_to_mysql_server()}), the connection is opened by this
#' function and closed when it exits; a connection supplied by the caller is
#' left open.
#' 
#' @description This function will pull and store ticker market data from
#' GDAX's API. It will pull data from the API as frequently as specified by the
#' \code{collection_resolution} option, but if one market datum has the same
#' \code{trade_id} as the previous, then this function will not write a 
#' duplicate row to the \code{market_data_table_name} table.
#' 
#' @details Each iteration counts as a failure when the API call errors, when
#' the trade ID of the returned datum cannot be compared to the last stored
#' one (e.g. it is \code{NA}), or when the insert errors. A successful insert
#' or an unchanged trade ID resets the failure counter. The function returns
#' once \code{max_allowed_consecutive_failures} failures occur in a row.
#' 
#' @return \code{NULL}, invisibly. Called for its side effects.
#' 
#' @export
#' 
collect_gdax_data <- function(market_data_table_name = "market_data.raw_ticker",
                              ticker_to_pull = "btc_usd",
                              max_allowed_consecutive_failures = 100,
                              collection_resolution = 2,
                              conn = default_conn_to_mysql_server()) {

  # Close the connection only if this function created it from the default
  # expression; a caller-supplied connection stays the caller's responsibility.
  # identical() on the quoted call avoids deparse-based `==` comparison.
  owns_conn <- missing(conn) ||
    identical(substitute(conn), quote(default_conn_to_mysql_server()))
  if (owns_conn) {
    on.exit(close_conn_to_mysql_server(conn), add = TRUE)
  }

  # Pull the lookup tables into memory once, outside the polling loop
  ticker_lookup_table   <- get_table(ticker_lookup, conn = conn)
  exchange_lookup_table <- get_table(exchange_lookup, conn = conn)

  # Seconds elapsed since `start`
  seconds_since <- function(start) {
    as.numeric(difftime(Sys.time(), start, units = "secs"))
  }

  # Message of the condition captured in a `try()` error object
  try_error_message <- function(result) {
    conditionMessage(attr(result, "condition"))
  }

  # Collect data
  consecutive_failures       <- 0
  last_market_datum_trade_id <- 0
  while (consecutive_failures < max_allowed_consecutive_failures) {

    iteration_start_time <- Sys.time()

    # Pull the datum AND read its trade ID inside try(), so a malformed datum
    # counts as a failed pull instead of aborting the whole collector
    pulled <- try({
      market_datum <- get_gdax_market_datum(
        ticker = ticker_to_pull,
        ticker_lookup_table = ticker_lookup_table,
        exchange_lookup_table = exchange_lookup_table
      )
      list(datum = market_datum, trade_id = get_trade_id(market_datum))
    }, silent = TRUE)

    # inherits() is safe for multi-class objects (class(x) == ... is not);
    # isTRUE() turns an NA / non-scalar trade-ID comparison into FALSE
    # instead of an `if (NA)` error
    pull_ok <- !inherits(pulled, "try-error")
    is_new_trade <- pull_ok &&
      isTRUE(pulled$trade_id > last_market_datum_trade_id)
    is_repeat_trade <- pull_ok &&
      isTRUE(pulled$trade_id <= last_market_datum_trade_id)

    if (is_new_trade) {

      # Attempt to insert
      insert_result <- try(
        insert_market_datum_to_table(market_datum = pulled$datum,
                                     table_name = market_data_table_name,
                                     conn = conn),
        silent = TRUE
      )
      iteration_run_time <- seconds_since(iteration_start_time)

      if (inherits(insert_result, "try-error")) {
        message("Failed to store one ", ticker_to_pull, " observation in ",
                round(iteration_run_time, 3), " seconds: ",
                try_error_message(insert_result))
        consecutive_failures <- consecutive_failures + 1
      } else {
        message("Pulled and stored one ", ticker_to_pull, " observation in ",
                round(iteration_run_time, 3), " seconds")
        consecutive_failures <- 0
        # Remember the trade ID of the newly inserted market datum
        last_market_datum_trade_id <- pulled$trade_id
      }

    } else if (is_repeat_trade) {

      # No change to GDAX's market data since the last pull
      iteration_run_time <- seconds_since(iteration_start_time)
      message("Successfully retrieved ", ticker_to_pull, " observation in ",
              round(iteration_run_time, 3), " seconds, but did not insert into ",
              market_data_table_name, " because it was the same as last observation")
      consecutive_failures <- 0

    } else {

      # Either the API call failed or the datum had no usable trade ID
      iteration_run_time <- seconds_since(iteration_start_time)
      failure_reason <- if (pull_ok) {
        "no usable trade ID in response"
      } else {
        try_error_message(pulled)
      }
      message("Failed to retrieve ", ticker_to_pull, " observation from API in ",
              round(iteration_run_time, 3), " seconds: ", failure_reason)
      consecutive_failures <- consecutive_failures + 1

    }

    # Wait until the next iteration
    wait_time <- collection_resolution - iteration_run_time
    if (wait_time > 0) {
      message("* waiting ", round(wait_time, 3), " seconds for next iteration")
      Sys.sleep(wait_time)
    }

  }

  # TODO: email that data collection is no longer running due to too many
  # consecutive errors

  invisible(NULL)
}
kyleengel/btclearn documentation built on June 7, 2018, 12:26 a.m.