R/template_pipeline_inference.R

# NOTE: This code has been ported from the AWS Step Functions Data Science SDK for Python:
# https://github.com/aws/aws-step-functions-data-science-sdk-python/blob/main/src/stepfunctions/template/pipeline/inference.py

#' @import R6

#' @include steps_sagemaker.R
#' @include steps_states.R
#' @include workflow_stepfunctions.R
#' @include template_pipeline_common.R

#' @title InferencePipeline for SageMaker
#' @description Creates a standard inference pipeline with the following steps in order:
#' \itemize{
#'   \item{Train preprocessor}
#'   \item{Create preprocessor model}
#'   \item{Transform input data using preprocessor model}
#'   \item{Train estimator}
#'   \item{Create pipeline model (preprocessor + estimator)}
#'   \item{Endpoint configuration}
#'   \item{Deploy pipeline model}
#' }
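#' @details The snippet below is a minimal usage sketch: the `preprocessor` and
#' `estimator` objects are assumed to be estimators created with `R6sagemaker`,
#' and the bucket name, role ARN and S3 paths are placeholders.
#' @examples
#' \dontrun{
#' pipeline <- InferencePipeline$new(
#'   preprocessor = preprocessor, # e.g. an R6sagemaker estimator for feature transformation
#'   estimator = estimator,       # e.g. an R6sagemaker estimator for training
#'   inputs = list(train = "s3://my-bucket/data/train"),
#'   s3_bucket = "my-bucket",
#'   role = "arn:aws:iam::123456789012:role/StepFunctionsWorkflowExecutionRole"
#' )
#' # Create the workflow in Step Functions (method inherited from WorkflowTemplate),
#' # then start an execution.
#' pipeline$create()
#' execution <- pipeline$execute()
#' }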
#' @export
InferencePipeline = R6Class("InferencePipeline",
  inherit = WorkflowTemplate,
  public = list(

    #' @description Initialize InferencePipeline class
    #' @param preprocessor (sagemaker.estimator.EstimatorBase): The estimator used
    #'              to preprocess and transform the training data.
    #' @param estimator (sagemaker.estimator.EstimatorBase): The estimator to use
    #'              for training. Can be a BYO estimator, Framework estimator or Amazon
    #'              algorithm estimator.
    #' @param role (str): An AWS IAM role (either name or full Amazon Resource Name (ARN)).
    #'              This role is used to create, manage, and execute the Step Functions workflows.
    #' @param inputs : Information about the training data. Please refer to the `fit()`
    #'              method of the associated estimator, as this can take any of the following forms:
    #' \itemize{
    #'     \item{(str) - The S3 location where training data is saved.}
    #'     \item{(list[str, str] or list[str, `sagemaker.inputs.TrainingInput`]) - If
    #'           using multiple channels for training data, you can specify a list mapping
    #'           channel names to strings or `sagemaker.inputs.TrainingInput` objects.}
    #'     \item{(`sagemaker.inputs.TrainingInput`) - Channel configuration for S3 data
    #'           sources that can provide additional information about the training dataset.
    #'          See `sagemaker.inputs.TrainingInput` for full details.}
    #'     \item{(`sagemaker.amazon.amazon_estimator.RecordSet`) - A collection of Amazon
    #'          `Record` objects serialized and stored in S3. For use with an estimator
    #'          for an Amazon algorithm.}
    #'     \item{(list[`sagemaker.amazon.amazon_estimator.RecordSet`]) - A list of
    #'          `sagemaker.amazon.amazon_estimator.RecordSet` objects, where each instance
    #'          is a different channel of training data.}
    #' }
    #' @param s3_bucket (str): S3 bucket under which the output artifacts from
    #'              the training job will be stored. The parent path used is built
    #'              using the format: ``s3://{s3_bucket}/{pipeline_name}/models/{job_name}/``.
    #'              In this format, `pipeline_name` refers to the keyword argument provided
    #'              for InferencePipeline. If a `pipeline_name` argument was not provided,
    #'              one is auto-generated by the pipeline as `inference-pipeline-<timestamp>`.
    #'              Also, in the format, `job_name` refers to the job name provided
    #'              when calling the `InferencePipeline$execute()` method.
    #' @param client (SFN.Client, optional): \code{\link[paws]{sfn}} client to use for creating and
    #'              interacting with the inference pipeline in Step Functions. (default: NULL)
    #' @param compression_type (str, optional): Compression type (Gzip/None) of the
    #'              file for TransformJob. (default: NULL)
    #' @param content_type (str, optional): Content type (MIME) of the document to
    #'              be used in the preprocessing script. See the SageMaker documentation for
    #'              more details. (default: NULL)
    #' @param pipeline_name (str, optional): Name of the pipeline. This name will
    #'              be used to name jobs (if not provided when calling execute()),
    #'              models, endpoints, and S3 objects created by the pipeline. If a
    #'              `pipeline_name` argument was not provided, one is auto-generated
    #'              by the pipeline as `inference-pipeline-<timestamp>`. (default: NULL)
    initialize = function(preprocessor,
                          estimator,
                          inputs,
                          s3_bucket,
                          role,
                          client=NULL,
                          compression_type=NULL,
                          content_type=NULL,
                          pipeline_name=NULL){
      self$preprocessor = preprocessor
      self$estimator = estimator
      self$inputs = inputs
      self$s3_bucket = s3_bucket
      self$compression_type = compression_type
      self$content_type = content_type
      self$pipeline_name = pipeline_name

      if (is.null(self$pipeline_name))
        self$pipeline_name = sprintf('inference-pipeline-%s',private$.generate_timestamp())

      self$definition = self$build_workflow_definition()
      self$input_template = private$.extract_input_template(self$definition)

      workflow = Workflow$new(
        name=self$pipeline_name,
        definition=self$definition,
        role=role,
        format_json=TRUE,
        client=client)

      super$initialize(s3_bucket=s3_bucket, workflow=workflow, role=role, client=client)


      # get methods from R6sagemaker
      private$.PipelineModel = pkg_method("PipelineModel", "R6sagemaker")
      private$.Model = pkg_method("Model", "R6sagemaker")
    },

    #' @description Build the workflow definition for the inference pipeline with
    #'              all the states involved.
    #' @return \code{Chain}: Workflow definition as a chain of the states
    #'              involved in the inference pipeline.
    build_workflow_definition = function(){
      default_name = self$pipeline_name

      instance_type = self$preprocessor$instance_type
      instance_count = self$preprocessor$instance_count

      # Preprocessor for feature transformation
      preprocessor_train_step = TrainingStep$new(
        StepId$TrainPreprocessor,
        estimator=self$preprocessor,
        job_name=paste0(default_name, '/preprocessor-source'),
        data=self$inputs
      )
      preprocessor_model = self$preprocessor$create_model()
      preprocessor_model_step = ModelStep$new(
        StepId$CreatePreprocessorModel,
        instance_type=instance_type,
        model=preprocessor_model,
        model_name=default_name
      )
      preprocessor_transform_step = TransformStep$new(
        StepId$TransformInput,
        transformer=self$preprocessor$transformer(
          instance_count=instance_count,
          instance_type=instance_type,
          max_payload=20),
        job_name=default_name,
        model_name=default_name,
        data=self$inputs[['train']],
        compression_type=self$compression_type,
        content_type=self$content_type
      )

      # Training
      instance_type = self$estimator$instance_type
      instance_count = self$estimator$instance_count

      training_step = TrainingStep$new(
        StepId$Train,
        estimator=self$estimator,
        job_name=paste0(default_name, '/estimator-source'),
        data=self$inputs
      )

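      # Combine the preprocessor and estimator into a single SageMaker pipeline
      # model; .pipeline_model_config() supplies its container definitions to the
      # CreatePipelineModel step parameters below.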
      pipeline_model = private$.PipelineModel$new(
        name='PipelineModel',
        role=self$estimator$role,
        models=list(
          self$preprocessor$create_model(),
          self$estimator$create_model())
      )
      pipeline_model_step = ModelStep$new(
        StepId$CreatePipelineModel,
        instance_type=instance_type,
        model=preprocessor_model,
        model_name=default_name
      )
      pipeline_model_step$parameters = private$.pipeline_model_config(instance_type, pipeline_model)

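      # Placeholder model with empty model data/image URI, carried over from the
      # Python template; the deployed containers come from the pipeline model above.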
      deployable_model = private$.Model$new(model_data='', image_uri='')

      # Deployment
      endpoint_config_step = EndpointConfigStep$new(
        StepId$ConfigureEndpoint,
        endpoint_config_name=default_name,
        model_name=default_name,
        initial_instance_count=instance_count,
        instance_type=instance_type
      )

      deploy_step = EndpointStep$new(
        StepId$Deploy,
        endpoint_name=default_name,
        endpoint_config_name=default_name
      )

      return(Chain$new(list(
        preprocessor_train_step,
        preprocessor_model_step,
        preprocessor_transform_step,
        training_step,
        pipeline_model_step,
        endpoint_config_step,
        deploy_step))
      )
    },

    #' @description Run the inference pipeline.
    #' @param job_name (str, optional): Name for the training job. This is also
    #'              used as a suffix for the preprocessing job name, i.e. `preprocessor-<job_name>`.
    #'              If one is not provided, a job name will be auto-generated. (default: NULL)
    #' @param hyperparameters (list, optional): Hyperparameters for the estimator
    #'              training. (default: NULL)
    #' @return \code{Execution}: Running instance of the inference pipeline.
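    #' @examples
    #' \dontrun{
    #' # A sketch only: the hyperparameter names are illustrative and depend on
    #' # the estimator's algorithm.
    #' execution <- pipeline$execute(
    #'   job_name = "my-inference-job",
    #'   hyperparameters = list(max_depth = "5", eta = "0.2")
    #' )
    #' }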
    execute = function(job_name=NULL,
                       hyperparameters=NULL){

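      # Start from the parameter template extracted from the workflow definition
      # and fill in the execution-specific values (job names, S3 paths) below.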
      inputs = self$input_template

      if (!is.null(hyperparameters))
        inputs[[StepId$Train]][['HyperParameters']] = hyperparameters

      if (is.null(job_name))
        job_name = sprintf('%s-%s','inference-pipeline', private$.generate_timestamp())

      # Configure preprocessor
      inputs[[StepId$TrainPreprocessor]][['TrainingJobName']] = paste0('preprocessor-', job_name)
      inputs[[StepId$TrainPreprocessor]][['OutputDataConfig']][['S3OutputPath']] = sprintf(
        's3://%s/%s/models',
        self$s3_bucket,
        self$workflow$name
      )
      inputs[[StepId$TrainPreprocessor]][['DebugHookConfig']][['S3OutputPath']] = sprintf(
        's3://%s/%s/models/debug',
        self$s3_bucket,
        self$workflow$name
      )
      inputs[[StepId$CreatePreprocessorModel]][['PrimaryContainer']][['ModelDataUrl']] = sprintf(
        '%s/%s/output/model.tar.gz',
        inputs[[StepId$TrainPreprocessor]][['OutputDataConfig']][['S3OutputPath']],
        inputs[[StepId$TrainPreprocessor]][['TrainingJobName']]
      )
      inputs[[StepId$CreatePreprocessorModel]][['ModelName']] = inputs[[StepId$TrainPreprocessor]][['TrainingJobName']]
      inputs[[StepId$TransformInput]][['ModelName']] = inputs[[StepId$CreatePreprocessorModel]][['ModelName']]
      inputs[[StepId$TransformInput]][['TransformJobName']] = inputs[[StepId$CreatePreprocessorModel]][['ModelName']]
      inputs[[StepId$TransformInput]][['TransformOutput']][['S3OutputPath']] = sprintf(
        's3://%s/%s/%s/transform',
        self$s3_bucket,
        self$workflow$name,
        paste0('preprocessor-transform-', job_name)
      )

      inputs[[StepId$TrainPreprocessor]] = private$.replace_sagemaker_job_name(
        inputs[[StepId$TrainPreprocessor]], inputs[[StepId$TrainPreprocessor]][['TrainingJobName']])

      # Configure training and model
      inputs[[StepId$Train]][['TrainingJobName']] = paste0('estimator-', job_name)
      inputs[[StepId$Train]][['InputDataConfig']] = list(list(
        'ChannelName'='train',
        'DataSource'=list(
          'S3DataSource'=list(
            'S3DataDistributionType'='FullyReplicated',
            'S3DataType'='S3Prefix',
            'S3Uri'=inputs[[StepId$TransformInput]][['TransformOutput']][['S3OutputPath']]
            )
          )
        ))

      inputs[[StepId$Train]][['OutputDataConfig']][['S3OutputPath']] = sprintf(
        's3://%s/%s/models',
        self$s3_bucket,
        self$workflow$name
      )
      inputs[[StepId$Train]][['DebugHookConfig']][['S3OutputPath']] = sprintf(
        's3://%s/%s/models/debug',
        self$s3_bucket,
        self$workflow$name
      )
      inputs[[StepId$CreatePipelineModel]][['ModelName']] = job_name
      inputs[[StepId$Train]] = private$.replace_sagemaker_job_name(
        inputs[[StepId$Train]], inputs[[StepId$Train]][['TrainingJobName']])

      # Configure pipeline model
      inputs[[StepId$CreatePipelineModel]][['Containers']][[1]][['ModelDataUrl']] = (
        inputs[[StepId$CreatePreprocessorModel]][['PrimaryContainer']][['ModelDataUrl']])
      inputs[[StepId$CreatePipelineModel]][['Containers']][[2]][['ModelDataUrl']] = sprintf(
        '%s/%s/output/model.tar.gz',
        inputs[[StepId$Train]][['OutputDataConfig']][['S3OutputPath']],
        inputs[[StepId$Train]][['TrainingJobName']]
      )

      # Configure endpoint
      inputs[[StepId$ConfigureEndpoint]][['EndpointConfigName']] = job_name
      # Update the list elements by index so the change persists in `inputs`
      # (modifying the loop variable would not update the underlying list).
      for (i in seq_along(inputs[[StepId$ConfigureEndpoint]][['ProductionVariants']])){
        inputs[[StepId$ConfigureEndpoint]][['ProductionVariants']][[i]][['ModelName']] = job_name
      }
      inputs[[StepId$Deploy]][['EndpointConfigName']] = job_name
      inputs[[StepId$Deploy]][['EndpointName']] = job_name

      return(self$workflow$execute(inputs=inputs, name=job_name))
    }
  ),
  private = list(
    .PipelineModel = NULL,
    .Model = NULL,

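    # Parameters for the CreateModel call backing the combined pipeline model:
    # the container definitions come from both the preprocessor and the estimator.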
    .pipeline_model_config = function(instance_type, pipeline_model){
      return(list(
        'ModelName'='pipeline-model',
        'Containers'=pipeline_model$pipeline_container_def(instance_type),
        'ExecutionRoleArn'=pipeline_model$role)
      )
    },

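    # If the estimator passes the SageMaker job name through as a hyperparameter,
    # overwrite it with the job name generated for this execution.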
    .replace_sagemaker_job_name = function(config, job_name){
      if (sagemaker$model$JOB_NAME_PARAM_NAME %in% names(config[['HyperParameters']]))
        config[['HyperParameters']][[sagemaker$model$JOB_NAME_PARAM_NAME]] = sprintf('"%s"', job_name)
      return(config)
    }
  ),
  lock_objects=FALSE
)