R/faasr_trigger.R

Defines functions faasr_trigger

Documented in faasr_trigger

#' @name faasr_trigger
#' @title faasr_trigger
#' @description 
#' Uses FaaS-specific APIs to generate triggers to execute downstream User Function
#' Currently supports:
#' * Apache OpenWhisk
#' * AWS Lambda
#' * GitHub Actions
#' @param faasr list with parsed and validated Payload
#' @return return nothing / send requests to the FaaS servers.
#' @import jsonlite
#' @import httr
#' @importFrom "paws.compute" "lambda"
#' @export
#' @examples
#' # This function can be run only in the container
#' if (interactive()){
#' faasr_trigger(faasr)
#' }

faasr_trigger <- function(faasr) {

  # First extract the name of the user function
  user_function = faasr$FunctionInvoke

  # Find out which functions to InvokeNext
  invoke_next = faasr$FunctionList[[user_function]]$InvokeNext

  # Check if the list is empty or not
  if (length(invoke_next) == 0) {
    err_msg <- paste0('{\"faasr_trigger\":\"no triggers for ',user_function,'\"}', "\n")
    message(err_msg)
    faasr_log(err_msg)
  } else {
    # Iterate through invoke_next and use FaaS-specific mechanisms to send trigger
    # use "for" loop to iteratively check functions in invoke_next list
    for (invoke_next_function in invoke_next) {

      parts <- unlist(strsplit(invoke_next_function, "[()]"))

      if (length(parts) > 1){
        invoke_next_function <- parts[1]
        rank_num <- parts[2]
      } else {
        rank_num <- 1
      }

      # Change the FunctionInvoke to next function name
      faasr$FunctionInvoke <- invoke_next_function

      # Determine FaaS server name via faasr$FunctionList[[invoke_next_function]]$FaaSServer
      next_server <- faasr$FunctionList[[invoke_next_function]]$FaaSServer

      for (rank in 1:rank_num){
        if (rank_num > 1){
          faasr$FunctionList[[invoke_next_function]]$Rank <- paste0(rank,"/",rank_num)
        }

        # Validate that FaaS server name exists in faasr$ComputeServers list
        if (next_server %in% names(faasr$ComputeServers)) {
          NULL
        } else {
      	  err_msg <- paste0('{\"faasr_trigger\":\"invalid server name: ',next_server,'\"}', "\n")
          message(err_msg)
          faasr_log(err_msg)
          break
        }

        # check FaaSType from the named compute server
        next_server_type <- faasr$ComputeServers[[next_server]]$FaaSType

        switch(next_server_type,
          # if OpenWhisk - use OpenWhisk API
          "OpenWhisk"={
            #
            # OpenWhisk API handling
            #
            # Set the env values for the openwhisk action.
            endpoint <- faasr$ComputeServers[[next_server]]$Endpoint
            api_key <- faasr$ComputeServers[[next_server]]$API.key
            api_key <- strsplit(api_key, ":")[[1]]
            if (is.null(faasr$ComputeServers[[next_server]]$SSL) || faasr$ComputeServers[[next_server]]$SSL ==""){
              ssl <- TRUE
            } else{
              ssl <- as.logical(toupper(faasr$ComputeServers[[next_server]]$SSL))
            }
            namespace <- faasr$ComputeServers[[next_server]]$Namespace
            actionname <- invoke_next_function

            if (!startsWith(endpoint, "https") && !startsWith(endpoint, "http")){
              endpoint <- paste0("https://", endpoint)
            }
            url_2 <- paste0(endpoint, "/api/v1/namespaces/",namespace,"/actions/",actionname,"?blocking=false&result=false")
            
            headers <- c(
              'accept' = 'application/json', 
              'Content-Type' = 'application/json'
            )

            # send the REST request(POST/GET/PUT/PATCH)
            response <- POST(
              url = url_2,
              authenticate(api_key[1], api_key[2]),
              add_headers(.headers = headers),
              body=faasr,
              encode="json",
              httr::config(ssl_verifypeer = ssl, ssl_verifyhost = ssl),
              accept_json()
            )

            if (response$status_code==200 || response$status_code==202){
              succ_msg <- paste0('{\"faasr_trigger\":\"OpenWhisk: Successfully invoked: ', faasr$FunctionInvoke, '\"}\n')
              message(succ_msg)
              faasr_log(succ_msg)
            } else {
              err_msg <- paste0('{\"faasr_trigger\":\"OpenWhisk: Error invoking: ', faasr$FunctionInvoke, ' - ', content(response)$error,'\"}\n')
              message(err_msg)
              faasr_log(err_msg)
            }
          },
          
          # if AWS Lambda - use Lambda API
          "Lambda"={
            # AWS Lambda API handling
            # get next function server
            target_server <- faasr$ComputeServers[[next_server]]

            # prepare env variables for lambda
            
            # set invoke request body, it should be a JSON. To pass the payload, toJSON is required.
            payload_json <- toJSON(faasr, auto_unbox = TRUE)

            # Create a Lambda client using paws
            lambda <- paws.compute::lambda(
              config=list(
                credentials=list(
                  creds=list(
                    access_key_id=target_server$AccessKey,
                    secret_access_key=target_server$SecretKey,
                    session_token=""
                  )
                ),
                region=target_server$Region
              )
            )

      # Invoke next function with FunctionName and Payload, receive trigger response
            next_lambda_function_name <- invoke_next_function

      # Invoke next function with FunctionName and Payload, receive trigger response
            response <- lambda$invoke_async(
              FunctionName = next_lambda_function_name,
              InvokeArgs = payload_json
            )

      # Check if next function be invoked successfully
            if (response$Status == 202) {
              succ_msg <- paste0("faasr_trigger: Successfully invoked:", faasr$FunctionInvoke, "\n")
              message(succ_msg)
              faasr_log(succ_msg)
            } else {
              err_msg <- paste0("faasr_trigger: Error invoking: ",faasr$FunctionInvoke," reason:", response$Status, "\n")
              message(err_msg)
              faasr_log(err_msg)
            }
          },

        # if GitHub Actions - use GH Actions
          "GitHubActions"={
            # GitHub Actions API handling
            # Set env values for GitHub Actions event
            pat <- faasr$ComputeServers[[next_server]]$Token
            username <- faasr$ComputeServers[[next_server]]$UserName
            reponame <- faasr$ComputeServers[[next_server]]$ActionRepoName
            repo <- paste0(username, "/", reponame)
            if (!endsWith(invoke_next_function,".yml") && !endsWith(invoke_next_function,".yaml")){
              workflow_file <- paste0(invoke_next_function,".yml")
            } else {
              workflow_file <- invoke_next_function
            }
            git_ref <- faasr$ComputeServers[[next_server]]$Branch

            # Make a copy of faasr
            faasr_git <- faasr
            
            # Hide all credentials before sending the payload to the github actions
            for (faas_js in names(faasr_git$ComputeServers)){
              switch (faasr_git$ComputeServers[[faas_js]]$FaaSType,
                "GitHubActions"={
                  faasr_git$ComputeServers[[faas_js]]$Token <- paste0(faas_js,"_TOKEN")
                },
                "Lambda"={
                  faasr_git$ComputeServers[[faas_js]]$AccessKey <- paste0(faas_js,"_ACCESS_KEY")
                  faasr_git$ComputeServers[[faas_js]]$SecretKey <- paste0(faas_js,"_SECRET_KEY")
                },
                "OpenWhisk"={
                  faasr_git$ComputeServers[[faas_js]]$API.key <- paste0(faas_js,"_API_KEY")
                }
              )
              }
            for (data_js in names(faasr_git$DataStores)){
              faasr_git$DataStores[[data_js]]$AccessKey <- paste0(data_js,"_ACCESS_KEY")
              faasr_git$DataStores[[data_js]]$SecretKey <- paste0(data_js,"_SECRET_KEY")
            }
            # The inputs for the workflow
            inputs <- list(
            PAYLOAD = jsonlite::toJSON(faasr_git, auto_unbox=TRUE)
            )

            # Delete the copy to ensure the memory
            remove(faasr_git)

            # Set the URL for the REST API endpoint of next action
            url <- paste0("https://api.github.com/repos/", repo, "/actions/workflows/", workflow_file, "/dispatches")

            # Set the body of the POST request with github ref and inputs
            body <- list(
              ref = git_ref,
              inputs = inputs
            )

            # Use httr::POST to send the POST request
      # Reference link for POST request: https://docs.github.com/en/rest/actions/workflows?apiVersion=2022-11-28
            response <- POST(
              url = url,
              body = body,
              encode = "json",
              add_headers(
                Authorization = paste("token", pat),
                Accept = "application/vnd.github.v3+json",
                "X-GitHub-Api-Version" = "2022-11-28"
              )
            )

      # Check if next action be invoked successfully
            if (status_code(response) == 204) {
              succ_msg <- paste0("faasr_trigger: GitHub Action: Successfully invoked:", faasr$FunctionInvoke, "\n")
              message(succ_msg)
              faasr_log(succ_msg)
            } else if (status_code(response) == 401) {
        err_msg <- paste0("faasr_trigger: GitHub Action: Authentication failed, check the credentials\n")
              message(err_msg)
              faasr_log(err_msg)
      } else if (status_code(response) == 404) {
        err_msg <- paste0("faasr_trigger: GitHub Action: Cannot find the destination, check the repo name: \"",repo,"\" and workflow name: \"",workflow_file,"\"\n")
              message(err_msg)
              faasr_log(err_msg)
      } else if (status_code(response) == 422) {
        err_msg <- paste0("faasr_trigger: GitHub Action: Cannot find the destination, check the ref: ", faasr$FunctionInvoke, "\n")
              message(err_msg)
              faasr_log(err_msg)
      } else {
        err_msg <- paste0("faasr_trigger: GitHub Action: unknown error happens when invoke next function\n")
              message(err_msg)
              faasr_log(err_msg)
            }
        }
        )
      }
    }
  }
}

Try the FaaSr package in your browser

Any scripts or data that you put into this service are public.

FaaSr documentation built on April 11, 2025, 5:51 p.m.