Classification using Python and Spark

This post assumes that you are aware of the basics of Machine Learning and Apache Spark.

Apache Spark

Apache Spark is a powerful open source processing engine built around speed, ease of use, and sophisticated analytics. It was originally developed at UC Berkeley in 2009. It runs on top of existing Hadoop clusters and can access the Hadoop data store (HDFS); it can also process structured data in Hive and streaming data from sources such as HDFS, Flume, Kafka, and Twitter.

Spark’s MLlib

Apache Spark’s Machine Learning Library (MLlib) allows data scientists to focus on their data problems and models instead of solving the complexities surrounding distributed data (such as infrastructure, configurations and so on).
In this post we will implement a Classification model using Logistic Regression and PySpark (Spark’s Python API). The dataset we will use is the classic Titanic dataset.

Initializing Spark

A convenient way to use PySpark in a Jupyter notebook is findspark. Its init method adds the pyspark module to sys.path at run time.

import findspark
findspark.init('/home/jinudaniel74/spark-2.1.1-bin-hadoop2.7')

In earlier versions of Spark, SparkContext was the entry point. From Spark 2.0 onwards, the Dataset and DataFrame APIs are the new standard, replacing the RDD API, and their entry point is SparkSession.
SparkSession follows the builder design pattern; the code below creates a SparkSession.

from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('classification').getOrCreate()

Loading the DataSet

The dataset we will use is the classic Titanic dataset, which will help us predict whether a given passenger survived the Titanic disaster, given a set of features about the passenger.

data = spark.read.csv('titanic.csv', inferSchema=True, header=True)
data.printSchema()
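Before going further, it can be worth a quick sanity check of what was loaded; the snippet below (an illustrative check, not from the original notebook) simply prints the first few rows and the column names.

# Illustrative sanity check of the loaded DataFrame
data.show(5)
print(data.columns)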

Feature Engineering

Let’s select the columns that we think played a role in determining whether the passenger survived. Let’s also drop rows that have missing values in any of these columns.

columns = ['Survived', 'Pclass', 'Sex', 'Age', 'SibSp', 'Parch',
           'Fare', 'Cabin', 'Embarked']

final_data = data.select(columns).na.drop()
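If you are curious how many rows the null filter removes, you can compare row counts before and after the drop (a quick check; the exact numbers depend on your copy of the dataset).

# Compare row counts before and after dropping rows with missing values
print(data.count())
print(final_data.count())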

StringIndexer encodes a string column of labels to a column of label indices. The indices are in [0, numLabels), ordered by label frequencies, so the most frequent label gets index 0. After converting the strings to indices, let’s apply one-hot encoding.

from pyspark.ml.feature import VectorAssembler, StringIndexer, OneHotEncoder

gender_indexer = StringIndexer(inputCol='Sex', outputCol='SexIndex')
gender_encoder = OneHotEncoder(inputCol='SexIndex', outputCol='SexVec')

embark_indexer = StringIndexer(inputCol='Embarked', outputCol='EmbarkedIndex')
embark_encoder = OneHotEncoder(inputCol='EmbarkedIndex', outputCol='EmbarkedVec')
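To see what StringIndexer actually produces, you can fit the gender indexer on its own and look at the distinct label/index pairs. This step is purely illustrative; the pipeline defined later handles the fitting for us.

# Illustrative only: inspect the index assigned to each Sex label
# (the most frequent label gets index 0.0)
indexed = gender_indexer.fit(final_data).transform(final_data)
indexed.select('Sex', 'SexIndex').distinct().show()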

VectorAssembler is a transformer that combines a given list of columns into a single vector column. It is useful for combining raw features and features generated by different feature transformers into a single feature vector, in order to train ML models.

assembler = VectorAssembler(inputCols=['Pclass', 'SexVec', 'Age', 'SibSp', 'Parch',
                                       'Fare', 'EmbarkedVec'],
                            outputCol='features')
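If the output of VectorAssembler is unfamiliar, a tiny toy example (hypothetical data, not part of the Titanic workflow) shows how it packs numeric columns into a single vector column.

# Toy example: combine two numeric columns into one vector column
toy_df = spark.createDataFrame([(1.0, 2.0), (3.0, 4.0)], ['a', 'b'])
toy_assembler = VectorAssembler(inputCols=['a', 'b'], outputCol='features')
toy_assembler.transform(toy_df).show()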

Training the model

A Spark Pipeline is specified as a sequence of stages, and each stage is either a Transformer or an Estimator. These stages are run in order, and the input DataFrame is transformed as it passes through each stage. More on Pipelines can be found in the Spark documentation.

from pyspark.ml.classification import LogisticRegression
from pyspark.ml import Pipeline

log_model = LogisticRegression(labelCol='Survived', featuresCol='features')
pipeline = Pipeline(stages=[gender_indexer, gender_encoder, embark_indexer,
                            embark_encoder, assembler, log_model])

Split the data into training and test sets in a 70:30 ratio and fit the pipeline on the training data.

train_data, test_data = final_data.randomSplit([0.7, 0.3])
train_fit = pipeline.fit(train_data)
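If you want the split to be reproducible across runs, randomSplit also accepts a seed; the original run does not set one, so your exact results may differ slightly from run to run.

# Optional: fix the seed so the 70:30 split is the same on every run
train_data, test_data = final_data.randomSplit([0.7, 0.3], seed=42)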

Validating the model

How do we evaluate the model that we just trained? How do we know if it is a good one?
We have various metrics such as accuracy, precision, and recall that help us evaluate a model. Let’s try to find the accuracy of the model that we trained.

from pyspark.ml.evaluation import MulticlassClassificationEvaluator

predictions = train_fit.transform(test_data)
evaluator = MulticlassClassificationEvaluator(labelCol='Survived')
accuracy = evaluator.evaluate(predictions, {evaluator.metricName: "accuracy"})

print(accuracy)

0.76
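The same evaluator can also report the other metrics mentioned above, such as F1 score and weighted precision/recall; the snippet below is illustrative, and these values are not reported in the original run.

# Other metrics supported by MulticlassClassificationEvaluator
f1 = evaluator.evaluate(predictions, {evaluator.metricName: 'f1'})
precision = evaluator.evaluate(predictions, {evaluator.metricName: 'weightedPrecision'})
recall = evaluator.evaluate(predictions, {evaluator.metricName: 'weightedRecall'})
print(f1, precision, recall)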

An accuracy of 76% on the test set is not bad. We can tune the model to increase the accuracy, or we can use other classification methods such as DecisionTreeClassifier, RandomForestClassifier, etc.
The accuracy increased to 80% when I used DecisionTreeClassifier with default hyperparameters; a sketch of that swap follows below.
The entire code is available as a Jupyter notebook on my GitHub repo.
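The sketch below reuses the indexers, encoders, assembler, evaluator, and data splits defined above and only swaps the final estimator (default hyperparameters).

# Swap only the final estimator in the pipeline for a decision tree
from pyspark.ml.classification import DecisionTreeClassifier

dt_model = DecisionTreeClassifier(labelCol='Survived', featuresCol='features')
dt_pipeline = Pipeline(stages=[gender_indexer, gender_encoder, embark_indexer,
                               embark_encoder, assembler, dt_model])
dt_fit = dt_pipeline.fit(train_data)
dt_accuracy = evaluator.evaluate(dt_fit.transform(test_data),
                                 {evaluator.metricName: 'accuracy'})
print(dt_accuracy)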
