R/phase1.R

#' @include starvz_data.R
NULL

#' Execute StarVZ Phase one.
#'
#' This function calls all CSV-converter inner-functions to pre-process
#' them into StarVZ files. Although it can be used directly in a folder
#' where all the gzip-compressed CSV files reside, we suggest using the
#' shell tool \code{starvz} or \code{phase1-workflow.sh} in the \code{tools/}
#' directory.
#'
#' @param directory Directory of CSV files
#' @param app_states_fun Function used to determine application states
#' @param state_filter Type of state filter to apply
#' @param whichApplication Name of the application
#' @param input.parquet Whether to save parquet ("1", the default) or feather files
#' @param config_file StarVZ config structure; this function uses only the app_tasks
#' @return Nothing; the pre-processed StarVZ files are written to \code{directory}
#' @family phase1 functions
#'
#' @examplesIf requireNamespace("arrow", quietly = TRUE)
#' \donttest{
#' example_folder <- system.file("extdata", "lu_trace", package = "starvz")
#' starvz_phase1(directory = example_folder)
#' }
#' @export
starvz_phase1 <- function(directory = ".", app_states_fun = lu_colors,
                          state_filter = 0, whichApplication = "", input.parquet = "1",
                          config_file = NULL) {
  # Start of reading procedure
  if (is.null(app_states_fun)) stop("app_states_fun is required for reading")

  # If no config_file is specified, try to fall back on the config.yaml inside directory
  # If it is still not available, it will use a default one
  if (!is.null(config_file)) {
    config <- starvz_read_config(config_file)
  } else {
    config <- starvz_read_config(file.path(directory, "config.yaml"), warn = FALSE)
  }

  # Read entities.csv and register the hierarchy (with Y coordinates)
  entities <- hl_y_paje_tree(where = directory)
  dfhie <- entities$workertreedf

  # Read Worker States
  Worker <- read_worker_csv(
    where = directory,
    app_states_fun = app_states_fun,
    outlier_fun = outlier_definition,
    state_filter = state_filter,
    whichApplication = whichApplication,
    config = config
  )
  Worker$Application <- Worker$Application %>%
    hl_y_coordinates(dfhie = dfhie) %>%
    select(-"Type")

  Worker$StarPU <- Worker$StarPU %>%
    hl_y_coordinates(dfhie = dfhie) %>%
    select(-"Type")

  if (Worker$Application %>% nrow() == 0) stop("After reading states, number of application rows is zero.")

  # Read and write each auxiliary file in isolation so that its memory
  # can be reclaimed by the gc() calls between steps
  gc()
  isolate_read_write(input.parquet, read_vars_set_new_zero, "Variable", directory, Worker$ZERO)
  gc()
  isolate_read_write(input.parquet, pmtool_bounds_csv_parser, "Pmtool", directory, Worker$ZERO)
  gc()
  isolate_read_write(input.parquet, data_handles_csv_parser, "Data_handles", directory, Worker$ZERO)
  gc()
  isolate_read_write(input.parquet, papi_csv_parser, "Papi", directory, Worker$ZERO)
  gc()
  isolate_read_write(input.parquet, read_memory_state_csv, "Memory_state", directory, Worker$ZERO)
  gc()
  isolate_read_write(input.parquet, read_comm_state_csv, "Comm_state", directory, Worker$ZERO)
  gc()
  isolate_read_write(input.parquet, read_other_state_csv, "Other_state", directory, Worker$ZERO)
  gc()
  isolate_read_write_m(input.parquet, events_csv_parser, directory, Worker$ZERO)
  gc()
  isolate_read_write_m(input.parquet, tasks_csv_parser, directory, Worker$ZERO)
  gc()

  # Read links
  dfl <- read_links(where = directory, Worker$ZERO)

  # Read the elimination tree
  dfa <- atree_load(where = directory)

  # QRMumps case:
  # If the Atree is available and loaded, we create new columns for each task
  # to hold Y coordinates for the temporal elimination tree plot
  if (!is.null(dfa)) {
    dfap <- dfa %>%
      select(-"Parent", -"Depth") %>%
      rename(Height.ANode = "Height", Position.ANode = "Position")
    Worker$Application <- Worker$Application %>% left_join(dfap, by = "ANode")
    dfap <- NULL
    # Reorder the elimination tree
    dfa <- reorder_elimination_tree(dfa, Worker$Application)
  }

  # Read Dag
  dfdag <- read_dag(where = directory, Worker$Application %>% mutate(Application = TRUE), dfl)

  dpmts <- pmtool_states_csv_parser(where = directory, whichApplication = whichApplication, Y = dfhie, States = Worker$Application)

  # Ending
  # Enframe ZERO
  ZERO <- enframe(Worker$ZERO, name = NULL)

  # Enframe Version
  Version <- enframe("0.2.0", name = NULL)

  starvz_log("Assembling the named list with the data from this case.")

  data <- list(
    Origin = directory,
    Application = Worker$Application,
    StarPU = Worker$StarPU,
    Colors = Worker$Colors,
    Link = dfl, Dag = dfdag, Y = dfhie, Atree = dfa,
    Pmtool_states = dpmts, entities = entities$dfe,
    Zero = ZERO, Version = Version
  )

  starvz_log("Call Gaps.")
  data$Gaps <- NULL # gaps(data) is currently disabled

  starvz_log("Call Latest.")
  data$Last <- compute_all_last(data)

  if (input.parquet == "1") {
    starvz_log("Saving as parquet")
    starvz_write_parquet(data, directory = directory)
  } else {
    starvz_log("Saving as feather")
    starvz_write_feather(data, directory = directory)
  }
}
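
# After phase 1 finishes, the written parquet/feather files are normally
# consumed by phase 2. A minimal sketch of that follow-up, assuming the
# usual starvz read/plot entry points (illustrative, not run here):
#   data <- starvz_read(directory = ".")
#   starvz_plot(data)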

isolate_read_write <- function(input.parquet, fun, name, directory, ZERO) {
  data <- list()
  data[[name]] <- fun(where = directory, ZERO = ZERO)
  if (input.parquet == "1") {
    starvz_log("Saving as parquet")
    starvz_write_parquet(data, directory = directory)
  } else {
    starvz_log("Saving as feather")
    starvz_write_feather(data, directory = directory)
  }
  return(NULL)
}

isolate_read_write_m <- function(input.parquet, fun, directory, ZERO) {
  data <- fun(where = directory, ZERO = ZERO)
  if (is.null(data)) {
    return(NULL)
  }
  if (input.parquet == "1") {
    starvz_log("Saving as parquet")
    starvz_write_parquet(data, directory = directory)
  } else {
    starvz_log("Saving as feather")
    starvz_write_feather(data, directory = directory)
  }
  return(NULL)
}
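
# Both helpers above share one pattern: read a single input with `fun`,
# write the result out immediately, and return NULL so the intermediate
# table can be garbage-collected before the next reader runs (hence the
# gc() calls in starvz_phase1). A hypothetical reader plugged into this
# pattern (my_parser and my_data.csv are illustrative only):
#   my_parser <- function(where, ZERO) {
#     read_csv(file.path(where, "my_data.csv"), col_types = cols()) %>%
#       mutate(Start = .data$Start - ZERO, End = .data$End - ZERO)
#   }
#   isolate_read_write("1", my_parser, "My_data", directory = ".", ZERO = 0)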

# This function takes a data.tree object and calculates three properties
# H: height, P: position, D: depth
atree_coordinates <- function(atree, height = 1) {
  defineHeightPosition <- function(node, curPos, depth) {
    if (length(node$children) == 0) {
      # A leaf's coordinates are the same as its parent's
      node$H <- node$parent$H
      node$P <- node$parent$P
      node$D <- node$parent$D
    } else {
      # Define this node's properties
      node$H <- height
      node$P <- curPos
      node$D <- depth
      # Before recursion, set the new Y position at curPos
      curPos <- curPos + node$H
      # Recurse
      for (child in node$children) {
        curPos <- defineHeightPosition(child, curPos, (depth + 1))
      }
    }
    return(curPos)
  }
  defineHeightPosition(atree, 0, 0)
  return(atree)
}

# This function gets a data.tree object and converts it to a tibble
# Expects three properties on each node (H, P, and D), as defined above
atree_to_df <- function(node) {
  ndf <- tibble(
    Parent = ifelse(is.null(node$parent), NA, node$parent$name),
    ANode = node$name,
    Height = node$H,
    Position = node$P,
    Depth = node$D
  )
  for (child in node$children) ndf <- ndf %>% bind_rows(atree_to_df(child))
  return(ndf)
}
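
# A minimal sketch of these two helpers on a toy tree, assuming the
# data.tree package (illustrative, not run by the package):
#   library(data.tree)
#   root <- Node$new("0")
#   inner <- root$AddChild("1")  # internal node
#   inner$AddChild("2")          # leaf: inherits its parent's H, P, and D
#   atree_to_df(atree_coordinates(root))
#   # => Parent ANode Height Position Depth
#   #    NA     "0"   1      0        0
#   #    "0"    "1"   1      1        1
#   #    "1"    "2"   1      1        1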

reorder_elimination_tree <- function(Atree, Application) {
  # Reorganize tree Position, considering only non-pruned nodes and submission order
  data_reorder <- Application %>%
    filter(grepl("qrt", .data$Value)) %>%
    select("ANode", "SubmitOrder") %>%
    unique() %>%
    group_by(.data$ANode) %>%
    mutate(SubmitOrder = as.integer(.data$SubmitOrder)) %>%
    arrange(.data$SubmitOrder) %>%
    slice(1) %>%
    ungroup() %>%
    arrange(.data$SubmitOrder) %>%
    mutate(Position = 1:n(), Height = 1) %>%
    select(-"SubmitOrder")

  Atree <- Atree %>%
    # Replace Position and Height by new ordering
    select(-"Position", -"Height") %>%
    left_join(data_reorder, by = "ANode")

  # Define the Position of pruned nodes to be the same as their parent's
  data_pruned_position <- Application %>%
    filter(grepl("qrt", .data$Value) | grepl("do_subtree", .data$Value)) %>%
    mutate(NodeType = case_when(.data$Value == "do_subtree" ~ "Pruned", TRUE ~ "Not Pruned")) %>%
    select(-"Position", -"Height") %>%
    left_join(Atree, by = "ANode") %>%
    select("ANode", "Parent", "NodeType", "Position", "Height") %>%
    unique() %>%
    left_join(
      Atree %>%
        select("ANode", "Position", "Height"),
      by = c("Parent" = "ANode"), suffix = c("", ".Parent")
    ) %>%
    # Pruned child nodes take the same position as their parent
    mutate(
      Position = case_when(.data$NodeType == "Pruned" ~ .data$Position.Parent, TRUE ~ .data$Position),
      Height = case_when(.data$NodeType == "Pruned" ~ .data$Height.Parent, TRUE ~ .data$Height)
    ) %>%
    select(-"Parent", -"Position.Parent", -"Height.Parent")

  Atree <- Atree %>%
    # Replace Position and Height for pruned nodes
    select(-"Position", -"Height") %>%
    left_join(data_pruned_position, by = "ANode")

  return(Atree)
}
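
# A hypothetical sketch of the reordering (column names follow the code
# above; the Value strings mimic the qr_mumps "qrt*"/"do_subtree" naming
# that the filters expect, and all numbers are made up):
#   Atree <- tibble::tibble(
#     ANode = c("1", "2", "3"), Parent = c(NA, "1", "1"),
#     Depth = c(0, 1, 1), Position = c(0, 1, 2), Height = c(1, 1, 1)
#   )
#   Application <- tibble::tibble(
#     ANode = c("1", "2", "3"),
#     Value = c("qrt_geqrt", "qrt_geqrt", "do_subtree"),
#     SubmitOrder = c("3", "1", "2"),
#     Position = c(0, 0, 0), Height = c(1, 1, 1)
#   )
#   reorder_elimination_tree(Atree, Application)
#   # Node "2" was submitted first, so it gets Position 1; node "1" gets 2;
#   # the pruned node "3" inherits its parent's Position (2).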

hl_y_paje_tree <- function(where = ".") {
  entities.feather <- paste0(where, "/entities.feather")
  entities.csv <- paste0(where, "/entities.csv")

  if (file.exists(entities.feather)) {
    check_arrow()
    starvz_log(paste("Reading ", entities.feather))
    dfe <- arrow::read_feather(entities.feather)
  } else if (file.exists(entities.csv)) {
    starvz_log(paste("Reading ", entities.csv))
    dfe <- starvz_suppressWarnings(read_csv(entities.csv,
      trim_ws = TRUE,
      col_types = cols(
        Parent = col_character(),
        Name = col_character(),
        Type = col_character(),
        Nature = col_character()
      )
    ))
  } else {
    starvz_log(paste("Files", entities.feather, "or", entities.csv, "do not exist."))
    return(NULL)
  }

  # First part: read entities, calculate Y
  # data.tree does not cope well with the tibble produced by readr's read_csv,
  # so tree_filtering converts it to a plain data.frame before building the tree

  if ((dfe %>% nrow()) == 0) stop("After reading the entities file, the number of rows is zero.")

  workertree <- tree_filtering(
    dfe,
    c("Link", "Event", "Variable"),
    c("GFlops", "Memory Manager", "Scheduler State", "User Thread", "Thread State")
  )

  # Customize height of each object
  dfheights <- data.frame(Type = c("Worker State", "Communication Thread State"), Height = c(1, 1))
  # Customize padding between containers
  dfpaddings <- data.frame(Type = c("MPI Program"), Padding = c(2))

  # Calculate the Y coordinates with what is left
  workertree <- y_coordinates(workertree, dfheights, dfpaddings)

  # print(workertree, "Type", "Nature", "H", "P", limit=200);
  # Convert back to data frame
  workertreedf <- dt_to_df(workertree) %>% select(-"Nature")

  if ((workertreedf %>% nrow()) == 0) stop("After converting the tree back to DF, number of rows is zero.")

  return(list(workertreedf = workertreedf, dfe = dfe))
}
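
# The entities file describes the Paje container/type hierarchy exported
# by the tracing tools. A hypothetical excerpt of entities.csv, matching
# the col_types declared above (rows are illustrative only):
#   Parent,Name,Type,Nature
#   root,MPIrank0,MPI Program,Container
#   MPIrank0,CPU0,Thread,Container
#   CPU0,Worker State,Worker State,State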

hl_y_coordinates <- function(dfw = NULL, dfhie = NULL) {
  if (is.null(dfw)) stop("The input data frame with states is NULL")

  # first part: read entities, calculate Y
  workertreedf <- dfhie

  # second part: left join with Y
  dfw <- dfw %>%
    # the left join to get new Y coordinates
    left_join(workertreedf, by = c("ResourceId" = "Parent"))

  return(dfw)
}
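
# A minimal sketch of the join performed above (illustrative values):
#   dfhie <- tibble::tibble(Parent = "CPU0", Type = "Worker State",
#                           Height = 1, Position = 0)
#   dfw <- tibble::tibble(ResourceId = "CPU0", Start = 0, End = 10)
#   hl_y_coordinates(dfw, dfhie)
#   # dfw gains the Type, Height, and Position columns of its resource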

tree_filtering <- function(dfe, natures, types) {
  starvz_log("Starting the tree filtering to create Y coordinates")

  dfe %>%
    # Mutate things to character since data.tree doesn't like factors
    mutate(Type = as.character(.data$Type), Nature = as.character(.data$Nature)) %>%
    # Filter things that can't be filtered using Prune (because Prune doesn't like grepl)
    # Note that this is potentially dangerous and works only for StarPU traces
    filter(!grepl("InCtx", .data$Parent), !grepl("InCtx", .data$Name)) %>%
    # Rename the reserved word root
    mutate(
      Name = gsub("root", "ROOT", .data$Name),
      Parent = gsub("root", "ROOT", .data$Parent)
    ) %>%
    # Remove nodes named 0 and nodes whose parent is named 0
    filter(.data$Name != 0 & .data$Parent != 0) %>%
    # Convert to data.frame to avoid compatibility problems between tibble and data.tree
    as.data.frame() -> x
  # Sort by machines ID
  y <- x[mixedorder(as.character(x$Name), decreasing = TRUE), ]
  # Continue
  y %>%
    # Remove all variables
    # filter (Nature != "Variable") %>%
    # Remove bogus scheduler (should use a new trace)
    # filter (Name != "scheduler") %>%
    # Convert to data.tree object
    as.Node(mode = "network") -> tree
  # Remove all nodes that are present in the natures list
  if (!is.null(natures)) {
    Prune(tree, function(node) !(node$Nature %in% natures))
  }
  # Remove all types that are present in the types list
  if (!is.null(types)) {
    Prune(tree, function(node) !(node$Type %in% types))
  }

  return(tree)
}

y_coordinates <- function(atree, heights, paddings) {
  starvz_log("Starting y_coordinates")
  defineHeightPosition <- function(node, dfhs, dfps, curPos) {
    node$P <- curPos
    if (!is.null(node$Nature) && node$Nature == "State") {
      node$H <- dfhs %>%
        filter(.data$Type == node$Type) %>%
        .$Height
      # This is a StarPU+MPI hack to make CUDA resources look larger
      if (grepl("CUDA", node$parent$name)) node$H <- node$H * 2
      curPos <- curPos + node$H
    } else {
      padding <- 0
      if (!is.null(node$Type)) {
        padding <- dfps %>% filter(.data$Type == node$Type)
        if (nrow(padding) == 0) {
          padding <- 0
        } else {
          padding <- padding %>% .$Padding
        }
      }

      for (child in node$children) {
        curPos <- defineHeightPosition(child, dfhs, dfps, (curPos + padding))
      }
      if (length(node$children)) {
        node$H <- sum(sapply(node$children, function(child) child$H))
      } else {
        node$H <- 0
      }
    }
    return(curPos)
  }

  atree$Set(H = NULL)
  atree$Set(P = NULL)
  defineHeightPosition(atree, heights, paddings, 0)
  return(atree)
}

dt_to_df <- function(node) {
  ret <- dt_to_df_inner(node)
  return(ret)
}

dt_to_df_inner <- function(node) {
  cdf <- data.frame()
  ndf <- data.frame()
  if (!is.null(node$Nature) && node$Nature == "State") {
    ndf <- data.frame(
      Parent = node$parent$name,
      Type = node$name,
      Nature = node$Nature,
      Height = node$H,
      Position = node$P
    )
  } else {
    for (child in node$children) {
      cdf <- rbind(cdf, dt_to_df_inner(child))
    }
  }
  ret <- rbind(ndf, cdf)
  return(ret)
}
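
# A minimal sketch of y_coordinates() followed by dt_to_df(), assuming the
# data.tree package (illustrative, not run by the package):
#   library(data.tree)
#   root <- Node$new("ROOT")
#   w <- root$AddChild("Worker State")
#   w$Nature <- "State"; w$Type <- "Worker State"
#   heights <- data.frame(Type = "Worker State", Height = 1)
#   paddings <- data.frame(Type = character(0), Padding = numeric(0))
#   dt_to_df(y_coordinates(root, heights, paddings))
#   # => Parent Type           Nature Height Position
#   #    ROOT   "Worker State" State  1      0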

gaps.f_backward <- function(data) {
  # Create the seed chain
  if (any(grepl("mpicom", data$Dag$JobId))) {
    data$Dag %>%
      filter(grepl("mpicom", .data$JobId)) -> tmpdag
  } else {
    data$Dag -> tmpdag
  }
  tmpdag %>%
    rename(DepChain = "JobId", Member = "Dependent") %>%
    select("DepChain", "Member") -> seedchain

  f2 <- function(dfdag, chain.i) {
    dfdag %>% select("JobId", "Dependent", "Application", "Value") -> full.i
    # qr_mumps traces can contain duplicated rows in these data frames,
    # which breaks the left_join below; unique() removes the duplicates
    full.i %>% unique() -> full.i
    chain.i %>% unique() -> chain.i
    full.i %>% left_join(chain.i, by = c("JobId" = "Member")) -> full.o

    # If there are no application tasks in dependency chains, keep looking
    if ((full.o %>% filter(!is.na(.data$DepChain), .data$Application == TRUE) %>% nrow()) == 0) {
      # Prepare the new chain
      full.o %>%
        filter(!is.na(.data$DepChain)) %>%
        rename(Member = "Dependent") %>%
        select("DepChain", "Member") -> chain.o
      # Stop when the chain is empty to avoid infinite recursion
      # (mirrors the guard in gaps.f_forward below)
      if (nrow(chain.o) > 0) {
        return(f2(full.o, chain.o))
      }
    }
    return(full.o)
  }
  return(f2(data$Dag, seedchain))
}

gaps.f_forward <- function(data) {
  # Create the seed chain
  if (any(grepl("mpicom", data$Dag$Dependent))) {
    data$Dag %>%
      filter(grepl("mpicom", .data$Dependent)) -> tmpdag
  } else {
    data$Dag -> tmpdag
  }
  tmpdag %>%
    rename(DepChain = "Dependent", Member = "JobId") %>%
    select("DepChain", "Member") -> seedchain

  f2 <- function(dfdag, chain.i) {
    dfdag %>% select("JobId", "Dependent", "Application", "Value") -> full.i
    # qr_mumps traces can contain duplicated rows in these data frames,
    # which breaks the left_join below; unique() removes the duplicates
    full.i %>% unique() -> full.i
    chain.i %>% unique() -> chain.i
    full.i %>% left_join(chain.i, by = c("Dependent" = "Member")) -> full.o

    # If there are no application tasks in dependency chains, keep looking
    if ((full.o %>% filter(!is.na(.data$DepChain), .data$Application == TRUE) %>% nrow()) == 0) {
      # Prepare the new chain
      full.o %>%
        filter(!is.na(.data$DepChain)) %>%
        rename(Member = "JobId") %>%
        select("DepChain", "Member") -> chain.o
      if (nrow(chain.o) > 0) {
        return(f2(full.o, chain.o))
      }
    }
    return(full.o)
  }
  return(f2(data$Dag, seedchain))
}

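# gaps() rebuilds a "gaps DAG" that connects application tasks across MPI
# communications: data.z holds the direct application-to-application edges,
# while data.b and data.f reattach the tasks reached by following the
# dependency chains backward and forward through the mpicom links.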
gaps <- function(data) {
  starvz_log("Starting the gaps calculation.")

  if (is.null(data$Dag)) {
    return(NULL)
  }
  if (is.null(data$Application)) {
    return(NULL)
  }
  # if(is.null(data$Link)) return(NULL);

  gaps.f_backward(data) %>%
    filter(!is.na(.data$DepChain)) %>%
    select("JobId", "DepChain") %>%
    rename(Dependent = "JobId") %>%
    rename(JobId = "DepChain") %>%
    select("JobId", "Dependent") %>%
    unique() -> data.b

  gaps.f_forward(data) %>%
    filter(!is.na(.data$DepChain)) %>%
    select("JobId", "DepChain") %>%
    rename(Dependent = "DepChain") %>%
    select("JobId", "Dependent") %>%
    unique() -> data.f

  data$Dag %>%
    filter(.data$Application == TRUE) %>%
    select("JobId", "Dependent") -> data.z

  # Create the new gaps Dag
  dfw <- data$Application %>%
    select("JobId", "Value", "ResourceId", "Node", "Start", "End")
  if (is.null(data$Link)) {
    dfl <- data.frame()
    data.b.dag <- data.frame()
    data.f.dag <- data.frame()
  } else {
    dfl <- data$Link %>%
      filter(grepl("mpicom", .data$Key)) %>%
      mutate(Value = NA, ResourceId = .data$Origin, Node = NA) %>%
      rename(JobId = "Key") %>%
      select("JobId", "Value", "ResourceId", "Node", "Start", "End")
    data.b %>%
      left_join(dfl, by = c("JobId" = "JobId")) %>%
      left_join(dfw, by = c("Dependent" = "JobId")) %>%
      mutate(Value.x = as.character(.data$Value.x)) -> data.b.dag
    data.f %>%
      left_join(dfw, by = c("JobId" = "JobId")) %>%
      left_join(dfl, by = c("Dependent" = "JobId")) %>%
      mutate(
        Value.y = as.character(.data$Value.y),
        Node.y = as.character(.data$Node.y)
      ) -> data.f.dag
  }
  data.z %>%
    left_join(dfw, by = c("JobId" = "JobId")) %>%
    left_join(dfw, by = c("Dependent" = "JobId")) -> data.z.dag

  return(bind_rows(data.z.dag, data.b.dag, data.f.dag))
}

compute_all_last <- function(data) {
  filtered_dag <- data$Dag %>%
    select("JobId", "Dependent", "Start", "End", "Cost", "Value") %>%
    mutate(JobId = as.character(.data$JobId), Dependent = as.character(.data$Dependent))

  all_levels <- unique(c(filtered_dag$JobId, filtered_dag$Dependent))

  filtered_dag %>%
    mutate(
      JobId = factor(.data$JobId, levels = all_levels),
      Dependent = factor(.data$Dependent, levels = all_levels)
    ) -> filtered_dag

  return(last_task_c(filtered_dag) %>% tibble())
}
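
# last_task_c() is implemented in the package's compiled code; converting
# JobId and Dependent to factors over a shared level set presumably lets
# it work on integer codes rather than strings.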
