#' Funzione per salvare un grafo
#'
#' La funzione controlla la presenza di eventuali conflitti e necessita'
#' di risincronizzare i dati del DB con quelli presenti nel Grafo.
#'
#' \itemize{
#' \item{"1"}{
#' Identificare le serie aggregate (solo formule) - primitive (solo dati)
#' cambiate, escludendo eventuali conflitti}
#' \item{"2"}{Caricarle nel grafo}
#' \item{"3"}{Rieseguirle}
#' \item{"4"}{Risalvare il grafo}
#' }
#'
#' La funzione controlla se esistono conflitti nel seguente modo:
#' \itemize{
#' \item{"dati"}{
#' Se esistono serie primitive nel DB e nel grafo
#' in sessione che sono state aggiornate in
#' contemporanea}
#' \item{"formule"}{
#' Se esistono formule nel DB e nel grafo in
#' sessione aggiornati in contemporanea}
#' }
#'
#' Qualora uno dei due casi si verificasse il grafo va in "conflitto",
#' vengono salvate sia le proprie modifiche che le modifiche fatte da
#' altri e si attende la risoluzione del conflitto attraverso i metodi
#' `fixConflict`. La soluzione dei conflitti non e' un atto di fede:
#' occorre incontrarsi e decidere quale "formula" o quale versione dei dati
#' sia da preferire.
#'
#' @seealso saveGraph
#' @name save_graph_impl
#' @include conflicts.r copy_graph.r assert_dag.r persistence_utils.r
#' @param x GrafoDB instance
#' @param tag tag del grafo da salvare, di default e' la stessa di `x`
#' @param ... altri parametri
save_graph_impl <- function(x, tag = x@tag, ...) {
ln <- "GrafoDB.persistence.save_graph_impl"
rutils::.trace("save_graph_impl started", name = ln)
param_list <- list(...)
msg <- rutils::ifelse("msg" %in% names(param_list), param_list[["msg"]], "")
con <- if ("con" %in% names(param_list)) {
rutils::.debug("connection context provided", name = ln)
param_list[["con"]]
} else {
rutils::.debug("connection has to be created...", name = ln)
con <- build_connection()
on.exit(disconnect(con))
rutils::.debug("connection created and set to be closed on.exit", name = ln)
con
}
rutils::.debug("Message used for saving: %s", msg, name = ln)
tryCatch({
rutils::.trace("beginning transaction", name = ln)
DBI::dbBegin(con)
if (has_conflicts(x, con = con)) {
stop("Il grafo ", tag, " ha conflitti, risolverli prima di salvare")
}
if (need_resync(x)) {
rutils::.info("Resync started", name = ln)
# risincronizzo i dati del db con la copia nel grafo
x <- resync(x, con = con)
# trova serie che necessitano il resync
name_to_sync <- changed_series_names(x, con = con)
# trova serie con conflitti
name_in_conflicts <- intersect(name_to_sync,
union(hash::keys(x@functions), hash::keys(x@data)))
clean_names <- setdiff(name_to_sync, name_in_conflicts)
# clean_names contiene le serie che possono essere ricaricate dal
# db e rivalutate
# senza problemi
# aggiungo gli archi del DB al presente grafo
network <- x@network
archi <- load_edges(tag, con = con)
archi <- archi[, c("partenza", "arrivo")]
dbnetwork <- igraph::graph.data.frame(
as.data.frame(archi), directed = TRUE)
network <- igraph::graph.union(network, dbnetwork, byname = TRUE)
assert_dag(network)
x@network <- network
x <- evaluate(x, clean_names)
}
check_conflicts(x, con = con)
if (exists_tag(tag, con = con)) {
# se esiste il tag sul DB
# sto aggiornando il grafo tag
rutils::.trace("'%s' exists on DB, I'm updating it...", tag, name = ln)
if (x@tag != tag) {
rutils::.trace("x@tag ('%s') != tag (%s), execute history, \
delete tag and recreate a copy of it",
x@tag, tag, name = ln)
# faccio l'history del tag di destinazione
do_history(x, tag, con)
# lo cancello
delete_graph_impl(tag, con)
# copio il grafo in sessione col grafo attuale
copy_graph(x@tag, tag, con = con, msg = msg)
}
# aggiorno eventuali cambiamenti in sessione
rutils::.trace("update eventual changes in session", name = ln)
update_graph(x, con = con, msg = msg)
} else {
if (x@tag == tag) {
rutils::.trace("tag as param equals tag as slot: creating a new graph",
name = ln)
# se non esiste il tag sul DB
# sto creando un nuovo grafo
create_graph(x, tag, con = con, msg = msg)
} else {
# se i tag sono differenti
if (nrow(x@dbdati) == 0 && nrow(x@dbformule) == 0) {
rutils::.trace("have no data, simply create an empty graph",
name = ln)
# non ho dati, creo grafo
create_graph(x, tag, con = con, msg = msg)
} else {
# ho dati, quindi copio il grafo dalla fonte alla
# destinazione sul DB e...
rutils::.trace("have data, so copying graph... ", name = ln)
copy_graph(x@tag, tag, con = con, msg = msg)
# Aggiorno eventuali cambiamenti in sessione
rutils::.trace("... and update eventual changes in session",
name = ln)
update_graph(x, tag, con = con, msg = msg)
}
}
}
DBI::dbCommit(con)
}, error = function(err) {
tryCatch({
DBI::dbRollback(con)
}, error = function(nested_error) {
stop(nested_error, "Root: ", err)
})
stop(err)
})
x
}
#' @include update_archi.r update_data.r update_functions.r
update_graph <- function(x, tag = x@tag, con = NULL, msg = "") {
## supporto per history
do_history(x, tag = tag, con = con)
update_data(x, con = con, tag = tag, notes = msg)
update_functions(x, con = con, tag = tag, msg = msg)
update_edges(x, con = con, tag = tag)
DBI::dbExecute(con, sql_by_key(
"UPDATE_GRAFO_LAST_UPDATED",
autore = rutils::whoami(),
tag = tag,
last_updated = time_in_nano(),
.con = con))
}
#' crea ex-novo un istanza di grafo nel databae
#'
#' @name create_graph
#' @rdname create_graph-internal
#' @param x istanza di Grafo
#' @param tag identificativo della versione
#' @param con connessione al DB
#' @param msg eventual message to log
create_graph <- function(x, tag, con, msg = NULL) {
commento <- if (is.null(msg)) {
if (interactive()) {
readline(prompt = "Inserisci un commento/nota per: ")
} else {
paste0("Rilascio per ", tag)
}
} else {
msg
}
autore <- rutils::whoami()
if (length(names(x))) {
name <- NULL
dati <- foreach::`%do%`(foreach::foreach(
name = iterators::iter(names(x)), .combine = rbind), {
tt <- x[[name]]
df <- to_data_frame(tt, name)
anno <- as.numeric(df$anno)
periodo <- as.numeric(df$periodo)
freq <- as.numeric(df$freq)
dati <- as.character(df$dati)
DBI::dbExecute(con, sql_by_key(
"INSERT_DATI",
tag = tag,
name = name,
anno = anno,
periodo = periodo,
freq = freq,
dati = dati,
autore = autore,
last_updated = time_in_nano(),
.con = con))
})
} else {
stop("There's no data to save")
}
archi <- as.data.frame(igraph::get.edgelist(x@network))
if (nrow(archi)) {
row <- NULL
foreach::`%do%`(foreach::foreach(row = iterators::iter(archi, "row")), {
partenza <- row[, 1]
arrivo <- row[, 2]
DBI::dbExecute(con, sql_by_key(
"INSERT_ARCO",
tag = tag,
partenza = partenza,
arrivo = arrivo,
autore = autore,
last_updated = time_in_nano(),
.con = con))
})
}
name <- NULL
foreach::`%do%`(foreach::foreach(
name = iterators::iter(names(x)), .combine = rbind), {
formula <- expr(x, name, echo = FALSE)
if (!is.null(formula)) {
DBI::dbExecute(con, sql_by_key(
"INSERT_FORMULA",
tag = tag,
name = name,
formula = formula,
autore = autore,
last_updated = time_in_nano(),
.con = con))
}
})
DBI::dbExecute(con, sql_by_key(
"INSERT_GRAFI",
tag = tag,
commento = commento,
autore = autore,
last_updated = time_in_nano(),
.con = con))
}
#' conta le versioni rolling del grafo con tag `tag`
#'
#' @name count_rolling
#' @param x istanza di grafo
#' @param con connessione al DB
#' @return un intero ad indicare il numero di versioni rolling salvate sul DB
#' @include db.r
count_rolling <- function(x, con) {
stopifnot(is.grafodb(x))
tag <- x@tag
# controlla che grafi_`tag`_ordinal_seq esista.
# se esiste, prende il prossimo `p` dalla sequence;
# se non esiste, esegue il blocco qui sotto, crea la sequence e
# aggiorna il valore
if (getenv() == "prod") {
## se PostgreSQL:
nome_seq <- glue::glue("grafi_{tag}_ordinal_seq")
df <- DBI::dbGetQuery(con, sql_by_key("EXISTS_SEQ",
seq = nome_seq, .con = con))
if (nrow(df) > 0) {
df <- DBI::dbGetQuery(con, sql_by_key("NEXT_VAL",
seq = nome_seq, .con = con))
as.numeric(df[[1]])
} else {
val <- get_max_provvisorio(tag, con) + 1
stmts <- sql_by_key("CREATE_SEQ", seq = nome_seq, val = val,
.con = con, .multiline = TRUE)
stmts <- unlist(stringr::str_split(stmts, ";"))
for (stmt in stmts) {
DBI::dbExecute(con, stmt)
}
count_rolling(x, con)
}
} else {
## se SQLite:
get_max_provvisorio(tag, con) + 1
}
}
get_max_provvisorio <- function(tag, con) {
df <- DBI::dbGetQuery(con, sql_by_key("COUNT_ROLLING", tag = tag, .con = con))
if (nrow(df) == 0) {
0
} else {
numeri <- suppressWarnings(
as.numeric(gsub("p", "", gsub(tag, "", df[, 1]))))
max(numeri, na.rm = TRUE)
}
}
#' Costruice il progressivo per il grafo `x`
#'
#' @name next_rolling_name
#' @param x GrafoDB instance
#' @param con DB connection
next_rolling_name <- function(x, con) {
glue::glue("{x@tag}p{ count_rolling(x, con) }")
}
#' Esegue il rolling dei vintage del `GrafoDB`
#'
#' Ad ogni salvataggio con il metodo `saveGraph` se non impostiamo
#' un nuovo `tag` il `GrafoDB` salva i dati sullo stesso `tag` ma
#' contemporaneamente salva la versione precedente con un progressivo,
#' in modo da tener traccia di chi ha fatto cosa nel tempo.
#'
#' Le versioni sono contraddistinte da un nuovo tag, `tag`p`X` dove
#' `X` e' un numero progressivo
#'
#' Il grafo potra' successivamente essere caricato con il nuovo tag.
#'
#' @name do_history
#' @param x istanza di `GrafoDB`
#' @param tag tag del grafo
#' @param con connessione al database
#' @note questa e' una funzione interna del grafo invocata da `update_graph`
#' @seealso saveGraph update_graph
do_history <- function(x, tag, con) {
ril <- rilasci(tag)
autore <- ril[ril$tag == x@tag, ]$autore
if (length(autore) == 0) {
autore <- rutils::whoami()
}
dest <- next_rolling_name(x, con)
if (interactive()) message("Saving GrafoDB from ", x@tag, " to ", dest)
copy_graph(
x@tag, dest,
con = con,
autore = autore,
last_update = x@timestamp)
if (interactive()) message("Saving ", dest, " completed")
0
}
#' Salva un istanza di grafo sul file system
#'
#' @name saveBinary
#' @param x istanza del GrafoDB
#' @param path percorso del file su cui salvare il grafo
#' @export
#' @note il restore si fa con il comando `readBinary`
saveBinary <- function(x, path) { # nolint
con <- file(path, "wb")
on.exit(close(con))
ret <- serialize(x, con, ascii = TRUE)
invisible(ret)
}
#' Legge un GrafoDB dal filesystem in formato binario con `saveBinary`
#'
#' @name readBinary
#' @param path percorso del file con il GrafoDB
#' @return GrafoDB contenuto nel file `path`
#' @export
readBinary <- function(path) { # nolint
con <- file(path, "rw")
on.exit(close(con))
unserialize(con)
}
Add the following code to your website.
For more information on customizing the embed code, read Embedding Snippets.