R/custom_layers.R

Defines functions layer_euc_dist_wrapper layer_cosine_sim_wrapper layer_transformer_block_wrapper layer_pos_sinusoid_wrapper layer_pos_embedding_wrapper layer_aggregate_time_dist_wrapper

Documented in layer_aggregate_time_dist_wrapper layer_pos_embedding_wrapper layer_pos_sinusoid_wrapper layer_transformer_block_wrapper

#' Aggregation layer 
#' 
#' Aggregate output of time distribution representations using sum, max and/or mean function.
#' 
#' @param load_r6 Whether to load the R6 layer class.
#' @param method At least one of the options, `"sum", "max"` or `"mean"`.
#' @param multi_in Whether to aggregate for a model with multiple inputs (and shared weights).
#' @examples
#' 
#' \donttest{
#' library(keras)
#' l <- layer_aggregate_time_dist_wrapper() 
#' }
#' @returns A keras layer applying pooling operation(s).
#' @export
layer_aggregate_time_dist_wrapper <- function(load_r6 = FALSE, method = "sum", multi_in = FALSE) {
  
  layer_aggregate_time_dist <- keras::new_layer_class(
    "layer_aggregate_time_dist",
    
    initialize = function(method, multi_in=FALSE, ...) {
      super$initialize(...)
      self$method <- method
      self$axis <- ifelse(multi_in, 0L, 1L)
      self$multi_in <- multi_in
    },
    
    call = function(inputs, mask = NULL) {
      out <- list()
      if ("sum" %in% self$method) {
        out <- c(out, tensorflow::tf$math$reduce_sum(inputs, axis = self$axis))
      }
      if ("mean" %in% self$method) {
        out <- c(out, tensorflow::tf$math$reduce_mean(inputs, axis = self$axis))
      }
      if ("max" %in% self$method) {
        out <- c(out, tensorflow::tf$math$reduce_max(inputs, axis = self$axis))
      }
      
      if (length(out) > 1) {
        out <- tensorflow::tf$concat(out, axis = -1L)
      } else {
        out <- out[[1]]
      }
      
      out
    },
    
    get_config = function() {
      config <- super$get_config()
      config$method <- self$method
      config$multi_in <- self$multi_in
      config
    }
  )
  
  if (load_r6) {
    return(layer_aggregate_time_dist)
  } else {
    return(layer_aggregate_time_dist(method = method, multi_in = multi_in))
  }
  
}


#' Layer for positional embedding
#' 
#' Positional encoding layer with learned embedding.
#' 
#' @inheritParams create_model_transformer
#' @param load_r6 Whether to load the R6 layer class.
#' @examples
#' 
#' \donttest{
#' library(keras)
#' l <- layer_pos_embedding_wrapper()
#' }
#' @returns A keras layer implementing positional embedding.
#' @export
layer_pos_embedding_wrapper <- function(maxlen = 100, vocabulary_size = 4, load_r6 = FALSE, embed_dim = 64) {
  
  layer_pos_embedding <- keras::new_layer_class(
    "layer_pos_embedding",
    
    initialize = function(maxlen=100, vocabulary_size=4, embed_dim=64, ...) {
      super$initialize(...)
      if (embed_dim != 0) {
        self$token_emb <- tensorflow::tf$keras$layers$Embedding(input_dim = as.integer(vocabulary_size),
                                                                output_dim = as.integer(embed_dim))
        self$position_embeddings <- tensorflow::tf$keras$layers$Embedding(input_dim = as.integer(maxlen),
                                                                          output_dim = as.integer(embed_dim))
      } else {
        self$position_embeddings <- tensorflow::tf$keras$layers$Embedding(input_dim = as.integer(maxlen),
                                                                          output_dim = as.integer(vocabulary_size))
      }
      self$embed_dim <- as.integer(embed_dim)
      self$maxlen <- as.integer(maxlen)
      self$vocabulary_size <- as.integer(vocabulary_size)
    },
    
    call = function(inputs) {
      positions <- tensorflow::tf$range(self$maxlen, dtype = "int32") 
      embedded_positions <- self$position_embeddings(positions)
      if (self$embed_dim != 0) inputs <- self$token_emb(inputs)
      inputs + embedded_positions
    },
    
    get_config = function() {
      config <- super$get_config()
      config$maxlen <- self$maxlen
      config$vocabulary_size <- self$vocabulary_size
      config$embed_dim <- self$embed_dim
      config
    }
  )
  
  if (load_r6) {
    return(layer_pos_embedding)
  } else {
    return(layer_pos_embedding(maxlen=maxlen, vocabulary_size=vocabulary_size, embed_dim=embed_dim))
  }
  
}

#' Layer for positional encoding
#' 
#' Positional encoding layer with sine/cosine matrix of different frequencies.
#' 
#' @inheritParams create_model_transformer
#' @param load_r6 Whether to load the R6 layer class.
#' @examples
#' 
#' \donttest{
#' library(keras)
#' l <- layer_pos_sinusoid_wrapper() 
#' }
#' @returns A keras layer implementing positional encoding using sine/cosine waves.
#' @export
layer_pos_sinusoid_wrapper <- function(maxlen = 100, vocabulary_size = 4, n = 10000, load_r6 = FALSE, embed_dim = 64) {
  
  layer_pos_sinusoid <- keras::new_layer_class(
    "layer_pos_sinusoid",
    initialize = function(maxlen, vocabulary_size, n, embed_dim, ...) {
      super$initialize(...)
      self$maxlen <- as.integer(maxlen)
      self$vocabulary_size <- vocabulary_size
      self$n <- as.integer(n)
      self$pe_matrix <- positional_encoding(seq_len = maxlen,
                                            d_model = ifelse(embed_dim == 0,
                                                             as.integer(vocabulary_size),
                                                             as.integer(embed_dim)),  
                                            n = n)
      
      if (embed_dim != 0) {
        self$token_emb <- tensorflow::tf$keras$layers$Embedding(input_dim = vocabulary_size, output_dim = as.integer(embed_dim))
      }
      self$embed_dim <- as.integer(embed_dim)
      
    },
    
    call = function(inputs) {
      if (self$embed_dim != 0) {
        inputs <- self$token_emb(inputs)
      } 
      inputs + self$pe_matrix
    },
    
    get_config = function() {
      config <- super$get_config()
      config$maxlen <- self$maxlen
      config$vocabulary_size <- self$vocabulary_size
      config$n <- self$n
      config$embed_dim <- self$embed_dim
      config$pe_matrix <- self$pe_matrix
      config
    }
  )
  
  if (load_r6) {
    return(layer_pos_sinusoid)
  } else {
    return(layer_pos_sinusoid(maxlen=maxlen, vocabulary_size=vocabulary_size, n=n,
                              embed_dim = embed_dim))
  }
  
}


#' Transformer block
#' 
#' Create transformer block. Consists of self attention, dense layers, layer normalization, recurrent connection and dropout.
#' 
#' @inheritParams create_model_transformer
#' @param dropout_rate Rate to randomly drop out connections.
#' @param load_r6 Whether to return the layer class.
#' @examples
#' 
#' \donttest{
#' library(keras)
#' l <- layer_transformer_block_wrapper()
#' }
#' @returns A keras layer implementing a transformer block.
#' @export
layer_transformer_block_wrapper <- function(num_heads = 2, head_size = 4, dropout_rate = 0, ff_dim = 64,  
                                            vocabulary_size = 4, load_r6 = FALSE, embed_dim = 64) {
  
  layer_transformer_block <- keras::new_layer_class(
    "layer_transformer_block",
    initialize = function(num_heads=2, head_size=4, dropout_rate=0, ff_dim=64L, vocabulary_size=4, embed_dim=64, ...) {
      super$initialize(...)
      self$num_heads <- num_heads
      self$head_size <- head_size
      self$dropout_rate <- dropout_rate
      self$ff_dim <- ff_dim
      self$embed_dim <- as.integer(embed_dim)
      self$vocabulary_size <- vocabulary_size
      self$att <- tensorflow::tf$keras$layers$MultiHeadAttention(num_heads=as.integer(num_heads),
                                                                 key_dim=as.integer(head_size))
      
      self$ffn <- keras::keras_model_sequential() %>% keras::layer_dense(units=as.integer(ff_dim), activation="relu") %>%
        keras::layer_dense(units=ifelse(embed_dim == 0, as.integer(vocabulary_size), as.integer(embed_dim)))
      
      self$layernorm1 <- keras::layer_layer_normalization(epsilon=1e-6)
      self$layernorm2 <- keras::layer_layer_normalization(epsilon=1e-6)
      self$dropout1 <- keras::layer_dropout(rate=dropout_rate)
      self$dropout2 <- keras::layer_dropout(rate=dropout_rate)
    },
    
    call = function(inputs) {
      attn_output <- self$att(inputs, inputs, inputs)
      attn_output <- self$dropout1(attn_output)
      out1 <- self$layernorm1(inputs + attn_output)
      ffn_output <- self$ffn(out1)
      ffn_output <- self$dropout2(ffn_output)
      seq_output <- self$layernorm2(out1 + ffn_output)
      return(seq_output)
    },
    
    get_config = function() {
      config <- super$get_config()
      config$num_heads <- self$num_heads
      config$head_size <- self$head_size
      config$dropout_rate <- self$dropout_rate
      config$ff_dim <- self$ff_dim
      config$vocabulary_size <- self$vocabulary_size
      config$embed_dim <- self$embed_dim
      config
    }
  )
  
  if (load_r6) {
    return(layer_transformer_block)
  } else {
    return(layer_transformer_block(num_heads=num_heads,
                                   head_size=head_size,
                                   dropout_rate=dropout_rate,
                                   vocabulary_size=vocabulary_size,
                                   embed_dim=embed_dim,
                                   ff_dim=ff_dim))
  }
  
}


layer_cosine_sim_wrapper <- function(load_r6 = FALSE) {
  
  layer_cosine_sim <- keras::new_layer_class(
    "layer_cosine_sim",
    
    initialize = function(...) {
      super$initialize(...)
    },
    
    call = function(inputs) {
      cosine_similarity(vects=inputs)
    },
    
    get_config = function() {
      config <- super$get_config()
      config
    }
  )
  
  if (load_r6) {
    return(layer_cosine_sim)
  } else {
    return(layer_cosine_sim())
  }
  
}


layer_euc_dist_wrapper <- function(load_r6 = FALSE) {
  
  layer_euc_dist <- keras::new_layer_class(
    "layer_euc_dist",
    
    initialize = function(...) {
      super$initialize(...)
    },
    
    call = function(inputs) {
      euclidean_distance(vects=inputs)
    },
    
    get_config = function() {
      config <- super$get_config()
      config
    }
  )
  
  if (load_r6) {
    return(layer_euc_dist)
  } else {
    return(layer_euc_dist())
  }
  
}
GenomeNet/deepG documentation built on Dec. 24, 2024, 12:11 p.m.