#' Merge a batch of Lotek .DTA files with an existing database.
#'
#' @param files either a character vector of full paths to files, or the full
#' path to a directory, which will be searched recursively for .DTA files.
#'
#' @param j job, whose ID will be recorded for any new / changed DTAfile record
#'
#' @param dbdir path to folder with existing receiver databases; defaults
#' to \code{MOTUS_PATH$RECV}
#'
#' @return a data_frame reporting the fate of each file, with these columns:
#' \enumerate{
#' \item fullname - full path to filename
#' \item nameNew - TRUE iff file of this name was not yet in database
#' \item dataNew - TRUE iff file with these contents was not yet in database
#' \item use - TRUE iff file copied to database
#' \item err - if not NA, any error message generated by trying to parse the file
#' \item serno - serial number (e.g. "Lotek-1234") of receiver this file is from
#' \item ts - first timestamp from a detection in this file
#' \item tsLast - last timestamp from a detection in this file
#' \item inRepo - file came from file_repo (rather than being a new upload; happens when fully reprocessing)
#' \item fid - fileID, if file in or added to DB
#' }
#'
#' @note If err is not NA for a file, then other fields for that file
#' might not be set appropriately in the return value.
#'
#' @details Any files which are not identical to an existing file are saved
#' in the file repository at \code{MOTUS_PATH$FILE_REPO}. The names will be
#' modified if necessary to avoid collisions with existing files.
#'
#' Remaining files are deleted.
#'
#' @export
#'
#' @author John Brzustowski \email{jbrzusto@@REMOVE_THIS_PART_fastmail.fm}
ltMergeFiles = function(files, j, dbdir=MOTUS_PATH$RECV) {
if (! isTRUE(is.character(files) && all(file.exists(files)))) {
warning("invalid or non-existent input files specified")
return()
}
if (file.info(files[1])$isdir)
rv = dir(files, recursive=TRUE, full.names=TRUE, pattern=".*\\.dta$", ignore.case=TRUE)
else
rv = sort(files)
rv = data_frame(fullname = rv, nameNew = TRUE, dataNew = TRUE, use = FALSE, err = NA, serno=as.character(NA), ts = as.numeric(NA), tsLast = as.numeric(NA), fid=as.numeric(NA))
## read serial numbers from DTA files, returning NA for any files which fail to parse or for which we can't
## get a serial number
rv$serno = NA
for (i in seq(along = rv$serno)) {
f = rv$fullname[i]
tryCatch({
hdr = readDTA(f, numLines=20)
if (hdr$recv == "Lotek-NA") {
## For some .DTA files, the serial number appears as the "Site Code"
## We'll assume this is the case, but verify against the deployment metadata whether
## the filename begins with the same 3 letters as any deployment name for that receiver.
serno = paste0("Lotek-", hdr$siteCode)
depInfo = MetaDB("select serno, name from recvDeps where serno='%s'", serno)
fnprefix = substring(basename(f), 1, 3)
if (isTRUE(any(grepl(paste0("^", fnprefix), depInfo$name, ignore.case=TRUE, perl=TRUE)))) {
rv$serno[i] = serno
jobLog(j, paste("Warning: file", basename(f), "contains no serial number!\nI'm guessing from the Site Code",
hdr$siteCode, "and metadata DB that this file is for", serno, ".\nIf I'm wrong, detections will appear under the wrong receiver!\n"))
} else {
jobLog(j, paste("Warning: file", basename(f), "contains no serial number.\nIts Site Code is ", hdr$siteCode," which might be its serial number\nbut I can't find a deployment name for that receiver\nwhich matches `", substr(basename(f), 1, 3), "`,\nso I will not process this file"))
}
} else {
rv$serno[i] = hdr$recv
}
},
error = function(e) {
jobLog(j, paste0("File ", basename(f), " failed to parse: ", e))
})
}
## flag which files are already in the file repo
rv$fromRepo = beginsWith(rv$fullname, MOTUS_PATH$FILE_REPO)
for (serno in unique(rv$serno)) {
if (is.na(serno))
next
## lock the receiver DB
lockSymbol(serno)
## make sure we unlock the receiver DB when this function exits, even on error
## NB: the runMotusProcessServer script also drops any locks held by a given
## processServer after the latter exits.
on.exit(lockSymbol(serno, lock=FALSE))
src = getRecvSrc(serno, dbdir=dbdir)
sql = safeSQL(src)
files = tbl(src, "DTAfiles")
iRecv = which(rv$serno==serno)
for (i in iRecv) {
f = rv$fullname[i]
size = file.info(f)$size
blob = readBin(f, raw(), n=size)
ll = (blob %>% rawToChar)
## fix encoding: wherever there are non-ascii characters, assume 'Latin-1'
## see: https://github.com/jbrzusto/motusServer/issues/429
Encoding(ll) = "latin1"
ll = strsplit(ll, "\r\n", fixed=TRUE) [[1]]
skip = FALSE
x = NULL
tryCatch({
x <- readDTA(lines=ll)
if(nrow(x$tags) > 0) {
tsr = range(x$tags$ts)
rv$ts[i] = tsr[1]
rv$tsLast[i] = tsr[2]
}
}, error = function(e) {
rv$err[i] = as.character(e)
skip <- TRUE
})
if (skip)
next
## compute file hash, then check whether it already is in database
fhash = digest(blob, algo="sha512", serialize=FALSE)
bname = basename(f)
if (files %>% filter(name==bname) %>% count %>% as.data.frame > 0) {
rv$nameNew[i] = FALSE
}
this = files %>% filter(hash==fhash)
if (this %>% count %>% as.data.frame > 0) {
rv$dataNew[i] = FALSE
} else {
## sqlite connection
con = src$con
## write file record
dbGetQuery(
con,
"insert into DTAfiles (name, size, tsBegin, tsEnd, tsDB, hash, motusJobID) values (:name, :size, :tsBegin, :tsEnd, :tsDB, :hash, :motusJobID)",
params = data_frame(
name = bname,
size = length(blob),
tsBegin = min(x$tags$ts),
tsEnd = max(x$tags$ts),
tsDB = as.numeric(Sys.time()),
hash = fhash,
motusJobID = as.integer(j)
) %>% as.data.frame
)
rv$use[i] = TRUE
rv$fid[i] = dbGetQuery(con, "select max(fileID) from DTAfiles")[[1,1]]
## write tags records
## FIXME: it would be nice to factor out antenna frequency,
## codeset, gain, lat/lon as we do for SG records. This would
## make for much smaller data files. But we can't do this by
## timestamp, as these can jump around significantly in the
## .DTA files, and so must use file (lexical) order.
if (isTRUE(nrow(x$tags) > 0)) {
dbGetQuery(
con,
"insert or ignore into DTAtags (fileID, dtaline, ts, id, ant, sig, lat, lon, antFreq, gain, codeSet) values (:fileID, :dtaline, :ts, :id, :ant, :sig, :lat, :lon, :antFreq, :gain, :codeSet)",
params = data_frame(
fileID = rv$fid[i],
dtaline = x$tags$dtaline,
ts = x$tags$ts,
id = x$tags$id,
ant = x$tags$ant,
sig = x$tags$sig,
lat = x$tags$lat,
lon = x$tags$lon,
antFreq = x$tags$antfreq,
gain = x$tags$gain,
codeSet = x$tags$codeset
) %>% as.data.frame
)
}
## write boottime records
if (isTRUE(length(x$boottimes) > 0)) {
dbGetQuery(
con,
"insert or ignore into DTAboot (ts, fileID) values (:ts, :fileID)",
params = data_frame(
ts = mdy_hms(x$boottimes),
fileID = rv$fid[i]
) %>% as.data.frame
)
}
}
}
## save any files used in the receiver's folder in the file repo, if not already there.
iMove = iRecv[rv$use[iRecv] & !rv$fromRepo[iRecv]]
if (length(iMove) > 0) {
dir.create(file.path(MOTUS_PATH$FILE_REPO, serno), showWarnings=FALSE, recursive=TRUE)
newNames = moveFilesUniquely(rv$fullname[iMove], file.path(MOTUS_PATH$FILE_REPO, serno), copyLinkTargets=TRUE)
## some names had to change, so update them in the DB
for (jj in seq(along = iMove)) {
if (! is.na(newNames[jj]))
sql("update DTAfiles set name=:name where fileID=:fileID",
name = newNames[jj],
fileID = rv$fid[iMove[jj]])
}
}
## unlock the receiver and drop the source
lockSymbol(serno, lock=FALSE)
rm(sql)
rm(src)
gc()
}
## trash files which did not come from repo and which are not being used
toTrash(subset(rv, ! fromRepo & ! use)$fullname, j)
return (rv)
}
#' does a string begin with another string
#' @param x string in which to look for prefix
#' @param y prefix to look for
beginsWith = function(x, y) {
y == substr(x, 1, nchar(y))
}
Add the following code to your website.
For more information on customizing the embed code, read Embedding Snippets.