# check https://www.ncbi.nlm.nih.gov/Sitemap/samplerecord.html
#' R6 Class for loading and analysing Genbank whole genome files
#'
#' @description
#' This class aims to implement a couple of trivial methods for wrangling whole
#' microbial genome information from Genbank files
#'
#' @import R6
#' @importFrom magrittr %>%
#'
#' @export
GenbankGenome <- R6::R6Class(
inherit = FloundeR,
classname = "GenbankGenome",
public = list(
length = 0,
accession = NULL,
version = NULL,
definition = NULL,
#' @description
#' Creates a new GenbankGenome object. This
#' initialisation method performs minimal sanity checking
#' of the defined file(s)
#'
#' @param gb_file The source
#' sequencing_summary file.
#' @return A new `GenbankGenome` object.
#'
#' @examples
#' TB_reference = flnDr("TB_H37Rv.gb.gz")
#' tb <- GenbankGenome$new(TB_reference)
initialize = function(gb_file) {
private$process_file(gb_file)
},
#' @description
#' exports the annotated genomic features as a GenomicRanges object.
#' This can be used for the review of content in a genomics context.
list_cds = function() {
cds_data <- GenomicRanges::makeGRangesFromDataFrame(
private$gb_cds, keep.extra.columns = TRUE)
GenomeInfoDb::seqlengths(cds_data) <- self$length
GenomeInfoDb::genome(cds_data) <- self$definition
return(cds_data)
},
#' @description
#' Get the CDS information for one or more annotated features from the
#' genome of interest.
get_cds = function(feature_id="fusA1") {
cli::cli_alert(
stringr::str_interp(
"extracting cds [${feature_id}]"))
if (!any(feature_id %in% private$gb_cds$gene)) {
silent_stop(stringr::str_interp(
"cds feature [${feature_id}] not annotated in [${self$accession}]"))
} else {
cds_data <- GenomicRanges::makeGRangesFromDataFrame(
private$gb_cds[which(private$gb_cds$gene %in% feature_id),],
keep.extra.columns = TRUE)
GenomeInfoDb::seqlengths(cds_data) <- self$length
GenomeInfoDb::genome(cds_data) <- self$definition
return(cds_data)
}
},
focus_cds = function(feature_id="fusA1") {
cli::cli_alert(
stringr::str_interp(
"setting focus to cds [${feature_id}]"))
private$focus_grange <- self$get_cds(feature_id)
return(invisible(self))
},
focus_range = function(chromosome=NULL, start=NULL, end=NULL) {
},
as_tibble = function() {
return(tibble::as_tibble(private$gb_cds))
},
whole_genome_focus = function() {
gr <- GenomicRanges::GRanges(
seqnames = self$accession, strand = "+",
ranges = IRanges::IRanges(start = 1, width = self$length))
GenomeInfoDb::seqlengths(gr) <- self$length
GenomeInfoDb::genome(gr) <- self$definition
return(gr)
},
focus_nucleotide = function(position) {
if (!is.numeric(position)) {
silent_stop("position must be an integer")
}
range <- self$get_focus()
strand <- as.character(BiocGenerics::strand(tb$get_focus()))
delta_pos <- NULL
if (strand == "+") {
cli::cli_alert("working on FWD")
core_position <- BiocGenerics::start(range)
cli::cli_alert(stringr::str_interp("core_pos == [(${strand})..${core_position}]"))
delta_pos <- core_position + position
} else if (strand == "-") {
cli::cli_alert("working on REV")
core_position <- BiocGenerics::end(range)
cli::cli_alert(stringr::str_interp("core_pos == [(${strand})..${core_position}]"))
delta_pos <- core_position - position
} else {
silent_stop("This method requires strand information")
}
if (delta_pos > self$length || delta_pos <= 0) {
silent_stop("position has dropped off the end of the chromosome")
}
cli::cli_alert(stringr::str_interp("delta_pos == [(${strand})..${delta_pos}]"))
self$get_nucleotide(delta_pos, strand)
},
focus_codon = function(codon) {
cli::cli_alert("focusing codon ...")
range <- self$get_focus()
print(range)
strand <- as.character(BiocGenerics::strand(self$get_focus()))
if (strand == "+") {
cli::cli_alert("working on FWD")
core_position <- BiocGenerics::start(range)
# codons are 1-based ???
start <- core_position + ((codon - 1) * 3)
end <- start + 2
nucleotide <- self$sequence[start:end]
print(nucleotide)
print(Biostrings::translate(nucleotide))
} else if (strand == "-") {
cli::cli_alert("working on REV")
core_position <- BiocGenerics::end(range)
print(self$sequence[BiocGenerics::start(range):BiocGenerics::end(range)])
end <- core_position - ((codon - 1) * 3)
start <- end - 2
cli::cli_alert(stringr::str_interp("slicing [ ${start} : ${end} ]"))
nucleotide <- Biostrings::reverseComplement(self$sequence[start:end])
print(nucleotide)
print(Biostrings::translate(nucleotide))
} else {
silent_stop("This method requires strand information")
}
},
get_nucleotide = function(position, strand) {
if (position > self$length || position <= 0) {
silent_stop("position has dropped off the end of the chromosome")
}
nucleotide <- NULL
if (strand == "+") {
cli::cli_alert("working on FWD")
nucleotide <- self$sequence[position]
} else if (strand == "-") {
cli::cli_alert("working on REV")
nucleotide <- Biostrings::reverseComplement(self$sequence[position])
} else {
silent_stop("This method requires strand information")
}
cli::cli_alert(stringr::str_interp("[${nucleotide}]"))
},
unfocus = function() {
private$focus_grange <- NULL
return(invisible(self))
},
is_focused = function() {
return(!is.null(private$focus_grange))
},
get_focus = function() {
range <- self$whole_genome_focus()
if (self$is_focused()) {
range <- private$focus_grange
}
return(range)
}
),
active = list (
sequence = function(set=NULL) {
if (!is.null(set)) {
cli::cli_alert(
stringr::str_interp(
"setting sequence"))
if (class(set)[[1]] == "DNAString") {
private$gb_sequence <- set
} else if (class(set)[[1]] == "character") {
private$gb_sequence <- Biostrings::DNAString(set)
} else {
silent_stop("sequence can only be DNAString|character")
}
}
return(private$gb_sequence)
}
),
private = list(
conn = NA,
current_tag = NA,
tag_string = NULL,
key_tags = c("LOCUS", "DEFINITION", "ACCESSION", "VERSION", "FEATURES", "ORIGIN"),
gb_cds = data.frame(
gene=character(0),
chr=character(0),
strand=character(0),
start=integer(0),
end=integer(0),
translation=character(0)),
gb_sequence = "undefined",
focus_grange = NULL,
process_file = function(gb_file) {
line_count <- 0
cli::cli_alert_info(
stringr::str_interp("Parsing genbank file [${gb_file}]"))
private$conn = file(gb_file, "r")
tryCatch(
{
while(TRUE) {
line = readLines(private$conn, n = 1)
if ( length(line) == 0 ) {
break
}
line_count <- line_count + 1
private$tag_handler(line, line_count)
}
cli::cli_alert_success(
stringr::str_interp("Done! [${line_count}] lines parsed"))
},
error=function(cond) {
cli::cli_alert_danger("Genbank file not parsed as expected")
print(cond)
},
finally = {
close(private$conn)
}
)
},
in_tag = function() {
return(!is.na(private$current_tag))
},
tag_handler = function(line, pos) {
# if leading whitespace continue quickly
if (grepl("^ +", line) && private$in_tag()) {
if (
grepl("^ [^ ]", line) &&
private$current_tag == "FEATURES") {
private$flush_feature()
private$extend_tag(line)
} else {
if (private$in_tag()) {
private$extend_tag(line)
}
}
} else {
tagged <- FALSE
for (tag in private$key_tags) {
if (grepl(paste0("^", tag), line)) {
private$enter_tag(tag, pos)
private$extend_tag(line)
tagged <- TRUE
}
}
if (!tagged) {
private$exit_tag(pos)
}
}
},
enter_tag = function(tag, pos) {
if (!is.na(private$current_tag)) {
private$exit_tag(pos)
}
# cli::cli_alert(
# stringr::str_interp("entering tag [${tag}] at line [${pos}]"))
private$current_tag <- tag
},
exit_tag = function(pos) {
if (!is.na(private$current_tag)) {
# cli::cli_alert(
# stringr::str_interp(
# "leaving tag [${private$current_tag}] at line [${pos}]"))
if (private$current_tag == "ACCESSION") {
self$accession <- private$clip_tag()
} else if (private$current_tag == "VERSION") {
self$version <- private$clip_tag()
} else if (private$current_tag == "DEFINITION") {
self$definition <- private$clip_tag()
} else if (private$current_tag == "FEATURES") {
# features are messily parsed during load - there should
# be a single residual feature to process ...
private$flush_feature()
} else if (private$current_tag == "ORIGIN") {
private$process_sequence()
} else if (private$current_tag == "LOCUS") {
private$process_locus()
}
private$current_tag <- NA
private$tag_string <- NULL
}
},
clip_tag = function() {
regex = paste0("(?<=",private$current_tag,").+")
return(
stringr::str_trim(
stringr::str_extract(
private$tag_string, regex))
)
},
extend_tag = function(line) {
private$tag_string <- append(private$tag_string, line)
},
flush_feature = function(silent=TRUE) {
if (!silent) {
print(private$tag_string)
}
if (grepl("^ source", private$tag_string[1])) {
private$feature_source()
} else if (grepl("^ CDS", private$tag_string[1])) {
private$feature_cds()
}
private$tag_string <- NULL
},
feature_source = function() {
#print(private$tag_string)
#silent_stop("debugging")
},
feature_cds = function(subfeature_tag = " CDS +") {
# clip the subfeature tag & leading whitespace
private$tag_string <- private$tag_string %>%
stringr::str_replace(subfeature_tag, "") %>%
stringr::str_trim() %>%
stringr::str_c(collapse="")
# extract start nucleotide, end nucleotide and strand
coordinates <- private$extract_positions(
private$tag_string %>%
stringr::str_extract("^[^/]+"))
gene_id <- private$tag_string %>%
private$extract_value_from_regex(
"(?<=gene=\")[^\"]+", "(?<=locus_tag=\")[^\"]+")
translation <- stringr::str_extract(
private$tag_string, "(?<=translation=\")[^\"]+")
#cli::cli_alert(
# stringr::str_interp("[${gene_id}] --> [${coordinates}]"))
private$gb_cds <- private$gb_cds %>%
dplyr::bind_rows(
list(
chr=coordinates[1],
gene=gene_id,
strand=coordinates[2],
start=as.integer(coordinates[3]),
end=as.integer(coordinates[4]),
translation=translation
)
)
},
extract_value_from_regex = function(text, regex_a, regex_b) {
if (stringr::str_detect(text, regex_a)) {
stringr::str_extract(text, regex_a)
} else if (stringr::str_detect(text, regex_b)) {
stringr::str_extract(text, regex_b)
} else {
silent_stop("broken")
}
},
process_locus = function() {
locus_string <- private$clip_tag() %>%
stringr::str_split("\\s+")
self$length <- as.integer(locus_string[[1]][2])
},
extract_positions = function(position_str) {
strand <- "+"
if (stringr::str_detect(position_str, "complement")) {
strand <- "-"
position_str <- stringr::str_extract(
position_str, "(?<=complement\\()[^\\)]+")
}
positions <- unlist(stringr::str_split(position_str, "\\.\\."))
# there are a few edge cases where gene locations are not atomic
filter_position = function(str) {
str_mod <- str %>%
stringr::str_replace("<", "") %>%
stringr::str_replace(">", "") %>%
stringr::str_replace("join\\(", "") %>%
stringr::str_replace(",.+", "")
xval = stringr::str_extract(str_mod, "[^\\d]+")
if (is.na(xval)) return(str_mod)
print(str)
silent_stop("numeric issue")
}
return(c(self$accession,
strand,
filter_position(positions[1]),
filter_position(positions[2])))
},
process_sequence = function() {
cli::cli_alert(
stringr::str_interp(
"processing sequence [${length(private$tag_string)}] lines"))
# clip leading whitespace
private$tag_string <- private$tag_string %>%
stringr::str_replace("^ORIGIN\\s+", "") %>%
stringr::str_replace("^\\s+\\d+\\s+", "") %>%
stringr::str_replace_all("\\s", "") %>%
stringr::str_c(collapse="")
self$sequence <- private$tag_string
}
)
)
#' @importFrom stringdist stringdist
triplet2DeltaPeptide = function(triplet=Biostrings::DNAString("CCG"), peptide=Biostrings::AAString("Q")) {
cli::cli_alert(stringr::str_interp("${triplet} --> ${Biostrings::translate(triplet)}"))
geneticcode <- Biostrings::getGeneticCode("11")
modified_triplets <- names(geneticcode)[which(as.character(geneticcode) == as.character(peptide))]
distances <- stringdist::stringdist(triplet, modified_triplets)
print(distances)
print(modified_triplets[order(distances, decreasing=FALSE)])
}
Add the following code to your website.
For more information on customizing the embed code, read Embedding Snippets.