R/workflow_stepfunctions.R

# NOTE: This code has been modified from AWS Stepfunctions Python:
# https://github.com/aws/aws-step-functions-data-science-sdk-python/blob/main/src/stepfunctions/workflow/stepfunctions.py

#' @importFrom paws sfn
#' @import R6
#' @import jsonlite
#' @import lgr

#' @include utils.R
#' @include steps_states.R
#' @include workflow_widgets_graph.R
#' @include workflow_widgets_event_table.R
#' @include workflow_widgets_workflows_table.R
#' @include workflow_widgets_utils.R
#' @include workflow_cloudformation.R


EventsList = R6Class("EventsList",
  inherit = EventsTableWidget,
  public = list(
    to_html = function(){
      return(self$show())
    }
  ),
  lock_objects=F
)

WorkflowList = R6Class("WorkflowList",
  inherit = WorkflowsTableWidget,
  public = list(
    to_html = function(){
      return(self$show())
    }
  ),
  lock_objects=F
)

ExecutionsList = R6Class("ExecutionsList",
  inherit = ExecutionsTableWidget,
  public = list(
    to_html = function(){
      return(self$show())
    }
  ),
  lock_objects=F
)

ExecutionStatus = Enum(
  Running   = 'RUNNING',
  Succeeded = 'SUCCEEDED',
  Failed    = 'FAILED',
  TimedOut  = 'TIMED_OUT',
  Aborted   = 'ABORTED'
)


#' @title Workflow Class
#' @description Class for creating and managing a workflow.
#' @export
Workflow = R6Class("Workflow",
  public = list(

    #' @description Lists all the workflows in the account.
    #' @param max_items (int, optional): The maximum number of items to be returned. (default: 100)
    #' @param client (SFN.Client, optional): \code{\link[paws]{sfn}} client to use for the query. If
    #'              not provided, a default \code{\link[paws]{sfn}} client for Step Functions will be
    #'              automatically created and used. (default: None)
    #' @param html (bool, optional): Renders the list as an HTML table (If running in
    #'              an IRkernel environment). If the parameter is not provided, or set
    #'              to FALSE, a R list is returned. (default: False)
    #' @return list: The list of workflows. Refer to :meth:`.SFN.Client.list_state_machines()`
    #'              for the response structure.
    list_workflows = function(max_items=100,
                              client=NULL,
                              html=FALSE){
      if (is.null(client)){
        LOGGER$debug(paste("The argument 'client' is not provided. Creating a new",
                           "`paws::sfn()` client instance with default settings."))
        client = sfn()
      }

      LOGGER$debug("Retrieving list of workflows from AWS Step Functions.")

      token = NULL
      workflows = list()
      while(!identical(token, character(0))){
        batch_response=client$list_state_machines(
          maxResults=1000, # maximum allowed page size
          nextToken=token
        )

        workflows = c(workflows, batch_response[["stateMachines"]])
        if(length(workflows) >= max_items){
          workflows = workflows[1:max_items]
          break
        }
        token=batch_response[["nextToken"]]
      }

      if (html){
        display_html <- pkg_method("display_html","IRdisplay")
        return(display_html(WorkflowList$new(workflows)$to_html()))
      } else {
        return(workflows)
      }
    },

    #' @description Initialize Workflow Class
    #' @param name (str): The name of the workflow. A name must not contain:
    #' \itemize{
    #'     \item{whitespace}
    #'     \item{brackets ``< > { } [ ]``}
    #'     \item{wildcard characters ``? *``}
    #'     \item{special characters ``\" # \% \ ^ | ~ ` $ & , ; : /``}
    #'     \item{control characters (``U+0000-001F``, ``U+007F-009F``)}
    #'     }
    #' @param definition (State or Chain): The Amazon States Language
    #'              \url{https://states-language.net/spec.html} definition of the workflow.
    #' @param role (str): The Amazon Resource Name (ARN) of the IAM role to use for creating,
    #'              managing, and running the workflow.
    #' @param tags (list): Tags to be added when creating a workflow. Tags are key-value pairs
    #'              that can be associated with Step Functions workflows and activities. (default: [])
    #' @param execution_input (ExecutionInput, optional): Placeholder collection that defines
    #'              the placeholder variables for the workflow execution. This is also used
    #'              to validate inputs provided when executing the workflow. (default: None)
    #' @param timeout_seconds (int, optional): The maximum number of seconds an execution
    #'              of the workflow can run. If it runs longer than the specified time,
    #'              the workflow run fails with a `States.Timeout` Error Name. (default: None)
    #' @param comment (str, optional): A human-readable description of the workflow. (default: None)
    #' @param version (str, optional): The version of the Amazon States Language used
    #'              in the workflow. (default: None)
    #' @param state_machine_arn (str, optional): The Amazon Resource Name (ARN) of
    #'              the workflow. (default: None)
    #' @param format_json (bool, optional): Boolean flag set to `True` if workflow
    #'              definition and execution inputs should be prettified for this workflow.
    #'              `False`, otherwise. (default: True)
    #' @param client (SFN.Client, optional): paws client to use for creating, managing,
    #'              and running the workflow on Step Functions. If not provided, a
    #'              default \code{\link[paws]{sfn}} client for Step Functions will be automatically created
    #'              and used. (default: None)
    initialize = function(name,
                          definition,
                          role,
                          tags=list(),
                          execution_input=NULL,
                          timeout_seconds=NULL,
                          comment=NULL,
                          version=NULL,
                          state_machine_arn=NULL,
                          format_json=TRUE,
                          client=NULL){
      self$timeout_seconds = timeout_seconds
      self$comment = comment
      self$version = version
      if (inherits(definition, "Graph"))
        self$definition = definition
      else
        self$definition = Graph$new(
          definition,
          timeout_seconds=self$timeout_seconds,
          comment=self$comment,
          version=self$version
        )
      self$name = name
      self$role = role
      self$tags = tags
      self$workflow_input = execution_input

      if (!is.null(client))
        self$client = client
      else
        self$client = sfn()

      self$format_json = format_json
      self$state_machine_arn = state_machine_arn
    },

    #' @description Factory method to create an instance attached to an exisiting
    #'              workflow in Step Functions.
    #' @param state_machine_arn (str): The Amazon Resource Name (ARN) of the existing workflow.
    #' @param client (SFN.Client, optional): \code{\link[paws]{sfn}} client to use for attaching the existing
    #'              workflow in Step Functions to the Workflow object. If not provided,
    #'              a default \code{\link[paws]{sfn}} client for Step Functions will be automatically
    #'              created and used. (default: None)
    #' @return Workflow: Workflow object attached to the existing workflow in Step Functions.
    attach = function(state_machine_arn,
                      client=NULL){

      if (!is.null(client)){
        LOGGER$debug(paste("The argument 'client' is not provided. Creating a new",
                    "`paws::sfn` client instance with default settings."))
        client = sfn()
      }
      response = client$describe_state_machine(stateMachineArn=state_machine_arn)
      return(Workflow$new(
        name=response[['name']],
        definition=FrozenGraph$public_methods$from_json(response[['definition']]),
        role=response[['roleArn']],
        state_machine_arn=response[['stateMachineArn']],
        client=client)
      )
    },

    #' @description Creates the workflow on Step Functions.
    #' @return str: The Amazon Resource Name (ARN) of the workflow created. If the workflow
    #'              already existed, the ARN of the existing workflow is returned.
    create = function(){
      if (!is.null(self$state_machine_arn)){
        LOGGER$warn("The workflow already exists on AWS Step Functions. No action will be performed.")
        return(self$state_machine_arn)
      }

      tryCatch({
        self$state_machine_arn = private$.create()
      },
      error = function(e){
        self$state_machine_arn = private$.extract_state_machine_arn(e)
        LOGGER$error(paste("A workflow with the same name already exists on AWS",
        "Step Functions. To update a workflow, use Workflow.update()."))
      })
      return(self$state_machine_arn)
    },

    #' @description Updates an existing state machine by modifying its definition
    #'              and/or role. Executions started immediately after calling this
    #'              method may use the previous definition and role.
    #' @param definition (State or Chain, optional): The Amazon States Language
    #'              \url{https://states-language.net/spec.html} definition to update
    #'              the workflow with. (default: None)
    #' @param role (str, optional): The Amazon Resource Name (ARN) of the IAM role
    #'              to use for creating, managing, and running the workflow. (default: None)
    #' @return str: The state machine definition and/or role updated. If the update fails,
    #' None will be returned.
    update = function(definition=NULL,
                      role=NULL){
      if (is.null(definition) && is.null(role))
        stop("A new definition and/or role must be provided to update an existing workflow.")

      if (is.null(self$state_machine_arn))
        stop("Local workflow instance does not point to an existing workflow ",
        "on AWS StepFunctions. Please consider using Workflow.create(...) ",
        "to create a new workflow, or Workflow.attach(...) to attach the ",
        "instance to an existing workflow on AWS Step Functions.")

      if (!is.null(definition)){
        if (inherits(definition, "Graph"))
          self$definition = definition
        else
          self$definition = Graph$new(
            definition,
            timeout_seconds=self$timeout_seconds,
            comment=self$comment,
            version=self$version)
      }
      if(!is.null(role))
        self$role = role

      response = self$client$update_state_machine(
        stateMachineArn=self$state_machine_arn,
        definition=self$definition$to_json(pretty=self$format_json),
        roleArn=self$role
      )
      LOGGER$info(paste("Workflow updated successfully on AWS Step Functions.",
      "All execute() calls will use the updated definition and role within a few seconds."))
      return(self$state_machine_arn)
    },

    #' @description Starts a single execution of the workflow.
    #' @param name (str, optional): The name of the workflow execution. If one is
    #'              not provided, a workflow execution name will be auto-generated. (default: None)
    #' @param inputs (str, list, optional): Input data for the workflow execution. (default: None)
    #' @return stepfunctions.workflow.Execution: An execution instance of the workflow.
    execute = function(name=NULL, inputs=NULL){
      if (!is.null(self$workflow_input)){
        validation_result = self$workflow_input$validate(inputs)
        if (isFALSE(validation_result$valid))
          stop(sprintf(
            "Expected run input with the schema: %s",
            self$workflow_input$get_schema_as_json()))
      }
      if (is.null(self$state_machine_arn))
        stop("Local workflow instance does not point to an existing workflow on ",
             "AWS StepFunctions. Before executing a workflow, call Workflow.create(...) ",
             "or Workflow.attach(...).")

      params = list(
        'stateMachineArn'=self$state_machine_arn)
      params[['name']] = name

      if (!is.null(inputs))
        params[['input']] = toJSON(inputs, pretty=self$format_json, auto_unbox=T)

      response = self$client$start_execution(
        stateMachineArn=params$stateMachineArn,
        name=params$name,
        input=params$input)

      LOGGER$info("Workflow execution started successfully on AWS Step Functions.")

      # name is None because \code{\link[paws]{sfn}} client.start_execution only returns startDate and executionArn
      return(Execution$new(
        workflow=self,
        execution_arn=response[['executionArn']],
        start_date=response[['startDate']],
        status=ExecutionStatus$Running,
        client=self$client)
      )
    },

    #' @description Lists the executions for the workflow.
    #' @param max_items (int, optional): The maximum number of items to be returned. (default: 100)
    #' @param status_filter (ExecutionStatus, optional): If specified, only list
    #'              the executions whose current status matches the given filter. (default: None)
    #' @param html (bool, optional): Renders the list as an HTML table (If running
    #'              in an IRKernel environment). If the parameter is not provided, or
    #'              set to FALSE, a R list is returned. (default: False)
    #' @return list: List of workflow run instances.
    list_executions = function(max_items=100,
                               status_filter=NULL,
                               html=FALSE){
      if (is.null(self$state_machine_arn))
        return(list())

      LOGGER$debug("Retrieving list of executions from AWS Step Functions.")

      token=NULL
      response = list()

      while(!identical(token, character(0))){
        batch_response=response=self$client$list_executions(
          stateMachineArn=self$state_machine_arn,
          statusFilter=if(is.null(status_filter)) status_filter else toupper(status_filter),
          maxResults=1000, # maximum allowed page size
          nextToken=token)

        response = c(response, batch_response["executions"])
        if(length(response) >= max_items){
          response = response[1:max_items]
          break
        }
        token = batch_response[["nextToken"]]
      }
      runs = lapply(response$executions, function(execution){
        Execution$new(
          name=execution[['name']],
          workflow=self,
          execution_arn=execution[['executionArn']],
          start_date=execution[['startDate']],
          stop_date=execution[['stopDate']],
          status=execution[['status']],
          client=self$client
        )
      })
      if (html){
        display_html <- pkg_method("display_html","IRdisplay")
        return(display_html(ExecutionsList$new(runs)$to_html()))
      } else {
        return(runs)
      }
    },

    #' @description  Deletes the workflow, if it exists.
    delete = function(){
      if (!is.null(self$state_machine_arn))
        self$client$delete_state_machine(stateMachineArn=self$state_machine_arn)
      LOGGER$info(paste("Workflow has been marked for deletion. If the workflow has",
      "running executions, it will be deleted when all executions are stopped."))
    },

    #' @description Renders a visualization of the workflow graph.
    #' @param portrait (bool, optional): Boolean flag set to `True` if the workflow
    #'              graph should be rendered in portrait orientation. Set to `False`,
    #'              if the graph should be rendered in landscape orientation. (default: False)
    render_graph = function(portrait = FALSE){
      widget = WorkflowGraphWidget$new(self$definition$to_json())
      return(widget$show(portrait=portrait))
    },

    #' @description Returns a CloudFormation template that contains only the StateMachine
    #'              resource. To reuse the CloudFormation template in a different region,
    #'              please make sure to update the region specific AWS resources (e.g: Lambda
    #'              ARN, Training Image) in the StateMachine definition.
    get_cloudformation_template = function(){
      return(build_cloudformation_template(self))
    },

    #' @description class formatting
    format = function(){
      # identify if in Rstudio or not
      if (pkg_env$jupyter){
        if (!is.null(self$state_machine_arn))
          return(sprintf(
            'Workflow: <a target="_blank" href="%s">%s</a>',
            create_sfn_workflow_url(self$state_machine_arn), self$state_machine_arn))
        else
          return('Workflow: Does Not Exist.')
      } else {
        if (!is.null(self$state_machine_arn)){
          cls_fmt = "%s(name='%s', role='%s', state_machine_arn='%s')"
          return(sprintf(cls_fmt, class(self)[1],
                 self$name, self$role, self$state_machine_arn))
        } else {
          cls_fmt = "%s(name='%s', role='%s'): Does Not Exist"
          return(sprintf(cls_fmt, class(self)[1],
                 self$name, self$role))
        }
      }
    },

    #' @description print class
    print = function(){
      if(pkg_env$jupyter){
        display_html <- pkg_method("display_html","IRdisplay")
        display_html(self$format())
      } else {
        cat(self$format(), "\n")
      }
    }
  ),
  private = list(
    .create = function(){
      response = self$client$create_state_machine(
        name=self$name,
        definition=self$definition$to_json(pretty=self$format_json),
        roleArn=self$role,
        tags=self$tags)
      LOGGER$info("Workflow created successfully on AWS Step Functions.")
      return(response[['stateMachineArn']])
    },

    .extract_state_machine_arn = function(exception){
      message = attributes(exception)$error_response$message
      return(unlist(strsplit(message, "'"))[2])
    }
  ),
  lock_objects=F
)

#' @title Execution class
#' @description Class for managing a workflow execution.
#' @export
Execution = R6Class("Execution",
  public = list(

    #' @description Initialize Execution class
    #' @param workflow (Workflow): Step Functions workflow instance.
    #' @param execution_arn (str): The Amazon Resource Name (ARN) of the workflow execution.
    #' @param start_date (datetime.datetime): The date the workflow execution was started.
    #' @param status (RunStatus): Status of the workflow execution.
    #' @param client (SFN.Client, optional): \code{\link[paws]{sfn}} client to use for running and
    #'              managing the workflow executions on Step Functions. If no client
    #'              is provided, the \code{\link[paws]{sfn}} client from the parent workflow will be used. (default: None)
    #' @param name (str, optional): Name for the workflow execution. (default: None)
    #' @param stop_date (datetime.datetime, optional): The date the workflow execution
    #'              was stopped, if applicable. (default: None)
    initialize = function(workflow,
                          execution_arn,
                          start_date,
                          status,
                          client=NULL,
                          name=NULL,
                          stop_date=NULL){
      self$name = name
      self$workflow = workflow
      self$execution_arn = execution_arn
      self$start_date = start_date
      self$stop_date = stop_date
      self$status = status
      if (!is.null(client))
        self$client = client
      else
        self$client = self$workflow$client
    },

    #' @description Stops a workflow execution.
    #' @param error (str, optional): The error code of the failure. (default: None)
    #' @param cause (str, optional): A more detailed explanation of the cause of the failure. (default: None)
    #' @return list: Datetime of when the workflow execution was stopped. Example below::
    #' \code{
    #' list(
    #'  stopDate = as.POSIXct(
    #'    "2015-01-01"
    #'  )
    #' )
    #' }
    #' **Response structure**:
    #' \itemize{
    #'    \item{(list)}
    #'    \item{stopDate (datetime): The date the workflow execution is stopped}
    #' }
    stop = function(cause=NULL,
                    error=NULL){
      response = self$client$stop_execution(
        executionArn=self$execution_arn,
        cause=cause,
        error=error
      )
      return(response)
    },

    #' @description Lists the events in the workflow execution.
    #' @param max_items (int, optional): The maximum number of items to be returned.
    #'              (default: 100)
    #' @param reverse_order (bool, optional): Boolean flag set to `True` if the events
    #'              should be listed in reverse chronological order. Set to `False`,
    #'              if the order should be in chronological order. (default: False)
    #' @param html (bool, optional): Renders the list as an HTML table (If running in
    #'              an IRKernel environment). If the parameter is not provided, or set
    #'              to FALSE, a R list is returned. (default: False)
    #' @return list: Object containing the list of workflow execution events. Refer
    #'              to :meth:`.SFN.Client.get_execution_history()` for the response structure.
    list_events = function(max_items=100,
                           reverse_order=FALSE,
                           html=FALSE){
      LOGGER$debug("Retrieving list of history events for your execution from AWS Step Functions.")

      events=list()
      token=NULL
      while(!identical(token, character(0))){
        batch_response = self$client$get_execution_history(
          executionArn=self$execution_arn,
          maxResults=1000, # maximum allowed page size
          reverseOrder=reverse_order,
          nextToken=token
        )
        events=c(events, batch_response[["events"]])
        if(length(events) >= max_items){
          events = events[1:max_items]
          break
        }
        token=batch_response[["nextToken"]]
      }
      if (html){
        display_html <- pkg_method("display_html","IRdisplay")
        return(display_html(EventsList$new(events)$to_html()))
      } else {
        return(events)
      }
    },

    #' @description Describes a workflow execution.
    #' @return list: Details of the workflow execution.
    #' **Response structure**:
    #' \itemize{
    #'     \item{executionArn (string): The Amazon Resource Name (ARN) that identifies the workflow execution.}
    #'     \item{stateMachineArn (string): The Amazon Resource Name (ARN) of the workflow that was executed.}
    #'     \item{name (string): The name of the workflow execution.}
    #'     \item{status (string): The current status of the workflow execution.}
    #'     \item{startDate (datetime): The date the workflow execution is started.}
    #'     \item{stopDate (datetime): If the workflow execution has already ended, the date the execution stopped.}
    #'     \item{input (string): The string that contains the JSON input data of the workflow execution.}
    #'     \item{output (string): The JSON output data of the workflow execution.}
    #' }
    describe = function(){
      return(self$client$describe_execution(executionArn=self$execution_arn))
    },

    #' @description Renders a visualization of the workflow execution graph.
    #' @param portrait (bool, optional): Boolean flag set to `True` if the workflow
    #'              execution graph should be rendered in portrait orientation. Set
    #'              to `False`, if the graph should be rendered in landscape orientation. (default: False)
    #' @param max_events (int, optional): Specifies the number of events to be visualized
    #'              in the workflow execution graph. (default: 25000)
    render_progress = function(portrait=FALSE,
                               max_events=25000){
      events = self$list_events(max_items=max_events)
      widget = ExecutionGraphWidget$new(
        self$workflow$definition$to_json(),
        toJSON(events, auto_unbox = T),
        execution_arn=self$execution_arn)
      return(widget$show(portrait=portrait))
    },

    #' @description Get the input for the workflow execution.
    #' @return list: Workflow execution input.
    get_input = function(){
      run_input = self$describe()[['input']]
      if(is.null(run_input))
        return(run_input)
      return(fromJSON(run_input))
    },


    #' @description Get the output for the workflow execution.
    #' @param wait (bool, optional): Boolean flag set to `True` if the call should
    #'              wait for a running workflow execution to end before returning
    #'              the output. Set to `False`, otherwise. Note that if the status
    #'              is running, and `wait` is set to `True`, this will be a blocking
    #'              call. (default: False)
    #' @return list: Workflow execution output.
    get_output = function(wait=FALSE){
      while(wait && self$describe()[['status']] == 'RUNNING'){
        Sys.sleep(1)
      }
      output = self$describe()[["output"]]
      if (is.null(outpu))
        return(output)
      return(fromJSON(output))
    },

    #' @description format class
    format = function(){
      if(pkg_env$jupyter){
        return(sprintf('Execution: <a target="_blank" href="%s">%s</a>',
              create_sfn_execution_url(self$execution_arn), self$execution_arn))
      } else {
        return(sprintf("%s(execution_arn='%s', name='%s', status='%s', start_date='%s')",
              class(self)[1], self$execution_arn, self$name, self$status, self$start_date))
      }
    },

    #' @description print class
    print = function(){
      if(pkg_env$jupyter){
        display_html <- pkg_method("display_html","IRdisplay")
        display_html(self$format())
      } else {
        cat(self$format(), "\n")
      }
    }
  ),
  lock_objects=F
)
DyfanJones/aws-step-functions-data-science-sdk-r documentation built on Dec. 17, 2021, 5:31 p.m.