# R/join_utils.R

# Align the by-variables of x and y ahead of a join.
# - variable names are assumed to match already (required by rxMerge); by$y is used to look them up
# - where types differ, x is converted to y's type by default
# - exception: factors. A factor joined to a non-factor is un-factored (on whichever side it is);
#   two factors with different level sets are both releveled to the sorted union of their levels
# Returns list(xFunc, xVars, yFunc, yVars):
# - xFunc/yFunc: NULL when that side needs no change; otherwise a transformFunc (when the input
#   inherits from RxXdfData) or, for any other y, an unevaluated within() call that refers to `y`
# - xVars/yVars: names of the by-variables that each transform touches
# Every converted variable is written to <name>__new__ and the old variable is set to NULL, because
# - inline transforms don't work (will leave old variable behind)
# - changing variable type in-place also doesn't work (need to create new variable, drop old variable)
# The caller is expected to rename <name>__new__ back to <name>.
# yOrig is accepted for signature compatibility only; it is not used here.
alignVars <- function(x, y, by, yOrig)
{
    # x should always inherit from RxXdfData
    stopifnot(inherits(x, "RxXdfData"))

    # all lookups are by y's names; type metadata comes from `by` so the files are not re-read
    byVars <- by$y
    xTypes <- by$xTypes[byVars]
    yTypes <- by$yTypes[byVars]

    # fail with a readable message instead of "missing value where TRUE/FALSE needed"
    unknown <- byVars[is.na(xTypes) | is.na(yTypes)]
    if(length(unknown) > 0)
        stop("by-variable(s) not found in saved metadata: ", paste(unknown, collapse=", "), call.=FALSE)

    # level sets are only needed when some by-variable is a factor on both sides;
    # fetch them at most once per data source (metadata reads are slow on Spark)
    needLevels <- any(xTypes == "factor" & yTypes == "factor")
    xLevels <- if(needLevels) varLevels(x) else NULL
    yLevels <- if(needLevels) varLevels(y) else NULL

    # turn a named list of per-variable expressions (NULL = no change) into one transform
    makeTransformFunc <- function(exprlst, returnFunction=TRUE)
    {
        nulls <- vapply(exprlst, is.null, logical(1))
        if(all(nulls))
            return(NULL)
        # after the conversions, drop each original variable that was converted
        exprlst <- c(exprlst, lapply(names(exprlst)[!nulls], function(e) {
            parse(text=sprintf("%s <- NULL", e))
        }))
        exprlst <- do.call(c, exprlst)                  # convert list of expressions into single expression object
        exprblock <- as.call(c(as.name("{"), exprlst))  # from ?call
        if(returnFunction)
        {
            xFunc <- function(varlst) {}
            body(xFunc) <- call("within", quote(data.frame(varlst, stringsAsFactors=FALSE)), exprblock)
            xFunc
        }
        else call("within", quote(data.frame(y)), exprblock)  # evaluated later where `y` is in scope
    }

    # names of the variables that actually have a change expression
    getTransformVars <- function(exprlst)
    {
        nulls <- vapply(exprlst, is.null, logical(1))
        names(exprlst)[!nulls]
    }

    # factor -> character, then on to `type` if that is one of the basic atomic types
    unFactor <- function(var, type)
    {
        changeExpr <- parse(text=sprintf("%s__new__ <- as.character(%s)", var, var))
        if(type %in% c("logical", "integer", "numeric", "complex"))
            changeExpr <- c(changeExpr, parse(text=sprintf("%s__new__ <- as.%s(%s__new__)", var, type, var)))
        else if(type != "character")
            stop("don't know how to convert type: ", type)
        changeExpr
    }

    # ensure level sets of x and y are the same: rxFactors very picky
    reFactor <- function(var)
    {
        xlevs <- xLevels[[var]]
        ylevs <- yLevels[[var]]
        if(identical(xlevs, ylevs))
            return(NULL)
        levs <- paste(deparse(sort(union(xlevs, ylevs))), collapse="")
        parse(text=sprintf("%s__new__ <- factor(%s, levels=%s)", var, var, levs))
    }

    # name the vector by its own values so the result lists are keyed by variable name
    byVars <- setNames(byVars, byVars)

    xChanges <- lapply(byVars, function(i) {
        xt <- xTypes[i]
        yt <- yTypes[i]
        changeExpr <- NULL

        if(xt != "factor" && yt != "factor")  # (relatively) clean bit: no factors involved
        {
            if(xt != yt)
            {
                if(yt %in% c("logical", "integer", "numeric", "complex", "character"))
                    changeExpr <- parse(text=sprintf("%s__new__ <- as.%s(%s)", i, yt, i))
                else stop("don't know how to convert type: ", yt)
            }
        }
        else if(xt == "factor" && yt != "factor")  # un-factor x (path of least resistance)
            changeExpr <- unFactor(i, yt)
        else if(xt == "factor" && yt == "factor")  # combine x and y levels
            changeExpr <- reFactor(i)
        # remaining case (x non-factor, y factor) is handled on the y side

        changeExpr
    })

    xFunc <- makeTransformFunc(xChanges, inherits(x, "RxXdfData"))
    xVars <- getTransformVars(xChanges)

    yChanges <- lapply(byVars, function(i) {
        xt <- xTypes[i]
        yt <- yTypes[i]
        changeExpr <- NULL

        # only change y if need to un-factor or relevel a factor
        if(xt != "factor" && yt == "factor")  # un-factor y (path of least resistance)
            changeExpr <- unFactor(i, xt)
        else if(xt == "factor" && yt == "factor")  # combine x and y levels
            changeExpr <- reFactor(i)

        changeExpr
    })

    yFunc <- makeTransformFunc(yChanges, inherits(y, "RxXdfData"))
    yVars <- getTransformVars(yChanges)

    list(xFunc=xFunc, xVars=xVars, yFunc=yFunc, yVars=yVars)
}


# Prepare x and y for rxMerge.
# - x, y: data frames or Rx data sources; other RxDataSource inputs are converted to tbl_xdf
# - by: list with by$x, by$y (by-variable names on each side) and by$xTypes, by$yTypes (saved
#   variable types keyed by variable name)
# - yOrig: the file of the y passed in by the caller, or NULL if y was not file-based
# Steps: rename x's by-variables to y's names, then apply the type/level transforms from alignVars.
# Returns list(x=, y=) with the aligned inputs.
alignInputs <- function(x, y, by, yOrig)
{
    asXdfOrDf <- function(data)
    {
        if(inherits(data, c("data.frame", "RxXdfData")))
            data
        else if(inherits(data, "RxDataSource"))
            as(data, "tbl_xdf")
        else stop("not a valid data source format", call.=FALSE)
    }

    # data must be in xdf or data frame format, import if not
    x <- asXdfOrDf(x)
    y <- asXdfOrDf(y)

    # rxMerge requires identical by-variable names; rename xvars if necessary
    if(!identical(by$x, by$y))
    {
        xby <- setNames(by$x, by$y)

        # if renaming clashes with other variables, rename them as well (this is not recursive)
        # need separate rename call because sequential renaming (a -> b -> c) bugged
        # by-variables of x are excluded: they are handled by the rename below, and moving them
        # aside first would make that rename refer to variables that no longer exist
        existing <- base::setdiff(base::intersect(by$y, names(x)), by$x)
        if(length(existing) > 0)
        {
            # new name = old name with ".x" appended (the vector itself is unnamed at this point)
            names(existing) <- paste0(existing, ".x")
            x <- rename(x, !!!existing)
        }
        x <- rename(x, !!!xby)

        # modify saved metadata to match any renamed vars
        # match by name, not position: by$x need not be in the same order as the columns of x
        oldNames <- names(by$xTypes)
        newNames <- oldNames
        if(length(existing) > 0)
        {
            clashPos <- match(existing, oldNames)
            found <- !is.na(clashPos)
            newNames[clashPos[found]] <- names(existing)[found]
        }
        byPos <- match(by$x, oldNames)
        found <- !is.na(byPos)
        newNames[byPos[found]] <- by$y[found]
        names(by$xTypes) <- newNames
    }

    # align by-variable types and factor levels
    align <- alignVars(x, y, by, yOrig)
    if(!is.null(align$xFunc))
    {
        # transformed variables come back as <name>__new__; restore the original names
        xRename <- paste0(align$xVars, "__new__")
        names(xRename) <- align$xVars
        x <- mutate(x, .rxArgs=list(transformFunc=align$xFunc, transformVars=align$xVars)) %>%
            rename(!!!xRename)
    }
    if(!is.null(align$yFunc))
    {
        yRename <- paste0(align$yVars, "__new__")
        names(yRename) <- align$yVars
        # alignVars returns a function when y inherits from RxXdfData, otherwise an unevaluated call;
        # branch on what was returned rather than on yOrig, since y may have been converted above
        if(is.function(align$yFunc))
        {
            # make sure not to delete original y by accident after factoring
            if(!is.null(yOrig) && y@file == yOrig)
                y <- as(y, "RxXdfData")
            y <- mutate(y, .rxArgs=list(transformFunc=align$yFunc, transformVars=align$yVars)) %>%
                rename(!!!yRename)
        }
        else
        {
            # the call refers to `y` by name, so evaluate it in this function's frame
            y <- eval(align$yFunc, environment())
            y <- rename(y, !!!yRename)
        }
    }
    list(x=x, y=y)
}


# copied from dplyr:::common_by, dplyr:::`%||%`
# Resolve the `by` specification of a join into list(x=, y=, xTypes=, yTypes=).
# - by: NULL (join on all commonly-named variables), a character vector (names, if any, are the
#   x-side variables and values the y-side variables), or a list that already has x and y elements
# - x, y: the two inputs; only used to obtain variable types via tbl_types()
# xTypes/yTypes are the results of tbl_types(), stored whole so later steps can reuse them.
commonBy <- function(by = NULL, x, y) 
{
    # save metadata: important on Spark to minimise retrieving this
    xTypes <- tbl_types(x)
    yTypes <- tbl_types(y)

    if(is.list(by))
    {
        # wrap in list(): c(by, xTypes=xTypes) would splice every type in as its own element
        # (xTypes.a, xTypes.b, ...) rather than adding a single xTypes element
        return(c(by, list(xTypes=xTypes, yTypes=yTypes)))
    }

    if(!is.null(by))
    {
        xBy <- if(is.null(names(by))) by else names(by)
        yBy <- unname(by)
        # partially named vector: an unnamed entry means the same variable name on both sides
        unnamed <- xBy == ""
        xBy[unnamed] <- yBy[unnamed]
        return(list(x=xBy, y=yBy, xTypes=xTypes, yTypes=yTypes))
    }

    by <- base::intersect(names(xTypes), names(yTypes))
    if(length(by) == 0)
    {
        stop("No common variables. Please specify `by` param.", call.=FALSE)
    }
    message("Joining by: ", capture.output(dput(by)))
    list(x=by, y=by, xTypes=xTypes, yTypes=yTypes)
}


# Check that x and y can be merged where they currently live, copying y across if allowed.
# - x, y: the two join inputs
# - copy: if TRUE, y is copied to x's side when only one of them is in HDFS; if FALSE that is an error
# Returns list(y=, copied=): the (possibly copied) y and whether a copy was made.
# Errors if, with anything in HDFS, either input is not an RxSparkData/RxXdfData source or the
# current compute context is not RxSpark.
mergeFsCheck <- function(x, y, copy)
{
    xInHdfs <- in_hdfs(x)
    copied <- FALSE

    # inputs on different sides: bring y over to wherever x is
    if(xInHdfs != in_hdfs(y))
    {
        if(!copy)
            stop("x and y must both be in the same filesystem; use copy=TRUE to join", call.=FALSE)

        y <- if(xInHdfs)
        {
            xFs <- rxGetFileSystem(x)
            as(copy_to(xFs, y, get_dplyrxdf_dir(xFs)), "tbl_xdf")
        }
        else compute(y)
        copied <- TRUE
    }

    result <- list(y=y, copied=copied)

    # nothing in HDFS: no further checks needed
    if(!xInHdfs && !in_hdfs(y))
        return(result)

    # from here on at least one input is in HDFS
    supported <- c("RxSparkData", "RxXdfData")
    if(!(inherits(x, supported) && inherits(y, supported)))
        stop("unsupported HDFS file type for merge", call.=FALSE)

    if(!inherits(rxGetComputeContext(), "RxSpark"))
        stop("files in HDFS can only be merged in the Spark compute context", call.=FALSE)

    result
}


# Shared implementation of the joins: validates filesystems, aligns the inputs, calls rxMerge.
# - x, y: the inputs to join
# - by: by-variable specification; this function reads by$y, and alignInputs reads by$x/by$y
# - copy: passed to mergeFsCheck; allows copying y when x and y are in different filesystems
# - type: passed to rxMerge as its `type` argument
# - .outFile: output destination; NULL means return a data frame
# - .rxArgs: extra arguments, combined into the rxMerge call by doExtraArgs
# - suffix: passed to rxMerge as duplicateVarExt (see note on the leading dot below)
# Returns the merge output, regrouped by x's grouping variables.
mergeBase <- function(x, y, by=NULL, copy=FALSE, type, .outFile=tbl_xdf(x), .rxArgs, suffix=c(".x", ".y"))
{
    yNew <- mergeFsCheck(x, y, copy)
    y <- yNew$y
    copied <- yNew$copied

    # capture grouping and y's original file before alignInputs replaces x and y
    grps <- group_vars(x)
    yOrig <- if(inherits(y, "RxFileData")) y@file else NULL
    xyNew <- alignInputs(x, y, by, yOrig)
    x <- xyNew$x
    y <- xyNew$y

    # cleanup on exit is asymmetric wrt x, y
    on.exit(
    {
        deleteIfTbl(x)
        # make sure not to delete original y by accident after factoring
        if(copied || (!is.null(yOrig) && y@file != yOrig))
            deleteIfTbl(y)
    }, add=TRUE)

    # sigh
    # NOTE(review): strips the leading "." from the suffixes when x is not in HDFS; presumably
    # rxMerge adds the dot itself in that case -- confirm against rxMerge's duplicateVarExt
    if(all(substr(suffix, 1, 1) == ".") && !in_hdfs(x))
        suffix <- substr(suffix, 2, nchar(suffix))

    arglst <- list(x, y, matchVars=by$y, type=type, duplicateVarExt=suffix)
    arglst <- doExtraArgs(arglst, x, .rxArgs, .outFile)
    arglst$rowsPerRead <- NULL # not used by rxMerge

    inputHd <- in_hdfs(x)

    # rxMerge won't create a data frame directly from HDFS input; do it the hard way
    outputDf <- is.null(.outFile)
    if(inputHd && outputDf)
        arglst$outFile <- tbl_xdf(x)

    output <- callRx("rxMerge", arglst)

    if(inputHd && outputDf)
    {
        outTbl <- arglst$outFile
        # add=TRUE: without it this handler would replace the x/y cleanup registered above
        on.exit(deleteIfTbl(outTbl), add=TRUE)
        output <- as.data.frame(output)
    }

    simpleRegroup(output, grps)
}
# RevolutionAnalytics/dplyrXdf documentation built on June 3, 2019, 9:08 p.m.