To preview where we are heading, here is a concurrent spider written with async; note how closely it follows the structure of the program we started with:

spider_site_async <- async(function(
                        start_pages, #starting URLs
                        regexp,      # linked URLs must match this pattern to be followed
                        limit=500) { # maximum number of pages to collect

  #all encountered pages will be collected in this hash table
  known <- new.env()
  seen <- 0

  # inner helper function:
  is_new_page <- function(url) {
    seen < limit && !exists(url, known) && grepl(regexp, url)
  }

  # define inner recursive function
  visit_page <- async(function(url) {
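    # mark this URL as seen right away, so concurrent visits will not fetch it twice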
    known[[url]] <- NULL
    seen <<- seen + 1

    # Fetch the page
    start_time <- Sys.time()
    cat("visiting ", url, "\n")
    data <- await(curl_fetch_async(url))  # promise-returning wrapper, defined below
    end_time <- Sys.time()

    links <- extract_links(data)

    #store this page's results in the hash table
    known[[url]] <<- list(url=url, start=start_time, end=end_time, links=list(links))

    #recursively follow new links, if within the site filter
    links |> Filter(f=is_new_page) |> lapply(visit_page) |> promise_all(.list=_) |> await()
  })

  #finally kick off the spider
  start_pages |> lapply(visit_page) |> promise_all(.list=_) |> await()

  #return our hash table as a data frame; "links" is a list-column
  (known
    |> lapply(\(x) data.frame(x[c("url", "start", "end")], links=I(x["links"])))
    |> do.call(what=rbind))
})

For comparison, here is what a concurrent version looks like without involving promises or async, using curl's callback interface directly:

spider_site_callbacks <- function(
                        start_pages, # starting URLs
                        filter,      # linked URLs must match this pattern to be followed
                        limit=500) { # maximum number of pages to collect
  #all encountered pages will be collected in this hash table
  known <- new.env()
  seen <- 0

  # define inner recursive function
  request_page <- function(url) {

    is_new_page <- function(url) {
      seen < limit && !exists(url, known) && grepl(filter, url)
    }

    process_error <- function(err) {
      end_time <- Sys.time()
      known[[url]] <<- list(url=url, start=start_time, end=end_time, error=err)
    }

    process_page <- function(data) {
      end_time <- Sys.time()
      #extract links
      links <- extract_links(data)
      #store this page's results in the hash table
      known[[url]] <<- list(url=url, start=start_time, end=end_time, links=list(links))
      #recursively follow new links, if within the site filter
      for (link in links) {
        if (is_new_page(link)) {
          request_page(link)
        }
      }
    }

    seen <<- seen + 1
    cat("Fetching", url, "\n")
    start_time <- Sys.time()
    curl_fetch_multi(url, done=process_page, fail=process_error)
  }

  #finally kick off the spider
  for (page in start_pages) request_page(page)
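  # keep driving curl until no requests remain pending (callbacks may queue new ones)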
  repeat {
    multi_run()
    if (length(multi_list()) == 0) break
  }

  return(as.list(known))
}

This amounts to a pretty big restructuring of the program we started with! I'm here to show you the async package, which provides a way to abstract away most of those changes.

Interfacing curl with async

The curl package has a non-blocking API. You can make a non-blocking request for a page by calling curl_fetch_multi(url). By "non-blocking" I mean that calling this function merely queues up the request and returns promptly, without waiting for any data to arrive; you have to check on the status of the request later.
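
For instance (a minimal sketch; the URL is just a placeholder):

library(curl)
curl_fetch_multi("https://example.com")  # queues the request and returns immediately
multi_list()                             # the request is still pending in the default pool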

curl_fetch_multi has the option to return its result by invoking callback functions that you provide. This is ideal for interfacing with async, because the promise constructor gives you just the two callbacks you need. So a simple wrapper suffices to make curl return its download in a promise object:

curl_fetch_async <- function(url) {
  promises::promise(function(resolve, reject) {
    curl::curl_fetch_multi(url, done=resolve, fail=reject)
  })
}
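
As a hypothetical usage example (the URL is a placeholder), the returned promise resolves with curl's response object once the download has completed and, as discussed below, something has called multi_run() to drive it:

p <- curl_fetch_async("https://example.com")
promises::then(p,
  onFulfilled = function(res) cat("fetched", res$url, "-", length(res$content), "bytes\n"),
  onRejected  = function(err) cat("download failed\n"))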

For example, if you have a list of 100 URLs, you can call curl_fetch_multi on each of them and then call multi_run(); libcurl will download them using a pool of concurrent TCP connections.
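
A minimal sketch of that pattern (the URLs and the results environment are invented for illustration):

library(curl)
urls <- sprintf("https://example.com/page/%d", 1:100)
results <- new.env()
for (u in urls) {
  local({
    u <- u   # capture the current URL for this callback
    curl_fetch_multi(u, done = function(res) results[[u]] <- res$status_code)
  })
}
multi_run()          # blocks until every queued download has finished
length(ls(results))  # up to 100, one entry per request that succeeded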

When used this way, multi_run() is blocking: it does not return until it has finished downloading all of the files you requested.

This won't do for a web spider, because the list of pages to fetch is not known ahead of time. For a spider, links are discovered "on the fly"; each page may add new entries to the result set. Moreover, if we block while downloading a whole batch at once, we have to wait until the last page of the batch has finished before discovering any further links, which adds latency.

For this kind of usage, multi_run can also be called in a non-blocking manner. The option poll=TRUE tells it to return as soon as one transfer has completed, rather than waiting for all of them; the option timeout puts a further limit on how long curl will wait for data to arrive.
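
For example (the timeout value here is arbitrary):

multi_run(poll = TRUE, timeout = 0.1)  # returns once one transfer completes, or after 0.1 seconds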

When used this way, you need to call multi_run() over and over until all the pending data has been received. That leaves us with the problem of who invokes those calls. This is where the event loop comes in: we hide those calls in the event loop, using the later package.
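
Here is a minimal sketch of that idea, assuming the later package is available (the name pump_curl and the 0.1-second interval are my own choices, not part of any package API):

library(curl)
library(later)

pump_curl <- function() {
  multi_run(poll = TRUE, timeout = 0)  # handle any transfers that have already completed
  if (length(multi_list()) > 0) {
    later(pump_curl, delay = 0.1)      # reschedule ourselves while requests remain pending
  }
}
later(pump_curl)  # start pumping on the next turn of the event loop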

You may have noticed that when you enter a long-running command at R's command line, graphics windows will not redraw themselves if you resize them, and HTML help pages will stop loading. This is because the event loop does not ordinarily run while another command is running; R's execution model is, at the end of the day, single-threaded.
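
A small illustration, assuming later is installed (the message text is arbitrary):

later::later(function() cat("the event loop turned over\n"))
# The callback has not fired yet; it runs when R is next idle at the
# top-level prompt, or when we pump the event loop explicitly:
later::run_now()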


