R/faasr_abort_on_multiple_invocations.R

Defines functions faasr_abort_on_multiple_invocations

Documented in faasr_abort_on_multiple_invocations

#' @name faasr_abort_on_multiple_invocations
#' @title faasr_abort_on_multiple_invocations
#' @description 
#' Ensures that only one Action proceeds to execute a User Function if there are multiple triggers.
#' This is necessary because if in the Workflow a function receives multiple triggers,
#' multiple Actions are invoked; however, we don't want to execute the same function multiple times.
#' An Action aborts if any predecessor has not yet written its "<predecessor>.done" marker.
#' Otherwise it registers a unique token in the shared "<FunctionInvoke>.candidate" file on the
#' logging data store (under a lock), and only the Action whose token is the first line of that
#' file proceeds; all other Actions abort.
#' @param faasr list with parsed and validated Payload
#' @param pre names of the predecessor functions of faasr$FunctionInvoke; each predecessor is
#'   expected to have written a "<name>.done" object under the invocation's log folder
#' @return `""` if this Action should proceed to run the User Function, or
#'   `"abort-on-multiple-invocation"` if it should abort
#' @keywords internal
#' @import uuid
#' @importFrom "paws.storage" "s3"

faasr_abort_on_multiple_invocations <- function(faasr, pre) {

  # Use the logging data store if one is configured, otherwise the default data store
  if (is.null(faasr$LoggingDataStore)) {
    log_server_name <- faasr$DefaultDataStore
  } else {
    log_server_name <- faasr$LoggingDataStore
  }

  # is.null() guard: a NULL name would otherwise make the `if` condition zero-length
  if (is.null(log_server_name) || !(log_server_name %in% names(faasr$DataStores))) {
    err_msg <- paste0('{\"faasr_abort_on_multiple_invocations\":\"Invalid data server name: ', log_server_name, '\"}', "\n")
    message(err_msg)
    stop()
  }

  log_server <- faasr$DataStores[[log_server_name]]
  s3 <- paws.storage::s3(
    config = list(
      credentials = list(
        creds = list(
          access_key_id = log_server$AccessKey,
          secret_access_key = log_server$SecretKey
        )
      ),
      endpoint = log_server$Endpoint,
      region = log_server$Region
    )
  )

  # TRUE when an object with exactly this key exists in the logging bucket.
  # Listing with the key itself as prefix keeps the result small (no 1000-key truncation of a
  # whole folder listing), and the exact comparison avoids matching longer keys sharing the prefix.
  s3_key_exists <- function(key) {
    listing <- s3$list_objects_v2(Bucket = log_server$Bucket, Prefix = key)
    key %in% vapply(listing$Contents, function(x) x$Key, character(1))
  }

  id_folder <- paste0(faasr$FaaSrLog, "/", faasr$InvocationID)

  # Step 1: check that every possible predecessor Action is marked "done", i.e. that the object
  # "<id_folder>/<predecessor>.done" exists in S3.
  # If some predecessor is not done yet, it is still pending and will trigger this function again,
  # so it is safe for this Action to abort.
  for (func in pre) {
    func_done <- paste0(id_folder, "/", func, ".done")
    if (!s3_key_exists(func_done)) {
      res_msg <- paste0('{\"faasr_abort_on_multiple_invocations\":\"not the last trigger invoked - no flag\"}', "\n")
      message(res_msg)
      return("abort-on-multiple-invocation")
    }
  }

  # Unique token for this Action. A UUID is used instead of sample() so that concurrent Actions
  # practically cannot draw the same value, and R's global RNG stream is not advanced here.
  candidate_token <- uuid::UUIDgenerate()

  # The S3 key is also used as a local path, so make sure the local directory exists
  if (!dir.exists(id_folder)) {
    dir.create(id_folder, recursive = TRUE)
  }

  func_candidate <- paste0(id_folder, "/", faasr$FunctionInvoke, ".candidate")

  # Step 2: reached only if all predecessors are done. Exactly one Action must proceed.
  # Under an S3-based lock (read/modify/write without races) we:
  # 1) download "<FunctionInvoke>.candidate" from S3 if it exists; it holds the tokens of the
  #    Actions that reached this point before us
  # 2) append this Action's token to the local copy
  # 3) upload the file back to S3
  # 4) download it again to read the authoritative contents
  faasr_acquire(faasr)
  # `finally` guarantees the lock is released even if an S3 call fails, so other Actions
  # are not blocked forever
  tryCatch({
    # Always drop any local copy first; otherwise, when the S3 object does not exist yet,
    # a stale local file would be appended to and uploaded
    unlink(func_candidate)
    if (s3_key_exists(func_candidate)) {
      s3$download_file(Key = func_candidate, Filename = func_candidate, Bucket = log_server$Bucket)
    }

    write.table(candidate_token, func_candidate, col.names = FALSE, row.names = FALSE, append = TRUE, quote = FALSE)
    s3$put_object(Body = func_candidate, Key = func_candidate, Bucket = log_server$Bucket)

    unlink(func_candidate)
    s3$download_file(Key = func_candidate, Filename = func_candidate, Bucket = log_server$Bucket)
  }, finally = faasr_release(faasr))

  # The first Action to append its token proceeds; all others abort.
  # Length guard: an empty file must lead to "abort", not to a zero-length `if` condition.
  first_line <- readLines(func_candidate, n = 1L)
  if (length(first_line) > 0 && first_line[[1]] == candidate_token) {
    return("")
  } else {
    res_msg <- paste0('{\"faasr_abort_on_multiple_invocations\":\"not the last trigger invoked - random number does not match\"}', "\n")
    message(res_msg)
    return("abort-on-multiple-invocation")
  }
}

Try the FaaSr package in your browser

Any scripts or data that you put into this service are public.

FaaSr documentation built on June 22, 2024, 9:38 a.m.