This project explores a bicycle rental dataset using "big data" frameworks such as Spark.
There are five business questions which need to be answered through data exploration, manipulation, filtering and so forth.
Business questions:
# Imports
import time
import argparse
import pyspark
import numpy as np
import pandas as pd
import datetime as dt
from pathlib import Path
from pyspark import SparkContext
from pyspark.sql.session import SparkSession
# Spark Context
# Reuse the context a PySpark shell/notebook already provides, or start one
# when this file runs on its own (a bare `sc` reference would raise NameError
# there because nothing in this file defines it).
sc = SparkContext.getOrCreate()
sc
# Spark Session
spark = SparkSession(sc)
spark
# Load dataset
# Each element of the RDD is one raw text line of the CSV file.
rides_rdd = sc.textFile('data/bike_rentals.csv', use_unicode=True)
rides_rdd.take(5)  # preview the first five raw lines
# Function to convert string days and hours into datetime
def convert_date(date, time):
    """Combine a date string and a time string into a single ``datetime``.

    ``date`` must look like ``DD/MM/YYYY`` and ``time`` like ``HH:MM:SS``;
    ``datetime.strptime`` raises ``ValueError`` for any other layout.
    """
    from datetime import datetime

    # Join the two parts with a single space so they match the format below.
    stamp = ' '.join((date, time))
    return datetime.strptime(stamp, '%d/%m/%Y %H:%M:%S')
# Function to create age range
def convert_age_range(age):
    """Return the age-bracket label for ``age``.

    The labels are ``'00-17'``, ``'18-34'``, ``'35-44'``, ``'45-54'``,
    ``'55-64'`` and ``'65+'``.

    Each bracket is delimited only by its exclusive upper bound, so the
    brackets are contiguous: a non-integer age such as ``17.5`` is placed in
    ``'00-17'`` rather than slipping between two closed integer ranges and
    being reported as ``'65+'``. Integer ages map exactly as before.
    """
    # (exclusive upper bound, label) pairs in ascending order.
    brackets = (
        (18, '00-17'),
        (35, '18-34'),
        (45, '35-44'),
        (55, '45-54'),
        (65, '55-64'),
    )
    for upper_bound, label in brackets:
        if age < upper_bound:
            return label
    # Anything not below the last bound belongs to the open-ended bracket.
    return '65+'
# Function to clean data
def clean_data(part_id, list_of_records):
    """Parse the raw CSV lines of one partition into typed ride tuples.

    Written for ``RDD.mapPartitionsWithIndex``.

    Args:
        part_id: Index of the partition being processed.
        list_of_records: Iterator over the partition's raw CSV lines.

    Yields:
        ``(bike_id, gender, age_range, station_start, station_end,
        datetime_start, datetime_end)`` for every non-empty row.

    Raises:
        ValueError: If the age, the bike id, or a date/time field of a row
            cannot be parsed.
        IndexError: If a non-empty row has fewer than nine columns.
    """
    import csv

    if part_id == 0:
        # The first record of partition 0 is dropped (presumably the CSV
        # header line). The default keeps an empty partition from raising
        # StopIteration inside this generator, which Python would convert
        # into a RuntimeError (PEP 479).
        next(list_of_records, None)

    for row in csv.reader(list_of_records):
        if not row:
            # csv.reader yields [] for a blank line; nothing to parse.
            continue
        gender = row[0]
        age_range = convert_age_range(int(row[1]))
        bike_id = int(row[2])
        station_start = row[3]
        datetime_start = convert_date(row[4], row[5])
        station_end = row[6]
        datetime_end = convert_date(row[7], row[8])
        yield (
            bike_id,
            gender,
            age_range,
            station_start,
            station_end,
            datetime_start,
            datetime_end,
        )
# Now clean the RDD using the above functions
# Every partition is parsed by clean_data; the parsed RDD is cached because
# it is the starting point of each query that follows.
rides_clean = (
    rides_rdd
    .mapPartitionsWithIndex(clean_data)
    .cache()
)
rides_clean.take(3)  # preview three parsed rides
# Pair the start station of every ride with a count of 1
q1_step1 = rides_clean.map(lambda ride: (ride[3], 1))
q1_step1.take(3)
# Sum the counts per station to get the number of rentals started there
q1_step2 = q1_step1.reduceByKey(lambda left, right: left + right)
q1_step2.take(3)
# Keep the five stations with the highest rental count
q1_step3 = q1_step2.top(5, key=lambda station_count: station_count[1])
q1_step3
# Report the ranking
print('Top 5 Stations:')
print()
for station, rentals in q1_step3:
    print(f'Station: {int(station):3d} | Number of Rentals: {int(rentals):5d}')
# For each ride create a (start_station, end_station) tuple and calculate ride time
q2_step1 = rides_clean.map(lambda x: ((x[3], x[4]), x[6] - x[5]))
q2_step1.take(3)
# Convert each duration to seconds and add a 1 to help tally the rides per route
q2_step2 = q2_step1.mapValues(lambda x: (x.total_seconds(), 1))
q2_step2.take(3)
# Reduce by key, adding up travel times and number of rentals
q2_step3 = q2_step2.reduceByKey(lambda x, y: (x[0] + y[0], x[1] + y[1]))
q2_step3.take(3)
# Divide total travel time by number of rentals -> (average seconds, rentals)
q2_step4 = q2_step3.mapValues(lambda x: (x[0] / x[1], x[1]))
q2_step4.take(3)
# Extract the top 5 routes by number of rentals
q2_step5 = q2_step4.top(5, lambda x: x[1][1])
q2_step5
# Print results
print('Top 5 Routes:')
print()
for (station_start, station_end), (avg_seconds, rentals) in q2_step5:
    # The average is converted to minutes without first truncating it to whole
    # seconds, so the reported value is not biased downwards and is computed
    # the same way as in the per-gender and per-age-group reports.
    print(f'Start Station: {int(station_start):3d} | End Station: {int(station_end):3d} | Bikes Rented: {int(rentals):3d} | Average Duration: {avg_seconds / 60:5.2f} minutes')
# Key every ride by gender, with the ride duration as the value
q3_step1 = rides_clean.map(lambda ride: (ride[1], ride[6] - ride[5]))
q3_step1.take(3)
# Turn each duration into (seconds, 1) so sums and counts can be accumulated together
q3_step2 = q3_step1.mapValues(lambda duration: (duration.total_seconds(), 1))
q3_step2.take(3)
# Add up seconds and ride counts per gender
q3_step3 = q3_step2.reduceByKey(
    lambda acc, nxt: (acc[0] + nxt[0], acc[1] + nxt[1])
)
q3_step3.take(3)
# Average duration per gender -> (average seconds, rentals)
q3_step4 = q3_step3.mapValues(lambda totals: (totals[0] / totals[1], totals[1]))
q3_step4.take(3)
# Report the figures for every gender
print('Number and Duration of Rentals per Gender')
print()
for gender, (avg_seconds, rentals) in q3_step4.collect():
    print(f'Gender: {gender} | Number of Rentals: {int(rentals):6d} | Average Duration: {avg_seconds/60:5.2f} minutes')
# Key every ride by age group, with the ride duration as the value
q4_step1 = rides_clean.map(lambda ride: (ride[2], ride[6] - ride[5]))
q4_step1.take(3)
# Turn each duration into (seconds, 1) so sums and counts can be accumulated together
q4_step2 = q4_step1.mapValues(lambda duration: (duration.total_seconds(), 1))
q4_step2.take(3)
# Add up seconds and ride counts per age group
q4_step3 = q4_step2.reduceByKey(
    lambda acc, nxt: (acc[0] + nxt[0], acc[1] + nxt[1])
)
q4_step3.take(3)
# Average duration per age group -> (average seconds, rentals)
q4_step4 = q4_step3.mapValues(lambda totals: (totals[0] / totals[1], totals[1]))
q4_step4.take(3)
# Take up to six groups, ordered by number of rentals (highest first)
q4_step5 = q4_step4.top(6, key=lambda group: group[1][1])
q4_step5
# Report the figures for every age group
print('Number and Duration of Rentals per Age Group')
print()
for age_group, (avg_seconds, rentals) in q4_step5:
    print(f'Age Group: {age_group:>5} | Number of Rentals: {int(rentals):>6} | Average Duration: {avg_seconds/60:5.2f} minutes')
# Key every ride by bike id, with the ride duration as the value
q5_step1 = rides_clean.map(lambda ride: (ride[0], ride[6] - ride[5]))
q5_step1.take(3)
# Turn each duration into (seconds, 1) to tally usage time and rides per bike
q5_step2 = q5_step1.mapValues(lambda duration: (duration.total_seconds(), 1))
q5_step2.take(3)
# Add up usage seconds and ride counts per bike
q5_step3 = q5_step2.reduceByKey(
    lambda acc, nxt: (acc[0] + nxt[0], acc[1] + nxt[1])
)
q5_step3.take(3)
# Keep the five bikes with the most rentals
q5_step4 = q5_step3.top(5, lambda bike: bike[1][1])
q5_step4
# Report the ranking
print('Top 5 Bicycles:')
print()
for bike_id, (total_seconds, rentals) in q5_step4:
    print(f'Bicycle ID: {int(bike_id):>5} | Number of Rentals: {int(rentals):3d} | Total Usage Time: {int(total_seconds)/60:5.2f} minutes')