R/connection.R

Defines functions check_listen check_populate_data

##' @importFrom R6 R6Class
toxiproxy_connection <-
  R6::R6Class(
    "toxiproxy_connection",
    public =
      list(
        host = NULL,
        port = NULL,
        url_prefix = NULL,
        version = NULL,

        initialize = function(host = NULL, port = NULL, timeout = NULL) {
          self$host <- host %||% Sys.getenv("TOXIPROXY_HOST", "127.0.0.1")
          self$port <- port %||% as.integer(Sys.getenv("TOXIPROXY_PORT", 8474))
          self$url_prefix <- sprintf("http://%s:%s", self$host, self$port)
          tryCatch({
            response <- httr::GET(self$url("/version"),
                                  if (!is.null(timeout)) httr::timeout(timeout))
            handle_response(response, "While fetching toxiproxy version")
          }, error = function(e) {
            stop(sprintf(
              "toxiproxy does not appear to be running at %s (%s)",
              self$url_prefix, e$message),
              call. = FALSE)
          })
          ## NOTE: this is the only non-JSON endpoint
          self$version <- response_as_text(response)
          if (numeric_version(self$version) < numeric_version("2.0.0")) {
            stop(sprintf("Version %s of toxiproxy server is not supported",
                         self$version))
          }
          ## We should be good from here:
          lockBinding(quote(host), self)
          lockBinding(quote(port), self)
          lockBinding(quote(url_prefix), self)
          lockBinding(quote(version), self)
        },

        url = function(...) {
          file.path(self$url_prefix, ..., fsep = "/")
        },

        reset = function() {
          response <- httr::POST(self$url("reset"))
          handle_response(response, "While resetting")
          invisible(self)
        },

        clear = function() {
          self$delete(NULL)
        },

        create = function(name, upstream, listen = 0, enabled = TRUE) {
          if (is.numeric(upstream)) {
            upstream <- sprintf("%s:%d", self$host, upstream)
          }
          assert_scalar(listen)
          listen <- check_listen(listen)
          hash <- list(name = name,
                       listen = listen,
                       upstream = upstream,
                       enabled = enabled)
          response <- httr::POST(self$url("proxies"), body = to_json(hash))
          prefix <- sprintf("Error creating proxy at %s; %s",
                            listen, name)
          handle_response(response, prefix)
          toxic_proxy$new(response_as_json(response), self)
        },

        ## Helper functions:
        list = function() {
          response <- httr::GET(self$url("proxies"))
          handle_response(response, "While listing proxies")
          dat <- unname(response_as_json(response))
          data.frame(name = vcapply(dat, "[[", "name"),
                     listen = vcapply(dat, "[[", "listen"),
                     upstream = vcapply(dat, "[[", "upstream"),
                     enabled = vlapply(dat, "[[", "enabled"),
                     toxics = lengths(lapply(dat, "[[", "toxics")),
                     stringsAsFactors = FALSE)
        },

        get = function(name) {
          response <- httr::GET(self$url("proxies", name))
          if (httr::status_code(response) == 404) {
            stop(sprintf("'%s' not known to toxiproxy", name))
          }
          handle_response(response, sprintf("Cannot get proxy '%s'", name))
          dat <- response_as_json(response)
          toxic_proxy$new(dat, self)
        },

        exists = function(name) {
          res <- tryCatch(self$get(name), error = function(e) NULL)
          !is.null(res)
        },

        delete = function(name) {
          if (is.null(name)) {
            name <- self$list()$name
          } else {
            assert_character(name)
          }
          for (nm in name) {
            response <- httr::DELETE(self$url("proxies", nm))
            handle_response(response, sprintf("Cannot delete proxy '%s'", nm))
            message("Removed proxy ", nm)
          }
          invisible(self)
        },

        populate = function(data) {
          ## TODO: once we support v2.1.0 we could do this via the
          ## native populate POST endpoint.  The data is set up to
          ## support that natively at the moment.
          data <- check_populate_data(data)
          n <- nrow(data)
          for (i in seq_len(n)) {
            if (!self$exists(data$name[[i]])) {
              self$create(data$name[[i]], data$upstream[[i]],
                          data$listen[[i]])
            }
          }
          invisible(self)
        }
      ))

check_populate_data <- function(data) {
  if (!is.data.frame(data)) {
    stop("Expected a data.frame")
  }
  ## 1. name
  if (!("name" %in% names(data))) {
    stop("Expected column 'name'")
  }
  if (!is.character(data$name)) {
    stop("Column 'name' must be a character vector")
  }

  ## 2. listen
  if (is.null(data$listen)) {
    data$listen <- "127.0.0.1:0"
  } else {
    data$listen <- check_listen(data$listen)
  }

  ## 3. upstream (similar)
  if (is.null(data$upstream)) {
    stop("Expected column 'upstream'")
  } else if (is.numeric(data$upstream)) {
    data$upstream <- check_listen(data$upstream, "upstream")
  }

  extra <- setdiff(names(data), c("name", "listen", "upstream"))
  if (length(extra) > 0L) {
    stop(sprintf("Unknown columns in populate data: %s",
                 paste(extra, collspe = ", ")))
  }
  data
}

check_listen <- function(x, name = "listen") {
  if (is.numeric(x)) {
    assert_nonmissing(x)
    assert_integer_like(x)
    x <- sprintf("127.0.0.1:%d", x)
  } else if (is.character(x)) {
    i <- grepl("^[0-9]+$", x)
    x[i] <- sprintf("127.0.0.1:%s", x)
    err <- !grepl(".+:[0-9]+$", x)
    if (any(err)) {
      stop(sprintf("Invalid string for %s: %s",
                   name, paste(x[err], collapse = ", ")))
    }
  }
  x
}
richfitz/toxiproxyr documentation built on May 25, 2017, 3:10 a.m.