Big Data Analytics with Spark

Matheus Schmitz

LinkedIn

Github Portfolio

Problem Statement

This project explores a bicycle rental dataset using "big data" frameworks such as Spark.

Five business questions are answered through data exploration, manipulation, and filtering.

Business questions:

  1. What are the top 5 stations with the most bicycles rented?
  2. What are the top 5 routes based on initial and final stations, and their respective average bicycle rental durations?
  3. Who rents the most bicycles, men or women? For how long does each group rent bicycles?
  4. Which age group rents the most bicycles? For how long does each group rent bicycles?
  5. What are the most rented bicycles?

Imports

In [1]:
# Imports
import time
import argparse
import pyspark
import numpy as np 
import pandas as pd
import datetime as dt
from pathlib import Path
from pyspark import SparkContext
from pyspark.sql.session import SparkSession

Spark Initialization

In [7]:
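# Spark Context (already provided by the PySpark shell; uncomment the next line when running as a standalone script)
# sc = SparkContext()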
sc
Out[7]:

SparkContext
Version: v2.4.2
Master: local[*]
AppName: PySparkShell
In [6]:
# Spark Session
spark = SparkSession(sc)
spark
Out[6]:

SparkSession - hive
SparkContext
Version: v2.4.2
Master: local[*]
AppName: PySparkShell

Dataset

In [18]:
# Load dataset
rides_rdd = sc.textFile('data/bike_rentals.csv', use_unicode=True)
rides_rdd.take(5)
Out[18]:
['User_Gender,User_Age,Bike,Station_Rental,Date_Rental,Hour_Rental,Station_Arrival,Date_Arrival,Hour_Arrival',
 'M,44,4357,442,1/2/2020,0:00:38,116,1/2/2020,0:35:17',
 'M,22,12083,66,1/2/2020,0:00:53,37,1/2/2020,0:06:23',
 'M,29,11562,331,1/2/2020,0:00:55,341,1/2/2020,0:26:47',
 'M,27,10206,164,1/2/2020,0:01:18,35,1/2/2020,0:16:51']
In [20]:
# Function to convert date and time strings into a datetime
def convert_date(date, time):
    from datetime import datetime
    return datetime.strptime(date + ' ' + time, '%d/%m/%Y %H:%M:%S')
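The format string above reads the date as day/month/year, so '1/2/2020' becomes 1 February 2020. The sketch below repeats the function on the first data row shown earlier to make that visible; it assumes the dataset really is day-first, which the notebook does not state explicitly.

```python
from datetime import datetime

def convert_date(date, time):
    # Same format string as the notebook: day/month/year and 24-hour time
    return datetime.strptime(date + ' ' + time, '%d/%m/%Y %H:%M:%S')

# Rental and arrival timestamps of the first data row
start = convert_date('1/2/2020', '0:00:38')
end = convert_date('1/2/2020', '0:35:17')

print(start)                          # 2020-02-01 00:00:38
print((end - start).total_seconds())  # 2079.0
```

If the source were month-first instead, the format string would need to be '%m/%d/%Y %H:%M:%S'. Rides that start and end on the same day have the same duration either way.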
In [21]:
# Function to create age range
def convert_age_range(age):
    if age <= 17:
        return '00-17'
    elif age >= 18 and age <= 34:
        return '18-34'
    elif age >= 35 and age <= 44:
        return '35-44'
    elif age >= 45 and age <= 54:
        return '45-54'
    elif age >= 55 and age <= 64:
        return '55-64'
    else:
        return '65+'
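The same bucketing can also be written as a table lookup, which keeps the boundaries in one place. This is an alternative sketch, not the function used in the rest of the notebook; the names AGE_UPPER_BOUNDS, AGE_LABELS and age_range_lookup are hypothetical.

```python
from bisect import bisect_left

# Inclusive upper bound of every bucket except the open-ended last one
AGE_UPPER_BOUNDS = [17, 34, 44, 54, 64]
AGE_LABELS = ['00-17', '18-34', '35-44', '45-54', '55-64', '65+']

def age_range_lookup(age):
    # bisect_left returns the index of the first bound that is >= age,
    # or len(AGE_UPPER_BOUNDS) when age exceeds every bound
    return AGE_LABELS[bisect_left(AGE_UPPER_BOUNDS, age)]

for age in (17, 18, 44, 65):
    print(age, age_range_lookup(age))
```

For integer ages this returns the same labels as convert_age_range.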
In [26]:
# Function to clean data
def clean_data(part_id, list_of_records):
    # The CSV header is the first line of the file, so it sits in partition 0
    if part_id == 0:
        next(list_of_records)
    import csv
    reader = csv.reader(list_of_records)
    for row in reader:
        gender = row[0]
        age_range = convert_age_range(int(row[1]))
        bike_id = int(row[2])
        station_start = row[3]
        datetime_start = convert_date(row[4], row[5])
        station_end = row[6]
        datetime_end = convert_date(row[7], row[8])
        yield (bike_id, gender, age_range, station_start, station_end, datetime_start, datetime_end)
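mapPartitionsWithIndex calls the function once per partition, passing the partition index and an iterator over that partition's lines. The sketch below imitates this with plain Python iterators to show the header handling in isolation; skip_header is a hypothetical reduced version of clean_data, and the two "partitions" are toy data.

```python
import csv

def skip_header(part_id, records):
    # Only the first partition contains the header line, so drop one line there
    if part_id == 0:
        next(records)
    # csv.reader accepts any iterator that yields strings
    for row in csv.reader(records):
        yield row

# Toy stand-ins for two partitions of the text file
partition_0 = iter(['User_Gender,User_Age,Bike', 'M,44,4357'])
partition_1 = iter(['M,22,12083'])

rows = list(skip_header(0, partition_0)) + list(skip_header(1, partition_1))
print(rows)  # [['M', '44', '4357'], ['M', '22', '12083']]
```

This approach assumes a single input file whose first partition is not empty. With several input files, each file's header would need to be handled separately.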
In [29]:
# Now clean the RDD using the above functions
rides_clean = rides_rdd.mapPartitionsWithIndex(clean_data).cache()
rides_clean.take(3)
Out[29]:
[(4357,
  'M',
  '35-44',
  '442',
  '116',
  datetime.datetime(2020, 2, 1, 0, 0, 38),
  datetime.datetime(2020, 2, 1, 0, 35, 17)),
 (12083,
  'M',
  '18-34',
  '66',
  '37',
  datetime.datetime(2020, 2, 1, 0, 0, 53),
  datetime.datetime(2020, 2, 1, 0, 6, 23)),
 (11562,
  'M',
  '18-34',
  '331',
  '341',
  datetime.datetime(2020, 2, 1, 0, 0, 55),
  datetime.datetime(2020, 2, 1, 0, 26, 47))]

1. What are the top 5 stations with the most bicycles rented?

In [60]:
# First map values assigning a 1 count each time a bike was rented from a station
q1_step1 = rides_clean.map(lambda x: (x[3], 1))
q1_step1.take(3)
Out[60]:
[('442', 1), ('66', 1), ('331', 1)]
In [61]:
# Then group by key and count the number of rentals
q1_step2 = q1_step1.reduceByKey(lambda a,b: a+b)
q1_step2.take(3)
Out[61]:
[('442', 365), ('66', 2548), ('331', 989)]
In [63]:
# Then extract the top 5
q1_step3 = q1_step2.top(5, key=lambda x: x[1])
q1_step3
Out[63]:
[('1', 6311), ('27', 6241), ('271', 5265), ('64', 4834), ('41', 4633)]
In [133]:
# Then print results
print('Top 5 Stations:')
print()
for entry in q1_step3:
    print(f'Station: {int(entry[0]):3d}  |  Number of Rentals: {int(entry[1]):5d}')
Top 5 Stations:

Station:   1  |  Number of Rentals:  6311
Station:  27  |  Number of Rentals:  6241
Station: 271  |  Number of Rentals:  5265
Station:  64  |  Number of Rentals:  4834
Station:  41  |  Number of Rentals:  4633
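For readers new to the RDD API, the map / reduceByKey / top chain used here is the distributed form of a familiar count-and-rank pattern. A minimal single-machine sketch in plain Python follows, using toy station IDs rather than the real dataset.

```python
import heapq
from collections import Counter

# Toy data: start station of six rides
start_stations = ['442', '66', '331', '66', '66', '331']

# Step 1 (map): emit a (station, 1) pair per ride
pairs = [(station, 1) for station in start_stations]

# Step 2 (reduceByKey): add up the ones for each station
counts = Counter()
for station, one in pairs:
    counts[station] += one

# Step 3 (top): keep the n stations with the largest counts
top2 = heapq.nlargest(2, counts.items(), key=lambda kv: kv[1])
print(top2)  # [('66', 3), ('331', 2)]
```

In Spark the same steps run per partition first and are then merged, which is why the counting is expressed as an associative addition.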

2. What are the top 5 routes based on initial and final stations, and their respective average bicycle rental durations?

In [73]:
# For each ride create a (start_station, end_station) tuple and calculate ride time
q2_step1 = rides_clean.map(lambda x: ((x[3],x[4]), x[6]-x[5]))
q2_step1.take(3)
Out[73]:
[(('442', '116'), datetime.timedelta(seconds=2079)),
 (('66', '37'), datetime.timedelta(seconds=330)),
 (('331', '341'), datetime.timedelta(seconds=1552))]
In [74]:
# Then pair each duration with a 1 to help tally the rides between station tuples
q2_step2 = q2_step1.mapValues(lambda x: (x.total_seconds(), 1))
q2_step2.take(3)
Out[74]:
[(('442', '116'), (2079.0, 1)),
 (('66', '37'), (330.0, 1)),
 (('331', '341'), (1552.0, 1))]
In [75]:
# Reduce by key, adding up travel times and number of rentals
q2_step3 = q2_step2.reduceByKey(lambda x, y: ((x[0]+y[0]), x[1]+y[1]))
q2_step3.take(3)
Out[75]:
[(('442', '116'), (4336.0, 2)),
 (('331', '341'), (3844.0, 4)),
 (('120', '47'), (7168.0, 14))]
In [77]:
# Divide total travel time by number of rentals to get the average duration
q2_step4 = q2_step3.mapValues(lambda x: (x[0] / x[1], x[1]))
q2_step4.take(3)
Out[77]:
[(('442', '116'), (2168.0, 2)),
 (('331', '341'), (961.0, 4)),
 (('120', '47'), (512.0, 14))]
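The average is computed by carrying (total_seconds, count) pairs through reduceByKey and dividing only at the end. The reason is that reduceByKey may combine values in any grouping, so the combining function has to be associative. Adding sums and counts is; averaging partial averages is not. A small sketch with toy durations illustrates the difference.

```python
from functools import reduce

# Toy ride durations in seconds for a single route
durations = [100.0, 200.0, 600.0]
pairs = [(d, 1) for d in durations]

def combine(x, y):
    # Same logic as the reduceByKey lambda: add totals and add counts
    return (x[0] + y[0], x[1] + y[1])

# Both groupings give the same (total, count) pair
left = reduce(combine, pairs)
right = combine(pairs[0], combine(pairs[1], pairs[2]))
mean = left[0] / left[1]
print(left, right, mean)  # (900.0, 3) (900.0, 3) 300.0

def naive(x, y):
    # Averaging two partial averages ignores how many rides each represents
    return (x + y) / 2

naive_left = naive(naive(100.0, 200.0), 600.0)
naive_right = naive(100.0, naive(200.0, 600.0))
print(naive_left, naive_right)  # 375.0 250.0
```

The naive version depends on the grouping and matches the true mean of 300.0 in neither case.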
In [79]:
# Extract top 5
q2_step5 = q2_step4.top(5, lambda x: x[1][1])
q2_step5
Out[79]:
[(('33', '33'), (1849.6968085106382, 376)),
 (('18', '1'), (334.94984326018806, 319)),
 (('449', '449'), (1038.6611842105262, 304)),
 (('211', '217'), (212.5181518151815, 303)),
 (('208', '206'), (548.6308724832214, 298))]
In [132]:
# Print results
print('Top 5 Routes:')
print()
for entry in q2_step5:
    print(f'Start Station: {int(entry[0][0]):3d}  |  End Station: {int(entry[0][1]):3d}  |  Bikes Rented: {int(entry[1][1]):3d}  |  Average Duration: {int(entry[1][0])/60:5.2f} minutes')
Top 5 Routes:

Start Station:  33  |  End Station:  33  |  Bikes Rented: 376  |  Average Duration: 30.82 minutes
Start Station:  18  |  End Station:   1  |  Bikes Rented: 319  |  Average Duration:  5.57 minutes
Start Station: 449  |  End Station: 449  |  Bikes Rented: 304  |  Average Duration: 17.30 minutes
Start Station: 211  |  End Station: 217  |  Bikes Rented: 303  |  Average Duration:  3.53 minutes
Start Station: 208  |  End Station: 206  |  Bikes Rented: 298  |  Average Duration:  9.13 minutes

3. Who rents the most bicycles, men or women? For how long does each group rent bicycles?

In [102]:
# For each entry take the gender and rental duration
q3_step1 = rides_clean.map(lambda x: (x[1], x[6]-x[5]))
q3_step1.take(3)
Out[102]:
[('M', datetime.timedelta(seconds=2079)),
 ('M', datetime.timedelta(seconds=330)),
 ('M', datetime.timedelta(seconds=1552))]
In [94]:
# Then add a 1 to help calculate average times per gender
q3_step2 = q3_step1.mapValues(lambda x: (x.total_seconds(), 1))
q3_step2.take(3)
Out[94]:
[('M', (2079.0, 1)), ('M', (330.0, 1)), ('M', (1552.0, 1))]
In [95]:
# Reduce by key, summing times and counts
q3_step3 = q3_step2.reduceByKey(lambda a, b: ((a[0]+b[0]), a[1]+b[1]))
q3_step3.take(3)
Out[95]:
[('M', (475637791.0, 511103)), ('F', (167059868.0, 175224))]
In [96]:
# Calculate average rental duration per gender
q3_step4 = q3_step3.mapValues(lambda x: (x[0] / x[1], x[1]))
q3_step4.take(3)
Out[96]:
[('M', (930.6104464266498, 511103)), ('F', (953.4074555996896, 175224))]
In [131]:
# Print results
print('Number and Duration of Rentals per Gender')
print()
for entry in q3_step4.collect():
    print(f'Gender: {entry[0]}  |  Number of Rentals: {int(entry[1][1]):6d}  |  Average Duration: {entry[1][0]/60:5.2f} minutes') 
Number and Duration of Rentals per Gender

Gender: M  |  Number of Rentals: 511103  |  Average Duration: 15.51 minutes
Gender: F  |  Number of Rentals: 175224  |  Average Duration: 15.89 minutes

4. Which age group rents the most bicycles? For how long does each group rent bicycles?

In [103]:
# For each entry take the age group and rental duration
q4_step1 = rides_clean.map(lambda x: (x[2], x[6]-x[5]))
q4_step1.take(3)
Out[103]:
[('35-44', datetime.timedelta(seconds=2079)),
 ('18-34', datetime.timedelta(seconds=330)),
 ('18-34', datetime.timedelta(seconds=1552))]
In [104]:
# Then add a 1 to help calculate average times per age group
q4_step2 = q4_step1.mapValues(lambda x: (x.total_seconds(), 1))
q4_step2.take(3)
Out[104]:
[('35-44', (2079.0, 1)), ('18-34', (330.0, 1)), ('18-34', (1552.0, 1))]
In [105]:
# Reduce by key, summing times and counts
q4_step3 = q4_step2.reduceByKey(lambda a, b: ((a[0]+b[0]), a[1]+b[1]))
q4_step3.take(3)
Out[105]:
[('35-44', (163918692.0, 168088)),
 ('18-34', (369201406.0, 393168)),
 ('45-54', (69205671.0, 78139))]
In [106]:
# Calculate average rental duration per age group
q4_step4 = q4_step3.mapValues(lambda x: (x[0] / x[1], x[1]))
q4_step4.take(3)
Out[106]:
[('35-44', (975.1956832135548, 168088)),
 ('18-34', (939.0423584828877, 393168)),
 ('45-54', (885.6738760414133, 78139))]
In [114]:
# Then order by number of rentals
q4_step5 = q4_step4.top(6, key=lambda x: x[1][1])
q4_step5
Out[114]:
[('18-34', (939.0423584828877, 393168)),
 ('35-44', (975.1956832135548, 168088)),
 ('45-54', (885.6738760414133, 78139)),
 ('55-64', (859.0278830083565, 35900)),
 ('65+', (874.7642024951026, 9699)),
 ('00-17', (786.5348837209302, 1333))]
In [134]:
# Print results
print('Number and Duration of Rentals per Age Group')
print()
for entry in q4_step5:
    print(f'Age Group: {entry[0]:>5}  |  Number of Rentals: {int(entry[1][1]):>6}  |  Average Duration: {entry[1][0]/60:5.2f} minutes') 
Number and Duration of Rentals per Age Group

Age Group: 18-34  |  Number of Rentals: 393168  |  Average Duration: 15.65 minutes
Age Group: 35-44  |  Number of Rentals: 168088  |  Average Duration: 16.25 minutes
Age Group: 45-54  |  Number of Rentals:  78139  |  Average Duration: 14.76 minutes
Age Group: 55-64  |  Number of Rentals:  35900  |  Average Duration: 14.32 minutes
Age Group:   65+  |  Number of Rentals:   9699  |  Average Duration: 14.58 minutes
Age Group: 00-17  |  Number of Rentals:   1333  |  Average Duration: 13.11 minutes

5. What are the most rented bicycles?

In [118]:
# For each ride take the bike ID and calculate ride time
q5_step1 = rides_clean.map(lambda x: (x[0], x[6]-x[5]))
q5_step1.take(3)
Out[118]:
[(4357, datetime.timedelta(seconds=2079)),
 (12083, datetime.timedelta(seconds=330)),
 (11562, datetime.timedelta(seconds=1552))]
In [119]:
# Then pair each duration with a 1 to help tally the rides of each bike
q5_step2 = q5_step1.mapValues(lambda x: (x.total_seconds(), 1))
q5_step2.take(3)
Out[119]:
[(4357, (2079.0, 1)), (12083, (330.0, 1)), (11562, (1552.0, 1))]
In [120]:
# Reduce by key, adding up travel times and number of rentals
q5_step3 = q5_step2.reduceByKey(lambda x, y: ((x[0]+y[0]), x[1]+y[1]))
q5_step3.take(3)
Out[120]:
[(11562, (147542.0, 138)), (10206, (130567.0, 171)), (8458, (119851.0, 147))]
In [126]:
# Extract top 5
q5_step4 = q5_step3.top(5, lambda x: x[1][1])
q5_step4
Out[126]:
[(10771, (91389.0, 217)),
 (10810, (167146.0, 208)),
 (7854, (166969.0, 193)),
 (8463, (197323.0, 190)),
 (4013, (158476.0, 189))]
In [135]:
# Print results
print('Top 5 Bicycles:')
print()
for entry in q5_step4:
    print(f'Bicycle ID: {int(entry[0]):>5}  |  Number of Rentals: {int(entry[1][1]):3d}  |  Total Usage Time: {int(entry[1][0])/60:5.2f} minutes')
Top 5 Bicycles:

Bicycle ID: 10771  |  Number of Rentals: 217  |  Total Usage Time: 1523.15 minutes
Bicycle ID: 10810  |  Number of Rentals: 208  |  Total Usage Time: 2785.77 minutes
Bicycle ID:  7854  |  Number of Rentals: 193  |  Total Usage Time: 2782.82 minutes
Bicycle ID:  8463  |  Number of Rentals: 190  |  Total Usage Time: 3288.72 minutes
Bicycle ID:  4013  |  Number of Rentals: 189  |  Total Usage Time: 2641.27 minutes

End
