R/scraply.R

#' Scrape URLs with llply or Elastic MapReduce, handling errors
#'
#' This function works like \code{\link{ldply}}, but specifically for page scraping.
#' Like \code{\link{ldply}}, it applies a function over a list (in this case a list of urls)
#' and returns a data.frame. The difference is that scraply handles and logs errors automagically.
#' This saves you a ton of time when you want to quickly write and deploy a page scraper.
#' Happy scraplying!
#'
#' @param urls A character vector of urls to feed to the scraping function
#' @param fx The scraping function to apply to each url. It receives a parsed
#'   HTML tree and should return a data.frame.
#' @param chunk_size The number of urls to download at once.
#' @param sleep Seconds to sleep between chunks.
#' @param emr Whether or not to use Elastic MapReduce via \code{segue}
#' @param clusterObject A cluster of EC2 instances created by \code{segue}
#'   (required when \code{emr=TRUE})
#'
#' @export
#'
#' @return
#' A data.frame created by the scraping function (\code{fx}), with an added "error" column.
#' URLs that don't return data will have their scraped fields filled with NAs.
#'
#' @examples
#' # see example in the README
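#' # A minimal, hypothetical sketch: scrape_title, the urls, and the segue
#' # calls below are illustrative, not taken from the README. fx receives a
#' # parsed HTML tree and should return a data.frame.
#' \dontrun{
#' library(XML)
#' scrape_title <- function(tree) {
#'     # pull the <title> text out of the parsed page
#'     data.frame(title = xpathSApply(tree, "//title", xmlValue),
#'                stringsAsFactors = FALSE)
#' }
#'
#' urls <- c("http://www.example.com", "http://www.r-project.org")
#' results <- scraply(urls, fx = scrape_title, chunk_size = 2)
#'
#' # pages that failed to scrape are flagged in the "error" column
#' results[results$error == 1, ]
#'
#' # to scrape on Elastic MapReduce instead, pass a segue cluster
#' # (assuming segue's createCluster()/stopCluster() workflow):
#' library(segue)
#' myCluster <- createCluster(numInstances = 2)
#' results <- scraply(urls, fx = scrape_title, emr = TRUE, clusterObject = myCluster)
#' stopCluster(myCluster)
#' }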
scraply <- function(urls, fx,
                    chunk_size=1,
                    sleep=0.01, emr=FALSE,
                    clusterObject=NULL) {

    require("plyr")
    require("XML")
    require("RCurl")

    #___________________________________________________#
    # function wrapper
    run_fx <- function(x, the_fx) {
        the_function <- match.fun(the_fx)
        the_function(x)
    }

    # function for generating input data
    generate_input_list <- function(input_urls) {

        # generate ids to sort on
        n <- length(input_urls)
        sort <- 1:n

        # shrink chunk_size if it exceeds the number of urls
        if (chunk_size > n) {
            chunk_size <- ceiling(n/2)
        }

        # generate list of chunks of urls to scrape
        chunks <- seq(1,n,chunk_size)
        n_chunks <- length(chunks)

        list <- vector("list", n_chunks)
        for (i in 1:n_chunks) {
            if(i==n_chunks) {
                the_chunk <- chunks[i]:n
            } else {
                the_chunk <- chunks[i]:(chunks[i+1]-1)
            }
            list[[i]] <- data.frame(sort = sort[the_chunk], url = input_urls[the_chunk], stringsAsFactors=F)
        }
        return(list)
    }

    # function for parsing html, applying function, and handling errors
    parse_and_handle_errors <- function(d) {
        # CONVERT HTML TO PARSEABLE TREE
        tree <- htmlTreeParse(d$html, useInternalNodes=TRUE)

        # RUN THE FUNCTION
        output <- try(run_fx(x=tree, the_fx=fx), TRUE)

        # HANDLE ALL THE ERRORS
        if (inherits(output, 'try-error') || is.null(output) || NROW(output) == 0) {
            cat("!\n")
            warning("had a problem scraping ", d$url, "\n")
            df <- data.frame(error = 1, stringsAsFactors=F)
            df$sort <- d$sort
        } else {
            df <- data.frame(output, stringsAsFactors=F)
            df$sort <- d$sort
            df$error <- 0
        }
        return(df)
    }

    # function for running
    runner <- function(df) {

        # pause between chunks so we don't hammer the server
        Sys.sleep(sleep)

        # download a chunk of urls
        df$html <- getURL(df$url)

        # apply the parsing function to the resulting html pages
        ddply(df, .(url), parse_and_handle_errors)
    }

    reducer <- function(df_list) {
        # order dfs from smallest number of columns to greatest number of columns
        # this will ensure that additional columns are filled in by rbind.fill
        col_list <- unlist(lapply(df_list, ncol))
        df_list <- df_list[order(col_list)]

        # reduce data
        output <- rbind.fill(df_list)

        # reorder by sort ids, then drop the sort column
        output <- output[order(output$sort),]
        output$sort <- NULL
        return(output)
    }


    #___________________________________________________#

    # initialize data
    list <- generate_input_list(urls)

    # announce scraping
    cat("now scraping", length(urls), "pages,", chunk_size, "at a time...\n")
    # start scraping and logging errors
    if(emr) {
        require("segue")
        if(is.null(clusterObject)) {
            stop("must provide clusterObject initialized from segue to run on EMR")
        }
        cat("using emr...\n")
        output <- emrlapply(clusterObject, list, runner)
    } else {
        output <- llply(list, runner, .progress="text")
    }
    cat("reducing output...\n")

    reducer(output)
}