Spark: What’s up?
I have been reading about Apache Spark, the “amazing, blazing-fast” data processing engine, for a while. It has been on my TODO list for quite some time, and I guess it’s about time to give it a try.
First, let’s see what Spark is about: what problem it solves and where it excels.
Spark’s main selling points are very fast computation on data that fits in memory (on a single machine or across a cluster) and easy handling of distributed data.
What I also find really powerful about Apache Spark is its API, which is easy to use and available in several languages, including Scala, Java, Python, and R.
So I guess the typical use case for Spark is when you have a lot of data, want to run a set of operations on it, and are able to set up a cluster somewhere.
Setting up Spark and its Python API
I remember installing Spark on my Ubuntu system a while back. It wasn’t a pleasant experience, or at least it wasn’t straightforward (though not as complex as installing an Nvidia driver).
However, those days are over since I started depending on Anaconda, and I advise you to do the same. The way Anaconda handles dependencies and its huge repository of packages make it quite easy to have an environment tailored to your needs. So let’s install PySpark via Conda.
conda install -c conda-forge pyspark
And that’s it. You’ll have PySpark installed within minutes, and you can test it by running:
from pyspark import SparkContext
Amazing, right?
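If you would rather check for the package without actually importing it, a small standard-library sketch like the one below should do. The helper name `is_installed` is my own and is not part of PySpark or Conda.

```python
from importlib.util import find_spec


def is_installed(module_name: str) -> bool:
    """Return True if a top-level module can be located on the import path."""
    return find_spec(module_name) is not None


# Reports whether PySpark is visible from the current environment.
print("pyspark installed:", is_installed("pyspark"))
```

This only confirms that the package can be found; running the import is still the real test.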
First steps with Spark
Being a huge Pandas fan, I’m tempted to use Spark since it offers an API similar to Pandas’. So let’s pick a data set and start analyzing it with PySpark; I’m interested in seeing how it differs from a typical Pandas workflow.
I’m going to use a data set from Numerai since it’s tabular data with a classification target and provided in CSV format.
You can read more about the origin of this data on their website and maybe apply this tutorial to start competing in the tournament.
Let’s start playing with it.
Loading Data
First, we need to initialize a SparkContext, which configures Spark. We will configure it as a local instance using all available cores.
from pyspark import SparkContext
sc = SparkContext("local[*]", "Numerai APP")
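The master string decides how many worker threads the local instance gets: `local` means one thread, `local[K]` means K threads, and `local[*]` means one per logical core. The function below is a plain-Python sketch of that rule, handy for seeing what `local[*]` resolves to on your machine. The name `local_threads` is hypothetical and not a Spark API, and the sketch only handles these three simple forms.

```python
import os
import re


def local_threads(master: str) -> int:
    """Sketch of how many worker threads a simple local master string asks for."""
    if master == "local":
        return 1
    match = re.fullmatch(r"local\[(\*|\d+)\]", master)
    if match is None:
        raise ValueError(f"not a simple local master string: {master!r}")
    value = match.group(1)
    # "*" means one thread per logical core on this machine.
    return (os.cpu_count() or 1) if value == "*" else int(value)


print(local_threads("local[*]"))
```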
Next, we need to import SQLContext, which gives us access to Data Frames, and use it to read our CSV file.
from pyspark import SQLContext
spark = SQLContext(sc)
df = spark.read.csv("numerai_training_data.csv", header=True, inferSchema=True)
Now let’s take a look at the data Spark has loaded.
Exploring Data
In [3]: df.columns
Out[3]: ['id', 'era', 'data_type', 'feature1', 'feature2', 'feature3', 'feature4', 'feature5', 'feature6', 'feature7', 'feature8', 'feature9', 'feature10', 'feature11', 'feature12', 'feature13', 'feature14', 'feature15', 'feature16', 'feature17', 'feature18', 'feature19', 'feature20', 'feature21', 'feature22', 'feature23', 'feature24', 'feature25', 'feature26', 'feature27', 'feature28', 'feature29', 'feature30', 'feature31', 'feature32', 'feature33', 'feature34', 'feature35', 'feature36', 'feature37', 'feature38', 'feature39', 'feature40', 'feature41', 'feature42', 'feature43', 'feature44', 'feature45', 'feature46', 'feature47', 'feature48', 'feature49', 'feature50', 'target']
So it appears that we have 50 features and a single target variable.
Let’s take a look at a sample of the data.
In [4]: df.show(1)
2018-05-03 19:07:27 WARN Utils:66 - Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.debug.maxToStringFields' in SparkEnv.conf
| id| era|data_type|feature1|feature2|feature3|feature4|feature5|feature6|feature7|feature8|feature9|feature10|feature11|feature12|feature13|feature14|feature15|feature16|feature17|feature18|feature19|feature20|feature21|feature22|feature23|feature24|feature25|feature26|feature27|feature28|feature29|feature30|feature31|feature32|feature33|feature34|feature35|feature36|feature37|feature38|feature39|feature40|feature41|feature42|feature43|feature44|feature45|feature46|feature47|feature48|feature49|feature50|target|
|n2b2e3dd163cb422|era1| train| 0.43487| 0.44645| 0.25802| 0.37149| 0.62235| 0.67451| 0.68103| 0.45166| 0.82443| 0.38744| 0.56873| 0.51336| 0.58306| 0.48586| 0.59625| 0.50267| 0.48245| 0.48319| 0.47992| 0.44705| 0.51512| 0.56923| 0.70767| 0.45668| 0.6259| 0.42978| 0.58577| 0.376| 0.29311| 0.54153| 0.47237| 0.44449| 0.46055| 0.5531| 0.49626| 0.62609| 0.53893| 0.43073| 0.45922| 0.39688| 0.4778| 0.52962| 0.42439| 0.5168| 0.46297| 0.57426| 0.57946| 0.49646| 0.48968| 0.54194| 1|
only showing top 1 row
What about the data types that Spark inferred?
In [5]: df.dtypes
Out[5]: [('id', 'string'), ('era', 'string'), ('data_type', 'string'), ('feature1', 'double'), ('feature2', 'double'), ('feature3', 'double'), ('feature4', 'double'), ('feature5', 'double'), ('feature6', 'double'), ('feature7', 'double'), ('feature8', 'double'), ('feature9', 'double'), ('feature10', 'double'), ('feature11', 'double'), ('feature12', 'double'), ('feature13', 'double'), ('feature14', 'double'), ('feature15', 'double'), ('feature16', 'double'), ('feature17', 'double'), ('feature18', 'double'), ('feature19', 'double'), ('feature20', 'double'), ('feature21', 'double'), ('feature22', 'double'), ('feature23', 'double'), ('feature24', 'double'), ('feature25', 'double'), ('feature26', 'double'), ('feature27', 'double'), ('feature28', 'double'), ('feature29', 'double'), ('feature30', 'double'), ('feature31', 'double'), ('feature32', 'double'), ('feature33', 'double'), ('feature34', 'double'), ('feature35', 'double'), ('feature36', 'double'), ('feature37', 'double'), ('feature38', 'double'), ('feature39', 'double'), ('feature40', 'double'), ('feature41', 'double'), ('feature42', 'double'), ('feature43', 'double'), ('feature44', 'double'), ('feature45', 'double'), ('feature46', 'double'), ('feature47', 'double'), ('feature48', 'double'), ('feature49', 'double'), ('feature50', 'double'), ('target', 'int')]
Let’s look at some summary statistics for some of the features and the target.
What does the target variable look like?
In [8]: df.describe('target').show()
+-------+------------------+
|summary|            target|
+-------+------------------+
|  count|            393613|
|   mean|0.4999225127218867|
| stddev|0.5000006291385471|
|    min|                 0|
|    max|                 1|
+-------+------------------+
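As a quick sanity check on these numbers: for a 0/1 column the mean is simply the share of ones, and the standard deviation follows from the count and the mean. The sketch below redoes that arithmetic in plain Python, assuming that describe() reports the sample standard deviation (n - 1 in the denominator); the figures above are consistent with that assumption.

```python
import math

count = 393613                 # from the describe() output above
mean = 0.4999225127218867      # from the describe() output above

# For a 0/1 column the mean is the share of ones.
ones = round(mean * count)
p = ones / count

# Sample standard deviation (n - 1 in the denominator) of a 0/1 column.
sample_std = math.sqrt(count / (count - 1) * p * (1 - p))
print(ones, sample_std)
```

So the target is almost perfectly balanced, which matters later when judging the accuracy of a classifier.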
What about, for example, feature32?
In [9]: df.describe('feature32').show()
+-------+-------------------+
|summary|          feature32|
+-------+-------------------+
|  count|             393613|
|   mean|0.48513697860589206|
| stddev| 0.1102409251895831|
|    min|            0.00536|
|    max|                1.0|
+-------+-------------------+
The era variable is a string, so it’s probably categorical. What does it hold?
In [27]: df.select('era').distinct().count()
Out[27]: 120
120 distinct values. What are they?
In [33]: df.select('era').groupby('era').count().show()
+------+-----+
|   era|count|
+------+-----+
| era30| 3292|
| era28| 3030|
| era91| 3444|
| era97| 3813|
| era58| 4001|
|era108| 3075|
| era83| 3359|
| era13| 2752|
| era71| 3148|
| era99| 3857|
|era116| 3416|
| era47| 3647|
| era82| 3662|
| era45| 3413|
|  era3| 1640|
| era85| 3514|
| era96| 3214|
| era92| 3370|
| era75| 3022|
| era93| 3586|
+------+-----+
only showing top 20 rows
What are the top eras by occurrence?
In [40]: df.select('era').groupby('era').count().orderBy('count', ascending=False).show()
+------+-----+
|   era|count|
+------+-----+
| era55| 4041|
| era54| 4007|
| era58| 4001|
| era59| 3966|
| era50| 3932|
|era102| 3928|
| era98| 3871|
| era52| 3867|
| era53| 3867|
| era99| 3857|
| era57| 3826|
| era97| 3813|
|era101| 3806|
| era61| 3800|
| era51| 3785|
| era49| 3781|
|era100| 3777|
|era106| 3764|
| era56| 3747|
|era103| 3741|
+------+-----+
only showing top 20 rows
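If you come from plain Python, the groupby/count/orderBy chain above does roughly what `collections.Counter` does on a single machine. The short list of eras below is made up for illustration and is not taken from the Numerai file.

```python
from collections import Counter

# Made-up sample of era labels, not taken from the Numerai file.
eras = ["era2", "era1", "era2", "era3", "era2", "era3"]

counts = Counter(eras)
# most_common() sorts by count, descending, like orderBy('count', ascending=False).
top = counts.most_common(2)
print(top)
```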
What values does the data_type column hold?
In [42]: df.select('data_type').distinct().show()
+---------+
|data_type|
+---------+
|    train|
+---------+
So this is pretty much a useless column in the training set.
What is the mean for each feature grouped by the era?
In [47]: df.groupBy('era').mean().show(1)
| era| avg(feature1)| avg(feature2)| avg(feature3)| avg(feature4)| avg(feature5)| avg(feature6)| avg(feature7)| avg(feature8)| avg(feature9)| avg(feature10)| avg(feature11)| avg(feature12)| avg(feature13)| avg(feature14)| avg(feature15)| avg(feature16)| avg(feature17)| avg(feature18)| avg(feature19)| avg(feature20)| avg(feature21)| avg(feature22)| avg(feature23)| avg(feature24)| avg(feature25)| avg(feature26)| avg(feature27)| avg(feature28)| avg(feature29)| avg(feature30)| avg(feature31)| avg(feature32)| avg(feature33)| avg(feature34)| avg(feature35)| avg(feature36)| avg(feature37)| avg(feature38)| avg(feature39)| avg(feature40)| avg(feature41)| avg(feature42)| avg(feature43)| avg(feature44)| avg(feature45)| avg(feature46)| avg(feature47)| avg(feature48)| avg(feature49)| avg(feature50)|avg(target)|
|era30|0.4672037575941682|0.4585846294046168|0.481838116646415|0.4452093134872422|0.5098163942891866|0.48495227521263706|0.5240403948967194|0.5585424088699866|0.5167403341433782|0.469467214459295|0.5179730589307424|0.4966605619684082|0.48897555589307395|0.5061033475091133|0.5405062788578373|0.45752346597812804|0.5184621719319564|0.4770609781287982|0.5388221476306165|0.48378031287970846|0.5223059842041315|0.5399116008505479|0.5325082077764293|0.45513347812879557|0.503795501215067|0.4937804921020667|0.5658011877278245|0.4804644046172537|0.4623523238153107|0.5132233323207784|0.4959696962332936|0.4854901123936817|0.5260988122721753|0.46107154617254037|0.4825312970838389|0.5339099301336574|0.5222398116646426|0.5142803584447148|0.4966200880923455|0.47680153705953754|0.5284867284325645|0.49497383353584434|0.4833738517618467|0.4881642800729042|0.5327173147023094|0.5300611208991504|0.5204471233292842|0.5203861452004868|0.4954813669501819|0.5205513213851761| 0.5|
only showing top 1 row
We could keep asking questions, but the idea is clear: the API is close to Pandas’ API, and it is simple and explicit.
Enough exploration; let’s proceed to build a model on the data.
Building a classification model on the data
I’m going to split the tournament training data into two splits: Train and test.
First, let’s preprocess our categorical variable era into an integer and get rid of the data type column and the ID.
In [57]: df = df.drop('data_type', 'id')
In [58]: from pyspark.ml.feature import StringIndexer
In [59]: indexer = StringIndexer(inputCol="era", outputCol="era_int")
In [60]: df = indexer.fit(df).transform(df)
In [61]: df.select('era_int').show()
+-------+
|era_int|
+-------+
|  117.0|
|  117.0|
|  117.0|
|  117.0|
|  117.0|
|  117.0|
|  117.0|
|  117.0|
|  117.0|
|  117.0|
|  117.0|
|  117.0|
|  117.0|
|  117.0|
|  117.0|
|  117.0|
|  117.0|
|  117.0|
|  117.0|
|  117.0|
+-------+
only showing top 20 rows
In [62]: df.select('era').show()
+----+
| era|
+----+
|era1|
|era1|
|era1|
|era1|
|era1|
|era1|
|era1|
|era1|
|era1|
|era1|
|era1|
|era1|
|era1|
|era1|
|era1|
|era1|
|era1|
|era1|
|era1|
|era1|
+----+
only showing top 20 rows
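By default, the StringIndexer assigns each class an index based on its frequency in the data: the most frequent label gets 0, the next most frequent gets 1, and so on. The indices are stored as doubles, which is why we see 117.0 rather than 117.
Finally, let’s drop the original era variable.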
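To make the frequency-based indexing concrete, here is a plain-Python sketch of the idea. It is not Spark's implementation: the function name `frequency_index` is hypothetical, the labels are made up, and the alphabetical tie-breaking is an assumption of this sketch. It also explains the 117.0 above: with 120 eras the indices run from 0 to 119, so era1 must be one of the least frequent eras.

```python
from collections import Counter


def frequency_index(labels):
    """Map each label to a float index, most frequent label first.

    Ties are broken alphabetically here; treat that as an assumption of
    this sketch rather than a guarantee about every Spark version.
    """
    counts = Counter(labels)
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: float(i) for i, label in enumerate(ordered)}


# Made-up labels: "era2" is the most common, "era1" the rarest.
mapping = frequency_index(["era1", "era2", "era2", "era3", "era3", "era2"])
print(mapping)
```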
In [64]: df = df.drop('era')
We need a list with the names of our feature columns, so let’s prepare that.
In [90]: features = df.columns
In [91]: features.remove('target')
Now comes a part of Spark I don’t fully understand: instead of passing the column names to the model, we have to assemble the features into a single vector column using VectorAssembler. So let’s try that.
In [116]: from pyspark.ml.feature import VectorAssembler
In [117]: feature_assembler = VectorAssembler(inputCols=features, outputCol='features')
In [118]: df = feature_assembler.transform(df)
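Conceptually, the assembler just gathers the chosen columns of each row into one vector. A minimal plain-Python sketch of that idea follows; the helper `assemble` is hypothetical, and the row uses the first two feature values from the sample row shown earlier.

```python
def assemble(row, input_cols):
    """Collect the named columns of one row into a single list of floats."""
    return [float(row[name]) for name in input_cols]


# One row with two features and a target, for illustration only.
row = {"feature1": 0.43487, "feature2": 0.44645, "target": 1}
features_vector = assemble(row, ["feature1", "feature2"])
print(features_vector)
```

The target stays out of the vector because it is not listed among the input columns, which is why we removed it from the features list.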
Let’s split our data into 70% train and 30% test.
In [65]: train, test = df.randomSplit([0.7, 0.3])
In [66]: train.count()
Out[66]: 275666
In [67]: test.count()
Out[67]: 117947
Looks just about right.
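The sizes are close to 70/30 but not exact (275666 of 393613 rows is about 70.03%) because each row is assigned to a split at random. The plain-Python sketch below imitates that idea; it is not Spark's implementation, and the function name `random_split` is my own.

```python
import random


def random_split(rows, weights, seed=None):
    """Assign each row to one split at random, in proportion to the weights."""
    rng = random.Random(seed)
    total = sum(weights)
    splits = [[] for _ in weights]
    for row in rows:
        draw = rng.random() * total
        cumulative = 0.0
        for index, weight in enumerate(weights):
            cumulative += weight
            if draw < cumulative:
                splits[index].append(row)
                break
        else:
            # Guard against floating-point edge cases at the upper bound.
            splits[-1].append(row)
    return splits


train_rows, test_rows = random_split(range(10_000), [0.7, 0.3], seed=42)
print(len(train_rows), len(test_rows))
```

In Spark itself, randomSplit also accepts a seed argument, for example `df.randomSplit([0.7, 0.3], seed=42)`, which is worth setting if you want the same split on every run.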
Now let’s see how a LogisticRegression model performs on this data.
Import the classifier and instantiate it.
In [122]: from pyspark.ml.classification import LogisticRegression
In [123]: lr = LogisticRegression(labelCol='target', featuresCol='features')
Fit it on the training data.
In [124]: model = lr.fit(train)
Now we’ll instantiate an Evaluator from Spark and evaluate the resulting model.
In [146]: from pyspark.ml.evaluation import MulticlassClassificationEvaluator
In [147]: evaluator = MulticlassClassificationEvaluator(
     ...:     labelCol="target", predictionCol="prediction", metricName="accuracy")
     ...: predictions = model.transform(test)
In [148]: accuracy = evaluator.evaluate(predictions)
In [149]: accuracy
Out[149]: 0.5155174016273627
The accuracy is about 51.6%, only slightly better than the 50% that guessing would achieve on this balanced target, which is reasonable given the nature of Numerai’s data.
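For reference, the accuracy metric is just the fraction of rows where the prediction equals the label. A minimal plain-Python sketch, with made-up labels and predictions rather than the Numerai results:

```python
def accuracy(labels, predictions):
    """Fraction of predictions that equal the true label."""
    if len(labels) != len(predictions):
        raise ValueError("labels and predictions must have the same length")
    correct = sum(1 for y, y_hat in zip(labels, predictions) if y == y_hat)
    return correct / len(labels)


# Made-up labels and predictions, for illustration only.
labels = [1, 0, 1, 1, 0, 0, 1, 0]
predictions = [1, 0, 0, 1, 0, 1, 1, 1]
print(accuracy(labels, predictions))
```

On a target that is split almost evenly between 0 and 1, always predicting one class scores close to 0.5, so that is the baseline to beat.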
Conclusion
Spark has a nice API that looks familiar if you know Pandas’ API.
Training a classifier from MLlib on your data is simple, almost as simple as with scikit-learn’s API.
Spark also provides evaluators for performance that are simple to use and do the job.
So I guess Spark is a powerful tool for data analytics with an upper hand on “big” data and distributed systems.