R/io_register.R

Defines functions: type_convert_dataframe_by_entity, register_dataframe_to_entity

#' Register the rows of a data frame into a SciDB-backed entity array
#'
#' Uploads `df` to SciDB and inserts its dimension/attribute columns into the
#' array backing `entitynm`. If the entity's unique dimension is not supplied
#' in `df`, new unique ids are generated for unique-field combinations that
#' are not yet registered. Any remaining columns are registered as key/value
#' metadata in the companion `<ENTITY>_INFO` array (character values via the
#' `<ENTITY>_METADATA_VALUE` lookup, other types with a -1 sentinel value id).
#'
#' @param pkg_schema Schema list; entity configuration is read from
#'   `pkg_schema$array[[entitynm]]` (`dims`, `attributes`, `unique_fields`,
#'   `unique_fields_dim`, `info_array_attributes`, `keep_versions`).
#' @param con Connection object exposing `con$db` (for `iquery`) and
#'   `con$aop_connection` (data-frame upload / AFL expression building).
#' @param df Data frame whose columns are matched against the entity's
#'   dimensions, attributes, and info-array fields.
#' @param entitynm Name of the target entity in `pkg_schema`.
#'
#' @return A data frame of the entity dimension values for the rows of `df`
#'   (one column per entity dimension).
#' @export
register_dataframe_to_entity = function(pkg_schema, con, df, entitynm){
  arraynm = revealcore::full_arrayname(pkg_schema=pkg_schema, entitynm = entitynm, con = con)
  namespace = revealcore::get_namespace(arraynm)
  # On enterprise SciDB, fail fast unless the user holds update (u), read (r)
  # and delete (d) permission in the target namespace.
  if(namespace != "public" && is_scidb_ee(con)){
    namespace_perms = revealcore::show_user_namespace_permissions(pkg_schema = pkg_schema, con = con, namespace = namespace)
    if(!namespace_perms$u || !namespace_perms$r || !namespace_perms$d){ #delete permission is needed for remove_versions below
      stop("User does not have required permissions to update ", entitynm)
    }
  }
  # Remember which columns were character/factor BEFORE type conversion; after
  # conversion, columns still character/factor are eligible for the
  # METADATA_VALUE string-lookup path when registering info fields.
  df_columns_metadata_values = colnames(df)[sapply(df, class) %in% c("character", "factor")]
  df = revealcore:::type_convert_dataframe_by_entity(pkg_schema = pkg_schema, con = con, df = df, entitynm = entitynm)
  df_columns_metadata_values = intersect(colnames(df)[sapply(df, class) %in% c("character", "factor")],df_columns_metadata_values)
  
  # Register main fields, possibly generating new unique ids
  unique_fields = pkg_schema$array[[entitynm]]$unique_fields
  unique_dim = pkg_schema$array[[entitynm]]$unique_fields_dim
  entity_dims = names(pkg_schema$array[[entitynm]]$dims)
  entity_attrs = names(pkg_schema$array[[entitynm]]$attributes)
  # Info fields = configured info-array attributes plus any df columns that are
  # neither dimensions nor attributes of the main array.
  info_array_fields = unique(c(unlist(pkg_schema$array[[entitynm]]$info_array_attributes),
                               setdiff(colnames(df), c(entity_dims, entity_attrs))))
  info_array_fields_character = intersect(info_array_fields, df_columns_metadata_values)
  info_array_fields_noncharacter = setdiff(info_array_fields, df_columns_metadata_values)
  # Names of the companion entities used for info-field registration.
  entitynm_metadata_value = paste0(entitynm, "_METADATA_VALUE")
  entitynm_metadata_attrkey = paste0(entitynm, "_METADATA_ATTRKEY")
  entitynm_info = paste0(entitynm, "_INFO")
  
  if(length(info_array_fields)>0 && !is_entity(pkg_schema, entitynm_info)){
    stop("Asked to register info fields, but info array not configured")
  }
  
  # Register non-info fields
  df_main = df[, intersect(colnames(df), c(entity_dims, entity_attrs)), drop=F]
  
  # Names of persisted temporary arrays, tracked so the finally block can
  # remove them even if an error is raised mid-way.
  df_main_uploaded_name = NULL
  df_new_uploaded_name = NULL
  
  if(getOption("revealcore.debug", FALSE)) message("Uploading ", nrow(df_main), " rows to insert into ", arraynm)
  
  tryCatch({
    # All dims/attrs except the (possibly generated) unique dim must be present.
    stopifnot(all(setdiff(c(entity_dims, entity_attrs), unique_dim) %in% colnames(df_main)))
    
    # Upload and persist (.gc=F so the arrays outlive this R expression; they
    # are removed explicitly in the finally block).
    df_main_uploaded = con$aop_connection$upload_df(df_main, template = arraynm, .gc = F)$persist(.gc=F)
    df_main_uploaded_name = df_main_uploaded$array_meta_data()$name
    
    do_remove_versions = TRUE # set to FALSE if we don't end up inserting anything
    
    if(!is.null(unique_dim) && !(unique_dim %in% colnames(df_main))){ #if unique_dim is not specified, determine new or old unique_dim to use based on unique_fields
      # Which unique_fields combinations don't already have a key assigned?
      # Left-join the upload against the existing array on unique_fields...
      df_new_uploaded_0 = df_main_uploaded$left_join(con$aop_connection$array(arraynm), by=unique_fields)
      df_new_uploaded_1 = df_new_uploaded_0$drop_dims()$mutate(.dots=sapply(setdiff(df_new_uploaded_0$attrs_n_dims,unique(c(unique_fields, unique_dim))), function(x) NULL)) # keep only unique_fields + unique_dim (mutate-to-NULL drops the rest; project aop API — verify)
      # ...then keep the distinct unique_fields combinations whose unique_dim
      # came back null, i.e. the unregistered ones.
      qq_uploaded_filtered = paste0("project(grouped_aggregate(filter(",df_new_uploaded_1$to_afl(),",",unique_dim," is null),",paste0(unique_fields, collapse = ",")," ,count(*)),",paste0(unique_fields, collapse = ","),")")
      
      # Get the subset of df_main_uploaded with unique_fields that haven't been registered
      qq_uploaded_joined = paste0("redimension(",
                                  "equi_join(", qq_uploaded_filtered, " as x,", df_main_uploaded_name, " as y,",
                                  "left_names:(", paste0(paste0("x.",unique_fields), collapse=","), "),",
                                  "right_names:(",paste0(paste0("y.",unique_fields), collapse=","),")),",
                                  df_main_uploaded_name,")")
      
      # Save that subset to a second array
      df_new_uploaded = con$aop_connection$afl_expr(qq_uploaded_joined)$persist(.gc=F)
      df_new_uploaded_name = df_new_uploaded$array_meta_data()$name
      
      # Assign a new unique id to everything in df_new_uploaded_name and insert into entitynm in single transaction
      # (limit(1) is a cheap existence check: any new rows at all?)
      if(nrow(df_new_uploaded$limit(n = 1)$to_df())==1){
        if(getOption("revealcore.debug", FALSE)) message("Generating ", unique_dim, " for ", iquery(con$db, paste0("aggregate(", df_new_uploaded_name, ", count(*))"), T)$count, " new entries and inserting.")
        
        udim_max = paste0(unique_dim, "_max")
        
        # If some entity dimension (other than unique_dim) is not part of
        # unique_fields, multiple rows can share the same unique_fields combo
        # and must be assigned the same new id.
        schema_allows_many_to_many = !all(setdiff(entity_dims,unique_dim) %in% unique_fields)
        
        # qq1: current maximum of unique_dim in the target array.
        qq1 = paste0("aggregate(apply(", arraynm, ", ", udim_max,",",unique_dim,"), max(",udim_max,"))")
        if(schema_allows_many_to_many) {#Not all array dimensions are in unique_fields, so multiple rows may need the same id assigned
          # Number distinct unique_fields combinations, one id per combo.
          qqa = paste0("unpack(grouped_aggregate(", df_new_uploaded_name, ", ", paste0(unique_fields, collapse=","), ", count(*)),_revealcore_unpack_dim)")
        } else {
          # One id per new row.
          qqa = paste0("unpack(",df_new_uploaded_name,",_revealcore_unpack_dim)")
        }
        udim_max_max = paste0(udim_max,"_max")
        udim_new = paste0(unique_dim,"_new")
        # New ids start at 0 on an empty array, otherwise at max+1.
        qq1a = paste0("apply(", qq1, ",", udim_new, ",iif(",udim_max_max," IS NULL, 0, ",udim_max_max,"+1))")
        qq2 = paste0("cross_join(",qq1a,",",qqa,")")
        # unique_dim = dense unpack index + starting offset.
        qq3 = paste0("apply(",qq2,",",unique_dim,",_revealcore_unpack_dim+",udim_new,")")
        if(schema_allows_many_to_many) {
          # Join the per-combo ids back onto every matching new row.
          qqb = paste0("project(",qq3,",",paste0(c(unique_dim, unique_fields), collapse=","),")")
          qqc = paste0("equi_join(", qqb, " as x,", df_new_uploaded_name, " as y,left_names:(", paste0(paste0("x.",unique_fields), collapse=","), "),right_names:(",paste0(paste0("y.",unique_fields), collapse=","),"))")
          qq4 = paste0("redimension(",con$aop_connection$afl_expr(qqc)$drop_dims()$to_afl(),",",arraynm,")")
        } else {
          qq4 = paste0("redimension(",con$aop_connection$afl_expr(qq3)$drop_dims()$to_afl(),",",arraynm,")") 
        }
        # id generation + insert run as a single AFL statement (one transaction).
        qq5 = paste0("insert(",qq4,",",arraynm,")")
        iquery(con$db, qq5)
      } 
      else{
        if(getOption("revealcore.debug", FALSE)) message("No new entries to insert.")
        do_remove_versions = FALSE
      }
      
      # Get the dimensions for everything in the input data frame
      if(getOption("revealcore.debug", FALSE)) message("Retreiving array dimensions for all rows in data frame.")
      qr1 = paste0("equi_join(",df_main_uploaded_name," as x,", 
                   con$aop_connection$afl_expr(arraynm)$drop_dims()$to_afl(), " as y,",
                   "left_names:(",paste0(paste0("x.",unique(c(unique_fields, setdiff(entity_dims, unique_dim)))), collapse=","),"),", # join on unique_fields plus any other entity dims (not on all of df_main_uploaded's attrs)
                   "right_names:(",paste0(paste0("y.",unique(c(unique_fields, setdiff(entity_dims, unique_dim)))), collapse=","),"),",
                   "keep_dimensions:true, left_outer:true)")
      # Sort by the upload array's dimensions so the returned dims align
      # row-for-row with df / df_main.
      qr2 = paste0("project(sort(", qr1,",",df_main_uploaded$dims,"),", paste0(entity_dims, collapse=","),")")
      dims = con$aop_connection$afl_expr(qr2)$to_df()
      # Attach the resolved ids to df for the info-field registration below.
      df[[unique_dim]] = as.integer(dims[[unique_dim]])
    }
    else { #if not generating unique_dim, all dims and attributes should've been provided and we can insert directly
      if(getOption("revealcore.debug", FALSE)) message("Inserting ", nrow(df_main), " entries in ", full_arrayname(pkg_schema, entitynm, con = con))
      stopifnot(setequal(colnames(df_main), c(entity_dims, entity_attrs)))
      qq = paste0("insert(redimension(", df_main_uploaded_name, ",", arraynm, "),", arraynm, ")")
      iquery(con$db, qq)
      dims = df_main[, entity_dims, drop=F]
    }
    # Trim old array versions created by the insert(s) above.
    keep_versions = pkg_schema$array[[entitynm]]$keep_versions
    if(do_remove_versions && !is.null(keep_versions)){
      if(getOption("revealcore.debug", FALSE)) message("Removing all but latest ", keep_versions, " for ", arraynm)
      if(keep_versions>1 && !.SCIDB_HAS_REMOVE_VERSIONS_KEEP_FIX(con=con)){
        # Workaround: on affected SciDB versions, remove_versions with keep:
        # larger than the number of existing versions causes issues, so only
        # call it when there is actually something to trim.
        num_versions = nrow(iquery(con$db, paste0("versions(", arraynm, ")"), T))
        if(keep_versions<num_versions){
          iquery(con$db, paste0("remove_versions(", arraynm, ", keep: ",keep_versions,")"))
        }
      } else {
        iquery(con$db, paste0("remove_versions(", arraynm, ", keep: ",keep_versions,")"))
      }
    }
    if(length(info_array_fields)>0){
      if(getOption("revealcore.debug", FALSE)) message("Updating INFO fields for ", full_arrayname(pkg_schema, entitynm, con = con))
      # Find out which things from the original dataframe had new unique_ids registered and only register info fields for those
      if(!is.null(df_new_uploaded_name)) {
         if(nrow(df_new_uploaded$limit(n = 1)$to_df())!=1){
           # No new rows were registered: empty out df so nothing is inserted.
           df=subset(df,FALSE)
         } else {
           # Restrict df to the unique_fields combinations that were new.
           dims_updated = iquery(con$db, paste0("grouped_aggregate(", df_new_uploaded_name, ", ", paste0(unique_fields, collapse=","), ", count(*))"),T)[,unique_fields,drop=F]
           df = dplyr::left_join(dims_updated, df, by=unique_fields)
         }
      }
      if(nrow(df)>0){
      if(getOption("revealcore.debug", FALSE)) message("Updating INFO fields for ", nrow(df), " entries in ", full_arrayname(pkg_schema, entitynm, con = con))
      # Recursively register the attribute-key strings to obtain their ids.
      metadata_attrkey_df = data.frame("metadata_attrkey"=info_array_fields)
      metadata_attrkey_df$metadata_attrkey_id = register_dataframe_to_entity(pkg_schema = pkg_schema, con = con, df = metadata_attrkey_df, entitynm = entitynm_metadata_attrkey)$metadata_attrkey_id
      if(length(info_array_fields_character)>0){
        # Character info fields: pivot to long (attrkey, value) pairs, register
        # the distinct value strings in METADATA_VALUE, then store value ids.
        df_info_1 = df[, c(names(dims), info_array_fields_character), drop=F]
        df_info_1 = as.data.frame(tidyr::pivot_longer(df_info_1, tidyr::all_of(info_array_fields_character), names_to = "metadata_attrkey", values_to="metadata_value"))
        df_info_1 = subset(df_info_1, !is.na(metadata_value))
        metadata_value_df = data.frame("metadata_value"=na.omit(unique(df_info_1$metadata_value)))
        metadata_value_df$metadata_value_id = register_dataframe_to_entity(pkg_schema = pkg_schema, con = con, df = metadata_value_df, entitynm = entitynm_metadata_value)$metadata_value_id
        df_info_1 = dplyr::left_join(df_info_1, metadata_value_df, by="metadata_value")
        df_info_1 = dplyr::left_join(df_info_1, metadata_attrkey_df, by="metadata_attrkey")
        register_dataframe_to_entity(pkg_schema = pkg_schema, con = con, df = df_info_1, entitynm = entitynm_info)
      }
      if(length(info_array_fields_noncharacter)>0){
        # Non-character info fields: same long pivot, but no METADATA_VALUE
        # lookup; -1 is the sentinel value id (presumably the INFO array stores
        # these values directly — confirm against the INFO schema).
        df_info_2 = df[, c(names(dims), info_array_fields_noncharacter), drop=F]
        df_info_2 = as.data.frame(tidyr::pivot_longer(df_info_2, tidyr::all_of(info_array_fields_noncharacter), names_to = "metadata_attrkey", values_to="metadata_value"))
        df_info_2 = subset(df_info_2, !is.na(metadata_value))
        df_info_2$metadata_value_id = as.integer(-1)
        df_info_2 = dplyr::left_join(df_info_2, metadata_attrkey_df, by="metadata_attrkey")
        register_dataframe_to_entity(pkg_schema = pkg_schema, con = con, df = df_info_2, entitynm = entitynm_info)
      }
      }
      else{
        if(getOption("revealcore.debug", FALSE)) message("No INFO fields to insert.")
      }
    }
  },
  finally = { #Cleanup any uploaded dataframes we persisted (errors ignored: the array may not exist yet)
    tryCatch({iquery(con$db, paste0("remove(", df_main_uploaded_name,")"))}, error = function(e){invisible(NULL)})
    tryCatch({iquery(con$db, paste0("remove(", df_new_uploaded_name,")"))}, error = function(e){invisible(NULL)})
  })
  return(dims)
}

#' Coerce data-frame column types to match an entity's array schema
#'
#' Determines the expected R type for every column of `df` from the schema of
#' `entitynm` (dimensions -> "integer", attributes via
#' `data_type_mapping_scidb_to_r`, all other columns -> "character") and
#' converts mismatched columns with the corresponding `as.<type>()` function.
#' Factor columns are converted to character first so that, e.g.,
#' `as.integer()` operates on the printed labels rather than the level codes.
#' Conversion warnings (such as "NAs introduced by coercion") are escalated to
#' errors so bad data cannot slip through silently. If `entitynm` is not a
#' known entity, `df` is returned unchanged.
#'
#' @param pkg_schema Schema list; types are read from
#'   `pkg_schema$array[[entitynm]]`.
#' @param con Connection object (not used here; kept for a uniform interface).
#' @param df Data frame to convert.
#' @param entitynm Entity whose schema dictates the target column types.
#' @return `df` with columns coerced to the schema-mandated types.
type_convert_dataframe_by_entity = function(pkg_schema, con, df, entitynm){
  if(entitynm %in% get_entity_names(pkg_schema = pkg_schema)){
    # Use only the FIRST class of each column (e.g. POSIXct columns carry
    # c("POSIXct", "POSIXt")), so the vectorized comparison below is
    # well-defined; sapply(df, class) could return a ragged list here.
    types_in_df = vapply(df, function(col) class(col)[[1]], character(1))
    types_for_df = setNames(rep("character", ncol(df)), colnames(df))
    
    attrs = pkg_schema$array[[entitynm]]$attributes
    attrs = attrs[intersect(names(attrs), names(types_for_df))]
    dims = names(pkg_schema$array[[entitynm]]$dims)
    dims = intersect(dims, names(types_for_df))
    # Dimensions are integer coordinates in SciDB.
    if(length(dims)>0) types_for_df[dims] = "integer"
    if(length(attrs)>0) {
      # Map SciDB attribute types to their R equivalents; unmapped types
      # yield NA and are caught below.
      attrs = setNames(data_type_mapping_scidb_to_r[unlist(attrs)],
                       names(attrs))
      types_for_df[names(attrs)] = attrs
    }
    if(any(is.na(types_for_df))){
      stop("Scidb -> R datatype equivalence not set for some attribute")
    }
    
    ind = types_for_df != types_in_df
    if(any(ind)){
      if(getOption("revealcore.debug", FALSE)) message("Types in input dataframe do not match ",entitynm," array schema.  Trying conversions ",paste0(sapply(names(types_in_df)[ind], function(x) paste0(x,":(",types_in_df[[x]],"->",types_for_df[[x]],")")), collapse = ", "))
      df[colnames(df)] = lapply(colnames(df), function(x) {
        y = df[[x]]
        if(types_in_df[[x]] != types_for_df[[x]]) {
          tryCatch({
            # BUG FIX: the character conversion must be assigned back to y.
            # Previously `as.character(y)` was evaluated and its result
            # discarded, so factors were passed to as.<type>() unconverted
            # (e.g. as.integer(factor) returns level codes, not labels).
            if(types_in_df[[x]] == "factor") y = as.character(y)
            match.fun(paste0("as.", types_for_df[[x]]))(y)
          },
          # Escalate coercion warnings (e.g. NAs introduced) to hard errors.
          warning = function(w){stop(w$message)})
        }
        else {y}
      })
    }
  }
  df
}
Paradigm4/revealcore documentation built on May 21, 2023, 9:57 a.m.