#Map My Cab
An API to analyze cab geolocation data and a simulated app for finding an available cab in real time
#Introduction
This is a data engineering project built at Insight Data Science. It aims to accomplish two goals: provide an API to analyze historical cab geolocation data, and simulate a real-time app for finding an available cab.
#Data Set
Historical:
The project is based on historical geolocation data for 500 yellow cabs in San Francisco, collected over a one-month time frame (11 million rows). The data is available as a time series, with updates on individual cab occupancy and location at approximately one-minute intervals. The following provides a snapshot of the raw data set (500 text files, each representing one cab):
cabID, lat, long, occupancy, timestamp
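For illustration, a record from one of these files could be parsed as below; the comma delimiter and field order follow the header above and may need adjusting for the actual files.

```python
from collections import namedtuple

# One observation per line: cabID, latitude, longitude, occupancy flag (0/1), unix timestamp.
# The comma delimiter is an assumption; adjust the split for the actual file format.
CabRecord = namedtuple("CabRecord", ["cab_id", "lat", "lon", "occupied", "ts"])

def parse_line(line):
    cab_id, lat, lon, occupied, ts = line.strip().split(",")
    return CabRecord(cab_id, float(lat), float(lon), int(occupied), int(ts))
```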
Real-Time:
The historical data set is played back to simulate real-time behavior.
AWS Clusters:
A distributed AWS cluster of four EC2 instances is used for this project. All components (ingestion, batch, and real-time processing) are configured and run in distributed mode, with one master and three slaves.
#Data Processing Framework
Ingestion Layer (Kafka): The raw data is published to a Kafka message broker, configured in publish-subscribe mode.
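A minimal sketch of what the producer side can look like with kafka-python; the broker address, topic name, and file name are placeholders, not necessarily the values used by kafka/producer.py.

```python
from kafka import KafkaProducer

# Replay one per-cab text file into a Kafka topic.
# Broker address, topic name, and file name are assumptions.
producer = KafkaProducer(bootstrap_servers="localhost:9092")

with open("cab_0001.txt") as f:            # one of the 500 per-cab files
    for line in f:
        producer.send("cab_locations", line.strip().encode("utf-8"))

producer.flush()
```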
Batch Layer (HDFS, Hadoop): A Kafka consumer stores the data in HDFS. Additional columns are added to the data set to generate metrics, as described in the following section; this is accomplished using Hive (and MrJob). Hive is then used to generate the tables representing the aggregate views that serve queries at the user end.
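One way the consumer can land data in HDFS is to buffer messages locally and push each batch with the Hadoop CLI; this is a sketch under assumed topic, broker, and path names, not the exact logic of kafka/kafka_consumer.py.

```python
import subprocess
from kafka import KafkaConsumer

# Buffer messages and append each batch to a raw-data file on HDFS.
# Topic, broker address, batch size, and HDFS path are assumptions.
consumer = KafkaConsumer("cab_locations", bootstrap_servers="localhost:9092")

batch, BATCH_SIZE = [], 10000
for msg in consumer:
    batch.append(msg.value.decode("utf-8"))
    if len(batch) >= BATCH_SIZE:
        with open("/tmp/cab_batch.txt", "w") as f:
            f.write("\n".join(batch) + "\n")
        subprocess.check_call(["hadoop", "fs", "-appendToFile",
                               "/tmp/cab_batch.txt", "/user/cab/raw/cab_data.txt"])
        batch = []
```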
Serving Layer (HBase): Datastore tables store the aggregate views for hour-of-day, day-of-week, and individual cab profiles, as generated by Hive. The table schema is optimized for quick access by storing the hours as columns and the totals for each day in the same row (this way, hourly and daily profiles can be served from the same table/rows).
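A sketch of writing one aggregate row with Happybase, illustrating the hours-as-columns layout; the table name, column family, and metric values are assumptions.

```python
import happybase

# Table name ("batch_views") and column family ("metrics") are assumptions.
conn = happybase.Connection("localhost")
table = conn.table("batch_views")

# Row key follows the yyyy_month_dayofweek pattern described in the Schemas section.
table.put("2014_05_monday", {
    "metrics:hour_08": "412",        # metric for the 8 am hour
    "metrics:hour_09": "530",        # metric for the 9 am hour
    "metrics:day_total": "9876",     # aggregate for the whole day, kept in the same row
})
```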
Speed Layer (Storm): The topology for processing real-time data consists of a Kafka spout and a bolt (with a tick frequency of 5 seconds). The data is filtered so that only currently available (unoccupied) cabs are stored in HBase. To serve queries with low latency, all the data is stored in one row (maximum possible columns = number of cabs = 500). As future work, the data can be keyed by city, so that all cabs pertaining to a city can be retrieved from a single row. If the number of cabs is large, a further breakdown by city_zipcode (as the key) will enable quick access while retaining the advantage of fast row scans in HBase.
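A sketch of the filtering bolt using Pyleus's SimpleBolt; the tuple layout coming off the Kafka spout, the HBase table name, and the row key are assumptions.

```python
from pyleus.storm import SimpleBolt
import happybase

class AvailableCabBolt(SimpleBolt):
    """Track unoccupied cabs and flush them to HBase on every tick."""

    def initialize(self):
        self.available = {}       # cab_id -> "lat,long"
        self.table = happybase.Connection("localhost").table("realtime_cabs")

    def process_tuple(self, tup):
        # Assumed tuple layout: a single CSV string emitted by the Kafka spout.
        cab_id, lat, lon, occupied, ts = tup.values[0].split(",")
        if occupied == "0":                      # unoccupied -> currently available
            self.available[cab_id] = "%s,%s" % (lat, lon)
        else:
            self.available.pop(cab_id, None)

    def process_tick(self):
        # One row per city; each available cab becomes a column (at most 500 columns).
        self.table.put("san_francisco",
                       {"cab:%s" % cid: loc for cid, loc in self.available.items()})

if __name__ == "__main__":
    AvailableCabBolt().run()
```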
Front-end (Flask): The cab locations are rendered on Google Maps and updated at a 2-second interval via AJAX. Historical data is presented as bar and line charts. Related files: views.py, batch.js, map.js.
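A sketch of the endpoint that the AJAX call could poll; the route, table name, and row key are assumptions, and the real handlers live in views.py.

```python
from flask import Flask, jsonify
import happybase

app = Flask(__name__)

@app.route("/available_cabs")
def available_cabs():
    # Read the single realtime row and return cab positions for map.js to render.
    # Table name and row key are assumptions.
    table = happybase.Connection("localhost").table("realtime_cabs")
    cabs = []
    for column, value in table.row("san_francisco").items():
        lat, lon = value.split(",")
        cabs.append({"cab_id": column.split(":")[1],
                     "lat": float(lat), "lng": float(lon)})
    return jsonify(cabs=cabs)
```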
Libraries and APIs: Happybase, Pyleus, Kafka-python
#Data Transformations
The following metrics are computed via a MapReduce operation (MrJob) on the raw data set:
The resulting table is aggregated using Hive to enable batch queries such as:
The windowing operation in Hive is used to translate the continuous time series data (by cab) into tables representing trips and their associated durations.
The table below displays the transformed data, with columns: tripID (cabID_timestamp), day, month, year, idle time (secs), idle time (hours)
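A sketch of what the MapReduce step can look like with MrJob; the specific metric shown (unoccupied-cab observations per day-of-week and hour bucket) is illustrative, not necessarily the one computed by mr_hourly_job.py.

```python
from datetime import datetime
from mrjob.job import MRJob

class MRHourlyJob(MRJob):
    """Count unoccupied-cab observations per (day-of-week, hour) bucket."""

    def mapper(self, _, line):
        # Assumed raw record layout: cabID, lat, long, occupancy, unix timestamp.
        cab_id, lat, lon, occupied, ts = line.split(",")
        dt = datetime.utcfromtimestamp(int(ts))
        if occupied == "0":
            yield "%s_%02d" % (dt.strftime("%A").lower(), dt.hour), 1

    def reducer(self, key, counts):
        yield key, sum(counts)

if __name__ == "__main__":
    MRHourlyJob.run()
```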
Hive Workflow:
#Schemas
Batch Schema:
The row key for batch storage is organized as yyyy_month_dayofweek. Each column represents an hour, and the cells contain metrics for that hour. An additional column stores aggregate metrics for the whole day. This allows the same table to serve two types of queries: hour-of-day and day-of-week profiles.
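Under the same assumed table and column names as the write sketch above, both query types can be served from the same rows:

```python
import happybase

conn = happybase.Connection("localhost")
table = conn.table("batch_views")        # table/column names are assumptions

# Hour-of-day profile: read the hourly columns of a single row.
row = table.row("2014_05_monday")
hourly = {col: val for col, val in row.items() if col.startswith("metrics:hour_")}

# Day-of-week profile: scan the month's rows, reading only the daily aggregate column.
daily = {key: data["metrics:day_total"]
         for key, data in table.scan(row_prefix="2014_05_",
                                     columns=["metrics:day_total"])}
```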
Realtime Schema:
The realtime schema uses one row per city (since the current data set covers only one city, the table has a single row). Each column represents a cabID (for the cabs that are currently available, as filtered by Storm). The cells contain latitude/longitude data.
Streaming Data
#Live Demo
A Live Demo of the project is available here: www.mapmycab.org
A snapshot of the map with cabs:
#Presentation Deck
The presentation slides are available here:
www.mapmycab.org/aboutme
#Instructions to Run this Pipeline
Install python packages:
sudo pip install kafka-python happybase pyleus mrjob
Run the Kafka producer / consumer:
python kafka/producer.py
python kafka/kafka_consumer.py
Run MrJob:
python mr_hourly_job.py -r hadoop --hadoop-bin /usr/bin/hadoop hdfs:///<input file path> -o <output file path>
Run Hive scripts:
hive -f <filename>
Build storm topology:
pyleus build cab_topology.yaml
Submit pyleus topology:
pyleus submit -n 54.153.51.200 cab_topology.jar