R/ltMergeFiles.R

#' Merge a batch of Lotek .DTA files with an existing database.
#'
#' @param files either a character vector of full paths to files, or the full
#' path to a directory, which will be searched recursively for .DTA files.
#'
#' @param j job, whose ID will be recorded for any new / changed DTAfile record
#'
#' @param dbdir path to folder with existing receiver databases; defaults
#' to \code{MOTUS_PATH$RECV}
#'
#' @return a data_frame reporting the fate of each file, with these columns:
#' \enumerate{
#' \item fullname - full path to filename
#' \item nameNew  - TRUE iff file of this name was not yet in database
#' \item dataNew  - TRUE iff file with these contents was not yet in database
#' \item use      - TRUE iff file copied to database
#' \item err      - if not NA, any error message generated by trying to parse the file
#' \item serno    - serial number (e.g. "Lotek-1234") of receiver this file is from
#' \item ts       - first timestamp from a detection in this file
#' \item tsLast   - last timestamp from a detection in this file
#' \item inRepo   - file came from file_repo (rather than being a new upload; happens when fully reprocessing)
#' \item fid      - fileID, if file in or added to DB
#' }
#'
#' @note If err is not NA for a file, then other fields for that file
#'     might not be set appropriately in the return value.
#'
#' @details Any files which are not identical to an existing file are saved
#' in the file repository at \code{MOTUS_PATH$FILE_REPO}.  The names will be
#' modified if necessary to avoid collisions with existing files.
#'
#' Remaining files are deleted.
#'
#' @export
#'
#' @author John Brzustowski \email{jbrzusto@@REMOVE_THIS_PART_fastmail.fm}

ltMergeFiles = function(files, j, dbdir=MOTUS_PATH$RECV) {
    if (! isTRUE(is.character(files) && all(file.exists(files)))) {
        warning("invalid or non-existent input files specified")
        return()
    }
    if (file.info(files[1])$isdir)
        rv = dir(files, recursive=TRUE, full.names=TRUE, pattern=".*\\.dta$", ignore.case=TRUE)
    else
        rv = sort(files)

    rv = data_frame(fullname = rv, nameNew = TRUE, dataNew = TRUE, use = FALSE, err = NA, serno=as.character(NA), ts = as.numeric(NA), tsLast = as.numeric(NA), fid=as.numeric(NA))

    ## read serial numbers from DTA files, returning NA for any files which fail to parse or for which we can't
    ## get a serial number
    rv$serno = NA
    for (i in seq(along = rv$serno)) {
        f = rv$fullname[i]
        tryCatch({
            hdr = readDTA(f, numLines=20)
            if (hdr$recv == "Lotek-NA") {
                ## For some .DTA files, the serial number appears as the "Site Code"
                ## We'll assume this is the case, but verify against the deployment metadata whether
                ## the filename begins with the same 3 letters as any deployment name for that receiver.
                serno = paste0("Lotek-", hdr$siteCode)
                depInfo = MetaDB("select serno, name from recvDeps where serno='%s'", serno)
                fnprefix = substring(basename(f), 1, 3)
                if (isTRUE(any(grepl(paste0("^", fnprefix), depInfo$name, ignore.case=TRUE, perl=TRUE)))) {
                    rv$serno[i] = serno
                    jobLog(j, paste("Warning:  file", basename(f), "contains no serial number!\nI'm guessing from the Site Code",
                                    hdr$siteCode, "and metadata DB that this file is for", serno, ".\nIf I'm wrong, detections will appear under the wrong receiver!\n"))
                } else {
                    jobLog(j, paste("Warning: file", basename(f), "contains no serial number.\nIts Site Code is ", hdr$siteCode," which might be its serial number\nbut I can't find a deployment name for that receiver\nwhich matches `", substr(basename(f), 1, 3), "`,\nso I will not process this file"))
                }
            } else {
                rv$serno[i] = hdr$recv
            }
        },
        error = function(e) {
            jobLog(j, paste0("File ", basename(f), " failed to parse: ", e))
        })
    }

    ## flag which files are already in the file repo
    rv$fromRepo = beginsWith(rv$fullname, MOTUS_PATH$FILE_REPO)

    for (serno in unique(rv$serno)) {
        if (is.na(serno))
            next

        ## lock the receiver DB
        lockSymbol(serno)

        ## make sure we unlock the receiver DB when this function exits, even on error
        ## NB: the runMotusProcessServer script also drops any locks held by a given
        ## processServer after the latter exits.

        on.exit(lockSymbol(serno, lock=FALSE))

        src = getRecvSrc(serno, dbdir=dbdir)
        sql = safeSQL(src)
        files = tbl(src, "DTAfiles")

        iRecv = which(rv$serno==serno)
        for (i in iRecv) {
            f = rv$fullname[i]
            size = file.info(f)$size
            blob = readBin(f, raw(), n=size)
            ll = (blob %>% rawToChar)
            ## fix encoding: wherever there are non-ascii characters, assume 'Latin-1'
            ## see: https://github.com/jbrzusto/motusServer/issues/429
            Encoding(ll) = "latin1"
            ll = strsplit(ll, "\r\n", fixed=TRUE) [[1]]
            skip = FALSE
            x = NULL
            tryCatch({
                x <- readDTA(lines=ll)
                if(nrow(x$tags) > 0) {
                    tsr = range(x$tags$ts)
                    rv$ts[i] = tsr[1]
                    rv$tsLast[i] = tsr[2]
                }
            }, error = function(e) {
                rv$err[i] = as.character(e)
                skip <- TRUE
            })
            if (skip)
                next

            ## compute file hash, then check whether it already is in database
            fhash = digest(blob, algo="sha512", serialize=FALSE)

            bname = basename(f)
            if (files %>% filter(name==bname) %>% count %>% as.data.frame > 0) {
                rv$nameNew[i] = FALSE
            }

            this = files %>% filter(hash==fhash)

            if (this %>% count %>% as.data.frame > 0) {
                rv$dataNew[i] = FALSE
            } else {

                ## sqlite connection

                con = src$con

                ## write file record
                dbGetQuery(
                    con,
                    "insert into DTAfiles (name, size, tsBegin, tsEnd, tsDB, hash, motusJobID) values (:name, :size, :tsBegin, :tsEnd, :tsDB, :hash, :motusJobID)",
                    params = data_frame(
                        name       = bname,
                        size       = length(blob),
                        tsBegin    = min(x$tags$ts),
                        tsEnd      = max(x$tags$ts),
                        tsDB       = as.numeric(Sys.time()),
                        hash       = fhash,
                        motusJobID = as.integer(j)
                    ) %>% as.data.frame
                )
                rv$use[i] = TRUE
                rv$fid[i] = dbGetQuery(con, "select max(fileID) from DTAfiles")[[1,1]]

                ## write tags records

                ## FIXME: it would be nice to factor out antenna frequency,
                ## codeset, gain, lat/lon as we do for SG records.  This would
                ## make for much smaller data files.  But we can't do this by
                ## timestamp, as these can jump around significantly in the
                ## .DTA files, and so must use file (lexical) order.

                if (isTRUE(nrow(x$tags) > 0)) {
                    dbGetQuery(
                        con,
                        "insert or ignore into DTAtags (fileID, dtaline, ts, id, ant, sig, lat, lon, antFreq, gain, codeSet) values (:fileID, :dtaline, :ts, :id, :ant, :sig, :lat, :lon, :antFreq, :gain, :codeSet)",
                        params = data_frame(
                            fileID  = rv$fid[i],
                            dtaline = x$tags$dtaline,
                            ts      = x$tags$ts,
                            id      = x$tags$id,
                            ant     = x$tags$ant,
                            sig     = x$tags$sig,
                            lat     = x$tags$lat,
                            lon     = x$tags$lon,
                            antFreq = x$tags$antfreq,
                            gain    = x$tags$gain,
                            codeSet = x$tags$codeset
                        ) %>% as.data.frame
                    )
                }

                ## write boottime records
                if (isTRUE(length(x$boottimes) > 0)) {
                    dbGetQuery(
                        con,
                        "insert or ignore into DTAboot (ts, fileID) values (:ts, :fileID)",
                        params = data_frame(
                            ts = mdy_hms(x$boottimes),
                            fileID  = rv$fid[i]
                        ) %>% as.data.frame
                    )
                }
            }
        }
        ## save any files used in the receiver's folder in the file repo, if not already there.
        iMove = iRecv[rv$use[iRecv] & !rv$fromRepo[iRecv]]
        if (length(iMove) > 0) {
            newNames = moveFilesUniquely(rv$fullname[iMove], file.path(MOTUS_PATH$FILE_REPO, serno), copyLinkTargets=TRUE)
            ## some names had to change, so update them in the DB
            for (jj in seq(along = iMove)) {
                if (! is.na(newNames[jj]))
                    sql("update DTAfiles set name=:name where fileID=:fileID",
                        name = newNames[jj],
                        fileID = rv$fid[iMove[jj]])
            }
        }
        ## unlock the receiver and drop the source
        lockSymbol(x$recv, lock=FALSE)
        rm(sql)
        rm(src)
        gc()

    }
    ## trash files which did not come from repo and which are not being used
    toTrash(subset(rv, ! fromRepo & ! use)$fullname, j)
    return (rv)
}

#' does a string begin with another string
#' @param x string in which to look for prefix
#' @param y prefix to look for

beginsWith = function(x, y) {
    y == substr(x, 1, nchar(y))
}
jbrzusto/motus-R-package documentation built on May 18, 2019, 7:03 p.m.