tests/testthat/test-client.R

echo_server <- function(port = httpuv::randomPort()) {
  httpuv::startServer("127.0.0.1", port,
    list(
      onWSOpen = function(ws) {
        ws$onMessage(function(binary, message) {
          ws$send(message)
        })
      }
    )
  )
}
shut_down_server <- function(s) {
  # Run the event loop a few more times to make sure httpuv handles the closed
  # websocket properly.
  for (i in 1:5) later::run_now(0.02)
  s$stop()
}
server_url <- function(server) {
  paste0("ws://", server$getHost(), ":", server$getPort(), "/")
}


test_that("Connection can't be defined with invalid maxMessageSize", {
  s <- echo_server()
  on.exit(shut_down_server(s))
  url <- server_url(s)

  expect_error(WebSocket$new(url, maxMessageSize=-1), "maxMessageSize must be a non-negative integer")
  expect_error(WebSocket$new(url, maxMessageSize=1:2), "maxMessageSize must be a non-negative integer")
})


check_later <- function(
  # debugging name
  name,
  # checks for validity
  is_valid_fn,
  # ran on valid situation
  on_success_fn,
  # ~ 30 seconds total
  counter = round(30 / sleepTime),
  # check every 0.01 seconds
  sleepTime = 0.01
) {

  found <- FALSE
  check_fn <- function() {
    counter <<- counter - 1
    Sys.sleep(sleepTime)
    if (is_valid_fn()) {
      found <<- TRUE
    }
  }

  while(counter > 0 && (!found)) {
    later::later(check_fn)
    later::run_now(5)
  }

  if (found) {
    on_success_fn()
    return(1)
  } else {
    fail(paste0("Ran out of attempts at checking: ", name))
    return(0)
  }
}

test_that("small maxMessageSizes break simple connections", {
  s <- echo_server()
  on.exit(shut_down_server(s))
  url <- server_url(s)

  state <- NULL
  didFail <- FALSE

  ws <- WebSocket$new(url, maxMessageSize=2)
  ws$onMessage(function(event) {

  })
  ws$onOpen(function(event) {
    state <<- "open"
  })
  ws$onClose(function(event) {
    state <<- "closed"
  })
  ws$onError(function(event) {
    state <<- "failed"
    didFail <<- TRUE
  })

  check_later("open",
              function() !is.null(state),
              function() identical(state, "open")
  )

  # Make sure the internal state gets set, and the onOpen function gets called.
  expect_equivalent(ws$readyState(), 1L)
  expect_identical(state, "open")

  ws$send("hello")
  check_later("open",
              function() !identical(state, "open"),
              function() didFail
  )
  ws$close()
})

check_ws <- function(wsUrl) {
  state <- NULL
  last <- NULL
  found <- 0

  ws <- WebSocket$new(wsUrl)
  ws$onMessage(function(event) {
    expect_identical(ws, event[["target"]])
    expect_false(is.null(event[["data"]]))
    last  <<- event$data
  })
  ws$onOpen(function(event) {
    expect_identical(ws, event[["target"]])
    state <<- "open"
  })
  ws$onClose(function(event) {
    expect_identical(ws, event[["target"]])
    expect_false(is.null(event[["code"]]))
    expect_false(is.null(event[["reason"]]))
    state <<- "closed"
  })
  ws$onError(function(event) {
    expect_identical(ws, event[["target"]])
    expect_true(is.character(event[["message"]]))
    state <<- "failed"
  })

  check_later("open",
    function() !is.null(state),
    function() identical(state, "open")
  )

  # Make sure the internal state gets set, and the onOpen function gets called.
  expect_equivalent(ws$readyState(), 1L)
  expect_identical(state, "open")

  last <- NULL
  ws$send("hello")
  found <- found + check_later("hello",
    function() !is.null(last),
    function() expect_identical(last, "hello")
  )


  last <- NULL
  ws$send(charToRaw("hello"))
  found <- found + check_later("hello_raw",
    function() !is.null(last),
    function() expect_identical(last, charToRaw("hello"))
  )

  state <- NULL
  ws$close()
  found <- found + check_later("closing",
    function() !is.null(state),
    function() {
      expect_equivalent(ws$readyState(), 3L)
      expect_identical(state, "closed")
    }
  )

  expect_identical(found, 3)
}


test_that("Basic websocket communication", {
  s <- echo_server()
  on.exit(shut_down_server(s))
  url <- server_url(s)

  check_ws(url)
})

test_that("WebSocket object can be garbage collected", {
  skip_if(
    R.version$major == "3" && grepl("^4\\.", R.version$minor),
    "GC behavior for this test is strange on R 3.4 (but works on older and newer versions of R)"
  )

  s <- echo_server()
  on.exit(shut_down_server(s))
  url <- server_url(s)

  # Test closed WebSocket is GC'd
  collected <- FALSE
  local({
    ws <- WebSocket$new(url)
    ws$onOpen(function(event) {
      ws$close()
    })
    reg.finalizer(ws, function(obj) {
      collected <<- TRUE
    })
    # Pump events until connection is closed, or up to 10 seconds.
    end_time <- as.numeric(Sys.time()) + 10
    while (ws$readyState() != 3L && as.numeric(Sys.time()) < end_time) {
      later::run_now(0.1)
    }
  })
  gc()
  expect_true(collected)

  # Test WebSocket with failed connection is GC'd
  collected <- FALSE
  local({
    ws <- WebSocket$new("ws://example.com")
    reg.finalizer(ws, function(obj) {
      collected <<- TRUE
    })
    # Pump events until connection is closed, or up to 10 seconds.
    end_time <- as.numeric(Sys.time()) + 10
    while (ws$readyState() != 3L && as.numeric(Sys.time()) < end_time) {
      later::run_now(0.1)
    }
  })
  gc()
  expect_true(collected)
})

test_that("Open is async", {
  s <- echo_server()
  on.exit(shut_down_server(s))
  url <- server_url(s)

  onOpenCalled <- FALSE
  ws <- WebSocket$new(url)
  ws$onOpen(function(event) {
    onOpenCalled <<- TRUE
    ws$close()
  })
  Sys.sleep(1)
  # Even though the I/O happens on a separate thread, the callback which invokes
  # onOpen and sets readyState runs on the main thread using later(), so without
  # a run_now(), there would be no opportunity for these values to be changed.
  expect_equivalent(ws$readyState(), 0L)
  expect_false(onOpenCalled)

  # Run events until closed state, or timeout.
  end_time <- as.numeric(Sys.time()) + 10
  while (ws$readyState() != 3L && as.numeric(Sys.time()) < end_time) {
    later::run_now(0.1)
  }
  expect_equivalent(ws$readyState(), 3L)
  expect_true(onOpenCalled)
})

test_that("Connection errors are reported", {
  error_reported <- FALSE
  ws <- WebSocket$new("ws://example.com")
  ws$onError(function(event) {
    expect_identical(ws, event[["target"]])
    expect_true(is.character(event[["message"]]))
    expect_true(nzchar(event[["message"]]))
    error_reported <<- TRUE
  })

  end_time <- as.numeric(Sys.time()) + 10
  while (ws$readyState() != 3L && as.numeric(Sys.time()) < end_time) {
    later::run_now(0.1)
  }
  expect_true(error_reported)
})

test_that("Connect can be delayed", {
  s <- echo_server()
  on.exit(shut_down_server(s))
  url <- server_url(s)

  # With autoConnect = TRUE (the default), you can miss the onOpen event
  connected <- FALSE
  ws <- WebSocket$new(url)
  end_time <- as.numeric(Sys.time()) + 10
  while (ws$readyState() == 0L && as.numeric(Sys.time()) < end_time) {
    later::run_now(0.1)
  }
  ws$onOpen(function(event) {
    connected <<- TRUE
  })
  for (i in 1:20)
    later::run_now(0.1)
  expect_false(connected)
  ws$close()

  # With autoConnect = FALSE, the open event is guaranteed not to fire
  # until after connect() is called
  connected <- FALSE
  ws <- WebSocket$new(url, autoConnect = FALSE)
  for (i in 1:10) {
    later::run_now(0.1)
  }
  ws$connect()
  # It's OK even if onOpen is registered immediately after connect() (in the
  # same tick though), the same guarantee (that connect is asynchronous) as
  # autoConnect = TRUE applies. Note the connection is made on a separate
  # thread, so the websocket could have been open before getting to the next
  # line; however, the callback for onOpen is scheduled with later() on the main
  # thread, so it can't run until a run_now().
  ws$onOpen(function(event) {
    connected <<- TRUE
  })
  expect_false(connected)
  end_time <- as.numeric(Sys.time()) + 10
  while (!connected && as.numeric(Sys.time()) < end_time) {
    later::run_now(0.1)
  }
  expect_true(connected)
  ws$close()
})

test_that("WebSocket can be closed before fully open", {
  s <- echo_server()
  on.exit(shut_down_server(s))
  url <- server_url(s)

  onCloseCalled <- FALSE
  ws <- WebSocket$new(url)
  ws$onClose(function(event) {
    onCloseCalled <<- TRUE
  })
  ws$close()
  for (i in 1:20) {
    if (onCloseCalled)
      break
    later::run_now(0.1)
  }
  expect_equivalent(ws$readyState(), 3L)
  expect_true(onCloseCalled)

  # If no connection attempt is made, then we'll stay in the pre-connectiong
  # state, and the onClose callback won't be invoked.
  onCloseCalled <- FALSE
  ws <- WebSocket$new(url, autoConnect = FALSE)
  ws$onClose(function(event) {
    onCloseCalled <<- TRUE
  })
  ws$close()
  expect_equivalent(ws$readyState(), -1L)
  expect_false(onCloseCalled)
})

test_that("WebSocket event handlers can be registered more than once", {
  s <- echo_server()
  on.exit(shut_down_server(s))
  url <- server_url(s)

  a_called <- FALSE
  b_called <- FALSE
  c_called <- FALSE
  ws <- WebSocket$new(url, autoConnect = FALSE)
  ws$onOpen(function(event) {
    a_called <<- TRUE
  })
  ws$onOpen(function(event) {
    b_called <<- TRUE
  })
  handle_c <- ws$onOpen(function(event) {
    c_called <<- TRUE
  })
  handle_c() # Unregister
  ws$onOpen(function(event) {
    ws$close()
  })
  ws$connect()
  while (ws$readyState() != 3L)
    later::run_now(1)
  expect_true(a_called)
  expect_true(b_called)
  expect_false(c_called)
})

test_that("WebSocket event handlers can run in private loop", {
  s <- echo_server()
  on.exit(shut_down_server(s))
  url <- server_url(s)

  onOpenCalled <- FALSE
  loop <- later::create_loop(parent = NULL)
  ws <- WebSocket$new(url, loop = loop)
  ws$onOpen(function(event) {
    onOpenCalled <<- TRUE
  })

  # Running main loop shouldn't cause onOpen callback to run.
  for (i in 1:20) {
    later::run_now()
  }
  expect_false(onOpenCalled)

  # Runing the private loop (for the websocket) should cause the onOpen callback
  # to run. We also need to interleave running the main loop so that the httpuv
  # server can handle the connection.
  end_time <- as.numeric(Sys.time()) + 10
  while (!onOpenCalled && as.numeric(Sys.time()) < end_time) {
    later::run_now(0.1, loop = loop)
    later::run_now(0.1)
  }
  expect_true(onOpenCalled)

  ws$close()
})

test_that("WebSocket persists after reference is gone, and can be GC'd after connection is closed", {
  # Start a websocket server app where we can send commands to ws_server.
  ws_server <- NULL
  s <- httpuv::startServer("127.0.0.1", httpuv::randomPort(),
    list(
      onWSOpen = function(ws) {
        ws_server <<- ws
      }
    )
  )
  on.exit(shut_down_server(s))
  url <- server_url(s)

  finalized <- FALSE
  ws <- WebSocket$new(url)
  reg.finalizer(ws, function(e) finalized <<- TRUE)

  end_time <- as.numeric(Sys.time()) + 10
  while (ws$readyState() == 0L && as.numeric(Sys.time()) < end_time) {
    later::run_now(0.1)
  }
  rm(ws)
  gc()
  for (i in 1:5) later::run_now(0.02)

  # Connection is still open, so WebSocket shouldn't be GC'd yet.
  expect_false(finalized)

  # If we close the connection from the other side, the WebSocket should get
  # GC'd.
  ws_server$close()
  for (i in 1:5) later::run_now(0.02)
  gc()
  expect_true(finalized)
})



# # Test is failing now that websocket.org is dead
# test_that("Basic ssl websocket communication", {
#   # Don't want network connectivity issues on CRAN to cause package to be
#   # rejected.
#   skip_on_cran()
#   check_ws("wss://echo.websocket.org/")
# })

Try the websocket package in your browser

Any scripts or data that you put into this service are public.

websocket documentation built on Aug. 19, 2021, 1:08 a.m.