Bay Area bike share analysis with the Hadoop Notebook and Spark & SQL

This post was initially published on the Hue project blog http://gethue.com/bay-area-bike-share-data-analysis-with-spark-notebook-part-2/

Apache Spark is growing in popularity, and Hue contributors are working to make it accessible to even more users by building a Web interface that lets anyone with a browser type in some Spark code and execute it. A Spark submission REST API was built for this purpose and is also available to developers.

In a previous post, we demonstrated how to use Hue’s Search app to seamlessly index and visualize trip data from Bay Area Bike Share and leverage Spark to supplement that analysis by adding weather data to our dashboard.

In this tutorial, we’ll use the Hadoop Notebook to take a deeper look at peak usage of the Bay Area Bike Share (BABS) system.

To start, download the latest data set from http://www.bayareabikeshare.com/datachallenge. This post uses the data from August 2013 through February 2014.

Importing CSV Data with the Metastore App

The BABS data set consists of four CSV files covering stations, trips, rebalancing (availability), and weather. Using Hue’s Metastore import wizard, we can easily import these files and create tables whose schema is inferred from the CSV header.

File Upload to Metastore

Metastore Sample

The import wizard also provides the opportunity to override any field names or types, which we’ll do for the Trip data to change the “duration” field from a TINYINT to an INT.

Metastore Schema Fields

Interactive Analysis with a Hadoop Notebook

Lightning-Fast Impala Queries

Now that we’ve imported the data into our cluster, we can create a new Notebook to perform our data crunching. To start, we’ll run some quick exploration queries using Impala.

Let’s find the top 10 most popular start stations based on the trip data:

SELECT
 startterminal,
 startstation,
 COUNT(1) AS count
FROM bikeshare.trips
GROUP BY startterminal, startstation
ORDER BY count DESC LIMIT 10

impala_query

Once our results are returned, we can easily visualize this data; a bar graph works nicely for a simple COUNT..GROUP BY query.

Impala Bar Graph

It seems that the San Francisco Caltrain (Townsend at 4th) station was by far the most common start station. Let’s determine which end stations were the most popular for trips starting from the SF Caltrain Townsend station. We’ll also fetch the latitude and longitude coordinates so that we can visualize the results on a map.

SELECT
 s.station_id,
 s.name,
 s.lat,
 s.long,
 COUNT(1) AS count
FROM `bikeshare`.`trips` t
JOIN `bikeshare`.`stations` s ON s.station_id = t.endterminal
WHERE t.startterminal = 70
GROUP BY s.station_id, s.name, s.lat, s.long
ORDER BY count DESC LIMIT 10

Bike Share Map

The map visualization indicates that the most popular trips starting from the SF Caltrain station are in fairly close proximity to the station, with most of the destinations being clustered around the Financial District and SOMA.
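To make the join-and-count logic of the query above concrete, here is a minimal pure-Python sketch on a few made-up rows. The station names, the trip pairs, and the helper name `top_end_stations` are all hypothetical and used only for illustration; coordinates are omitted for brevity.

```python
from collections import Counter

# Hypothetical station lookup: station_id -> name.
stations = {50: "Station A", 55: "Station B", 61: "Station C"}

# Hypothetical (start_terminal, end_terminal) pairs.
trips = [(70, 50), (70, 55), (70, 50), (69, 61), (70, 61), (70, 50), (70, 55)]

def top_end_stations(trips, stations, start_terminal, limit=10):
    # Keep trips from the chosen start terminal whose end terminal has a
    # matching station row (the inner JOIN), then count per end station.
    counts = Counter(
        end for start, end in trips
        if start == start_terminal and end in stations
    )
    # most_common() plays the role of ORDER BY count DESC LIMIT n.
    return [(sid, stations[sid], n) for sid, n in counts.most_common(limit)]

top = top_end_stations(trips, stations, 70)
```

On the real tables, Impala performs the same filter, join, and aggregation, just distributed across the cluster.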

Long Running Queries with Hive

For longer-running SQL queries, or queries that require use of Hive’s built-in functions, we can add a Hive snippet to our notebook to perform this analysis.

Let’s say we wanted to dig further into the trip data for the SF Caltrain station and find the total number of trips and average duration (in minutes) of those trips, grouped by hour.

Since the trip data stores startdate as a STRING, we’ll need to apply some string manipulation in an inline subquery to extract the hour. The outer query will then aggregate the count of trips and the average duration per hour.

SELECT
    hour,
    COUNT(1) AS trips,
    ROUND(AVG(duration) / 60) AS avg_duration
FROM (
    SELECT
        CAST(SPLIT(SPLIT(t.startdate, ' ')[1], ':')[0] AS INT) AS hour,
        t.duration AS duration
    FROM `bikeshare`.`trips` t
    WHERE
        t.startterminal = 70
        AND
        t.duration IS NOT NULL
    ) r
GROUP BY hour
ORDER BY hour ASC;

Since this query returns several numeric dimensions, we can visualize the results using a scatterplot, with the hour as the x-axis, the number of trips as the y-axis, and the average duration as the size of each point.

Bike Share Scatter Plot
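As a rough illustration of what the nested Hive query above computes, the following pure-Python sketch applies the same steps to a few made-up rows. The sample values and the start-date layout (a date, a space, then `H:MM`) are assumptions for illustration; `extract_hour` and `hourly_summary` are hypothetical helper names.

```python
from collections import defaultdict

# Hypothetical (startdate, duration_in_seconds) rows for one start terminal.
sample_trips = [
    ("8/29/2013 8:05", 600),
    ("8/29/2013 8:40", 840),
    ("8/30/2013 17:15", 1200),
    ("8/30/2013 17:50", None),  # dropped, like "duration IS NOT NULL"
]

def extract_hour(startdate):
    # Mirrors CAST(SPLIT(SPLIT(startdate, ' ')[1], ':')[0] AS INT)
    return int(startdate.split(" ")[1].split(":")[0])

def hourly_summary(trips):
    durations_by_hour = defaultdict(list)
    for startdate, duration in trips:
        if duration is not None:
            durations_by_hour[extract_hour(startdate)].append(duration)
    # (hour, trip count, average duration rounded to minutes), ordered by hour
    return [
        (hour, len(ds), round(sum(ds) / float(len(ds)) / 60))
        for hour, ds in sorted(durations_by_hour.items())
    ]

summary = hourly_summary(sample_trips)
```

The inner function corresponds to the subquery and the list comprehension to the outer GROUP BY; rounding of exact halves may differ slightly between Python and Hive.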

Let’s add another Hive snippet to analyze an hour-by-hour breakdown of availability at the SF Caltrain Station:

SELECT
  hour,
  ROUND(AVG(bikes_available)) AS avg_bikes,
  ROUND(AVG(docks_available)) AS avg_docks
FROM (
  SELECT
    r.time AS time,
    CAST(SUBSTR(r.time, 12, 2) AS INT) AS hour,
    CAST(r.bikes_available AS INT) AS bikes_available,
    CAST(r.docks_available AS INT) AS docks_available
  FROM `bikeshare`.`rebalancing` r
  JOIN `bikeshare`.`stations` s ON r.station_id = s.station_id
  WHERE
    r.station_id = 70
    AND
    SUBSTR(r.time, 15, 2) = '00'
  ) t
GROUP BY hour
ORDER BY hour ASC;

We’ll visualize the results as a line graph, which indicates that the bike availability tends to fall starting at 6 AM and is regained around 6 PM.

Bike Share Availability Line Graph
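The SUBSTR calls in the availability query rely on fixed character positions in the timestamp. The short Python sketch below shows how those 1-indexed positions map onto a string, assuming a `YYYY/MM/DD HH:MM:SS` layout; the sample timestamp and the helper `sql_substr` are hypothetical.

```python
def sql_substr(value, start, length):
    # SQL SUBSTR is 1-indexed, while Python slices are 0-indexed.
    return value[start - 1:start - 1 + length]

# Hypothetical timestamp in the assumed 'YYYY/MM/DD HH:MM:SS' layout.
ts = "2013/08/29 18:00:02"

hour = int(sql_substr(ts, 12, 2))   # characters 12-13 hold the hour
minute = sql_substr(ts, 15, 2)      # characters 15-16 hold the minute
on_the_hour = minute == "00"        # same test as SUBSTR(r.time, 15, 2) = '00'
```

Keeping only the rows recorded at minute `00` samples availability once per hour instead of averaging every status record.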

Robust Data Analysis with PySpark

At a certain point, your data analysis may exceed the limits of relational analysis with SQL or require a more expressive, full-fledged API.

Hue’s Spark notebooks allow users to mix exploratory SQL analysis with custom Scala, Python (pyspark), and R code that uses the Spark API.

For example, we can open a pyspark snippet, load the trip data directly from the Hive warehouse, and apply a sequence of filter, map, reduceByKey, and combineByKey operations to determine the average number of trips per hour of the day starting from the SF Caltrain Station:

trips = sc.textFile('/user/hive/warehouse/bikeshare.db/trips/201402_trip_data.csv')

trips = trips.map(lambda line: line.split(","))
 
station_70 = trips.filter(lambda x: x[4] == '70')

# Emit tuple of ((date, hour), 1)
trips_by_day_hour = station_70.map(lambda x: ((x[2].split()[0], x[2].split()[1].split(':')[0]), 1))

trips_by_day_hour = trips_by_day_hour.reduceByKey(lambda a, b: a+b)

# Emit tuple of (hour, count)
trips_by_hour = trips_by_day_hour.map(lambda x: (int(x[0][1]), x[1]))
 
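# Build a (sum, count) pair per hour: createCombiner, mergeValue, mergeCombiners
avg_trips_by_hour = trips_by_hour.combineByKey( (lambda x: (x, 1)),
 (lambda x, y: (x[0] + y, x[1] + 1)),
 (lambda x, y: (x[0] + y[0], x[1] + y[1]))
 )
# Use float division so the average is not truncated under Python 2
avg_trips_by_hour = avg_trips_by_hour.mapValues(lambda v: float(v[0]) / v[1])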

avg_trips_sorted = sorted(avg_trips_by_hour.collect())
%table avg_trips_sorted

Notebook pyspark bar graph
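The combineByKey step is the least obvious part of the snippet above, so here is a simplified pure-Python stand-in that shows how its three functions cooperate. This is only a sketch: `combine_by_key`, the two-way partitioning, and the sample pairs are hypothetical, and real Spark distributes this work across executors.

```python
def combine_by_key(pairs, create_combiner, merge_value, merge_combiners, partitions=2):
    # Split the input into pretend partitions.
    chunks = [pairs[i::partitions] for i in range(partitions)]
    partials = []
    for chunk in chunks:
        acc = {}
        for key, value in chunk:
            if key in acc:
                # Key already seen in this partition: fold the value in.
                acc[key] = merge_value(acc[key], value)
            else:
                # First value for this key in this partition.
                acc[key] = create_combiner(value)
        partials.append(acc)
    merged = {}
    for acc in partials:
        for key, combiner in acc.items():
            if key in merged:
                # Merge partial results coming from different partitions.
                merged[key] = merge_combiners(merged[key], combiner)
            else:
                merged[key] = combiner
    return merged

# Hypothetical (hour, trips_on_one_day) pairs.
trips_by_hour = [(8, 10), (8, 14), (17, 20), (8, 12), (17, 22)]

sums_and_counts = combine_by_key(
    trips_by_hour,
    lambda x: (x, 1),                         # value -> (sum, count)
    lambda acc, x: (acc[0] + x, acc[1] + 1),  # add one value
    lambda a, b: (a[0] + b[0], a[1] + b[1]),  # add two partial results
)
avg_trips_by_hour = sorted(
    (hour, float(total) / count) for hour, (total, count) in sums_and_counts.items()
)
```

Carrying a (sum, count) pair rather than a running average is what lets partial results from different partitions be merged correctly.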

 

As you can see, Hue’s Notebook app enables easy interactive data analysis and visualization with a powerful mix of tools. To learn more about the Spark Notebook work, read about Livy, the Spark REST Job server, and come see us at the upcoming Spark Summit in Amsterdam! The Notebook is currently in beta, and v1 is targeted for Hue 3.10 / CDH 5.7.

Stay tuned for a number of exciting improvements to the notebook app, and as usual feel free to comment on the hue-user list or @gethue!

 


Helpful Tips

Importing quoted-CSV data

The BABS rebalancing data (named 201402_status_data.csv) wraps its values in quotes. In such cases, it is easier to create the table in the Hive editor using the OpenCSV Row SerDe for Hive:

CREATE TABLE rebalancing(station_id int, bikes_available int, docks_available int, time string)
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde'
WITH SERDEPROPERTIES (
"separatorChar" = ",",
"quoteChar" = "\"",
"escapeChar" = "\\"
)
STORED AS TEXTFILE;

Then you can go back to the Metastore to import the CSV into the table; note that you may have to remove the header line manually.
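If you would rather not edit the file by hand, one way to drop the header line is a small script like the sketch below. The function name `strip_header`, the file names, and the sample rows are hypothetical; the sketch assumes the header occupies exactly the first line of the file.

```python
import os
import tempfile

def strip_header(src_path, dst_path):
    # Copy src to dst, skipping only the first line (the CSV header).
    with open(src_path) as src, open(dst_path, "w") as dst:
        next(src, None)  # discard the header; tolerate an empty file
        for line in src:
            dst.write(line)

# Usage on a small throwaway file with made-up rows.
workdir = tempfile.mkdtemp()
src = os.path.join(workdir, "status_with_header.csv")
dst = os.path.join(workdir, "status_no_header.csv")
with open(src, "w") as f:
    f.write('"station_id","bikes_available","docks_available","time"\n')
    f.write('"70","5","14","2013/08/29 12:06:01"\n')

strip_header(src, dst)

with open(dst) as f:
    remaining = f.read()
```

The file is streamed line by line, so this approach also works for status files too large to open comfortably in an editor.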

Reset Impala’s Metastore Cache

When you create new databases or tables and plan to query them in an Impala snippet, it’s a good idea to run an INVALIDATE METADATA; command first to reset the metastore cache. Otherwise, you may encounter an error where the database or table is not recognized.
