Description Usage Arguments Details Value Author(s) Examples
This function repeatedly reads chunks of data from an input connection, packages the data as a data.frame, optionally ensures that all the rows for certain keys are contained in the data.frame, and passes the data.frame to a handler for processing. This continues until the end of file.
1 2 3 4 |
file |
Any file specification accepted by scan |
cols |
A list of column names, as accepted by the 'what' arg to scan |
chunkSize |
Number of lines to read at a time |
FUN |
A function accepting a dataframe with columns given by cols |
ignoreKey |
If TRUE, always passes chunkSize rows to FUN, regardless of whether the chunk has only some of the rows for a given key. If TRUE, the singleKey arg is ignored. |
singleKey |
If TRUE, then each data frame passed to FUN will contain all rows corresponding to a single key. If FALSE, then will contain several complete keys. |
skip |
Number of lines to skip at the beginning of the file. |
sep |
Any separator character accepted by scan |
keyCol |
The column name of the column with the keys. |
PFUN |
Same as FUN, except handles incomplete keys. See below. |
carryMemLimit |
Max memory used for values of a single key |
carryMaxRows |
Max number of values allowed for a key. |
stringsAsFactors |
Whether strings converted to factors. |
debug |
Whether to print debug messages. |
With ignoreKey=TRUE, hsTableReader reads from file, chunkSize lines at a time, packages the lines into a data.frame, and passes the data.frame to FUN. This mode would most commonly be used for the MAPPER job in Hadoop.
Everything below pertains to ignoreKey=FALSE.
With ignoreKey=FALSE, hsTableReader breaks up the data read from file into chunks comprising all rows of the same key. This ASSUMES that all rows with the same key are stored consecutively in the input file. (This is always the case if the input file is taken to be the STDIN pipe to a Hadoop reducer.)
We first disucss the case of PFUN=NULL. This is the recommended setting when all the values for a given key can comfortably fit in memory.
When singleKey=TRUE, FUN is called with all the rows for a single key at a time. When singleKey=FALSE, FUN made be called with rows corresponding to multiple keys, but we guarantee that the data.frame contains all the rows corresponding to any key that appears in the data.frame.
When PFUN != NULL, hsTableReader does not wait to collect all rows for a given key before passing the data to FUN. When hsTableReader reads the first chunk of rows from file, it first passes all the rows of complete keys to FUN. Then, it passes the rows corresponding to the last key, call it PREVKEY, to PFUN. These rows may or may not consist of all rows corresponding to PREVKEY. Then, hsTableReader continues to read chunks from file and pass them to PFUN until it reaches a a new key (i.e. different from PREVKEY). At this point, PFUN is called with an empty data.frame to indicate that it is done handling (key,value) pairs for PREVKEY. Then, as with the first chunk, any complete keys left in the chunk are passed to FUN and the incomplete key is passed to PFUN. The process continues until the end of file.
By using a PFUN function, we can process more values for a given key than can fit into memory. See below for examples of using PFUN.
No return value.
David S. Rosenberg <drosen@sensenetworks.com>
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 | ## This function is useful as a reader for Hadoop reduce scripts
str <- "key1\t3.9\nkey1\t8.9\nkey1\t1.2\nkey1\t3.9\nkey1\t8.9\nkey1\t1.2\nkey2\t9.9\nkey2\t10.1\nkey3\t1.0\nkey3\t3.4\nkey4\t9.4\n"
cat(str)
cols = list(key='',val=0)
con <- textConnection(str, open = "r")
hsTableReader(con,cols,chunkSize=6,FUN=print,ignoreKey=TRUE)
close(con)
con <- textConnection(str, open = "r")
hsTableReader(con,cols,chunkSize=6,FUN=print,ignoreKey=FALSE,singleKey=TRUE)
close(con)
con <- textConnection(str, open = "r")
hsTableReader(con,cols,chunkSize=6,FUN=print,ignoreKey=FALSE,singleKey=FALSE)
close(con)
## The next two examples compute the mean, by key, in 2 different ways
reducerKeyAtOnce <-function(d) {
key = d[1,'key']
ave = mean(d[,'val'])
cat(key,ave,'\n',sep='\t')
}
con <- textConnection(str, open = "r")
hsTableReader(con,cols,chunkSize=6,FUN=reducerKeyAtOnce,ignoreKey=FALSE,singleKey=TRUE)
close(con)
reducerManyCompleteKeys <-function(d) {
a=aggregate(d$val,by=list(d$key),FUN=mean)
write.table(a,quote=FALSE,sep='\t',row.names=FALSE,col.names=FALSE)
}
con <- textConnection(str, open = "r")
hsTableReader(con,cols,chunkSize=6,FUN=reducerManyCompleteKeys,ignoreKey=FALSE,singleKey=FALSE)
close(con)
### ADVANCED: When we have more values for each key than can fit in memory
## Test example to see how the input is broken up
reducerFullKeys <- function(d) {
print("Processing complete keys.")
print(d)
}
reducerPartialKey <- function(d) {
if (nrow(d)==0) {
print("Done with partial key")
} else {
print("Processing partial key...")
print(d)
}
}
con <- textConnection(str, open = "r")
hsTableReader(con,cols,chunkSize=5,FUN=reducerFullKeys,ignoreKey=FALSE,singleKey=FALSE,PFUN=reducerPartialKey)
close(con)
## Repeats the mean example, with partial key processing
partialSum = 0
partialCnt = 0
partialKey = NA
reducerPartialKey <- function(d) {
if (nrow(d)==0) {
## empty data.frame indicates that we have seen all rows for the previous key
ave = partialSum / partialCnt
cat(partialKey,ave,'\n',sep='\t')
partialSum <<- 0
partialCnt <<- 0
partialKey <<- NA
} else {
if (is.na(partialKey)) partialKey <<- d[1,'key']
partialSum <<- partialSum + sum(d[,'val'])
partialCnt <<- partialCnt + nrow(d)
}
}
con <- textConnection(str, open = "r")
hsTableReader(con,cols,chunkSize=6,FUN=reducerKeyAtOnce,ignoreKey=FALSE,singleKey=TRUE,PFUN=reducerPartialKey)
close(con)
con <- textConnection(str, open = "r")
hsTableReader(con,cols,chunkSize=6,FUN=reducerManyCompleteKeys,ignoreKey=FALSE,singleKey=FALSE,PFUN=reducerPartialKey)
close(con)
|
Loading required package: getopt
key1 3.9
key1 8.9
key1 1.2
key1 3.9
key1 8.9
key1 1.2
key2 9.9
key2 10.1
key3 1.0
key3 3.4
key4 9.4
key val
1 key1 3.9
2 key1 8.9
3 key1 1.2
4 key1 3.9
5 key1 8.9
6 key1 1.2
key val
1 key2 9.9
2 key2 10.1
3 key3 1.0
4 key3 3.4
5 key4 9.4
key val
1 key1 3.9
2 key1 8.9
3 key1 1.2
4 key1 3.9
5 key1 8.9
6 key1 1.2
key val
7 key2 9.9
8 key2 10.1
key val
9 key3 1.0
10 key3 3.4
key val
11 key4 9.4
key val
1 key1 3.9
key val
1 key1 3.9
2 key1 8.9
3 key1 1.2
4 key1 3.9
5 key1 8.9
6 key1 1.2
7 key2 9.9
8 key2 10.1
9 key3 1.0
10 key3 3.4
key val
11 key4 9.4
key1 4.666667
key2 10
key3 2.2
key4 9.4
key1 3.9
key1 4.66666666666667
key2 10
key3 2.2
key4 9.4
[1] "Processing partial key..."
key val
1 key1 3.9
2 key1 8.9
3 key1 1.2
4 key1 3.9
5 key1 8.9
[1] "Processing partial key..."
key val
1 key1 1.2
[1] "Done with partial key"
[1] "Processing complete keys."
key val
2 key2 9.9
3 key2 10.1
[1] "Processing partial key..."
key val
4 key3 1.0
5 key3 3.4
[1] "Done with partial key"
[1] "Processing partial key..."
key val
1 key4 9.4
[1] "Done with partial key"
key1 4.666667
key2 10
key3 2.2
key4 9.4
key1 4.666667
key2 10
key3 2.2
key4 9.4
Add the following code to your website.
For more information on customizing the embed code, read Embedding Snippets.