Topology: Function to create a topology


View source: R/topology.R

Description

By passing a spout (a data.frame) to this function and storing the returned object, you can start building a topology for an RStorm stream. See RStorm for more detailed examples of the use of Topology. The Topology is the most important concept when defining an RStorm stream.

Usage

Topology(spout, name = NULL, .verbose = TRUE)

Arguments

spout

a data.frame containing multiple rows of data which are to be iterated through in the stream.

name

an optional name of this topology.

.verbose

an optional boolean indicating whether verbose output should be printed. Default is TRUE.

Value

An object of class Topology which is a list containing the following elements:

spout

the data.frame passed as a spout

bolts

a list of bolts, see Bolt

finalize

the finalize function to be used for the stream

Additional Info

The is.Topology function checks whether an object is of class Topology and is used internally.
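As a minimal sketch of the Value and Additional Info sections, the snippet below creates a topology from a small spout and inspects the returned object. The toy data and the name "demo" are made up for illustration, and the sketch assumes that the RStorm package is installed and that is.Topology can be called directly by the user.

```r
library(RStorm)

# Hypothetical toy spout: three rows, one column (illustration only)
spout <- data.frame(x = c(1.5, 2.5, 3.5))

# Create the topology; .verbose = FALSE suppresses the verbose output
topology <- Topology(spout, name = "demo", .verbose = FALSE)

# The returned object is a list of class Topology
class(topology)

# The spout element holds the data.frame that was passed in
topology$spout

# No bolts have been added yet, so the list of bolts is empty
length(topology$bolts)

# is.Topology distinguishes a Topology from other objects
# (assumed to be callable here; the help page notes it is used internally)
is.Topology(topology)
is.Topology(spout)
```

Bolts are then added with AddBolt, as in the Examples section.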

Note

For examples see www.mauritskaptein.com/software/RStorm

Author(s)

Maurits Kaptein

See Also

Bolt, Tuple, RStorm

Examples

	##############################
	# Example of a stream to compare two methods of streaming variance computation:
	##############################
	library(RStorm)

	# Generate some data 
	set.seed(10)
	t <- 100
	x <- rnorm(t,0,1)
	# Look at the variance as computed by var():
	var(x)

	# Start a topology
	topology <- Topology(data.frame(x=x))

	# Bolt for "Sum of Squares Method" with tracking over time
	var.SS <- function(x, ...){
		params <- GetHash("params1")
		if(!is.data.frame(params)){
			params <- list()
			params$n <- params$sum <- params$sum2 <- 0
		}
		n <- params$n + 1
		sum <- params$sum + as.numeric(x[1])
		sum2 <- params$sum2 + as.numeric(x[1]^2)
		if(n>1){
			var <- 1/(n*(n-1)) * (n*sum2 - sum^2)
		} else {
			var <- 0
		}
		SetHash("params1", data.frame(n=n, sum=sum, sum2=sum2, var=var))
		TrackRow("var.SS", data.frame(var=var))
	}


	## Bolt for "Welford's" Method:

	var.Welford <- function(x, ...){
		x <- as.numeric(x[1])
		params <- GetHash("params2")
		if(!is.data.frame(params)){
			params <- list()
			params$M <- x
			params$S <- params$n <- 0
		}
		n <- params$n + 1
		M <- params$M + ( x - params$M) / n
		S <- params$S + (x - params$M)*(x-M)

		if(n>1){
			var <- S / (n-1)
		} else {
			var <- 0
		}
		SetHash("params2", data.frame(n=n, M=M, S=S, var=var))
		TrackRow("var.Welford", data.frame(var=var))
	}

	# Add both bolts to the topology and run the stream:
	topology <- AddBolt(topology, Bolt(var.SS))
	topology <- AddBolt(topology, Bolt(var.Welford))
	result <- RStorm(topology)

	# Plot the results over the stream
	plot(c(1:t), GetTrack("var.Welford", result)$var, type="l")
	lines(c(1:t), GetTrack("var.SS", result)$var, col="red")


	# Now the same variance calculation, 
	#    but with a dataset in which the mean is 
	#    very large compared to the variance:
	x2 <- rnorm(t,10^8,1)
	topology2 <- Topology(data.frame(x=x2))
	topology2 <- AddBolt(topology2, Bolt(var.SS))
	topology2 <- AddBolt(topology2, Bolt(var.Welford))
	result2 <- RStorm(topology2)

	# This time the standard SS method breaks down (mind the different y scale),
	# and mind the fact that the SS method gives NEGATIVE variance:
	plot(c(1:t), GetTrack("var.Welford", result2)$var, type="l", ylim=c(-10, 11))
	lines(c(1:t), GetTrack("var.SS", result2)$var, col="red")
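The numerical effect shown in the second plot does not depend on RStorm itself. As a rough sketch in base R, the two update rules from the bolts above can be run in an ordinary loop and compared with var(). The helper names stream_var_ss and stream_var_welford are hypothetical and are not part of the package.

```r
# Sum-of-squares method: same formula as the var.SS bolt,
# accumulating n, sum(x) and sum(x^2) one observation at a time
stream_var_ss <- function(x) {
  n <- 0; s <- 0; s2 <- 0
  for (xi in x) {
    n  <- n + 1
    s  <- s + xi
    s2 <- s2 + xi^2
  }
  (n * s2 - s^2) / (n * (n - 1))
}

# Welford's method: same update as the var.Welford bolt,
# tracking the running mean M and the sum of squared deviations S
stream_var_welford <- function(x) {
  n <- 0; M <- 0; S <- 0
  for (xi in x) {
    n     <- n + 1
    M_new <- M + (xi - M) / n
    S     <- S + (xi - M) * (xi - M_new)
    M     <- M_new
  }
  S / (n - 1)
}

set.seed(10)
x_small <- rnorm(100, 0, 1)      # mean close to zero
x_large <- rnorm(100, 10^8, 1)   # mean very large compared to the variance

# With a small mean, both methods should agree closely with var()
c(var = var(x_small),
  ss = stream_var_ss(x_small),
  welford = stream_var_welford(x_small))

# With a large mean, n*sum2 and sum^2 are huge and nearly equal, so their
# difference loses precision; Welford's method avoids that subtraction
c(var = var(x_large),
  ss = stream_var_ss(x_large),
  welford = stream_var_welford(x_large))
```

The sum-of-squares result for x_large depends on floating-point rounding and may differ between platforms, which is why no expected values are shown.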

RStorm documentation built on May 2, 2019, 9:14 a.m.
