# Copyright 2020 Observational Health Data Sciences and Informatics
#
# This file is part of treatmentCycleExtraction
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#' @param targetParameter
#' @param oracleTempSchema
#' @param cdmDatabaseSchema
#' @param cohortDatabaseSchema
#' @param oncologyDatabaseSchema
#' @param targetCohortId
#' @param maxCores Number of cores in clusterApply
#' @param parameters
#' @param chemotherapyRecords
#' @param cohortTable
#' @param episodeTable
#' @param episodeEventTable
#' @param connectionDetails
#' @import dplyr
#' @import tidyr
#' @export
# 1. Target chemotherapy records extraction tool
chemotherapyRecordsExtraction <- function(targetParameter,
connectionDetails,
cohortTable,
cdmDatabaseSchema,
cohortDatabaseSchema,
targetCohortId,
maxCores
)
{
if (class(targetParameter)!="regimenLists") stop ("parameters should be regimenLists type")
# Define parameters
regimenConceptId <- targetParameter$regimenConceptId
regimenName <- targetParameter$regimenName
includeDescendant <- targetParameter$includeDescendant
outofCohortPeriod <- targetParameter$outofCohortPeriod
combinationCriteria <- targetParameter$combinationCriteria
gapDateBetweenCycle <- targetParameter$gapDateBetweenCycle
gapDateAfter <- targetParameter$gapDateAfter
gapDateBefore <- targetParameter$gapDateBefore
primaryConceptIdList <- targetParameter$primaryConceptIdList
secondaryConceptIdList <- targetParameter$secondaryConceptIdList
excludingConceptIdList <- targetParameter$excludingConceptIdList
# Primary records
primaryConceptRecords <- DrugExposureInCohort(targetConceptIds = primaryConceptIdList,
connectionDetails,
cohortTable,
includeDescendant,
outofCohortPeriod,
cdmDatabaseSchema,
cohortDatabaseSchema,
targetCohortId)
# Secondary records
cluster <- ParallelLogger::makeCluster(numberOfThreads = maxCores)
secondaryConceptRecords <- ParallelLogger::clusterApply(cluster,
secondaryConceptIdList,
DrugExposureInCohort,
connectionDetails,
cohortTable,
includeDescendant,
outofCohortPeriod,
cdmDatabaseSchema,
cohortDatabaseSchema,
targetCohortId)
ParallelLogger::stopCluster(cluster)
# Exclude records
if(length(excludingConceptIdList)==0){excludingConceptRecords <- NULL}else{
excludingConceptRecords <- DrugExposureInCohort(targetConceptIds = excludingConceptIdList,
connectionDetails,
cohortTable,
includeDescendant,
outofCohortPeriod,
cdmDatabaseSchema,
cohortDatabaseSchema,
targetCohortId)
}
# Extraction
data <- lapply(unique(primaryConceptRecords$subjectId),function(x){
try(gapDateExamination(x,
primaryConceptRecords,
secondaryConceptRecords,
excludingConceptRecords,
combinationCriteria,
secondaryConceptIdList,
excludingConceptIdList,
gapDateBetweenCycle,
gapDateBefore,
gapDateAfter,
regimenConceptId)
)
}
)
data <- na.omit(data.table::rbindlist(data))
return(data)
}
# 2. Chemotherapy records transform to episode table form
#' @export
chemotherapyToEpisode<- function(chemotherapyRecords){
# Chemotherapy records to Episode
classIndex <- class(chemotherapyRecords)
chemotherapyRecords$CYCLE_START_DATE<-as.Date(chemotherapyRecords$CYCLE_START_DATE,origin="1970-01-01")
chemotherapyRecords$CYCLE_END_DATE<-as.Date(chemotherapyRecords$CYCLE_END_DATE,origin="1970-01-01")
chemotherapyRecords$episode_type_concept_id <-32545
chemotherapyRecords$episode_concept_id <-32532
chemotherapyRecords[chemotherapyRecords$CYCLE_NUM == 0]$episode_concept_id <- 32531
chemotherapyRecords$episode_parent_id <-NA
chemotherapyRecords$episode_object_concept_id <-32525
chemotherapyRecords$episode_id <- seq(nrow(chemotherapyRecords))
chemotherapyRecords$episode_source_value <-NA
class(chemotherapyRecords$episode_parent_id)<- 'integer'
chemotherapyRecords[chemotherapyRecords$CYCLE_NUM == 0]$episode_parent_id <- chemotherapyRecords[chemotherapyRecords$CYCLE_NUM == 0]$episode_id
chemotherapyRecords<-chemotherapyRecords %>% group_by(SUBJECT_ID) %>% fill(episode_parent_id,.direction = c("up"))
class(chemotherapyRecords) <- classIndex
chemotherapyRecords[chemotherapyRecords$CYCLE_NUM == 0]$episode_parent_id <- NA
chemotherapyRecords[chemotherapyRecords$CYCLE_NUM == 0]$CYCLE_NUM <- NA
# Colnames in Episode
names(chemotherapyRecords) <- c('person_id',
'episode_start_datetime',
'episode_number',
'episode_source_concept_id',
'episode_end_datetime',
'episode_item',
'episode_type_concept_id',
'episode_concept_id',
'episode_parent_id',
'episode_object_concept_id',
'episode_id',
'episode_source_value')
chemotherapyRecords <- data.frame(chemotherapyRecords)
episodeRecords <- chemotherapyRecords[,c(11,1,8,2,5,9,3,10,7,12,4,6)]
episode <- episodeRecords[,c(1:11)]
ParallelLogger::logInfo("Episode generated")
# Episode_event generation
episodeEventRecords <- data.table::data.table()
episodeRecords <- episodeRecords %>% subset(episode_concept_id == 32532)
episodeDrug <- episodeRecords[,c(1,12)]
n <- nrow(episodeDrug)
for (i in 1:n){
nameIndex <- as.character(episodeDrug[i, 1])
itemIndex <- as.character(episodeDrug[i, 2])
itemIndexSplitTemp <- data.frame(strsplit(itemIndex, split = '_'))
episodeDrugTemp <- data.frame(cbind(nameIndex, itemIndexSplitTemp))
names(episodeDrugTemp) <- c("episode_id", "drug_exposure_id")
episodeEventRecords <- rbind(episodeEventRecords, episodeDrugTemp)
}
episodeEventRecords$visit_occurrence_id <- NA
episodeEventRecords$condition_occurrence_id <- NA
episodeEventRecords$procedure_occurrence_id <- NA
episodeEventRecords$device_exposure_id <- NA
episodeEventRecords$measurement_id <- NA
episodeEventRecords$specimen_id <- NA
episodeEventRecords$observation_id <- NA
episodeEventRecords$note_id <- NA
episodeEventRecords$cost_id <- NA
episodeEvent <- episodeEventRecords[,c(1,3,4,5,2,6,7,8,9,10,11)]
ParallelLogger::logInfo("Episode_event generated")
return(list(episode,episodeEvent))
}
# 3. Using chemotherapy extraction tool and episode transformation tool
#' @export
generateEpisode <- function(parameters,
connectionDetails,
cohortTable,
cdmDatabaseSchema,
cohortDatabaseSchema,
targetCohortId,
maxCores){
ParallelLogger::logInfo("Episode / Episode_event extraction start")
ParallelLogger::logInfo(paste0(Sys.time()))
# Extract target chemotherapy records
chemotherapyRecords <- lapply(1:length(parameters),function(i){
targetParameter <- parameters[[i]]
specificChemoRecords <- chemotherapyRecordsExtraction(targetParameter,
connectionDetails,
cohortTable,
cdmDatabaseSchema,
cohortDatabaseSchema,
targetCohortId,
maxCores)
# Logger for target cohort (1/4)
if(round(i/length(parameters)*100,2) >= 25 && round((i-1)/length(parameters)*100,2) < 25){
ParallelLogger::logInfo(cat(paste0('target cohort : ',targetCohortId),
'\n',
round(i/length(parameters)*100,2),'%',
'\n',
paste0(Sys.time())
)
)
}
# Logger for target cohort (2/4)
if(round(i/length(parameters)*100,2) >= 50 && round((i-1)/length(parameters)*100,2) < 50){
ParallelLogger::logInfo(cat(paste0('target cohort : ',targetCohortId),
'\n',
round(i/length(parameters)*100,2),'%',
'\n',
paste0(Sys.time())
)
)
}
# Logger for target cohort (3/4)
if(round(i/length(parameters)*100,2) >= 75 && round((i-1)/length(parameters)*100,2) < 75){
ParallelLogger::logInfo(cat(paste0('target cohort : ',targetCohortId),
'\n',
round(i/length(parameters)*100,2),'%',
'\n',
paste0(Sys.time())
)
)
}
return(specificChemoRecords)
}
)
chemotherapyRecords <- data.table::rbindlist(chemotherapyRecords)
# Transform chemotherapy records to Episode
if(nrow(chemotherapyRecords) == 0){episodeRecords <- list()}else{
episodeRecords <- chemotherapyToEpisode(chemotherapyRecords)}
ParallelLogger::logInfo("Episode / Episode_event extraction finished")
return(episodeRecords)
}
# 4. Create Episode / Episode_event table in database
#' @export
createEpisodeTable <- function(connection,
oracleTempSchema,
oncologyDatabaseSchema,
episodeTable,
episodeEventTable
){
# Create Episode
ParallelLogger::logInfo("Create Episode table")
sql <- SqlRender::loadRenderTranslateSql(sqlFilename= "CreateEpisodeTable.sql",
packageName = "treatmentCycleExtraction",
dbms = attr(connection,"dbms"),
oracleTempSchema = oracleTempSchema,
oncology_database_schema = oncologyDatabaseSchema,
episode_table = episodeTable)
DatabaseConnector::executeSql(connection, sql, progressBar = TRUE, reportOverallTime = TRUE)
# Create Episode_event
ParallelLogger::logInfo("Create Episode_event table")
sql <- SqlRender::loadRenderTranslateSql(sqlFilename= "CreateEpisodeEventTable.sql",
packageName = "treatmentCycleExtraction",
dbms = attr(connection,"dbms"),
oracleTempSchema = oracleTempSchema,
oncology_database_schema = oncologyDatabaseSchema,
episode_event_table = episodeEventTable)
DatabaseConnector::executeSql(connection, sql, progressBar = TRUE, reportOverallTime = TRUE)
}
# 5. Insert your episode records into database
#' @export
insertEpisode <- function(connectionDetails,
oncologyDatabaseSchema,
episodeTable,
episodeEventTable,
episodes){
connection <- DatabaseConnector::connect(connectionDetails = connectionDetails)
episode <- episodes[[1]]
episodeEvent <- episodes[[2]]
# Find last episode_Id
sql <- 'SELECT max(EPISODE_ID) FROM @oncology_database_schema.@episode_table'
sql <- SqlRender::render(sql,
oncology_database_schema = oncologyDatabaseSchema,
episode_table = episodeTable)
sql <- SqlRender::translate(sql, targetDialect = connectionDetails$dbms)
lastEpisodeId <- DatabaseConnector::querySql(connection, sql)
lastEpisodeId <- as.numeric(lastEpisodeId[,1])
if(is.na(lastEpisodeId)){lastEpisodeId <- 0}
# Episode_id update
episode$episode_id <- as.numeric(episode$episode_id)+lastEpisodeId
episodeEvent$episode_id <- as.numeric(episodeEvent$episode_id)+lastEpisodeId
episodeEvent <- as.data.frame(apply(episodeEvent,2,as.numeric))
# Insert Episode records
DatabaseConnector::insertTable(connection,
paste0(oncologyDatabaseSchema,'.',episodeTable),
episode,
dropTableIfExists = FALSE,
createTable = FALSE,
progressBar = TRUE)
# Insert Episode records
DatabaseConnector::insertTable(connection,
paste0(oncologyDatabaseSchema,'.',episodeEventTable),
episodeEvent,
dropTableIfExists = FALSE,
createTable = FALSE,
progressBar = TRUE )
DatabaseConnector::disconnect(connection)
}
Add the following code to your website.
For more information on customizing the embed code, read Embedding Snippets.