R/import.R

Defines functions stageImport formatImport queryLookupTable findKeyFieldIds matchRecordIdsByKey prepareMonth prepareDate prepareSerial prepareUserReference prepareReference prepareMultiEnumImport prepareSingleEnumImport prepareEnumImport prepareImport matchColumn parentIdFromData recordIdFromData importTable importRecords

Documented in importRecords importTable stageImport

#' Batch imports a data.frame into an ActivityInfo form
#' 
#' Please be aware of the 600 column and 200,000 row limit for ActivityInfo forms.
#' See more details here: https://www.activityinfo.org/support/docs/m/84877/l/1144008.html
#' 
#' Currently, not all types of fields are supported and you will get a warning
#' if you attempt to import records with unsupported fields.
#' 
#' @param formId The form ID
#' @param data The data.frame to import
#' @param recordIdColumn The record ID column
#' @param parentRecordIdColumn The parent record ID column required when importing a subform
#' @param stageDirect Whether the import should be directly staged to Google Cloud Storage. This may not be possible if connecting from Syria or other countries that are blocked from accessing Google services directly. This option is ignored when connecting to a self-managed instance of ActivityInfo.
#' @param progress Show import progress while waiting for import job to complete
#' @param parentIdColumn Use parentRecordIdColumn instead. parentIdColumn is deprecated.
#'
#' @importFrom utils head
#' @export
importRecords <- function(formId, data, recordIdColumn, parentRecordIdColumn, stageDirect = TRUE, progress = getOption("activityinfo.import.progress", default = TRUE), parentIdColumn = parentRecordIdColumn) { 
  parentId <- NULL

  schema <- activityinfo::getFormSchema(formId)
  schemaTable <- as.data.frame(schema)
  subform <- !is.null(schema$parentFormId)

  providedCols <- names(data)

  if(!missing(recordIdColumn)) {
    recordId <- recordIdFromData(data, recordIdColumn)
    providedCols <- providedCols[providedCols != recordIdColumn]
  } else {
    recordId <- rep.int(NA_character_, times = nrow(data))
  }
  if(subform) {
    if (missing(parentRecordIdColumn)||is.null(parentRecordIdColumn)) {
      if(!missing(parentIdColumn)&&!is.null(parentIdColumn)) {
        parentRecordIdColumn = parentIdColumn
        warning("parentIdColumn is deprecated. Please use parentRecordIdColumn instead.")
      } else {
        stop("A subform requires a parentRecordIdColumn for parent record ids.")
      }
    }
    parentIdColumn = parentRecordIdColumn

    parentId <- parentIdFromData(data, parentIdColumn, schema)
    providedCols <- providedCols[providedCols != parentIdColumn]
  } else {
    parentId <- NULL
  }
  
  if(length(providedCols) == 0) {
    stop("The data.frame to import does not have any fields to import.")
  }
  
  factorColumns <- unlist(lapply(data, is.factor))
  data[factorColumns] <- as.data.frame(lapply(data[factorColumns], as.character))
  #data <- dplyr::mutate(data, dplyr::across(dplyr::where(is.factor), as.character))
  
  fieldIds <- sapply(providedCols, USE.NAMES = FALSE, matchColumn, schemaTable)
  fieldValues <- list()
  for(i in 1:length(fieldIds)) {
    fieldIndex <- which(fieldIds[i] == schemaTable$fieldId)
    columnName <- providedCols[i]
    fieldValues[[i]] <- prepareImport(schema$elements[[fieldIndex]], columnName, data[[columnName]])
  }

  if(nrow(data) == 0) {
    warning("data.frame to import is empty")
    return()
  }

  if(missing(recordIdColumn)) {
    recordId <- matchRecordIdsByKey(schema, data, fieldIds, fieldValues)
  }
  lines <- formatImport(data, recordId, parentId, fieldIds, fieldValues)
  importId <- stageImport(paste(lines, collapse = "\n"), direct = stageDirect)
  
  executeJob("importRecords", progress = progress, descriptor =
                              list(formId = formId,
                                   importId = importId))
  
}

#' Deprecated function to batch import a data.frame into an ActivityInfo form; use \link{importRecords}.
#'
#' @param ... parameters of importRecords()
#'
#' @export
importTable <- function(...) {
  warning("importTable() is deprecated. Use importRecords() instead.")
  importRecords(...)
}

recordIdFromData <- function(data, recordIdColumn) {
  recordId <- data[[recordIdColumn]]
  if(!is.character(recordIdColumn)) {
    stop(sprintf("Expected a character vector for the recordIdColumn, found %s", deparse(head(recordId))))
  }
  if(anyDuplicated(recordId[!is.na(recordId)])) {
    stop("The recordIdColumn contains duplicates.")
  }
  return(recordId)
}

parentIdFromData <- function(data, parentIdColumn, schema) {
  if(missing(parentIdColumn)||is.null(parentIdColumn)) {
    stop("When importing to a subform, you must provide a parentIdColumn")
  }
  parentId <- as.character(data[[parentIdColumn]])
  if(!is.character(parentIdColumn)) {
    stop(sprintf("Expected a character vector for the parentIdColumn, found %s", deparse(head(parentId))))
  }
  if(anyNA(parentId)) {
    stop("The parentIdColumn contains missing values.")
  }
  existingParentIds <- queryTable(schema$parentFormId, id = "_id")
  validParentIds <- parentId %in% existingParentIds$id
  if(!all(validParentIds)) {
    stop(sprintf("The parent id column `%s` has %d invalid parent ids, including: %s",
                 parentIdColumn,
                 sum(!validParentIds),
                 paste(head(parentId[!validParentIds]), collapse = ", ")))
  }
  return(parentId)
}

matchColumn <- function(colName, schema) {
  if(colName %in% schema$fieldId) {
    return(colName)
  }
  matchingCodes <- schema$fieldId[!is.na(schema$fieldCode) & tolower(schema$fieldCode) == tolower(colName)]
  if(length(matchingCodes) == 1) {
    return(matchingCodes)
  }
  matchingLabels <- schema$fieldId[tolower(schema$fieldLabel) == tolower(colName)]
  if(length(matchingLabels) == 1) {
    return(matchingLabels)
  }
  if(length(matchingLabels) > 1) {
    stop(sprintf("Ambiguous imported column name '%s', matches several fields", colName))
  } else {
    stop(sprintf("No matching field for imported column '%s'", colName))
  }
}


prepareImport <- function(field, columnName, column) {
  switch (field$type,
          FREE_TEXT = as.character(column),
          NARRATIVE = as.character(column),
          quantity = as.double(column),
          enumerated = prepareEnumImport(field, columnName, column),
          reference = prepareReference(field, column),
          date = prepareDate(field, column),
          month = prepareMonth(field, columnName, column),
          serial = prepareSerial(field, columnName, column),
          stop(sprintf("Field '%s' has unsupported type '%s'", field$label, field$type))
  )
}

prepareEnumImport <- function(field, columnName, column) {
  items <- sapply(field$typeParameters$values, function(item) item$id)
  names(items) <- sapply(field$typeParameters$values, function(item) tolower(item$label))

  # Replace empty strings with NAs
  column[!nzchar(column)] <- NA_character_
  
  # If this vector has labels, then add those to the lookup table
  if(inherits(column, "haven_labelled")) {
    columnLabels <- attr(column, "labels")
    names(columnLabels) <- tolower(names(columnLabels))
    codedItems <- items
    names(codedItems) <- tolower(columnLabels[ names(codedItems) ])
    items <- c(items, codedItems)
  }
  
  if(field$typeParameters$cardinality == "single") {
    prepareSingleEnumImport(field, items, columnName, column)
  } else {
    prepareMultiEnumImport(field, items, columnName, column)
  }
}

prepareSingleEnumImport <- function(field, items, columnName, column) {
  force(field)
  force(items)
  force(column)
  column <- as.character(column)
  itemIds <- as.character(items[tolower(column)])
  matching <- is.na(column) | !is.na(itemIds)
  if (any(!matching)) {
    badLabels <- unique(column[!matching])
    stop(
      sprintf(
        "For single-select field '%s', the imported column `%s` has values (%s) which do not match the options defined for this field (%s)",
        field$label,
        columnName,
        paste(collapse = ", ", sprintf("'%s'", badLabels)),
        paste(collapse = ", ", sprintf("'%s'", names(items)))
        
      )
    )
  }
  itemIds
}

prepareMultiEnumImport <- function(field, items, columnName, column) {
  column <- as.character(column)
  rows <- strsplit(column, split = "\\s*,\\s*")
  
  lapply <- lapply(rows, function(row) {
    if(length(row) == 1 && is.na(row)) {
      return(NA_character_)
    }
    itemIds <- as.character(items[tolower(row)])
    matching <- !is.na(itemIds)
    if (any(!matching)) {
      badLabels <- unique(row[!matching])
      stop(
        sprintf(
          "For multi-select field '%s', the imported column `%s` has values (%s) which do not match the options defined for this field (%s)",
          field$label,
          columnName,
          paste(collapse = ", ", sprintf("'%s'", badLabels)),
          paste(collapse = ", ", sprintf("'%s'", names(items)))
          
        )
      )
    }
    itemIds
  })
  
  
}

prepareReference <- function(field, column) {
  if(grepl(field$typeParameters$range, pattern = "@users$")) {
    return(prepareUserReference(field, column))
  }
  column <- as.character(column)
  valid <- grepl(column, pattern = "^[a-z][a-z0-9]{0,30}$")  
  invalid <- !is.na(column) & !valid
  if(any(invalid)) {
    badLabels <- head(unique(column[invalid]), n = 5)
    
    stop(sprintf("Field '%s' contains invalid record ids: %s",
                 field$label,
                 paste(collapse = ", ", sprintf("'%s'", badLabels))))
  }
  column
}

prepareUserReference <- function(field, column) {
  column <- as.character(column)
  valid <- grepl(column, pattern = "^[0-9]{0,30}$")  
  invalid <- !is.na(column) & !valid
  if(any(invalid)) {
    badLabels <- head(unique(column[invalid]), n = 5)
    
    stop(sprintf("Field '%s' contains invalid user ids: %s",
                 field$label,
                 paste(collapse = ", ", sprintf("'%s'", badLabels))))
  }
  column
}


prepareSerial <- function(field, columnName, column) {
  stop(sprintf("Column '%s': importing serial numbers not (yet) supported", columnName))
}

prepareDate <- function(field, column) {
  
  dates <- as.Date(column)
  invalid <- !is.na(column) & is.na(dates)
  if(any(invalid)) {
    badDates <- head(unique(column[invalid]), n = 5)
    
    stop(sprintf("Field '%s' contains %d invalid date values, including: %s",
                 field$label,
                 sum(invalid),
                 paste(collapse = ", ", sprintf("'%s'", badDates))))
  }
  
  strftime(dates, "%Y-%m-%d")
}


prepareMonth <- function(field, columnName, column) {

  months <- as.character(column)
  valid <- grepl(pattern = "\\d{4}-\\d{2}", x = column)
  if(any(!valid)) {
    badMonths <- head(unique(column[!valid]), n = 5)

    stop(sprintf("Column '%s' contains %d invalid month values, including: %s",
                 columnName,
                 sum(!valid),
                 paste(collapse = ", ", sprintf("'%s'", badMonths))))
  }
  months
}

matchRecordIdsByKey <- function(schema, data, fieldIds, fieldValues) {
  keyFieldIds <- findKeyFieldIds(schema)
  keys <- which(fieldIds %in% keyFieldIds)

  provided <- as.data.frame(fieldValues[keys])
  if(ncol(provided) != length(keyFieldIds)) {
    stop("One or more key fields are missing")
  }
  names(provided) <- sprintf("k%d", seq_along(keyFieldIds))

  # Check that our input does not include duplicates according to the key
  # fields
  dups <- duplicated(provided)
  if(any(dups)) {
    stop(sprintf("There are %d duplicate rows (by key fields %s) in the input, including row(s) %s",
                 sum(dups),
                 paste(sprintf("`%s`", names(data)[keys]), collapse =", "),
                 paste(head(which(dups)), collapse = ", ")))
  }

  # Match against existing rows
  lookup <- queryLookupTable(schema$id, keyFieldIds)
  matched <- merge(provided, lookup, by = names(provided), all.x = TRUE)

  return(matched$id)
}

findKeyFieldIds <- function(schema) {
  fieldIds <- sapply(schema$elements, function(e) e$id)
  keys <- sapply(schema$elements, function(e) identical(e$key, TRUE) && e$type != "serial")

  return(fieldIds[keys])
}

queryLookupTable <- function(formId, keyFieldIds) {
  columns <- as.list(keyFieldIds)
  names(columns) <- sprintf("k%d", seq_along(keyFieldIds))
  columns[["id"]] <- "_id"

  queryTable(formId, columns, truncateStrings = FALSE)
}

#'
#' @importFrom jsonlite toJSON
formatImport <- function(data, recordId, parentId, fieldIds, fieldValues) {


  if(!is.null(parentId)) {
    fieldValues <- c(
      list(parentId),
      fieldValues
    )
  }

  fieldValues <- c(
    list(recordId),
    fieldValues
  )

  recordLines <- character(nrow(data))
  fieldSeq <- seq_along(fieldValues)

  for(recordIndex in 1:nrow(data)) {
    record <- lapply(fieldSeq, function(fieldIndex) {
      v <- fieldValues[[fieldIndex]][[recordIndex]]
      if(length(v) == 1 && is.na(v)) NULL else v
    })
    recordLines[recordIndex] <- toActivityInfoJson(record)
  }

  c("LINE DELIMITED JSON RECORDS",
    as.character(nrow(data)),
    toActivityInfoJson(as.list(fieldIds)),
    recordLines)
}



#' Stages data to import to ActivityInfo
#' 
#' @param text The text of the file to import.
#' @param direct Whether the import should be directly staged to Google Cloud Storage. This may not be possible if connecting from Syria or other countries that are blocked from accessing Google services directly. This option is ignored when connecting to a self-managed instance of ActivityInfo.
stageImport <- function(text, direct = TRUE) {
  
  if(direct && !grepl(activityInfoRootUrl(), pattern = "www\\.activityinfo\\.org|appspot\\.com")) {
    direct <- FALSE
  }
  
  if(direct) {
    url <- paste(activityInfoRootUrl(), "resources", "imports", "stage", "direct", sep = "/") 
  } else {
    url <- paste(activityInfoRootUrl(), "resources", "imports", "stage", sep = "/")
  }
  
  result <- POST(url, activityInfoAuthentication(), accept_json())
  
  if (result$status_code != 200) {
    stop(sprintf("Request for %s failed with status code %d %s: %s",
                 url, result$status_code, http_status(result$status_code)$message,
                 content(result, as = "text", encoding = "UTF-8")))
  }
  
  response <- fromActivityInfoJson(result)
  
  uploadUrl <- response$uploadUrl
  if(!grepl(uploadUrl, pattern = "^https://")) {
    uploadUrl <- paste0(activityInfoRootUrl(), uploadUrl)
  }
  
  putResult <- PUT(uploadUrl, body = text, encode = "raw", activityInfoAuthentication())
  if(putResult$status_code != 200) {
    stop("Failed to stage import file at ", uploadUrl, ": status = ", putResult$status_code)
  }

  response$importId
}
bedatadriven/activityinfo-R documentation built on Dec. 21, 2024, 8:23 a.m.