R/CA.R

Defines functions: ca.netcdf.wrapper, mk.output.ncdf, find.all.analogues, find.analogues, analogue.search.space, construct.analogue.weights, apply.analogues.netcdf.c, apply.analogue, bias.correct.dqm, na.masked, na.unmasked, create.aggregates, aggregate.obs.to.gcm.grid, aggregate.obs

##******************************************************************************
# Constructed Analogue (CA) downscaling algorithm
# Based loosely on code by Alex Cannon <acannon@uvic.ca>
# Rewritten by James Hiebert <hiebert@uvic.ca>

# Input is a factor which maps fine-scale obs cells to large-scale GCM cells
# (the factor should be of length x * y)
# and a 3d array of obs (x, y, time)
aggregate.obs <- function(cell.factor, obs) {
  trim <- getOption('trimmed.mean')
  apply(obs, 3, function(x) tapply(x, cell.factor, mean, trim=trim, na.rm=TRUE))
}
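
# A minimal, hypothetical sketch of the bookkeeping above (toy sizes; the
# 'trimmed.mean' option is normally provided by ClimDown's option defaults):
if (FALSE) {
    options(trimmed.mean=0)
    obs <- array(stats::rnorm(4 * 4 * 2), dim=c(4, 4, 2))  # 4x4 fine grid, 2 timesteps
    cell.factor <- factor(rep(1:4, each=4))                 # 16 fine cells -> 4 GCM cells
    agg <- aggregate.obs(cell.factor, obs)                  # 4 (GCM cells) x 2 (time) matrix
    stopifnot(all(dim(agg) == c(4, 2)))
}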

# Input cell indices mapping the obs grid to the GCM grid
# and a 3d array of obs (x, y, time)
# Output is a 3d array (GCM x, GCM y, time)
aggregate.obs.to.gcm.grid <- function(xi, yi, xn, yn, obs) {
  cell.number <- xi * max(yi) + yi
  cell.factor <- factor(cell.number, unique(as.vector(cell.number)))
  # apply preserving time (dim 3)
  # so for each time step, aggregate (take the mean) according to the cell map
  rv <- aggregate.obs(cell.factor, obs)
  ti <- dim(obs)[3]
  dim(rv) <- c(xn, yn, ti)
  return(rv)
}
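
# A toy illustration of the index bookkeeping (hypothetical sizes; in the real
# pipeline xi/yi come from regrid.coarse.to.fine() in create.aggregates() below):
if (FALSE) {
    options(trimmed.mean=0)
    xi <- matrix(rep(1:2, each=2), nrow=4, ncol=4)              # obs row -> GCM x index
    yi <- matrix(rep(1:2, each=2), nrow=4, ncol=4, byrow=TRUE)  # obs col -> GCM y index
    obs <- array(stats::rnorm(4 * 4 * 3), dim=c(4, 4, 3))       # 4x4 obs grid, 3 timesteps
    agg <- aggregate.obs.to.gcm.grid(xi, yi, xn=2, yn=2, obs)
    stopifnot(all(dim(agg) == c(2, 2, 3)))                      # GCM x, GCM y, time
}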

##******************************************************************************
# Read fine-scale grid and spatially aggregate to GCM grid
##******************************************************************************
create.aggregates <- function(obs.file, gcm.file, varid) {

  # Read fine-scale and GCM grid dimensions
  nc.obs <- nc_open(obs.file)
  nc.gcm <- nc_open(gcm.file)
  obs.lons <- nc_getx(nc.obs)
  obs.lats <- nc_gety(nc.obs)
  gcm.lons <- nc_getx(nc.gcm)
  gcm.lats <- nc_gety(nc.gcm)
  obs.time <- netcdf.calendar(nc.obs, 'time')

  # Figure out which GCM grid boxes are associated with each fine-scale grid point
  grid.mapping <- regrid.coarse.to.fine(gcm.lats, gcm.lons, obs.lats, obs.lons)
  xi <- grid.mapping$xi
  yi <- grid.mapping$yi

  xn <- length(unique(as.vector(xi)))
  yn <- length(unique(as.vector(yi)))
  aggregates <- array(dim=c(length(gcm.lons), length(gcm.lats), length(obs.time)))

  chunk.size <- optimal.chunk.size(length(obs.lons) * length(obs.lats))

  chunks <- chunk.indices(length(obs.time), chunk.size)
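  # Each element of `chunks` is expected to be a named vector with 'start',
  # 'stop' and 'length' entries describing one contiguous run of timesteps.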
  # Loop over chunks of time
  for (i in chunks) {
    print(paste("Aggregating timesteps", i['start'], "-", i['stop'], "/", length(obs.time)))
    obs <- CD_ncvar_get(nc.obs, varid=varid, start=c(1, 1, i['start']), # get obs for one chunk
                     count=c(-1, -1, i['length']))
    agg <- aggregate.obs.to.gcm.grid(xi, yi, xn, yn, obs)
    aggregates[min(xi):max(xi), min(yi):max(yi), i['start']:i['stop']] <- agg
    rm(obs)
    gc()
  }
  aggregates
}

na.unmasked <- function(grid) {
    which(!is.na(grid), arr.ind=TRUE)
}

na.masked <- function(grid) {
    which(is.na(grid), arr.ind=TRUE)
}

##******************************************************************************
# Bias correct daily GCM values using detrended quantile mapping algorithm
##******************************************************************************
bias.correct.dqm <- function(gcm, aggd.obs,
                             obs.time,
                             gcm.time,
                             historical.start=getOption('calibration.start'),
                             historical.end=getOption('calibration.end'),
                             detrend=FALSE,
                             ratio=FALSE) {
    t0 <- as.PCICt(historical.start, attr(gcm.time, 'cal'))
    tn <- as.PCICt(historical.end, attr(gcm.time, 'cal'))
    prehist.period <- gcm.time < t0
    future.period <- gcm.time > tn
    hist.period.gcm <- ! (prehist.period | future.period)

    t0 <- as.PCICt(historical.start, attr(obs.time, 'cal'))
    tn <- as.PCICt(historical.end, attr(obs.time, 'cal'))
    ti <- compute.time.overlap(obs.time, t0, tn)

    points <- na.unmasked(aggd.obs[,,1])

    rv <- array(dim=dim(gcm))
    rv[!is.na(aggd.obs[,,1])] <- t(
    mapply(function(x, y) {
        if (all(is.na(gcm[x,y,]))) {
            rep(NA,dim(gcm)[3])
        } else {
            dqm.tmp <- mnDQM(obs.h=aggd.obs[x,y, ti],
                             gcm.h=gcm[x,y, hist.period.gcm],
                             gcm.f=gcm[x,y, future.period],
                             months.obs.h=as.numeric(format(obs.time[ti], '%m')),
                             months.gcm.h=as.numeric(format(gcm.time[hist.period.gcm], '%m')),
                             months.gcm.f=as.numeric(format(gcm.time[future.period], '%m')),
                             gcm.p=gcm[x,y, prehist.period],
                             months.gcm.p=as.numeric(format(gcm.time[prehist.period], '%m')),
                             ratio=ratio, detrend=detrend, n.max=NULL)
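            # Note: reassembling the series as c(pre-historical, historical,
            # future) assumes gcm.time is sorted chronologically, so that the
            # three periods are contiguous and in this order.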
            c(dqm.tmp$g.p.bc, dqm.tmp$g.h.bc, dqm.tmp$g.f.bc)
        }
    }, points[,'row'], points[,'col']))
    rv
}

# x: a 3d array (lat x lon x num.analogues)
# weights: a vector of length num.analogues
apply.analogue <- function(x, weights) {
    n.cells <- prod(dim(x)[1:2])
    weights <- sapply(weights, rep, n.cells)
    dim(weights) <- dim(x)
    apply(x * weights, 1:2, sum)
}
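
# A small sketch of the weighted combination (hypothetical sizes and weights):
if (FALSE) {
    x <- array(stats::rnorm(3 * 2 * 4), dim=c(3, 2, 4))  # 3x2 grid, 4 analogues
    w <- c(0.4, 0.3, 0.2, 0.1)
    combined <- apply.analogue(x, w)                      # 3x2 grid, weighted sum over analogues
    stopifnot(isTRUE(all.equal(combined[1, 1], sum(x[1, 1, ] * w))))
}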

# analog.indices: vector of time indices that correspond to the timesteps to compose together
# weights: vector of length num.analogues corresponding to the analog indices
# obs.nc: An open netcdf file containing gridded observations
apply.analogues.netcdf.c <- function(analog.indices, weights, obs.nc, varid='tasmax') {
    dims <- c(obs.nc$var[[varid]]$size[1:2],getOption('n.analogues'))
    apply(
        array(
            positive_pr(
                mapply(function(i, w) {
                    CD_ncvar_get(nc=obs.nc, varid=varid,
                                 start=c(1, 1, i),
                                 count=c(-1, -1, 1)) * w
                },
                analog.indices, weights
                ),
                varid
            ),
            dim=dims
        ),
        1:2, sum
    )
}

apply.analogues.netcdf <- compiler::cmpfun(apply.analogues.netcdf.c)

# obs.at.analogues should be a matrix (n.analogues x number of cells)
# gcm.values should be a 1d vector of GCM values, one per cell, at the given time step
construct.analogue.weights <- function(obs.at.analogues, gcm.values) {
    n.analogue <- nrow(obs.at.analogues)
    alib <- jitter(obs.at.analogues)
    Q <- alib %*% t(alib)
    ridge <- getOption('tol') * mean(diag(Q))
    ridge <- diag(n.analogue) * ridge
    (solve(Q + ridge) %*% alib %*% as.matrix(gcm.values))[,1]
}
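
# The weights above are the ridge-regularized least-squares solution to
# t(obs.at.analogues) %*% w ~= gcm.values, i.e. the linear combination of
# analogue days that best reproduces the GCM field. A hypothetical sketch
# (the 'tol' option is normally provided by ClimDown's option defaults):
if (FALSE) {
    options(tol=0.1)
    obs.at.analogues <- matrix(stats::rnorm(30 * 500), nrow=30)  # 30 analogues x 500 cells
    gcm.values <- stats::rnorm(500)
    w <- construct.analogue.weights(obs.at.analogues, gcm.values)
    stopifnot(length(w) == 30)
}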

# times: timeseries vector of PCICt types
# today: PCICt object, a particular day of the year (the year itself is not important)
# around which to compute a window
# delta.days: an integer describing the size of the window on either side of today
analogue.search.space <- function(times, today,
                                  delta.days=getOption('delta.days'),
                                  t0=getOption('calibration.start'),
                                  tn=getOption('calibration.end')) {
    cal <- attr(times, 'cal')
    dpy <- if (cal == '360') 360 else 365
    jdays <- as.numeric(format(times, '%j'))
    today <- as.numeric(format(today, '%j'))

    distance <- abs(jdays - today)
    in.days <- distance <= delta.days | distance >= (dpy - delta.days)
    in.years <- times >= as.PCICt(t0, cal=cal) & times <= as.PCICt(tn, cal=cal)
    which(in.days & in.years)
}
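
# For example, with delta.days=45 a 'today' of January 5 matches days of year
# 1..50 plus 325..366, i.e. the window wraps across the calendar-year boundary,
# and only timesteps inside the calibration period are kept. A hypothetical
# sketch (dates and option values are made up):
if (FALSE) {
    library(PCICt)
    options(delta.days=45,
            calibration.start=as.POSIXct('1971-01-01', tz='GMT'),
            calibration.end=as.POSIXct('2000-12-31', tz='GMT'))
    times <- as.PCICt(seq(as.POSIXct('1971-01-01', tz='GMT'),
                          as.POSIXct('2000-12-31', tz='GMT'), by='day'),
                      cal='gregorian')
    ti <- analogue.search.space(times, as.PCICt('1995-01-05', cal='gregorian'))
    range(as.numeric(format(times[ti], '%j')))  # spans both ends of the year
}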

# FIXME: There's one remaining difference between pr and tas in the old CA code that
# needs to be explained and implemented
# 116,11yc108,109
# <     pr.gcm.i <- pr.gcm[i,na.mask]^expon
# <     pr.agg.alib <- pr.aggregate[alib,na.mask]^expon
# ---
# >     tasmin.gcm.i <- tasmin.gcm[i,na.mask]
# >     tasmin.agg.alib <- tasmin.aggregate[alib,na.mask]
# And then when the analogues are applied...
# 133,134c125
# <         pr.analogue.j <- round(pr.analogue.j, 3)
# <         pr.analogue <- pr.analogue + pr.weights[j]*(pr.analogue.j^expon)
# ---
# >         tasmin.analogue <- tasmin.analogue + tasmin.weights[j]*(tasmin.analogue.j)
# 136,137c127
# <     pr.analogue[pr.analogue < 0] <- 0
# <     pr.analogue <- pr.analogue^(1/expon)
# ---
# >     cat('*')
# What's the purpose of taking the square root before combining the analogues and then
# squaring after they're combined?

# gcm: a 2d array (lat x lon) representing a single time step of GCM values
# agged.obs: a 3d array (lat x lon x time) representing the aggregated observations
# times: PCICt vector of time values for the aggregated obs
# now: PCICt value of the current time step
# returns the time indices of the n.analogues closest analogues and their corresponding weights
find.analogues <- function(gcm, agged.obs, times, now, n.analogues=getOption('n.analogues')) {
    # FIXME: alib is only defined for each Julian day in any year...
    # it is 50x redundant as defined and could be precomputed
    ti <- analogue.search.space(times, now)
    agged.obs <- agged.obs[,,ti,drop=FALSE]

    # Find the n.analogue closest observations from the library
    # (obs years * (delta days * 2 + 1)) x cells
    # subtract the GCM at this time step from the aggregated obs *for every library time value*
    # square that difference
    
    find.daily.distance <- function(day) {
      distances <- (agged.obs[,,day] - gcm) ^2
      sum(distances, na.rm=TRUE)
    }
    diffs <- sapply(seq_along(ti), find.daily.distance)
    
    # Then find the n.analogues lowest differences;
    # this yields the indices of the n closest analogues
    # for this particular GCM timestep
    analogue.indices <- quickest.select(diffs, n.analogues)

    # Constructed analogue weights
    na.mask <- !is.na(agged.obs[,,1])
    obs.at.analogues <- t(matrix(agged.obs[,,analogue.indices][na.mask], ncol=n.analogues))
    weights <- construct.analogue.weights(obs.at.analogues, gcm[na.mask])

    # Remap the search space indices to full timeseries indices
    analogue.indices <- ti[analogue.indices]

    list(analogues=analogue.indices, weights=weights)
}

## Package check tools can't detect foreach's use of non-standard evaluation
## Ensure that they skip these variable names
utils::globalVariables('i')

# gcm: a 3d array (lat x lon x time) representing a GCM simulation
# agged.obs: a 3d array (lat x lon x time) representing the aggregated observations
# gcm.times: PCICt vector of time values for the GCM
# obs.times: PCICt vector of time values for the aggregated obs
find.all.analogues <- function(gcm, agged.obs, gcm.times, obs.times) {
    foreach(
        i=seq_along(gcm.times),
        .export=c('gcm', 'agged.obs', 'obs.times', 'gcm.times'),
        .errorhandling='pass',
        .inorder=TRUE,
        .final=function(x) {
            split(unlist(x, recursive=FALSE, use.names=FALSE), c('indices', 'weights'))
        }
        ) %dopar% {
            now <- gcm.times[i]
            find.analogues(gcm[,,i], agged.obs, obs.times, now)
    }
}
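
# find.all.analogues iterates with foreach's %dopar%, so callers should register
# a parallel backend first; otherwise foreach warns and falls back to running
# sequentially. A hypothetical sketch (core count and object names as used in
# ca.netcdf.wrapper() below):
if (FALSE) {
    doParallel::registerDoParallel(cores=4)
    analogues <- find.all.analogues(bc.gcm, aggd.obs, gcm.time, obs.time)
}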

mk.output.ncdf <- function(file.name, varname, template.nc, global.attrs=list()) {
    nc <- nc_create(file.name, template.nc$var[[varname]])
    mapply(function(name, value) {
        ncatt_put(nc, varid=0, attname=name, attval=value)
    }, names(global.attrs), global.attrs)
    nc
}

#' @title High-level NetCDF I/O wrapper for the Constructed Analogues (CA) pipeline
#'
#' @description CA starts by spatially aggregating high-resolution
#' gridded observations up to the scale of a GCM. Then it bias-corrects
#' the GCM based on those observations. Finally, it
#' conducts the search for temporal analogues (which is the most
#' expensive part of the operation). This involves taking each
#' timestep in the GCM and searching for the top 30 closest timesteps
#' (for some function of "close") in the gridded observations. For
#' each of the 30 closest "analogue" timesteps, CA records the
#' integer number of the timestep and a weight for each of the
#' analogues. These are returned to the caller.
#' 
#' @param gcm.file Filename of GCM simulations
#' @param obs.file Filename of high-res gridded historical observations
#' @param varname Name of the NetCDF variable to downscale (e.g. 'tasmax')
#' @return A list object with two values: 'indices' and 'weights', each of which is a vector with 30 items
#'
#' @examples
#' \dontrun{
#' options(
#'     calibration.end=as.POSIXct('1972-12-31', tz='GMT')
#' )
#' analogues <- ClimDown::ca.netcdf.wrapper('./tiny_gcm.nc', './tiny_obs.nc')
#' }
#'
#' @references Maurer, E. P., Hidalgo, H. G., Das, T., Dettinger, M. D., & Cayan, D. R. (2010). The utility of daily large-scale climate data in the assessment of climate change impacts on daily streamflow in California. Hydrology and Earth System Sciences, 14(6), 1125-1138.
#' @export
ca.netcdf.wrapper <- function(gcm.file, obs.file, varname='tasmax') {
    is.pr <- varname == 'pr'

    # Read in GCM data
    nc <- nc_open(gcm.file)
    gcm <- CD_ncvar_get(nc, varname)

    gcm.time <- netcdf.calendar(nc, 'time')
    nc_close(nc)

    print("Aggregating observations to GCM scale")
    aggd.obs <- create.aggregates(obs.file, gcm.file, varname)

    aggd.obs <- positive_pr(aggd.obs, varname)

    print("Reading time values from the aggregated observations")
    nc <- nc_open(obs.file)
    obs.time <- netcdf.calendar(nc, 'time')
    nc_close(nc)

    print("Bias correcting the aggregated observations")
    bc.gcm <- bias.correct.dqm(
        gcm, aggd.obs, obs.time, gcm.time,
        getOption('calibration.start'), getOption('calibration.end'),
        detrend=!is.pr, ratio=is.pr
    )
    print("Finding an analogous observered timestep for each GCM time step")
    find.all.analogues(bc.gcm, aggd.obs, gcm.time, obs.time)
}