R/pushToMotus.R

#' Push any new tag detections to the motus master database.
#'
#' For now, this pushes into the transfer tables in the MySQL "motus"
#' database on the local host, from where the Motus server pulls
#' data periodically.
#'
#' Any batch whose ID is not in the receiver's motusTX table is sent
#' to the transfer tables.
#'
#' @param src dplyr src_sqlite to receiver database
#'
#' @param batchStatus integer to which the `status` field of each transferred
#' batch is set once complete. Default: 1, which means the batch will
#' be immediately servable by the \code{\link{dataServer()}}.
#'
#' @return no return value
#'
#' @export
#'
#' @author John Brzustowski \email{jbrzusto@@REMOVE_THIS_PART_fastmail.fm}

pushToMotus = function(src, batchStatus=1) {
    con = src$con
    sql = function(...) dbGetQuery(con, sprintf(...))

    ## get batches without a record in motusTX
    motusTX = tbl(src, "motusTX")
    batches = tbl(src, "batches")
    newBatches = batches %>%
        anti_join (motusTX, by="batchID") %>%
        arrange(batchID) %>%
        filter(!is.na(tsStart)) %>%
        collect

    if (nrow(newBatches) == 0)
        return()

    deviceID = getMotusDeviceID(src)
    if(! isTRUE(deviceID > 0))
        stop("invalid motus device ID for receiver with DB at ", attr(src$con, "dbname"))

    ## Ensure the device ID has been set on all batches.
    ## It is a constant, and the only reason we do this
    ## is to have the same schema for receiver and master
    ## databases.
    sql("update batches set motusDeviceID=%d", deviceID)

    newBatches$motusDeviceID = deviceID

    ## lock the master DB, to prevent client-side problems with
    ## non-monotonic completion of batch transfers; see
    ## https://github.com/jbrzusto/motusServer/issues/301

    lockSymbol("masterDB")

    ## make sure we unlock the master DB when this function exits,
    ## even on error.  Note:  the runMotusProcessServer script also drops
    ## any locks held by a given processServer after the latter exits.

    on.exit(lockSymbol("masterDB", lock=FALSE))

    ## open the motus transfer table

    mtcon = openMotusDB()$con ## also ensures global MotusDB exists

    ## Writing a record to the motus batches table indicates that
    ## batch is ready for transfer, so must be the last thing we do
    ## for a batch, after writing all hits, runs, etc.

    ## So we need to reserve a block of nrow(b) IDs in motus.batches

    firstMotusBatchID = motusReserveKeys("batches", nrow(newBatches))
    offsetBatchID = firstMotusBatchID - newBatches$batchID[1]

    ## ---------- transfer tag ambiguities ----------
    ## This is a two-way process, in that we want to have a unique
    ## master ambigID across receivers for each ambiguity of a
    ## particular set of (motus) tagIDs.

    ## 1. get tag ambiguities from this receiver which don't already have a master
    ## ambigID:

    ambig = tbl(src, "tagAmbig") %>% filter (is.null(masterAmbigID)) %>% collect

    if (nrow(ambig) > 0) {

        ## 2. join to the master ambiguity table by tagIDs
        masterAmbig = as.tbl(MotusDB("select * from tagAmbig"))

        joinAmbig = ambig %>% left_join (masterAmbig, by=c("motusTagID1", "motusTagID2", "motusTagID3", "motusTagID4", "motusTagID5", "motusTagID6")) %>% collect

        ## 3. create entries in the master tagAmbig table for any not
        ## already there

        newA = which(is.na(joinAmbig$ambigID.y))
        n = length(newA)
        if (n > 0) {
            ## note use of negative n to reserve negative, descending keys
            firstMasterAmbigID = motusReserveKeys("tagAmbig", -n)

            ## fill in new masterAmbigIDs
            joinAmbig$ambigID.y[newA] = seq(from = firstMasterAmbigID, by = -1, length = n)

            ## create table with new tag ambiguities to be added to master tagAmbig table
            newAmbig = joinAmbig[newA, ] %>% transmute_(
                                                 ambigID="ambigID.y",
                                                 motusTagID1 = "motusTagID1",
                                                 motusTagID2 = "motusTagID2",
                                                 motusTagID3 = "motusTagID3",
                                                 motusTagID4 = "motusTagID4",
                                                 motusTagID5 = "motusTagID5",
                                                 motusTagID6 = "motusTagID6",
                                                 ambigProjectID = 0) %>% as.data.frame
            ## write new tag ambiguities
            ## but work around bug in RMySQL
            dbWriteTable(mtcon, "temptagAmbig", newAmbig, overwrite=TRUE, row.names=FALSE)
            MotusDB("
replace into
   tagAmbig (
      ambigID,
      motusTagID1,
      motusTagID2,
      motusTagID3,
      motusTagID4,
      motusTagID5,
      motusTagID6,
      ambigProjectID
   )
   select
      ambigID,
      motusTagID1,
      motusTagID2,
      motusTagID3,
      motusTagID4,
      motusTagID5,
      motusTagID6,
      ambigProjectID
   from
      temptagAmbig
")
            MotusDB("drop table temptagAmbig")
        }

        ## 4. record the masterAmbigID for each tag ambiguity in the receiver DB
        copy_to(src, joinAmbig, "joinAmbig")  ## add as a temporary table
        sql("replace into tagAmbig select `ambigID.x` as ambigID, `ambigID.y` as masterAmbigID, motusTagID1, motusTagID2, motusTagID3, motusTagID4, motusTagID5, motusTagID6 from joinAmbig")
    }

    ## ----------  copy batches ----------

    ## update the batchID fields for the batches, then insert them into motus.batches
    ## The default value of -10 for the status field means these records represent
    ## incomplete batches, not to be served by the dataServer until the status field
    ## is set to +1 at the end of this function.  If the status is set to -1, these
    ## data are for debugging only, and not returned by the dataServer unless explicitly
    ## requested.

    txBatches = newBatches
    txBatches$batchID = txBatches$batchID + offsetBatchID

    txBatches$status = -10

    dbWriteTable(mtcon, "batches", txBatches %>% as.data.frame, append=TRUE, row.names=FALSE)

    ## set bogus offsets, in case no record of these types; permits update to
    ## motusTX table in sqlite database to work.

    offsetRunID = 0
    offsetHitID = 0

    ## number of rows to move at a time
    CHUNK_ROWS = 50000

    ## for each batch, transfer associated records

    for (i in seq_len(nrow(newBatches))) {

        b = newBatches[i,]
        txBatchID = txBatches$batchID[i]

        ## accumulate unique tag project IDs:
        tagDepProjIDs = c()

        ## create a map from local (i.e. valid in the receiver DB) run IDs to global ones (i.e. valid in the master DB)

        ## first, grab the portion of the map for runs touched in this batch but begun earlier.
        runIDtable = sql("select cast(t1.runID as text), cast(t1.runID + t2.offsetRunID as real) from runs as t1 join motusTX as t2 on t1.batchIDbegin=t2.batchID where t1.runID in (select distinct runID from batchRuns where batchID=%d)", b$batchID)
        ## runIDMap will be a numeric vector whose names are the local runIDs (as strings) and whose values are the global runIDs
        if (nrow(runIDtable) > 0) {
            runIDMap = runIDtable[[2]]
            names(runIDMap) = runIDtable[[1]]
        } else {
            runIDMap = numeric(0)
        }

        ## ----------  copy new runs  ----------

        ## get count of new runs and 1st run ID for this batch
        runInfo = sql("select count(*), min(runID) from runs where batchIDbegin = %d", b$batchID)

        if (runInfo[1,1] > 0) {

            ## reserve the required number of runIDs
            firstMotusRunID = motusReserveKeys("runs", runInfo[1,1])
            offsetRunID = firstMotusRunID - runInfo[1,2]

            res = dbSendQuery(con, sprintf("select * from runs where batchIDBegin = %d order by runID", b$batchID))
            dbClearResult(res)

            ## grab runs from this batch, substituting any ambiguous tag IDs with their master (global) version:
            res = dbSendQuery(con, sprintf("
select
   runID,
   batchIDbegin,
   tsBegin,
   tsEnd,
   done,
   ifnull(t2.masterAmbigID, t1.motusTagID) as motusTagID,
   ant,
   len
from
   runs as t1
   left join tagAmbig as t2 on t1.motusTagID=t2.ambigID
where
   t1.batchIDbegin = %d
order by
   t1.runID
", b$batchID))

            repeat {
                runs = dbFetch(res, CHUNK_ROWS)
                if (nrow(runs) == 0)
                    break
                ## add new runs to the runIDMap
                runs$runID = runIDMap[as.character(runs$runID)] = runs$runID + offsetRunID
                runs$batchIDbegin = txBatchID
                dbWriteTable(mtcon, "runs", runs, append=TRUE, row.names=FALSE)

                ## Set the tagDepProjectID for each run; it will be
                ## the project ID of the latest deployment of that tag
                ## that which begins no later than the start of the
                ## run.  We allow for tsEnd of a tagDep to be null or
                ## 0, meaning "still active".  And we allow a slop of
                ## 20 minutes in the deployment time, to catch runs which
                ## started slightly before the nominal deployment time
                ## (see https://github.com/jbrzusto/find_tags/issues/41 )

                MotusDB("
update
   runs as t1
set
   t1.tagDepProjectID = (
      select
         t2.projectID
      from
         tagDeps as t2
      where
         t2.motusTagID=t1.motusTagID
         and t2.tsStart - 1200 <= t1.tsBegin
      order by
         t2.tsStart desc
      limit 1)
where
   t1.runID between %d and %d
",
runs$runID[1], tail(runs$runID, 1))

            ## append these runs to the batchRuns table
                MotusDB("
insert
   into batchRuns (
      select
         batchIDbegin as batchID,
         runID,
         tagDepProjectID
      from
         runs
      where
         runID between %d and %d
   )
",
runs$runID[1], tail(runs$runID, 1))

            }
            dbClearResult(res)
        }

        ## ----------  update existing runs that overlap this batch  ----------

        ## For runs which began before this batch and which were processed
        ## during this batch, update their length, tsEnd, and done fields

        res = dbSendQuery(con, sprintf("
select
   *
from
   batchRuns as t0
   join runs as t1 on t1.runID=t0.runID and t1.batchIDbegin <> t0.batchID
   join motusTX as t2 on t2.batchID=t1.batchIDbegin
where
   t0.batchID = %d
order by t1.runID
", b$batchID))
        repeat {
            runUpd = dbFetch(res, CHUNK_ROWS)
            if (nrow(runUpd) == 0)
                break

            runUpd$runID        = runUpd$runID        + runUpd$offsetRunID    ## offsetRunID depends on batchID via the join above

            ## create a temporary table from which to update using an update with join
            MotusDB("
create temporary table if not exists
   tempRunUpdates (
      runID bigint(20) primary key,
      tsEnd double,
      len int(11),
      done tinyint(4)
   )
")
            MotusDB("truncate table tempRunUpdates")

            ## add updated portions of records to the temporary table)
            dbWriteTable(mtcon, "tempRunUpdates", runUpd[, c("runID", "tsEnd", "len", "done")], append=TRUE, row.names=FALSE)

            ## update the runs table via a join with tempRunUpdates
            MotusDB("
update
   tempRunUpdates as t1
   join runs as t2 on t2.runID = t1.runID
set
   t2.tsEnd = t1.tsEnd,
   t2.len   = t1.len,
   t2.done  = t1.done
")

            ## add records to batchRuns table; unlike the case of new runs, the runIDs for this
            ## set are not necessarily consecutive, so we have to explicitly supply them.
            MotusDB("
insert
   into batchRuns (
      select
         %d as batchID,
         t1.runID,
         t2.tagDepProjectID
      from
         tempRunUpdates as t1
         join runs as t2 on t1.runID=t2.runID
   )
",
txBatchID)
        }
        dbClearResult(res)


        ## ----------  copy hits  ----------

        ## get count of hits and 1st hit ID for this batch
        hitInfo = sql("select count(*), min(hitID) from hits where batchID = %d", b$batchID)
        if (hitInfo[1,1] > 0) {
            ## reserve the required number of hitIDs
            firstMotusHitID = motusReserveKeys("hits", hitInfo[1,1])
            offsetHitID = firstMotusHitID - hitInfo[1,2]

            res = dbSendQuery(con, sprintf("select * from hits where batchID = %d order by hitID", b$batchID))

            repeat {
                hits = dbFetch(res, CHUNK_ROWS)
                if (nrow(hits) == 0)
                    break
                hits$hitID   = hits$hitID   + offsetHitID
                ## use the runIDMap to correct runIDs; valid for both new and continuing runs
                hits$runID   = runIDMap[as.character(hits$runID)]
                hits$batchID = hits$batchID + offsetBatchID
                dbWriteTable(mtcon, "hits", hits, append=TRUE, row.names=FALSE)

                ## copy the helper field tagDepProjectID from the value for the associated run
                MotusDB("
update
   hits as t1
join
   runs as t2 on t2.runID=t1.runID
set
   t1.tagDepProjectID = t2.tagDepProjectID
where
   t1.hitID between %.0f and %.0f
", hits$hitID[1], tail(hits$hitID, 1))
            }
            dbClearResult(res)
        }
        ## ----------  copy gps  ----------

        gps = sql("select * from gps where batchID = %d order by ts", b$batchID)
        gps$batchID = gps$batchID + offsetBatchID
        dbWriteTable(mtcon, "gps", gps, append=TRUE, row.names=FALSE)

        ## ----------  copy batchProgs  ----------
        bpr = sql("select * from batchProgs where batchID = %d", b$batchID)
        bpr$batchID = bpr$batchID + offsetBatchID

        if (nrow(bpr) > 0)
            dbWriteTable(mtcon, "batchProgs", bpr, append=TRUE, row.names=FALSE)


        ## ----------  copy batchParams  ----------
        bpa = sql("select * from batchParams where batchID = %d", b$batchID)
        bpa$batchID = bpa$batchID + offsetBatchID
        bpa$paramVal = as.character(bpa$paramVal)
        if (nrow(bpa) > 0)
            dbWriteTable(mtcon, "batchParams", bpa, append=TRUE, row.names=FALSE)

        ## ----------  copy pulseCounts  ----------
        pcs = sql("select * from pulseCounts where batchID = %d", b$batchID)
        pcs$batchID = pcs$batchID + offsetBatchID

        if (nrow(pcs) > 0)
            dbWriteTable(mtcon, "pulseCounts", pcs, append=TRUE, row.names=FALSE)

        ## --------- update projBatch helper table -----

        ## Note: we do this as a nested query so that the inner one
        ## can take advantage of a covering index on the batchRuns
        ## table.  i.e. if instead we used the flat query `select %d
        ## as batchID, distinct (tagDepProjectID) from batchRuns where
        ## batchID=%d` then mariadb, apparently not smart enough to
        ## grab the distinct values of tagDepProjectID from the index,
        ## would instead scan a portion of the batchRuns table, which
        ## can be very large (e.g. 6.9M runs for batch 68902)

        MotusDB("
insert
   into projBatch (
      select
         tagDepProjectID,
         %d as batchID
      from
         (
            select
               distinct tagDepProjectID
            from
               batchRuns
            where
               batchID = %d
               and tagDepProjectID is not null
         ) as t
   )
", txBatchID, txBatchID)
        ## Mark what has been transferred

        sql("insert into motusTX (batchID, tsMotus, offsetBatchID, offsetRunID, offsetHitID) \
                         values  (  %d   , %.4f   , %d           , %.0f       , %.0f       )",
            b$batchID,
            as.numeric(Sys.time()),
            offsetBatchID,
            offsetRunID,
            offsetHitID
            )
    }

    ## Set the recvDepProjectID for each batch; it will be the project
    ## ID of the latest deployment of that receiver which overlaps the
    ## batch.  We allow for tsEnd of a recvDep to be null or 0,
    ## meaning "still active".  Batches not overlapped by a receiver
    ## deployment will have the motusProjectID field used for this purpose,
    ## even if that is null.  For most data processing runs, that will be the
    ## projectID selected by the user at data upload time.

    MotusDB("update batches as t1 set t1.recvDepProjectID = coalesce((select t2.projectID from recvDeps as t2 where t2.deviceID=t1.motusDeviceID and t2.tsStart <= t1.tsEnd and (t2.tsEnd is null or t2.tsEnd = 0 or t1.tsStart <= t2.tsEnd) order by t2.tsStart desc limit 1), t1.motusProjectID) where t1.batchID between %d and %d",
            txBatches$batchID[1],
            tail(txBatches$batchID, 1))

    ## To indicate they are complete and ready for transfer, set
    ## status on these batches.

    MotusDB("update batches set status = %d where status = -10 and batchID between %d and %d",
            batchStatus,
            txBatches$batchID[1],
            tail(txBatches$batchID, 1))

    invisible(NULL)
}
jbrzusto/motus-R-package documentation built on May 18, 2019, 7:03 p.m.