Simulate a Streaming Process in [R]

Share:

Description

While stream processing makes it possible to handle extremely large and ever-growing data sets in (near) real time, developing streaming algorithms for complex models is often cumbersome: the software packages that support stream processing in production environments do not provide statisticians with the simulation, estimation, and plotting tools they are used to. Developers of streaming algorithms would thus benefit from the flexibility of [R] to generate data, compute, and plot results while developing their algorithms. RStorm implements a streaming architecture modeled on Storm for easy development and testing of streaming algorithms in [R]. RStorm is not intended as a production package, but rather as a development tool for streaming algorithms. See the examples below for some of the ways RStorm can be used to develop and compare streaming algorithms.

Details of the package, examples of streaming algorithms, and examples of the use of RStorm can be found at http://software.mauritskaptein.com/RStorm

Details

Package: RStorm
Type: Package
Version: 0.9
Date: 2013-07-26
License: GPL-2

Author(s)

Maurits Kaptein

Maintainer: Maurits Kaptein <maurits@mauritskaptein.com>

See Also

ddply, RStorm, Topology

Examples

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
##############################
# Example 1: a stream that computes a running sum
##############################

# Input data: the integers 1 through 1000.
x <- seq_len(1000)

# Build a topology whose input is a one-column data.frame holding x.
topology <- Topology(data.frame(x = x))

# define a bolt and add it to the topology
# Bolt that accumulates a running total in the hash entry "sum".
#
# x   : the incoming tuple (presumably a one-row data.frame -- confirm against
#       the RStorm docs); once a total exists, its first column is added to it.
# ... : not used by the body.
#
# While no data.frame is stored under "sum", the whole tuple is stored as the
# initial total. Returns whatever SetHash() returns.
computeSum <- function(x, ...) {
  running_total <- GetHash("sum")
  new_total <- if (is.data.frame(running_total)) running_total + x[1] else x
  SetHash("sum", new_total)
}
# Attach computeSum to the topology as a bolt.
topology <- AddBolt(topology, Bolt(computeSum))

# Run the stream; the returned object is queried below for the final hash state.
result <- RStorm(topology)

# Inspect the result: the value stored under "sum" once the stream has run.
print(GetHash("sum", result))

# Optional: uncomment to call plot() on the topology.
#plot(topology)


##############################
# Example 2: compare two methods of computing
# a variance on a stream
##############################

# Simulate t = 100 standard-normal observations, seeded for reproducibility.
# Note: the name `t` masks base::t() in the global environment; the plotting
# code further down uses it as the stream length, so the name is kept.
set.seed(10)
t <- 100
x <- rnorm(n = t, mean = 0, sd = 1)

# Reference value: the batch sample variance computed by var().
var(x)

# Start a new topology fed by the simulated data.
topology <- Topology(data.frame(x = x))

# Bolt: streaming sample variance via the naive "sum of squares" method,
# with the estimate tracked over time.
#
# State kept in the hash entry "params1" (a one-row data.frame):
#   n    - number of observations seen so far
#   sum  - running sum of the first field of each tuple
#   sum2 - running sum of the squares of that field
#   var  - current variance estimate
# After every tuple the estimate (n * sum2 - sum^2) / (n * (n - 1)) is
# appended to the "var.SS" track (0 while only one value has been seen).
# The formula subtracts two large, nearly equal quantities, so it loses
# precision when the mean is large relative to the spread.
var.SS <- function(x, ...) {
  state <- GetHash("params1")
  if (is.data.frame(state)) {
    prev_n <- state$n
    prev_total <- state$sum
    prev_total_sq <- state$sum2
  } else {
    # First tuple: start every accumulator at zero.
    prev_n <- prev_total <- prev_total_sq <- 0
  }

  obs <- as.numeric(x[1])
  n <- prev_n + 1
  total <- prev_total + obs
  total_sq <- prev_total_sq + obs^2

  estimate <- if (n > 1) {
    1 / (n * (n - 1)) * (n * total_sq - total^2)
  } else {
    0
  }

  SetHash(
    "params1",
    data.frame(n = n, sum = total, sum2 = total_sq, var = estimate)
  )
  TrackRow("var.SS", data.frame(var = estimate))
}


## Bolt for Welford's method:

# Bolt: streaming sample variance via Welford's running-mean update.
#
# State kept in the hash entry "params2" (a one-row data.frame):
#   n   - number of observations seen so far
#   M   - running mean of the first field of each tuple
#   S   - running sum of squared deviations from the mean
#   var - current variance estimate, S / (n - 1)
# Each update uses the deviation from the previous mean rather than raw sums
# of squares. The estimate is appended to the "var.Welford" track after
# every tuple (0 while only one value has been seen).
var.Welford <- function(x, ...) {
  obs <- as.numeric(x[1])
  state <- GetHash("params2")
  if (is.data.frame(state)) {
    prev_n <- state$n
    prev_mean <- state$M
    prev_ss <- state$S
  } else {
    # First tuple: seed the running mean with the observation itself.
    prev_n <- 0
    prev_mean <- obs
    prev_ss <- 0
  }

  n <- prev_n + 1
  delta <- obs - prev_mean
  new_mean <- prev_mean + delta / n
  new_ss <- prev_ss + delta * (obs - new_mean)

  estimate <- if (n > 1) new_ss / (n - 1) else 0

  SetHash(
    "params2",
    data.frame(n = n, M = new_mean, S = new_ss, var = estimate)
  )
  TrackRow("var.Welford", data.frame(var = estimate))
}

# Add both variance bolts to the same topology and run it. Each bolt records
# its running estimate in its own track ("var.SS" and "var.Welford").
topology <- AddBolt(topology, Bolt(var.SS))
topology <- AddBolt(topology, Bolt(var.Welford))
result <- RStorm(topology)

# Plot both running estimates over the stream: Welford in black, sum of
# squares in red. On this well-conditioned data (mean 0, sd 1) the two
# curves should essentially coincide.
plot(seq_len(t), GetTrack("var.Welford", result)$var, type = "l")
lines(seq_len(t), GetTrack("var.SS", result)$var, col = "red")


### Similar, but with a data set
### in which the mean is very large compared to the variance:
x2 <- rnorm(t, mean = 10^8, sd = 1)
topology2 <- Topology(data.frame(x = x2))
topology2 <- AddBolt(topology2, Bolt(var.SS))
topology2 <- AddBolt(topology2, Bolt(var.Welford))
result2 <- RStorm(topology2)

# This time the sum-of-squares method breaks down (mind the different y
# scale). With values near 1e8, n * sum2 and sum^2 are huge (about
# n^2 * 1e16) and nearly equal, so their difference is dominated by rounding
# error and can even come out NEGATIVE. Welford's method (black) stays close
# to the true variance of 1, while the SS estimate (red) does not.
plot(seq_len(t), GetTrack("var.Welford", result2)$var, type = "l",
     ylim = c(-10, 11))
lines(seq_len(t), GetTrack("var.SS", result2)$var, col = "red")