Thursday, March 30, 2017

PREDICTING LOAN CREDIT RISK USING APACHE SPARK MACHINE LEARNING RANDOM FORESTS

source: https://mapr.com/blog/predicting-loan-credit-risk-using-apache-spark-machine-learning-random-forests/

July 12, 2016 | BY Carol McDonald
In this blog post, I’ll help you get started using Apache Spark’s spark.ml Random Forests for classification of bank loan credit risk. The goal of Spark’s spark.ml library is to provide a set of APIs on top of DataFrames that help users create and tune machine learning workflows or pipelines. Using spark.ml with DataFrames improves performance through intelligent optimizations.

CLASSIFICATION

Classification is a family of supervised machine learning algorithms that identify which category an item belongs to (for example, whether a transaction is fraud or not), based on labeled examples of known items (for example, transactions known to be fraud or not). Classification takes a set of data with known labels and pre-determined features and learns how to label new records based on that information. Features are the “if questions” that you ask. The label is the answer to those questions. For example, if it walks, swims, and quacks like a duck, then the label is "duck".
Let’s go through an example of Credit Risk for Bank Loans:
  • What are we trying to predict?
    • Whether a person will pay back a loan or not.
    • This is the label: the creditability of a person.
  • What are the “if questions” or properties that you can use to predict?
    • An applicant’s demographic and socio-economic profile: occupation, age, savings, marital status, and so on.
    • These are the features. To build a classifier model, you extract the features of interest that most contribute to the classification.

DECISION TREES

Decision trees create a model that predicts the class or label based on several input features. Decision trees work by evaluating an expression containing a feature at every node and selecting a branch to the next node based on the answer. A possible decision tree for predicting credit risk is shown below; a small code sketch of how such a tree is read follows the example. The feature questions are the nodes, and the answers “yes” or “no” are the branches in the tree to the child nodes.
  • Q1: Is checking account balance > 200 DM?
    • No
      • Q2: Is length of current employment > 1 year?
        • No
          • Not Creditable
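To make this concrete, here is a minimal sketch (plain Scala, not Spark code) of how the example tree above is read as nested if/else tests. The example only shows the “no” path, so the outcomes on the “yes” branches below are assumptions added for illustration.
**// illustrative only: the example credit-risk tree as nested if/else tests**
def creditable(checkingBalanceDM: Double, employmentYears: Double): Boolean =
  if (checkingBalanceDM > 200) {
    true                          // Q1 "yes" branch (assumed outcome)
  } else if (employmentYears > 1) {
    true                          // Q2 "yes" branch (assumed outcome)
  } else {
    false                         // Q2 "no" branch: Not Creditable
  }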

RANDOM FORESTS

Ensemble learning algorithms combine multiple machine learning algorithms to obtain a better model. Random Forest is a popular ensemble learning method for classification and regression. The algorithm builds a model consisting of multiple decision trees, based on different subsets of the data at the training stage. Predictions are made by combining the output from all of the trees, which reduces the variance and improves the predictive accuracy. For Random Forest classification, each tree’s prediction is counted as a vote for one class, and the label is predicted to be the class which receives the most votes.
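As a rough illustration of the voting step (plain Scala, not Spark’s internal implementation), the forest’s prediction is simply the class that receives the most votes from the individual trees:
**// illustrative majority vote over the predictions of the individual trees**
def majorityVote(treePredictions: Seq[Double]): Double =
  treePredictions.groupBy(identity).maxBy(_._2.size)._1

majorityVote(Seq(1.0, 0.0, 1.0, 1.0, 0.0))   // => 1.0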

ANALYZE CREDIT RISK WITH SPARK MACHINE LEARNING SCENARIO

Our data is from the German Credit Data Set, which classifies people described by a set of attributes as good or bad credit risks. For each bank loan application we have the applicant's attributes (balance, duration, history, purpose, amount, savings, employment, and so on, as defined in the Credit schema below).
The germancredit.csv file has the following format:
1,1,18,4,2,1049,1,2,4,2,1,4,2,21,3,1,1,3,1,1,1
1,1,9,4,0,2799,1,3,2,3,1,2,1,36,3,1,2,3,2,1,1
1,2,12,2,9,841,2,4,2,2,1,4,1,23,3,1,1,2,1,1,1
In this scenario, we will build a random forest of decision trees to predict the label / classification of Creditable or not based on the following features:
  • Label → Creditable or Not Creditable (1 or 0)
  • Features → {balance, history, purpose…}

SOFTWARE

This tutorial will run on Spark 1.6.1
Log into the MapR Sandbox, as explained in Getting Started with Spark on MapR Sandbox, using userid user01, password mapr. Copy the sample data file to your sandbox home directory /user/user01 using scp. (Note that you may have to update the Spark version on your Sandbox.) Start the Spark shell with:
$ spark-shell --master local[1]

LOAD AND PARSE THE DATA FROM A CSV FILE

First, we will import the machine learning packages.
(In the code boxes, comments are shown in bold and example output follows the commands.)
import org.apache.spark.ml.classification.RandomForestClassifier
import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator
import org.apache.spark.ml.feature.StringIndexer
import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.ml.tuning.{ ParamGridBuilder, CrossValidator }
import org.apache.spark.ml.{ Pipeline, PipelineStage }
// needed by the parseRDD function and the metrics section below
import org.apache.spark.rdd.RDD
import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics
import sqlContext.implicits._
import sqlContext._
We use a Scala case class to define the Credit schema corresponding to a line in the csv data file.
**// define the Credit Schema**
case class Credit(
    creditability: Double,
    balance: Double, duration: Double, history: Double, purpose: Double, amount: Double,
    savings: Double, employment: Double, instPercent: Double, sexMarried: Double, guarantors: Double,
    residenceDuration: Double, assets: Double, age: Double, concCredit: Double, apartment: Double,
    credits: Double, occupation: Double, dependents: Double, hasPhone: Double, foreign: Double
  )
The functions below parse a line from the data file into the Credit class. A 1 is subtracted from some categorical values so that they all consistently start with 0.
**// function to create a  Credit class from an Array of Double**
def parseCredit(line: Array[Double]): Credit = {
    Credit(
      line(0),
      line(1) - 1, line(2), line(3), line(4) , line(5),
      line(6) - 1, line(7) - 1, line(8), line(9) - 1, line(10) - 1,
      line(11) - 1, line(12) - 1, line(13), line(14) - 1, line(15) - 1,
      line(16) - 1, line(17) - 1, line(18) - 1, line(19) - 1, line(20) - 1
    )
  }
**// function to transform an RDD of Strings into an RDD of Double**
  def parseRDD(rdd: RDD[String]): RDD[Array[Double]] = {
    rdd.map(_.split(",")).map(_.map(_.toDouble))
  }
Below we load the data from the germancredit.csv file into an RDD of Strings. The parseRDD function uses map transformations to turn each String in the RDD into an Array of Double. Then we use another map transformation, which applies the parseCredit function to transform each Array of Double into a Credit object. The toDF() method transforms the RDD of Credit objects into a DataFrame with the Credit class schema.
**// load the data into a  RDD**
val creditDF= parseRDD(sc.textFile("germancredit.csv")).map(parseCredit).toDF().cache()
creditDF.registerTempTable("credit")
DataFrame printSchema() prints the schema to the console in a tree format.
**// Return the schema of this DataFrame**
creditDF.printSchema
 root
 |-- creditability: double (nullable = false)
 |-- balance: double (nullable = false)
 |-- duration: double (nullable = false)
 |-- history: double (nullable = false)
 |-- purpose: double (nullable = false)
 |-- amount: double (nullable = false)
 |-- savings: double (nullable = false)
 |-- employment: double (nullable = false)
 |-- instPercent: double (nullable = false)
 |-- sexMarried: double (nullable = false)
 |-- guarantors: double (nullable = false)
 |-- residenceDuration: double (nullable = false)
 |-- assets: double (nullable = false)
 |-- age: double (nullable = false)
 |-- concCredit: double (nullable = false)
 |-- apartment: double (nullable = false)
 |-- credits: double (nullable = false)
 |-- occupation: double (nullable = false)
 |-- dependents: double (nullable = false)
 |-- hasPhone: double (nullable = false)
 |-- foreign: double (nullable = false)
**// Display the top 20 rows of DataFrame**
creditDF.show
 +-------------+-------+--------+-------+-------+------+-------+----------+-----------+----------+----------+-----------------+------+----+----------+---------+-------+----------+----------+--------+-------+
|creditability|balance|duration|history|purpose|amount|savings|employment|instPercent|sexMarried|guarantors|residenceDuration|assets| age|concCredit|apartment|credits|occupation|dependents|hasPhone|foreign|
+-------------+-------+--------+-------+-------+------+-------+----------+-----------+----------+----------+-----------------+------+----+----------+---------+-------+----------+----------+--------+-------+
|          1.0|    0.0|    18.0|    4.0|    2.0|1049.0|    0.0|       1.0|        4.0|       1.0|       0.0|              3.0|   1.0|21.0|       2.0|      0.0|    0.0|       2.0|       0.0|     0.0|    0.0|
|          1.0|    0.0|     9.0|    4.0|    0.0|2799.0|    0.0|       2.0|        2.0|       2.0|       0.0|              1.0|   0.0|36.0|       2.0|      0.0|    1.0|       2.0|       1.0|     0.0|    0.0|
|          1.0|    1.0|    12.0|    2.0|    9.0| 841.0|    1.0|       3.0|        2.0|       1.0|       0.0|              3.0|   0.0|23.0|       2.0|      0.0|    0.0|       1.0|       0.0|     0.0|    0.0|
|          1.0|    0.0|    12.0|    4.0|    0.0|2122.0|    0.0|       2.0|        3.0|       2.0|       0.0|              1.0|   0.0|39.0|       2.0|      0.0|    1.0|       1.0|       1.0|     0.0|    1.0|
|          1.0|    0.0|    12.0|    4.0|    0.0|2171.0|    0.0|       2.0|        4.0|       2.0|       0.0|              3.0|   1.0|38.0|       0.0|      1.0|    1.0|       1.0|       0.0|     0.0|    1.0|
|          1.0|    0.0|    10.0|    4.0|    0.0|2241.0|    0.0|       1.0|        1.0|       2.0|       0.0|              2.0|   0.0|48.0|       2.0|      0.0|    1.0|       1.0|       1.0|     0.0|    1.0|
|          1.0|    0.0|     8.0|    4.0|    0.0|3398.0|    0.0|       3.0|        1.0|       2.0|       0.0|              3.0|   0.0|39.0|       2.0|      1.0|    1.0|       1.0|       0.0|     0.0|    1.0|
|          1.0|    0.0|     6.0|    4.0|    0.0|1361.0|    0.0|       1.0|        2.0|       2.0|       0.0|              3.0|   0.0|40.0|       2.0|      1.0|    0.0|       1.0|       1.0|     0.0|    1.0|
|          1.0|    3.0|    18.0|    4.0|    3.0|1098.0|    0.0|       0.0|        4.0|       1.0|       0.0|              3.0|   2.0|65.0|       2.0|      1.0|    1.0|       0.0|       0.0|     0.0|    0.0|
|          1.0|    1.0|    24.0|    2.0|    3.0|3758.0|    2.0|       0.0|        1.0|       1.0|       0.0|              3.0|   3.0|23.0|       2.0|      0.0|    0.0|       0.0|       0.0|     0.0|    0.0|
|          1.0|    0.0|    11.0|    4.0|    0.0|3905.0|    0.0|       2.0|        2.0|       2.0|       0.0|              1.0|   0.0|36.0|       2.0|      0.0|    1.0|       2.0|       1.0|     0.0|    0.0|
|          1.0|    0.0|    30.0|    4.0|    1.0|6187.0|    1.0|       3.0|        1.0|       3.0|       0.0|              3.0|   2.0|24.0|       2.0|      0.0|    1.0|       2.0|       0.0|     0.0|    0.0|
|          1.0|    0.0|     6.0|    4.0|    3.0|1957.0|    0.0|       3.0|        1.0|       1.0|       0.0|              3.0|   2.0|31.0|       2.0|      1.0|    0.0|       2.0|       0.0|     0.0|    0.0|
|          1.0|    1.0|    48.0|    3.0|   10.0|7582.0|    1.0|       0.0|        2.0|       2.0|       0.0|              3.0|   3.0|31.0|       2.0|      1.0|    0.0|       3.0|       0.0|     1.0|    0.0|
|          1.0|    0.0|    18.0|    2.0|    3.0|1936.0|    4.0|       3.0|        2.0|       3.0|       0.0|              3.0|   2.0|23.0|       2.0|      0.0|    1.0|       1.0|       0.0|     0.0|    0.0|
|          1.0|    0.0|     6.0|    2.0|    3.0|2647.0|    2.0|       2.0|        2.0|       2.0|       0.0|              2.0|   0.0|44.0|       2.0|      0.0|    0.0|       2.0|       1.0|     0.0|    0.0|
|          1.0|    0.0|    11.0|    4.0|    0.0|3939.0|    0.0|       2.0|        1.0|       2.0|       0.0|              1.0|   0.0|40.0|       2.0|      1.0|    1.0|       1.0|       1.0|     0.0|    0.0|
|          1.0|    1.0|    18.0|    2.0|    3.0|3213.0|    2.0|       1.0|        1.0|       3.0|       0.0|              2.0|   0.0|25.0|       2.0|      0.0|    0.0|       2.0|       0.0|     0.0|    0.0|
|          1.0|    1.0|    36.0|    4.0|    3.0|2337.0|    0.0|       4.0|        4.0|       2.0|       0.0|              3.0|   0.0|36.0|       2.0|      1.0|    0.0|       2.0|       0.0|     0.0|    0.0|
|          1.0|    3.0|    11.0|    4.0|    0.0|7228.0|    0.0|       2.0|        1.0|       2.0|       0.0|              3.0|   1.0|39.0|       2.0|      1.0|    1.0|       1.0|       0.0|     0.0|    0.0|
+-------------+-------+--------+-------+-------+------+-------+----------+-----------+----------+----------+-----------------+------+----+----------+---------+-------+----------+----------+--------+-------+
After a DataFrame is instantiated, you can query it using the DataFrame API or SQL. Here are some example queries using the Scala DataFrame API:
describe computes statistics for numeric columns, including count, mean, stddev, min, and max.
**//  computes statistics for balance**
  creditDF.describe("balance").show
 +-------+-----------------+
|summary|          balance|
+-------+-----------------+
|  count|             1000|
|   mean|            1.577|
| stddev|1.257637727110893|
|    min|              0.0|
|    max|              3.0|
+-------+-----------------+

**// compute the avg balance by creditability (the label)**
 creditDF.groupBy("creditability").avg("balance").show
 +-------------+------------------+
|creditability|      avg(balance)|
+-------------+------------------+
|          1.0|1.8657142857142857|
|          0.0|0.9033333333333333|
+-------------+------------------+
You can register a DataFrame as a temporary table using a given name, and then run SQL statements using the sql methods provided by sqlContext. Here are some example queries using sqlContext:
**// Compute the average balance, amount, duration grouped by creditability**
 sqlContext.sql("SELECT creditability, avg(balance) as avgbalance, avg(amount) as avgamt, avg(duration) as avgdur  FROM credit GROUP BY creditability ").show
 +-------------+------------------+------------------+------------------+
|creditability|        avgbalance|            avgamt|            avgdur|
+-------------+------------------+------------------+------------------+
|          1.0|1.8657142857142857| 2985.442857142857|19.207142857142856|
|          0.0|0.9033333333333333|3938.1266666666666|             24.86|
+-------------+------------------+------------------+------------------+

EXTRACT FEATURES

To build a classifier model, you first extract the features that most contribute to the classification. In the German Credit Data Set, the data is labeled with two classes – 1 (creditable) and 0 (not creditable).
The features for each item consist of the fields shown below:
  • Label → creditable: 0 or 1
  • Features → {"balance", "duration", "history", "purpose", "amount", "savings", "employment", "instPercent", "sexMarried", "guarantors", "residenceDuration", "assets", "age", "concCredit", "apartment", "credits", "occupation", "dependents", "hasPhone", "foreign"}

DEFINE FEATURES ARRAY

In order for the features to be used by a machine learning algorithm, the features are transformed and put into Feature Vectors, which are vectors of numbers representing the value for each feature.
Below, a VectorAssembler is used to transform and return a new DataFrame with all of the feature columns in a vector column.
**//define the feature columns to put in the feature vector**
val featureCols = Array("balance", "duration", "history", "purpose", "amount",
    "savings", "employment", "instPercent", "sexMarried",  "guarantors",
    "residenceDuration", "assets",  "age", "concCredit", "apartment",
    "credits",  "occupation", "dependents",  "hasPhone", "foreign" )
**//set the input and output column names**
  val assembler = new VectorAssembler().setInputCols(featureCols).setOutputCol("features")
**//return a dataframe with all of the  feature columns in  a vector column**
val df2 = assembler.transform( creditDF)
**// the transform method produced a new column: features.**
df2.show
 +-------------+-------+--------+-------+-------+------+-------+----------+-----------+----------+----------+-----------------+------+----+----------+---------+-------+----------+----------+--------+-------+--------------------+
|creditability|balance|duration|history|purpose|amount|savings|employment|instPercent|sexMarried|guarantors|residenceDuration|assets| age|concCredit|apartment|credits|occupation|dependents|hasPhone|foreign|            features|
+-------------+-------+--------+-------+-------+------+-------+----------+-----------+----------+----------+-----------------+------+----+----------+---------+-------+----------+----------+--------+-------+--------------------+
|          1.0|    0.0|    18.0|    4.0|    2.0|1049.0|    0.0|       1.0|        4.0|       1.0|       0.0|              3.0|   1.0|21.0|       2.0|      0.0|    0.0|       2.0|       0.0|     0.0|    0.0|(20,[1,2,3,4,6,7,...|
Next, we use a StringIndexer to return a DataFrame with the creditability column added as the label column.
**//  Create a label column with the StringIndexer**
val labelIndexer = new StringIndexer().setInputCol("creditability").setOutputCol("label")
val df3 = labelIndexer.fit(df2).transform(df2)
**// the  transform method produced a new column: label.**
df3.show
 +-------------+-------+--------+-------+-------+------+-------+----------+-----------+----------+----------+-----------------+------+----+----------+---------+-------+----------+----------+--------+-------+--------------------+-----+
|creditability|balance|duration|history|purpose|amount|savings|employment|instPercent|sexMarried|guarantors|residenceDuration|assets| age|concCredit|apartment|credits|occupation|dependents|hasPhone|foreign|            features|label|
+-------------+-------+--------+-------+-------+------+-------+----------+-----------+----------+----------+-----------------+------+----+----------+---------+-------+----------+----------+--------+-------+--------------------+-----+
|          1.0|    0.0|    18.0|    4.0|    2.0|1049.0|    0.0|       1.0|        4.0|       1.0|       0.0|              3.0|   1.0|21.0|       2.0|      0.0|    0.0|       2.0|       0.0|     0.0|    0.0|(20,[1,2,3,4,6,7,...|  0.0|
Below, the data is split into a training data set and a test data set: 70% of the data is used to train the model, and 30% will be used for testing.
**//  split the dataframe into training and test data**
val splitSeed = 5043
val Array(trainingData, testData) = df3.randomSplit(Array(0.7, 0.3), splitSeed)

TRAIN THE MODEL

Next, we train a Random Forest classifier with the following parameters:
  • maxDepth: maximum depth of a tree. Increasing the depth makes the model more powerful, but deep trees take longer to train.
  • maxBins: maximum number of bins used for discretizing continuous features and for choosing how to split on features at each node.
  • impurity: criterion used for the information gain calculation.
  • featureSubsetStrategy (“auto”): automatically select the number of features to consider for splits at each tree node.
  • seed: use a random seed number, so that the results can be repeated.
The model is trained by making associations between the input features and the labeled output associated with those features.
**// create the classifier,  set parameters for training**
val classifier = new RandomForestClassifier().setImpurity("gini").setMaxDepth(3).setNumTrees(20).setFeatureSubsetStrategy("auto").setSeed(5043)
**//  use the random forest classifier  to train (fit) the model**
val model = classifier.fit(trainingData)

**// print out the random forest trees**
model.toDebugString
res5: String =
"RandomForestClassificationModel (uid=rfc_6c4ceb92ba78) with 20 trees
  Tree 0 (weight 1.0):
    If (feature 0 <= 1.0)
     If (feature 10 <= 0.0)
      If (feature 3 <= 6.0)
       Predict: 0.0
      Else (feature 3 > 6.0)
       Predict: 0.0
     Else (feature 10 > 0.0)
      If (feature 12 <= 63.0)
       Predict: 0.0
      Else (feature 12 > 63.0)
       Predict: 0.0
    Else (feature 0 > 1.0)
     If (feature 13 <= 1.0)
      If (feature 3 <= 3.0)
       Predict: 0.0
      Else (feature 3 > 3.0)
       Predict: 1.0
     Else (feature 13 > 1.0)
      If (feature 7 <= 1.0)
       Predict: 0.0
      Else (feature 7 > 1.0)
       Predict: 0.0
  Tree 1 (weight 1.0):
    If (feature 2 <= 1.0)
     If (feature 15 <= 0.0)
      If (feature 11 <= 0.0)
       Predict: 0.0
      Else (feature 11 > 0.0)
       Predict: 1.0
     Else (feature 15 > 0.0)
      If (feature 11 <= 0.0)
       Predict: 0.0
      Else (feature 11 > 0.0)
       Predict: 1.0
    Else (feature 2 > 1.0)
     If (feature 12 <= 31.0)
      If (feature 5 <= 0.0)
       Predict: 0.0
      Else (feature 5 > 0.0)
       Predict: 0.0
     Else (feature 12 > 31.0)
      If (feature 3 <= 4.0)
       Predict: 0.0
      Else (feature 3 > 4.0)
       Predict: 0.0
  Tree 2 (weight 1.0):
    If (feature 8 <= 1.0)
     If (feature 6 <= 2.0)
      If (feature 4 <= 10875.0)
       Predict: 0.0
      Else (feature 4 > 10875.0)
       Predict: 1.0
     Else (feature 6 > 2.0)
      If (feature 1 <= 36.0)
       Predict: 0.0
      Else (feature 1 > 36.0)
       Predict: 1.0
    Else (feature 8 > 1.0)
     If (feature 5 <= 0.0)
      If (feature 4 <= 4113.0)
       Predict: 0.0
      Else (feature 4 > 4113.0)
       Predict: 1.0
     Else (feature 5 > 0.0)
      If (feature 11 <= 2.0)
       Predict: 0.0
      Else (feature 11 > 2.0)
       Predict: 0.0
  Tree 3 ...

TEST THE MODEL

Next we use the test data to get predictions.
**// run the  model on test features to get predictions**
val predictions = model.transform(testData)
**// As you can see, the previous model transform produced new columns: rawPrediction, probability, and prediction.**
predictions.show
 +-------------+-------+--------+-------+-------+------+-------+----------+-----------+----------+----------+-----------------+------+----+----------+---------+-------+----------+----------+--------+-------+--------------------+-----+--------------------+--------------------+----------+
|creditability|balance|duration|history|purpose|amount|savings|employment|instPercent|sexMarried|guarantors|residenceDuration|assets| age|concCredit|apartment|credits|occupation|dependents|hasPhone|foreign|            features|label|       rawPrediction|         probability|prediction|
+-------------+-------+--------+-------+-------+------+-------+----------+-----------+----------+----------+-----------------+------+----+----------+---------+-------+----------+----------+--------+-------+--------------------+-----+--------------------+--------------------+----------+
|          0.0|    0.0|    12.0|    0.0|    5.0|1108.0|    0.0|       3.0|        4.0|       2.0|       0.0|              2.0|   0.0|28.0|       2.0|      1.0|    1.0|       2.0|       0.0|     0.0|    0.0|(20,[1,3,4,6,7,8,...|  1.0|[14.1964586927573...|[0.70982293463786...|       0.0|
Below we evaluate the predictions. We use a BinaryClassificationEvaluator, which returns the area under the ROC curve as its metric, computed by comparing the test label column with the rawPrediction column. In this case the evaluation returns 78% area under the ROC curve.
**// create an Evaluator for binary classification, which expects two input columns: rawPrediction and label.**
val evaluator = new BinaryClassificationEvaluator().setLabelCol("label")
**// Evaluates predictions and returns a scalar metric areaUnderROC(larger is better).**
val accuracy = evaluator.evaluate(predictions)
accuracy: Double = 0.7824906081835722

USING AN ML PIPELINE

We will next train the model using a pipeline, which can give better results. A pipeline provides a simple way to try out different combinations of parameters using a process called grid search, where you set up the parameters to test, and MLlib tests all the combinations. Pipelines make it easy to tune an entire model building workflow at once, rather than tuning each element of the pipeline separately.
Below we use the ParamGridBuilder utility to construct the parameter grid.
_**// We use a ParamGridBuilder to construct a grid of parameters to search over**_
val paramGrid = new ParamGridBuilder()
  .addGrid(classifier.maxBins, Array(25, 28, 31))
  .addGrid(classifier.maxDepth, Array(4, 6, 8))
  .addGrid(classifier.impurity, Array("entropy", "gini"))
  .build()
Create and set up a pipeline. A Pipeline consists of a sequence of stages, each of which is either an Estimator or a Transformer.
val steps: Array[PipelineStage] = Array(classifier)
val pipeline = new Pipeline().setStages(steps)
We use the CrossValidator class for model selection. The CrossValidator uses an Estimator, a set of ParamMaps, and an Evaluator. Note that using a CrossValidator can be very expensive.
**// Evaluate model on test instances and compute test error**
val evaluator = new BinaryClassificationEvaluator()
  .setLabelCol("label")
val cv = new CrossValidator()
  .setEstimator(pipeline)
  .setEvaluator(evaluator)
  .setEstimatorParamMaps(paramGrid)
  .setNumFolds(10)
The pipeline automatically optimizes by exploring the parameter grid: for each ParamMap, the CrossValidator trains the given Estimator and evaluates it using the given Evaluator, then it fits the best Estimator using the best ParamMap and the entire dataset.
**// When fit is called, the stages are executed in order.
// Fit will run cross-validation and choose the best set of parameters.
// The fitted model from a Pipeline is a PipelineModel, which consists of fitted models and transformers**
val pipelineFittedModel = cv.fit(trainingData)
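If you want to see which parameter combination the cross-validation chose, the best model can be inspected. A sketch, assuming the random forest is the first (and only) stage of the pipeline:
**// sketch: inspect the winning model chosen by cross-validation**
val bestPipelineModel = pipelineFittedModel.bestModel.asInstanceOf[org.apache.spark.ml.PipelineModel]
val bestForest = bestPipelineModel.stages(0).asInstanceOf[org.apache.spark.ml.classification.RandomForestClassificationModel]
println(bestForest.extractParamMap)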
Now we can evaluate the pipeline's best-fitted model by comparing test predictions with test labels. The evaluator now returns an area under the ROC curve of 82%, compared to 78% before.
**//  call transform to make predictions on test data. The fitted model will use the best model found**
val predictions = pipelineFittedModel.transform(testData)
val accuracy = evaluator.evaluate(predictions)  
Double = 0.8204386232104784
**// Calculate Binary Classification Metrics**
val predictionAndLabels =predictions.select("prediction", "label").rdd.map(x =>
  (x(0).asInstanceOf[Double], x(1).asInstanceOf[Double]))
val metrics = new BinaryClassificationMetrics(predictionAndLabels)
**// A Precision-Recall curve plots (precision, recall) points for different threshold values, while a receiver operating characteristic, or ROC, curve plots (recall, false positive rate) points.**
println("area under the precision-recall curve: " + metrics.areaUnderPR)
println("area under the receiver operating characteristic (ROC) curve : " + metrics.areaUnderROC)
 area under the precision-recall curve: 0.6482521795731916
area under the receiver operating characteristic (ROC) curve : 0.6332876434155752

WANT TO LEARN MORE?

In this blog post, we showed you how to get started using Apache Spark’s machine learning Random Forests and ml pipelines for classification. If you have any further questions about this tutorial, please ask them in the comments section below.

MONITORING REAL-TIME UBER DATA USING SPARK MACHINE LEARNING

source: https://mapr.com/blog/monitoring-real-time-uber-data-using-spark-machine-learning-streaming-and-kafka-api-part-1/
source: https://github.com/caroljmcdonald

MONITORING REAL-TIME UBER DATA USING SPARK MACHINE LEARNING, STREAMING, AND THE KAFKA API (PART 1)


November 28, 2016 | BY Carol McDonald
According to Gartner, by 2020, a quarter of a billion connected cars will form a major element of the Internet of Things. Connected vehicles are projected to generate 25GB of data per hour, which can be analyzed to provide real-time monitoring and apps, and will lead to new concepts of mobility and vehicle usage. One of the 10 major areas in which big data is currently being used to excellent advantage is in improving cities. For example, the analysis of GPS car data can allow cities to optimize traffic flows based on real-time traffic information.
Uber is using big data to perfect its processes, from calculating Uber’s pricing, to finding the optimal positioning of cars to maximize profits. In this series of blog posts, we are going to use public Uber trip data to discuss building a real-time example for analysis and monitoring of car GPS data. There are typically two phases in machine learning with real-time data:
  • Data Discovery: The first phase involves analysis on historical data to build the machine learning model.
  • Analytics Using the Model: The second phase uses the model in production on live events. (Note that Spark does provide some streaming machine learning algorithms, but you still often need to do an analysis of historical data.)
In this first post, I’ll help you get started using Apache Spark’s machine learning K-means algorithm to cluster Uber data based on location.

CLUSTERING

Google News uses a technique called clustering to group news articles into different categories, based on title and content. Clustering algorithms discover groupings that occur in collections of data.
In clustering, an algorithm groups objects into categories by analyzing similarities between input examples. Examples of clustering uses include:
  • Search results grouping
  • Grouping of customers
  • Anomaly detection
  • Text categorization
Clustering uses unsupervised algorithms, which do not have the outputs (labeled data) in advance.
K-means is one of the most commonly used clustering algorithms; it clusters the data points into a predefined number of clusters (k). Clustering with K-means begins by initializing k centroids. On every pass of the algorithm, each point is assigned to its nearest centroid based on some distance metric, usually Euclidean distance. The centroids are then updated to be the “centers” of all the points assigned to them in that pass. This repeats until there is a minimal change in the centers.
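As a rough illustration of a single pass of the algorithm (plain Scala on (x, y) points, not the distributed implementation Spark uses):
// illustrative single K-means pass: assign each point to its nearest centroid,
// then move each centroid to the mean of the points assigned to it
type Point = (Double, Double)

def distance(a: Point, b: Point): Double =
  math.sqrt(math.pow(a._1 - b._1, 2) + math.pow(a._2 - b._2, 2))

def kmeansPass(points: Seq[Point], centroids: Seq[Point]): Seq[Point] = {
  val assigned = points.groupBy(p => centroids.minBy(c => distance(p, c)))
  centroids.map { c =>
    assigned.get(c)
      .map(pts => (pts.map(_._1).sum / pts.size, pts.map(_._2).sum / pts.size))
      .getOrElse(c)   // a centroid with no assigned points stays where it is
  }
}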

EXAMPLE USE CASE DATA SET

The example data set is Uber trip data, which FiveThirtyEight obtained from the NYC Taxi & Limousine Commission. In this example, we will discover the clusters of Uber data based on the longitude and latitude, then we will analyze the cluster centers by date/time. The data set has the following schema:

THE DATA SET SCHEMA

  1. Date/Time: The date and time of the Uber pickup
  2. Lat: The latitude of the Uber pickup
  3. Lon: The longitude of the Uber pickup
  4. Base: The TLC base company affiliated with the Uber pickup
The data records are in CSV format. An example line is shown below:
2014-08-01 00:00:00,40.729,-73.9422,B02598

EXAMPLE USE CASE CODE

First, we import the packages needed for Spark ML K-means and SQL.

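A minimal sketch of the imports used by the code sketches below (assuming the Spark 1.6 spark.ml K-means and Spark SQL APIs; the complete code is in the linked GitHub repository):
import org.apache.spark.ml.clustering.KMeans
import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._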
We specify the schema with a Spark StructType (note that if you are using a notebook, you do not have to create the SQLContext).
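A sketch of such a schema; the column names (dt, lat, lon, base) are assumed here for illustration:
// schema for the Uber trip records (column names assumed for illustration)
val schema = StructType(Array(
  StructField("dt", TimestampType, true),   // date/time of the pickup
  StructField("lat", DoubleType, true),     // pickup latitude
  StructField("lon", DoubleType, true),     // pickup longitude
  StructField("base", StringType, true)     // TLC base company code
))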
Next, we load the data from a CSV file into a Spark DataFrame.
Using Spark 1.6 and --packages com.databricks:spark-csv_2.10:1.5.0, we create a DataFrame from a CSV file data source and apply the schema.
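A sketch of the load step with the spark-csv data source (the file path is a placeholder; adjust the header option to match your copy of the data):
val sqlContext = new SQLContext(sc)   // not needed in a notebook, as noted above
val df = sqlContext.read
  .format("com.databricks.spark.csv")
  .option("header", "false")          // set to "true" if the file still has its header row
  .schema(schema)
  .load("/user/user01/data/uber.csv") // placeholder path
df.cache()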
Note that with Spark 2.0, specifying the schema when loading data into a DataFrame will give better performance than schema inference.
DataFrame printSchema() prints the schema to the console in a tree format.
DataFrame show() displays the first 20 rows.

DEFINE FEATURES ARRAY

In order for the features to be used by a machine learning algorithm, the features are transformed and put into Feature Vectors, which are vectors of numbers representing the value for each feature. Below, a VectorAssembler is used to transform and return a new DataFrame with all of the feature columns in a vector column.
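A sketch of this step, using the assumed lat/lon column names:
// put the pickup coordinates into a single "features" vector column
val featureCols = Array("lat", "lon")
val assembler = new VectorAssembler().setInputCols(featureCols).setOutputCol("features")
val df2 = assembler.transform(df)
df2.show()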
Next, we create a KMeans object, set the parameters to define the number of clusters and the maximum number of iterations to determine the clusters, and then we fit the model to the input data.
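A sketch of this step; the number of clusters and the iteration cap below are illustrative values, not necessarily the ones used in the original post:
// cluster the pickups by location
val kmeans = new KMeans()
  .setK(8)            // illustrative number of clusters
  .setMaxIter(10)     // illustrative iteration cap
  .setFeaturesCol("features")
  .setPredictionCol("prediction")
val model = kmeans.fit(df2)
model.clusterCenters.foreach(println)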
The model's clusterCenters member contains the coordinates of the learned cluster centers.
The cluster centers can then be displayed on a Google map.
Next, we use the model to get the clusters for test data in order to further analyze the clustering.
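A sketch, assuming a held-out portion of df2 (for example from randomSplit) is used as the test data:
// e.g. val Array(trainingData, testData) = df2.randomSplit(Array(0.7, 0.3), 5043)
val clusters = model.transform(testData)   // adds a "prediction" column with the cluster id
clusters.show()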
Now we can ask questions like, "Which hours of the day and which cluster had the highest number of pickups?"
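A sketch of that question with the DataFrame API, using the hour() function on the assumed dt column:
// count pickups by hour of day and cluster, largest groups first
clusters.select(hour(col("dt")).alias("hour"), col("prediction"))
  .groupBy("hour", "prediction").count()
  .orderBy(desc("count"))
  .show()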
How many pickups occurred in each cluster?
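A sketch of that query:
// number of pickups that fell into each cluster
clusters.groupBy("prediction").count().show()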
With a Zeppelin notebook, we can also display query results in bar charts or graphs: with the hour on the x axis and the count on the y axis, the colors show the different clusters.
You can register a DataFrame as a temporary table using a given name, for example: df.registerTempTable("uber") , and then run SQL statements using the SQL methods provided by sqlContext. An example is shown below in a Zeppelin notebook.
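A sketch of the same kind of query via SQL; here the clustered DataFrame is registered so that the cluster id can be queried as well:
clusters.registerTempTable("uber")
sqlContext.sql("SELECT prediction AS cluster, count(*) AS pickups FROM uber GROUP BY prediction ORDER BY pickups DESC").show()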
The model can be persisted to disk so that it can be used later (for example, with Spark Streaming).
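A sketch using the ML persistence API (the path is a placeholder; save/load for KMeansModel is available in newer Spark releases, so check your version):
// persist the fitted model so that, for example, a streaming job can load it later
model.write.overwrite().save("/user/user01/data/uberKMeansModel")   // placeholder path
// later: val sameModel = org.apache.spark.ml.clustering.KMeansModel.load("/user/user01/data/uberKMeansModel")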

SOFTWARE

This tutorial will run on Spark 1.6.1
  • You can download the code, data, and readme to run this example from here: https://github.com/caroljmcdonald/spark-ml-kmeans-uber
  • The example in this post can be run in the Spark shell, or in a Zeppelin notebook. Log into the MapR Sandbox, as explained in Getting Started with Spark on MapR Sandbox, using userid user01, password mapr. Copy the sample data file to your sandbox home directory /user/user01 using scp as explained in the readme.
  • To run as a standalone application, copy the jar file to the cluster using scp, as explained in the readme, then run with the following command:
  • $ spark-submit --class com.sparkml.uber.ClusterUber --master local[2] --packages com.databricks:spark-csv_2.10:1.5.0 spark-kmeans-1.0.jar
  • To run in the Spark shell, start the Spark shell with: $ spark-shell --master local[1]
In this blog post, we went over how to get started using Apache Spark’s machine learning K-means for clustering. In the next blog post, we'll look at using the model in a Spark Streaming application.

[SPARK] Programmatically Specifying the Schema





Programmatically Specifying the Schema

    1. Create an RDD of Rows from the original RDD.
    2. Create the schema represented by a StructType matching the structure of Rows in the RDD created in Step 1.
    3. Apply the schema to the RDD of Rows via the createDataFrame method provided by SparkSession.


import org.apache.spark.sql.Row
import org.apache.spark.sql.types._
// provides the String encoder needed for the Dataset map at the end
import spark.implicits._

// Create an RDD
val peopleRDD = spark.sparkContext.textFile("examples/src/main/resources/people.txt")

// The schema is encoded in a string
val schemaString = "name age"

// Generate the schema based on the string of schema
val fields = schemaString.split(" ")
  .map(fieldName => StructField(fieldName, StringType, nullable = true))
val schema = StructType(fields)

// Convert records of the RDD (people) to Rows
val rowRDD = peopleRDD
  .map(_.split(","))
  .map(attributes => Row(attributes(0), attributes(1).trim))

// Apply the schema to the RDD
val peopleDF = spark.createDataFrame(rowRDD, schema)

// Creates a temporary view using the DataFrame
peopleDF.createOrReplaceTempView("people")

// SQL can be run over a temporary view created using DataFrames
val results = spark.sql("SELECT name FROM people")

// The results of SQL queries are DataFrames and support all the normal RDD operations
// The columns of a row in the result can be accessed by field index or by field name
results.map(attributes => "Name: " + attributes(0)).show()
// +-------------+
// |        value|
// +-------------+
// |Name: Michael|
// |   Name: Andy|
// | Name: Justin|
// +-------------+