#' Register a data frame's rows into a schema-defined SciDB entity array
#'
#' Uploads `df` into the array backing `entitynm` (looked up in
#' `pkg_schema`), generating values for the entity's unique dimension for
#' rows whose `unique_fields` combination is not yet registered, and then
#' (via recursive calls to this function) registers any surplus columns as
#' key/value metadata in the companion `<ENTITY>_INFO`,
#' `<ENTITY>_METADATA_ATTRKEY` and `<ENTITY>_METADATA_VALUE` arrays.
#'
#' @param pkg_schema Schema list for the package's arrays; per entity it is
#'   read here for `dims`, `attributes`, `unique_fields`,
#'   `unique_fields_dim`, `info_array_attributes` and `keep_versions`.
#' @param con Connection object exposing `$db` (used with `iquery`) and
#'   `$aop_connection` (arrayop-style API: `upload_df`, `array`, `afl_expr`,
#'   ...). NOTE(review): exact class not visible from this file.
#' @param df data.frame of rows to register. Character/factor columns that
#'   are not main-array dims/attributes are stored as metadata values.
#' @param entitynm Name of the entity to register into.
#' @return data.frame of the entity's dimension values for the input rows,
#'   including any unique ids generated during this call.
#' @export
register_dataframe_to_entity = function(pkg_schema, con, df, entitynm){
arraynm = revealcore::full_arrayname(pkg_schema=pkg_schema, entitynm = entitynm, con = con)
namespace = revealcore::get_namespace(arraynm)
# On enterprise SciDB, writing outside the "public" namespace requires
# update/read/delete permission on the target namespace.
if(namespace != "public" && is_scidb_ee(con)){
namespace_perms = revealcore::show_user_namespace_permissions(pkg_schema = pkg_schema, con = con, namespace = namespace)
if(!namespace_perms$u || !namespace_perms$r || !namespace_perms$d){ #delete is needed for remove_versions
stop("User does not have required permissions to update ", entitynm)
}
}
# Record which columns are character/factor BEFORE schema type conversion,
# then intersect with those still character/factor AFTER it: only these
# columns go through the METADATA_VALUE string-lookup path below.
# NOTE(review): sapply(df, class) returns a list when a column carries more
# than one class (e.g. POSIXct/POSIXt); such columns will not match the
# %in% test here — confirm that is intended.
df_columns_metadata_values = colnames(df)[sapply(df, class) %in% c("character", "factor")]
df = revealcore:::type_convert_dataframe_by_entity(pkg_schema = pkg_schema, con = con, df = df, entitynm = entitynm)
df_columns_metadata_values = intersect(colnames(df)[sapply(df, class) %in% c("character", "factor")],df_columns_metadata_values)
# Register main fields, possibly generating new unique ids
unique_fields = pkg_schema$array[[entitynm]]$unique_fields
unique_dim = pkg_schema$array[[entitynm]]$unique_fields_dim
entity_dims = names(pkg_schema$array[[entitynm]]$dims)
entity_attrs = names(pkg_schema$array[[entitynm]]$attributes)
# Info fields = configured info-array attributes plus any df column that is
# neither a dim nor an attribute of the main array.
info_array_fields = unique(c(unlist(pkg_schema$array[[entitynm]]$info_array_attributes),
setdiff(colnames(df), c(entity_dims, entity_attrs))))
# Character-valued info fields get an id via METADATA_VALUE; all others are
# stored directly with metadata_value_id = -1 (see below).
info_array_fields_character = intersect(info_array_fields, df_columns_metadata_values)
info_array_fields_noncharacter = setdiff(info_array_fields, df_columns_metadata_values)
# Companion entity names derived from the main entity by naming convention.
entitynm_metadata_value = paste0(entitynm, "_METADATA_VALUE")
entitynm_metadata_attrkey = paste0(entitynm, "_METADATA_ATTRKEY")
entitynm_info = paste0(entitynm, "_INFO")
if(length(info_array_fields)>0 && !is_entity(pkg_schema, entitynm_info)){
stop("Asked to register info fields, but info array not configured")
}
# Register non-info fields
df_main = df[, intersect(colnames(df), c(entity_dims, entity_attrs)), drop=F]
# Names of server-side temp arrays persisted below; tracked so the finally
# block can remove them even when an error unwinds the tryCatch.
df_main_uploaded_name = NULL
df_new_uploaded_name = NULL
if(getOption("revealcore.debug", FALSE)) message("Uploading ", nrow(df_main), " rows to insert into ", arraynm)
tryCatch({
# Everything except unique_dim must be present; unique_dim may be absent,
# in which case ids are generated server-side below.
stopifnot(all(setdiff(c(entity_dims, entity_attrs), unique_dim) %in% colnames(df_main)))
df_main_uploaded = con$aop_connection$upload_df(df_main, template = arraynm, .gc = F)$persist(.gc=F)
df_main_uploaded_name = df_main_uploaded$array_meta_data()$name
do_remove_versions = TRUE # set to FALSE if we don't end up inserting anything
if(!is.null(unique_dim) && !(unique_dim %in% colnames(df_main))){ #if unique_dim is not specified, determine new or old unique_dim to use based on unique_fields
#what unique_fields don't already have a key assigned?
# Left-join the upload against the existing array: rows where unique_dim
# comes back null have not been registered yet.
df_new_uploaded_0 = df_main_uploaded$left_join(con$aop_connection$array(arraynm), by=unique_fields)
df_new_uploaded_1 = df_new_uploaded_0$drop_dims()$mutate(.dots=sapply(setdiff(df_new_uploaded_0$attrs_n_dims,unique(c(unique_fields, unique_dim))), function(x) NULL)) #change unique_dim -> entity_dims
# Distinct unique_fields combinations that still lack an id.
qq_uploaded_filtered = paste0("project(grouped_aggregate(filter(",df_new_uploaded_1$to_afl(),",",unique_dim," is null),",paste0(unique_fields, collapse = ",")," ,count(*)),",paste0(unique_fields, collapse = ","),")") #changed from unique_fields
# Get the subset of df_main_uploaded with unique_fields that haven't been registered
qq_uploaded_joined = paste0("redimension(",
"equi_join(", qq_uploaded_filtered, " as x,", df_main_uploaded_name, " as y,",
"left_names:(", paste0(paste0("x.",unique_fields), collapse=","), "),", #changed from unique_fields
"right_names:(",paste0(paste0("y.",unique_fields), collapse=","),")),", #changed from unique_fields
df_main_uploaded_name,")")
# Save that subset to a second array
df_new_uploaded = con$aop_connection$afl_expr(qq_uploaded_joined)$persist(.gc=F)
df_new_uploaded_name = df_new_uploaded$array_meta_data()$name
# Assign a new unique id to everything in df_new_uploaded_name and insert into entitynm in single transaction
# limit(n = 1) is a cheap probe for "is there at least one new row?".
if(nrow(df_new_uploaded$limit(n = 1)$to_df())==1){
if(getOption("revealcore.debug", FALSE)) message("Generating ", unique_dim, " for ", iquery(con$db, paste0("aggregate(", df_new_uploaded_name, ", count(*))"), T)$count, " new entries and inserting.")
udim_max = paste0(unique_dim, "_max")
# If some entity dims are not part of unique_fields, several rows may
# share one unique_fields combination and must receive the same new id.
schema_allows_many_to_many = !all(setdiff(entity_dims,unique_dim) %in% unique_fields)
# qq1: current maximum of the unique dimension in the target array.
qq1 = paste0("aggregate(apply(", arraynm, ", ", udim_max,",",unique_dim,"), max(",udim_max,"))")
if(schema_allows_many_to_many) {#Not all array dimensions are in unique_fields, so multiple rows may need the same id assigned
# One unpacked cell per distinct unique_fields combination.
qqa = paste0("unpack(grouped_aggregate(", df_new_uploaded_name, ", ", paste0(unique_fields, collapse=","), ", count(*)),_revealcore_unpack_dim)")
} else {
qqa = paste0("unpack(",df_new_uploaded_name,",_revealcore_unpack_dim)")
}
udim_max_max = paste0(udim_max,"_max")
udim_new = paste0(unique_dim,"_new")
# New ids start at max+1 (or 0 when the array is empty) and are offset by
# the unpack dimension so each new combination gets a distinct id.
qq1a = paste0("apply(", qq1, ",", udim_new, ",iif(",udim_max_max," IS NULL, 0, ",udim_max_max,"+1))")
qq2 = paste0("cross_join(",qq1a,",",qqa,")")
qq3 = paste0("apply(",qq2,",",unique_dim,",_revealcore_unpack_dim+",udim_new,")")
if(schema_allows_many_to_many) {
# Join the per-combination ids back onto every uploaded row sharing that
# unique_fields combination before redimensioning into the target shape.
qqb = paste0("project(",qq3,",",paste0(c(unique_dim, unique_fields), collapse=","),")")
qqc = paste0("equi_join(", qqb, " as x,", df_new_uploaded_name, " as y,left_names:(", paste0(paste0("x.",unique_fields), collapse=","), "),right_names:(",paste0(paste0("y.",unique_fields), collapse=","),"))")
qq4 = paste0("redimension(",con$aop_connection$afl_expr(qqc)$drop_dims()$to_afl(),",",arraynm,")")
} else {
qq4 = paste0("redimension(",con$aop_connection$afl_expr(qq3)$drop_dims()$to_afl(),",",arraynm,")")
}
qq5 = paste0("insert(",qq4,",",arraynm,")")
iquery(con$db, qq5)
}
else{
if(getOption("revealcore.debug", FALSE)) message("No new entries to insert.")
do_remove_versions = FALSE
}
# Get the dimensions for everything in the input data frame
if(getOption("revealcore.debug", FALSE)) message("Retreiving array dimensions for all rows in data frame.")
qr1 = paste0("equi_join(",df_main_uploaded_name," as x,",
con$aop_connection$afl_expr(arraynm)$drop_dims()$to_afl(), " as y,",
"left_names:(",paste0(paste0("x.",unique(c(unique_fields, setdiff(entity_dims, unique_dim)))), collapse=","),"),", #not df_main_uploaded$attrs #not unique_fields
"right_names:(",paste0(paste0("y.",unique(c(unique_fields, setdiff(entity_dims, unique_dim)))), collapse=","),"),", #not unique_fields
"keep_dimensions:true, left_outer:true)")
# Sort by the upload's own dimension so `dims` rows line up with `df` rows.
qr2 = paste0("project(sort(", qr1,",",df_main_uploaded$dims,"),", paste0(entity_dims, collapse=","),")")
dims = con$aop_connection$afl_expr(qr2)$to_df()
# Attach the resolved id to df so the info-field registration below can
# join on it.
df[[unique_dim]] = as.integer(dims[[unique_dim]])
}
else { #if not generating unique_dim, all dims and attributes should've been provided and we can insert directly
if(getOption("revealcore.debug", FALSE)) message("Inserting ", nrow(df_main), " entries in ", full_arrayname(pkg_schema, entitynm, con = con))
stopifnot(setequal(colnames(df_main), c(entity_dims, entity_attrs)))
qq = paste0("insert(redimension(", df_main_uploaded_name, ",", arraynm, "),", arraynm, ")")
iquery(con$db, qq)
dims = df_main[, entity_dims, drop=F]
}
# Prune old array versions when the schema asks for it and we actually
# inserted something (each insert creates a new version).
keep_versions = pkg_schema$array[[entitynm]]$keep_versions
if(do_remove_versions && !is.null(keep_versions)){
if(getOption("revealcore.debug", FALSE)) message("Removing all but latest ", keep_versions, " for ", arraynm)
if(keep_versions>1 && !.SCIDB_HAS_REMOVE_VERSIONS_KEEP_FIX(con=con)){
# remove_versions with keep: more version than exist causes issues
num_versions = nrow(iquery(con$db, paste0("versions(", arraynm, ")"), T))
if(keep_versions<num_versions){
iquery(con$db, paste0("remove_versions(", arraynm, ", keep: ",keep_versions,")"))
}
} else {
iquery(con$db, paste0("remove_versions(", arraynm, ", keep: ",keep_versions,")"))
}
}
if(length(info_array_fields)>0){
if(getOption("revealcore.debug", FALSE)) message("Updating INFO fields for ", full_arrayname(pkg_schema, entitynm, con = con))
# Find out which things from the original dataframe had new unique_ids registered and only register info fields for those
if(!is.null(df_new_uploaded_name)) {
if(nrow(df_new_uploaded$limit(n = 1)$to_df())!=1){
# Nothing was newly registered: empty df skips the info insert below.
df=subset(df,FALSE)
} else {
dims_updated = iquery(con$db, paste0("grouped_aggregate(", df_new_uploaded_name, ", ", paste0(unique_fields, collapse=","), ", count(*))"),T)[,unique_fields,drop=F]
df = dplyr::left_join(dims_updated, df, by=unique_fields)
}
}
if(nrow(df)>0){
if(getOption("revealcore.debug", FALSE)) message("Updating INFO fields for ", nrow(df), " entries in ", full_arrayname(pkg_schema, entitynm, con = con))
# Recursively register the attribute-key names, then the values, then the
# (id, key_id, value) triples into the INFO array.
metadata_attrkey_df = data.frame("metadata_attrkey"=info_array_fields)
metadata_attrkey_df$metadata_attrkey_id = register_dataframe_to_entity(pkg_schema = pkg_schema, con = con, df = metadata_attrkey_df, entitynm = entitynm_metadata_attrkey)$metadata_attrkey_id
if(length(info_array_fields_character)>0){
# Character info fields: store distinct strings once in METADATA_VALUE
# and reference them by metadata_value_id.
df_info_1 = df[, c(names(dims), info_array_fields_character), drop=F]
df_info_1 = as.data.frame(tidyr::pivot_longer(df_info_1, tidyr::all_of(info_array_fields_character), names_to = "metadata_attrkey", values_to="metadata_value"))
df_info_1 = subset(df_info_1, !is.na(metadata_value))
metadata_value_df = data.frame("metadata_value"=na.omit(unique(df_info_1$metadata_value)))
metadata_value_df$metadata_value_id = register_dataframe_to_entity(pkg_schema = pkg_schema, con = con, df = metadata_value_df, entitynm = entitynm_metadata_value)$metadata_value_id
df_info_1 = dplyr::left_join(df_info_1, metadata_value_df, by="metadata_value")
df_info_1 = dplyr::left_join(df_info_1, metadata_attrkey_df, by="metadata_attrkey")
register_dataframe_to_entity(pkg_schema = pkg_schema, con = con, df = df_info_1, entitynm = entitynm_info)
}
if(length(info_array_fields_noncharacter)>0){
# Non-character info fields: value stored inline; sentinel id -1 marks
# "no METADATA_VALUE entry".
df_info_2 = df[, c(names(dims), info_array_fields_noncharacter), drop=F]
df_info_2 = as.data.frame(tidyr::pivot_longer(df_info_2, tidyr::all_of(info_array_fields_noncharacter), names_to = "metadata_attrkey", values_to="metadata_value"))
df_info_2 = subset(df_info_2, !is.na(metadata_value))
df_info_2$metadata_value_id = as.integer(-1)
df_info_2 = dplyr::left_join(df_info_2, metadata_attrkey_df, by="metadata_attrkey")
register_dataframe_to_entity(pkg_schema = pkg_schema, con = con, df = df_info_2, entitynm = entitynm_info)
}
}
else{
if(getOption("revealcore.debug", FALSE)) message("No INFO fields to insert.")
}
}
},
finally = { #Cleanup any uploaded dataframes we persisted
# Best-effort removal; errors (e.g. array never created) are swallowed.
tryCatch({iquery(con$db, paste0("remove(", df_main_uploaded_name,")"))}, error = function(e){invisible(NULL)})
tryCatch({iquery(con$db, paste0("remove(", df_new_uploaded_name,")"))}, error = function(e){invisible(NULL)})
})
return(dims)
}
#' Coerce data-frame columns to the R types implied by an entity's schema
#'
#' For a known entity, dimension columns are coerced to integer, attribute
#' columns to the R type mapped from their SciDB type (via
#' `data_type_mapping_scidb_to_r`), and every other column to character.
#' Columns already of the target type are left untouched; conversion
#' warnings (e.g. NAs introduced by coercion) are escalated to errors.
#' If `entitynm` is not a known entity, `df` is returned unchanged.
#'
#' @param pkg_schema Schema list describing the package's arrays.
#' @param con Connection object (unused here; kept for a uniform signature).
#' @param df data.frame whose columns should be converted.
#' @param entitynm Name of the target entity in `pkg_schema`.
#' @return `df` with columns coerced to the schema's types.
type_convert_dataframe_by_entity = function(pkg_schema, con, df, entitynm){
if(entitynm %in% get_entity_names(pkg_schema = pkg_schema)){
# Use only the first class of each column (e.g. POSIXct carries two
# classes) so the vectorized comparison below is well defined.
types_in_df = sapply(df, function(col) class(col)[[1]])
# Default target type is character; dims and attributes override it below.
types_for_df = sapply(colnames(df), function(x) "character")
attrs = pkg_schema$array[[entitynm]]$attributes
attrs = attrs[intersect(names(attrs), names(types_for_df))]
dims = names(pkg_schema$array[[entitynm]]$dims)
dims = intersect(dims, names(types_for_df))
if(length(dims)>0) types_for_df[dims] = "integer"
if(length(attrs)>0) {
attrs = setNames(data_type_mapping_scidb_to_r[unlist(attrs)],
names(attrs))
types_for_df[names(attrs)] = attrs
}
if(any(is.na(types_for_df))){
stop("Scidb -> R datatype equivalence not set for some attribute")
}
ind <- types_for_df != types_in_df
if(any(ind)){
if(getOption("revealcore.debug", FALSE)) message("Types in input dataframe do not match ",entitynm," array schema. Trying conversions ",paste0(sapply(names(types_in_df)[ind], function(x) paste0(x,":(",types_in_df[[x]],"->",types_for_df[[x]],")")), collapse = ", "))
df[colnames(df)] <- lapply(colnames(df), function(x) {
y = df[[x]]
if(types_in_df[[x]] != types_for_df[[x]]) {
tryCatch({
# Factors must be converted through character first: the previous
# code discarded the as.character() result, so e.g. as.integer()
# on factor("5") returned the level code (1), not 5.
if(types_in_df[[x]] == "factor") y = as.character(y)
match.fun(paste0("as.", types_for_df[[x]]))(y)},
warning = function(w){stop(w$message)})}
else {y}
})
}
}
df
}
# (Removed: stray rdrr.io page boilerplate — "Add the following code to your
# website..." — that was not part of the source and is not valid R.)