The ability to predict that a particular customer is at a high risk of churning, while there is still time to do something about it, represents a huge potential revenue source for every online business. Depending on the industry and business objective, the problem statement can be multi-layered. The following are some business objectives based on this strategy:

This post discusses how you can orchestrate an end-to-end churn prediction model across each step: data preparation, experimenting with a baseline model and hyperparameter optimization (HPO), training and tuning, and registering the best model. You can manage your Amazon SageMaker training and inference workflows using Amazon SageMaker Studio and the SageMaker Python SDK. SageMaker offers all the tools you need to create high-quality data science solutions.

SageMaker helps data scientists and developers prepare, build, train, and deploy high-quality machine learning (ML) models quickly by bringing together a broad set of capabilities purpose-built for ML.

Studio provides a single, web-based visual interface where you can perform all ML development steps, improving data science team productivity by up to 10 times.

Amazon SageMaker Pipelines is a tool for building ML pipelines that takes advantage of direct SageMaker integration. With Pipelines, you can easily automate the steps of building an ML model, catalog models in the model registry, and use one of several templates provided in SageMaker Projects to set up continuous integration and continuous delivery (CI/CD) for the end-to-end ML lifecycle at scale.

After the model is trained, you can use Amazon SageMaker Clarify to identify and limit bias and explain predictions to business stakeholders. You can share these automated reports with business and technical teams for downstream target campaigns or to determine features that are key differentiators for customer lifetime value.

By the end of this post, you should have enough information to successfully use this end-to-end template using Pipelines to train, tune, and deploy your own predictive analytics use case. The full instructions are available on the GitHub repo.

In this solution, your entry point is the Studio integrated development environment (IDE) for rapid experimentation. Studio offers an environment to manage the end-to-end Pipelines experience. With Studio, you can bypass the AWS Management Console for your entire workflow management. For more information on managing Pipelines from Studio, see View, Track, and Execute SageMaker Pipelines in SageMaker Studio.

The following diagram illustrates the high-level architecture of the data science workflow.

After you create the Studio domain, select your user name and choose Open Studio. A web-based IDE opens that allows you to store and collect all the things that you need—whether it’s code, notebooks, datasets, settings, or project folders.

Pipelines is integrated directly with SageMaker, so you don’t need to interact with any other AWS services. You also don’t need to manage any resources because Pipelines is a fully managed service, which means that it creates and manages resources for you. For more information about the various SageMaker components, which are available both as standalone Python APIs and as integrated components of Studio, see the SageMaker service page.

For this use case, you use the following components for the fully automated model development process:

A SageMaker pipeline is a series of interconnected steps that is defined by a JSON pipeline definition. This pipeline definition encodes a pipeline using a directed acyclic graph (DAG). This DAG gives information on the requirements for and relationships between each step of your pipeline. The structure of a pipeline’s DAG is determined by the data dependencies between steps. These data dependencies are created when the properties of a step’s output are passed as the input to another step.
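
To make the idea of a DAG built from data dependencies concrete, the following is a minimal sketch in plain Python rather than the Pipelines SDK. Each step is mapped to the steps whose outputs it consumes, and a topological sort yields a valid run order. The step name ChurnEvalBestModel is a hypothetical placeholder, and the dependency map is an assumption made for illustration; the other step names appear later in this post.

```python
from graphlib import TopologicalSorter  # standard library, Python 3.9+

# Map each step to the set of steps whose outputs it consumes.
# "ChurnEvalBestModel" is a hypothetical name used only for this sketch.
dependencies = {
    "ChurnModelProcess": set(),
    "ChurnHyperParameterTuning": {"ChurnModelProcess"},
    "ChurnEvalBestModel": {"ChurnModelProcess", "ChurnHyperParameterTuning"},
    "RegisterChurnModel": {"ChurnEvalBestModel"},
}


def execution_order(deps):
    """Return one valid run order; raises graphlib.CycleError if deps contain a cycle."""
    return list(TopologicalSorter(deps).static_order())


order = execution_order(dependencies)
print(order)
```

No step is ordered explicitly here: the order falls out of which outputs feed which inputs, which is the same principle the pipeline definition relies on.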

For this post, our use case is a classic ML problem: understanding which marketing strategies, based on consumer behavior, we can adopt to increase customer retention for a given retail store. The following diagram illustrates the complete ML workflow for the churn prediction use case.

Let’s go through the accelerated ML workflow development process in detail.

To follow along with this post, you need to download and save the sample dataset in the default Amazon Simple Storage Service (Amazon S3) bucket associated with your SageMaker session, and in the S3 bucket of your choice. For rapid experimentation or baseline model building, you can save a copy of the dataset under your home directory in Amazon Elastic File System (Amazon EFS) and follow the Jupyter notebook Customer_Churn_Modeling.ipynb.

The following screenshot shows the sample dataset. The target variable, retained, is 1 if the customer is assumed to be active and 0 otherwise.

Run the following code in a Studio notebook to preprocess the dataset and upload it to your own S3 bucket:

import boto3
import pandas as pd
import numpy as np
## Preprocess the dataset
def preprocess_data(file_path):
    df = pd.read_csv(file_path)
    ## Convert to datetime columns
    df["firstorder"] = pd.to_datetime(df["firstorder"], errors="coerce")
    df["lastorder"] = pd.to_datetime(df["lastorder"], errors="coerce")
    ## Drop rows with null values
    df = df.dropna()
    ## Create a column with the days between the last order and the first order
    df["first_last_days_diff"] = (df["lastorder"] - df["firstorder"]).dt.days
    ## Create a column with the days between when the customer record was created and the first order
    df["created"] = pd.to_datetime(df["created"])
    df["created_first_days_diff"] = (df["created"] - df["firstorder"]).dt.days
    ## Drop columns
    df.drop(["custid", "created", "firstorder", "lastorder"], axis=1, inplace=True)
    ## Apply one-hot encoding on the favday and city columns
    ## (dtype=int keeps the encoded columns numeric on recent pandas versions, which otherwise return booleans)
    df = pd.get_dummies(df, prefix=["favday", "city"], columns=["favday", "city"], dtype=int)
    return df
## Set the required configurations
model_name = "churn_model"
env = "dev"
## S3 bucket
default_bucket = "customer-churn-sm-pipeline"
## Preprocess the dataset
storedata = preprocess_data(f"s3://{default_bucket}/data/storedata_total.csv")

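Studio notebooks run on elastic compute, so you can easily run multiple training and tuning jobs. For this use case, you use the SageMaker built-in XGBoost algorithm and SageMaker HPO, with the objective set to "binary:logistic" and "eval_metric" set to "auc". The following code first splits the dataset into training (70%), validation (15%), and test (15%) sets and saves them to Amazon S3: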

def split_datasets(df):
    y = df.pop("retained")
    X_pre = df
    y_pre = y.to_numpy().reshape(len(y), 1)
    feature_names = list(X_pre.columns)
    # Put the label in the first column, as the built-in XGBoost algorithm expects for CSV input
    X = np.concatenate((y_pre, X_pre), axis=1)
    np.random.shuffle(X)
    # 70% training, 15% validation, 15% test
    train, validation, test = np.split(X, [int(.7 * len(X)), int(.85 * len(X))])
    return feature_names, train, validation, test
# Split dataset
feature_names, train, validation, test = split_datasets(storedata)
# Save datasets in Amazon S3
pd.DataFrame(train).to_csv(f"s3://{default_bucket}/data/train/train.csv", header=False, index=False)
pd.DataFrame(validation).to_csv(f"s3://{default_bucket}/data/validation/validation.csv", header=False, index=False)
pd.DataFrame(test).to_csv(f"s3://{default_bucket}/data/test/test.csv", header=False, index=False)
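
The split above shuffles without a fixed seed, so each run produces different training, validation, and test sets. If you want repeatable experiments, one option is to seed the shuffle. The following is a minimal, dependency-free sketch of the same 70/15/15 cut; the function name split_rows and the seed value are assumptions for illustration and are not part of the original notebook.

```python
import random


def split_rows(rows, train_cut=0.70, validation_cut=0.85, seed=42):
    """Shuffle a copy of rows with a fixed seed, then cut at 70% and 85%."""
    shuffled = list(rows)
    random.Random(seed).shuffle(shuffled)  # local generator, global state untouched
    train_end = int(train_cut * len(shuffled))
    validation_end = int(validation_cut * len(shuffled))
    return (
        shuffled[:train_end],
        shuffled[train_end:validation_end],
        shuffled[validation_end:],
    )


rows = list(range(1000))  # stand-in for the dataset rows
train_rows, validation_rows, test_rows = split_rows(rows)
print(len(train_rows), len(validation_rows), len(test_rows))
```

With NumPy, the equivalent would be to shuffle through a seeded generator instead of the global random state.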

Train, tune, and find the best candidate model with the following code:

import sagemaker
from pprint import pprint
from sagemaker.inputs import TrainingInput
from sagemaker.tuner import ContinuousParameter, HyperparameterTuner, IntegerParameter
# Session, Region, and execution role used by the training jobs
sagemaker_session = sagemaker.Session()
region = sagemaker_session.boto_region_name
role = sagemaker.get_execution_role()
# Training and validation input for the SageMaker training job
s3_input_train = TrainingInput(s3_data=f"s3://{default_bucket}/data/train/", content_type="csv")
s3_input_validation = TrainingInput(s3_data=f"s3://{default_bucket}/data/validation/", content_type="csv")
# Fixed hyperparameters
fixed_hyperparameters = {
    "eval_metric": "auc",
    "objective": "binary:logistic",
    "num_round": "100",
    "rate_drop": "0.3",
    "tweedie_variance_power": "1.4",
}
# Use the built-in SageMaker XGBoost algorithm
container = sagemaker.image_uris.retrieve("xgboost", region, "0.90-2")
estimator = sagemaker.estimator.Estimator(
    container,
    role,
    instance_count=1,
    hyperparameters=fixed_hyperparameters,
    instance_type="ml.m4.xlarge",
    output_path="s3://{}/output".format(default_bucket),
    sagemaker_session=sagemaker_session,
)
# Hyperparameter ranges explored by the tuning job
hyperparameter_ranges = {
    "eta": ContinuousParameter(0, 1),
    "min_child_weight": ContinuousParameter(1, 10),
    "alpha": ContinuousParameter(0, 2),
    "max_depth": IntegerParameter(1, 10),
}
objective_metric_name = "validation:auc"
tuner = HyperparameterTuner(
    estimator,
    objective_metric_name,
    hyperparameter_ranges,
    max_jobs=10,
    max_parallel_jobs=2,
)
# Tune
tuner.fit({"train": s3_input_train, "validation": s3_input_validation}, include_cls_metadata=False)
## Explore the best model generated
tuning_job_result = boto3.client("sagemaker").describe_hyper_parameter_tuning_job(
    HyperParameterTuningJobName=tuner.latest_tuning_job.job_name
)
job_count = tuning_job_result["TrainingJobStatusCounters"]["Completed"]
print("%d training jobs have completed" % job_count)
## 10 training jobs have completed
## Get the best training job
if tuning_job_result.get("BestTrainingJob", None):
    print("Best Model found so far:")
    pprint(tuning_job_result["BestTrainingJob"])
else:
    print("No training jobs have reported results yet.")
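
Printing the whole BestTrainingJob entry is verbose. As a sketch, the helper below pulls out only the fields you usually compare across runs. It assumes the response shape returned by describe_hyper_parameter_tuning_job; the helper name summarize_best_job and the sample values are made up for illustration and are not results from this post.

```python
def summarize_best_job(tuning_job_result):
    """Return the name, objective metric, and tuned values of the best training job."""
    best = tuning_job_result.get("BestTrainingJob")
    if not best:
        return None  # no training job has reported a result yet
    metric = best["FinalHyperParameterTuningJobObjectiveMetric"]
    return {
        "job_name": best["TrainingJobName"],
        "metric_name": metric["MetricName"],
        "metric_value": metric["Value"],
        "hyperparameters": best.get("TunedHyperParameters", {}),
    }


# Hand-written stand-in for a real API response; every value is illustrative.
sample_result = {
    "TrainingJobStatusCounters": {"Completed": 10},
    "BestTrainingJob": {
        "TrainingJobName": "example-tuning-job-007",
        "FinalHyperParameterTuningJobObjectiveMetric": {
            "MetricName": "validation:auc",
            "Value": 0.9,
        },
        "TunedHyperParameters": {"eta": "0.3", "max_depth": "6"},
    },
}

summary = summarize_best_job(sample_result)
print(summary)
```

In the notebook, you would pass tuning_job_result instead of sample_result.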

After you establish a baseline, you can use Amazon SageMaker Debugger for offline model analysis. Debugger is a capability within SageMaker that automatically provides visibility into the model training process for real-time and offline analysis. Debugger saves the internal model state at periodic intervals, which you can analyze in real time during training and offline after the training is complete. For this use case, you use the explainability tool SHAP (SHapley Additive exPlanation) and the native integration of SHAP with Debugger. Refer to the following notebook for detailed analysis.

The following summary plot explains the positive and negative relationships of the predictors with the target variable. For example, the top variable here, esent, is defined as the number of emails sent. The plot is built from all data points in the training set. Blue indicates dragging the final output to class 0, and pink represents class 1. Key influencing features are ranked in descending order.
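
To give some intuition for what a SHAP value measures, the following sketch computes exact Shapley values by brute force for a toy scoring function with two features. This is not how SHAP handles XGBoost models in practice (tree models use a much more efficient algorithm), and toy_score, its coefficients, and the input values are all hypothetical. The idea is the same, though: a feature's contribution is its marginal effect on the prediction, averaged over every order in which features could be switched from a baseline to their actual values.

```python
from itertools import permutations


def shapley_values(predict, instance, baseline):
    """Average each feature's marginal contribution over all feature orderings."""
    names = list(instance)
    totals = {name: 0.0 for name in names}
    orderings = list(permutations(names))
    for ordering in orderings:
        current = dict(baseline)  # start from the reference input
        previous = predict(current)
        for name in ordering:
            current[name] = instance[name]  # switch one feature to its actual value
            score = predict(current)
            totals[name] += score - previous
            previous = score
    return {name: total / len(orderings) for name, total in totals.items()}


def toy_score(x):
    """Hypothetical scoring function with an interaction term; not the trained model."""
    return 0.2 + 0.05 * x["esent"] + 0.3 * x["paperless"] + 0.01 * x["esent"] * x["paperless"]


instance = {"esent": 10, "paperless": 1}
baseline = {"esent": 0, "paperless": 0}
contributions = shapley_values(toy_score, instance, baseline)
print(contributions)
```

The contributions always add up to the difference between the score for the instance and the score for the baseline, which is what lets a summary plot attribute each prediction to individual features.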

Now you can proceed with the deploy and manage step of the ML workflow.

Develop and automate the workflow

Let’s start with the project structure:

  • /customer-churn-model – Project name
  • /data – Dataset
  • /pipelines – Code for SageMaker pipeline components
  • SageMaker_Pipelines_project.ipynb – Allows you to create and run the ML workflow
  • Customer_Churn_Modeling.ipynb – Baseline model development notebook

Under <project-name>/pipelines/customerchurn, you can see the following Python scripts:

  • preprocess.py – Integrates with SageMaker Processing for feature engineering
  • evaluate.py – Calculates model metrics, in this case auc_score
  • generate_config.py – Generates the dynamic configuration needed for the downstream Clarify job for model explainability
  • pipeline.py – Templatized code for the Pipelines ML workflow

Let’s walk through every step in the DAG and how each one runs. The steps are similar to when we first prepared the data.

Perform data readiness with the following code:

from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker.workflow.steps import ProcessingStep
# Processing step for feature engineering
sklearn_processor = SKLearnProcessor(
    framework_version="0.23-1",
    instance_type=processing_instance_type,
    instance_count=processing_instance_count,
    sagemaker_session=sagemaker_session,
    role=role,
)
step_process = ProcessingStep(
    name="ChurnModelProcess",
    processor=sklearn_processor,
    inputs=[
        ProcessingInput(source=input_data, destination="/opt/ml/processing/input"),
    ],
    outputs=[
        ProcessingOutput(
            output_name="train",
            source="/opt/ml/processing/train",
            destination=f"s3://{default_bucket}/output/train",
        ),
        ProcessingOutput(
            output_name="validation",
            source="/opt/ml/processing/validation",
            destination=f"s3://{default_bucket}/output/validation",
        ),
        ProcessingOutput(
            output_name="test",
            source="/opt/ml/processing/test",
            destination=f"s3://{default_bucket}/output/test",
        ),
    ],
    code=f"s3://{default_bucket}/input/code/preprocess.py",
)

Train, tune, and find the best candidate model:

import sagemaker
from sagemaker.estimator import Estimator
from sagemaker.tuner import ContinuousParameter, IntegerParameter
# Training step for generating model artifacts
model_path = f"s3://{default_bucket}/output"
image_uri = sagemaker.image_uris.retrieve(
    framework="xgboost",
    region=region,
    version="1.0-1",
    py_version="py3",
    instance_type=training_instance_type,
)
fixed_hyperparameters = {
    "eval_metric": "auc",
    "objective": "binary:logistic",
    "num_round": "100",
    "rate_drop": "0.3",
    "tweedie_variance_power": "1.4",
}
xgb_train = Estimator(
    image_uri=image_uri,
    instance_type=training_instance_type,
    instance_count=1,
    hyperparameters=fixed_hyperparameters,
    output_path=model_path,
    base_job_name="churn-train",
    sagemaker_session=sagemaker_session,
    role=role,
)
hyperparameter_ranges = {
    "eta": ContinuousParameter(0, 1),
    "min_child_weight": ContinuousParameter(1, 10),
    "alpha": ContinuousParameter(0, 2),
    "max_depth": IntegerParameter(1, 10),
}
objective_metric_name = "validation:auc"

You can add a model tuning step (TuningStep) in the pipeline, which automatically invokes a hyperparameter tuning job (see the following code). Hyperparameter tuning finds the best version of a model by running many training jobs on the dataset using the algorithm and the ranges of hyperparameters that you specified. You can then register the best version of the model into the model registry using the RegisterModel step.

from sagemaker.inputs import TrainingInput
from sagemaker.tuner import HyperparameterTuner
from sagemaker.workflow.steps import TuningStep
## Direct integration for HPO
step_tuning = TuningStep(
    name="ChurnHyperParameterTuning",
    tuner=HyperparameterTuner(
        xgb_train,
        objective_metric_name,
        hyperparameter_ranges,
        max_jobs=2,
        max_parallel_jobs=2,
    ),
    inputs={
        # The processing step outputs feed the tuning step, which creates the data dependency
        "train": TrainingInput(
            s3_data=step_process.properties.ProcessingOutputConfig.Outputs["train"].S3Output.S3Uri,
            content_type="text/csv",
        ),
        "validation": TrainingInput(
            s3_data=step_process.properties.ProcessingOutputConfig.Outputs["validation"].S3Output.S3Uri,
            content_type="text/csv",
        ),
    },
)

After you tune the model, depending on the tuning job objective metrics, you can use branching logic when orchestrating the workflow. For this post, the conditional step for model quality check is as follows:

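from sagemaker.workflow.condition_step import JsonGet
from sagemaker.workflow.conditions import ConditionGreaterThan
# Condition for evaluating model quality and branching execution:
# true when the AUC in the evaluation report is greater than 0.75
cond_lte = ConditionGreaterThan(
    left=JsonGet(
        step=step_eval,
        property_file=evaluation_report,
        json_path="classification_metrics.auc_score.value",
    ),
    right=0.75,
)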
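
The json_path in the condition implies a nested evaluation report with an auc_score entry. As a rough illustration of what the condition checks, the following plain-Python sketch resolves the same dotted path against a hand-written report and applies the 0.75 threshold. The report contents and the json_get helper are assumptions for illustration; the actual report is produced by the evaluation script at run time.

```python
import json


def json_get(document, json_path):
    """Follow a dot-separated path through nested dictionaries."""
    value = document
    for key in json_path.split("."):
        value = value[key]
    return value


# Hypothetical report contents; the AUC value is made up.
report = json.loads('{"classification_metrics": {"auc_score": {"value": 0.81}}}')

auc = json_get(report, "classification_metrics.auc_score.value")
passes_quality_check = auc > 0.75
print(auc, passes_quality_check)
```

When the check passes, the pipeline can continue with registration and scoring; otherwise it can take the alternative branch.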

The best candidate model is registered for batch scoring using the RegisterModel step:

from sagemaker.workflow.step_collections import RegisterModel
# Register the top-ranked model from the tuning step in the model registry
step_register = RegisterModel(
    name="RegisterChurnModel",
    estimator=xgb_train,
    model_data=step_tuning.get_top_model_s3_uri(top_k=0, s3_bucket=default_bucket, prefix="output"),
    content_types=["text/csv"],
    response_types=["text/csv"],
    inference_instances=["ml.t2.medium", "ml.m5.large"],
    transform_instances=["ml.m5.large"],
    model_package_group_name=model_package_group_name,
    model_metrics=model_metrics,
)

Now that the model is trained, let’s see how Clarify helps us understand which features the model bases its predictions on. You can create an analysis_config.json file dynamically per workflow run using the generate_config.py utility. You can version and track the config file per pipeline run ID and store it in Amazon S3 for future reference. Initialize the DataConfig and ModelConfig objects as follows:

data_config = sagemaker.clarify.DataConfig(
    s3_data_input_path=f's3://{args.default_bucket}/output/train/train.csv',
    s3_output_path=args.bias_report_output_path,
    label=0,
    headers=[
        'target', 'esent', 'eopenrate', 'eclickrate', 'avgorder', 'ordfreq',
        'paperless', 'refill', 'doorstep', 'first_last_days_diff', 'created_first_days_diff',
        'favday_Friday', 'favday_Monday', 'favday_Saturday', 'favday_Sunday',
        'favday_Thursday', 'favday_Tuesday', 'favday_Wednesday',
        'city_BLR', 'city_BOM', 'city_DEL', 'city_MAA',
    ],
    dataset_type="text/csv",
)
model_config = sagemaker.clarify.ModelConfig(
    model_name=args.modelname,
    instance_type=args.clarify_instance_type,
    instance_count=1,
    accept_type="text/csv",
)
model_predicted_label_config = sagemaker.clarify.ModelPredictedLabelConfig(probability_threshold=0.5)
bias_config = sagemaker.clarify.BiasConfig(
    label_values_or_threshold=[1],
    facet_name="doorstep",
    facet_values_or_threshold=[0],
)
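
To give a sense of what the bias configuration asks Clarify to measure, the following sketch computes one pre-training metric, the difference in proportions of labels (DPL), by hand. It assumes that the group selected by facet_values_or_threshold (doorstep equal to 0) plays the role of facet d and all other rows play the role of facet a, with the positive label being 1. The function name and the eight sample rows are made up for illustration; Clarify computes this and many other metrics for you.

```python
def difference_in_proportions_of_labels(rows, facet_name, facet_value, label_name, positive_label):
    """DPL = q_a - q_d: gap in positive-label rates between facet a and facet d."""
    facet_d = [row for row in rows if row[facet_name] == facet_value]
    facet_a = [row for row in rows if row[facet_name] != facet_value]
    q_d = sum(row[label_name] == positive_label for row in facet_d) / len(facet_d)
    q_a = sum(row[label_name] == positive_label for row in facet_a) / len(facet_a)
    return q_a - q_d


# Made-up rows: four customers without doorstep delivery, four with it.
sample_rows = [
    {"doorstep": 0, "target": 1},
    {"doorstep": 0, "target": 0},
    {"doorstep": 0, "target": 0},
    {"doorstep": 0, "target": 0},
    {"doorstep": 1, "target": 1},
    {"doorstep": 1, "target": 1},
    {"doorstep": 1, "target": 1},
    {"doorstep": 1, "target": 0},
]

dpl = difference_in_proportions_of_labels(
    sample_rows, facet_name="doorstep", facet_value=0, label_name="target", positive_label=1
)
print(dpl)
```

A value near 0 means both groups have similar proportions of retained customers in the data; a large positive or negative value points to an imbalance worth investigating before you trust the model's predictions for either group.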

After you add the Clarify step as a postprocessing job using sagemaker.clarify.SageMakerClarifyProcessor in the pipeline, you can see a detailed feature and bias analysis report per pipeline run.

As the final step of the pipeline workflow, you can use the TransformStep step for offline scoring. Pass in the transformer instance and the TransformInput with the batch_data pipeline parameter defined earlier:

from sagemaker.inputs import TransformInput
from sagemaker.transformer import Transformer
from sagemaker.workflow.steps import TransformStep
# Step to perform batch transformation
transformer = Transformer(
    model_name=step_create_model.properties.ModelName,
    instance_type="ml.m5.xlarge",
    instance_count=1,
    output_path=f"s3://{default_bucket}/ChurnTransform",
)
step_transform = TransformStep(
    name="ChurnTransform",
    transformer=transformer,
    inputs=TransformInput(data=batch_data, content_type="text/csv"),
)

Finally, you can trigger a new pipeline run by choosing Start an execution on the Studio IDE interface.

You can also describe a pipeline run or start the pipeline using the following notebook. The following screenshot shows our output.
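
If you prefer to start and monitor a run from code, one possible approach is to call the SageMaker API through a boto3 client. The sketch below takes the client as an argument and uses the start_pipeline_execution and describe_pipeline_execution operations. The function name run_pipeline, the pipeline name, and the FakeSageMakerClient stand-in (included only so the sketch runs without AWS credentials) are assumptions for illustration.

```python
import time

TERMINAL_STATES = {"Succeeded", "Failed", "Stopped"}


def run_pipeline(sm_client, pipeline_name, parameters=None, poll_seconds=30, sleep=time.sleep):
    """Start a pipeline execution and poll until it reaches a terminal state."""
    response = sm_client.start_pipeline_execution(
        PipelineName=pipeline_name,
        PipelineParameters=[
            {"Name": name, "Value": str(value)} for name, value in (parameters or {}).items()
        ],
    )
    execution_arn = response["PipelineExecutionArn"]
    while True:
        description = sm_client.describe_pipeline_execution(PipelineExecutionArn=execution_arn)
        status = description["PipelineExecutionStatus"]
        if status in TERMINAL_STATES:
            return execution_arn, status
        sleep(poll_seconds)


class FakeSageMakerClient:
    """Stand-in for boto3.client("sagemaker") that replays a fixed list of statuses."""

    def __init__(self, statuses):
        self._statuses = iter(statuses)

    def start_pipeline_execution(self, **kwargs):
        name = kwargs["PipelineName"]
        return {"PipelineExecutionArn": f"arn:aws:sagemaker:us-east-1:123456789012:pipeline/{name}/execution/example"}

    def describe_pipeline_execution(self, **kwargs):
        return {"PipelineExecutionStatus": next(self._statuses)}


execution_arn, final_status = run_pipeline(
    FakeSageMakerClient(["Executing", "Executing", "Succeeded"]),
    pipeline_name="example-churn-pipeline",  # hypothetical pipeline name
    poll_seconds=0,
    sleep=lambda seconds: None,
)
print(final_status)
```

Against a real account, you would pass boto3.client("sagemaker") and the name of your own pipeline instead of the stand-in.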

You can schedule your SageMaker model building pipeline runs using Amazon EventBridge. SageMaker model building pipelines are supported as a target in Amazon EventBridge. This allows you to trigger your pipeline to run based on any event in your event bus. EventBridge enables you to automate your pipeline runs and respond automatically to events such as training job or endpoint status changes. Events include a new file being uploaded to your S3 bucket, a change in status of your SageMaker endpoint due to drift, and Amazon Simple Notification Service (Amazon SNS) topics.
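
As a sketch of what scheduling could look like with the EventBridge API, the following code builds a target that points at a pipeline and attaches it to a scheduled rule through the put_rule and put_targets operations. The ARNs, the rule name, the BatchData parameter name, and the RecordingEventsClient stand-in (included only so the sketch runs without AWS credentials) are all hypothetical; the IAM role must allow EventBridge to start executions of your pipeline.

```python
def build_pipeline_target(pipeline_arn, role_arn, parameters):
    """Describe a SageMaker pipeline as an EventBridge target."""
    return {
        "Id": "churn-pipeline-target",
        "Arn": pipeline_arn,
        "RoleArn": role_arn,
        "SageMakerPipelineParameters": {
            "PipelineParameterList": [
                {"Name": name, "Value": str(value)} for name, value in parameters.items()
            ]
        },
    }


def schedule_pipeline(events_client, rule_name, schedule_expression, target):
    """Create or update a scheduled rule and attach the pipeline as its target."""
    events_client.put_rule(Name=rule_name, ScheduleExpression=schedule_expression, State="ENABLED")
    return events_client.put_targets(Rule=rule_name, Targets=[target])


class RecordingEventsClient:
    """Stand-in for boto3.client("events") that only records the calls it receives."""

    def __init__(self):
        self.calls = []

    def put_rule(self, **kwargs):
        self.calls.append(("put_rule", kwargs))
        return {"RuleArn": "arn:aws:events:us-east-1:123456789012:rule/" + kwargs["Name"]}

    def put_targets(self, **kwargs):
        self.calls.append(("put_targets", kwargs))
        return {"FailedEntryCount": 0, "FailedEntries": []}


target = build_pipeline_target(
    pipeline_arn="arn:aws:sagemaker:us-east-1:123456789012:pipeline/example-churn-pipeline",
    role_arn="arn:aws:iam::123456789012:role/example-eventbridge-role",
    parameters={"BatchData": "s3://example-bucket/data/batch/batch.csv"},
)
events_client = RecordingEventsClient()
result = schedule_pipeline(events_client, "example-daily-churn-run", "rate(1 day)", target)
print(result)
```

To react to events instead of a schedule, you would create the rule with an event pattern rather than a schedule expression.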

Conclusion

This post explained how to use SageMaker Pipelines with other built-in SageMaker features and the XGBoost algorithm to develop, iterate, and deploy the best candidate model for churn prediction. For instructions on implementing this solution, see the GitHub repo. You can also clone and extend this solution with additional data sources for model retraining. We encourage you to reach out and discuss your ML use cases with your AWS account manager.

About the Authors

Gayatri Ghanakota is a Machine Learning Engineer with AWS Professional Services. She is passionate about developing, deploying, and explaining AI/ML solutions across various domains. Prior to this role, she led multiple initiatives as a data scientist and ML engineer with top global firms in the financial and retail space. She holds a master’s degree in Computer Science specialized in Data Science from the University of Colorado, Boulder.

Sarita Joshi is a Senior Data Scientist with AWS Professional Services focused on supporting customers across industries including retail, insurance, manufacturing, travel, life sciences, media and entertainment, and financial services. She has several years of experience as a consultant advising clients across many industries and technical domains, including AI, ML, analytics, and SAP. Today, she is passionately working with customers to develop and implement machine learning and AI solutions at scale.

Read more about this on: AWS