# NOTE: This code has been modified from AWS Stepfunctions Python:
# https://github.com/aws/aws-step-functions-data-science-sdk-python/blob/main/src/stepfunctions/template/pipeline/inference.py
#' @import R6
#' @include steps_sagemaker.R
#' @include steps_states.R
#' @include workflow_stepfunctions.R
#' @include template_pipeline_common.R
#' @title InferencePipeline for SageMaker
#' @description Creates a standard inference pipeline with the following steps in order:
#' \itemize{
#' \item{Train preprocessor}
#' \item{Create preprocessor model}
#' \item{Transform input data using preprocessor model}
#' \item{Train estimator}
#' \item{Create estimator model}
#' \item{Endpoint configuration}
#' \item{Deploy estimator model}
#' }
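#' @examples
#' \dontrun{
#' # A minimal usage sketch, not a definitive recipe: `my_preprocessor` and
#' # `my_estimator` are assumed to be pre-configured R6sagemaker estimators,
#' # and the bucket/role values below are placeholders.
#' pipeline <- InferencePipeline$new(
#'   preprocessor = my_preprocessor,
#'   estimator = my_estimator,
#'   inputs = list("train" = "s3://my-bucket/data/train"),
#'   s3_bucket = "my-bucket",
#'   role = "arn:aws:iam::012345678901:role/StepFunctionsWorkflowExecutionRole"
#' )
#' execution <- pipeline$execute()
#' }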
#' @export
InferencePipeline = R6Class("InferencePipeline",
inherit = WorkflowTemplate,
public = list(
#' @description Initialize InferencePipeline class
#' @param preprocessor (sagemaker.estimator.EstimatorBase): The estimator used
#' to preprocess and transform the training data.
#' @param estimator (sagemaker.estimator.EstimatorBase): The estimator to use
#' for training. Can be a BYO estimator, Framework estimator or Amazon
#' algorithm estimator.
#' @param role (str): An AWS IAM role (either name or full Amazon Resource Name (ARN)).
#' This role is used to create, manage, and execute the Step Functions workflows.
#' @param inputs : Information about the training data. Please refer to the `fit()`
#' method of the associated estimator, as this can take any of the following forms:
#' \itemize{
#' \item{(str) - The S3 location where training data is saved.}
#' \item{(list[str, str] or list[str, `sagemaker.inputs.TrainingInput`]) - If
#' using multiple channels for training data, you can specify a list mapping
#' channel names to strings or `sagemaker.inputs.TrainingInput` objects.}
#' \item{(`sagemaker.inputs.TrainingInput`) - Channel configuration for S3 data
#' sources that can provide additional information about the training dataset.
#' See `sagemaker.inputs.TrainingInput` for full details.}
#' \item{(`sagemaker.amazon.amazon_estimator.RecordSet`) - A collection of Amazon
#' `Record` objects serialized and stored in S3. For use with an estimator
#' for an Amazon algorithm.}
#' \item{(list[`sagemaker.amazon.amazon_estimator.RecordSet`]) - A list of
#' `sagemaker.amazon.amazon_estimator.RecordSet` objects, where each instance
#' is a different channel of training data.}
#' }
#' @param s3_bucket (str): S3 bucket under which the output artifacts from
#' the training job will be stored. The parent path used is built
#' using the format: ``s3://{s3_bucket}/{pipeline_name}/models/{job_name}/``.
#' In this format, `pipeline_name` refers to the keyword argument provided
#' for InferencePipeline. If a `pipeline_name` argument was not provided,
#' one is auto-generated by the pipeline as `inference-pipeline-<timestamp>`.
#' Also, in this format, `job_name` refers to the job name provided
#' when calling the `InferencePipeline$execute()` method.
#' @param client (SFN.Client, optional): \code{\link[paws]{sfn}} client to use for creating and
#' interacting with the inference pipeline in Step Functions. (default: NULL)
#' @param compression_type (str, optional): Compression type (Gzip/None) of the
#' file for TransformJob. (default: NULL)
#' @param content_type (str, optional): Content type (MIME) of the document to
#' be used in preprocessing script. See SageMaker documentation for
#' more details. (default: NULL)
#' @param pipeline_name (str, optional): Name of the pipeline. This name will
#' be used to name jobs (if not provided when calling execute()),
#' models, endpoints, and S3 objects created by the pipeline. If a
#' `pipeline_name` argument was not provided, one is auto-generated
#' by the pipeline as `inference-pipeline-<timestamp>`. (default: NULL)
initialize = function(preprocessor,
estimator,
inputs,
s3_bucket,
role,
client=NULL,
compression_type=NULL,
content_type=NULL,
pipeline_name=NULL){
self$preprocessor = preprocessor
self$estimator = estimator
self$inputs = inputs
self$s3_bucket = s3_bucket
self$compression_type = compression_type
self$content_type = content_type
self$pipeline_name = pipeline_name
if (is.null(self$pipeline_name))
self$pipeline_name = sprintf('inference-pipeline-%s',private$.generate_timestamp())
self$definition = self$build_workflow_definition()
self$input_template = private$.extract_input_template(self$definition)
workflow = Workflow$new(
name=self$pipeline_name,
definition=self$definition,
role=role,
format_json=TRUE,
client=client)
super$initialize(s3_bucket=s3_bucket, workflow=workflow, role=role, client=client)
# get methods from R6sagemaker
private$.PipelineModel = pkg_method("PipelineModel", "R6sagemaker")
private$.Model = pkg_method("Model", "R6sagemaker")
},
#' @description Build the workflow definition for the inference pipeline with
#' all the states involved.
#' @return \code{Chain}: Workflow definition as a chain of states involved
#' in the inference pipeline.
build_workflow_definition = function(){
default_name = self$pipeline_name
instance_type = self$preprocessor$instance_type
instance_count = self$preprocessor$instance_count
# Preprocessor for feature transformation
preprocessor_train_step = TrainingStep$new(
StepId$TrainPreprocessor,
estimator=self$preprocessor,
job_name=paste0(default_name, '/preprocessor-source'),
data=self$inputs
)
preprocessor_model = self$preprocessor$create_model()
preprocessor_model_step = ModelStep$new(
StepId$CreatePreprocessorModel,
instance_type=instance_type,
model=preprocessor_model,
model_name=default_name
)
preprocessor_transform_step = TransformStep$new(
StepId$TransformInput,
transformer=self$preprocessor$transformer(
instance_count=instance_count,
instance_type=instance_type,
max_payload=20),
job_name=default_name,
model_name=default_name,
data=self$inputs[['train']],
compression_type=self$compression_type,
content_type=self$content_type
)
# Training
instance_type = self$estimator$instance_type
instance_count = self$estimator$instance_count
training_step = TrainingStep$new(
StepId$Train,
estimator=self$estimator,
job_name=paste0(default_name, '/estimator-source'),
data=self$inputs
)
pipeline_model = private$.PipelineModel$new(
name='PipelineModel',
role=self$estimator$role,
models=list(
self$preprocessor$create_model(),
self$estimator$create_model())
)
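# The ModelStep below is seeded with the preprocessor model; its parameters
# are replaced immediately afterwards with the pipeline model configuration.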
pipeline_model_step = ModelStep$new(
StepId$CreatePipelineModel,
instance_type=instance_type,
model=preprocessor_model,
model_name=default_name
)
pipeline_model_step$parameters = private$.pipeline_model_config(instance_type, pipeline_model)
deployable_model = private$.Model$new(model_data='', image_uri='')
# Deployment
endpoint_config_step = EndpointConfigStep$new(
StepId$ConfigureEndpoint,
endpoint_config_name=default_name,
model_name=default_name,
initial_instance_count=instance_count,
instance_type=instance_type
)
deploy_step = EndpointStep$new(
StepId$Deploy,
endpoint_name=default_name,
endpoint_config_name=default_name
)
return(Chain$new(list(
preprocessor_train_step,
preprocessor_model_step,
preprocessor_transform_step,
training_step,
pipeline_model_step,
endpoint_config_step,
deploy_step))
)
},
#' @description Run the inference pipeline.
#' @param job_name (str, optional): Name for the training job. This is also
#' used as suffix for the preprocessing job as `preprocess-<job_name>`.
#' If one is not provided, a job name will be auto-generated. (default: NULL)
#' @param hyperparameters (list, optional): Hyperparameters for the estimator
#' training. (default: NULL)
#' @return \code{Execution}: Running instance of the inference pipeline.
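#' @examples
#' \dontrun{
#' # A hedged sketch: `pipeline` is an InferencePipeline as in the class-level
#' # example, and the hyperparameter names depend on the chosen estimator.
#' execution <- pipeline$execute(
#'   job_name = "my-inference-job",
#'   hyperparameters = list("max_depth" = 5)
#' )
#' }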
execute = function(job_name=NULL,
hyperparameters=NULL){
inputs = self$input_template
if (!is.null(hyperparameters))
inputs[[StepId$Train]][['HyperParameters']] = hyperparameters
if (is.null(job_name))
job_name = sprintf('%s-%s','inference-pipeline', private$.generate_timestamp())
# Configure preprocessor
inputs[[StepId$TrainPreprocessor]][['TrainingJobName']] = paste0('preprocessor-', job_name)
inputs[[StepId$TrainPreprocessor]][['OutputDataConfig']][['S3OutputPath']] = sprintf(
's3://%s/%s/models',
self$s3_bucket,
self$workflow$name
)
inputs[[StepId$TrainPreprocessor]][['DebugHookConfig']][['S3OutputPath']] = sprintf(
's3://%s/%s/models/debug',
self$s3_bucket,
self$workflow$name
)
inputs[[StepId$CreatePreprocessorModel]][['PrimaryContainer']][['ModelDataUrl']] = sprintf(
'%s/%s/output/model.tar.gz',
inputs[[StepId$TrainPreprocessor]][['OutputDataConfig']][['S3OutputPath']],
inputs[[StepId$TrainPreprocessor]][['TrainingJobName']]
)
inputs[[StepId$CreatePreprocessorModel]][['ModelName']] = inputs[[StepId$TrainPreprocessor]][['TrainingJobName']]
inputs[[StepId$TransformInput]][['ModelName']] = inputs[[StepId$CreatePreprocessorModel]][['ModelName']]
inputs[[StepId$TransformInput]][['TransformJobName']] = inputs[[StepId$CreatePreprocessorModel]][['ModelName']]
inputs[[StepId$TransformInput]][['TransformOutput']][['S3OutputPath']] = sprintf(
's3://%s/%s/%s/transform',
self$s3_bucket,
self$workflow$name,
paste0('preprocessor-transform-', job_name)
)
inputs[[StepId$TrainPreprocessor]] = private$.replace_sagemaker_job_name(
inputs[[StepId$TrainPreprocessor]], inputs[[StepId$TrainPreprocessor]][['TrainingJobName']])
# Configure training and model
inputs[[StepId$Train]][['TrainingJobName']] = paste0('estimator-', job_name)
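# The estimator trains on the preprocessor's transform output, wired in as the
# single 'train' channel below.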
inputs[[StepId$Train]][['InputDataConfig']] = list(list(
'ChannelName'='train',
'DataSource'=list(
'S3DataSource'=list(
'S3DataDistributionType'='FullyReplicated',
'S3DataType'='S3Prefix',
'S3Uri'=inputs[[StepId$TransformInput]][['TransformOutput']][['S3OutputPath']]
)
)
))
inputs[[StepId$Train]][['OutputDataConfig']][['S3OutputPath']] = sprintf(
's3://%s/%s/models',
self$s3_bucket,
self$workflow$name
)
inputs[[StepId$Train]][['DebugHookConfig']][['S3OutputPath']] = sprintf(
's3://%s/%s/models/debug',
self$s3_bucket,
self$workflow$name
)
inputs[[StepId$CreatePipelineModel]][['ModelName']] = job_name
inputs[[StepId$Train]] = private$.replace_sagemaker_job_name(
inputs[[StepId$Train]], inputs[[StepId$Train]][['TrainingJobName']])
# Configure pipeline model
inputs[[StepId$CreatePipelineModel]][['Containers']][[1]][['ModelDataUrl']] = (
inputs[[StepId$CreatePreprocessorModel]][['PrimaryContainer']][['ModelDataUrl']])
inputs[[StepId$CreatePipelineModel]][['Containers']][[2]][['ModelDataUrl']] = sprintf(
'%s/%s/output/model.tar.gz',
inputs[[StepId$Train]][['OutputDataConfig']][['S3OutputPath']],
inputs[[StepId$Train]][['TrainingJobName']]
)
# Configure endpoint
inputs[[StepId$ConfigureEndpoint]][['EndpointConfigName']] = job_name
for (i in seq_along(inputs[[StepId$ConfigureEndpoint]][['ProductionVariants']])){
inputs[[StepId$ConfigureEndpoint]][['ProductionVariants']][[i]][['ModelName']] = job_name}
inputs[[StepId$Deploy]][['EndpointConfigName']] = job_name
inputs[[StepId$Deploy]][['EndpointName']] = job_name
return(self$workflow$execute(inputs=inputs, name=job_name))
}
),
private = list(
.PipelineModel = NULL,
.Model = NULL,
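# Build the CreateModel parameters (name, containers, execution role) for the
# combined preprocessor + estimator pipeline model.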
.pipeline_model_config = function(instance_type, pipeline_model){
return(list(
'ModelName'='pipeline-model',
'Containers'=pipeline_model$pipeline_container_def(instance_type),
'ExecutionRoleArn'=pipeline_model$role)
)
},
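# If the estimator exposes the SageMaker job name hyperparameter, overwrite it
# with the generated job name (JSON-quoted) so the training job picks it up.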
.replace_sagemaker_job_name = function(config, job_name){
if (sagemaker$model$JOB_NAME_PARAM_NAME %in% names(config[['HyperParameters']]))
config[['HyperParameters']][[sagemaker$model$JOB_NAME_PARAM_NAME]] = sprintf('"%s"', job_name)
return(config)
}
),
lock_objects=FALSE
)