Matheus Schmitz
LinkedIn
GitHub Portfolio
import warnings
warnings.simplefilter('ignore')
import sys
from pyspark import SparkContext, SparkConf
import time
import xgboost as xgb
import numpy as np
import json
import gc
from vaderSentiment.vaderSentiment import SentimentIntensityAnalyzer
#from textblob import TextBlob
#from sklearn.preprocessing import PolynomialFeatures, StandardScaler
from sklearn.decomposition import TruncatedSVD
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.pipeline import Pipeline
#from sklearn.impute import IterativeImputer
#import sklearn.ensemble
# Track time taken
start_time = time.time()
# Initialize spark
import findspark
findspark.init()
sc = SparkContext.getOrCreate(SparkConf().set("spark.executor.memory", "4g").set("spark.driver.memory", "4g"))
sc.setLogLevel("ERROR")
# Set input and output paths
folder_path = "data/"
test_file_name = "data/yelp_val.csv"
output_file_name = "predictions.csv"
print(f'Loading Data...')
stage_time = time.time()
# Read the CSV skipping its header, and reshape it as ((user, bizz), rating)
trainRDD = sc.textFile(folder_path+'yelp_train.csv')
trainHeader = trainRDD.first()
trainRDD = trainRDD.filter(lambda row: row != trainHeader).sample(False, 0.00025).map(lambda row: row.split(',')).map(lambda row: ((row[0],row[1]), float(row[2]))).persist() #.sample(False, 0.50)
validRDD = sc.textFile(test_file_name)
validHeader = validRDD.first()
validRDD = validRDD.filter(lambda row: row != validHeader).sample(False, 0.00050).map(lambda row: row.split(',')).map(lambda row: ((row[0],row[1]), float(row[2]))).persist() #.sample(False, 0.50) #,float(row[2])
print(f'Loading Data: Stage Time: {time.time() - stage_time:.0f} seconds. Total Time: {time.time() - start_time:.0f} seconds.')
Loading Data... Loading Data: Stage Time: 9 seconds. Total Time: 31 seconds.
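For intuition, here is a minimal sketch (with made-up IDs) of what the parsing chain above produces for a single CSV row:
# Hypothetical CSV line; IDs and rating are made up for illustration
sample_line = "u1a2b3c,bXy9Z8w,4.0"
user_id, business_id, stars = sample_line.split(',')
record = ((user_id, business_id), float(stars))
# record == (('u1a2b3c', 'bXy9Z8w'), 4.0), i.e. the ((user, bizz), rating) shape used throughout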
print(f'Generating ID Encodings...')
stage_time = time.time()
# Merge RDDs to get all IDs
mergedRDD = sc.union([trainRDD,validRDD])
# Get distinct users and businesses (over train and valid datasets)
distinct_user = mergedRDD.map(lambda row: row[0][0]).distinct().sortBy(lambda user: user).collect()
distinct_bizz = mergedRDD.map(lambda row: row[0][1]).distinct().sortBy(lambda bizz: bizz).collect()
# Convert names to IDs (to optimize memory usage when holding the values)
user_to_encoding, encoding_to_user = {}, {}
for encoding, real_id in enumerate(distinct_user):
user_to_encoding[real_id] = encoding
#encoding_to_user[encoding] = real_id
bizz_to_encoding, encoding_to_bizz = {}, {}
for encoding, real_id in enumerate(distinct_bizz):
bizz_to_encoding[real_id] = encoding
#encoding_to_bizz[encoding] = real_id
# Use the IDs to encode the RDD, which reduces memory requirements when holding itemsets, and keep the shape as ((user, bizz), rating)
trainRDD_enc = trainRDD.map(lambda x: ((user_to_encoding[x[0][0]], bizz_to_encoding[x[0][1]]), x[1])).persist()
validRDD_enc = validRDD.map(lambda x: ((user_to_encoding[x[0][0]], bizz_to_encoding[x[0][1]]), x[1])).persist()
# Memory management
trainRDD.unpersist()
del trainRDD, trainHeader, validHeader, mergedRDD
gc.collect()
print(f'Generating ID Encodings: Stage Time: {time.time() - stage_time:.0f} seconds. Total Time: {time.time() - start_time:.0f} seconds.')
Generating ID Encodings... Generating ID Encodings: Stage Time: 52 seconds. Total Time: 84 seconds.
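As a quick illustration (toy IDs, not real ones), the encoding step simply replaces each string ID with its index in the sorted list of distinct IDs:
# Toy version of the ID-encoding scheme; the same pattern is applied to businesses
toy_users = sorted(['uC', 'uA', 'uB'])                        # ['uA', 'uB', 'uC']
toy_user_to_encoding = {u: i for i, u in enumerate(toy_users)}
# toy_user_to_encoding == {'uA': 0, 'uB': 1, 'uC': 2}, so small integer keys replace long string IDs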
print(f'Generating Average Rating Features...')
stage_time = time.time()
# Calculate average ratings
user_avg_rating = trainRDD_enc.map(lambda x: (x[0][0], [x[1]])).reduceByKey(lambda a, b: a + b).map(lambda row: (row[0], sum(row[1]) / len(row[1]))).collectAsMap()
bizz_avg_rating = trainRDD_enc.map(lambda x: (x[0][1], [x[1]])).reduceByKey(lambda a, b: a + b).map(lambda row: (row[0], sum(row[1]) / len(row[1]))).collectAsMap()
print(f'Generating Average Rating Features: Stage Time: {time.time() - stage_time:.0f} seconds. Total Time: {time.time() - start_time:.0f} seconds.')
Generating Average Rating Features... Generating Average Rating Features: Stage Time: 12 seconds. Total Time: 96 seconds.
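The averaging step groups ratings per key and divides the sum by the count; the same pattern in plain Python on toy encoded records (values are made up):
# Toy records in the ((user, bizz), rating) shape
toy_records = [((0, 10), 4.0), ((0, 11), 2.0), ((1, 10), 5.0)]
grouped = {}
for (user, bizz), rating in toy_records:
    grouped.setdefault(user, []).append(rating)
toy_user_avg = {user: sum(rs) / len(rs) for user, rs in grouped.items()}
# toy_user_avg == {0: 3.0, 1: 5.0}; bizz_avg_rating is built the same way, keyed by business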
def item_based_CF(user,
bizz,
user_avg_rating,
bizz_avg_rating,
user_bizz_rating_dict,
bizz_user_rating_dict):
### Ensure no errors in case a user and/or business doesn't have an average rating score ###
# If both user and business have missing ratings, return a global-average guess
if user not in user_avg_rating and bizz not in bizz_avg_rating:
return ((user, bizz), 3.75) # Based on real world knowledge I know that the overall average rating is somewhere between 3.5 and 4.0
# If only the business has a missing value, we still cannot calculate similarity, so return the average for the associated user
elif bizz not in bizz_avg_rating:
return ((user, bizz), user_avg_rating[user])
# If only the user has a missing value, we still cannot calculate similarity, so return the average for the associated business
elif user not in user_avg_rating:
return ((user, bizz), bizz_avg_rating[bizz])
# If both user and business have ratings, proceed to calculating similarities
similarities = list()
# For each business rated by the current user, calculate the similarity between the current business and the comparison business
bizz_rating_dict = user_bizz_rating_dict[user]
for encoding in range(len(bizz_rating_dict)):
pearson_corr = item_item_similarity(bizz, bizz_rating_dict[encoding][0], bizz_user_rating_dict)
# Skip similarities of 0 to improve performance
if pearson_corr == 0:
continue
similarities.append((encoding, pearson_corr))
# Use the Pearson correlations to make a weighted prediction
N = 0
D = 0
for (encoding, pearson_corr) in similarities:
bizz_rating_tuple = bizz_rating_dict[encoding]
business = bizz_rating_tuple[0]
rating = bizz_rating_tuple[1]
business_avg_rating = bizz_avg_rating[business]
N += (rating - business_avg_rating) * pearson_corr
D += abs(pearson_corr)
prediction = float(bizz_avg_rating[bizz] + N/D) if N != 0 else 3.75 # Based on real world knowledge I know that the overall average rating is somewhere between 3.5 and 4.0
return ((user, bizz), prediction)
def item_item_similarity(curr_bizz,
comp_bizz,
bizz_user_rating_dict):
# For each business get all pairs of user/rating
curr_bizz_ratings = bizz_user_rating_dict[curr_bizz]
comp_bizz_ratings = bizz_user_rating_dict[comp_bizz]
# Get co-rated users (those who rated both businesses)
corated_users = set(curr_bizz_ratings.keys()).intersection(set(comp_bizz_ratings.keys()))
# If there are no co-rated users, it's impossible to calculate similarity, so return a guess
if len(corated_users) == 0:
return 0.5
# Calculate the average rating given to the businesses by the co-rated users
curr_bizz_total = 0
comp_bizz_total = 0
count = 0
for user in corated_users:
curr_bizz_total += curr_bizz_ratings[user]
comp_bizz_total += comp_bizz_ratings[user]
count += 1
curr_bizz_avg = curr_bizz_total/count
comp_bizz_avg = comp_bizz_total/count
# Calculate the pearson correlation
curr_x_comp_total = 0
curr_norm_square = 0
comp_norm_square = 0
for user in corated_users:
curr_x_comp_total += ((curr_bizz_ratings[user] - curr_bizz_avg) * (comp_bizz_ratings[user] - comp_bizz_avg))
curr_norm_square += (curr_bizz_ratings[user] - curr_bizz_avg)**2
comp_norm_square += (comp_bizz_ratings[user] - comp_bizz_avg)**2
# Get the Pearson correlation (or guess a correlation if it cannot be calculated for a given pair)
pearson_corr = curr_x_comp_total/((curr_norm_square**0.5) * (comp_norm_square**0.5)) if curr_x_comp_total != 0 else 0.5
return pearson_corr
def merge_dicts(dict1, dict2):
# If the first element is already a dict...
if type(dict1) == dict:
# Then append the second element
dict1.update(dict2)
return dict1
# If it is not (because the reducer is comparing the null item to the first item)...
else:
# Then return the first item
return dict2
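To make the similarity math concrete, a small worked example of the Pearson correlation over co-rated users, using made-up ratings rather than dataset values:
# Toy ratings given to two businesses by three co-rated users (all values made up)
curr = {0: 5.0, 1: 3.0, 2: 4.0}
comp = {0: 4.0, 1: 2.0, 2: 3.0}
corated = set(curr) & set(comp)
curr_avg = sum(curr[u] for u in corated) / len(corated)     # 4.0
comp_avg = sum(comp[u] for u in corated) / len(corated)     # 3.0
numerator = sum((curr[u] - curr_avg) * (comp[u] - comp_avg) for u in corated)    # 2.0
denominator = (sum((curr[u] - curr_avg) ** 2 for u in corated) ** 0.5
               * sum((comp[u] - comp_avg) ** 2 for u in corated) ** 0.5)         # 2.0
toy_pearson = numerator / denominator                       # 1.0: identical rating patterns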
print(f'Generating Item-Item CF Features...')
stage_time = time.time()
# For each user, collect a list of (business, rating) pairs; for each business, a dict of {user: rating}
user_bizz_rating_dict = trainRDD_enc.map(lambda x: (x[0][0], [(x[0][1], x[1])])).reduceByKey(lambda dict1, dict2: dict1 + dict2).collectAsMap()
bizz_user_rating_dict = trainRDD_enc.map(lambda x: (x[0][1], {x[0][0]:x[1]})).reduceByKey(lambda dict1, dict2: merge_dicts(dict1, dict2)).collectAsMap()
# Generate augmented features
item_CF_feature_train = trainRDD_enc.map(lambda row: item_based_CF(row[0][0],
row[0][1],
user_avg_rating,
bizz_avg_rating,
user_bizz_rating_dict,
bizz_user_rating_dict)).collectAsMap()
item_CF_feature_test = validRDD_enc.map(lambda row: item_based_CF(row[0][0],
row[0][1],
user_avg_rating,
bizz_avg_rating,
user_bizz_rating_dict,
bizz_user_rating_dict)).collectAsMap()
# Memory management
del user_bizz_rating_dict, bizz_user_rating_dict
gc.collect()
print(f'Generating Item-Item CF Features: Stage Time: {time.time() - stage_time:.0f} seconds. Total Time: {time.time() - start_time:.0f} seconds.')
Generating Item-Item CF Features... Generating Item-Item CF Features: Stage Time: 14 seconds. Total Time: 111 seconds.
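The prediction inside item_based_CF is the standard mean-centered weighted sum; a toy sketch with made-up numbers shows how the pieces combine:
# Toy weighted item-item prediction (all numbers made up)
target_bizz_avg = 3.8                              # average rating of the business being predicted
neighbors = [(4.0, 3.5, 0.9), (2.0, 3.0, 0.4)]     # (rating, neighbor business average, pearson_corr)
N = sum((rating - avg) * w for rating, avg, w in neighbors)   # 0.45 - 0.40 = 0.05
D = sum(abs(w) for _, _, w in neighbors)                      # 1.3
toy_prediction = target_bizz_avg + N / D                      # ~3.84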
# The categories from the Yelp dataset are available at: https://blog.yelp.com/2018/01/yelp_category_list
yelp_categories = ["Active Life", "Arts & Entertainment", "Automotive", "Beauty & Spas", "Education", "Event Planning & Services",
"Financial Services", "Food", "Health & Medical", "Home Services", "Hotels & Travel", "Local Flavor", "Local Services", "Mass Media",
"Nightlife", "Pets", "Professional Services", "Public Services & Government", "Real Estate", "Religious Organizations", "Restaurants", "Shopping"]
print(f'Extracting User & Business Complementary Features...')
stage_time = time.time()
# Read the user and business jsons, and load the features
user_features = sc.textFile(folder_path+'user.json').map(lambda row: json.loads(row)).filter(lambda row: row['user_id'] in distinct_user).map(lambda row: (user_to_encoding[row['user_id']], np.array([row['average_stars'], row['review_count'], int(row['yelping_since'][2:4]), row['useful'], row['funny'], row['cool'], row['fans'], len(row['friends']), len(row['elite'])], dtype=np.float16))).collectAsMap()
bizz_features = sc.textFile(folder_path+'business.json').map(lambda row: json.loads(row)).filter(lambda row: row['business_id'] in distinct_bizz).map(lambda row: (bizz_to_encoding[row['business_id']], [row['stars'], row['review_count'], row['is_open'], row['latitude'], row['longitude'], row['categories']])).collectAsMap()
# Clean the business categories keeping only those words relevant for generating features
for key, value in bizz_features.items():
try:
bizz_features[key][5] = ' '.join([category for category in bizz_features[key][5].split(', ') if category in yelp_categories])
except:
bizz_features[key][5] = ' '
print(f'Extracting User & Business Complementary Features: Stage Time: {time.time() - stage_time:.0f} seconds. Total Time: {time.time() - start_time:.0f} seconds.')
Extracting User & Business Complementary Features... Extracting User & Business Complementary Features: Stage Time: 80 seconds. Total Time: 192 seconds.
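One small detail from the user features above: the yelping_since year is taken by string slicing rather than date parsing; a toy illustration with a made-up date:
# Toy illustration of the two-digit year feature pulled from 'yelping_since'
sample_yelping_since = "2015-03-02"
year_feature = int(sample_yelping_since[2:4])   # 15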
print(f'Training Latent Semantic Analysis Pipeline on Tips...')
stage_time = time.time()
all_tip_texts = sc.textFile(folder_path+'tip.json').map(lambda row: json.loads(row.replace('\\n', ''))).map(lambda row: row['text']).collect()
# Build the TfidfVectorizer and TruncatedSVD (the LSA pipeline)
tips_vectorizer = TfidfVectorizer(ngram_range=(1, 2), max_features=9000, stop_words=None)
tips_svd_tf_idf = TruncatedSVD(n_components=30)
tips_LSA_pipeline = Pipeline([('tfidf', tips_vectorizer),
('svd', tips_svd_tf_idf)])
# Train the LSA model
tips_LSA_pipeline.fit(all_tip_texts)
# Memory management
del all_tip_texts
gc.collect()
print(f'Training Latent Semantic Analysis Pipeline on Tips: Stage Time: {time.time() - stage_time:.0f} seconds. Total Time: {time.time() - start_time:.0f} seconds.')
print(f'tips_LSA_pipeline size: {len(tips_LSA_pipeline["tfidf"].get_feature_names())} -> {tips_LSA_pipeline["svd"].components_.shape[0]}')
Training Latent Semantic Analysis Pipeline on Tips... Training Latent Semantic Analysis Pipeline on Tips: Stage Time: 118 seconds. Total Time: 310 seconds. tips_LSA_pipeline size: 60000 -> 30
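For reference, the same LSA pattern (TF-IDF followed by TruncatedSVD, reusing the sklearn classes imported above) on a tiny made-up corpus, just to show the shapes involved:
# Tiny made-up corpus; the real pipeline above is fit on every tip text
toy_corpus = ["great food and service", "slow service but great drinks",
              "food was cold", "drinks were great"]
toy_lsa = Pipeline([('tfidf', TfidfVectorizer(ngram_range=(1, 2))),
                    ('svd', TruncatedSVD(n_components=2))])
toy_lsa.fit(toy_corpus)
toy_embedding = toy_lsa.transform(["great food"])[0]   # dense 2-dimensional document vector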
print(f'Extracting Tips Features...')
stage_time = time.time()
VADER_sentiment_analyzer = SentimentIntensityAnalyzer()
# Extract tips Features with User/Businesses as keys
tips_features = sc.textFile(folder_path+'tip.json') \
.map(lambda row: json.loads(row.replace('\\n', ''))) \
.filter(lambda row: row['user_id'] in distinct_user) \
.filter(lambda row: row['business_id'] in distinct_bizz) \
.map(lambda row: ((user_to_encoding[row['user_id']], bizz_to_encoding[row['business_id']]), (np.array(list(VADER_sentiment_analyzer.polarity_scores(row['text']).values()), dtype=np.float16),
tips_LSA_pipeline.transform([row['text']])[0].astype(np.float32),
np.array([int(row['date'][2:4])], dtype=np.int8)))) \
.collectAsMap()
print(f'Extracting Tips Features: Stage Time: {time.time() - stage_time:.0f} seconds. Total Time: {time.time() - start_time:.0f} seconds.')
Extracting Tips Features... Extracting Tips Features: Stage Time: 91 seconds. Total Time: 401 seconds.
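The four sentiment values per tip come from VADER's polarity_scores, which returns a dict of negative/neutral/positive/compound scores; a quick illustration on a made-up sentence:
# Toy VADER call; its dict values become the 4-dimensional tip sentiment feature
toy_scores = VADER_sentiment_analyzer.polarity_scores("The tacos were amazing but the wait was terrible.")
# toy_scores has keys 'neg', 'neu', 'pos', 'compound'
toy_sentiment_features = list(toy_scores.values())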
def get_augmented_features_train(row):
# Hyperparameters -- Apply dropout to a certain share of features (set by DROPOUT_RATE). For all non-dropped features, apply additive/multiplicative noise of varying levels.
NOISE_HIGH = 1.75 # 1.5 # 2.0
NOISE_MEDIUM = 0.033 # 0.033 # 0.5 # 0.05
NOISE_LOW = 0.033 # 0.1 # 0.025
### Averages ###
try:
user_avg_X = user_avg_rating[row[0][0]] + np.random.normal(loc=0, scale=NOISE_HIGH)
user_avg_X = np.array([min(max(user_avg_X, 1.0), 5.0)], dtype=np.float32) # Prevent noisy features from going over the natural limits
except:
user_avg_X = np.array([np.NaN], dtype=np.float32)
try:
bizz_avg_X = bizz_avg_rating[row[0][1]] + np.random.normal(loc=0, scale=NOISE_HIGH)
bizz_avg_X = np.array([min(max(bizz_avg_X, 1.0), 5.0)], dtype=np.float32) # Prevent noisy features from going over the natural limits
except:
bizz_avg_X = np.array([np.NaN], dtype=np.float32)
### User & Business Complementary Features ###
try:
user_X = user_features[row[0][0]]
user_X = np.array([min(max(user_X[0] + np.random.normal(loc=0, scale=NOISE_MEDIUM), 1.0), 5.0), user_X[1], user_X[2], user_X[3], user_X[4], user_X[5], user_X[6], user_X[7], user_X[8]], dtype=np.float32)
except:
user_X = np.array([np.NaN, np.NaN, np.NaN, np.NaN, np.NaN, np.NaN, np.NaN, np.NaN, np.NaN], dtype=np.float32)
try:
bizz_X = bizz_features[row[0][1]][0:5]
bizz_X = np.array([min(max(bizz_X[0] + np.random.normal(loc=0, scale=NOISE_MEDIUM), 1.0), 5.0), bizz_X[1], bizz_X[2], bizz_X[3], bizz_X[4]], dtype=np.float32)
except:
bizz_X = np.array([np.NaN, np.NaN, np.NaN, np.NaN, np.NaN], dtype=np.float32)
try:
categories_X = np.array([cat in bizz_features[row[0][1]][5] for cat in yelp_categories], dtype=np.float32)
except:
categories_X = np.array([np.NaN for _ in range(len(yelp_categories))], dtype=np.float32)
### Item-Item CF Features ###
try:
item_CF_X = item_CF_feature_train[row[0][0], row[0][1]] + np.random.normal(loc=0, scale=NOISE_HIGH)
item_CF_X = np.array([min(max(item_CF_X, 1.0), 5.0)], dtype=np.float32) # Prevent noisy features from going over the natural limits
except:
item_CF_X = np.array([np.NaN], dtype=np.float32)
### Tips Textual Features ###
try:
tip_sentiment_X = tips_features[row[0][0], row[0][1]][0]
tip_sentiment_X = np.array([min(max(feat + np.random.normal(loc=0, scale=NOISE_LOW), -1.0), 1.0) for feat in tip_sentiment_X.tolist()], dtype=np.float32)
except:
tip_sentiment_X = np.array([np.NaN, np.NaN, np.NaN, np.NaN], dtype=np.float32)
try:
tip_LSA_X = tips_features[row[0][0], row[0][1]][1]
except:
tip_LSA_X = np.array([np.NaN for _ in range(tips_LSA_pipeline['svd'].components_.shape[0])], dtype=np.float32)
try:
tip_date_X = tips_features[row[0][0], row[0][1]][2]
except:
tip_date_X = np.array([np.NaN], dtype=np.float32)
### Interaction Terms ###
try:
interaction_X = np.array([user_X[0]*user_X[1], user_X[0]*user_X[2], user_X[0]*user_X[7], user_X[0]*bizz_X[0], user_X[0]*bizz_X[1],
user_X[1]*user_X[2], user_X[1]*user_X[7], user_X[1]*bizz_X[0], user_X[1]*bizz_X[1],
user_X[2]*user_X[7], user_X[2]*bizz_X[0], user_X[2]*bizz_X[1],
user_X[7]*bizz_X[0], user_X[7]*bizz_X[1],
bizz_X[0]*bizz_X[1]], dtype=np.float32)
except:
interaction_X = np.array([np.NaN, np.NaN, np.NaN, np.NaN, np.NaN, np.NaN, np.NaN, np.NaN, np.NaN, np.NaN, np.NaN, np.NaN, np.NaN, np.NaN, np.NaN], dtype=np.float32)
X = np.concatenate([user_avg_X, bizz_avg_X, user_X, bizz_X, categories_X, item_CF_X, tip_sentiment_X, tip_LSA_X, tip_date_X, interaction_X]).astype(np.float32)
'''
if row[0][0]%100000 == 0:
print('Train sample: ', X)
'''
return X
def get_augmented_features_test(row):
### Averages ###
try:
user_avg_X = np.array([user_avg_rating[row[0][0]]], dtype=np.float32)
except:
user_avg_X = np.array([np.NaN], dtype=np.float32)
try:
bizz_avg_X = np.array([bizz_avg_rating[row[0][1]]], dtype=np.float32)
except:
bizz_avg_X = np.array([np.NaN], dtype=np.float32)
### User & Business Complementary Features ###
try:
user_X = user_features[row[0][0]]
user_X = np.array([user_X[0], user_X[1], user_X[2], user_X[3], user_X[4], user_X[5], user_X[6], user_X[7], user_X[8]], dtype=np.float32)
except:
user_X = np.array([np.NaN, np.NaN, np.NaN, np.NaN, np.NaN, np.NaN, np.NaN, np.NaN, np.NaN], dtype=np.float32)
try:
bizz_X = np.array(bizz_features[row[0][1]][0:5], dtype=np.float32)
bizz_X = np.array([bizz_X[0], bizz_X[1], bizz_X[2], bizz_X[3], bizz_X[4]], dtype=np.float32)
except:
bizz_X = np.array([np.NaN, np.NaN, np.NaN, np.NaN, np.NaN], dtype=np.float32)
try:
categories_X = np.array([cat in bizz_features[row[0][1]][5] for cat in yelp_categories], dtype=np.float32)
except:
categories_X = np.array([np.NaN for _ in range(len(yelp_categories))], dtype=np.float32)
### Item-Item CF Features ###
try:
item_CF_X = item_CF_feature_test[row[0][0], row[0][1]]
item_CF_X = np.array([item_CF_X], dtype=np.float32)
#item_CF_X = [min(max(item_CF_X, 1.0), 5.0)]
except:
item_CF_X = np.array([np.NaN], dtype=np.float32)
### Tips Textual Features ###
try:
tip_sentiment_X = tips_features[row[0][0], row[0][1]][0]
except:
tip_sentiment_X = np.array([np.NaN, np.NaN, np.NaN, np.NaN], dtype=np.float32)
try:
tip_LSA_X = tips_features[row[0][0], row[0][1]][1]
except:
tip_LSA_X = np.array([np.NaN for _ in range(tips_LSA_pipeline['svd'].components_.shape[0])], dtype=np.float32)
try:
tip_date_X = tips_features[row[0][0], row[0][1]][2]
except:
tip_date_X = np.array([np.NaN], dtype=np.float32)
### Interaction Terms ###
try:
interaction_X = np.array([user_X[0]*user_X[1], user_X[0]*user_X[2], user_X[0]*user_X[7], user_X[0]*bizz_X[0], user_X[0]*bizz_X[1],
user_X[1]*user_X[2], user_X[1]*user_X[7], user_X[1]*bizz_X[0], user_X[1]*bizz_X[1],
user_X[2]*user_X[7], user_X[2]*bizz_X[0], user_X[2]*bizz_X[1],
user_X[7]*bizz_X[0], user_X[7]*bizz_X[1],
bizz_X[0]*bizz_X[1]], dtype=np.float32)
except:
interaction_X = np.array([np.NaN, np.NaN, np.NaN, np.NaN, np.NaN, np.NaN, np.NaN, np.NaN, np.NaN, np.NaN, np.NaN, np.NaN, np.NaN, np.NaN, np.NaN], dtype=np.float32)
X = np.concatenate([user_avg_X, bizz_avg_X, user_X, bizz_X, categories_X, item_CF_X, tip_sentiment_X, tip_LSA_X, tip_date_X, interaction_X]).astype(np.float32)
'''
if row[0][0]%10000 == 0:
print('Test sample: ', X)
'''
return X
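The train-time augmentation above amounts to adding Gaussian noise to each clean feature and clipping it back into the valid rating range; a minimal sketch of that clipped-noise pattern with a made-up value and scale:
# Toy clipped-noise augmentation, mirroring get_augmented_features_train
toy_feature = 4.2                                          # made-up clean feature value
noisy = toy_feature + np.random.normal(loc=0, scale=1.75)  # NOISE_HIGH-style perturbation
noisy = min(max(noisy, 1.0), 5.0)                          # keep the noisy feature inside [1, 5]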
print(f'Building Datasets...')
stage_time = time.time()
# Feature Engineering
#mice_imputer = IterativeImputer()
#polynomial_features = PolynomialFeatures(degree=2)
#svd_all_features = TruncatedSVD(n_components=25)
#standard_scaler = StandardScaler()
# Get train data features to fit the feature transformers
#X_train = trainRDD_enc.map(lambda row: get_augmented_features_train(row)).collect()
#X_train = np.array(X_train)
#X_train = mice_imputer.fit_transform(X_train)
#X_train = polynomial_features.fit_transform(X_train)
#X_train = svd_all_features.fit_transform(X_train)
#X_train = standard_scaler.fit_transform(X_train)
# Train labels
y_train = trainRDD_enc.map(lambda row: row[1]).collect()
y_train = np.array(y_train)
# Get test data features
X_test = validRDD_enc.map(lambda row: get_augmented_features_test(row)).collect()
X_test = np.array(X_test)
#X_test = mice_imputer.transform(X_test)
#X_test = polynomial_features.transform(X_test)
#X_test = svd_all_features.transform(X_test)
#X_test = standard_scaler.transform(X_test)
# Test labels
y_test = validRDD_enc.map(lambda row: row[1]).collect()
y_test = np.array(y_test)
print(f'Building Datasets: Stage Time: {time.time() - stage_time:.0f} seconds. Total Time: {time.time() - start_time:.0f} seconds.')
Building Datasets... Building Datasets: Stage Time: 27 seconds. Total Time: 429 seconds.
print(f'Starting Training...')
stage_time = time.time()
# Instantiate model
model = xgb.XGBRegressor(n_jobs = -1,
n_estimators = 10,
learning_rate = 0.05,
num_parallel_tree = 1,
booster = 'gbtree',
#booster = 'dart',
#rate_drop = 0.25,
eval_metric = 'rmse',
min_child_weight = 0,
min_split_loss = 0,
subsample = 0.5,
colsample_bytree = 0.5,
max_depth = 3,
reg_lambda = 0.0,
reg_alpha = 0.0)
# Train over multiple epochs, regenerating noisy training features at each epoch
EPOCHS = 5 ##7
first_round = True
for epoch in range(EPOCHS):
print(f' Epoch {epoch+1}/{EPOCHS}...')
epoch_time = time.time()
# Get train data features
X_train = trainRDD_enc.map(lambda row: get_augmented_features_train(row)).collect()
X_train = np.array(X_train)
#X_train = mice_imputer.transform(X_train)
#X_train = polynomial_features.transform(X_train)
#X_train = svd_all_features.transform(X_train)
#X_train = standard_scaler.transform(X_train)
# Train
if first_round:
model.fit(X_train, y_train, eval_set = [(X_train, y_train), (X_test, y_test)], eval_metric = 'rmse')
first_round = False
else:
model.fit(X_train, y_train, xgb_model = model.get_booster(), eval_set = [(X_train, y_train), (X_test, y_test)], eval_metric = 'rmse')
print(f' Epoch {epoch+1}/{EPOCHS}. Epoch Time: {time.time() - epoch_time:.0f} seconds. Total Time: {time.time() - start_time:.0f} seconds.')
# Memory management
del X_train
gc.collect()
# Memory management
del user_avg_rating, bizz_avg_rating, user_features, bizz_features
gc.collect()
print(f'Starting Training: Stage Time: {time.time() - stage_time:.0f} seconds. Total Time: {time.time() - start_time:.0f} seconds.')
Starting Training...
Epoch 1/5...
[0] validation_0-rmse:3.44218 validation_1-rmse:2.79743
[1] validation_0-rmse:3.28469 validation_1-rmse:2.77264
[2] validation_0-rmse:3.12698 validation_1-rmse:2.74939
[3] validation_0-rmse:2.98407 validation_1-rmse:2.60255
[4] validation_0-rmse:2.83566 validation_1-rmse:2.55433
[5] validation_0-rmse:2.70181 validation_1-rmse:2.52887
[6] validation_0-rmse:2.57601 validation_1-rmse:2.41720
[7] validation_0-rmse:2.47990 validation_1-rmse:2.33742
[8] validation_0-rmse:2.36718 validation_1-rmse:2.30936
[9] validation_0-rmse:2.24984 validation_1-rmse:2.23048
Epoch 1/5. Epoch Time: 22 seconds. Total Time: 690 seconds.
Epoch 2/5...
[0] validation_0-rmse:2.18781 validation_1-rmse:2.14199
[1] validation_0-rmse:2.07776 validation_1-rmse:2.11485
[2] validation_0-rmse:1.99156 validation_1-rmse:2.09409
[3] validation_0-rmse:1.91508 validation_1-rmse:2.00063
[4] validation_0-rmse:1.84316 validation_1-rmse:1.94368
[5] validation_0-rmse:1.75813 validation_1-rmse:1.92054
[6] validation_0-rmse:1.67792 validation_1-rmse:1.88253
[7] validation_0-rmse:1.60476 validation_1-rmse:1.85764
[8] validation_0-rmse:1.54234 validation_1-rmse:1.87476
[9] validation_0-rmse:1.47035 validation_1-rmse:1.84415
Epoch 2/5. Epoch Time: 21 seconds. Total Time: 711 seconds.
Epoch 3/5...
[0] validation_0-rmse:1.51348 validation_1-rmse:1.74501
[1] validation_0-rmse:1.44240 validation_1-rmse:1.68336
[2] validation_0-rmse:1.39893 validation_1-rmse:1.67411
[3] validation_0-rmse:1.35412 validation_1-rmse:1.69641
[4] validation_0-rmse:1.30728 validation_1-rmse:1.67872
[5] validation_0-rmse:1.25581 validation_1-rmse:1.68151
[6] validation_0-rmse:1.20650 validation_1-rmse:1.63677
[7] validation_0-rmse:1.15982 validation_1-rmse:1.63718
[8] validation_0-rmse:1.12154 validation_1-rmse:1.61798
[9] validation_0-rmse:1.09052 validation_1-rmse:1.59600
Epoch 3/5. Epoch Time: 22 seconds. Total Time: 734 seconds.
Epoch 4/5...
[0] validation_0-rmse:1.11068 validation_1-rmse:1.56592
[1] validation_0-rmse:1.06668 validation_1-rmse:1.52522
[2] validation_0-rmse:1.03497 validation_1-rmse:1.47445
[3] validation_0-rmse:1.00842 validation_1-rmse:1.51033
[4] validation_0-rmse:0.97116 validation_1-rmse:1.48743
[5] validation_0-rmse:0.93958 validation_1-rmse:1.49815
[6] validation_0-rmse:0.91196 validation_1-rmse:1.47964
[7] validation_0-rmse:0.87839 validation_1-rmse:1.50109
[8] validation_0-rmse:0.84628 validation_1-rmse:1.46114
[9] validation_0-rmse:0.81741 validation_1-rmse:1.44266
Epoch 4/5. Epoch Time: 22 seconds. Total Time: 756 seconds.
Epoch 5/5...
[0] validation_0-rmse:0.92617 validation_1-rmse:1.42121
[1] validation_0-rmse:0.89768 validation_1-rmse:1.41057
[2] validation_0-rmse:0.86774 validation_1-rmse:1.40754
[3] validation_0-rmse:0.84519 validation_1-rmse:1.39426
[4] validation_0-rmse:0.82890 validation_1-rmse:1.38852
[5] validation_0-rmse:0.80279 validation_1-rmse:1.35919
[6] validation_0-rmse:0.78651 validation_1-rmse:1.34387
[7] validation_0-rmse:0.76110 validation_1-rmse:1.34169
[8] validation_0-rmse:0.73359 validation_1-rmse:1.34351
[9] validation_0-rmse:0.70773 validation_1-rmse:1.32964
Epoch 5/5. Epoch Time: 21 seconds. Total Time: 777 seconds.
Starting Training: Stage Time: 110 seconds. Total Time: 778 seconds.
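Each epoch above continues boosting from the previous booster via the xgb_model argument, so the ensemble keeps growing across the noisy re-samples of the training features; a minimal warm-start sketch on random data (shapes and values are made up):
# Toy warm-start loop: each fit() call adds n_estimators more trees to the same booster
rng = np.random.RandomState(0)
X_toy, y_toy = rng.rand(100, 5), rng.rand(100)
toy_model = xgb.XGBRegressor(n_estimators=10, max_depth=3)
toy_model.fit(X_toy, y_toy)                                           # first 10 trees
for _ in range(2):
    toy_model.fit(X_toy, y_toy, xgb_model=toy_model.get_booster())    # +10 trees per call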
print(f'Writing Predictions...')
stage_time = time.time()
# Predict
predictions = model.predict(X_test) #, ntree_limit=75*10)
# Clip predictions to the valid rating range [1, 5]
predictions = [min(max(pred, 1.0), 5.0) for pred in predictions]
# Reverse the ID-Encoding dicts
encoding_to_user = {v: k for k, v in user_to_encoding.items()}
encoding_to_bizz = {v: k for k, v in bizz_to_encoding.items()}
# Output predictions
with open(output_file_name, "w") as fout:
fout.write("user_id, business_id, prediction")
for idx, row in enumerate(validRDD_enc.collect()):
fout.write("\n" + f"{encoding_to_user[row[0][0]]},{encoding_to_bizz[row[0][1]]},"+str(predictions[idx]))
print(f'Writing Predictions: Stage Time: {time.time() - stage_time:.0f} seconds. Total Time: {time.time() - start_time:.0f} seconds.')
Writing Predictions... Writing Predictions: Stage Time: 0 seconds. Total Time: 778 seconds.
print(f'Evaluating Recommender...')
# Join the ground truth with predictions
predictionsRDD = sc.textFile(output_file_name)
predictionsHeader = predictionsRDD.first()
predictionsRDD = predictionsRDD.filter(lambda row: row != predictionsHeader).map(lambda row: row.split(',')).map(lambda row: ((row[0],row[1]), float(row[2]))).persist()
evaluation = validRDD.join(predictionsRDD)
# Report error distribution
delta = evaluation.map(lambda row: abs(row[1][0] - row[1][1]))
delta_0_1 = delta.filter(lambda abs_err: abs_err < 1).count()
delta_1_2 = delta.filter(lambda abs_err: 1 <= abs_err < 2).count()
delta_2_3 = delta.filter(lambda abs_err: 2 <= abs_err < 3).count()
delta_3_4 = delta.filter(lambda abs_err: 3 <= abs_err < 4).count()
delta_4_5 = delta.filter(lambda abs_err: 4 <= abs_err ).count()
print('\n' + 'Error Distribution:')
print(f'>=0 and <1: {delta_0_1}')
print(f'>=1 and <2: {delta_1_2}')
print(f'>=2 and <3: {delta_2_3}')
print(f'>=3 and <4: {delta_3_4}')
print(f'>=4: {delta_4_5}')
# Report RMSE
RMSE = (delta.map(lambda x: x ** 2).mean()) ** 0.5
print('\n' + 'RMSE:')
print(f'{RMSE}')
# Close spark context
sc.stop()
# Measure the total time taken and report it
time_elapsed = time.time() - start_time
print('\n' + f'Duration: {time_elapsed}')
Evaluating Recommender...

Error Distribution:
>=0 and <1: 4
>=1 and <2: 5
>=2 and <3: 2
>=3 and <4: 0
>=4: 0

RMSE:
1.3296387013984001

Duration: 811.9390385150909