R/build-cohorts.R

Defines functions build_cohorts

#' @export
build_cohorts <- function(){


##### DEFINE CONCEPT ID'S ----

# Condition / event codes
codes.copd_diag <- c(255573) # SNOMED 13645005 (include all descendants)
codes.lrti <- c(4175297) # SNOMED 50417007 (including all descendants)
codes.copd_ex <- c(257004) # SNOMED 195951007 (include all descendants)

# drug codes
codes.resp_drugs <- c(21603248) # ATC R03 (include all descendants)
codes.drug_ocs <- c(21602723) # ATC H02A (include all descendants)
codes.drug_abiotic <- c(21602796) # ATC J01 (include all descendants)

# visit codes
codes.inpatient <- c(9201)
codes.outpatient <- c(9202)
codes.emergency <- c(9203)
codes.office <- c(8940)

#### COPD PATIENTS ----

# cohort 1: Patients with a diagnosis of COPD
# cohort 2: Patients with a diagnosis of COPD and evidence of receiving at least two respiratory drugs

# print section title
print("COPD COHORTS")

# IDENTIFY FIRST DIAGNOSTIC EVIDENCE OF COPD
sql <- "
DROP TABLE IF EXISTS @cdm_db_schema.copd_diag;
SELECT person_id AS subject_id,
  MIN(condition_start_date) AS cohort_start_date
INTO @cdm_db_schema.copd_diag
FROM @cdm_db_schema.condition_occurrence
INNER JOIN @cdm_db_schema.concept_ancestor
  ON descendant_concept_id = condition_concept_id
WHERE ancestor_concept_id IN (@copd_i)
GROUP BY person_id"
capture.output(renderTranslateExecuteSql(conn, sql, cdm_db_schema = cdmDbSchema, copd_i = codes.copd_diag))

# add cohort_end date and instantiate
sql <- "
DROP TABLE IF EXISTS @cdm_db_schema.cohorts_sk;
SELECT subject_id, CAST(2 AS INT) AS cohort_definition_id, cohort_start_date,
  observation_period_end_date AS cohort_end_date,
  'COPD: copd diag code only' AS cohort_id_desc
INTO @cdm_db_schema.cohorts_sk
FROM @cdm_db_schema.copd_diag
LEFT JOIN @cdm_db_schema.observation_period
ON subject_id = person_id
"
capture.output(renderTranslateExecuteSql(conn, sql, cdm_db_schema = cdmDbSchema))

# person count
sql <- "SELECT COUNT(*) FROM @cdm_db_schema.cohorts_sk WHERE cohort_definition_id = 2"
print(
  paste("COPD #1: Number with a COPD diagnosis",
        renderTranslateQuerySql(conn, sql, cdm_db_schema = cdmDbSchema),
        sep = " = ")
)

# IDENTIFY RESPIRATORY DRUGS POST COPD DIAGNOSIS
sql <- "
DROP TABLE IF EXISTS @cdm_db_schema.resp_drugs;
SELECT diag.subject_id, diag.cohort_start_date, drug.drug_concept_id
INTO @cdm_db_schema.resp_drugs
FROM @cdm_db_schema.copd_diag diag
INNER JOIN @cdm_db_schema.drug_exposure AS drug
ON subject_id = person_id
INNER JOIN @cdm_db_schema.concept_ancestor AS d1
ON descendant_concept_id = drug_concept_id
WHERE ancestor_concept_id IN (@drugs_i) AND drug_exposure_start_date >= cohort_start_date"
capture.output(renderTranslateExecuteSql(conn, sql, cdm_db_schema = cdmDbSchema, drugs_i = codes.resp_drugs))

# person count
sql <- "
SELECT COUNT(*)
FROM (
  SELECT subject_id
  FROM @cdm_db_schema.test_table
  GROUP BY subject_id
) AS d0"
print(
  paste("COPD #2: #1 + receiving >= 1 respiratory drug at or after COPD diagnosis",
        renderTranslateQuerySql(conn, sql, cdm_db_schema = cdmDbSchema),
        sep = " = ")
)

# CREATE DRUG HIERARCHY FOR INGREDIENTS
sql <- "
DROP TABLE IF EXISTS @cdm_db_schema.drug_ingred;
SELECT d1.ancestor_concept_id, d1.descendant_concept_id
INTO @cdm_db_schema.drug_ingred
FROM @cdm_db_schema.concept AS d0
INNER JOIN @cdm_db_schema.concept_ancestor AS d1
ON concept_id = ancestor_concept_id
WHERE standard_concept = 'S' AND domain_id = 'Drug' AND concept_class_id = 'Ingredient'
"
capture.output(renderTranslateExecuteSql(conn, sql, cdm_db_schema = cdmDbSchema))

# MAP ALL RESP DRUG EXPOSURES TO INGREDIENT LEVEL
sql <- "
DROP TABLE IF EXISTS @cdm_db_schema.resp_drug_ingred;
SELECT d0.subject_id, d0.cohort_start_date, d1.ancestor_concept_id
INTO @cdm_db_schema.resp_drug_ingred
FROM @cdm_db_schema.resp_drugs AS d0
INNER JOIN @cdm_db_schema.drug_ingred AS d1
ON drug_concept_id = descendant_concept_id
"
capture.output(renderTranslateExecuteSql(conn, sql, cdm_db_schema = cdmDbSchema))

# RESTRICT TO THOSE WITH EXPOSURE TO >1 DISTINCT INGREDIENT
sql <- "
DROP TABLE IF EXISTS @cdm_db_schema.resp_drug_criteria;
SELECT subject_id, MIN(cohort_start_date) AS cohort_start_date
INTO @cdm_db_schema.resp_drug_criteria
FROM(
  SELECT subject_id, MIN(cohort_start_date) AS cohort_start_date, ancestor_concept_id
  FROM @cdm_db_schema.resp_drug_ingred
  GROUP BY subject_id, ancestor_concept_id
) AS d1
GROUP BY subject_id
HAVING COUNT(*) > 1
"
capture.output(renderTranslateExecuteSql(conn, sql, cdm_db_schema = cdmDbSchema))

# person count
sql <- "SELECT COUNT(*) FROM @cdm_db_schema.resp_drug_criteria"
print(
  paste("COPD #3: #2 + > 1 prescription of distinct resp drug (by ingredient) at or post diagnosis ",
        renderTranslateQuerySql(conn, sql, cdm_db_schema = cdmDbSchema),
        sep = " = ")
)

# RESTRICT TO THOSE WITH AT LEAST 6 MONTHS FOLLOW-UP POST DIAG DATE
sql <- "
DROP TABLE IF EXISTS @cdm_db_schema.six_month_fu;
SELECT *
  INTO @cdm_db_schema.six_month_fu
FROM @cdm_db_schema.resp_drug_criteria
INNER JOIN @cdm_db_schema.observation_period
ON subject_id = person_id
AND observation_period_start_date <= cohort_start_date
AND observation_period_end_date >= cohort_start_date
WHERE cohort_start_date + INTERVAL '183 days' < observation_period_end_date
"
capture.output(renderTranslateExecuteSql(conn, sql, cdm_db_schema = cdmDbSchema))

# person count
sql <- "SELECT COUNT(*) FROM @cdm_db_schema.six_month_fu"
print(
  paste("COPD #4: #3 + at least six months of follow-up post diagnosis",
        renderTranslateQuerySql(conn, sql, cdm_db_schema = cdmDbSchema),
        sep = " = ")
)

# FOLLOW-UP STARTS AT LATEST OF FIRST RECORDED DIAG CODE AND 1 JAN 2008 (WITH SOME FU)
sql <- "
DROP TABLE IF EXISTS @cdm_db_schema.fu_start;
SELECT subject_id, GREATEST(to_date('01-01-2008', 'DD-MM-YYYY'), cohort_start_date) AS cohort_start_date, observation_period_end_date AS cohort_end_date
INTO @cdm_db_schema.fu_start
FROM @cdm_db_schema.six_month_fu
"
capture.output(renderTranslateExecuteSql(conn, sql, cdm_db_schema = cdmDbSchema))

# REMOVE THOSE WITH START OF FU AFTER 31 DEC 2018
sql <- "
DROP TABLE IF EXISTS @cdm_db_schema.start_b4_2019;
SELECT *
  INTO @cdm_db_schema.start_b4_2019
FROM @cdm_db_schema.fu_start
WHERE cohort_start_date < to_date('31-01-2018', 'DD-MM-YYYY')
"
capture.output(renderTranslateExecuteSql(conn, sql, cdm_db_schema = cdmDbSchema))

# person count
sql <- "SELECT COUNT(*) FROM @cdm_db_schema.start_b4_2019"
print(
  paste("COPD #5: #4 + COPD diagnosis before 31 Dec 2018",
        renderTranslateQuerySql(conn, sql, cdm_db_schema = cdmDbSchema),
        sep = " = ")
)

# RESTRICT TO ADULTS (AGE > 18 YEARS AT COHORT START DATE)
sql <- "
DROP TABLE IF EXISTS @cdm_db_schema.adults;
SELECT d0.subject_id, d0.cohort_start_date, d0.cohort_end_date
INTO @cdm_db_schema.adults
FROM @cdm_db_schema.start_b4_2019 AS d0
LEFT JOIN @cdm_db_schema.person AS d1
ON subject_id = person_id
WHERE FLOOR((d0.cohort_start_date - make_date(d1.year_of_birth, d1.month_of_birth, d1.day_of_birth)) / 365) > 18
"
capture.output(renderTranslateExecuteSql(conn, sql, cdm_db_schema = cdmDbSchema))

# person count
sql <- "SELECT COUNT(*) FROM @cdm_db_schema.adults"
print(
  paste("COPD #6: #5 + aged over 18 years at time of diagnosis",
        renderTranslateQuerySql(conn, sql, cdm_db_schema = cdmDbSchema),
        sep = " = ")
)

# COMPLETE COHORTS AND INSTANTIATE

# version 1
sql <- "
DELETE FROM ohdsi.cohorts_sk WHERE cohort_definition_id = 1;
INSERT INTO @cdm_db_schema.cohorts_sk
SELECT subject_id, CAST(1 AS INT) AS cohort_definition_id, cohort_start_date, cohort_end_date,
  'COPD: copd diag code + >1 respiratory drug' AS cohort_id_desc
FROM @cdm_db_schema.adults
"
capture.output(renderTranslateExecuteSql(conn, sql, cdm_db_schema = cdmDbSchema))

# REMOVE REDUNDANT TABLES
sql <- "
DROP TABLE @cdm_db_schema.copd_diag;
DROP TABLE @cdm_db_schema.resp_drugs;
DROP TABLE @cdm_db_schema.drug_ingred;
DROP TABLE @cdm_db_schema.resp_drug_ingred;
DROP TABLE @cdm_db_schema.resp_drug_criteria;
DROP TABLE @cdm_db_schema.six_month_fu;
DROP TABLE @cdm_db_schema.fu_start;
DROP TABLE @cdm_db_schema.start_b4_2019;
DROP TABLE @cdm_db_schema.adults"
capture.output(renderTranslateExecuteSql(conn, sql, cdm_db_schema = cdmDbSchema))

# SUMMARISE COPD COHORTS
sql <- "
SELECT cohort_definition_id, count(*), cohort_id_desc
FROM @cdm_db_schema.cohorts_sk
WHERE cohort_definition_id IN (1, 2)
GROUP BY cohort_definition_id, cohort_id_desc
ORDER BY cohort_definition_id, cohort_id_desc
"
print("-------------------------")
print("COPD COHORTS")
print(renderTranslateQuerySql(conn, sql, cdm_db_schema = cdmDbSchema))


#### COPD EXACERBATIONS ----

# cohort 3: Mod exacerbations: COPD or LRTI code with presc of OCS (<14 days) or antibiotic
# cohort 4: Mod exacerbations: As above, or COPD exacerbation diagnosis
# cohort 5: Severe exacerbations: hospitalised with COPD exacerbation code

# IDENTIFY PEOPLE WITH A COPD OR LRTI CODE
sql <- "
DROP TABLE IF EXISTS @cdm_db_schema.ex_temp_1;
SELECT person_id, condition_start_date AS event_date
INTO @cdm_db_schema.ex_temp_1
FROM @cdm_db_schema.condition_occurrence
INNER JOIN @cdm_db_schema.concept_ancestor
ON descendant_concept_id = condition_concept_id
WHERE ancestor_concept_id IN (@copd_i, @lrti_i)
"
capture.output(renderTranslateExecuteSql(conn, sql, cdm_db_schema = cdmDbSchema, copd_i = codes.copd_diag, lrti_i = codes.lrti))

# IDENITFY PEOPLE WITH OCS (<=14 days) OR ANTIOBIOTIC PRESCRIPTION
sql <- "
DROP TABLE IF EXISTS @cdm_db_schema.ex_temp_2;
SELECT person_id, drug_exposure_start_date AS event_date
INTO @cdm_db_schema.ex_temp_2
FROM @cdm_db_schema.drug_exposure
INNER JOIN @cdm_db_schema.concept_ancestor
ON descendant_concept_id = drug_concept_id
WHERE (ancestor_concept_id IN (@ocs_i) AND days_supply <= 14) OR ancestor_concept_id IN (@abio_i)
"
capture.output(renderTranslateExecuteSql(conn, sql, cdm_db_schema = cdmDbSchema, ocs_i = codes.drug_ocs, abio_i = codes.drug_abiotic))

# COMBINE CODES AND DRUGS
sql <- "
DROP TABLE IF EXISTS @cdm_db_schema.ex_temp_3;
SELECT d1.person_id, d1.event_date
INTO @cdm_db_schema.ex_temp_3
FROM @cdm_db_schema.ex_temp_1 d1
INNER JOIN @cdm_db_schema.ex_temp_2 d2
ON d1.person_id = d2.person_id AND d1.event_date = d2.event_date
"
capture.output(renderTranslateExecuteSql(conn, sql, cdm_db_schema = cdmDbSchema))

# REMOVE DUPLICATE EVENTS
sql <- "
DROP TABLE IF EXISTS @cdm_db_schema.ex_temp_4;
SELECT DISTINCT *
  INTO @cdm_db_schema.ex_temp_4
FROM @cdm_db_schema.ex_temp_3
ORDER BY person_id, event_date
"
capture.output(renderTranslateExecuteSql(conn, sql, cdm_db_schema = cdmDbSchema))

# FIND TIME GAPS BETWEEN RECORDS
sql <- "
DROP TABLE IF EXISTS @cdm_db_schema.ex_temp_5;
SELECT person_id, event_date,
event_date - LAG(event_date) OVER (PARTITION BY person_id ORDER BY person_id, event_date) next_date
INTO @cdm_db_schema.ex_temp_5
FROM @cdm_db_schema.ex_temp_4
"
capture.output(renderTranslateExecuteSql(conn, sql, cdm_db_schema = cdmDbSchema))


# REMOVE EVENTS WITHIN 14 DAYS OF THE PREVIOUS ONE
# AND SAVE AS COHORT 3
sql <- "
DELETE FROM @cdm_db_schema.cohorts_sk WHERE cohort_definition_id = 3;
INSERT INTO @cdm_db_schema.cohorts_sk
SELECT person_id AS subject_id, CAST(3 AS INT) AS cohort_definition_id, event_date AS cohort_start_date,
  NULL AS cohort_end_date,
  'COPD EX: ocs or antiobiotics and copd or ltri code' AS cohort_id_desc
FROM @cdm_db_schema.ex_temp_5
WHERE next_date >= 14 or next_date IS NULL
"
capture.output(renderTranslateExecuteSql(conn, sql, cdm_db_schema = cdmDbSchema))

# COPD EXACERBATION CODES
sql <- "
DROP TABLE IF EXISTS @cdm_db_schema.ex_temp_6;
SELECT person_id, condition_start_date AS event_date
INTO @cdm_db_schema.ex_temp_6
FROM @cdm_db_schema.condition_occurrence
INNER JOIN @cdm_db_schema.concept_ancestor
ON descendant_concept_id = condition_concept_id
WHERE ancestor_concept_id IN (@copdEx_i)
"
capture.output(renderTranslateExecuteSql(conn, sql, cdm_db_schema = cdmDbSchema, copdEx_i = codes.copd_ex))

# COMBINE COPD EX CODES AND DRUG USE DATA
sql <- "
INSERT INTO @cdm_db_schema.ex_temp_6
SELECT *
  FROM @cdm_db_schema.ex_temp_4
"
capture.output(renderTranslateExecuteSql(conn, sql, cdm_db_schema = cdmDbSchema))

# FIND TIME GAPS BETWEEN RECORDS
sql <- "
DROP TABLE IF EXISTS @cdm_db_schema.ex_temp_7;
SELECT person_id, event_date,
event_date - LAG(event_date) OVER (PARTITION BY person_id ORDER BY person_id, event_date) next_date
INTO @cdm_db_schema.ex_temp_7
FROM @cdm_db_schema.ex_temp_6
"
capture.output(renderTranslateExecuteSql(conn, sql, cdm_db_schema = cdmDbSchema))

# REMOVE EVENTS WITHIN 14 DAYS OF THE PREVIOUS ONE
# AND SAVE AS COHORT 4
sql <- "
DELETE FROM @cdm_db_schema.cohorts_sk WHERE cohort_definition_id = 4;
INSERT INTO @cdm_db_schema.cohorts_sk
SELECT person_id AS subject_id, CAST(4 AS INT) AS cohort_definition_id, event_date AS cohort_start_date,
  NULL AS cohort_end_date,
  'COPD EX: drug codes or COPD exacerbation code' AS cohort_id_desc
FROM @cdm_db_schema.ex_temp_7
WHERE next_date >= 14 or next_date IS NULL
"
capture.output(renderTranslateExecuteSql(conn, sql, cdm_db_schema = cdmDbSchema))

# SEVERE / HOSPITALISED EXACERBATIONS
sql <- "
DELETE FROM @cdm_db_schema.cohorts_sk WHERE cohort_definition_id = 5;
INSERT INTO @cdm_db_schema.cohorts_sk
SELECT d1.person_id AS subject_id, CAST(5 AS INT) AS cohort_definition_id,
  d1.event_date AS cohort_start_date,
  NULL AS cohort_end_date,
  'COPD EX: Severe or hospitalised exacerbation' AS cohort_id_desc
FROM @cdm_db_schema.ex_temp_6 d1
INNER JOIN (
  SELECT person_id, visit_start_date
  FROM @cdm_db_schema.visit_occurrence
  WHERE visit_concept_id IN (@inp_i, @ae_i)
) d2
ON d1.person_id = d2.person_id AND d1.event_date = d2.visit_start_date
"
capture.output(renderTranslateExecuteSql(conn, sql, cdm_db_schema = cdmDbSchema, inp_i = codes.inpatient, ae_i = codes.emergency))

# REMOVE REDUNDANT TABLES
sql <- "
DROP TABLE @cdm_db_schema.ex_temp_1;
DROP TABLE @cdm_db_schema.ex_temp_2;
DROP TABLE @cdm_db_schema.ex_temp_3;
DROP TABLE @cdm_db_schema.ex_temp_4;
DROP TABLE @cdm_db_schema.ex_temp_5;
DROP TABLE @cdm_db_schema.ex_temp_6;
DROP TABLE @cdm_db_schema.ex_temp_7
"
capture.output(renderTranslateExecuteSql(conn, sql, cdm_db_schema = cdmDbSchema))

# SUMMARISE COPD EXACERBATION COHORTS
sql <- "
SELECT cohort_definition_id, count(*), cohort_id_desc
FROM @cdm_db_schema.cohorts_sk
WHERE cohort_definition_id IN (3,4,5)
GROUP BY cohort_definition_id, cohort_id_desc
ORDER BY cohort_definition_id, cohort_id_desc
"
print("-------------------------")
print("COPD EXACERBATION COHORTS")
print(renderTranslateQuerySql(conn, sql, cdm_db_schema = cdmDbSchema))



#### HEALTHCARE VISITS ---


# CREATE HEALTHCARE UTILISATION COHORTS
sql <- "
DELETE FROM @cdm_db_schema.cohorts_sk WHERE cohort_definition_id IN (9201, 9202, 9203, 8940);
INSERT INTO @cdm_db_schema.cohorts_sk
SELECT person_id AS subject_id, visit_concept_id AS cohort_definition_id,
visit_start_date AS cohort_start_date, visit_end_date AS cohort_end_date,
CASE WHEN visit_concept_id = @inp_i THEN 'Inpatient'
  WHEN visit_concept_id = @out_i THEN 'Outpatient'
  WHEN visit_concept_id = @ae_i THEN 'Emergency room'
  WHEN visit_concept_id = @office_i THEN 'Office visit' END AS cohort_id_desc
FROM @cdm_db_schema.visit_occurrence
WHERE visit_concept_id IN (@inp_i, @out_i, @ae_i, @office_i)
"
capture.output(renderTranslateExecuteSql(conn, sql, cdm_db_schema = cdmDbSchema,
                                         inp_i = codes.inpatient, out_i = codes.outpatient,
                                         ae_i = codes.emergency, office_i = codes.office))

# SUMMARISE COPD EXACERBATION COHORTS
sql <- "
SELECT cohort_definition_id, count(*), cohort_id_desc
FROM @cdm_db_schema.cohorts_sk
WHERE cohort_definition_id IN (@inp_i, @out_i, @ae_i, @office_i)
GROUP BY cohort_definition_id, cohort_id_desc
ORDER BY cohort_definition_id, cohort_id_desc
"
print("-------------------------")
print("HEALTHCARE VISIT COHORTS")
print(renderTranslateQuerySql(conn, sql, cdm_db_schema = cdmDbSchema,
                              inp_i = codes.inpatient, out_i = codes.outpatient,
                              ae_i = codes.emergency, office_i = codes.office))

}
seamuskent/ehdenHtaCopd documentation built on Feb. 22, 2020, 8:14 a.m.