Thursday, March 30, 2017

MONITORING REAL-TIME UBER DATA USING SPARK MACHINE LEARNING

source: https://mapr.com/blog/monitoring-real-time-uber-data-using-spark-machine-learning-streaming-and-kafka-api-part-1/
source: https://github.com/caroljmcdonald

MONITORING REAL-TIME UBER DATA USING SPARK MACHINE LEARNING, STREAMING, AND THE KAFKA API (PART 1)


November 28, 2016 | BY Carol McDonald
According to Gartner, by 2020, a quarter of a billion connected cars will form a major element of the Internet of Things. Connected vehicles are projected to generate 25GB of data per hour, which can be analyzed to provide real-time monitoring and apps, and will lead to new concepts of mobility and vehicle usage. One of the 10 major areas in which big data is currently being used to excellent advantage is in improving cities. For example, the analysis of GPS car data can allow cities to optimize traffic flows based on real-time traffic information.
Uber is using big data to perfect its processes, from calculating Uber’s pricing, to finding the optimal positioning of cars to maximize profits. In this series of blog posts, we are going to use public Uber trip data to discuss building a real-time example for analysis and monitoring of car GPS data. There are typically two phases in machine learning with real-time data:
  • Data Discovery: The first phase involves analysis on historical data to build the machine learning model.
  • Analytics Using the Model: The second phase uses the model in production on live events. (Note that Spark does provide some streaming machine learning algorithms, but you still often need to do an analysis of historical data.)
building the model
In this first post, I’ll help you get started using Apache Spark’s machine learning K-means algorithm to cluster Uber data based on location.

CLUSTERING

Google News uses a technique called clustering to group news articles into different categories, based on title and content. Clustering algorithms discover groupings that occur in collections of data.
In clustering, an algorithm groups objects into categories by analyzing similarities between input examples. Examples of clustering uses include:
  • Search results grouping
  • Grouping of customers
  • Anomaly detection
  • Text categorization
Clustering uses unsupervised algorithms, which do not have the outputs (labeled data) in advance.
K-means is one of the most commonly used clustering algorithms that clusters the data points into a predefined number of clusters (k). Clustering using the K-means algorithm begins by initializing all the coordinates to k number of centroids. With every pass of the algorithm, each point is assigned to its nearest centroid based on some distance metric, which is usually Euclidean distance. The centroids are then updated to be the “centers” of all the points assigned to it in that pass. This repeats until there is a minimum change in the centers.

EXAMPLE USE CASE DATA SET

The example data set is Uber trip data, which FiveThirtyEight obtained from the NYC Taxi & Limousine Commission. In this example, we will discover the clusters of Uber data based on the longitude and latitude, then we will analyze the cluster centers by date/time. The data set has the following schema:

THE DATA SET SCHEMA

  1. Date/Time: The date and time of the Uber pickup
  2. Lat: The latitude of the Uber pickup
  3. Lon: The longitude of the Uber pickup
  4. Base: The TLC base company affiliated with the Uber pickup
​​The Data Records are in CSV format. An example line is shown below:
2014-08-01 00:00:00,40.729,-73.9422,B02598

EXAMPLE USE CASE CODE

FIRST, WE IMPORT THE PACKAGES NEEDED FOR SPARK ML K-MEANS AND SQL.

Screen Shot 2016-11-17 at 4.08.01 PM.png
We specify the schema with a Spark Structype (Please note that if you are using a notebook, then you do not have to create the SQLContext).
Screen Shot 2016-11-17 at 4.15.58 PM.png
Next, we load the data from a CSV file into a Spark DataFrame.
Screen Shot 2016-11-17 at 5.19.33 PM.png
Using Spark 1.6 and --packages com.databricks:spark-csv_2.10:1.5.0, we create a DataFrame from a CSV file data source and apply the schema.
Screen Shot 2016-11-17 at 4.27.58 PM.png
Screen Shot 2016-11-17 at 4.28.56 PM.png
Note that with Spark 2.0, specifying the schema when loading data into a DataFrame will give better performance than schema inference.
DataFrame printSchema() prints the schema to the console in a tree format, shown below after running in a Zeppelin notebook:
Screen Shot 2016-11-17 at 5.04.06 PM.png
DataFrame show() displays the first 20 rows:
Screen Shot 2016-11-17 at 5.00.47 PM.png

DEFINE FEATURES ARRAY

In order for the features to be used by a machine learning algorithm, the features are transformed and put into Feature Vectors, which are vectors of numbers representing the value for each feature. Below, a VectorAssembler is used to transform and return a new DataFrame with all of the feature columns in a vector column.
Screen Shot 2016-11-17 at 5.33.57 PM.png
Screen Shot 2016-11-17 at 5.36.36 PM.png
Output of df2.show:
Screen Shot 2016-11-17 at 5.37.52 PM.png
Next, we create a KMeans object, set the parameters to define the number of clusters and the maximum number of iterations to determine the clusters, and then we fit the model to the input data.
Screen Shot 2016-11-17 at 6.18.18 PM.png
Screen Shot 2016-11-17 at 6.00.08 PM.png
Output of model clusterCenters :
Screen Shot 2016-11-17 at 6.27.01 PM.png
Below, the cluster centers are displayed on a Google map:
NewyorkuberclustersScreen Shot 2016-11-07 at 11.47.09 AM.png
Next, we use the model to get the clusters for test data in order to further analyze the clustering.
Screen Shot 2016-11-18 at 2.40.35 PM.png
Screen Shot 2016-11-17 at 6.34.46 PM.png
Now we can ask questions like, "Which hours of the day and which cluster had the highest number of pickups?"
Screen Shot 2016-11-17 at 6.52.21 PM.png
How many pickups occurred in each cluster?
Screen Shot 2016-11-17 at 7.23.38 PM.png
With a Zeppelin notebook, we can also display query results in bar charts or graphs. Below the x axis is the hour, the y axis the count, and the colors are the different clusters.
Screen Shot 2016-11-17 at 6.57.52 PM.png
You can register a DataFrame as a temporary table using a given name, for example: df.registerTempTable("uber") , and then run SQL statements using the SQL methods provided by sqlContext. An example is shown below in a Zeppelin notebook.
Screen Shot 2016-11-17 at 7.38.41 PM.png
Screen Shot 2016-11-18 at 12.05.27 PM.png
The model can be persisted to disk as shown below, in order to use later (for example, with Spark Streaming).
Screen Shot 2016-11-17 at 7.46.12 PM.png

SOFTWARE

This tutorial will run on Spark 1.6.1
  • You can download the code, data, and readme to run this example from here: https://github.com/caroljmcdonald/spark-ml-kmeans-uber
  • The example in this post can be run in the Spark shell, or in a Zeppelin notebook. Log into the MapR Sandbox, as explained in Getting Started with Spark on MapR Sandbox, using userid user01, password mapr. Copy the sample data file to your sandbox home directory /user/user01 using scp as explained in the readme.
  • To run as a standalone application, copy the jar file to the cluster using scp, as explained in the readme, then run with the following command:
  • $ spark-submit --class com.sparkml.uber.ClusterUber --master local[2] --packages com.databricks:spark-csv_2.10:1.5.0 spark-kmeans-1.0.jar
  • To run in the Spark shell, start the Spark shell with: $spark-shell --master local[1]
​In this blog post, we went over how to get started using Apache Spark’s machine learning K-means for clustering. In the next blog post, we'll look at using the model in a Spark Streaming application.

No comments:

Post a Comment