```r
knitr::opts_chunk$set(echo = TRUE)
```
Data streams are often processed in a distributed manner using multiple machines or multiple processes. For example, a data stream may be produced by a sensor attached to a remote machine, or several clustering algorithms may run in parallel in separate R processes. Another application is connecting to other software components in a stream mining pipeline.
First, we show how socket connections, together with the package stream, can be used to connect multiple processes or machines. Then we give examples of how the package streamConnect makes connecting stream mining components more convenient by providing interfaces to connect stream processing using sockets or web services. While sockets are only used to connect data stream generating processes, web services are more versatile and can also be used to provide data stream clustering as a service. The final section of this paper shows how to deploy the server/web service.
The function write_stream() and the class DSD_ReadStream provided in package stream can be used to communicate via connections (files, sockets, URLs, etc.).
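To make the mechanism concrete before we turn to sockets, here is a minimal sketch that streams a few points through a plain file connection. The temporary file and the number of points are our own choices for illustration; it assumes write_stream() accepts a file name and DSD_ReadStream() accepts a file connection, analogous to the socket connections used below.

```r
# A minimal sketch: stream a few points through a file instead of a socket.
# The temporary file is just a stand-in for any connection.
library(stream)

tf <- tempfile(fileext = ".csv")
write_stream(DSD_Gaussians(k = 3, d = 3), tf, n = 5)

# Read the points back through a file connection.
con_file <- file(tf, open = "r")
dsd_file <- DSD_ReadStream(con_file)
get_points(dsd_file, n = 5)
close_stream(dsd_file)
```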
In the first socket example below, we manually set up the connection. This example is useful for understanding how sockets work, especially for users interested in implementing their own components in other programming languages or connecting to other data stream software. A more convenient way to do this using package streamConnect is described later in this paper.
First, we find an available port.
```r
port <- httpuv::randomPort()
port
```
The server serves data from a data stream. We use the package callr to create a separate R process that serves a data stream over a socket connection, producing 10 points every second. Alternatively, you can put the code inside the function passed to r_bg() in a file called server.R and run it (potentially on a different machine) with R CMD BATCH server.R from the command line.
```r
library(stream)
library(callr)

# Start a background R process that serves a Gaussian stream
# over a server socket, writing 10 points every second.
rp1 <- r_bg(function(port) {
  library(stream)
  stream <- DSD_Gaussians(k = 3, d = 3)
  blocksize <- 10
  con <- socketConnection(port = port, server = TRUE)
  while (TRUE) {
    write_stream(stream, con, n = blocksize, close = FALSE)
    Sys.sleep(1)
  }
  close(con)
}, args = list(port = port))

rp1
```
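As mentioned above, the function body can instead live in a file called server.R and be started with R CMD BATCH server.R. A minimal sketch of such a script follows; the hard-coded port is our assumption and should be adjusted to your setup.

```r
# server.R -- standalone version of the serving code above.
# The port is hard-coded here as an example; adjust as needed.
library(stream)

port <- 8001
stream <- DSD_Gaussians(k = 3, d = 3)
blocksize <- 10

con <- socketConnection(port = port, server = TRUE)
while (TRUE) {
  write_stream(stream, con, n = blocksize, close = FALSE)
  Sys.sleep(1)
}
```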
The client consumes the data stream. We open the connection, which starts the data generating process. Note that this example does not rely on streamConnect; for convenience, we only use its helper retry() to make sure the server connection is established.
```r
con <- streamConnect::retry(socketConnection(port = port, open = 'r'))
con

dsd <- streamConnect::retry(DSD_ReadStream(con))
```
We poll all available data (n = -1) several times. The first request should yield 10 points, the second none, and, after waiting 2 seconds, the third request should yield 20 points.
```r
get_points(dsd, n = -1)
get_points(dsd, n = -1)

Sys.sleep(2)
get_points(dsd, n = -1)

close(con)
```
Here we stop the callr process. Note that the socket connection is still active and will serve the data remaining in the connection buffer as long as the reading process keeps the connection open.
```r
rp1$kill()
```
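For readers implementing a client in another language: the data on the socket is plain comma-separated text, so any language that can read lines from a socket can consume the stream. The following sketch inspects the raw format from R; it assumes a server like the one above is still running on the port.

```r
# A sketch: inspect the raw wire format. The server writes plain
# comma-separated text, one data point per line.
con_raw <- socketConnection(port = port, open = "r")
Sys.sleep(1)               # give the server time to produce points
readLines(con_raw, n = 5)  # five lines of comma-separated values
close(con_raw)
```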
streamConnect provides a more convenient way to set up a connection using sockets. publish_DSD_via_Socket() creates a socket broadcasting the data stream, and DSD_ReadSocket creates a DSD object reading from that socket.
We will use an available port.
```r
port <- httpuv::randomPort()
port
```
We create a DSD process sending data to the port.
```r
library(streamConnect)

rp1 <- DSD_Gaussians(k = 3, d = 3) %>%
  publish_DSD_via_Socket(port = port)
rp1
```
Next, we create a DSD that connects to the socket. DSD_ReadSocket() already performs retries internally.
```r
library(streamConnect)

dsd <- DSD_ReadSocket(port = port, col.names = c("x", "y", "z", ".class"))
dsd

get_points(dsd, n = 10)
plot(dsd)

close_stream(dsd)
```
Closing the stream on the client also closes the connection, which may already terminate the serving process.
```r
if (rp1$is_alive()) rp1$kill()
```
Web services are more versatile: they can be used to deploy data stream sources using publish_DSD_via_WebService()/DSD_ReadWebservice or data stream tasks using publish_DSC_via_WebService()/DSC_WebService.
Here we only show how to deploy a clusterer, but a DSD can be published in a similar manner. Larger workflows can be created using DST_Runner from stream.
streamConnect uses the package plumber to manage web services. The data is transmitted in serialized form. The default serialization format is csv (comma-separated values); other formats are json and rds (see plumber::serializer_csv).
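For example, assuming the publish functions expose the serialization format via a serializer argument that accepts the format names listed above (an assumption; verify against the package documentation), a clusterer could be published using JSON like this:

```r
# A sketch, assuming a 'serializer' argument selects the format
# ("csv", "json", or "rds") -- verify against the package documentation.
library(streamConnect)

rp_json <- publish_DSC_via_WebService("DSC_DBSTREAM(r = .05)",
  port = httpuv::randomPort(), serializer = "json")
rp_json$kill()
```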
We will use an available port.
```r
port <- httpuv::randomPort()
port
```
Create a clustering web service process listening for data on the port.
```r
library(streamConnect)

rp1 <- publish_DSC_via_WebService("DSC_DBSTREAM(r = .05)", port = port)
rp1
```
Connect to the web service with a local DSC interface.
```r
library(streamConnect)

dsc <- DSC_WebService(paste0("http://localhost:", port),
  verbose = TRUE, config = httr::verbose(info = TRUE))
dsc
```
Note that the verbose output can help with debugging connection issues.
Cluster some data.
```r
dsd <- DSD_Gaussians(k = 3, d = 2, noise = 0.05)

update(dsc, dsd, 500)
dsc

get_centers(dsc)
get_weights(dsc)

plot(dsc)
```
Kill the web service process.
```r
rp1$kill()
```
Web services and the socket-based server can easily be deployed to any server or cloud system, including containers. Make sure R, the package streamConnect, and all dependencies are installed. Then create a short R script that starts the server/service and deploy it.
```r
library(streamConnect)

port <- 8001
publish_DSC_via_WebService("DSC_DBSTREAM(r = .05)",
  port = port, background = FALSE)
```
Web services can also be deployed using a plumber task file. The following call does not create a server, but returns the name of the task file.
```r
publish_DSC_via_WebService("DSC_DBSTREAM(r = .05)", serve = FALSE)
```
Open the file in RStudio to deploy it, or read the plumber Hosting vignette.
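As a minimal sketch, the task file can also be served directly from R with plumber; the port here is an example.

```r
library(plumber)

# The call above returns the name of the generated plumber task file.
task_file <- publish_DSC_via_WebService("DSC_DBSTREAM(r = .05)", serve = FALSE)

# Serve the task file directly; this blocks the current R session.
pr <- plumb(task_file)
pr$run(port = 8001)
```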