How to Efficiently Train Multiple ML Models on a Spark Cluster

ZebraX Data TeamSeptember 15 2021

Introduction to Spark

Spark is a fast, distributed analytics computing engine for large-scale data processing and machine learning modeling. Spark allows data sharing between processing steps through in-memory processing of data pipelines, hence it can run workloads a lot faster if compared to Hadoop. Spark also comes with powerful libraries that can be used together in the same application: SparkSQL, SparkML, SparkStreaming, and SparkGraphX.

spark.png

SparkML provides multiple types of machine learning algorithms, including classification, regression, clustering, and collaborative filtering, as well as supporting functionality such as model evaluation and data import. It is designed to perform machine learning algorithms on distributed data.

Training a Multiple Partitioned Models

Imagine if we are asked to build a different model for each partition in our dataset, e.g. product types, regions, etc. Let's say we have 100 different partitions, training 100 models in a serial mode will be time-consuming and not efficient. Parallel modeling can provide tremendous benefit in building a variety of models by speeding up the process through parallelization so the model building process becomes more efficient.

Building a machine learning model in Python, using its popular and powerful libraries such as Scikit-learn, Pandas, and Numpy, is easy and fun. But Python has its limitations when it comes to large-scale data processing. Python libraries are not set up to work in a distributed environment and are bound on a single compute machine with one contiguous block of memory, which make it infeasible to be used for training on large-scale distributed dataset.

With the help of Spark Pandas UDFs, we can partition our dataset by the specified category (column) into smaller chunks, then Spark will distribute each chunk across the cluster nodes and execute the Python code on each node. This feature also allows us to bring many Python functionalities from its massive libraries such as SciPy, NumPy, Scikit-learn, and StatsModels, just to name a few. Now, with Pandas UDFs there is a new scale of compute that is available for Python developers.

In order to bring powerful functionalities of Python’s libraries into Spark cluster framework, we need a new feature that enables us to run our Python code in a distributed environment. Fortunately, Databricks introduced the Pandas UDFs (a.k.a. Vectorized UDFs) feature in the Apache Spark 2.3 release that substantially improves the performance and usability of user-defined functions (UDFs) in Python. Instead of pulling the full dataset into memory on the driver node, we can use Pandas UDFs to distribute the dataset across a Spark cluster, and use Apache Arrow to translate between the Spark and Pandas data frame representations.

Pandas UDF Concept

Pandas UDFs are features that are executed by Spark which enable Python code to run in a distributed environment. User-Defined Functions or UDFs use Apache Arrow to transfer data and Pandas to work with the data [1]. There are two types of Pandas UDFs, scalar and grouped map UDFs. Scalar is used for vectorizing scalar operations while grouped map is designed for splitting combine pattern analysis based on conditions specified in the groupby operator [2]. For using Pandas UDFs, both scalar and grouped map are adding the @pandas.udf decorator above the function. But, they have some differences like input and output for scalar is Pandas series while grouped map is Pandas dataframe, scalar has no grouping semantics while grouped map defined by groupby clause, and the return types for scalar is a DataType and grouped map is a StructType.

  • Grouped Map Feature

ZX AnalytiX application uses the grouped map feature, which was initially introduced in Spark 2.3. In summary, this feature allows us to partition a Spark dataframe into smaller chunks that are converted to Pandas dataframes before being passed to our Python code. UDFs are executed on worker nodes, enabling existing Python code to now be executed in a distributed manner. ​​However, a few conditions need to be met for a problem to match this use case. The first prerequisite is that our data can be well partitioned by a categorical column (key), and the second prerequisite is that our data needs to be represented as a single Spark dataframe. The grouped map feature is able to distribute a single dataframe across worker nodes using a partitioning column, and only supports the groupby operation on a single object.

  • Inherent Risks of using pandas UDFs

The first issue is that debugging Pandas UDFs was much more difficult than working with Pandas dataframes on the driver node, because we can no longer use print statements to trace execution flow. Current best practice for testing Pandas UDFs is to first use the toPandas method on a small dataframe and write a groupby apply function that runs on the driver node before trying to distribute the operation for the full dataset.

The second issue is that the schema for the returned Pandas dataframe needs to be specified before the function is defined, since the schema is specified as part of the grouped map annotation. With this, you cannot store your trained model objects directly to the dataframe. You need to convert your model into a string representation first and then store it in the dataframe, this link [3] gives you a clear example of how to do that.

Another trick you can do is to store your trained models into a local/cloud storage. By doing this you can easily manage your models, evaluate, and use it whenever we want to make predictions on new data.

Parallel Modeling using Spark Pandas UDF

When working on a parallel model training, which aims to parallelize the training of machine learning model processes, the suitable Pandas UDFs type is grouped map Pandas UDFs. With this, we can build a machine learning model on each partition using the split-apply-combine pattern that grouped map UDFs offer.

The steps are simple, first is defining a Python function for the training process, then put the pandas udf decorator to it, so it will enable the parallelization. Then define its output schema, and the function as a PandasUDFType.GROUPED_MAP. That’s it, now we are ready to apply this Pandas UDF to our dataset. Applying the Pandas UDF is also simple. The most important part that we need to do first is to ensure that our dataset can be well divided by a categorical column that we are interested in. If it is not, then we should do data conditioning before applying the function. The next step is to use the groupby method to your dataframe, specify the partition column, and apply the Pandas UDF. Spark will execute the Pandas UDF on these smaller chunks of pandas dataframe and combine all the results back to one Spark dataframe.

Quick Demo Using ZX AnalytiX

Let’s take a look at the ZX AnalytiX platform below. In this platform we already implement the Pandas UDF grouped map feature to scale up the application of parallel modeling. Using this platform, we can build a model pipeline and then train the model without doing any coding, which is great to speed up our work in model building. In this simple demo, we use toy data of activities of assets in a mining area. The asset types are car and wheel loader. These two assets have very different behaviors/activities, so it is a right choice to build a different model for these two assets. The goal of our model is to predict the asset status, whether it is idle, active, or off. Now, we already understand our problem and goal, then we can continue to build our model pipeline. It starts with choosing the data from the database, choosing a partition column (the asset_type), and choosing feature engineering, feature selection, and feature handling methods if necessary. The final configuration of our pipeline is to choose the algorithm(s) that we want to use for our models. We can choose several algorithms for this training phase. The application will pick over the best model for each partition based on the user specified evaluation metric. After we execute the models, the following result shows that there are 3 best models for each partition (two models represent model for each partition, and one model represents model without partition) that were obtained from the parallel model training process.

experiment report.png

ZX AnalytiX Partition Model - Experiment Report

Below is a short clip of the ZebraX DMAA Platform for this simple use case.

ZX-AnalytiX-Google-Chrome-2021-GIF.gif

Summary

Pandas UDF is a great feature that enables us to run our Python codes in a distributed environment. We showed that we can benefit from this functionality to build multiple models in parallel manner, but the application of Pandas UDF is not limited to model training. It can be used for feature generation, statistical testing, and other variety of applications for data science.

At ZebraX we have used Pandas UDFs to improve our application capabilities, so that we can produce greater data-driven insights to our customers. By leveraging Spark capabilities for distributing the computing process and powerful functionalities of Python’s libraries, we aim to add more capacities to our Advanced Analytics platform to tackle diverse business problems using data-driven approaches.

Author:

  1. Gilang Samudra (Senior Data Scientist)
  2. Puteri Aulia Indrasti (Data Engineer)

References

https://docs.databricks.com/spark/latest/spark-sql/udf-python-pandas.html https://databricks.com/blog/2017/10/30/introducing-vectorized-udfs-for-pyspark.html https://patrickroos.org/2019/02/25/training-many-scikit-learn-models-using-pyspark-and-pandas-udfs/

Take the next step
Get a Demo/Diagnostic for your Organization's Digital Transformation Journey