# Column names used in the mPower batch-status Synapse table
bridgeUploadDateColumnName <- "bridgeUploadDate"
mPowerBatchStartColumnName <- "mPowerBatchStart"
hostNameColumnName <- "hostName"
batchStatusColumnName <- "batchStatus"
reportsSentCountColumnName <- "reportsSentCount"
# batchStatus value recorded while a batch is being processed
inProgressStatusValue <- "inProgress"
# Check for a new Bridge export batch to process and, if there is one, "lock"
# it for this host by writing (or taking over) its row in the mPower
# batch-status table.
#
# The latest export is the last row, by uploadDate, of the Bridge status table
# ('parkinson-status'). The mPower batch-status table tracks the processing of
# each batch, one row per batch:
#   bridgeUploadDate - the Bridge upload date, used as the ID of the batch
#   mPowerBatchStart - the date-time at which processing began
#   hostName         - the ID of the host/server processing the batch
#   batchStatus      - the processing state: 'inProgress' while running, then
#                      the final status recorded by the caller (e.g.
#                      'complete' or 'failed')
#
# A host may start processing a batch if (1) there is no row in the
# batch-status table for the batch; or (2) there is a row whose status is
# 'inProgress', 'leaseTimeOut' was supplied, and more than 'leaseTimeOut' has
# elapsed since the recorded start time (i.e. the previous lease expired).
# NOTE: the check-then-write below is not atomic; two hosts that both query
# before either one stores could both claim the same batch.
#
# Args:
#   bridgeStatusId:      Synapse ID of the Bridge export status table.
#   mPowerBatchStatusId: Synapse ID of the mPower batch-status table.
#   hostname:            value written to the hostName column.
#   now:                 current date-time, written to mPowerBatchStart.
#   leaseTimeOut:        optional; compared against (now - mPowerBatchStart).
#                        When omitted, an existing row is never taken over.
#
# Returns: NULL if there is no batch, or if the batch is already claimed and
# cannot be taken over; otherwise the batch-status query result holding
# exactly the one row locked for this host.
checkForAndLockBridgeExportBatch <- function(bridgeStatusId, mPowerBatchStatusId, hostname, now, leaseTimeOut) {
  # Query the batch-status table for the row(s) belonging to one batch.
  queryBatchStatus <- function(uploadDate) {
    synTableQuery(paste0("select * from ", mPowerBatchStatusId, " where ",
      bridgeUploadDateColumnName, "='", uploadDate, "'"))
  }

  bridgeStatusSql <- paste0("select uploadDate from ", bridgeStatusId,
    " ORDER BY uploadDate DESC LIMIT 1 OFFSET 0")
  bridgeStatusValues <- synTableQuery(bridgeStatusSql)@values
  if (nrow(bridgeStatusValues) == 0) return(NULL) # an unusual case
  if (nrow(bridgeStatusValues) != 1) {
    stop(paste0("Expected 0-1 rows but got ", nrow(bridgeStatusValues)))
  }
  latestBridgeUploadDate <- bridgeStatusValues[1, 1] # there's only one column in the result
  if (is(latestBridgeUploadDate, "character")) {
    latestBridgeUploadDate <- as.Date(latestBridgeUploadDate)
  }

  mPowerBatchStatusQueryResult <- queryBatchStatus(latestBridgeUploadDate)
  mPowerBatchStatusValues <- mPowerBatchStatusQueryResult@values
  if (nrow(mPowerBatchStatusValues) == 0) {
    # No row yet, so this host can claim the batch.
    newRow <- data.frame(
      latestBridgeUploadDate,
      now,
      hostname,
      inProgressStatusValue,
      stringsAsFactors = FALSE
    )
    names(newRow) <- c(
      bridgeUploadDateColumnName,
      mPowerBatchStartColumnName,
      hostNameColumnName,
      batchStatusColumnName
    )
    synStore(Table(mPowerBatchStatusQueryResult@schema, newRow))
    # Re-query so the caller gets the stored row (with its Synapse row ID/version).
    mPowerBatchStatusQueryResult <- queryBatchStatus(latestBridgeUploadDate)
  } else if (nrow(mPowerBatchStatusValues) == 1) {
    # An existing row can only be taken over if a lease timeout was given AND
    # the batch is still 'inProgress' AND its start time is older than the
    # timeout. isTRUE() treats NA status / start time as "not expired" instead
    # of aborting inside if().
    leaseExpired <- !missing(leaseTimeOut) &&
      isTRUE(mPowerBatchStatusValues[1, batchStatusColumnName] == inProgressStatusValue) &&
      isTRUE(now - mPowerBatchStatusValues[1, mPowerBatchStartColumnName] > leaseTimeOut)
    if (leaseExpired) {
      mPowerBatchStatusQueryResult@values[1, mPowerBatchStartColumnName] <- now
      mPowerBatchStatusQueryResult@values[1, hostNameColumnName] <- hostname
      synStore(mPowerBatchStatusQueryResult)
      mPowerBatchStatusQueryResult <- queryBatchStatus(latestBridgeUploadDate)
    } else {
      mPowerBatchStatusQueryResult <- NULL
    }
  } else {
    stop(paste0("Expected 0-1 rows but got ", nrow(mPowerBatchStatusValues)))
  }
  if (!is.null(mPowerBatchStatusQueryResult) && nrow(mPowerBatchStatusQueryResult@values) != 1) {
    stop("Problem 'locking' the mPower batch for processing. Expected one row for the batch in the status table but found ",
      nrow(mPowerBatchStatusQueryResult@values), ".")
  }
  mPowerBatchStatusQueryResult
}
# Return an identifier for this host/worker, taken from the HOSTNAME
# environment variable, so concurrent copies of the job can be told apart.
# When only one copy runs the value does not matter, so "UNKNOWN" is used
# whenever HOSTNAME is unset or empty.
getHostname <- function() {
  host <- Sys.getenv("HOSTNAME")
  if (nzchar(host)) host else "UNKNOWN"
}
# Record the final status of a batch in its (single) batch-status row and
# store it back to Synapse.
#
# Args:
#   batchStatusQueryResult: query result holding exactly one batch-status row.
#   status:                 value written to the batchStatus column.
#   reportsSentCount:       written (as integer) to the reportsSentCount column.
# Returns: the result of synStore(..., retrieveData=TRUE).
# Stops if the query result does not contain exactly one row.
markProcesingComplete <- function(batchStatusQueryResult, status, reportsSentCount) {
  rowCount <- nrow(batchStatusQueryResult@values)
  if (rowCount != 1) {
    stop(paste0("Expected one row but found ", rowCount))
  }
  updated <- batchStatusQueryResult@values
  updated[1, batchStatusColumnName] <- status
  updated[1, reportsSentCountColumnName] <- as.integer(reportsSentCount)
  batchStatusQueryResult@values <- updated
  synStore(batchStatusQueryResult, retrieveData = TRUE)
}
# Turn the last-processed-version table (columns TABLE_ID, LAST_VERSION) into
# a vector of LAST_VERSION values named by TABLE_ID.
getLastProcessedVersion <- function(df) {
  setNames(df[["LAST_VERSION"]], df[["TABLE_ID"]])
}
# Inverse of getLastProcessedVersion: turn a named vector of versions into a
# data frame with columns TABLE_ID (the names) and LAST_VERSION (the values),
# using plain 1..n row names.
lastProcessVersionToDF <- function(lastProcessedVersion) {
  tableIds <- names(lastProcessedVersion)
  out <- data.frame(
    TABLE_ID = tableIds,
    LAST_VERSION = lastProcessedVersion,
    stringsAsFactors = FALSE
  )
  rownames(out) <- NULL
  out
}
# Merge a named vector of last-processed versions into 'df', keyed on
# TABLE_ID, via mergeDataFrames (defined elsewhere; its exact merge semantics
# are not shown here).
mergeLastProcessVersionIntoToDF <- function(lastProcessedVersion, df) {
  updates <- lastProcessVersionToDF(lastProcessedVersion)
  mergeDataFrames(df, updates, "TABLE_ID")
}
# Look up the bookmark row recording the last row version of the given cleaned
# data table for which the given feature was computed into the given output
# table.
#
# Returns the query result. If no such row exists yet, its values are seeded
# with one row for (cleanedTableId, featureName, featureTableId) whose
# LAST_VERSION is NA, so the caller can fill it in and store it.
lastProcessedFeatureVersion <- function(lastProcessedFeatureVersionTableId, cleanedTableId, featureName, featureTableId) {
  sql <- paste0(
    "SELECT * FROM ", lastProcessedFeatureVersionTableId,
    " WHERE TABLE_ID='", cleanedTableId, "'",
    " AND FEATURE='", featureName, "'",
    " AND OUTPUT_TABLE_ID='", featureTableId, "'"
  )
  result <- synTableQuery(sql)
  if (nrow(result@values) == 0) {
    result@values[1, "TABLE_ID"] <- cleanedTableId
    result@values[1, "FEATURE"] <- featureName
    result@values[1, "OUTPUT_TABLE_ID"] <- featureTableId
    result@values[1, "LAST_VERSION"] <- NA
  }
  result
}
# Given a data frame whose row names follow the Synapse table row-name
# convention (presumably "<rowId>_<version>"; TODO confirm), return the
# largest row version among its rows.
getMaxRowVersion <- function(df) {
  # TODO make this private method public
  parsed <- synapseClient:::parseRowAndVersion(row.names(df))
  # row 2 of the parsed result is taken as the row version
  versions <- parsed[2, ]
  max(versions)
}
# Entry point for one run of the mPower pipeline.
#
# Checks whether a new Bridge export batch is available and, if so, locks it
# for this host (via checkForAndLockBridgeExportBatch). It then runs
# process_mpower_data_bare and records the outcome in the batch-status table:
# 'complete' with the number of reports sent on success, or 'failed' with a
# count of 0 if any error occurs. Errors are logged via message() and are not
# re-thrown.
#
# Returns NULL when there is no batch to process; otherwise the value of the
# final markProcesingComplete() call.
process_mpower_data <- function(eId, uId, pId, mId, tId, tlrId, vId1, vId2, wId, outputProjectId,
  tappingFeatureTableId, tappingLeftFeatureTableId, tappingRightFeatureTableId, voiceFeatureTableId, balanceFeatureTableId, gaitFeatureTableId,
  bridgeStatusId, mPowerBatchStatusId, lastProcessedVersionTableId, lastProcessedFeatureVersionTableId) {
  hostname <- getHostname()
  # No lease timeout is passed, so a batch left 'inProgress' (e.g. by a crashed
  # run) is never taken over automatically.
  bridgeExportQueryResult <- checkForAndLockBridgeExportBatch(bridgeStatusId, mPowerBatchStatusId, hostname, Sys.time())
  if (is.null(bridgeExportQueryResult) || nrow(bridgeExportQueryResult@values) == 0) return(NULL)
  tryCatch({
    reportsSentCount <- process_mpower_data_bare(eId, uId, pId, mId, tId, tlrId, vId1, vId2, wId, outputProjectId,
      tappingFeatureTableId, tappingLeftFeatureTableId, tappingRightFeatureTableId, voiceFeatureTableId, balanceFeatureTableId, gaitFeatureTableId,
      bridgeStatusId, mPowerBatchStatusId, lastProcessedVersionTableId, lastProcessedFeatureVersionTableId)
    markProcesingComplete(bridgeExportQueryResult, "complete", reportsSentCount)
  },
  error = function(e) {
    # Log only the text: message(e) would re-signal the error condition itself,
    # letting an outer error handler intercept it before the batch is marked
    # 'failed'.
    message(conditionMessage(e))
    markProcesingComplete(bridgeExportQueryResult, "failed", 0)
  })
}
# Run the mPower processing pipeline without any error handling.
# process_mpower_data wraps this function in a tryCatch and records the batch
# outcome; this entry point, which lacks the 'try-catch', is exposed for
# testing purposes.
#
# Steps, in order, each with Synapse/Bridge side effects:
#   1. Read the per-table "last processed row version" bookmarks from
#      lastProcessedVersionTableId.
#   2. Clean each raw table (surveys v1-v3, memory, tapping, tapping
#      left/right, voice, walking), passing in its bookmark and collecting the
#      new maximum row version(s) it reports.
#   3. Clean up missing med data, store the cleaned tables via
#      store_cleaned_data (which receives outputProjectId), and write the
#      updated bookmarks back to lastProcessedVersionTableId.
#   4. For each feature (tapping, tapping left/right, gait, balance, voice),
#      compute features on the cleaned table starting from its bookmark in
#      lastProcessedFeatureVersionTableId. The new bookmark is stored only
#      when the compute function returns a non-NA version.
#   5. Run runNormalization over a 30-day window ending today and POST the
#      resulting visualization JSON to the Bridge visualization API.
#
# NOTE(review): bridgeStatusId and mPowerBatchStatusId are accepted (to keep
# the signature parallel to process_mpower_data) but are not used here.
#
# Returns: the number of records for which at least one visualization JSON
# string was sent (0 if normalization produced no records).
process_mpower_data_bare<-function(eId, uId, pId, mId, tId, tlrId, vId1, vId2, wId, outputProjectId,
tappingFeatureTableId, tappingLeftFeatureTableId, tappingRightFeatureTableId, voiceFeatureTableId, balanceFeatureTableId, gaitFeatureTableId,
bridgeStatusId, mPowerBatchStatusId, lastProcessedVersionTableId, lastProcessedFeatureVersionTableId) {
# named vector: raw table ID -> last row version already processed
lastProcessedQueryResult<-synTableQuery(paste0("SELECT * FROM ", lastProcessedVersionTableId))
lastProcessedVersion<-getLastProcessedVersion(lastProcessedQueryResult@values)
######
# Data Cleaning
######
cat("Processing survey v1...\n")
eDatResult<-process_survey_v1(eId, lastProcessedVersion[eId])
eDat<-eDatResult$eDat
lastProcessedVersion[eId]<-eDatResult$maxRowVersion
cat("... done. # rows: ", nrow(eDat), ", max row version: ", eDatResult$maxRowVersion, "\n")
cat("Processing survey v2...\n")
uDatResult<-process_survey_v2(uId, lastProcessedVersion[uId])
uDat<-uDatResult$uDat
lastProcessedVersion[uId]<-uDatResult$maxRowVersion
cat("... done. # rows: ", nrow(uDat), ", max row version: ", uDatResult$maxRowVersion, "\n")
cat("Processing survey v3...\n")
pDatResult<-process_survey_v3(pId, lastProcessedVersion[pId])
pDat<-pDatResult$pDat
lastProcessedVersion[pId]<-pDatResult$maxRowVersion
cat("... done. # rows: ", nrow(pDat), ", max row version: ", pDatResult$maxRowVersion, "\n")
cat("Processing memory activity...\n")
mResults<-process_memory_activity(mId, lastProcessedVersion[mId])
mDat<-mResults$mDat
mFilehandleCols<-mResults$mFilehandleCols
lastProcessedVersion[mId]<-mResults$maxRowVersion
cat("... done. # rows: ", nrow(mDat), ", max row version: ", mResults$maxRowVersion, "\n")
# The tapping, tapping left/right, voice and walking processors report a max
# row version per table ID (maxRowProcessed), so every reported ID's bookmark
# is updated.
cat("Processing tapping activity...\n")
tResults<-process_tapping_activity(tId, lastProcessedVersion[tId])
tDat<-tResults$tDat
tFilehandleCols<-tResults$tFilehandleCols
cat("... done. # rows: ", nrow(tDat))
for (id in names(tResults$maxRowProcessed)) {
lastProcessedVersion[id]<-tResults$maxRowProcessed[[id]]
cat(", max row version for ", id, ": ", tResults$maxRowProcessed[[id]])
}
cat("\n")
cat("Processing tapping activity left/right ...\n")
tlrResults<-process_tapping_leftright_activity(tlrId, lastProcessedVersion[tlrId])
tlrDat<-tlrResults$tlrDat
tlrFilehandleCols<-tlrResults$tlrFilehandleCols
cat("... done. # rows: ", nrow(tlrDat))
for (id in names(tlrResults$maxRowProcessed)) {
lastProcessedVersion[id]<-tlrResults$maxRowProcessed[[id]]
cat(", max row version for ", id, ": ", tlrResults$maxRowProcessed[[id]])
}
cat("\n")
# voice data comes from two raw tables (vId1, vId2), each with its own bookmark
cat("Processing voice activity...\n")
vResults<-process_voice_activity(vId1, vId2, lastProcessedVersion[vId1], lastProcessedVersion[vId2])
vDat<-vResults$vDat
vFilehandleCols<-vResults$vFilehandleCols
cat("... done. # rows: ", nrow(vDat))
for (id in names(vResults$maxRowProcessed)) {
lastProcessedVersion[id]<-vResults$maxRowProcessed[[id]]
cat(", max row version for ", id, ": ", vResults$maxRowProcessed[[id]])
}
cat("\n")
cat("Processing walking activity...\n")
wResults<-process_walking_activity(wId, lastProcessedVersion[wId])
wDat<-wResults$wDat
cat("... done. # rows: ", nrow(wDat))
for (id in names(wResults$maxRowProcessed)) {
lastProcessedVersion[id]<-wResults$maxRowProcessed[[id]]
cat(", max row version for ", id, ": ", wResults$maxRowProcessed[[id]])
}
cat("\n")
cat("Cleaning up missing med data...\n")
clean_up_result<-cleanup_missing_med_data(mDat, tDat, vDat, wDat)
mDat<-clean_up_result$mDat
tDat<-clean_up_result$tDat
vDat<-clean_up_result$vDat
wDat<-clean_up_result$wDat
cat("... done.\n")
cat("Storing cleaned data...\n")
# nameToTableIdMap: cleaned table name (e.g. "Tapping Activity") -> Synapse table ID
nameToTableIdMap<-store_cleaned_data(outputProjectId, eDat, uDat, pDat, mDat, tDat, tlrDat, vDat, wDat, mFilehandleCols, tFilehandleCols, tlrFilehandleCols, vFilehandleCols)
cat("... done.\n")
# update the last processed version for the cleaned data tables
# (bookmarks are written only after the cleaned data has been stored)
cat("Updating 'last processed' table for cleaned data tables...\n")
lastProcessedQueryResult@values<-mergeLastProcessVersionIntoToDF(
lastProcessedVersion, lastProcessedQueryResult@values)
synStore(lastProcessedQueryResult)
cat("... done.\n")
# **** compute features ****
# Each feature keeps its own bookmark row keyed by (cleaned table, feature
# name, output table); the bookmark is stored only if the compute function
# returned a non-NA version.
tappingCleanedDataId<-nameToTableIdMap[["Tapping Activity"]]
if (is.null(tappingCleanedDataId)) stop("No cleaned Tapping Activity data")
lp<-lastProcessedFeatureVersion(lastProcessedFeatureVersionTableId, tappingCleanedDataId, "tap_count", tappingFeatureTableId)
newLastProcessedVersion<-computeTappingFeatures(tappingCleanedDataId, lp@values[1, "LAST_VERSION"], tappingFeatureTableId)
if (!is.na(newLastProcessedVersion)) {
lp@values[1, "LAST_VERSION"]<-newLastProcessedVersion
synStore(lp)
}
## NOW DO FOR TAPPING - PER HAND
# Left and right share one cleaned table but have separate output tables,
# hence separate bookmark rows.
tappingLeftrightCleanedDataId<-nameToTableIdMap[["Tapping Activity - Left and Right"]]
if (is.null(tappingLeftrightCleanedDataId)) stop("No cleaned Tapping Left and Right Activity data")
## LEFT
lp<-lastProcessedFeatureVersion(lastProcessedFeatureVersionTableId, tappingLeftrightCleanedDataId, "tap_count", tappingLeftFeatureTableId)
newLastProcessedVersion<-computeTappingFeatures(tappingLeftrightCleanedDataId, lp@values[1, "LAST_VERSION"], tappingLeftFeatureTableId, "left")
if (!is.na(newLastProcessedVersion)) {
lp@values[1, "LAST_VERSION"]<-newLastProcessedVersion
synStore(lp)
}
## RIGHT
lp<-lastProcessedFeatureVersion(lastProcessedFeatureVersionTableId, tappingLeftrightCleanedDataId, "tap_count", tappingRightFeatureTableId)
newLastProcessedVersion<-computeTappingFeatures(tappingLeftrightCleanedDataId, lp@values[1, "LAST_VERSION"], tappingRightFeatureTableId, "right")
if (!is.na(newLastProcessedVersion)) {
lp@values[1, "LAST_VERSION"]<-newLastProcessedVersion
synStore(lp)
}
walkingCleanedDataId<-nameToTableIdMap[["Walking Activity"]]
if (is.null(walkingCleanedDataId)) stop("No cleaned Walking Activity data")
# compute gait and balance features
# (both are computed from the cleaned walking table)
lp<-lastProcessedFeatureVersion(lastProcessedFeatureVersionTableId, walkingCleanedDataId, "F0XY", gaitFeatureTableId)
lastProcessedGaitVersion<-computeGaitFeatures(walkingCleanedDataId,
lp@values[1, "LAST_VERSION"], gaitFeatureTableId)
if (!is.na(lastProcessedGaitVersion)) {
lp@values[1, "LAST_VERSION"]<-lastProcessedGaitVersion
synStore(lp)
}
lp<-lastProcessedFeatureVersion(lastProcessedFeatureVersionTableId, walkingCleanedDataId, "zcrAA", balanceFeatureTableId)
lastProcessedBalanceVersion<-computeBalanceFeatures(walkingCleanedDataId,
lp@values[1, "LAST_VERSION"], balanceFeatureTableId)
if (!is.na(lastProcessedBalanceVersion)) {
lp@values[1, "LAST_VERSION"]<-lastProcessedBalanceVersion
synStore(lp)
}
## ------------------------------------------------------------
## Compute voice features
## ------------------------------------------------------------
voiceCleanedDataId <- nameToTableIdMap[["Voice Activity"]]
if (is.null(voiceCleanedDataId)) stop("No cleaned Voice Activity data")
lp <- lastProcessedFeatureVersion(lastProcessedFeatureVersionTableId, voiceCleanedDataId, "medianF0", voiceFeatureTableId)
lastProcessedVoiceVersion <- computeVoiceFeatures(
voiceCleanedDataId,
lp@values[1, "LAST_VERSION"],
voiceFeatureTableId)
if (!is.na(lastProcessedVoiceVersion)) {
lp@values[1, "LAST_VERSION"] <- lastProcessedVoiceVersion
synStore(lp)
}
## ------------------------------------------------------------
# Now compute the normalized features
demographicsCleanedDataId<-nameToTableIdMap[["Demographics Survey"]]
if (is.null(demographicsCleanedDataId)) stop("No cleaned Demographics Survey data")
tables<-list(demographics=demographicsCleanedDataId, tapping=tappingCleanedDataId, tappingLeftright=tappingLeftrightCleanedDataId,
voice=voiceCleanedDataId, walking=walkingCleanedDataId)
features<-list(tapping=tappingFeatureTableId, gait=gaitFeatureTableId, tappingLeft=tappingLeftFeatureTableId, tappingRight=tappingRightFeatureTableId,
balance=balanceFeatureTableId, voice=voiceFeatureTableId)
thirtyDayWindow <- list(start=Sys.Date()-as.difftime(30, units="days"), end=Sys.Date())
# NOTE(review): featureNames uses the keys tap/tapLeft/tapRight while
# 'features' uses tapping/tappingLeft/tappingRight; confirm runNormalization
# pairs them as intended.
featureNames <- list(balance='zcrAA', gait='F0XY',
tap='tap_count', tapLeft='tap_count', tapRight='tap_count',
voice="medianF0")
# normalizedFeatures is indexed by name below as a health code; elements may
# be "try-error" objects for records that failed
normalizedFeatures<-runNormalization(tables, features, featureNames, thirtyDayWindow)
nRecords<-length(normalizedFeatures)
reportsSentCount<-0
if (nRecords>0) {
# Now call the Visualization Data API
#https://sagebionetworks.jira.com/wiki/display/BRIDGE/mPower+Visualization#mPowerVisualization-WritemPowerVisualizationData
cat("Invoking visualization API...\n")
url <- bridger:::uriToUrl("/parkinson/visualization", bridger:::.getBridgeCache("bridgeEndpoint"))
testHealthCodes<-c(
'fb0c8ffc-52d7-492f-b5c1-9fdb7cce247f',# Alx's admin account
'afba0a91-dddd-4927-85d7-0cf7c5ab3cfe',#brian.bot+test-report
'76e03824-93de-4bd2-81b8-6ba0cf8c244d',#mike.kellen+test-report
#'e550d6ea-8058-4288-af65-27a494141c2b',#thea.norman -- can't use since she said "no" to the "diagnosed with Parkinson" question
'd66f8651-40fc-4b77-80ba-07d6da274493',#dwayne.jeng+test01
'f8266cc7-eb8e-4fc3-9255-72b7c2649980'#amy.truong
)
# NOTE: only the first min(nRecords, length(testHealthCodes)) records are
# sent, and record i's data is posted under testHealthCodes[i] rather than
# under its own health code.
for (i in 1:min(nRecords,length(testHealthCodes))) { # for now, just send the first few reports, to the test health codes
healthCode<-names(normalizedFeatures)[i]
normdata <- normalizedFeatures[[healthCode]]
if (inherits(normdata, "try-error")) {
message(sprintf('skipping %s due to error in processing', healthCode))
} else {
jsonStrings <- visDataToJSON(testHealthCodes[i], normalizedFeatures[[healthCode]])
for (jsonString in jsonStrings) {
# NOTE(review): prints the full JSON payload to stdout
print(jsonString)
# call Bridge Visualization API
curl<-getCurlHandle()
response<-getURL(url, postfields=jsonString, customrequest="POST",
.opts=bridger:::.getBridgeCache("opts"), httpheader=bridger:::.getBridgeCache("httpheader"), curl=curl)
# response is "Visualization created."
# any HTTP 4xx/5xx status aborts the whole run
httpStatus<-getCurlInfo(curl)$response.code
if (httpStatus>=400) stop(response)
}
# counts records, not individual JSON strings
if (length(jsonStrings)>0) reportsSentCount<-reportsSentCount+1
}
}
cat("... done.\n")
}
cat("... ALL DONE!!!\n")
reportsSentCount
}
# Add the following code to your website.
# For more information on customizing the embed code, read Embedding Snippets.