R/data.model.R

Defines functions process.data.by.day processing.sequence model.fwy.data predict.hpms.data assign.fraction necessary.grids no.overlap rearrange_data group.loop data.predict.generator data.model

##' Model the data for coordinates
##'
##'
##' @title data.model
##' @param df.mrg merged data frame
##' @param formula the formula to fit over time and space
##' @return the fit object
##' @author James E. Marca
data.model <- function(df.mrg,formula=n.aadt.frac~1){

  site.coords<-unique(cbind(df.mrg$Longitude,df.mrg$Latitude))
    post.gp.fit <- spTimer::spT.Gibbs(formula=formula,
                                      data=df.mrg,
                                      model="GP",
                                      coords=site.coords,
                                      tol.dist=0.005,
                                      distance.method="geodetic:km",
                                      report=10,
                                      scale.transform="SQRT")
    post.gp.fit

}


##' Set up the prediction for a model
##'
##' Call this function to call from plyr, one ply per model
##' @title data.predict.generator
##' @param df.pred.grid the prediction grid
##' @param ts.un time stamps
##' @return a function that will run the prediction given the model
##'     from within plyr loop
##' @author James E. Marca
data.predict.generator <- function(df.pred.grid,ts.un){
    n.sites <- length(df.pred.grid[,1])
    df.pred.grid$s.idx <- 1:n.sites
    ts.psx <- as.POSIXct(ts.un)
    n.times <- length(ts.un)
    dat.mrg <- matrix(NA,n.sites*n.times,8)
    dat.mrg[,1] <- sort(rep(df.pred.grid$s.idx,each=n.times)) ## site number
    dat.mrg[,2] <- rep(ts.un$year,n.sites)+1900
    dat.mrg[,3] <- rep(ts.un$mon,n.sites)
    dat.mrg[,4] <- rep(ts.un$mday,n.sites)
    dat.mrg[,5] <- rep(ts.un$hour,n.sites)
    dat.mrg[,6] <- rep(ts.psx,n.sites)
    dat.mrg[,7] <- sort(rep(df.pred.grid$lon,each=n.times)) ## lon
    dat.mrg[,8] <- sort(rep(df.pred.grid$lat,each=n.times))  ## lat
    dimnames(dat.mrg)[[2]] <- c('s.idx','year','month','day','hour','tsct','Longitude','Latitude')
    df.mrg <- as.data.frame(dat.mrg)
    df.mrg$n.aadt.frac <- NA
    df.mrg$hh.aadt.frac <- NA
    df.mrg$nhh.aadt.frac <- NA
    df.mrg <-  merge(df.mrg,df.pred.grid,all=TRUE,by=c("s.idx"))
                                        # grid.coords<-unique(cbind(df.mrg$Longitude,df.mrg$Latitude))
    grid.coords<-as.matrix(unique(cbind(df.mrg$Longitude,df.mrg$Latitude)))
    return (function(model){
        ## gc()
        ## print(grid.coords)
        spTimer::predict.spT(model,newcoords=grid.coords,newdata=df.mrg,tol.dist=0.005,distance.method="geodetic:km")
    })
}

##' This is the big outer modeling loop
##'
##' This function is called from the predict.hpms routine.  It needs
##' the prediction grid (where should predictions be made), and a list
##' of models and it will run the predictions and save to couchdb.
##' Nothing is returned from this function.
##' @title group.loop
##' @param prediction.grid which cells in grid to do predictions on
##' @param var.models the models to run over each grid cell
##' @param ts.ts some sort of time series data
##' @param ts.un a different time series.  Old stuff, I just don't
##'     remember right now.
##' @param curlH the curl handle
##' @return nothing at all
##' @author James E. Marca
group.loop <- function(prediction.grid,var.models,ts.ts,ts.un,curlH){
    config <- rcouchutils::get.config()

    ## set up for saving to couchdb
    df.all.predictions = list()
    for(sim.site in 1:(length(prediction.grid[,1]))){
        df.all.predictions[[sim.site]] <- data.frame('ts'= ts.ts)
        df.all.predictions[[sim.site]]$i_cell  <- prediction.grid[sim.site,'i_cell']
        df.all.predictions[[sim.site]]$j_cell  <- prediction.grid[sim.site,'j_cell']
        df.all.predictions[[sim.site]]$geom_id <- prediction.grid[sim.site,'geo_id']
        df.all.predictions[[sim.site]]['_id']  <- paste(df.all.predictions[[sim.site]]$geom_id,
                                                        df.all.predictions[[sim.site]]$ts,
                                                        sep='_')
    }

    predf <- data.predict.generator(prediction.grid,ts.un)
    model.names <- names(var.models)
    for (m in 1:length(var.models)){
        grid.pred <- predf(var.models[[m]])
        for(sim.site in 1:(length(df.all.predictions))){
            df.all.predictions[[sim.site]][,model.names[m]] <- grid.pred$Median[,sim.site]
        }
        rm(grid.pred)
    }

    rm(predf)
    ## for(model.name in names(var.models)){
    ##     grid.pred <- predictions[[model.name]]
    ##     for(sim.site in 1:(length(df.all.predictions))){
    ##         df.all.predictions[[sim.site]][,model.name] <- grid.pred$Median[,sim.site]
    ##     }
    ## }
    # gc()
    rearranger <- NULL
    doccount <- 0
    for(sim.site in 1:(length(df.all.predictions))){
        rnm <- names( df.all.predictions[[sim.site]] )

        rnm  <- gsub('.aadt.frac','',x=rnm)
        if(is.null(rearranger)){
            rearranger <- rearrange_data(rnm)
        }
        names( df.all.predictions[[sim.site]] ) <- rnm
        storedf <- list()
        for(i in 1:length(df.all.predictions[[sim.site]][['ts']])){
            storedf[[i]] <- rearranger(df.all.predictions[[sim.site]][i,])
            if(config$recheck){
                storedf[[i]][['modelversion']] <- config$recheck
            }
        }
        print(paste(sim.site,
                    storedf[[1]]['_id']))

        res <- rcouchutils::couch.bulk.docs.save(config$couchdb$grid_hpms,storedf,h=curlH)

        rm(storedf)

        doccount <- doccount + res
    }
    rm(df.all.predictions)
    gc()
    ## print(paste('saved',doccount,'docs'))
    return ()

}


##' rearrange data
##'
##' In order to store data in CouchDB, need to change the records
##' around a bit, so that each has a list for the value of aadt
##'
##' @title rearrange_data
##' @param col.names what you want to rearrange
##' @return a data frame
##' @author James E. Marca
rearrange_data <- function(col.names){

    numeric.cols <-  grep( pattern="^(_id|ts|geom_id|freeway|detectors)$",x=col.names,perl=TRUE,invert=TRUE)
    aadt.cols <- grep(pattern="^(n|hh|nhh)",x=col.names[numeric.cols],perl=TRUE)
    ## so strip these out of the original, and replace with a list

    anon <- function(row){
        ## for each row, split out the aadt fraction variables
        outpt <- as.list(row)
        ##print(outpt)
        aadtlist <- list()
        for(i in 1:length(aadt.cols)){
            outpt[[col.names[numeric.cols[aadt.cols[i]]]]] <- NULL
            aadtlist[[col.names[numeric.cols[aadt.cols[i]]]]] <- row[[numeric.cols[aadt.cols[i]]]]
        }
        outpt[['aadt_frac']] <- aadtlist
        ## print(outpt)
        return (outpt)

    }
    return(anon)

}

##' Omit grid cells in HPMS data that overlap existing fwy grids
##'
##' @title no.overlap
##' @param df.fwy.data the cells with freeway data
##' @param df.hpms.grid.locations the cells with hpms data
##' @return less than or equal to df.hpms.grid.locations
##' @author James E. Marca
no.overlap <- function(df.fwy.data,df.hpms.grid.locations){
    geoids <- df.fwy.data$geo_id ## sort(unique(paste(df.fwy.data$i_cell, df.fwy.data$j_cell,'_')))
    overlap <- df.hpms.grid.locations$geo_id %in% geoids
    return(df.hpms.grid.locations[!overlap,])
}

##' Determine the grids that need to be processed
##'
##' This routine will run the hpms grids at the database and determine
##' which ones need to be processed still.  No point duplicating
##' effort.  Any grid with 23:00 on the current day in the DB does
##' not need doing for the day.  Any grid cell without midnight stored
##' in the DB is assumed to need processing.
##'
##' @title necessary.grids
##' @param df.fwy.data the grid cells with freeway data
##' @param df.hpms.grid.locations the grid cells with hpms data
##' @param year the year of analysis
##' @param curlH the curl handle
##' @return a possibly reduced list of df.hpms.grid.locations
##' @author James E. Marca
necessary.grids <- function(df.fwy.data,df.hpms.grid.locations,year,curlH){

    config <- rcouchutils::get.config()
    picker <- 1:length(df.hpms.grid.locations[,1])
    hpmstodo <- picker < 0 # default false

    ## handle time from df.fwy.data

    ## set up date to check if data in couchdb for hpms grid
    checkday <- min(df.fwy.data$day)
    if(checkday<10) checkday <- paste('0',checkday,sep='')
    checkmonth <- df.fwy.data$month[1] + 1
    ## month is one less than month, because javascript
    if(checkmonth < 10) checkmonth <- paste('0',checkmonth,sep='')
    ## form date part of checking URL for couchdb

    couch.test.date <- paste(paste(year,checkmonth,checkday,sep='-'),"23:00",sep=' ')
    ## note just checking a single date, the zero hour of the day.  If
    ## not there, assume whole day is missing.  If there, assume whole
    ## day is there too.
    ##
    ## couchdb library routine will escape ascii stuff if needed.

    print(paste('checking',couch.test.date))

    picker <- 1:length(df.hpms.grid.locations[,1])
    couch.test.docs <- paste(df.hpms.grid.locations$geo_id,couch.test.date,sep='_')
    incl.docs <- FALSE
    if(! is.null(config$recheck)){
        incl.docs <- TRUE
    }
    print (paste ( 'checking on',length(couch.test.docs),'docs that might be done already'))
    if(length(couch.test.docs) == 1){
        ## use get not alldocs
        doc <- rcouchutils::couch.get(db=config$couchdb$grid_hpms,
                                         docname=couch.test.docs)
        if('error' %in% names(doc)){
            ## error means doc not found, need to do this grid
            hpmstodo[1] <- TRUE
        }else{
            if(! is.null(config$recheck)){
                if(is.null(doc$modelversion)
                   || doc$modelversion < config$recheck){
                    ## done but needs redoing
                    hpmstodo[1] <- TRUE
                }
            }
        }
        ##print(doc)
        print(paste('checked',couch.test.date,'for 1 doc, missing',length(hpmstodo[hpmstodo])))
    }else{
        result <- rcouchutils::couch.allDocsPost(db=config$couchdb$grid_hpms,
                                                 keys=couch.test.docs,
                                                 include.docs=incl.docs,h=curlH)

        if('error' %in% names(result)){
            ## error means doc not found, need to do this grid

            hpmstodo <- rep(TRUE,length(hpmstodo))

            ## maybe need to create the database?
            if(result$reason == "no_db_file"){
                reslt <- rcouchutils::couch.makedb(config$couchdb$grid_hpms)
                print(reslt)
                if('error' %in% names(reslt)){
                    ## error is bad
                    stop("error making database")
                }
            }
        }else{
            rows <- result$rows
            print(length(rows))
            for(i in 1:length(rows)){
                row <- rows[[i]]
                ## print(row$key)
                if('error' %in% names(row)){
                    ## error means doc not found, need to do this grid
                    hpmstodo[i] <- TRUE
                    ## print(paste('todo',row$key,couch.test.docs[i]))
                    ##true means need to do this document
                } else {
                    ## no error means there is a doc.  If config says to
                    ## recheck, check if there is a date
                    if(! is.null(config$recheck)){
                        if(is.null(row$doc$modelversion)
                           || row$doc$modelversion < config$recheck){
                            ## done but needs redoing
                            hpmstodo[i] <- TRUE
                        }
                    }
                }
            }
        }
        print(paste('checked',couch.test.date,'for',length(hpmstodo),'rows, missing',length(hpmstodo[hpmstodo])))
    }
    return (df.hpms.grid.locations[hpmstodo,])

}


##' Assign a fraction because just one freeway cell
##'
##' At the moment this does nothing as it hasnt been tested and I want
##' to run the other cells first.
##'
##' @title assign.fraction
##' @param df.fwy.data the freeway grid cells (but really just one)
##' @param df.hpms.grid.locations the hpms grid cells
##' @param year the year, for writing to couchdb
##' @param curlH the curl handle for couchdb
##' @return nothing
##' @author James E. Marca
assign.fraction <- function(df.fwy.data,df.hpms.grid.locations,year,curlH){
    ## just assign frac to hpms cells
    picked <- 1:length(df.hpms.grid.locations[,1])
    ts2 <- strptime(df.fwy.data$ts,"%Y-%m-%d %H:%M",tz='UTC')
    ts.un <- sort(unique(ts2))
    ts.ct <- sort(unique(df.fwy.data$tsct))
    ts.ts = sort(unique(df.fwy.data$ts))
    config <- rcouchutils::get.config()
    rearranger <-  NULL


    for(sim.site in picked){
        df.all.predictions <- data.frame('ts'= ts.ts)
        df.all.predictions$i_cell <- df.hpms.grid.locations[sim.site,'i_cell']
        df.all.predictions$j_cell <- df.hpms.grid.locations[sim.site,'j_cell']
        df.all.predictions$geom_id <- df.hpms.grid.locations[sim.site,'geo_id']
        df.all.predictions['_id']  <- paste(df.all.predictions$geom_id,
                                            df.all.predictions$ts,
                                            sep='_')

        for(variable in c('n.aadt.frac','hh.aadt.frac','nhh.aadt.frac')){
            df.all.predictions[,variable] <- df.fwy.data[,variable]
        }

        if(dim(df.all.predictions)[2]>4){
            ## now dump that back into couchdb
            rnm <-  names(df.all.predictions)
            rnm  <- gsub('.aadt.frac','',x=rnm)
            names(df.all.predictions) <- rnm
            if(is.null(rearranger)){
                rearranger <- rearrange_data(rnm)
            }

            save.these <-  ! is.na(df.all.predictions$n)
            df.all.predictions <- df.all.predictions[save.these,]
            storedf <- list()

            for(i in 1:nrow(df.all.predictions)){
                storedf[[i]] <- rearranger(df.all.predictions[i,])
                ## print(paste(storedf[[i]]))
                if(config$recheck){
                    storedf[[i]][['modelversion']] <- config$recheck
                }

            }
            print(paste(sim.site,
                        storedf[[1]]['_id']))
            res <- rcouchutils::couch.bulk.docs.save(config$couchdb$grid_hpms,storedf,h=curlH)
        }
    }
    return ()
}

##' Predict fractions at HPMS grid cells based on models
##'
##' @title predict.hpms.data
##' @param df.fwy.data the freeway grid cells
##' @param df.hpms.grid.locations the hpms grid cells
##' @param var.models the models to use for predictions
##' @param year the year, for writing to the correct db entry
##' @param curlH the curl handle for couchdb
##' @return 0 or 1+
##' @author James E. Marca
predict.hpms.data <- function(df.fwy.data,df.hpms.grid.locations,var.models,year,curlH){

    ts2 <- strptime(df.fwy.data$ts,"%Y-%m-%d %H:%M",tz='UTC')
    ts.un <- sort(unique(ts2))
    ts.ct <- sort(unique(df.fwy.data$tsct))
    ts.ts = sort(unique(df.fwy.data$ts))
    n.times = length(ts.un)

    picked <- 1:length(df.hpms.grid.locations[,1])
    if(length(picked)>1)    picked = sample(picked)

    returnval <- 0
    dolimit <-  150 # the higher this number, the more likely to run out of ram
    maxaddl <-  floor(dolimit/3)
    remainder <- length(picked) %% dolimit

    if(remainder > maxaddl && length(picked) > dolimit ){
        remainder <- maxaddl
    }

    dolimit <- dolimit + remainder

    # dolimit <- 200 # temporary hacking for all_california run
    if(length(picked)>dolimit){
        returnval <- length(picked) - dolimit
        ## just do 100 for now
        ## by chopping down hpms.idx
        keep.idx <- picked < 0 ## all of them are FALSE, of course
        ## only keep 100 values
        keep.idx[1:dolimit] <- TRUE
        picked <- picked[keep.idx]

    }

    print(paste('processing',length(picked),'cells'))

    ## this numcells hack is dumb, so set to 500 so that it is skipped
    num.cells = 500 # 100 ## 90 # min( 90, ceiling(80 * 11000 / length(batch.idx)))
    num.runs = ceiling(length(picked)/num.cells) ## manage RAM
    print(paste('num.runs is',num.runs,'which means number cells per run is about',floor(length(picked)/num.runs)))

    runs.index <- rep_len(1:num.runs,length.out = length(picked))

    ## random permutation of the grid cells I need to predict
    df.pred.grid <- df.hpms.grid.locations[picked,]

    ## remove try block here.  Didn't help the memory leak, but I
    ## don't see what it was doing anyway...fails happen in RCurl to
    ## couchdb, which is already wrapped in a try
    for (i in 1:max(runs.index)){
        ## print(paste('run',i,'memory',pryr::mem_used()))
        idx <- runs.index == i
        ## this used to use plyr, but made it a loop for now to help debugging
        group.loop(df.pred.grid[idx,],var.models,ts.ts,ts.un,curlH)

    }

    return (returnval)
}

##' Model fraction changes in space based on hourly freeway observations
##'
##' @title model.fwy.data
##' @param df.fwy.data the freeway grid cells
##' @return the models list
##' @author James E. Marca
model.fwy.data <- function(df.fwy.data){

    var.models <- plyr::llply(list('n.aadt.frac'='n.aadt.frac',
                                   'hh.aadt.frac' ='hh.aadt.frac',
                                   'nhh.aadt.frac'='nhh.aadt.frac'),
                              function(variable){
                                  data.model(df.fwy.data,formula=formula(paste(variable,1,sep='~')))
                              }
                              ##.parallel=TRUE
                              )
    return (var.models)

}

##' Step through the jobs for modeling and predicting
##'
##' @title processing.sequence
##' @param df.fwy.grid the freeway grid cells
##' @param df.hpms.grid.locations the hpms grid cells
##' @param year the year of this analysis
##' @param month the month
##' @param day the day
##' @param max.iter max iterations.  watch RAM
##' @return number left to do
##' @author James E. Marca
processing.sequence <- function(df.fwy.grid,
                                df.hpms.grid.locations,
                                year,month,day,basin,
                                maxiter=2){
    iter <- 0
    curlH <- RCurl::getCurlHandle()

    var.models <- list() ## empty list
    ## note that at one time I tried to do:
    ## fetch.model(year,month,day,basin)
    ## but that fails.  spTimer has a memory leak
    ## that translates also to saving it to disk.  Never save it to disk.

    ## load stored freeway grid data
    df.fwy.data <- fetch.fwy.data(year,month,day,basin)

    ## load stored hpms grid data
    hpms <- fetch.hpms(year,month,day,basin)

    ## loaded fwy data grids and hpms grids from cached values it is
    ## possible that they are empty, say if this is the first run of
    ## the model.

    ## in this first check, if hpms has non-empty dimension, then it
    ## has been saved and has been restored from the filesystem, so
    ## the task is just to check if the length is long enough to do
    ## something
    if(length(dim(hpms)) == 2 && length(hpms[,1])<1){
        print(dim(hpms))

        print(paste('not going to start, all done'))
        ## rm(df.fwy.data)

        ## in order to signal the caller that we're already done and
        ## did no work at all, set the return state to -1, not zero
        return (-1)
    }
    ## if still here, then hpms is either empty, or it is populated
    ## with real data and with non zero length


    ## the next check.  if fwy data is zero length, then go get it as
    ## the file system cache is empty
    if(length(df.fwy.data) == 0){
        print('first pass, getting freeway grid data')
        df.fwy.data <- get.raft.of.grids(df.fwy.grid
                                        ,day=day
                                        ,month=month
                                        ,year=year)

        ## stash that data for the next run of the model.
        stash.fwy.data(year,month,day,basin,df.fwy.data)
    }

    ## At this point, fwy data should be non-empty, so assert that
    stopifnot(length(df.fwy.data) > 0)


    ## okay, if we're here still then going well, and we are assured
    ## that df.fwy.data is not empty.

    ## now check hpms.  if length of dim() is zero, then no data yet
    if(length(dim(hpms)) == 0){
        print('first pass, getting hpms grid data')

        ## so go get it.

        ## first, drop overlapping grids
        hpms <- no.overlap(df.fwy.data,df.hpms.grid.locations)

        ## then check couchdb for necessary grids to populate
        hpms <- necessary.grids(df.fwy.data,hpms,year,curlH)

        ## then stash them away for the next pass.
        stash.hpms(year,month,day,basin,hpms)

    }

    if(length(hpms[,1])<1){
        print(paste('all done'))
        ## rm(df.fwy.data)

        ## save empties to reduce space needs
        stash.fwy.data(year,month,day,basin,list())
        ## in order to signal the caller that we're already done and
        ## did no work at all, set the return state to -1, not zero
        return (-1)
    }else{
        print(length(hpms[,1]))
    }

    if(length(unique(df.fwy.data$s.idx))<2){
        ## one cell is not enough freeway grid cells to build a
        ## spatial model.
        ##
        ## just assign fraction
        while(length(hpms[,1])>0){
            assign.fraction(df.fwy.data,hpms,year,curlH)

            hpms <- necessary.grids(df.fwy.data,hpms,year,curlH)

            stash.hpms(year,month,day,basin,hpms)
        }
    }else{
        ## still here, build model, then stash
        if(length(var.models) == 0){
            ## model, then predict
            print('building models')
            var.models <- model.fwy.data(df.fwy.data)
        }else{
            print('using previously computed models')
        }

        ## predict

        predict.hpms.data(df.fwy.data,hpms,var.models,year,curlH)
        hpms <- necessary.grids(df.fwy.data,hpms,year,curlH)

        stash.hpms(year,month,day,basin,hpms)
    }
    rm(curlH)
    return(length(hpms[,1]))
}

##' Process a month of data day by day
##'
##' First step is to get the hour by hour data for the freeway grid
##' cells.  The second step is to model and predict, using plyr, one
##' day at a time, by running the processing.sequence function
##'
##' This can get out of hand.  RAM bug starts here
##'
##' @title process.data.by.day
##' @param df.grid the freeway grid cells, but not the data
##' @param df.hpms.grids the hpms grid cells
##' @param year the year of analysis
##' @param month the month to run this.
##' @return nothing at all
##' @author James E. Marca
process.data.by.day <- function(df.grid,df.hpms.grids,year,month,day,basin,maxiter=2){
    print (paste(year,month,day,pryr::mem_used()))
    ## don't care about true number of days per month
    returnval <- processing.sequence(df.grid,df.hpms.grids
                                    ,year=year
                                    ,month=month
                                    ,day=day
                                    ,basin=basin
                                    ,maxiter)

    return (returnval)
}
jmarca/grid_data documentation built on Sept. 27, 2020, 11:33 a.m.