hsTableReader: Chunks input data into data frames


Description

This function repeatedly reads chunks of data from an input connection, packages the data as a data.frame, optionally ensures that all the rows for certain keys are contained in the data.frame, and passes the data.frame to a handler for processing. This continues until the end of file.

Usage

hsTableReader(file = "", cols = 'character', chunkSize = -1, FUN = print,
              ignoreKey = TRUE, singleKey = TRUE, skip = 0, sep = '\t',
              keyCol = 'key', PFUN = NULL, carryMemLimit = 512e6,
              carryMaxRows = Inf, stringsAsFactors = FALSE, debug = FALSE)

Arguments

file

Any file specification accepted by scan

cols

A list of column names, as accepted by the 'what' arg to scan

chunkSize

Number of lines to read at a time

FUN

A function accepting a data.frame with columns given by cols.

ignoreKey

If TRUE, always passes chunkSize rows to FUN, regardless of whether the chunk has only some of the rows for a given key. If TRUE, the singleKey arg is ignored.

singleKey

If TRUE, each data.frame passed to FUN will contain all rows corresponding to a single key. If FALSE, each data.frame may contain rows for several keys, but always every row for each of those keys.

skip

Number of lines to skip at the beginning of the file.

sep

Any separator character accepted by scan

keyCol

The column name of the column with the keys.

PFUN

Same as FUN, except handles incomplete keys. See below.

carryMemLimit

Max memory used for values of a single key

carryMaxRows

Max number of values allowed for a key.

stringsAsFactors

Whether strings should be converted to factors.

debug

Whether to print debug messages.

Details

With ignoreKey=TRUE, hsTableReader reads from file, chunkSize lines at a time, packages the lines into a data.frame, and passes the data.frame to FUN. This mode would most commonly be used for the MAPPER job in Hadoop.
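The chunked-read loop described above can be sketched in base R. This is an illustration of the mechanism only, not the package's implementation; the helper name readChunks is made up for this sketch:

```r
# Sketch of ignoreKey=TRUE behavior: read chunkSize lines at a time with
# scan(), package them as a data.frame, and hand each chunk to FUN.
readChunks <- function(con, cols, chunkSize, FUN) {
  repeat {
    raw <- scan(con, what = cols, nlines = chunkSize, sep = "\t", quiet = TRUE)
    if (length(raw[[1]]) == 0) break   # nothing read: end of input
    FUN(as.data.frame(raw, stringsAsFactors = FALSE))
  }
}

con <- textConnection("a\t1\nb\t2\nc\t3\n")
readChunks(con, cols = list(key = "", val = 0), chunkSize = 2, FUN = print)
close(con)
```

With three input lines and chunkSize=2, FUN is called twice: once with two rows, once with the remaining row.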

Everything below pertains to ignoreKey=FALSE.

With ignoreKey=FALSE, hsTableReader breaks up the data read from file into chunks comprising all rows of the same key. This ASSUMES that all rows with the same key are stored consecutively in the input file. (This is always the case if the input file is taken to be the STDIN pipe to a Hadoop reducer.)

We first discuss the case of PFUN=NULL. This is the recommended setting when all the values for a given key fit comfortably in memory.

When singleKey=TRUE, FUN is called with all the rows for a single key at a time. When singleKey=FALSE, FUN may be called with rows corresponding to multiple keys, but the data.frame is guaranteed to contain all the rows corresponding to any key that appears in it.

When PFUN != NULL, hsTableReader does not wait to collect all rows for a given key before passing data along. When hsTableReader reads the first chunk of rows from file, it first passes all the rows of complete keys to FUN. Then it passes the rows corresponding to the last key, call it PREVKEY, to PFUN. These rows may or may not comprise all rows corresponding to PREVKEY. hsTableReader then continues to read chunks from file and pass them to PFUN until it reaches a new key (i.e. one different from PREVKEY). At this point, PFUN is called with an empty data.frame to indicate that it is done handling (key,value) pairs for PREVKEY. Then, as with the first chunk, any complete keys left in the chunk are passed to FUN and the incomplete key is passed to PFUN. The process continues until the end of file.
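The complete-versus-incomplete split that drives this process can be illustrated in base R. The helper splitCompleteKeys below is hypothetical, not part of the package: given a chunk whose rows arrive grouped by key, every row except those sharing the chunk's last key is known to be complete.

```r
# Hypothetical illustration of the split described above: rows for the
# chunk's last key may continue in the next chunk, so they would go to
# PFUN; all earlier keys are complete and would go to FUN.
splitCompleteKeys <- function(d, keyCol = "key") {
  lastKey <- d[nrow(d), keyCol]
  list(complete = d[d[[keyCol]] != lastKey, , drop = FALSE],
       partial  = d[d[[keyCol]] == lastKey, , drop = FALSE])
}

chunk <- data.frame(key = c("k1", "k1", "k2"), val = c(1, 2, 3))
s <- splitCompleteKeys(chunk)
# s$complete holds both k1 rows; s$partial holds the single k2 row
```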

By using a PFUN function, we can process more values for a given key than can fit into memory. See below for examples of using PFUN.

Value

No return value.

Author(s)

David S. Rosenberg <drosen@sensenetworks.com>

Examples

  ## This function is useful as a reader for Hadoop reduce scripts
  str <- "key1\t3.9\nkey1\t8.9\nkey1\t1.2\nkey1\t3.9\nkey1\t8.9\nkey1\t1.2\nkey2\t9.9\nkey2\t10.1\nkey3\t1.0\nkey3\t3.4\nkey4\t9.4\n"
  cat(str)
  cols = list(key='',val=0)
  con <- textConnection(str, open = "r")
  hsTableReader(con,cols,chunkSize=6,FUN=print,ignoreKey=TRUE)
  close(con)
  con <- textConnection(str, open = "r")
  hsTableReader(con,cols,chunkSize=6,FUN=print,ignoreKey=FALSE,singleKey=TRUE)
  close(con)
  con <- textConnection(str, open = "r")
  hsTableReader(con,cols,chunkSize=6,FUN=print,ignoreKey=FALSE,singleKey=FALSE)
  close(con)

  ## The next two examples compute the mean, by key, in 2 different ways
  reducerKeyAtOnce <-function(d) {
    key = d[1,'key']
    ave = mean(d[,'val'])
    cat(key,ave,'\n',sep='\t')
  }
  con <- textConnection(str, open = "r")
  hsTableReader(con,cols,chunkSize=6,FUN=reducerKeyAtOnce,ignoreKey=FALSE,singleKey=TRUE)
  close(con)

  
  reducerManyCompleteKeys <-function(d) {
    a=aggregate(d$val,by=list(d$key),FUN=mean)
    write.table(a,quote=FALSE,sep='\t',row.names=FALSE,col.names=FALSE)
  }

  con <- textConnection(str, open = "r")
  hsTableReader(con,cols,chunkSize=6,FUN=reducerManyCompleteKeys,ignoreKey=FALSE,singleKey=FALSE)
  close(con)

### ADVANCED: When we have more values for each key than can fit in memory
  ## Test example to see how the input is broken up
  reducerFullKeys <- function(d) {
    print("Processing complete keys.")
    print(d)
  }
  reducerPartialKey <- function(d) {
    if (nrow(d)==0) {
      print("Done with partial key")
    } else {
      print("Processing partial key...")
      print(d)
    }
  }
  con <- textConnection(str, open = "r")
  hsTableReader(con,cols,chunkSize=5,FUN=reducerFullKeys,ignoreKey=FALSE,singleKey=FALSE,PFUN=reducerPartialKey)
  close(con)

  ## Repeats the mean example, with partial key processing
  partialSum = 0
  partialCnt = 0
  partialKey = NA
  reducerPartialKey <- function(d) {
    if (nrow(d)==0) {
      ## empty data.frame indicates that we have seen all rows for the previous key
      ave = partialSum / partialCnt
      cat(partialKey,ave,'\n',sep='\t')
      partialSum <<- 0
      partialCnt <<- 0
      partialKey <<- NA
    } else {
      if (is.na(partialKey)) partialKey <<- d[1,'key']
      partialSum <<- partialSum + sum(d[,'val'])
      partialCnt <<- partialCnt + nrow(d)
    }
  }
  con <- textConnection(str, open = "r")
  hsTableReader(con,cols,chunkSize=6,FUN=reducerKeyAtOnce,ignoreKey=FALSE,singleKey=TRUE,PFUN=reducerPartialKey)
  close(con)
  con <- textConnection(str, open = "r")
  hsTableReader(con,cols,chunkSize=6,FUN=reducerManyCompleteKeys,ignoreKey=FALSE,singleKey=FALSE,PFUN=reducerPartialKey)
  close(con)

Example output

Loading required package: getopt
key1	3.9
key1	8.9
key1	1.2
key1	3.9
key1	8.9
key1	1.2
key2	9.9
key2	10.1
key3	1.0
key3	3.4
key4	9.4
   key val
1 key1 3.9
2 key1 8.9
3 key1 1.2
4 key1 3.9
5 key1 8.9
6 key1 1.2
   key  val
1 key2  9.9
2 key2 10.1
3 key3  1.0
4 key3  3.4
5 key4  9.4
   key val
1 key1 3.9
2 key1 8.9
3 key1 1.2
4 key1 3.9
5 key1 8.9
6 key1 1.2
   key  val
7 key2  9.9
8 key2 10.1
    key val
9  key3 1.0
10 key3 3.4
    key val
11 key4 9.4
   key val
1 key1 3.9
    key  val
1  key1  3.9
2  key1  8.9
3  key1  1.2
4  key1  3.9
5  key1  8.9
6  key1  1.2
7  key2  9.9
8  key2 10.1
9  key3  1.0
10 key3  3.4
    key val
11 key4 9.4
key1	4.666667	
key2	10	
key3	2.2	
key4	9.4	
key1	3.9
key1	4.66666666666667
key2	10
key3	2.2
key4	9.4
[1] "Processing partial key..."
   key val
1 key1 3.9
2 key1 8.9
3 key1 1.2
4 key1 3.9
5 key1 8.9
[1] "Processing partial key..."
   key val
1 key1 1.2
[1] "Done with partial key"
[1] "Processing complete keys."
   key  val
2 key2  9.9
3 key2 10.1
[1] "Processing partial key..."
   key val
4 key3 1.0
5 key3 3.4
[1] "Done with partial key"
[1] "Processing partial key..."
   key val
1 key4 9.4
[1] "Done with partial key"
key1	4.666667	
key2	10	
key3	2.2	
key4	9.4	
key1	4.666667	
key2	10
key3	2.2
key4	9.4	

HadoopStreaming documentation built on May 1, 2019, 9:12 p.m.