# R/processing.R

# NOTE: This code has been modified from AWS Sagemaker Python:
# https://github.com/aws/sagemaker-python-sdk/blob/master/src/sagemaker/processing.py

#' @include r_utils.R
#' @include job.R
#' @include network.R
#' @include dataset_definition_inputs.R


#' @import R6
#' @import sagemaker.core
#' @importFrom fs file_exists is_file path
#' @importFrom urltools url_decode

#' @title Processor Class
#' @family Processor
#' @description Handles Amazon SageMaker Processing tasks.
#' @export
Processor = R6Class("Processor",
  public = list(
    #' @field role
    #' An AWS IAM role name or ARN
    role = NULL,

    #' @field image_uri
    #' The URI of the Docker image to use
    image_uri = NULL,

    #' @field instance_count
    #' The number of instances to run
    instance_count = NULL,

    #' @field instance_type
    #' The type of EC2 instance to use
    instance_type = NULL,

    #' @field entrypoint
    #' The entrypoint for the processing job
    entrypoint = NULL,

    #' @field volume_size_in_gb
    #' Size in GB of the EBS volume
    volume_size_in_gb = NULL,

    #' @field volume_kms_key
    #' A KMS key for the processing volume
    volume_kms_key = NULL,

    #' @field output_kms_key
    #' The KMS key ID for processing job outputs
    output_kms_key = NULL,

    #' @field max_runtime_in_seconds
    #' Timeout in seconds
    max_runtime_in_seconds = NULL,

    #' @field base_job_name
    #' Prefix for processing job name
    base_job_name = NULL,

    #' @field sagemaker_session
    #' Session object which manages interactions with Amazon SageMaker
    sagemaker_session = NULL,

    #' @field env
    #' Environment variables
    env = NULL,

    #' @field tags
    #' List of tags to be passed
    tags = NULL,

    #' @field network_config
    #' A :class:`~sagemaker.network.NetworkConfig`
    network_config = NULL,

    #' @field jobs
    #' List of every processing job started through ``run``
    jobs = NULL,

    #' @field latest_job
    #' The processing job most recently started through ``run``
    latest_job = NULL,

    #' @field .current_job_name
    #' Name of the job currently being prepared / run
    .current_job_name = NULL,

    #' @field arguments
    #' Extra arguments passed to the most recent ``run`` call
    arguments = NULL,

    #' @description Initializes a ``Processor`` instance. The ``Processor`` handles Amazon
    #'              SageMaker Processing tasks.
    #' @param role (str): An AWS IAM role name or ARN. Amazon SageMaker Processing
    #'              uses this role to access AWS resources, such as
    #'              data stored in Amazon S3.
    #' @param image_uri (str): The URI of the Docker image to use for the
    #'              processing jobs.
    #' @param instance_count (int): The number of instances to run
    #'              a processing job with.
    #' @param instance_type (str): The type of EC2 instance to use for
    #'              processing, for example, 'ml.c4.xlarge'.
    #' @param entrypoint (list[str]): The entrypoint for the processing job (default: NULL).
    #'              This is in the form of a list of strings that make a command.
    #' @param volume_size_in_gb (int): Size in GB of the EBS volume
    #'              to use for storing data during processing (default: 30).
    #' @param volume_kms_key (str): A KMS key for the processing
    #'              volume (default: NULL).
    #' @param output_kms_key (str): The KMS key ID for processing job outputs (default: NULL).
    #' @param max_runtime_in_seconds (int): Timeout in seconds (default: NULL).
    #'              After this amount of time, Amazon SageMaker terminates the job,
    #'              regardless of its current status. If `max_runtime_in_seconds` is not
    #'              specified, the default value is 24 hours.
    #' @param base_job_name (str): Prefix for processing job name. If not specified,
    #'              the processor generates a default job name, based on the
    #'              processing image name and current timestamp.
    #' @param sagemaker_session (:class:`~sagemaker.session.Session`):
    #'              Session object which manages interactions with Amazon SageMaker and
    #'              any other AWS services needed. If not specified, the processor creates
    #'              one using the default AWS configuration chain.
    #' @param env (dict[str, str]): Environment variables to be passed to
    #'              the processing jobs (default: NULL).
    #' @param tags (list[dict]): List of tags to be passed to the processing job
    #'              (default: NULL). For more, see
    #'              https://docs.aws.amazon.com/sagemaker/latest/dg/API_Tag.html.
    #' @param network_config (:class:`~sagemaker.network.NetworkConfig`):
    #'              A :class:`~sagemaker.network.NetworkConfig`
    #'              object that configures network isolation, encryption of
    #'              inter-container traffic, security group IDs, and subnets.
    initialize = function(role,
                          image_uri,
                          instance_count,
                          instance_type,
                          entrypoint=NULL,
                          volume_size_in_gb=30,
                          volume_kms_key=NULL,
                          output_kms_key=NULL,
                          max_runtime_in_seconds=NULL,
                          base_job_name=NULL,
                          sagemaker_session=NULL,
                          env=NULL,
                          tags=NULL,
                          network_config=NULL){
      self$role = role
      self$image_uri = image_uri
      self$instance_count = instance_count
      self$instance_type = instance_type
      self$entrypoint = entrypoint
      self$volume_size_in_gb = volume_size_in_gb
      self$volume_kms_key = volume_kms_key
      self$output_kms_key = output_kms_key
      self$max_runtime_in_seconds = max_runtime_in_seconds
      self$base_job_name = base_job_name
      self$env = env
      self$tags = tags
      self$network_config = network_config

      # Per-instance job bookkeeping starts out empty.
      self$jobs = list()
      self$latest_job = NULL
      self$.current_job_name = NULL
      self$arguments = NULL

      # Local instance types need a LocalSession: any other kind of session
      # that was supplied is replaced by a fresh LocalSession.
      if(self$instance_type %in% c("local", "local_gpu")){
        if(!inherits(sagemaker_session, "LocalSession")){
          sagemaker_session = LocalSession$new()
        }
      }

      # Fall back to a default Session when none was supplied.
      self$sagemaker_session = sagemaker_session %||% Session$new()
    },

    #' @description Runs a processing job.
    #' @param inputs (list[:class:`~sagemaker.processing.ProcessingInput`]): Input files for
    #'              the processing job. These must be provided as
    #'              :class:`~sagemaker.processing.ProcessingInput` objects (default: NULL).
    #' @param outputs (list[:class:`~sagemaker.processing.ProcessingOutput`]): Outputs for
    #'              the processing job. These must be provided as
    #'              :class:`~sagemaker.processing.ProcessingOutput` objects (default: NULL).
    #' @param arguments (list[str]): A list of string arguments to be passed to a
    #'              processing job (default: NULL).
    #' @param wait (bool): Whether the call should wait until the job completes (default: True).
    #' @param logs (bool): Whether to show the logs produced by the job.
    #'              Only meaningful when ``wait`` is True (default: True).
    #' @param job_name (str): Processing job name. If not specified, the processor generates
    #'              a default job name, based on the base job name and current timestamp.
    #' @param experiment_config (dict[str, str]): Experiment management configuration.
    #'              Dictionary contains three optional keys:
    #'              'ExperimentName', 'TrialName', and 'TrialComponentDisplayName'.
    #' @param kms_key (str): The ARN of the KMS key that is used to encrypt
    #'              local inputs uploaded to S3 (default: NULL).
    run = function(inputs=NULL,
                   outputs=NULL,
                   arguments=NULL,
                   wait=TRUE,
                   logs=TRUE,
                   job_name=NULL,
                   experiment_config=NULL,
                   kms_key=NULL){
      # ValueError is defined outside this file; constructing it is relied on
      # to signal the error to the caller.
      if (logs && !wait)
        ValueError$new(
          "Logs can only be shown if wait is set to True. ",
          "Please either set wait to True or set logs to False."
        )

      # `kms_key` is a formal argument of this method, so the value forwarded
      # here is always defined, even when a local input has to be uploaded.
      ll = private$.normalize_args(
        job_name=job_name,
        arguments=arguments,
        inputs=inputs,
        kms_key=kms_key,
        outputs=outputs
      )
      self$latest_job = ProcessingJob$new()$start_new(
        processor=self,
        inputs=ll$normalized_inputs,
        outputs=ll$normalized_outputs,
        experiment_config=experiment_config
      )
      # Keep a history of every job started by this processor.
      self$jobs = c(self$jobs, self$latest_job)
      if (wait) self$latest_job$wait(logs=logs)
    },

    #' @description format class
    format = function(){
      format_class(self)
    }
  ),
  private = list(

    # Extend inputs and outputs based on extra parameters.
    # The base class returns both unchanged, as an unnamed list(inputs, outputs).
    .extend_processing_args = function(inputs, outputs, ...){
      return(list(inputs, outputs))
    },

    # Normalizes the arguments so that they can be passed to the job run.
    # Side effects: sets `self$.current_job_name` and `self$arguments`.
    # Args:
    #   job_name (str): Name of the processing job to be created. If not specified, one
    #     is generated, using the base name given to the constructor, if applicable
    #     (default: NULL).
    #   arguments (list[str]): A list of string arguments to be passed to a
    #     processing job (default: NULL).
    #   inputs (list[:class:`~sagemaker.processing.ProcessingInput`]): Input files for
    #     the processing job. These must be provided as
    #     :class:`~sagemaker.processing.ProcessingInput` objects (default: NULL).
    #   outputs (list[:class:`~sagemaker.processing.ProcessingOutput`]): Outputs for
    #     the processing job. These must be provided as
    #     :class:`~sagemaker.processing.ProcessingOutput` objects (default: NULL).
    #   code (str): This can be an S3 URI or a local path to a file with the framework
    #     script to run (default: NULL). A no op in the base class.
    #   kms_key (str): The ARN of the KMS key that is used to encrypt the
    #     user code file (default: NULL).
    # Returns:
    #   list with elements `normalized_inputs` and `normalized_outputs`.
    .normalize_args = function(job_name=NULL,
                               arguments=NULL,
                               inputs=NULL,
                               outputs=NULL,
                               code=NULL,
                               kms_key=NULL){
      # The job name must be fixed first: the S3 locations generated during
      # normalization are derived from it.
      self$.current_job_name = private$.generate_current_job_name(job_name=job_name)

      inputs_with_code = private$.include_code_in_inputs(inputs, code, kms_key)
      normalized_inputs = private$.normalize_inputs(inputs_with_code, kms_key)
      normalized_outputs = private$.normalize_outputs(outputs)
      self$arguments = arguments
      return(list(
        "normalized_inputs"=normalized_inputs,
        "normalized_outputs"=normalized_outputs)
      )
    },

    # A no op in the base class to include code in the processing job inputs.
    # Args:
    #   inputs (list[:class:`~sagemaker.processing.ProcessingInput`]): Input files for
    #     the processing job. These must be provided as
    #     :class:`~sagemaker.processing.ProcessingInput` objects.
    #   .code (str): This can be an S3 URI or a local path to a file with the framework
    #     script to run. Ignored in the base class.
    #   .kms_key (str): The ARN of the KMS key that is used to encrypt the
    #     user code file. Ignored in the base class.
    # Returns:
    #   list[:class:`~sagemaker.processing.ProcessingInput`]: inputs
    .include_code_in_inputs = function(inputs,
                                       .code,
                                       .kms_key){
      return(inputs)
    },

    # Generates the job name before running a processing job.
    # Args:
    #   job_name (str): Name of the processing job to be created. If not
    #     specified, one is generated, using the base name given to the
    #     constructor if applicable.
    # Returns:
    #   str: The supplied or generated job name.
    .generate_current_job_name = function(job_name=NULL){
      if (!is.null(job_name))
        return(job_name)
      # Honor supplied base_job_name or generate it.
      if (!is.null(self$base_job_name))
        base_name = self$base_job_name
      else
        base_name = base_name_from_image(self$image_uri)
      return(name_from_base(base_name))
    },

    # Ensures that all the ``ProcessingInput`` objects have names and S3 URIs.
    # Args:
    #   inputs (list[sagemaker.processing.ProcessingInput]): A list of ``ProcessingInput``
    #     objects to be normalized (default: NULL). If NULL or empty,
    #     an empty list is returned.
    #   kms_key (str): The ARN of the KMS key that is used to encrypt the
    #     uploaded local files (default: NULL).
    # Returns:
    #   list[sagemaker.processing.ProcessingInput]: The list of normalized
    #     ``ProcessingInput`` objects.
    # Raises:
    #   TypeError: if the inputs are not ``ProcessingInput`` objects.
    .normalize_inputs = function(inputs=NULL,
                                 kms_key=NULL){
      # Initialize a list of normalized ProcessingInput objects.
      normalized_inputs = list()
      # seq_along() is empty for both NULL and list(), so the loop body is
      # skipped instead of indexing out of bounds as 1:length() would.
      for (count in seq_along(inputs)){
        if (!inherits(inputs[[count]], "ProcessingInput"))
          TypeError$new("Your inputs must be provided as ProcessingInput objects.")
        # Generate a name for the ProcessingInput if it doesn't have one.
        if (islistempty(inputs[[count]]$input_name))
          inputs[[count]]$input_name = sprintf("input-%s",count)

        # Inputs whose source is a Properties object, or that carry a dataset
        # definition, are passed through without any S3 handling.
        if (inherits(inputs[[count]]$source, "Properties") || !is.null(inputs[[count]]$dataset_definition)){
          normalized_inputs = list.append(normalized_inputs, inputs[[count]])
          next
        }
        # Same for S3 URIs given as Parameter / Expression / Properties objects.
        if (inherits(inputs[[count]]$s3_input$s3_uri, c("Parameter", "Expression", "Properties"))){
          normalized_inputs = list.append(normalized_inputs, inputs[[count]])
          next
        }
        # If the source is a local path, upload it to S3
        # and save the S3 uri in the ProcessingInput source.
        parse_result = parse_url(inputs[[count]]$s3_input$s3_uri)
        if (!identical(parse_result$scheme, "s3")){
          desired_s3_uri = s3_path_join(
            "s3://",
            self$sagemaker_session$default_bucket(),
            self$.current_job_name,
            "input",
            inputs[[count]]$input_name)
          s3_uri = S3Uploader$new()$upload(
            local_path=inputs[[count]]$s3_input$s3_uri,
            desired_s3_uri=desired_s3_uri,
            sagemaker_session=self$sagemaker_session,
            kms_key=kms_key)
          inputs[[count]]$s3_input$s3_uri = s3_uri
        }
        normalized_inputs = list.append(normalized_inputs, inputs[[count]])
      }
      return(normalized_inputs)
    },

    # Ensures that all the outputs are ``ProcessingOutput`` objects with
    # names and S3 URIs.
    # Args:
    #   outputs (list[sagemaker.processing.ProcessingOutput]): A list
    #     of ``ProcessingOutput`` objects to be normalized (default: NULL).
    #     If NULL or empty, an empty list is returned.
    # Returns:
    #   list[sagemaker.processing.ProcessingOutput]: The list of normalized
    #     ``ProcessingOutput`` objects.
    # Raises:
    #   TypeError: if the outputs are not ``ProcessingOutput`` objects.
    .normalize_outputs = function(outputs = NULL){
      # Initialize a list of normalized ProcessingOutput objects.
      normalized_outputs = list()
      # seq_along() is empty for both NULL and list(), so the loop body is
      # skipped instead of indexing out of bounds as 1:length() would.
      for (count in seq_along(outputs)){
        if (!inherits(outputs[[count]], "ProcessingOutput"))
          TypeError$new("Your outputs must be provided as ProcessingOutput objects.")
        # Generate a name for the ProcessingOutput if it doesn't have one.
        if (islistempty(outputs[[count]]$output_name))
          outputs[[count]]$output_name = sprintf("output-%s",count)
        # Destinations given as Parameter / Expression / Properties objects
        # are passed through untouched.
        if (inherits(outputs[[count]]$destination, c("Parameter", "Expression", "Properties"))){
          normalized_outputs = list.append(normalized_outputs, outputs[[count]])
          next
        }
        # If the output's destination is not an s3_uri, create one under the
        # job's "output" prefix.
        parse_result = parse_url(outputs[[count]]$destination)
        if (!identical(parse_result$scheme, "s3")){
          s3_uri = s3_path_join(
            "s3://",
            self$sagemaker_session$default_bucket(),
            self$.current_job_name,
            "output",
            outputs[[count]]$output_name)
          outputs[[count]]$destination = s3_uri
        }
        normalized_outputs = list.append(normalized_outputs, outputs[[count]])
      }
      return(normalized_outputs)
    }
  )
)

#' @title Script Processor class
#' @family Processor
#' @description Handles Amazon SageMaker processing tasks for jobs using a machine learning framework.
#' @export
ScriptProcessor = R6Class("ScriptProcessor",
  inherit = Processor,
  public = list(

    #' @description Initializes a ``ScriptProcessor`` instance. The ``ScriptProcessor``
    #'              handles Amazon SageMaker Processing tasks for jobs using a machine learning framework,
    #'              which allows for providing a script to be run as part of the Processing Job.
    #' @param role (str): An AWS IAM role name or ARN. Amazon SageMaker Processing
    #'              uses this role to access AWS resources, such as
    #'              data stored in Amazon S3.
    #' @param image_uri (str): The URI of the Docker image to use for the
    #'              processing jobs.
    #' @param command ([str]): The command to run, along with any command-line flags,
    #'              given as a character vector or list of strings.
    #'              Example: c("python3", "-v").
    #' @param instance_count (int): The number of instances to run
    #'              a processing job with.
    #' @param instance_type (str): The type of EC2 instance to use for
    #'              processing, for example, 'ml.c4.xlarge'.
    #' @param volume_size_in_gb (int): Size in GB of the EBS volume
    #'              to use for storing data during processing (default: 30).
    #' @param volume_kms_key (str): A KMS key for the processing
    #'              volume (default: NULL).
    #' @param output_kms_key (str): The KMS key ID for processing job outputs (default: NULL).
    #' @param max_runtime_in_seconds (int): Timeout in seconds (default: NULL).
    #'              After this amount of time, Amazon SageMaker terminates the job,
    #'              regardless of its current status. If `max_runtime_in_seconds` is not
    #'              specified, the default value is 24 hours.
    #' @param base_job_name (str): Prefix for processing name. If not specified,
    #'              the processor generates a default job name, based on the
    #'              processing image name and current timestamp.
    #' @param sagemaker_session (:class:`~sagemaker.session.Session`):
    #'              Session object which manages interactions with Amazon SageMaker and
    #'              any other AWS services needed. If not specified, the processor creates
    #'              one using the default AWS configuration chain.
    #' @param env (dict[str, str]): Environment variables to be passed to
    #'              the processing jobs (default: NULL).
    #' @param tags (list[dict]): List of tags to be passed to the processing job
    #'              (default: NULL). For more, see
    #'              https://docs.aws.amazon.com/sagemaker/latest/dg/API_Tag.html.
    #' @param network_config (:class:`~sagemaker.network.NetworkConfig`):
    #'              A :class:`~sagemaker.network.NetworkConfig`
    #'              object that configures network isolation, encryption of
    #'              inter-container traffic, security group IDs, and subnets.
    initialize = function(role,
                          image_uri,
                          command,
                          instance_count,
                          instance_type,
                          volume_size_in_gb=30,
                          volume_kms_key=NULL,
                          output_kms_key=NULL,
                          max_runtime_in_seconds=NULL,
                          base_job_name=NULL,
                          sagemaker_session=NULL,
                          env=NULL,
                          tags=NULL,
                          network_config=NULL){
      # These members are not declared as fields; they are attached
      # dynamically, which is why the class is created with
      # `lock_objects = FALSE`.
      self$.CODE_CONTAINER_BASE_PATH = "/opt/ml/processing/input/"
      self$.CODE_CONTAINER_INPUT_NAME = "code"
      self$command = command

      super$initialize(role=role,
                       image_uri=image_uri,
                       instance_count=instance_count,
                       instance_type=instance_type,
                       volume_size_in_gb=volume_size_in_gb,
                       volume_kms_key=volume_kms_key,
                       output_kms_key=output_kms_key,
                       max_runtime_in_seconds=max_runtime_in_seconds,
                       base_job_name=base_job_name,
                       sagemaker_session=sagemaker_session,
                       env=env,
                       tags=tags,
                       network_config=network_config)
    },

    #' @description Returns a RunArgs object.
    #'              For processors (:class:`~sagemaker.spark.processing.PySparkProcessor`,
    #'              :class:`~sagemaker.spark.processing.SparkJar`) that have special
    #'              run() arguments, this object contains the normalized arguments for passing to
    #'              :class:`~sagemaker.workflow.steps.ProcessingStep`.
    #' @param code (str): This can be an S3 URI or a local path to a file with the framework
    #'              script to run.
    #' @param inputs (list[:class:`~sagemaker.processing.ProcessingInput`]): Input files for
    #'              the processing job. These must be provided as
    #'              :class:`~sagemaker.processing.ProcessingInput` objects (default: NULL).
    #' @param outputs (list[:class:`~sagemaker.processing.ProcessingOutput`]): Outputs for
    #'              the processing job. These must be provided as
    #'              :class:`~sagemaker.processing.ProcessingOutput` objects (default: NULL).
    #' @param arguments (list[str]): A list of string arguments to be passed to a
    #'              processing job (default: NULL).
    get_run_args = function(code,
                            inputs=NULL,
                            outputs=NULL,
                            arguments=NULL){
      return(RunArgs$new(code=code, inputs=inputs, outputs=outputs, arguments=arguments))
    },

    #' @description Runs a processing job.
    #' @param code (str): This can be an S3 URI or a local path to
    #'              a file with the framework script to run.
    #' @param inputs (list[:class:`~sagemaker.processing.ProcessingInput`]): Input files for
    #'              the processing job. These must be provided as
    #'              :class:`~sagemaker.processing.ProcessingInput` objects (default: NULL).
    #' @param outputs (list[:class:`~sagemaker.processing.ProcessingOutput`]): Outputs for
    #'              the processing job. These must be provided as
    #'              :class:`~sagemaker.processing.ProcessingOutput` objects (default: NULL).
    #' @param arguments (list[str]): A list of string arguments to be passed to a
    #'              processing job (default: NULL).
    #' @param wait (bool): Whether the call should wait until the job completes (default: True).
    #' @param logs (bool): Whether to show the logs produced by the job.
    #'              Only meaningful when wait is True (default: True).
    #' @param job_name (str): Processing job name. If not specified, the processor generates
    #'              a default job name, based on the base job name and current timestamp.
    #' @param experiment_config (dict[str, str]): Experiment management configuration.
    #'              Dictionary contains three optional keys:
    #'              'ExperimentName', 'TrialName', and 'TrialComponentDisplayName'.
    #' @param kms_key (str): The ARN of the KMS key that is used to encrypt the
    #'              user code file (default: NULL).
    run = function(code,
                   inputs=NULL,
                   outputs=NULL,
                   arguments=NULL,
                   wait=TRUE,
                   logs=TRUE,
                   job_name=NULL,
                   experiment_config=NULL,
                   kms_key=NULL){
      # Normalization also adds the user code to the inputs and sets the
      # entrypoint (see the private `.include_code_in_inputs`).
      ll = private$.normalize_args(
        job_name=job_name,
        arguments=arguments,
        inputs=inputs,
        outputs=outputs,
        code=code,
        kms_key=kms_key)
      self$latest_job = ProcessingJob$new()$start_new(
          processor=self,
          inputs=ll$normalized_inputs,
          outputs=ll$normalized_outputs,
          experiment_config=experiment_config)
      # Keep a history of every job started by this processor.
      self$jobs = c(self$jobs, self$latest_job)

      if (wait) self$latest_job$wait(logs=logs)
    }
  ),
  private = list(

    # Converts code to appropriate input and includes in input list.
    # Side effects include:
    #   * uploads code to S3 if the code is a local file.
    #   * sets the entrypoint attribute based on the command and user script name from code.
    # Args:
    #   inputs (list[:class:`~sagemaker.processing.ProcessingInput`]): Input files for
    #     the processing job. These must be provided as
    #     :class:`~sagemaker.processing.ProcessingInput` objects.
    #   code (str): This can be an S3 URI or a local path to a file with the framework
    #     script to run.
    #   kms_key (str): The ARN of the KMS key that is used to encrypt the
    #     user code file (default: NULL).
    # Returns:
    #   list[:class:`~sagemaker.processing.ProcessingInput`]: inputs together with the
    #     code as `ProcessingInput`.
    .include_code_in_inputs = function(inputs, code, kms_key=NULL){
      user_code_s3_uri = private$.handle_user_code_url(code, kms_key)
      user_script_name = private$.get_user_code_name(code)

      inputs_with_code = private$.convert_code_and_add_to_inputs(inputs, user_code_s3_uri)
      private$.set_entrypoint(self$command, user_script_name)
      return(inputs_with_code)
    },

    # Gets the basename of the user's code from the URL the customer provided.
    # Args:
    #   code (str): A URL to the user's code.
    # Returns:
    #   str: The basename of the user's code.
    .get_user_code_name = function(code){
      code_url = parse_url(code)
      return (basename(code_url$path))
    },

    # Gets the S3 URL containing the user's code.
    # Inspects the scheme the customer passed in ("s3://" for code in S3, "file://" or nothing
    # for absolute or local file paths). Uploads the code to S3 if the code is a local file.
    # Args:
    #   code (str): A URL to the customer's code.
    #   kms_key (str): The ARN of the KMS key that is used to encrypt the
    #     user code file (default: NULL).
    # Returns:
    #   str: The S3 URL to the customer's code.
    # Raises:
    #   ValueError: if the local file is missing, is not a regular file,
    #     or the url scheme is not recognized.
    .handle_user_code_url = function(code, kms_key=NULL){
      code_url = parse_url(code)
      scheme = code_url$scheme
      if (identical(scheme, "s3")) {
        user_code_s3_uri = code
      } else if(is.null(scheme) || is.na(scheme) || identical(scheme, "file")) {
        # A missing scheme (NA, or NULL should parse_url report it that way)
        # means a plain local path. The NULL guard keeps `||` from failing
        # on a zero-length operand.
        # Validate that the file exists locally and is not a directory.
        code_path = urltools::url_decode(code_url$path)
        if (!fs::file_exists(code_path)){
          ValueError$new(sprintf(
            "code %s wasn't found. Please make sure that the file exists.",
            code)
          )
        }
        if (!fs::is_file(code_path)){
          ValueError$new(sprintf(
            "code %s must be a file, not a directory. Please pass a path to a file.",
            code)
          )
        }
        user_code_s3_uri = private$.upload_code(code_path, kms_key)
      } else {
        ValueError$new(sprintf(
          "code %s url scheme %s is not recognized. Please pass a file path or S3 url",
          code, scheme)
        )
      }
      return(user_code_s3_uri)
    },

    # Uploads a code file or directory specified as a string
    # and returns the S3 URI.
    # Args:
    #   code (str): A file or directory to be uploaded to S3.
    #   kms_key (str): The ARN of the KMS key that is used to encrypt the
    #     uploaded code (default: NULL).
    # Returns:
    #   str: The S3 URI of the uploaded file or directory.
    .upload_code = function(code, kms_key=NULL){
      desired_s3_uri = s3_path_join(
        "s3://",
        self$sagemaker_session$default_bucket(),
        self$.current_job_name,
        "input",
        self$.CODE_CONTAINER_INPUT_NAME)
      return(S3Uploader$new()$upload(
        local_path=code,
        desired_s3_uri=desired_s3_uri,
        kms_key=kms_key,
        sagemaker_session=self$sagemaker_session
        )
      )
    },

    # Creates a ``ProcessingInput`` object from an S3 URI and adds it to the list of inputs.
    # Args:
    #   inputs (list[sagemaker.processing.ProcessingInput]):
    #     List of ``ProcessingInput`` objects.
    #   s3_uri (str): S3 URI of the input to be added to inputs.
    # Returns:
    #   list[sagemaker.processing.ProcessingInput]: A new list of ``ProcessingInput`` objects,
    #     with the ``ProcessingInput`` object created from ``s3_uri`` appended to the list.
    .convert_code_and_add_to_inputs = function(inputs, s3_uri){
      code_file_input = ProcessingInput$new(
        source=s3_uri,
        destination=sprintf("%s%s",
          self$.CODE_CONTAINER_BASE_PATH, self$.CODE_CONTAINER_INPUT_NAME),
        input_name=self$.CODE_CONTAINER_INPUT_NAME)
      return(c(inputs, code_file_input))
    },

    # Sets the entrypoint based on the user's script and corresponding executable.
    # Args:
    #   command ([str]): The command to run, as a single string, a character
    #     vector or a list of strings.
    #   user_script_name (str): A filename with an extension.
    .set_entrypoint = function(command, user_script_name){
      user_script_location = sprintf("%s%s/%s",
        self$.CODE_CONTAINER_BASE_PATH, self$.CODE_CONTAINER_INPUT_NAME, user_script_name)
      # Flatten command and script location into ONE list of strings.
      # list(command, location) would nest a multi-element command, e.g.
      # c("python3", "-v"), as a single element of the entrypoint.
      self$entrypoint = as.list(c(command, user_script_location))
    }
  ),
  lock_objects = FALSE
)

#' @title ProcessingJob Class
#' @family Processor
#' @description Provides functionality to start, describe, and stop processing jobs.
#' @export
ProcessingJob = R6Class("ProcessingJob",
  inherit = .Job,
  public = list(
    #' @field inputs
    #' A list of :class:`~sagemaker.processing.ProcessingInput` objects.
    inputs = NULL,

    #' @field outputs
    #' A list of :class:`~sagemaker.processing.ProcessingOutput` objects.
    outputs = NULL,

    #' @field output_kms_key
    #' The output KMS key associated with the job
    output_kms_key = NULL,

    #' @description Initializes a Processing job.
    #' @param sagemaker_session (:class:`~sagemaker.session.Session`):
    #'              Session object which manages interactions with Amazon SageMaker and
    #'              any other AWS services needed. If not specified, the processor creates
    #'              one using the default AWS configuration chain.
    #' @param job_name (str): Name of the Processing job.
    #' @param inputs (list[:class:`~sagemaker.processing.ProcessingInput`]): A list of
    #'              :class:`~sagemaker.processing.ProcessingInput` objects.
    #' @param outputs (list[:class:`~sagemaker.processing.ProcessingOutput`]): A list of
    #'              :class:`~sagemaker.processing.ProcessingOutput` objects.
    #' @param output_kms_key (str): The output KMS key associated with the job (default: None).
    initialize = function(sagemaker_session=NULL,
                          job_name=NULL,
                          inputs=NULL,
                          outputs=NULL,
                          output_kms_key=NULL){
      self$inputs = inputs
      self$outputs = outputs
      self$output_kms_key = output_kms_key
      # NOTE(review): `sagemaker` relies on partial argument matching against the
      # parent initializer's session parameter -- confirm the exact name in job.R.
      super$initialize(sagemaker = sagemaker_session, job_name = job_name)
    },

    #' @description Starts a new processing job using the provided inputs and outputs.
    #' @param processor (:class:`~sagemaker.processing.Processor`): The ``Processor`` instance
    #'              that started the job.
    #' @param inputs (list[:class:`~sagemaker.processing.ProcessingInput`]): A list of
    #'              :class:`~sagemaker.processing.ProcessingInput` objects.
    #' @param outputs (list[:class:`~sagemaker.processing.ProcessingOutput`]): A list of
    #'              :class:`~sagemaker.processing.ProcessingOutput` objects.
    #' @param experiment_config (dict[str, str]): Experiment management configuration.
    #'              Dictionary contains three optional keys:
    #'              'ExperimentName', 'TrialName', and 'TrialComponentDisplayName'.
    #' @return :class:`~sagemaker.processing.ProcessingJob`: The instance of ``ProcessingJob`` created
    #'              using the ``Processor``.
    start_new = function(processor,
                         inputs,
                         outputs,
                         experiment_config){
      process_args = private$.get_process_args(processor, inputs, outputs, experiment_config)

      # Print the job name and the user's inputs and outputs as lists of dictionaries.
      writeLines("")
      writeLines(sprintf("Job Name: %s", process_args$job_name))
      writeLines(sprintf("Inputs: %s",
        jsonlite::toJSON(process_args$inputs, auto_unbox = TRUE)))
      writeLines(sprintf("Outputs: %s",
        jsonlite::toJSON(process_args$output_config$Outputs, auto_unbox = TRUE)))

      # Call sagemaker_session.process using the arguments dictionary.
      do.call(processor$sagemaker_session$process, process_args)

      # Return a re-initialised clone so the object this method was called on
      # is left untouched.
      cls = self$clone()
      cls$initialize(
        processor$sagemaker_session,
        processor$.current_job_name,
        inputs,
        outputs,
        processor$output_kms_key
      )
      return(cls)
    },

    #' @description Initializes a ``ProcessingJob`` from a processing job name.
    #' @param sagemaker_session (:class:`~sagemaker.session.Session`):
    #'              Session object which manages interactions with Amazon SageMaker and
    #'              any other AWS services needed. If not specified, the processor creates
    #'              one using the default AWS configuration chain.
    #' @param processing_job_name (str): Name of the processing job.
    #' @return :class:`~sagemaker.processing.ProcessingJob`: The instance of ``ProcessingJob`` created
    #'              from the job name.
    from_processing_name = function(sagemaker_session,
                                    processing_job_name){
      job_desc = sagemaker_session$describe_processing_job(job_name=processing_job_name)

      # Rebuild ProcessingInput objects from the described job (NULL when none).
      inputs = NULL
      if (!islistempty(job_desc$ProcessingInputs)){
        inputs = lapply(
          job_desc$ProcessingInputs, function(processing_input) {
            ProcessingInput$new(
              input_name=processing_input$InputName,
              s3_input=S3Input$new()$from_paws(processing_input[["S3Input"]]),
              dataset_definition=DatasetDefinition$new()$from_paws(
                processing_input[["DatasetDefinition"]]),
              app_managed=processing_input[["AppManaged"]] %||% FALSE)
        })
      }
      # Rebuild ProcessingOutput objects from the described job (NULL when none).
      outputs = NULL
      if (!islistempty(job_desc$ProcessingOutputConfig) && !islistempty(job_desc$ProcessingOutputConfig$Outputs)){
        outputs = lapply(
          job_desc$ProcessingOutputConfig$Outputs,
          function(processing_output_dict) {
            processing_output = ProcessingOutput$new(
              output_name=processing_output_dict[["OutputName"]],
              app_managed=processing_output_dict[["AppManaged"]] %||% FALSE,
              feature_store_output=FeatureStoreOutput$new()$from_paws(
                processing_output_dict[["FeatureStoreOutput"]]))

            # S3 details are only copied over when the description carries them.
            if("S3Output" %in% names(processing_output_dict)){
              processing_output$source = processing_output_dict[["S3Output"]][["LocalPath"]]
              processing_output$destination = processing_output_dict[["S3Output"]][["S3Uri"]]
            }
            return(processing_output)
        })
      }
      output_kms_key = NULL
      if (!islistempty(job_desc$ProcessingOutputConfig))
        output_kms_key = job_desc$ProcessingOutputConfig$KmsKeyId

      cls = self$clone()
      cls$initialize(
        sagemaker_session=sagemaker_session,
        job_name=processing_job_name,
        inputs=inputs,
        outputs=outputs,
        output_kms_key=output_kms_key
      )
      return(cls)
    },

    #' @description Initializes a ``ProcessingJob`` from a Processing ARN.
    #' @param sagemaker_session (:class:`~sagemaker.session.Session`):
    #'              Session object which manages interactions with Amazon SageMaker and
    #'              any other AWS services needed. If not specified, the processor creates
    #'              one using the default AWS configuration chain.
    #' @param processing_job_arn (str): ARN of the processing job.
    #' @return :class:`~sagemaker.processing.ProcessingJob`: The instance of ``ProcessingJob`` created
    #'              from the processing job's ARN.
    from_processing_arn = function(sagemaker_session,
                                   processing_job_arn){
      # The sixth ":"-separated ARN component is "processing-job/<name>";
      # strip the resource-type prefix to recover the bare job name.
      processing_job_name = split_str(processing_job_arn, ":")[6]
      processing_job_name = substring(
        processing_job_name,
        nchar("processing-job/") + 1,
        nchar(processing_job_name)) # This is necessary while the API only vends an arn.
      return(self$from_processing_name(
        sagemaker_session=sagemaker_session, processing_job_name=processing_job_name
        )
      )
    },

    #' @description Waits for the processing job to complete.
    #' @param logs (bool): Whether to show the logs produced by the job (default: True).
    wait = function(logs = TRUE){
      if (logs)
        self$sagemaker_session$logs_for_processing_job(self$job_name, wait=TRUE)
      else
        self$sagemaker_session$wait_for_processing_job(self$job_name)
    },

    #' @description Returns the response from the DescribeProcessingJob API call.
    describe = function(){
      return(self$sagemaker_session$describe_processing_job(self$job_name))
    },

    #' @description Stops the processing job.
    stop = function(){
      # Use `job_name`, the same field `wait()` and `describe()` rely on.
      return(self$sagemaker_session$stop_processing_job(self$job_name))
    },

    #' @description Prepares a dict that represents a ProcessingJob's AppSpecification.
    #' @param container_arguments (list[str]): The arguments for a container
    #'              used to run a processing job.
    #' @param container_entrypoint (list[str]): The entrypoint for a container
    #'              used to run a processing job.
    #' @param image_uri (str): The container image to be run by the processing job.
    #' @return dict: Represents AppSpecification which configures the
    #'              processing job to run a specified Docker container image.
    prepare_app_specification = function(container_arguments,
                                         container_entrypoint,
                                         image_uri){
      config = list("ImageUri"=image_uri)
      if (!islistempty(container_arguments))
        config[["ContainerArguments"]] = container_arguments
      if (!islistempty(container_entrypoint))
        config[["ContainerEntrypoint"]] = container_entrypoint
      return(config)
    },

    #' @description Prepares a dict that represents a ProcessingOutputConfig.
    #' @param kms_key_id (str): The AWS Key Management Service (AWS KMS) key that
    #'              Amazon SageMaker uses to encrypt the processing job output.
    #'              KmsKeyId can be an ID of a KMS key, ARN of a KMS key,
    #'              or alias of a KMS key. The KmsKeyId is applied to all outputs.
    #' @param outputs (list[dict]): Output configuration information for a processing job.
    #' @return dict: Represents output configuration for the processing job.
    prepare_output_config = function(kms_key_id,
                                     outputs){
      config = list("Outputs"=outputs)
      if (!is.null(kms_key_id))
        config[["KmsKeyId"]] = kms_key_id
      return(config)
    },

    #' @description Prepares a dict that represents the ProcessingResources.
    #' @param instance_count (int): The number of ML compute instances
    #'              to use in the processing job. For distributed processing jobs,
    #'              specify a value greater than 1. The default value is 1.
    #' @param instance_type (str): The ML compute instance type for the processing job.
    #' @param volume_kms_key_id (str): The AWS Key Management Service (AWS KMS) key
    #'              that Amazon SageMaker uses to encrypt data on the storage
    #'              volume attached to the ML compute instance(s) that run the processing job.
    #' @param volume_size_in_gb (int): The size of the ML storage volume in gigabytes
    #'              that you want to provision. You must specify sufficient
    #'              ML storage for your scenario.
    #' @return dict: Represents ProcessingResources which identifies the resources,
    #'              ML compute instances, and ML storage volumes to deploy
    #'              for a processing job.
    prepare_processing_resources = function(instance_count,
                                            instance_type,
                                            volume_kms_key_id,
                                            volume_size_in_gb){
      processing_resources = list()
      cluster_config = list(
        "InstanceCount"=instance_count,
        "InstanceType"=instance_type,
        "VolumeSizeInGB"=volume_size_in_gb
      )
      if (!is.null(volume_kms_key_id))
        cluster_config[["VolumeKmsKeyId"]] = volume_kms_key_id
      processing_resources[["ClusterConfig"]] = cluster_config
      return(processing_resources)
    },

    #' @description Prepares a dict that represents the job's StoppingCondition.
    #' @param max_runtime_in_seconds (int): Specifies the maximum runtime in seconds.
    #' @return list
    prepare_stopping_condition= function(max_runtime_in_seconds){
      return(list("MaxRuntimeInSeconds"=max_runtime_in_seconds))
    }
  ),
  private = list(

    # Gets a dict of arguments for a new Amazon SageMaker processing job from the processor
    # Args:
    #   processor (:class:`~sagemaker.processing.Processor`): The ``Processor`` instance
    # that started the job.
    # inputs (list[:class:`~sagemaker.processing.ProcessingInput`]): A list of
    # :class:`~sagemaker.processing.ProcessingInput` objects.
    # outputs (list[:class:`~sagemaker.processing.ProcessingOutput`]): A list of
    # :class:`~sagemaker.processing.ProcessingOutput` objects.
    # experiment_config (dict[str, str]): Experiment management configuration.
    # Dictionary contains three optional keys:
    #   'ExperimentName', 'TrialName', and 'TrialComponentDisplayName'.
    # Returns:
    #   Dict: dict for `sagemaker.session.Session.process` method
    #
    # Note: entries are assigned with `x["key"] = list(value)` (single bracket)
    # wherever the value may be NULL, because `x[["key"]] = NULL` would drop the
    # entry instead of storing NULL.
    .get_process_args = function(processor,
                                 inputs,
                                 outputs,
                                 experiment_config){
      # Initialize an empty dictionary for arguments to be passed to sagemaker_session.process.
      process_request_args = list()

      # Add arguments to the dictionary.
      process_request_args[["inputs"]] = lapply(inputs, function(input) {
        input$to_request_list()
      })

      process_request_args[["output_config"]] = list(
        "Outputs"= lapply(outputs, function(output) output$to_request_list())
      )
      if (!islistempty(processor$output_kms_key))
        process_request_args[["output_config"]][["KmsKeyId"]] = processor$output_kms_key
      # ensure NULL value is kept
      process_request_args["experiment_config"] = list(experiment_config)
      process_request_args[["job_name"]] = processor$.current_job_name

      process_request_args[["resources"]] = list(
        "ClusterConfig" = list(
          "InstanceType"= processor$instance_type,
          "InstanceCount"= processor$instance_count,
          "VolumeSizeInGB"= processor$volume_size_in_gb
        )
      )

      if(!islistempty(processor$volume_kms_key)){
        process_request_args[["resources"]][["ClusterConfig"]][["VolumeKmsKeyId"]] = processor$volume_kms_key
      }
      if(!islistempty(processor$max_runtime_in_seconds)){
        process_request_args[["stopping_condition"]] = list(
          "MaxRuntimeInSeconds"= processor$max_runtime_in_seconds)
      } else {
        # ensure NULL value is kept
        process_request_args["stopping_condition"] = list(NULL)
      }
      process_request_args[["app_specification"]] = list("ImageUri"= processor$image_uri)
      if (!islistempty(processor$arguments))
        process_request_args[["app_specification"]][["ContainerArguments"]] = processor$arguments
      if (!islistempty(processor$entrypoint))
        process_request_args[["app_specification"]][["ContainerEntrypoint"]] = processor$entrypoint

      # ensure NULL value is kept
      process_request_args["environment"] = list(processor$env)

      if (!islistempty(processor$network_config)){
        process_request_args[["network_config"]] = processor$network_config$to_request_list()
      } else {
        # ensure NULL value is kept
        process_request_args["network_config"] = list(NULL)
      }

      # ensure NULL value is kept
      process_request_args["role_arn"] = list(processor$sagemaker_session$expand_role(processor$role))
      process_request_args["tags"] = list(processor$tags)

      return(process_request_args)
    },

    # Used for Local Mode. Not yet implemented.
    # Args:
    #   input_url (str): input URL
    .is_local_channel = function(input_url){
      NotImplementedError$new()
    }
  )
)

#' @title ProcessingInput Class
#' @family Processor
#' @description Describes one input of a processing job (an Amazon S3 input or a
#'              dataset definition) and converts it into a request list.
#' @export
ProcessingInput = R6Class("ProcessingInput",
  public = list(

    #' @description Creates a ``ProcessingInput``. The supplied values are stored on the
    #'              object and an ``S3Input`` is either adopted (when ``s3_input`` is given)
    #'              or built from ``source`` and ``destination``.
    #' @param source (str): The source for the input. If a local path is provided, it will
    #'              automatically be uploaded to S3 under:
    #'              "s3://<default-bucket-name>/<job-name>/input/<input-name>".
    #' @param destination (str): The destination of the input.
    #' @param input_name (str): The name for the input. If a name
    #'              is not provided, one will be generated (eg. "input-1").
    #' @param s3_data_type (str): Valid options are "ManifestFile" or "S3Prefix".
    #' @param s3_input_mode (str): Valid options are "Pipe" or "File".
    #' @param s3_data_distribution_type (str): Valid options are "FullyReplicated"
    #'              or "ShardedByS3Key".
    #' @param s3_compression_type (str): Valid options are "None" or "Gzip".
    #' @param s3_input (:class:`~sagemaker.dataset_definition.S3Input`)
    #'              Metadata of data objects stored in S3
    #' @param dataset_definition (:class:`~sagemaker.dataset_definition.DatasetDefinition`)
    #'              DatasetDefinition input
    #' @param app_managed (bool): Whether the input are managed by SageMaker or application
    initialize = function(source=NULL,
                          destination=NULL,
                          input_name=NULL,
                          s3_data_type=c("S3Prefix", "ManifestFile"),
                          s3_input_mode=c("File", "Pipe"),
                          s3_data_distribution_type=c("FullyReplicated", "ShardedByS3Key"),
                          s3_compression_type=c("None", "Gzip"),
                          s3_input=NULL,
                          dataset_definition=NULL,
                          app_managed=FALSE){
      self$source <- source
      self$destination <- destination
      self$input_name <- input_name

      # Enumerated arguments are validated against the choices in the signature.
      self$s3_data_type <- match.arg(s3_data_type)
      self$s3_input_mode <- match.arg(s3_input_mode)
      self$s3_data_distribution_type <- match.arg(s3_data_distribution_type)
      self$s3_compression_type <- match.arg(s3_compression_type)

      self$s3_input <- s3_input
      self$dataset_definition <- dataset_definition
      self$app_managed <- app_managed

      # Reconcile the S3Input object with the individual S3 fields.
      private$.create_s3_input()
    },

    #' @description Builds the request list from the values held by this object.
    to_request_list = function(){
      request <- list(
        "InputName" = self$input_name,
        "AppManaged" = self$app_managed
      )

      if (!is.null(self$s3_input)) {
        # Gzip-compressed data is only accepted together with Pipe input mode.
        gzip_without_pipe <- self$s3_compression_type == "Gzip" &&
          self$s3_input_mode != "Pipe"
        if (gzip_without_pipe) {
          ValueError$new("Data can only be gzipped when the input mode is Pipe.")
        }
        request[["S3Input"]] <- S3Input$new()$to_paws(self$s3_input)
      }

      if (!is.null(self$dataset_definition)) {
        request[["DatasetDefinition"]] <- DatasetDefinition$new()$to_paws(
          self$dataset_definition
        )
      }

      return(request)
    },

    #' @description format class
    format = function(){
      format_class(self)
    }
  ),
  private = list(

    # Keep `s3_input` and the individual S3 fields in agreement.
    # - No S3Input supplied: build one, but only when both `source` and
    #   `destination` are available.
    # - S3Input supplied: copy its values back onto the matching class
    #   members, because those members are read elsewhere.
    .create_s3_input = function(){
      if (is.null(self$s3_input)) {
        if (!(is.null(self$source) || is.null(self$destination))) {
          self$s3_input <- S3Input$new(
            s3_uri = self$source,
            local_path = self$destination,
            s3_data_type = self$s3_data_type,
            s3_input_mode = self$s3_input_mode,
            s3_data_distribution_type = self$s3_data_distribution_type,
            s3_compression_type = self$s3_compression_type
          )
        }
      } else {
        provided <- self$s3_input
        self$source <- provided$s3_uri
        self$destination <- provided$local_path
        self$s3_data_type <- provided$s3_data_type
        self$s3_input_mode <- provided$s3_input_mode
        self$s3_data_distribution_type <- provided$s3_data_distribution_type
      }
    }
  ),
  lock_objects = FALSE
)

#' @title ProcessingOutput Class
#' @family Processor
#' @description Describes one output of a processing job (an Amazon S3 output and/or a
#'              Feature Store output) and converts it into a request list.
#' @export
ProcessingOutput = R6Class("ProcessingOutput",
  public = list(
    #' @description Creates a ``ProcessingOutput`` and stores the supplied values on the object.
    #' @param source (str): The source for the output.
    #' @param destination (str): The destination of the output. If a destination
    #'              is not provided, one will be generated:
    #'              "s3://<default-bucket-name>/<job-name>/output/<output-name>".
    #' @param output_name (str): The name of the output. If a name
    #'              is not provided, one will be generated (eg. "output-1").
    #' @param s3_upload_mode (str): Valid options are "EndOfJob" or "Continuous".
    #' @param app_managed (bool): Whether the output is managed by SageMaker or application
    #' @param feature_store_output (:class:`~sagemaker.processing.FeatureStoreOutput`)
    #'             Configuration for processing job outputs of FeatureStore.
    initialize = function(source=NULL,
                          destination=NULL,
                          output_name=NULL,
                          s3_upload_mode=c("EndOfJob", "Continuous"),
                          app_managed=FALSE,
                          feature_store_output=NULL){
      self$source <- source
      self$destination <- destination
      self$output_name <- output_name
      # Validated against the choices listed in the signature.
      self$s3_upload_mode <- match.arg(s3_upload_mode)
      self$app_managed <- app_managed
      self$feature_store_output <- feature_store_output
    },

    #' @description Builds the request list from the values held by this object.
    to_request_list = function(){
      request <- list(
        "OutputName" = self$output_name,
        "AppManaged" = self$app_managed
      )

      # The S3 section is only emitted when a source has been set.
      if (!is.null(self$source)) {
        request[["S3Output"]] <- list(
          "S3Uri" = self$destination,
          "LocalPath" = self$source,
          "S3UploadMode" = self$s3_upload_mode
        )
      }

      if (!is.null(self$feature_store_output)) {
        request[["FeatureStoreOutput"]] <- FeatureStoreOutput$new()$to_paws(
          self$feature_store_output
        )
      }

      return(request)
    },

    #' @description format class
    format = function(){
      format_class(self)
    }
  ),
  lock_objects = FALSE
)

#' @title RunArgs Class
#' @description Container for the normalized code location, inputs, outputs and
#'              arguments of a processing run.
#' @export
RunArgs = R6Class("RunArgs",
  public = list(

    #' @field code
    #' This can be an S3 URI or a local path to a file with the framework script to run
    code = NULL,

    #' @field inputs
    #' Input files for the processing job
    inputs = NULL,

    #' @field outputs
    #' Outputs for the processing job
    outputs = NULL,

    #' @field arguments
    #' A list of string arguments to be passed to a processing job
    arguments = NULL,

    #' @description An instance of this class is returned from the ``get_run_args()`` method on processors,
    #'             and is used for normalizing the arguments so that they can be passed to
    #'             :class:`~sagemaker.workflow.steps.ProcessingStep`
    #' @param code (str): This can be an S3 URI or a local path to a file with the framework
    #'             script to run.
    #' @param inputs (list[:class:`~sagemaker.processing.ProcessingInput`]): Input files for
    #'             the processing job. These must be provided as
    #'             :class:`~sagemaker.processing.ProcessingInput` objects (default: None).
    #' @param outputs (list[:class:`~sagemaker.processing.ProcessingOutput`]): Outputs for
    #'             the processing job. These can be specified as either path strings or
    #'             :class:`~sagemaker.processing.ProcessingOutput` objects (default: None).
    #' @param arguments (list[str]): A list of string arguments to be passed to a
    #'             processing job (default: None).
    initialize = function(code,
                          inputs=NULL,
                          outputs=NULL,
                          arguments=NULL){
      # Plain value holder: every argument is stored unchanged.
      self$code <- code
      self$inputs <- inputs
      self$outputs <- outputs
      self$arguments <- arguments
    }
  )
)

#' @title Amazon SageMaker Feature Store class
#' @description Configuration for processing job outputs in Amazon SageMaker Feature Store
#' @export
FeatureStoreOutput = R6Class("FeatureStoreOutput",
  inherit = ApiObject,
  public = list(

    #' @field feature_group_name
    #' Name of the feature group (presumably the Feature Store group that
    #' receives the processing job output; confirm against the API reference).
    feature_group_name = NULL
  ),
  # Objects stay unlocked so members can be added after creation.
  lock_objects = FALSE
)

#' @title FrameworkProcessor class
#' @description Handles Amazon SageMaker processing tasks for jobs using a machine learning framework
#' @export
FrameworkProcessor = R6Class("FrameworkProcessor",
  inherit = ScriptProcessor,
  public = list(

    #' @field framework_entrypoint_command
    #'
    framework_entrypoint_command = list("/bin/bash"),

    #' @description Initializes a ``FrameworkProcessor`` instance.
    #'              The ``FrameworkProcessor`` handles Amazon SageMaker Processing tasks for jobs
    #'              using a machine learning framework, which allows for a set of Python scripts
    #'              to be run as part of the Processing Job.
    #' @param estimator_cls (type): A subclass of the :class:`~sagemaker.estimator.Framework`
    #'              estimator
    #' @param framework_version (str): The version of the framework. Value is ignored when
    #'              ``image_uri`` is provided.
    #' @param role (str): An AWS IAM role name or ARN. Amazon SageMaker Processing uses
    #'              this role to access AWS resources, such as data stored in Amazon S3.
    #' @param instance_count (int): The number of instances to run a processing job with.
    #' @param instance_type (str): The type of EC2 instance to use for processing, for
    #'              example, 'ml.c4.xlarge'.
    #' @param py_version (str): Python version you want to use for executing your
    #'              model training code. One of 'py2' or 'py3'. Defaults to 'py3'. Value
    #'              is ignored when ``image_uri`` is provided.
    #' @param image_uri (str): The URI of the Docker image to use for the
    #'              processing jobs (default: None).
    #' @param command ([str]): The command to run, along with any command-line flags
    #'              to *precede* the ```code script```. Example: ["python3", "-v"]. If not
    #'              provided, ["python"] will be chosen (default: None).
    #' @param volume_size_in_gb (int): Size in GB of the EBS volume
    #'              to use for storing data during processing (default: 30).
    #' @param volume_kms_key (str): A KMS key for the processing volume (default: None).
    #' @param output_kms_key (str): The KMS key ID for processing job outputs (default: None).
    #' @param code_location (str): The S3 prefix URI where custom code will be
    #'              uploaded (default: None). The code file uploaded to S3 is
    #'              'code_location/job-name/source/sourcedir.tar.gz'. If not specified, the
    #'              default ``code location`` is 's3://{sagemaker-default-bucket}'
    #' @param max_runtime_in_seconds (int): Timeout in seconds (default: None).
    #'              After this amount of time, Amazon SageMaker terminates the job,
    #'              regardless of its current status. If `max_runtime_in_seconds` is not
    #'              specified, the default value is 24 hours.
    #' @param base_job_name (str): Prefix for processing name. If not specified,
    #'              the processor generates a default job name, based on the
    #'              processing image name and current timestamp (default: None).
    #' @param sagemaker_session (:class:`~sagemaker.session.Session`):
    #'              Session object which manages interactions with Amazon SageMaker and
    #'              any other AWS services needed. If not specified, the processor creates
    #'              one using the default AWS configuration chain (default: None).
    #' @param env (dict[str, str]): Environment variables to be passed to
    #'              the processing jobs (default: None).
    #' @param tags (list[dict]): List of tags to be passed to the processing job
    #'              (default: None). For more, see
    #'              \url{https://docs.aws.amazon.com/sagemaker/latest/dg/API_Tag.html}.
    #' @param network_config (:class:`~sagemaker.network.NetworkConfig`):
    #'              A :class:`~sagemaker.network.NetworkConfig`
    #'              object that configures network isolation, encryption of
    #'              inter-container traffic, security group IDs, and subnets (default: None).
    initialize = function(estimator_cls,
                          framework_version,
                          role,
                          instance_count,
                          instance_type,
                          py_version="py3",
                          image_uri=NULL,
                          command=NULL,
                          volume_size_in_gb=30,
                          volume_kms_key=NULL,
                          output_kms_key=NULL,
                          code_location=NULL,
                          max_runtime_in_seconds=NULL,
                          base_job_name=NULL,
                          sagemaker_session=NULL,
                          env=NULL,
                          tags=NULL,
                          network_config=NULL){
      # Default command used to run the user's code script.
      if (is.null(command))
        command = "python"

      self$estimator_cls = estimator_cls
      self$framework_version = framework_version
      self$py_version = py_version

      # 1. To finalize/normalize the image_uri or base_job_name, we need to create an
      #    estimator_cls instance.
      # 2. We want to make it easy for children of FrameworkProcessor to override estimator
      #    creation via a function (to create FrameworkProcessors for Estimators that may have
      #    different signatures - like HuggingFace or others in future).
      # 3. The super-class initializer doesn't (currently) do anything with these params
      #    besides storing them
      #
      # Therefore we'll init the superclass first and then customize the setup after:
      super$initialize(
        role=role,
        image_uri=image_uri,
        command=command,
        instance_count=instance_count,
        instance_type=instance_type,
        volume_size_in_gb=volume_size_in_gb,
        volume_kms_key=volume_kms_key,
        output_kms_key=output_kms_key,
        max_runtime_in_seconds=max_runtime_in_seconds,
        base_job_name=base_job_name,
        sagemaker_session=sagemaker_session,
        env=env,
        tags=tags,
        network_config=network_config)

      # This subclass uses the "code" input for actual payload and the ScriptProcessor parent's
      # functionality for uploading just a small entrypoint script to invoke it.
      self$.CODE_CONTAINER_INPUT_NAME = "entrypoint"

      # Store the code location without a trailing "/" (NULL stays NULL).
      self$code_location = (
        if (!is.null(code_location) && endsWith(code_location, "/")) {
          substr(code_location, 1, nchar(code_location)-1)
        } else {
          code_location}
      )
      if (is.null(image_uri) || is.null(base_job_name)){
        # For these default configuration purposes, we don't need the optional args:
        est = private$.create_estimator()
        if (is.null(image_uri))
          self$image_uri = est$training_image_uri()
        if (is.null(base_job_name)){
          self$base_job_name = est$base_job_name %||% attr(estimator_cls, "_framework_name")
          # Last-resort default. The check and the assignment must target the
          # field: the `base_job_name` argument is always NULL in this branch
          # and is never read again, so assigning to it had no effect.
          if (is.null(self$base_job_name))
            self$base_job_name = "framework-processor"
        }
      }
    },

    #' @description This object contains the normalized inputs, outputs and arguments needed
    #'              when using a ``FrameworkProcessor`` in a :class:`~sagemaker.workflow.steps.ProcessingStep`.
    #' @param code (str): This can be an S3 URI or a local path to a file with the framework
    #'              script to run. See the ``code`` argument in
    #'              `sagemaker.processing.FrameworkProcessor.run()`.
    #' @param source_dir (str): Path (absolute, relative, or an S3 URI) to a directory with
    #'              any other processing source code dependencies aside from the entrypoint
    #'              file (default: None). See the ``source_dir`` argument in
    #'              `sagemaker.processing.FrameworkProcessor.run()`
    #' @param dependencies (list[str]): A list of paths to directories (absolute or relative)
    #'              with any additional libraries that will be exported to the container
    #'              (default: []). See the ``dependencies`` argument in
    #'              `sagemaker.processing.FrameworkProcessor.run()`.
    #' @param git_config (dict[str, str]): Git configurations used for cloning files. See the
    #'              `git_config` argument in `sagemaker.processing.FrameworkProcessor.run()`.
    #' @param inputs (list[:class:`~sagemaker.processing.ProcessingInput`]): Input files for
    #'              the processing job. These must be provided as
    #'              :class:`~sagemaker.processing.ProcessingInput` objects (default: None).
    #' @param outputs (list[:class:`~sagemaker.processing.ProcessingOutput`]): Outputs for
    #'              the processing job. These can be specified as either path strings or
    #'              :class:`~sagemaker.processing.ProcessingOutput` objects (default: None).
    #' @param arguments (list[str]): A list of string arguments to be passed to a
    #'              processing job (default: None).
    #' @param job_name (str): Processing job name. If not specified, the processor generates
    #'              a default job name, based on the base job name and current timestamp.
    #' @return Returns a RunArgs object.
    get_run_args = function(code,
                            source_dir=NULL,
                            dependencies=NULL,
                            git_config=NULL,
                            inputs=NULL,
                            outputs=NULL,
                            arguments=NULL,
                            job_name=NULL){
      # Note: if job_name is NULL, the job name used while uploading the code
      # (+payload) here will not be the same one a later run() call uses.
      packed = private$.pack_and_upload_code(
        code, source_dir, dependencies, git_config, job_name, inputs
      )

      # The helper returns an unnamed list, by position:
      #   [[1]] code URI to run, [[2]] processing inputs, [[3]] job name.
      # Only the first two are needed to build the RunArgs object.
      run_args = RunArgs$new(
        packed[[1]],
        inputs=packed[[2]],
        outputs=outputs,
        arguments=arguments
      )
      return(run_args)
    },

    #' @description Runs a processing job.
    #' @param code (str): This can be an S3 URI or a local path to a file with the
    #'              framework script to run. Path (absolute or relative) to the local
    #'              Python source file which should be executed as the entry point
    #'              to training. When `code` is an S3 URI, ignore `source_dir`,
    #'              `dependencies`, and `git_config`. If ``source_dir`` is specified,
    #'              then ``code`` must point to a file located at the root of ``source_dir``.
    #' @param source_dir (str): Path (absolute, relative or an S3 URI) to a directory
    #'              with any other processing source code dependencies aside from the entry
    #'              point file (default: None). If ``source_dir`` is an S3 URI, it must
    #'              point to a tar.gz file. Structure within this directory are preserved
    #'              when processing on Amazon SageMaker (default: None).
    #' @param dependencies (list[str]): A list of paths to directories (absolute
    #'              or relative) with any additional libraries that will be exported
    #'              to the container (default: []). The library folders will be
    #'              copied to SageMaker in the same folder where the entrypoint is
    #'              copied. If 'git_config' is provided, 'dependencies' should be a
    #'              list of relative locations to directories with any additional
    #'              libraries needed in the Git repo (default: None).
    #' @param git_config (dict[str, str]): Git configurations used for cloning
    #'              files, including ``repo``, ``branch``, ``commit``,
    #'              ``2FA_enabled``, ``username``, ``password`` and ``token``. The
    #'              ``repo`` field is required. All other fields are optional.
    #'              ``repo`` specifies the Git repository where your training script
    #'              is stored. If you don't provide ``branch``, the default value
    #'              'master' is used. If you don't provide ``commit``, the latest
    #'              commit in the specified branch is used.
    #'              For example, a ``git_config`` that sets ``repo``, ``branch`` and
    #'              ``commit`` results in cloning the repo specified in 'repo', then
    #'              checking out the given branch, and then the specified commit.
    #'              ``2FA_enabled``, ``username``, ``password`` and ``token`` are
    #'              used for authentication. For GitHub (or other Git) accounts, set
    #'              ``2FA_enabled`` to 'True' if two-factor authentication is
    #'              enabled for the account, otherwise set it to 'False'. If you do
    #'              not provide a value for ``2FA_enabled``, a default value of
    #'              'False' is used. CodeCommit does not support two-factor
    #'              authentication, so do not provide "2FA_enabled" with CodeCommit
    #'              repositories.
    #'              For GitHub and other Git repos, when SSH URLs are provided, it
    #'              doesn't matter whether 2FA is enabled or disabled; you should
    #'              either have no passphrase for the SSH key pairs, or have the
    #'              ssh-agent configured so that you will not be prompted for SSH
    #'              passphrase when you do 'git clone' command with SSH URLs. When
    #'              HTTPS URLs are provided: if 2FA is disabled, then either token
    #'              or username+password will be used for authentication if provided
    #'              (token prioritized); if 2FA is enabled, only token will be used
    #'              for authentication if provided. If required authentication info
    #'              is not provided, python SDK will try to use local credentials
    #'              storage to authenticate. If that fails either, an error message
    #'              will be thrown.
    #'              For CodeCommit repos, 2FA is not supported, so '2FA_enabled'
    #'              should not be provided. There is no token in CodeCommit, so
    #'              'token' should not be provided too. When 'repo' is an SSH URL,
    #'              the requirements are the same as GitHub-like repos. When 'repo'
    #'              is an HTTPS URL, username+password will be used for
    #'              authentication if they are provided; otherwise, python SDK will
    #'              try to use either CodeCommit credential helper or local
    #'              credential storage for authentication.
    #' @param inputs (list[:class:`~sagemaker.processing.ProcessingInput`]): Input files for
    #'              the processing job. These must be provided as
    #'              :class:`~sagemaker.processing.ProcessingInput` objects (default: None).
    #' @param outputs (list[:class:`~sagemaker.processing.ProcessingOutput`]): Outputs for
    #'              the processing job. These can be specified as either path strings or
    #'              :class:`~sagemaker.processing.ProcessingOutput` objects (default: None).
    #' @param arguments (list[str]): A list of string arguments to be passed to a
    #'              processing job (default: None).
    #' @param wait (bool): Whether the call should wait until the job completes (default: True).
    #' @param logs (bool): Whether to show the logs produced by the job.
    #'              Only meaningful when wait is True (default: True).
    #' @param job_name (str): Processing job name. If not specified, the processor generates
    #'              a default job name, based on the base job name and current timestamp.
    #' @param experiment_config (dict[str, str]): Experiment management configuration.
    #'              Dictionary contains three optional keys:
    #'              'ExperimentName', 'TrialName', and 'TrialComponentDisplayName'.
    #' @param kms_key (str): The ARN of the KMS key that is used to encrypt the
    #'              user code file (default: None).
    run = function(code,
                   source_dir=NULL,
                   dependencies=NULL,
                   git_config=NULL,
                   inputs=NULL,
                   outputs=NULL,
                   arguments=NULL,
                   wait=TRUE,
                   logs=TRUE,
                   job_name=NULL,
                   experiment_config=NULL,
                   kms_key=NULL){
      # Pack and upload the user code. The helper returns an unnamed list, by
      # position: the S3 URI of the script to run, the processing inputs
      # (patched with the code payload when code is local), and the job name
      # that was used for the upload.
      ll = private$.pack_and_upload_code(
        code, source_dir, dependencies, git_config, job_name, inputs)
      names(ll) = c("s3_runproc_sh", "inputs", "job_name")

      # Submit a processing job.
      # The values returned by the helper must be forwarded (not the raw
      # arguments): otherwise the job would run without the code payload input
      # and, when job_name was NULL, under a different name than the upload.
      super$run(
        code=ll$s3_runproc_sh,
        inputs=ll$inputs,
        outputs=outputs,
        arguments=arguments,
        wait=wait,
        logs=logs,
        job_name=ll$job_name,
        experiment_config=experiment_config,
        kms_key=kms_key)
    }
  ),

  private = list(

    # Instantiate the Framework Estimator that backs this Processor
    .create_estimator = function(entry_point="",
                                 source_dir=NULL,
                                 dependencies=NULL,
                                 git_config=NULL){
      # Builds the framework estimator that backs this processor, forwarding the
      # code-related arguments together with this processor's own configuration.
      # NOTE(review): estimator_cls is invoked as a plain function here; if it is
      # stored as an R6 class generator this call would need `$new` -- confirm.
      return(self$estimator_cls(
        framework_version=self$framework_version,
        # R6 fields are accessed with `$`; `self.py_version` would be looked up
        # as an (undefined) variable of that literal name.
        py_version=self$py_version,
        entry_point=entry_point,
        source_dir=source_dir,
        dependencies=dependencies,
        git_config=git_config,
        code_location=self$code_location,
        enable_network_isolation=FALSE,  # True -> uploads to input channel. Not what we want!
        image_uri=self$image_uri,
        role=self$role,
        # Estimator instance_count doesn't currently matter to FrameworkProcessor, and the
        # SKLearn Framework Estimator requires instance_count==1. So here we hard-wire it to 1,
        # but if it matters in future perhaps we could take self$instance_count here and have
        # SKLearnProcessor override this function instead:
        instance_count=1,
        instance_type=self$instance_type,
        sagemaker_session=self$sagemaker_session,
        debugger_hook_config=FALSE,
        disable_profiler=TRUE)
      )
    },

    # Pack local code bundle and upload to Amazon S3.
    .pack_and_upload_code = function(code,
                                     source_dir,
                                     dependencies,
                                     git_config,
                                     job_name,
                                     inputs){
      # Returns an unnamed list of three elements, by position:
      #   [[1]] S3 URI of the script to run, [[2]] processing inputs, [[3]] job name.

      # Code that already lives on S3 is used as-is: nothing to pack or upload.
      if (startsWith(code, "s3://"))
        return(list(code, inputs, job_name))

      if (is.null(job_name))
        job_name = private$.generate_current_job_name(job_name)

      # Upload the source bundle and register it as an extra processing input.
      estimator = private$.upload_payload(
        code,
        source_dir,
        dependencies,
        git_config,
        job_name)
      inputs = private$.patch_inputs_with_payload(
        inputs,
        estimator$.hyperparameters[["sagemaker_submit_directory"]])

      local_code = get_config_value("local.local_code", self$sagemaker_session$config)
      # A missing config entry (NULL) is treated as "local code disabled";
      # `TRUE && NULL` would otherwise be an error in R.
      # NOTE(review): RuntimeError$new is a project error class that is expected
      # to signal the error on construction -- confirm.
      if (self$sagemaker_session$local_mode && !is.null(local_code) && local_code)
        RuntimeError$new(
          "SageMaker Processing Local Mode does not currently support 'local code' mode. ",
          "Please use a LocalSession created with disable_local_code=TRUE, or leave ",
          "sagemaker_session unspecified when creating your Processor to have one set up ",
          "automatically."
        )

      # Upload the bootstrapping code as s3://.../jobname/source/runproc.sh.
      # gsub() takes (pattern, replacement, x): swap the payload file name for
      # the bootstrap script name inside the uploaded code's S3 location.
      # fixed=TRUE so the dots in the file name are matched literally.
      entrypoint_s3_uri = gsub(
        "sourcedir.tar.gz",
        "runproc.sh",
        estimator$uploaded_code$s3_prefix,
        fixed=TRUE
      )
      script = estimator$uploaded_code$script_name
      s3_runproc_sh = S3Uploader$new()$upload_string_as_file_body(
        private$.generate_framework_script(script),
        desired_s3_uri=entrypoint_s3_uri,
        sagemaker_session=self$sagemaker_session
      )

      LOGGER$info("runproc.sh uploaded to %s", s3_runproc_sh)

      return(list(s3_runproc_sh, inputs, job_name))
    },

    # Generate the framework entrypoint file (as text) for a processing job.
    # This script implements the "framework" functionality for setting up your code:
    #   Untar-ing the sourcedir bundle in the ```code``` input; installing extra
    # runtime dependencies if specified; and then invoking the ```command``` and
    # ```code``` configured for the job.
    # Args:
    #   user_script (str): Relative path to ```code``` in the source bundle
    # - e.g. 'process.py'.
    .generate_framework_script = function(user_script){
      # Returns the text of the bash bootstrap script as a single string.
      # The two `%s` placeholders on the last template line are filled with
      # this processor's command (elements joined by spaces) and `user_script`;
      # "$@" forwards the job arguments to the user script.
      template = paste(
        c('#!/bin/bash',
          # Must match the container destination of the "code" processing input
          # (/opt/ml/processing/input/code/), where sourcedir.tar.gz is placed.
          'cd /opt/ml/processing/input/code/',
          'tar -xzf sourcedir.tar.gz',
          '# Exit on any error. SageMaker uses error code to mark failed job.',
          'set -e',
          'if [[ -f \'requirements.txt\' ]]; then',
          '    # Some py3 containers has typing, which may breaks pip install',
          '    pip uninstall --yes typing',
          '    pip install -r requirements.txt',
          'fi',
          '%s %s "$@"'),
        collapse="\n")
      return(sprintf(
        template,
        paste(self$command, collapse = " "),
        user_script)
      )
    },

    # Upload payload sourcedir.tar.gz to S3.
    .upload_payload = function(entry_point,
                               source_dir,
                               dependencies,
                               git_config,
                               job_name){
      # Every call builds its own estimator, since successive run() calls may
      # point at different code.
      est = private$.create_estimator(
        entry_point=entry_point,
        source_dir=source_dir,
        dependencies=dependencies,
        git_config=git_config
      )

      # Let the estimator prepare itself for the given job name; the resulting
      # "sagemaker_submit_directory" hyperparameter is what gets logged below.
      est$.prepare_for_training(job_name=job_name)

      LOGGER$info(
        "Uploaded %s to %s",
        est$source_dir,
        est$.hyperparameters[["sagemaker_submit_directory"]]
      )

      # Hand the prepared estimator back to the caller.
      return(est)
    },

    # Add payload sourcedir.tar.gz to processing input.
    # This method follows the same mechanism in ScriptProcessor.
    .patch_inputs_with_payload = function(inputs,
                                          s3_payload){
      # Same mechanism as ScriptProcessor: the S3 code artifact is injected as
      # an additional processing input. The framework processor takes over
      # /opt/ml/processing/input/code for sourcedir.tar.gz and lets
      # ScriptProcessor place runproc.sh under
      # /opt/ml/processing/input/{self._CODE_CONTAINER_INPUT_NAME}.
      #
      # See:
      # - ScriptProcessor._CODE_CONTAINER_BASE_PATH, ScriptProcessor._CODE_CONTAINER_INPUT_NAME.
      # - https://github.com/aws/sagemaker-python-sdk/blob/ \
      #   a7399455f5386d83ddc5cb15c0db00c04bd518ec/src/sagemaker/processing.py#L425-L426

      # Start from an empty list when the caller supplied no inputs.
      if (is.null(inputs)) {
        inputs = list()
      }

      # The payload input is always appended, whether or not inputs were given.
      payload_input = ProcessingInput$new(
        input_name="code",
        source=s3_payload,
        destination="/opt/ml/processing/input/code/"
      )
      return(c(inputs, payload_input))
    },

    # Framework processor override for setting processing job entrypoint.
    # Args:
    #   command ([str]): Ignored in favor of self.framework_entrypoint_command
    # user_script_name (str): A filename with an extension.
    .set_entrypoint = function(command,
                               user_script_name){
      # `command` is not used: the entrypoint is always built from
      # self$framework_entrypoint_command followed by the script's path under
      # <.CODE_CONTAINER_BASE_PATH>/<.CODE_CONTAINER_INPUT_NAME>/.
      script_path = fs::path(
        self$.CODE_CONTAINER_BASE_PATH,
        self$.CODE_CONTAINER_INPUT_NAME,
        user_script_name
      )
      self$entrypoint = c(self$framework_entrypoint_command, script_path)
    }
  )
)
DyfanJones/sagemaker-r-common documentation built on June 14, 2022, 10:31 p.m.