# R/finmatchr.R

# Defines functions cancel_task match_encounter match_encounters_to_encounters create_encounter_matching_task create_matching_task get_presigned_json_url annotate_images get_annotations create_image_annotation_task get_task_result_from_presigned salvage_task get_task_result wait_for_task task_status upload_images get_image_urls get_image_meta exists_in_database uuid_from_file s3_to_tmp update_edge update_node cypher finmatchr

# Documented in annotate_images cancel_task create_encounter_matching_task create_image_annotation_task create_matching_task cypher exists_in_database finmatchr get_annotations get_image_meta get_image_urls get_presigned_json_url get_task_result get_task_result_from_presigned match_encounter match_encounters_to_encounters s3_to_tmp salvage_task task_status update_edge update_node upload_images uuid_from_file wait_for_task

#' Load the finmatchr dashboard
#'
#' This function loads the finmatchr dashboard in a browser window.
#' @param params A list specifying access credentials and urls for neo4j, s3, and the matching api.
#' @import shiny
#' @import flexdashboard
#' @import rmarkdown
#' @import magick
#' @import svgPanZoom
#' @import visNetwork
#' @import jsTreeR
#' @import neo4r
#' @importFrom magrittr "%>%"
#' @importFrom purrr map map_df map_chr map_lgl map2_lgl
#' @importFrom dplyr mutate select select_if pull rename filter group_by ungroup recode case_when left_join anti_join distinct rowwise bind_rows slice arrange
#' @importFrom tidyr nest unnest unnest_wider
#' @importFrom stringr str_replace_all
#' @importFrom tibble tibble add_column as_tibble
#' @importFrom igraph graph_from_data_frame groups components
#' @importFrom openssl md5 sha1
#' @importFrom lubridate year month day
#' @importFrom purrr pluck
#' @importFrom rlang parse_expr !!
#' @importFrom htmlwidgets onRender
#' @importFrom jsonlite read_json write_json
#' @importFrom shinycssloaders withSpinner
#' @export
finmatchr <- function( params = list( neo4j_url = "http://localhost:7474",
                                      neo4j_user = "neo4j",
                                      neo4j_pass = "neo4j",
                                      s3_bucket = "bucket",
                                      s3_access_key = "minioadmin",
                                      s3_secret_key = "minioadmin",
                                      s3_region = "",
                                      s3_endpoint = "localhost:9000",
                                      ssl = FALSE,
                                      api_url = NULL,
                                      host = getOption("shiny.host", "127.0.0.1"),
                                      port = getOption("shiny.port") ) ) {
  # Locate the dashboard document shipped with the installed package;
  # mustWork = TRUE turns a missing file into an error.
  dashboard <- system.file( "rmd", "finmatchr.Rmd", package = "finmatchr", mustWork = TRUE )

  # The whole params list is handed to the document, while host and port are
  # additionally forwarded to the shiny server that serves it.
  run( dashboard,
       render_args = list( params = params ),
       shiny_args = list( launch.browser = TRUE,
                          host = params$host,
                          port = params$port ) )
}

#' Run a cypher query against the database
#'
#' This is a wrapper for \code{\link[neo4r]{call_neo4j}} that reports a failed
#' query as a shiny notification instead of raising an error.
#' @param .query A cypher query
#' @param con A neo4r connection.
#' @param type A character string passed on as the \code{type} argument of \code{call_neo4j}.
#' @param quiet A boolean specifying whether the function should (not) print the query.
#' @return The value returned by \code{call_neo4j}, or an empty list if the query failed.
#' @importFrom neo4r call_neo4j
#' @importFrom magrittr "%>%"
cypher <- function( .query, con, type = "graph", quiet = FALSE ) {
  if( !quiet ) {
    cat( "QUERY: ", .query, "\n" )
  }

  # Any error (raised by the call itself or by the error check below) is shown
  # to the user and replaced by an empty list.
  notify_and_recover <- function(cnd) {
    showNotification( conditionMessage(cnd), type = "error" )
    list()
  }

  tryCatch(
    suppressMessages({
      res <- call_neo4j( .query, con, type = type )
      # A non-empty error_code in the result is promoted to an R error so that
      # it goes through the same handler.
      if( length( res$error_code ) ) {
        stop( res$error_message )
      }
      res
    }),
    error = notify_and_recover
  )
}

#' Update a node in the database
#'
#' This updates properties of a node in the neo4j database
#' @param uuids A list of node uuids to update
#' @param region A character string
#' @param date A character string, or \code{"null"} to store a null date.
#' @param unusable A boolean specifying whether the node is unusable, or \code{"null"}.
#' @param reviewed A boolean specifying whether the node has been reviewed.
#' @param con A neo4r connection.
#' @param quiet A boolean specifying whether the function should (not) print the queries.
#' @importFrom magrittr "%>%"
update_node <- function( uuids, region, date, unusable, reviewed, con, quiet = FALSE ) {
  # Cypher list literal of the targeted uuids, shared by both queries below.
  uuid_list <- paste0( "['", paste0( uuids, collapse = "', '" ), "'] " )
  # "null" is passed through bare; any other date is wrapped in single quotes.
  date_literal <- ifelse( date == "null", date, sQuote( date, "'" ) )

  set_query <- paste0( "MATCH (n) ",
                       "WHERE n.uuid IN ", uuid_list,
                       "SET n.region = '", region, "', n.date = date(", date_literal,
                       "), n.unusable = ", unusable, ", n.reviewed = ", reviewed )
  cypher( set_query, con, "row", quiet )

  # #if unusable = T, detach any propagated IS_SAME relationships
  # if( ifelse( unusable == 'null', F, unusable ) ) {
  #   query <- paste0( "MATCH (n) ",
  #                    "WHERE n.uuid IN ['", paste0( uuids, collapse = "', '" ) , "'] ",
  #                    "AND COALESCE(n.unusable, false) = true ",
  #                    "OPTIONAL MATCH (n)<-[r:IS_SAME {basis: 'propagated'}]->(:Image) ",
  #                    "DELETE r" )
  #   query %>% cypher( con, "row", quiet )
  # }
  #if unusable = T, detach any UNSURE/NOT_SAME relationships
  #TODO: detach delete annotation?
  if( ifelse( unusable == 'null', FALSE, unusable ) ) {
    detach_query <- paste0( "MATCH (n) ",
                            "WHERE n.uuid IN ", uuid_list,
                            "AND COALESCE(n.unusable, false) = true ",
                            "OPTIONAL MATCH (n)<-[r:NOT_SAME|UNSURE]->(:Image) ",
                            "DELETE r" )
    cypher( detach_query, con, "row", quiet )
  }
}

#' Update an edge in the database
#'
#' This deletes any existing relationship between the two sets of image nodes in the
#' neo4j database and merges a single relationship with the given label in its place.
#' @param from_uuids A list of node uuids
#' @param to_uuids A list of node uuids
#' @param label A character string naming the relationship type to merge.
#' @param basis A character string stored as the \code{basis} property of the relationship.
#' @param reviewed A boolean stored as the \code{reviewed} property of the relationship.
#' @param con A neo4r connection.
#' @param payload Not used by the active code; only referenced by the disabled transitive closure queries.
#' @param quiet A boolean specifying whether the function should (not) output progress.
#' @importFrom magrittr "%>%"
#' @importFrom dplyr filter select
#' @importFrom tidyr unnest
# TODO: write smarter function, update only whats changed, clear basis on NOT_SAME, consider making IS_SAME service unreviewed not propagate?
# TODO: propagate IS_SAME or NOT_SAME over UNSURE?
update_edge <- function( from_uuids, to_uuids, label, basis, reviewed, con, payload, quiet = FALSE ) {
  # Cypher list literals for both ends of the edge.
  from_list <- paste0( "['", paste0( from_uuids, collapse = "', '" ), "'] " )
  to_list <- paste0( "['", paste0( to_uuids, collapse = "', '" ), "'] " )

  # Drop whatever relationship currently links each pair, then merge the new one.
  replace_edge <- paste0( "MATCH (from:Image), (to:Image) ",
                          "WHERE from.uuid IN ", from_list,
                          "AND to.uuid IN ", to_list,
                          "WITH from, to, from AS f, to AS t ",
                          "OPTIONAL MATCH (f)<-[r]->(t) ",
                          "DELETE r ",
                          "WITH from, to ",
                          "MERGE (from)<-[new:", label, "]->(to) ",
                          "SET new.basis = '", basis, "', new.reviewed = ", reviewed )
  cypher( replace_edge, con, "row", quiet )

  # #transitive closures
  # nodes <- payload$graph$nodes %>%
  #   filter( label == "Image" ) %>%
  #   select( -id, -label ) %>%
  #   unnest( cols = c(data) )
  # #propagate IS_SAME through IS_SAME (unreviewed)
  # query <- paste0( "MATCH (from:Image)-[:IS_SAME*2..2]->(to:Image) ",
  #                  "WHERE id(from) <> id(to) ",
  #                  "AND NOT (from)<-[:IS_SAME]->(to) ",
  #                  "AND from.uuid IN ['", paste0( nodes$uuid, collapse = "', '" ), "'] ",
  #                  "AND to.uuid IN ['", paste0( nodes$uuid, collapse = "', '" ), "'] ",
  #                  "AND COALESCE(from.unusable, false) <> true ",
  #                  "AND COALESCE(to.unusable, false) <> true ",
  #                  "MERGE (from)<-[:IS_SAME {basis: 'propagated'}]->(to)" )
  # query %>% cypher( con, "row", quiet )
  #
  # #propagate IS_SAME (reviewed) through IS_SAME (reviewed)
  # query <- paste0( "MATCH (from:Image)-[:IS_SAME*2..2 {reviewed: true}]->(to:Image) ",
  #                  "WHERE id(from) <> id(to) ",
  #                  "AND NOT (from)<-[:IS_SAME {reviewed: true}]->(to) ",
  #                  "AND NOT (from)<-[:IS_SAME {basis: 'duplicate'}]->(to) ",
  #                  "AND from.uuid IN ['", paste0( nodes$uuid, collapse = "', '" ), "'] ",
  #                  "AND to.uuid IN ['", paste0( nodes$uuid, collapse = "', '" ), "'] ",
  #                  "AND COALESCE(from.unusable, false) <> true ",
  #                  "AND COALESCE(to.unusable, false) <> true ",
  #                  "MERGE (from)<-[r:IS_SAME]->(to) ",
  #                  "ON CREATE SET r.basis = 'propagated', r.reviewed = true ",
  #                  "ON MATCH SET r.reviewed = true")
  # query %>% cypher( con, "row", quiet )
  #
  # #propagate NOT_SAME through NOT_SAME-IS_SAME (unreviewed)
  # query <- paste0( "MATCH (from:Image)<-[:NOT_SAME]->(int:Image)<-[:IS_SAME*2..2]->(to:Image) ",
  #                  "WHERE id(from) <> id(to) ",
  #                  "AND id(from) <> id(int) ",
  #                  "AND id(to) <> id(int) ",
  #                  "AND NOT (from)<-[:NOT_SAME]->(to) ",
  #                  "AND NOT (from)<-[:IS_SAME]->(to) ",
  #                  "AND from.uuid IN ['", paste0( from_uuids, "', '", to_uuids, collapse = "', '" ), "'] ",
  #                  "AND int.uuid IN ['", paste0( from_uuids, "', '", to_uuids, collapse = "', '" ), "'] ",
  #                  "AND to.uuid IN ['", paste0( nodes$uuid, collapse = "', '" ), "'] ",
  #                  "AND COALESCE(from.unusable, false) <> true ",
  #                  "AND COALESCE(int.unusable, false) <> true ",
  #                  "AND COALESCE(to.unusable, false) <> true ",
  #                  "MERGE (from)<-[r:NOT_SAME]->(to) ",
  #                  "ON CREATE SET r.basis = 'propagated'" )
  # query %>% cypher( con, "row", quiet )
  #
  # #propagate NOT_SAME (reviewed) through NOT_SAME-IS_SAME (reviewed)
  # query <- paste0( "MATCH (from:Image)<-[:NOT_SAME {reviewed: true}]->(int:Image)<-[:IS_SAME*2..2 {reviewed: true}]->(to:Image) ",
  #                  "WHERE id(from) <> id(to) ",
  #                  "AND id(from) <> id(int) ",
  #                  "AND id(to) <> id(int) ",
  #                  "AND NOT (from)<-[:NOT_SAME {reviewed: true}]->(to) ",
  #                  "AND NOT (from)<-[:IS_SAME]->(to) ",
  #                  "AND from.uuid IN ['", paste0( from_uuids, "', '", to_uuids, collapse = "', '" ), "'] ",
  #                  "AND int.uuid IN ['", paste0( from_uuids, "', '", to_uuids, collapse = "', '" ), "'] ",
  #                  "AND to.uuid IN ['", paste0( nodes$uuid, collapse = "', '" ), "'] ",
  #                  "AND COALESCE(from.unusable, false) <> true ",
  #                  "AND COALESCE(int.unusable, false) <> true ",
  #                  "AND COALESCE(to.unusable, false) <> true ",
  #                  "MERGE (from)<-[r:NOT_SAME]->(to) ",
  #                  "ON CREATE SET r.basis = 'propagated', r.reviewed = true ",
  #                  "ON MATCH SET r.reviewed = true")
  # query %>% cypher( con, "row", quiet )
}

#' Copy/Convert an image from S3 storage to tmpdir
#'
#' This function copies an image from S3 storage into \code{tempdir()} as
#' \code{<uuid>.jpg}, converting non-JPEG images with the ImageMagick
#' \code{convert} command. Nothing is done if the destination file already exists.
#' @param path An S3 object key.
#' @param uuid A character string used as the base name of the destination file.
#' @param pars A list with AWS S3 settings
#' @param quiet A boolean specifying whether the function should (not) output progress.
#' @importFrom tools file_ext
#' @importFrom exifr read_exif
#' @importFrom aws.s3 save_object
#' @importFrom dplyr pull
s3_to_tmp <- function( path, uuid, pars, quiet = FALSE ) {
  src <- paste0( tempdir(), "/", basename(path) )
  dest <- paste0( tempdir(), "/", uuid, '.jpg' )
  if( !file.exists( dest ) ) {
    save_object( path, bucket = pars$s3_bucket, file = src, base_url = pars$s3_endpoint, region = pars$s3_region, key = pars$s3_access_key, secret = pars$s3_secret_key, use_https = pars$ssl )

    #standardize extension if jpg
    type <- try( read_exif( src, tags = "FileType" ) %>% pull(FileType), silent = TRUE )
    # inherits() + isTRUE() keep the condition a single TRUE/FALSE even when the
    # exif lookup failed or returned NA / nothing, which previously made if() fail.
    exif_says_jpeg <- !inherits( type, 'try-error' ) && isTRUE( type[1] == 'JPEG' )
    if( file_ext( src ) %in% c( 'jpg', 'JPG', 'jpeg', 'JPEG' ) || exif_says_jpeg ) {
      file.rename( src, dest )
    } else { #otherwise convert to jpg with Image Magick
      if( !quiet ) {
        cat( "Converting...\n" )
      }
      # [0] to take first layer for pyramid tiff files, see what this does for others?
      # shQuote() escapes quotes and other shell metacharacters in the file names,
      # which hand-written '...' quoting did not.
      status <- system2( "convert", args = c( shQuote( paste0( src, "[0]" ) ), shQuote( dest ) ) )
      if( !identical( as.integer( status ), 0L ) ) {
        warning( "convert exited with status ", status, " for ", src )
      }
      unlink(src)
    }
  }
}

#' Generate a deterministic UUID from a file
#'
#' This function generates a deterministic UUID from a file hash
#' @param file A vector of file paths.
#' @param quiet A boolean specifying whether the function should (not) output progress.
#' @importFrom httr POST content
#' @importFrom jsonlite fromJSON
#' @importFrom openssl sha1
#' @export
uuid_from_file <- function( file, quiet = TRUE ) {
  # Returns a character vector the same length as `file`; each element is built
  # from the SHA-1 digest of the corresponding file.
  uuid <- character( length(file) )
  # seq_along() yields no iterations for empty input, whereas 1:length(file)
  # would iterate over c(1, 0) and fail on a missing path.
  for( i in seq_along(file) ) {
    if( !quiet ) {
      cat( sprintf( '\rgenerating uuid %06i of %i', i, length(file) ) )
    }
    # No close() follows: close( file( path ) ) would create and destroy a second,
    # unrelated connection rather than the one handed to sha1().
    # NOTE(review): presumably sha1() opens and closes the unopened connection it
    # is given - confirm against the openssl documentation.
    digest <- as.character( sha1( file( file[i] ) ) )
    # Cut the first 32 hex characters of the digest into the 8-4-4-4-12 layout.
    uuid[i] <- paste0( substring( digest, c(1,9,13,17,21), c(8,12,16,20,32) ), collapse = "-" )
  }
  return(uuid)
}

#' Check if a UUID exists in the matching database
#'
#' This function queries a vector of UUIDs against the matching database, returning
#' \code{TRUE} or \code{FALSE} if they exist.
#' @param base_url A character string specifying the base URL of the matching service api.
#' @param uuids A character vector specifying the UUIDs to query.
#' @returns \code{TRUE} or \code{FALSE}
#' @importFrom httr POST content
#' @importFrom jsonlite fromJSON
#' @export
exists_in_database <- function( base_url, uuids ) {
  n <- length( uuids )
  # Nothing to ask the service about; the 1:length(uuids) based chunking used
  # to fail on empty input.
  if( n == 0L ) {
    return( logical(0) )
  }
  # Assign every uuid to one of ceiling(n / 10000) contiguous, similarly sized
  # batches so that no single request carries more than 10000 uuids.
  batch_id <- sort( rep_len( seq_len( ceiling( n / 10000 ) ), n ) )
  output <- lapply( split( uuids, batch_id ), function(batch) {
    # I() keeps a single uuid from being sent as a scalar instead of a list.
    r <- POST( paste0( base_url, '/api/image/exists/' ),
               body = list( 'image_uuid_list' = I(batch) ),
               encode = 'json' ) %>%
      content( as = 'text' ) %>%
      fromJSON()
    # The reply either carries a status object or, without one, a bare message.
    if( ifelse( exists( 'status', r ), !r$status$success, FALSE ) ) {
      stop( r$status$message )
    } else if( !exists( 'status', r ) & exists( 'message' , r ) ) {
      stop( r$message )
    }
    r$response
  } )
  # Flatten the per-batch responses back into one vector in input order.
  unlist( output, use.names = FALSE )
}

#' Get image metadata from the matching database
#'
#' This function queries a vector of UUIDs against the matching database, returning image metadata.
#' @param base_url A character string specifying the base URL of the matching service api.
#' @param uuids A character vector specifying the UUIDs to query.
#' @importFrom httr POST content
#' @importFrom jsonlite fromJSON
#' @export
get_image_meta <- function( base_url, uuids ) {
  n <- length( uuids )
  # Assign every uuid to one of ceiling(n / 5000) contiguous, similarly sized
  # batches. seq_len() makes this an empty grouping for empty input, so no
  # request is sent and an empty table comes back, where 1:length(uuids) failed.
  batch_id <- sort( rep_len( seq_len( ceiling( n / 5000 ) ), n ) )
  output <- lapply( split( uuids, batch_id ), function(batch) {
    # I() keeps a single uuid from being sent as a scalar instead of a list.
    r <- POST( paste0( base_url, '/api/image/' ),
               body = list( 'image_uuid_list' = I(batch) ),
               encode = 'json' ) %>%
      content( as = 'text' ) %>%
      fromJSON()
    # The reply either carries a status object or, without one, a bare message.
    if( ifelse( exists( 'status', r ), !r$status$success, FALSE ) ) {
      stop( r$status$message )
    } else if( !exists( 'status', r ) & exists( 'message' , r ) ) {
      stop( r$message )
    }
    r$response
  } )
  # Stack the per-batch responses into one table.
  bind_rows(output)
}

#' Get urls for images from the matching database
#'
#' This function queries a vector of UUIDs against the matching database, returning a list of
#' presigned urls for each image.
#' @param base_url A character string specifying the base URL of the matching service api.
#' @param uuids A character vector specifying the UUIDs to query.
#' @importFrom httr POST content
#' @importFrom jsonlite fromJSON
#' @export
get_image_urls <- function( base_url, uuids ) {
  endpoint <- paste0( base_url, '/api/image/presigned/get' )
  # I() keeps a single uuid from being sent as a scalar instead of a list.
  payload <- list( 'image_uuid_list' = I(uuids) )
  reply <- fromJSON( content( POST( endpoint, body = payload, encode = 'json' ), as = 'text' ) )

  # A status object that reports failure takes precedence ...
  status_failed <- ifelse( exists( 'status', reply ), !reply$status$success, FALSE )
  if( status_failed ) {
    stop( reply$status$message )
  }
  # ... otherwise a reply without status but with a message is also an error.
  if( !exists( 'status', reply ) & exists( 'message', reply ) ) {
    stop( reply$message )
  }
  reply$response
}

#' Upload images to the matching service
#'
#' This function requests a presigned upload url for each file from the matching
#' service and posts the files to those urls.
#' @param base_url A character string specifying the base URL of the matching service api.
#' @param image_file_list (vector) of filepaths.
#' @param quiet A boolean specifying whether the function should (not) output progress.
#' @return A data.frame with columns \code{bucket} and \code{key}, filled from the
#' \code{s3_store} entry of each presigned url returned by the service.
#' @importFrom httr POST upload_file content status_code
#' @importFrom jsonlite fromJSON
#' @export
upload_images <- function( base_url, image_file_list, quiet = TRUE ) {
  s3_image_list <- data.frame( "bucket" = character(0), "key" = character(0) )
  # Nothing to upload: skip the request instead of failing in the loop below.
  if( length( image_file_list ) == 0L ) {
    return( s3_image_list )
  }
  # I() keeps a single filename from being sent as a scalar instead of a list,
  # as the other request builders in this file do for their uuid lists.
  filenames <- list( 'filenames' = I( basename( image_file_list ) ) )
  r <- POST( paste0( base_url, '/api/file/presigned/post/' ),
             body = filenames,
             encode = "json" ) %>%
    content( as = 'text' ) %>%
    fromJSON()
  # The reply either carries a status object or, without one, a bare message.
  if( ifelse( exists( 'status', r ), !r$status$success, FALSE ) ) {
    stop( r$status$message )
  } else if( !exists( 'status', r ) & exists( 'message' , r ) ) {
    stop( r$message )
  }
  presigned <- r$response$presigned_urls
  # NROW() is 0 for a missing table, and seq_len() then runs no iterations.
  for( i in seq_len( NROW( presigned ) ) ) {
    if( !quiet ) {
      cat( sprintf( 'uploading image %s to:\n bucket: %s,\n key: %s\n\n',
                    basename( image_file_list[i] ),
                    presigned$s3_store$bucket[i],
                    presigned$s3_store$key[i] ) )
    }
    fields <- presigned$presigned_url$fields[i,]
    url <- presigned$presigned_url$url[i]
    s3_store <- presigned$s3_store[i,]
    # The form fields issued with the presigned url go first, the file last.
    upload <- POST( url, body = c( as.list( fields), file = list(upload_file( image_file_list[i] ) ) ), encode = "multipart" )
    # Report, but do not abort on, an upload the storage backend rejected.
    if( status_code( upload ) >= 400 ) {
      warning( "Upload of ", basename( image_file_list[i] ), " failed with HTTP status ", status_code( upload ) )
    }
    s3_image_list[i,] <- s3_store
  }
  return( s3_image_list )
}

#' Get matching service task status
#'
#' This function return the status of a matching service task.
#' @param base_url A character string specifying the base URL of the matching service api.
#' @param tid A character string specifying the task UUID.
#' @importFrom httr POST content
#' @importFrom jsonlite fromJSON
#' @export
task_status <- function( base_url, tid ) {
  request_body <- list( 'taskid' = tid,
                        'extended' = 'True' )
  reply <- POST( paste0( base_url, '/api/task/status/' ), body = request_body, encode = 'json' )
  r <- fromJSON( content( reply, as = 'text' ) )

  # Request-level failures are errors ...
  request_failed <- ifelse( exists( 'status', r ), !r$status$success, FALSE )
  if( request_failed ) {
    stop( r$status$message )
  }
  if( !exists( 'status', r ) & exists( 'message', r ) ) {
    stop( r$message )
  }
  # ... whereas a failure reported inside the response only raises a warning,
  # and the status string is still returned.
  if( ! r$response$status$success ) {
    warning( r$response$status$message )
  }
  r$response$status$status
}

#' Wait for matching service task to complete
#'
#' This function polls the status of a matching service task and returns once the
#' task has finished or the time limit has passed.
#' @param base_url A character string specifying the base URL of the matching service api.
#' @param tid A character string specifying the task UUID.
#' @param sleep An integer specifying the number of seconds to pause between status checks.
#' @param max A number specifying how many seconds to keep polling before giving up.
#' @param quiet A boolean specifying whether the function should (not) output progress.
#' @return The last task status, or \code{"timeout"} if \code{max} seconds passed first.
#' @importFrom httr POST content
#' @importFrom jsonlite fromJSON
#' @export
wait_for_task <- function( base_url, tid, sleep = 1, max = 600, quiet = TRUE ) {
  started <- Sys.time()
  repeat {
    state <- task_status( base_url, tid )
    if( !quiet ) {
      # \r rewrites the same console line on every poll.
      cat( sprintf( "\rtask: %s, status: %9s, %.2fs", tid, state, round( difftime( Sys.time(), started, units = "secs" ), 2 ) ) )
    }
    # These three states end the polling.
    if( state %in% c( 'complete', 'exception', 'cancelled' ) ) {
      break
    }
    # The time limit is only checked after a status check has been made.
    if( difftime( Sys.time(), started, units = "secs" ) > max ) {
      state <- "timeout"
      break
    }
    Sys.sleep( sleep )
  }
  if( !quiet ) {
    cat( "\n" )
  }
  state
}

#' Get result of completed task
#'
#' This function fetches the result of a completed task.
#' @param base_url A character string specifying the base URL of the matching service api.
#' @param tid A character string specifying the task UUID.
#' @importFrom httr POST content
#' @importFrom jsonlite fromJSON
#' @export
get_task_result <- function( base_url, tid ) {
  endpoint <- paste0( base_url, '/api/task/result/' )
  reply <- fromJSON( content( POST( endpoint, body = list( 'taskid' = tid ), encode = 'json' ), as = 'text' ) )

  # A status object that reports failure takes precedence ...
  status_failed <- ifelse( exists( 'status', reply ), !reply$status$success, FALSE )
  if( status_failed ) {
    stop( reply$status$message )
  }
  # ... otherwise a reply without status but with a message is also an error.
  if( !exists( 'status', reply ) & exists( 'message', reply ) ) {
    stop( reply$message )
  }
  reply$response
}

#' Attempt to re-associate runaway task
#'
#' This function binds a new task id to a runaway task.
#' @param base_url A character string specifying the base URL of the matching service api.
#' @param tid A character string specifying the UUID of the runaway task.
#' @param new_tid A character string specifying a new task UUID to associate with the runaway task.
#' @importFrom httr POST content
#' @importFrom jsonlite fromJSON
#' @export
salvage_task <- function( base_url, tid, new_tid ) {
  endpoint <- paste0( base_url, '/api/task/salvage/' )
  # The runaway task id and the id it should be re-registered under.
  payload <- list( 'taskid_to_salvage' = tid,
                   'taskid' = new_tid )
  reply <- fromJSON( content( POST( endpoint, body = payload, encode = 'json' ), as = 'text' ) )

  # A status object that reports failure takes precedence ...
  status_failed <- ifelse( exists( 'status', reply ), !reply$status$success, FALSE )
  if( status_failed ) {
    stop( reply$status$message )
  }
  # ... otherwise a reply without status but with a message is also an error.
  if( !exists( 'status', reply ) & exists( 'message', reply ) ) {
    stop( reply$message )
  }
  reply$response
}

#' Get result of completed task from presigned URL
#'
#' This function downloads a zipped task result from a presigned URL into
#' \code{tempdir()}, unzips it there and parses the extracted file as JSON.
#' A file of the same name that is already present is reused without downloading.
#' @param presigned_url A character string specifying the presigned URL of the task result.
#' @param filename A character string naming the file in \code{tempdir()} the download is saved to.
#' @return The parsed JSON content of the extracted file.
#' @importFrom httr GET write_disk status_code
#' @importFrom jsonlite fromJSON
#' @export
get_task_result_from_presigned <- function( presigned_url, filename ) {
  tmp <- paste0( tempdir(), "/", filename )
  if( !file.exists( tmp ) ) {
    # write_disk() creates the file even when the request fails, and the
    # file.exists() check above would then treat that leftover as a valid
    # download on every later call. Remove it unless the download succeeded.
    downloaded <- FALSE
    on.exit( if( !downloaded ) unlink( tmp ), add = TRUE )
    r <- GET( presigned_url, write_disk(tmp) ) %>%
      status_code()
    if( r != 200 ) {
      stop( 'httr GET Error: ', r )
    }
    downloaded <- TRUE
  }
  # unzip() returns the paths of the extracted files, which fromJSON() reads.
  unzip(tmp, exdir=tempdir()) %>%
    fromJSON()
}

#' Create image annotation task
#'
#' This function registers an image annotation task with the matching service.
#' @param base_url A character string specifying the base URL of the matching service api.
#' @param tid A character string specifying the task UUID.
#' @param uuids A character vector specifying the image UUIDs to annotate.
#' @importFrom httr POST content
#' @importFrom jsonlite fromJSON
#' @export
create_image_annotation_task <- function( base_url, tid, uuids ) {
  endpoint <- paste0( base_url, '/api/task/image/annotate/' )
  # I() keeps a single uuid from being sent as a scalar instead of a list.
  payload <- list( 'taskid' = tid,
                   'image_uuid_list' = I(uuids) )
  reply <- fromJSON( content( POST( endpoint, body = payload, encode = 'json' ), as = 'text' ) )

  # A status object that reports failure takes precedence ...
  status_failed <- ifelse( exists( 'status', reply ), !reply$status$success, FALSE )
  if( status_failed ) {
    stop( reply$status$message )
  }
  # ... otherwise a reply without status but with a message is also an error.
  if( !exists( 'status', reply ) & exists( 'message', reply ) ) {
    stop( reply$message )
  }
  reply$response
}

#' Get image annotations
#'
#' This function fetches image annotations by annotation uuid.
#' @param base_url A character string specifying the base URL of the matching service api.
#' @param uuids A character vector specifying the annotation UUIDs to fetch.
#' @importFrom httr POST content
#' @importFrom jsonlite fromJSON
#' @export
get_annotations <- function( base_url, uuids ) {
  n <- length( uuids )
  # Assign every uuid to one of ceiling(n / 100) contiguous, similarly sized
  # batches. seq_len() makes this an empty grouping for empty input, so no
  # request is sent and an empty table comes back, where 1:length(uuids) failed.
  batch_id <- sort( rep_len( seq_len( ceiling( n / 100 ) ), n ) )
  output <- lapply( split( uuids, batch_id ), function(batch) {
    # I() keeps a single uuid from being sent as a scalar instead of a list.
    r <- POST( paste0( base_url, '/api/annotation/get/' ),
               body = list( 'annotation_uuid_list' = I(batch) ),
               encode = 'json' ) %>%
      content( as = 'text' ) %>%
      fromJSON()
    # The reply either carries a status object or, without one, a bare message.
    if( ifelse( exists( 'status', r ), !r$status$success, FALSE ) ) {
      stop( r$status$message )
    } else if( !exists( 'status', r ) & exists( 'message' , r ) ) {
      stop( r$message )
    }
    r$response
  } )
  # Stack the per-batch responses into one table.
  bind_rows(output)
}

#' Annotate images
#'
#' This function generates an image annotation task, waits for it and returns the
#' annotations of the images that were annotated successfully.
#' @param base_url A character string specifying the base URL of the matching service api.
#' @param uuids A character vector specifying the image UUIDs to annotate, limited to 10k.
#' @param ... additional parameters passed on to \code{\link{wait_for_task}}.
#' @return The result of \code{\link{get_annotations}} for the successful annotations,
#' or \code{NULL} (with a warning) if the task reported no success.
#' @importFrom uuid UUIDgenerate
#' @export
annotate_images <- function( base_url, uuids, ... ) {
  tid <- UUIDgenerate()
  create_image_annotation_task( base_url, tid, uuids )
  # Anything other than 'complete' (exception, cancelled, timeout) is an error.
  if( wait_for_task( base_url, tid, ... ) != 'complete' ) {
    stop( 'Task cancelled or exception thrown.' )
  }
  result <- get_task_result( base_url, tid )
  if( length( result$result$success ) == 0L ) {
    warning( 'Annotation failed for all images.' )
    # Return NULL explicitly; the value of warning() (the message text) used to
    # leak out as the function result.
    return( NULL )
  }
  #loop over original uuids, if in task_success$annotation_uuids, get annotation from ann_json
  #min(ann_json$response$features[[1]][which(ann_json$response$features[[1]]$type=="refined_fin_boundary"),"x"][[1]])
  #return( ann_json$response[0] )
  get_annotations( base_url, result$result$success$annotation_uuids )
}

#' Get presigned s3 URL for json
#'
#' This function gets a presigned s3 URL for uploading a JSON task description.
#' @param base_url A character string specifying the base URL of the matching service api.
#' @importFrom httr GET content
#' @importFrom jsonlite fromJSON
#' @export
get_presigned_json_url <- function( base_url ) {
  endpoint <- paste0( base_url, '/api/json/presigned/put/' )
  reply <- fromJSON( content( GET( endpoint ), as = 'text' ) )

  # A status object that reports failure takes precedence ...
  status_failed <- ifelse( exists( 'status', reply ), !reply$status$success, FALSE )
  if( status_failed ) {
    stop( reply$status$message )
  }
  # ... otherwise a reply without status but with a message is also an error.
  if( !exists( 'status', reply ) & exists( 'message', reply ) ) {
    stop( reply$message )
  }
  reply$response
}

#' Create matching task
#'
#' This function uploads a JSON task description to a presigned url and then
#' registers the image matching task with the matching service.
#' @param base_url A character string specifying the base URL of the matching service api.
#' @param tid A character string specifying the task UUID.
#' @param query A named list specifying "image_uuid_list", and optionally "encounter_uuid_list" and "individual_uuid_list".
#' @param ref A named list specifying "image_uuid_list", and optionally "encounter_uuid_list" and "individual_uuid_list".
#' @param exclude_same A character string sent as the "exclude_same" field of the task description.
#' @param integrate_query_by A character string sent as the "integrate_query_by" field of the task description.
#' @param integrate_ref_by A character string sent as the "integrate_reference_by" field of the task description.
#' @param integration_type A character string sent as the "integration_type" field of the task description.
#' @importFrom httr PUT POST status_code content
#' @importFrom jsonlite fromJSON
#' @export
create_matching_task <- function( base_url, tid, query, ref, exclude_same = 'image',
                                  integrate_query_by = 'image', integrate_ref_by = 'image',
                                  integration_type = 'model' ) {
  presigned <- get_presigned_json_url( base_url )

  # Step 1: upload the task description to the presigned url.
  task_description <- list( 'taskid' = tid,
                            'queries' = query,
                            'reference' = ref,
                            'exclude_same' = exclude_same,
                            'integrate_query_by' = integrate_query_by,
                            'integrate_reference_by' = integrate_ref_by,
                            'integration_type' = integration_type )
  put_status <- status_code( PUT( presigned$presigned_url, body = task_description, encode = 'json' ) )
  if( put_status != 200 ) {
    stop( 'httr PUT Error: ', put_status )
  }
  # NOTE(review): presumably gives the upload time to become visible - confirm.
  Sys.sleep(1)

  # Step 2: register the task, pointing the service at the uploaded description.
  endpoint <- paste0( base_url, '/api/task/identify/indexidentifysync/' )
  reply <- fromJSON( content( POST( endpoint, body = presigned, encode = 'json' ), as = 'text' ) )

  status_failed <- ifelse( exists( 'status', reply ), !reply$status$success, FALSE )
  if( status_failed ) {
    stop( reply$status$message )
  }
  if( !exists( 'status', reply ) & exists( 'message', reply ) ) {
    stop( reply$message )
  }
  reply$response
}

#' Create encounter matching task
#'
#' This function registers an image matching task with the matching service.
#' @param base_url A character string specifying the base URL of the matching service api.
#' @param tid A character string specifying the task UUID.
#' @param query A character vector specifying the query image UUIDs.
#' @importFrom httr POST content
#' @importFrom jsonlite fromJSON
#' @export
create_encounter_matching_task <- function( base_url, tid, query ) {
  endpoint <- paste0( base_url, '/api/task/identify/encountermake/' )
  # I() keeps a single uuid from being sent as a scalar instead of a list.
  payload <- list( 'taskid' = tid,
                   'image_uuid_list' = I(query) )
  reply <- fromJSON( content( POST( endpoint, body = payload, encode = 'json' ), as = 'text' ) )

  # A status object that reports failure takes precedence ...
  status_failed <- ifelse( exists( 'status', reply ), !reply$status$success, FALSE )
  if( status_failed ) {
    stop( reply$status$message )
  }
  # ... otherwise a reply without status but with a message is also an error.
  if( !exists( 'status', reply ) & exists( 'message', reply ) ) {
    stop( reply$message )
  }
  reply$response
}

#' Match encounters
#'
#' This function generates an encounter to encounter matching task and returns the results.
#' @param base_url A character string specifying the base URL of the matching service api.
#' @param query_uuids A character vector specifying the query image UUIDs.
#' @param query_encounters A character vector the same length as query_uuids specifying the encounter membership.
#' @param ref_uuids A character vector specifying the reference image UUIDs.
#' @param ref_encounters A character vector the same length as ref_uuids specifying the encounter membership.
#' @param ... additional parameters passed on to \code{\link{wait_for_task}}.
#' @importFrom uuid UUIDgenerate
#' @export
match_encounters_to_encounters <- function( base_url, query_uuids, query_encounters, ref_uuids, ref_encounters, ... ) {
  tid <- UUIDgenerate()
  # I() keeps length-one vectors from being sent as scalars instead of lists.
  query_spec <- list( 'image_uuid_list' = I(query_uuids), 'encounter_uuid_list' = I(query_encounters) )
  ref_spec <- list( 'image_uuid_list' = I(ref_uuids), 'encounter_uuid_list' = I(ref_encounters) )
  create_matching_task( base_url, tid, query_spec, ref_spec,
                        exclude_same = 'encounter',
                        integrate_query_by = 'encounter', integrate_ref_by = 'encounter',
                        integration_type = 'sum' )

  # Anything other than 'complete' (exception, cancelled, timeout) is an error.
  if( wait_for_task( base_url, tid, ... ) != 'complete' ) {
    stop( 'Task cancelled or exception thrown.' )
  }

  result <- get_task_result( base_url, tid )
  if( !length( result$result$success ) ) {
    warning( 'Matching failed for all encounters.' )
    return( NULL )
  }

  # The task result itself lives behind a presigned url.
  success <- result$result$success
  r <- get_task_result_from_presigned( success$presigned_url, basename( success$key ) )
  # Keep only the first entry of each per-result list.
  first_of <- function(l) l[1]
  tibble( query = sapply( r$results$best_image_matches_query, first_of ),
          ref = sapply( r$results$best_image_matches_ref, first_of ),
          score = sapply( r$results$score, first_of ) )
}

#' Match encounter
#'
#' This function generates an image to image matching task and returns the results.
#' @param base_url A character string specifying the base URL of the matching service api.
#' @param uuids A character vector specifying the image UUIDs to match.
#' @param ... additional parameters passed on to \code{\link{wait_for_task}}.
#' @importFrom uuid UUIDgenerate
#' @export
match_encounter <- function( base_url, uuids, ... ) {
  tid <- UUIDgenerate()
  create_encounter_matching_task( base_url, tid, uuids )

  # Anything other than 'complete' (exception, cancelled, timeout) is an error.
  if( wait_for_task( base_url, tid, ... ) != 'complete' ) {
    stop( 'Task cancelled or exception thrown.' )
  }

  r <- get_task_result( base_url, tid )
  if( !length( r$result$success ) ) {
    warning( 'Matching failed for all images.' )
    return( NULL )
  }

  # Flatten the matched pairs and their scores into one row per match.
  matched <- r$result$success
  tibble( from = unlist( matched$matched_images_p1 ),
          to = unlist( matched$matched_images_p2 ),
          score = unlist( matched$scores ) )
}

#' Cancel task
#'
#' This function asks the matching service to cancel an existing task.
#' @param base_url A character string specifying the base URL of the matching service api.
#' @param tid A character string specifying the task UUID.
#' @return The text of the service reply, invisibly. The reply is not checked for errors.
#' @importFrom httr POST content
#' @export
cancel_task <- function( base_url, tid ) {
  reply <- POST( paste0( base_url, '/api/task/cancel/' ),
                 body = list( 'taskid' = tid ),
                 encode = 'json' )
  invisible( content( reply, as = 'text' ) )
}


#image registration response
#res['response']['result']['success'][0]['image_uuid']
# dylanirion/finmatchr documentation built on Nov. 23, 2021, 4:53 a.m.