r params$smrtlink_system System Job Summary

All JobTypes Summary

library(smrtlinkr)
library(lubridate)
library(tidyverse)
library(stringr)

smrtlink_system <- params$smrtlink_system
created_at <- params$created_at
root_data_dir <- params$root_data_dir

summary_msg <- paste0("Created at ", created_at," with SMRT Link system ", smrtlink_system)
print(summary_msg)

# Util func
to_job_json_path_from_type <- function(job_type) { return(to_job_json_path(root_data_dir, smrtlink_system, job_type))}

# Util func for plotting Created At
plot_created_at <- function(xdf, max_recent_date, job_type) {
  recent_adf <- filter(xdf, created_at >= max_recent_date)

  num_seconds <- 60
  binwidth <- 60 * num_seconds
  s_title <- paste0("System ", smrtlink_system, " Job type ", job_type, " binwidth=", binwidth)
  y_lab <- paste0("Number of Created ", job_type, " Jobs")
  recent_adf %>% ggplot(aes(created_at)) + geom_freqpoly(binwidth = binwidth) + theme(axis.text.x = element_text(angle = 90, hjust = 1)) + xlab("Created At") + ylab(y_lab) + ggtitle(s_title)
}

# Util for Plotting Range of Created At 
plot_all_created_at <- function(xdf, job_type) {
  max_created_at <- max(xdf$created_at)
  plot_created_at_from_duration <- function(x_duration) {
    plot_created_at(xdf, max_created_at - x_duration, job_type) %>% print()
  }

  plot_created_at_from_duration(dhours(1))
  plot_created_at_from_duration(dhours(3))
  plot_created_at_from_duration(dhours(12))
  plot_created_at_from_duration(dhours(24))
  plot_created_at_from_duration(dweeks(1))
  plot_created_at_from_duration(dweeks(4))
  plot_created_at_from_duration(dweeks(12))

}

# Define all Job type paths here to load into a single DF
path_analysis_job <- to_job_json_path_from_type(ALL_JOB_TYPES$ANALYSIS)
path_import_job <- to_job_json_path_from_type(ALL_JOB_TYPES$IMPORT)
path_merge_job <- to_job_json_path_from_type(ALL_JOB_TYPES$MERGE)
path_ts_status_job <- to_job_json_path_from_type(ALL_JOB_TYPES$TS_STATUS)
path_ts_failed_job <- to_job_json_path_from_type(ALL_JOB_TYPES$TS_FAILED)
path_db_back <- to_job_json_path_from_type(ALL_JOB_TYPES$DB_BACKUP)
path_job_fasta_to_ref <- to_job_json_path_from_type(ALL_JOB_TYPES$FASTA_REF)
path_job_export_ds <- to_job_json_path_from_type(ALL_JOB_TYPES$EXPORT)
path_job_fasta_to_bc <- to_job_json_path_from_type(ALL_JOB_TYPES$FASTA_BARCODE)
path_job_delete_job <- to_job_json_path_from_type(ALL_JOB_TYPES$DELETE_JOB)

# TODO. Make sure this is complete
all_job_paths <- c(path_analysis_job, path_import_job, path_merge_job, path_ts_failed_job, path_ts_status_job, path_db_back, path_job_fasta_to_ref, path_job_export_ds, path_job_fasta_to_bc, path_job_delete_job)

all_df <- smrtlinkr::load_jobs_from_json_paths(all_job_paths, smrtlink_system)

# Recent jobs
all_df %>% arrange(desc(job_id)) %>% slice(1:25) %>% print(n=25, width = Inf)

All Job Type Summary

to_summary_job(all_df, smrtlink_system, "ALL")
all_df %>% ggplot(aes(x = factor(1), fill = factor(state) )) + geom_bar(width = 1) + coord_polar(theta = "y") + ggtitle("ALL Jobs by Job State")
all_df %>% ggplot(aes(state)) + geom_bar() + ggtitle("ALL Job by Job State")

All JobTypes Created At

plot_all_created_at(all_df, "ALL")

Analysis Job Summary

adf <- smrtlinkr::load_job_analysis_from_json(path_analysis_job, smrtlink_system)
# Recent jobs
adf %>% arrange(desc(job_id)) %>% slice(1:25) %>% print(n = 25, width = Inf)

Generate Summary of Analysis Job States

adf %>% group_by(state) %>% summarize(total = n()) %>% print()
adf %>% ggplot(aes(x = factor(1), fill = factor(state) )) + geom_bar(width = 1) + coord_polar(theta = "y") + ggtitle("Analysis DataSet Jobs by Job State")
adf %>% ggplot(aes(state)) + geom_bar() + ggtitle("Analysis Job by Job State")

All Job by Created At

# FIXME. Need to load all jobs
#plot_all_created_at(adf, ALL_JOB_TYPES$ANALYSIS)

Generate Summary of Analysis Jobs By Pipeline Id

adf %>% group_by(pipeline_id) %>% summarize(total = n()) %>% ungroup() %>% arrange(-total) %>% print()
adf %>% ggplot(aes(pipeline_id)) + geom_bar() + coord_flip() + ggtitle("Break Down of Jobs By Pipeline") + xlab("Pipeline Id") + ylab("Number of Jobs")
adf %>% filter(pipeline_id != "pbsmrtpipe.pipelines.sa3_ds_resequencing_fat") %>% ggplot(aes(pipeline_id)) + geom_bar() + coord_flip()  + ggtitle("Break Down of Jobs By Pipeline", subtitle = "(excluding Resquencing+Reports)") +  xlab("Pipeline Id")

Generate Summary of Frequently Used Pipelines and Job States

adf %>% filter(pipeline_id == "pbsmrtpipe.pipelines.sa3_ds_resequencing_fat") %>% ggplot(aes(state)) + geom_bar() + ggtitle("Analysis Resequencing Pipeline Job State")
xdf <- adf %>% filter(pipeline_id == "pbsmrtpipe.pipelines.sa3_ds_resequencing_fat", state == "SUCCESSFUL")
plot_job_run_times(xdf, title = "Analysis Resequencing Fat Successful Job Walltime (seconds)", bins=200)
adf %>% filter(pipeline_id == "pbsmrtpipe.pipelines.polished_falcon_fat") %>% ggplot(aes(state)) + geom_bar() + ggtitle("Analysis Pipeline Polished Falcon + Reports Pipeline Job State")
xdf <- adf %>% filter(pipeline_id == "pbsmrtpipe.pipelines.polished_falcon_fat", state == "SUCCESSFUL")
plot_job_run_times(xdf, title = "Analysis Polish Falcon Successful Job Walltime (seconds)", bins=200)
adf %>% filter(pipeline_id == "pbsmrtpipe.pipelines.sa3_ds_ccs") %>% ggplot(aes(state)) + geom_bar() + ggtitle("Analysis Pipeline CCS + Reports Pipeline Job State")
xdf <- adf %>% filter(pipeline_id == "pbsmrtpipe.pipelines.sa3_ds_ccs", state == "SUCCESSFUL")
plot_job_run_times(xdf, title = "Analysis CCS Successful Job Walltime (seconds)", bins=200)
adf %>% filter(pipeline_id == "pbsmrtpipe.pipelines.sa3_ds_ccs_align") %>% ggplot(aes(state)) + geom_bar() + ggtitle("Analysis CCS + Align + Reports Pipeline Job State")
xdf <- adf %>% filter(pipeline_id == "pbsmrtpipe.pipelines.sa3_ds_ccs_align", state == "SUCCESSFUL")
plot_job_run_times(xdf, title = "Analysis CCS Align Successful Job Walltime (seconds)", bins=200)

Analysis Job by Created At

plot_all_created_at(adf, ALL_JOB_TYPES$ANALYSIS)

Analysis Job Run Times

Summary of Successful Jobs

adf %>% filter(state == "SUCCESSFUL") %>% summarise(
  total = n(),
  mean_time_seconds=mean(run_time_min), 
  max_run_time_seconds=max(run_time_min),
  min_run_time_seconds=min(run_time_min)
  ) %>% print()
# filter out old jobs where the run time is 0 from a bug
atdf <- adf %>% filter(state == "SUCCESSFUL")
plot_job_run_times(atdf, title = "Analysis Successful Job Walltime (seconds)", bins=200)

Import DataSet Job Summary

path_import_job <- to_job_json_path_from_type(ALL_JOB_TYPES$IMPORT)
idf <- smrtlinkr::load_import_dataset_from_json(path_import_job, smrtlink_system)
idf %>% arrange(desc(job_id)) %>% slice(1:25) %>% print(n=25, width = Inf)

Generate Summary of Import DataSet Job States

idf %>% group_by(state) %>% summarize(n()) %>% print()

Generate Summary of Import DataSet By DataSet MetaType

idf %>% group_by(dataset_metatype) %>% summarize(n()) %>% print()

Import DataSet Job By Job State

idf %>% group_by(state) %>% summarize(total = n()) %>% print()
idf %>% ggplot(aes(x = factor(1), fill = factor(state) )) + geom_bar(width = 1) + coord_polar(theta = "y") + ggtitle("Import DataSet Jobs by Job State")

Generate Summary of Import DataSet By DataSet MetaType

idf %>% group_by(dataset_metatype) %>% summarize(n()) %>% print()

Import Job by Created At

plot_all_created_at(idf, ALL_JOB_TYPES$IMPORT)

Import DataSet Run Time Distribution

This is a fundamental problem. The Updating of the db record, versus starting and completed at of the job are conflated. Specifically for "FAILED" jobs (marked by the data integrity system to mark "RUNNING" jobs as "FAILED")

Import DataSet Summary of ALL job states

idf %>% summarise(
  total = n(),
  mean_time_min=mean(run_time_min), 
  max_run_time_max=max(run_time_min),
  min_run_time_min=min(run_time_min)
  )
idf %>% select(job_id, state, created_at, updated_at, run_time_sec, run_time_min) %>% arrange(-run_time_sec) %>% slice(1:50)  %>% print(n=20, width=Inf)

Import DataSet Summary of Successful Jobs

idf %>% filter(state == "SUCCESSFUL") %>% summarise(
  total = n(),
  mean_time_min=mean(run_time_min), 
  max_run_time_max=max(run_time_min),
  min_run_time_min=min(run_time_min)
  )
plot_job_run_times(filter(idf, state=="SUCCESSFUL"), title = "Import DataSet Successful Job Walltime (seconds)")
plot_job_run_times(filter(idf, state=="SUCCESSFUL", str_detect(smrtlinkVersion, "5.0.0")), title = "Successful >= 5.0.0 Import DataSet Job Walltime (seconds)")

Merge DataSet Job Summary

path_merge_job <- to_job_json_path_from_type(ALL_JOB_TYPES$MERGE)
mdf <- smrtlinkr::load_merge_dataset_from_json(path_merge_job, smrtlink_system)
# recent jobs
mdf %>% arrange(desc(job_id)) %>% slice(1:25) %>% print(n = 25, width = Inf)

Generate Summary of Merge DataSet By DataSet MetaType

mdf %>% group_by(dataset_metatype) %>% summarize(n()) %>% print()

Generate Summary of Merge DataSets Job States

mdf %>% group_by(state) %>% summarize(n()) %>% print()
mdf %>% ggplot(aes(state)) + geom_bar() + ggtitle("Merge DataSet Jobs by Job State")
mdf %>% ggplot(aes(x = factor(1), fill = factor(state) )) + geom_bar(width = 1) + coord_polar(theta = "y") + ggtitle("Merge DataSet Jobs by Job State")

Merge Job by Created At

plot_all_created_at(mdf, ALL_JOB_TYPES$MERGE)

Number of DataSets Merged

mdf %>% ggplot(aes(dataset_num)) + geom_histogram(bins = 40) + ggtitle("Number of Merge DataSets") + xlab("Number of Merged DataSets")
mdf %>% ggplot(aes(dataset_metatype)) + geom_bar() + ggtitle("Merge DataSet Jobs by DataSet MetaType")

Run Time of Merge DataSet Jobs

Summary of ALL job states of Import DataSet Jobs

mdf %>% filter(state == "SUCCESSFUL") %>% summarise(
  total = n(),
  mean_time_min=mean(run_time_min), 
  max_run_time_max=max(run_time_min),
  min_run_time_min=min(run_time_min)
  )
plot_job_run_times(filter(mdf, state=="SUCCESSFUL"), title = "MergeDataSet Job Walltime (seconds)")

In 5.0.0 there was bug fix and performance improvement added for computing the Reports.

plot_job_run_times(filter(mdf, state=="SUCCESSFUL", str_detect(smrtlinkVersion, "5.0.0")), title = "Successful >= 5.0.0 MergeDataSet Job Walltime (seconds)")

Successful Merge DataSet Job Walltime by Number of DataSets Merged

mdf %>% filter(state=="SUCCESSFUL") %>% ggplot(aes(run_time_min, dataset_num, color=factor(dataset_metatype))) + geom_point(alpha = 6/10)  + ggtitle("Successful Merge DataSet Walltime (in seconds)") + xlab("Walltime (seconds)") + ylab("Number of DataSets Merged")

Other Job Types

Convert Fasta to Barcode

to_summary_job_from_path(to_job_json_path_from_type(ALL_JOB_TYPES$FASTA_BARCODE), smrtlink_system, ALL_JOB_TYPES$FASTA_BARCODE)

Convert Fasta to Reference

to_summary_job_from_path(to_job_json_path_from_type(ALL_JOB_TYPES$FASTA_REF), smrtlink_system, ALL_JOB_TYPES$FASTA_REF)

Export DataSet Job Type

to_summary_job_from_path(to_job_json_path_from_type(ALL_JOB_TYPES$EXPORT), smrtlink_system, ALL_JOB_TYPES$EXPORT)

Tech Support Status Job Type

to_summary_job_from_path(to_job_json_path_from_type(ALL_JOB_TYPES$TS_STATUS), smrtlink_system, ALL_JOB_TYPES$TS_STATUS)

Tech Support Failed Job Type

to_summary_job_from_path(to_job_json_path_from_type(ALL_JOB_TYPES$TS_FAILED), smrtlink_system, ALL_JOB_TYPES$TS_FAILED)

Convert RS Movie Job Type

to_summary_job_from_path(to_job_json_path_from_type(ALL_JOB_TYPES$CONVERT_RS_MOVIE), smrtlink_system, ALL_JOB_TYPES$CONVERT_RS_MOVIEs)

Database Backup Job Type

to_summary_job_from_path(to_job_json_path_from_type(ALL_JOB_TYPES$DB_BACKUP), smrtlink_system, ALL_JOB_TYPES$DB_BACKUP)


mpkocher/smrtlink-system-stats documentation built on May 29, 2019, 5:45 a.m.