R/genbank_genome.R

Defines functions triplet2DeltaPeptide

# check https://www.ncbi.nlm.nih.gov/Sitemap/samplerecord.html

#' R6 Class for loading and analysing Genbank whole genome files
#'
#' @description
#' This class aims to implement a couple of trivial methods for wrangling whole
#' microbial genome information from Genbank files
#'
#' @import R6
#' @importFrom magrittr %>%
#'
#' @export
GenbankGenome <- R6::R6Class(
    inherit = FloundeR,
    classname = "GenbankGenome",
    public = list(

        length = 0,
        accession = NULL,
        version = NULL,
        definition = NULL,


        #' @description
        #' Creates a new GenbankGenome object. This
        #' initialisation method performs minimal sanity checking
        #' of the defined file(s)
        #'
        #' @param gb_file The source
        #' sequencing_summary file.
        #' @return A new `GenbankGenome` object.
        #'
        #' @examples
        #' TB_reference = flnDr("TB_H37Rv.gb.gz")
        #' tb <- GenbankGenome$new(TB_reference)
        initialize = function(gb_file) {
            private$process_file(gb_file)
        },


        #' @description
        #' exports the annotated genomic features as a GenomicRanges object.
        #' This can be used for the review of content in a genomics context.
        list_cds = function() {
            cds_data <- GenomicRanges::makeGRangesFromDataFrame(
                private$gb_cds, keep.extra.columns = TRUE)
            GenomeInfoDb::seqlengths(cds_data) <- self$length
            GenomeInfoDb::genome(cds_data) <- self$definition
            return(cds_data)
        },

        #' @description
        #' Get the CDS information for one or more annotated features from the
        #' genome of interest.
        get_cds = function(feature_id="fusA1") {
            cli::cli_alert(
                stringr::str_interp(
                    "extracting cds [${feature_id}]"))
            if (!any(feature_id %in% private$gb_cds$gene)) {
                silent_stop(stringr::str_interp(
                    "cds feature [${feature_id}] not annotated in [${self$accession}]"))
            } else {
                cds_data <- GenomicRanges::makeGRangesFromDataFrame(
                    private$gb_cds[which(private$gb_cds$gene %in% feature_id),],
                    keep.extra.columns = TRUE)
                GenomeInfoDb::seqlengths(cds_data) <- self$length
                GenomeInfoDb::genome(cds_data) <- self$definition
                return(cds_data)

            }
        },

        focus_cds = function(feature_id="fusA1") {
            cli::cli_alert(
                stringr::str_interp(
                    "setting focus to cds [${feature_id}]"))
            private$focus_grange <- self$get_cds(feature_id)
            return(invisible(self))
        },

        focus_range = function(chromosome=NULL, start=NULL, end=NULL) {

        },

        as_tibble = function() {
            return(tibble::as_tibble(private$gb_cds))
        },

        whole_genome_focus = function() {
            gr <- GenomicRanges::GRanges(
                seqnames = self$accession, strand = "+",
                ranges = IRanges::IRanges(start = 1, width = self$length))
            GenomeInfoDb::seqlengths(gr) <- self$length
            GenomeInfoDb::genome(gr) <- self$definition
            return(gr)
        },


        focus_nucleotide = function(position) {
            if (!is.numeric(position)) {
                silent_stop("position must be an integer")
            }
            range <- self$get_focus()
            strand <- as.character(BiocGenerics::strand(tb$get_focus()))

            delta_pos <- NULL
            if (strand == "+") {
                cli::cli_alert("working on FWD")
                core_position <-  BiocGenerics::start(range)
                cli::cli_alert(stringr::str_interp("core_pos == [(${strand})..${core_position}]"))
                delta_pos <- core_position + position
            } else if (strand == "-") {
                cli::cli_alert("working on REV")
                core_position <-  BiocGenerics::end(range)
                cli::cli_alert(stringr::str_interp("core_pos == [(${strand})..${core_position}]"))
                delta_pos <- core_position - position
            } else {
                silent_stop("This method requires strand information")
            }

            if (delta_pos > self$length || delta_pos <= 0) {
                silent_stop("position has dropped off the end of the chromosome")
            }
            cli::cli_alert(stringr::str_interp("delta_pos == [(${strand})..${delta_pos}]"))
            self$get_nucleotide(delta_pos, strand)
        },


        focus_codon = function(codon) {
            cli::cli_alert("focusing codon ...")
            range <- self$get_focus()
            print(range)
            strand <- as.character(BiocGenerics::strand(self$get_focus()))

            if (strand == "+") {
                cli::cli_alert("working on FWD")
                core_position <-  BiocGenerics::start(range)
                # codons are 1-based ???
                start <- core_position + ((codon - 1) * 3)
                end <- start + 2
                nucleotide <- self$sequence[start:end]
                print(nucleotide)
                print(Biostrings::translate(nucleotide))
            } else if (strand == "-") {
                cli::cli_alert("working on REV")
                core_position <-  BiocGenerics::end(range)

                print(self$sequence[BiocGenerics::start(range):BiocGenerics::end(range)])

                end <- core_position - ((codon - 1) * 3)
                start <- end - 2
                cli::cli_alert(stringr::str_interp("slicing [ ${start} : ${end} ]"))
                nucleotide <- Biostrings::reverseComplement(self$sequence[start:end])
                print(nucleotide)
                print(Biostrings::translate(nucleotide))
            } else {
                silent_stop("This method requires strand information")
            }
        },

        get_nucleotide = function(position, strand) {
            if (position > self$length || position <= 0) {
                silent_stop("position has dropped off the end of the chromosome")
            }
            nucleotide <- NULL
            if (strand == "+") {
                cli::cli_alert("working on FWD")
                nucleotide <- self$sequence[position]
            } else if (strand == "-") {
                cli::cli_alert("working on REV")
                nucleotide <- Biostrings::reverseComplement(self$sequence[position])
            } else {
                silent_stop("This method requires strand information")
            }
            cli::cli_alert(stringr::str_interp("[${nucleotide}]"))
        },

        unfocus = function() {
            private$focus_grange <- NULL
            return(invisible(self))
        },

        is_focused = function() {
            return(!is.null(private$focus_grange))
        },

        get_focus = function() {
            range <- self$whole_genome_focus()
            if (self$is_focused()) {
                range <- private$focus_grange
            }
            return(range)
        }



    ),

    active = list (


        sequence = function(set=NULL) {
            if (!is.null(set)) {
                cli::cli_alert(
                    stringr::str_interp(
                        "setting sequence"))
                if (class(set)[[1]] == "DNAString") {
                    private$gb_sequence <- set
                } else if (class(set)[[1]] == "character") {
                    private$gb_sequence <- Biostrings::DNAString(set)
                } else {
                    silent_stop("sequence can only be DNAString|character")
                }
            }
            return(private$gb_sequence)
        }

    ),

    private = list(
        conn = NA,
        current_tag = NA,
        tag_string = NULL,
        key_tags = c("LOCUS", "DEFINITION", "ACCESSION", "VERSION", "FEATURES", "ORIGIN"),
        gb_cds = data.frame(
            gene=character(0),
            chr=character(0),
            strand=character(0),
            start=integer(0),
            end=integer(0),
            translation=character(0)),
        gb_sequence = "undefined",
        focus_grange = NULL,

        process_file = function(gb_file) {
            line_count <- 0
            cli::cli_alert_info(
                stringr::str_interp("Parsing genbank file [${gb_file}]"))
            private$conn = file(gb_file, "r")

            tryCatch(
                {
                    while(TRUE) {
                        line = readLines(private$conn, n = 1)
                        if ( length(line) == 0 ) {
                            break
                        }
                        line_count <- line_count + 1
                        private$tag_handler(line, line_count)
                    }

                    cli::cli_alert_success(
                        stringr::str_interp("Done! [${line_count}] lines parsed"))
                },

                error=function(cond) {
                    cli::cli_alert_danger("Genbank file not parsed as expected")
                    print(cond)
                },

                finally = {
                    close(private$conn)
                }
            )



        },

        in_tag = function() {
            return(!is.na(private$current_tag))
        },

        tag_handler = function(line, pos) {
            # if leading whitespace continue quickly
            if (grepl("^ +", line) && private$in_tag()) {
                if (
                    grepl("^     [^ ]", line) &&
                    private$current_tag == "FEATURES") {
                    private$flush_feature()
                    private$extend_tag(line)
                } else {

                    if (private$in_tag()) {
                        private$extend_tag(line)
                    }
                }
            } else {

                tagged <- FALSE
                for (tag in private$key_tags) {
                    if (grepl(paste0("^", tag), line)) {
                        private$enter_tag(tag, pos)
                        private$extend_tag(line)
                        tagged <- TRUE
                    }
                }
                if (!tagged) {
                    private$exit_tag(pos)
                }
            }
        },


        enter_tag = function(tag, pos) {
            if (!is.na(private$current_tag)) {
                private$exit_tag(pos)
            }
            # cli::cli_alert(
            #     stringr::str_interp("entering tag [${tag}] at line [${pos}]"))
            private$current_tag <- tag
        },


        exit_tag = function(pos) {
            if (!is.na(private$current_tag)) {
                # cli::cli_alert(
                #     stringr::str_interp(
                #         "leaving tag [${private$current_tag}] at line [${pos}]"))

                if (private$current_tag == "ACCESSION") {
                    self$accession <- private$clip_tag()
                } else if (private$current_tag == "VERSION") {
                    self$version <- private$clip_tag()
                } else if (private$current_tag == "DEFINITION") {
                    self$definition <- private$clip_tag()
                } else if (private$current_tag == "FEATURES") {
                    # features are messily parsed during load - there should
                    # be a single residual feature to process ...
                    private$flush_feature()
                } else if (private$current_tag == "ORIGIN") {
                    private$process_sequence()
                } else if (private$current_tag == "LOCUS") {
                    private$process_locus()
                }

                private$current_tag <- NA
                private$tag_string <- NULL
            }
        },


        clip_tag = function() {
            regex = paste0("(?<=",private$current_tag,").+")
            return(
                stringr::str_trim(
                    stringr::str_extract(
                        private$tag_string, regex))
           )
        },


        extend_tag = function(line) {
            private$tag_string <- append(private$tag_string, line)
        },


        flush_feature = function(silent=TRUE) {
            if (!silent) {
                print(private$tag_string)
            }

            if (grepl("^     source", private$tag_string[1])) {
                private$feature_source()
            } else if (grepl("^     CDS", private$tag_string[1])) {
                private$feature_cds()
            }
            private$tag_string <- NULL
        },


        feature_source = function() {
            #print(private$tag_string)

            #silent_stop("debugging")
        },


        feature_cds = function(subfeature_tag = "     CDS +") {

            # clip the subfeature tag & leading whitespace
            private$tag_string <- private$tag_string %>%
                stringr::str_replace(subfeature_tag, "") %>%
                stringr::str_trim() %>%
                stringr::str_c(collapse="")
            # extract start nucleotide, end nucleotide and strand
            coordinates <- private$extract_positions(
                private$tag_string %>%
                    stringr::str_extract("^[^/]+"))

            gene_id <- private$tag_string %>%
                private$extract_value_from_regex(
                    "(?<=gene=\")[^\"]+", "(?<=locus_tag=\")[^\"]+")

            translation <- stringr::str_extract(
                private$tag_string, "(?<=translation=\")[^\"]+")

            #cli::cli_alert(
            #    stringr::str_interp("[${gene_id}] --> [${coordinates}]"))
            private$gb_cds <- private$gb_cds %>%
                dplyr::bind_rows(
                    list(
                        chr=coordinates[1],
                        gene=gene_id,
                        strand=coordinates[2],
                        start=as.integer(coordinates[3]),
                        end=as.integer(coordinates[4]),
                        translation=translation
                        )
                    )
        },


        extract_value_from_regex = function(text, regex_a, regex_b) {
            if (stringr::str_detect(text, regex_a)) {
                stringr::str_extract(text, regex_a)
            } else if (stringr::str_detect(text, regex_b)) {
                stringr::str_extract(text, regex_b)
            } else {
                silent_stop("broken")
            }
        },


        process_locus = function() {
            locus_string <- private$clip_tag() %>%
                stringr::str_split("\\s+")
            self$length <- as.integer(locus_string[[1]][2])
        },

        extract_positions = function(position_str) {
            strand <- "+"
            if (stringr::str_detect(position_str, "complement")) {
                strand <- "-"
                position_str <- stringr::str_extract(
                    position_str, "(?<=complement\\()[^\\)]+")
            }
            positions <- unlist(stringr::str_split(position_str, "\\.\\."))

            # there are a few edge cases where gene locations are not atomic
            filter_position = function(str) {
                str_mod <- str %>%
                    stringr::str_replace("<", "") %>%
                    stringr::str_replace(">", "") %>%
                    stringr::str_replace("join\\(", "") %>%
                    stringr::str_replace(",.+", "")

                xval = stringr::str_extract(str_mod, "[^\\d]+")
                if (is.na(xval)) return(str_mod)

                print(str)
                silent_stop("numeric issue")
            }

            return(c(self$accession,
                     strand,
                     filter_position(positions[1]),
                     filter_position(positions[2])))
        },


        process_sequence = function() {
            cli::cli_alert(
                stringr::str_interp(
                    "processing sequence [${length(private$tag_string)}] lines"))
            # clip leading whitespace
            private$tag_string <- private$tag_string %>%
                stringr::str_replace("^ORIGIN\\s+", "") %>%
                stringr::str_replace("^\\s+\\d+\\s+", "")  %>%
                stringr::str_replace_all("\\s", "") %>%
                stringr::str_c(collapse="")
            self$sequence <- private$tag_string

        }

    )


)


#' @importFrom stringdist stringdist
triplet2DeltaPeptide = function(triplet=Biostrings::DNAString("CCG"), peptide=Biostrings::AAString("Q")) {

    cli::cli_alert(stringr::str_interp("${triplet} --> ${Biostrings::translate(triplet)}"))
    geneticcode <- Biostrings::getGeneticCode("11")
    modified_triplets <- names(geneticcode)[which(as.character(geneticcode) == as.character(peptide))]
    distances <- stringdist::stringdist(triplet, modified_triplets)
    print(distances)
    print(modified_triplets[order(distances, decreasing=FALSE)])
}
sagrudd/floundeR documentation built on Nov. 18, 2022, 10:31 a.m.