Matheus Schmitz
LinkedIn
Github Portfolio
Write a Python class, using principles of object-oriented design, to wrap a logistic regression estimator with the following functionality:
1. The ability to fit on training data:
self.fit(X, y)
I/O | Typing | Definition |
---|---|---|
Input | X : pd.DataFrame | Input features |
Input | y : np.ndarray | Ground truth labels as a numpy array of 0-s and 1-s. |
Output | None |
2. The ability to predict class labels on new data:
self.predict(X)
I/O | Typing | Definition |
---|---|---|
Input | X : pd.DataFrame | Input features |
Output | np.ndarray | Ex: np.array([1, 0, 1]) |
3. The ability to predict the probability of each label:
self.predict_proba(X)
I/O | Typing | Definition |
---|---|---|
Input | X : pd.DataFrame | Input features |
Output | np.ndarray | Ex: np.array([[0.2, 0.8], [0.9, 0.1], [0.5, 0.5]]) |
4. The ability to get the value of the following metrics: F1-score, LogLoss:
self.evaluate(X, y)
I/O | Typing | Definition |
---|---|---|
Input | X : pd.DataFrame | Input features |
Input | y : np.ndarray | Ground truth labels as a numpy array of 0-s and 1-s. |
Output | dict | Ex: {'f1_score': 0.3, 'logloss': 0.7} |
5. The ability to run K-fold cross validation to choose the best parameters:
Note: Output the average scores across all CV validation partitions and best parameters
self.tune_parameters(X, y)
I/O | Typing | Definition |
---|---|---|
Input | X : pd.DataFrame | Input features |
Input | y : np.ndarray | Ground truth labels as a numpy array of 0-s and 1-s. |
Output | dict | Ex: {'tol': 0.02, 'fit_intercept': False, 'solver': 'sag', 'scores': {'f1_score': 0.3, 'logloss': 0.7}} |
Please also write unit tests to check your model's behavior.
To test your implementation, please use the attached dataset sample_dataset.csv. It contains a binary classification target for predicting loan defaults, where the column is_bad
contains the target variable and all other columns contain features. Make sure to explore the dataset to think of feature corner cases which your model must handle.
Assume the data can have numeric and categorical variables. Both your training and prediction functions will take in a two-dimensional pandas DataFrame with a mixture of categorical and numeric variables.
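Taken together, the five requirements describe the call pattern sketched below. This is only an illustrative sketch of the intended interface, assuming the class name OOPModel used in the implementation that follows, with X being the feature DataFrame and y the 0/1 label array loaded below:

model = OOPModel()
model.fit(X, y)                      # 1. fit on training data (mixed numeric / categorical DataFrame)
labels = model.predict(X)            # 2. hard class labels, e.g. np.array([1, 0, 1])
probas = model.predict_proba(X)      # 3. per-class probabilities, shape (n_samples, 2)
metrics = model.evaluate(X, y)       # 4. dict with the F1-score and log-loss
best = model.tune_parameters(X, y)   # 5. dict with the best parameters and mean CV scores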
import numpy as np
import pandas as pd
pd.options.display.max_columns = 25
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import f1_score, log_loss
from sklearn.preprocessing import MinMaxScaler
from sklearn.model_selection import GridSearchCV
from sklearn.pipeline import Pipeline
df = pd.read_csv('sample_dataset.csv')
X = df.drop(columns='is_bad')
y = np.array(df['is_bad'])
df.shape
(10000, 24)
df.head(3)
 | Id | is_bad | emp_length | home_ownership | annual_inc | verification_status | pymnt_plan | purpose_cat | zip_code | addr_state | debt_to_income | delinq_2yrs | inq_last_6mths | mths_since_last_delinq | mths_since_last_record | open_acc | pub_rec | revol_bal | revol_util | total_acc | initial_list_status | collections_12_mths_ex_med | mths_since_last_major_derog | policy_code |
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
0 | 1 | 0 | 10 | MORTGAGE | 50000.0 | not verified | n | medical | 766xx | TX | 10.87 | 0.0 | 0.0 | NaN | NaN | 15.0 | 0.0 | 12087 | 12.1 | 44.0 | f | 0.0 | 1 | PC4 |
1 | 2 | 0 | 1 | RENT | 39216.0 | not verified | n | debt consolidation | 660xx | KS | 9.15 | 0.0 | 2.0 | NaN | NaN | 4.0 | 0.0 | 10114 | 64.0 | 5.0 | f | 0.0 | 2 | PC1 |
2 | 3 | 0 | 4 | RENT | 65000.0 | not verified | n | credit card | 916xx | CA | 11.24 | 0.0 | 0.0 | NaN | NaN | 4.0 | 0.0 | 81 | 0.6 | 8.0 | f | 0.0 | 3 | PC4 |
df.info()
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 10000 entries, 0 to 9999
Data columns (total 24 columns):
 #   Column                       Non-Null Count  Dtype  
---  ------                       --------------  -----  
 0   Id                           10000 non-null  int64  
 1   is_bad                       10000 non-null  int64  
 2   emp_length                   10000 non-null  object 
 3   home_ownership               10000 non-null  object 
 4   annual_inc                   9999 non-null   float64
 5   verification_status          10000 non-null  object 
 6   pymnt_plan                   10000 non-null  object 
 7   purpose_cat                  10000 non-null  object 
 8   zip_code                     10000 non-null  object 
 9   addr_state                   10000 non-null  object 
 10  debt_to_income               10000 non-null  float64
 11  delinq_2yrs                  9995 non-null   float64
 12  inq_last_6mths               9995 non-null   float64
 13  mths_since_last_delinq       3684 non-null   float64
 14  mths_since_last_record       840 non-null    float64
 15  open_acc                     9995 non-null   float64
 16  pub_rec                      9995 non-null   float64
 17  revol_bal                    10000 non-null  int64  
 18  revol_util                   9974 non-null   float64
 19  total_acc                    9995 non-null   float64
 20  initial_list_status          10000 non-null  object 
 21  collections_12_mths_ex_med   9968 non-null   float64
 22  mths_since_last_major_derog  10000 non-null  int64  
 23  policy_code                  10000 non-null  object 
dtypes: float64(11), int64(4), object(9)
memory usage: 1.8+ MB
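The prompt asks for attention to feature corner cases, so a quick exploratory pass over missing values, column dtypes, and categorical cardinality is worthwhile before writing the wrapper. A short check along these lines (using the df loaded above; output not shown) might be:

# Columns with missing values; mths_since_last_record and mths_since_last_delinq are mostly empty
print(df.isna().sum().sort_values(ascending=False).head(10))

# Cardinality of the object (string) columns; zip_code in particular has many distinct levels
print(df.select_dtypes(include='object').nunique())

# emp_length is stored as object, so it may hide non-numeric tokens such as 'na'
print(df['emp_length'].unique()[:15])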
class OOPModel():
'''Wrapper class for Logistic Regression with built-in preprocessing'''
    def __init__(self, nan_token: str = 'na'):
        self.__trained = False
        self.nan_token = nan_token
        # Build the scaling + Logistic Regression pipeline with fixed starting parameters
        self.pipeline = Pipeline([("scaler", MinMaxScaler()),
                                  ("model", LogisticRegression(penalty='l2',
                                                               tol=1e-4,
                                                               C=1.0,
                                                               class_weight='balanced',
                                                               solver='saga',
                                                               max_iter=10_000,
                                                               n_jobs=-1))])
def _preprocess_X_train(self, X: pd.core.frame.DataFrame) -> pd.core.frame.DataFrame:
self.preprocessing = {} # Store rules on how to preprocess data at inference time
unique_cols = X.columns[X.nunique() == X.shape[0]] # Get columns with unique identifiers and remove them
invariant_cols = X.columns[X.nunique() == 1] # Get columns without feature variance and remove them
removal_cols = np.append(unique_cols, invariant_cols)
X = X.drop(columns = removal_cols)
for col in removal_cols:
self.preprocessing[col] = 'delete'
# Rule for converting strings to missing values
        X = X.replace(self.nan_token, np.nan)
for column in X.columns:
# Convert any numeric columns loaded as strings to float and fill missing values with median
try:
X[column] = X[column].astype(float)
X[column] = X[column].fillna(X[column].median())
self.preprocessing[column] = 'numeric'
# Else column is categorical, one-hot encode it
except ValueError:
x = pd.get_dummies(X[column])
X = X.drop(columns=column)
                # If the number of unique categories exceeds the square root of the sample count, drop the feature
if x.shape[1] > np.sqrt(X.shape[0]):
self.preprocessing[column] = 'delete'
else:
self.preprocessing[column] = x.columns.tolist()
x.columns = [str(column)+'_'+str(col) for col in x.columns] # rename the dummy columns prepending the variable name
X = pd.concat([X, x], axis='columns')
return X.sort_index(axis='columns')
def _preprocess_X_test(self, X: pd.core.frame.DataFrame) -> pd.core.frame.DataFrame:
# Rule for converting strings to missing values
        X = X.replace(self.nan_token, np.nan)
# Preprocess each column according to the rules defined during training
for column in X.columns:
# Delete columns identified as unique during training
if self.preprocessing[column] == 'delete':
X = X.drop(columns=column)
# Float columns become float, and anything that cannot be converted is replaced by median
elif self.preprocessing[column] == 'numeric':
X[column] = pd.to_numeric(X[column], errors='coerce') # coerce errors to NaN
X[column] = X[column].fillna(X[column].median())
# Categorial columns get one-hot encoded
elif type(self.preprocessing[column]) == list:
x = pd.get_dummies(X[column])
x = x.drop(columns=[col for col in x if col not in self.preprocessing[column]]) # Drop any category level not seen in training
x[[col for col in self.preprocessing[column] if col not in x]] = 0 # Ensure all training category levels are present in the test data
x.columns = [str(column)+'_'+str(col) for col in x.columns] # rename the dummy columns prepending the variable name
X = X.drop(columns=column)
X = pd.concat([X, x], axis='columns')
else:
raise ValueError(f"Received an unexpected new column '{column}' which could not be properly handled")
return X.sort_index(axis='columns')
def _check_trained(self) -> None:
if self.__trained == False:
raise Exception("Model not yet trained. Call 'fit' with appropriate arguments before using this estimator.")
    def _check_inputs(self, X: pd.core.frame.DataFrame, y: np.ndarray = None) -> None:
        if not isinstance(X, pd.DataFrame):
            raise TypeError("X should be a pandas.core.frame.DataFrame")
        if self.__trained and not len(X.columns) == len(self.__features):
            raise ValueError("X should have the same columns as the training data passed to 'fit'")
        if self.__trained and not all(X.columns == self.__features):
            raise ValueError("X should have the same columns as the training data passed to 'fit'")
        if y is not None:
            if not isinstance(y, np.ndarray):
                raise TypeError("y should be a numpy.ndarray")
            if not np.unique(y).tolist() == [0, 1]:
                raise ValueError("y must contain negative (0) and positive (1) labels, and nothing else")
            if not X.shape[0] == y.shape[0]:
                raise ValueError(f"X and y must have the same number of samples, got {X.shape[0]} X samples and {y.shape[0]} y samples")
def fit(self, X: pd.core.frame.DataFrame, y: np.ndarray) -> None:
self._check_inputs(X, y)
self.__features = X.columns
X = self._preprocess_X_train(X)
self.pipeline.fit(X, y)
self.__trained = True
def predict(self, X: pd.core.frame.DataFrame) -> np.ndarray:
self._check_trained()
self._check_inputs(X)
X = self._preprocess_X_test(X)
predictions = self.pipeline.predict(X)
return predictions
def predict_proba(self, X: pd.core.frame.DataFrame) -> np.ndarray:
self._check_trained()
self._check_inputs(X)
X = self._preprocess_X_test(X)
predictions = self.pipeline.predict_proba(X)
return predictions
def evaluate(self, X: pd.core.frame.DataFrame, y: np.ndarray) -> dict:
self._check_trained()
self._check_inputs(X, y)
X = self._preprocess_X_test(X)
f1score = f1_score(y, self.pipeline.predict(X))
logloss = log_loss(y, self.pipeline.predict_proba(X))
return {'f1_score': f1score, 'log_loss': logloss}
def tune_parameters(self, X: pd.core.frame.DataFrame, y: np.ndarray) -> dict:
self._check_inputs(X, y)
self.__features = X.columns
X = self._preprocess_X_train(X)
param_grid = {'model__penalty': ['l1', 'l2'],
'model__tol': np.logspace(-6, -4, 3),
'model__C': np.logspace(-2, 2, 5)}
self.model_gridCV = GridSearchCV(self.pipeline, param_grid, cv=5, n_jobs=-1, scoring=['f1', 'neg_log_loss'], refit='neg_log_loss', verbose=3)
self.model_gridCV.fit(X, y)
self.pipeline = self.model_gridCV.best_estimator_ # Updates the existing model with the optimal model found
self.__trained = True
best_mean_f1 = self.model_gridCV.cv_results_['mean_test_f1'][self.model_gridCV.best_index_]
        best_mean_log_loss = self.model_gridCV.cv_results_['mean_test_neg_log_loss'][self.model_gridCV.best_index_] * -1  # flip the sign of the negative log_loss
return {'penalty': self.model_gridCV.best_params_['model__penalty'],
'tol': self.model_gridCV.best_params_['model__tol'],
'C': self.model_gridCV.best_params_['model__C'],
'scores': {'f1_score': best_mean_f1, 'log_loss': best_mean_log_loss}}
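A detail worth highlighting in _preprocess_X_test is how the one-hot columns are kept aligned with training: category levels never seen during fit are dropped, and training levels absent from the new data are re-added as all-zero columns, so the pipeline always receives the same feature set. Below is a minimal standalone sketch of that idea with made-up category levels, using reindex as a compact equivalent of the drop/zero-fill steps above (pandas is already imported as pd):

# Toy illustration of aligning prediction-time dummies to the levels seen at fit time
train_levels = ['MORTGAGE', 'OWN', 'RENT']                                 # levels recorded during training
new_values = pd.Series(['RENT', 'LEASE', 'OWN'], name='home_ownership')    # 'LEASE' was never seen in training

dummies = pd.get_dummies(new_values)                                       # columns: LEASE, OWN, RENT
dummies = dummies.reindex(columns=train_levels, fill_value=0)              # drop LEASE, add MORTGAGE as zeros
dummies.columns = ['home_ownership_' + level for level in dummies.columns]
print(dummies)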
import unittest
class TestAssignment(unittest.TestCase):
@classmethod
def setUpClass(cls):
pass
@classmethod
def tearDownClass(cls):
pass
def setUp(self):
self.model = OOPModel()
self.df = pd.read_csv('sample_dataset.csv')
self.X = self.df.drop(columns='is_bad')
self.y = np.array(self.df['is_bad'])
def tearDown(self):
del self.model, self.df, self.X, self.y
def test_missing_values_fit(self):
for row, column in enumerate(self.X.columns):
self.X.at[row, column] = None
self.model.fit(self.X, self.y)
def test_missing_values_predict(self):
self.model.fit(self.X, self.y)
for row, column in enumerate(self.X.columns):
self.X.at[row, column] = None
predictions = self.model.predict(self.X)
def test_new_category_levels_at_prediction_time(self):
self.model.fit(self.X, self.y)
self.X.at[0, 'policy_code'] = 'new_category_level'
self.X.at[1, 'home_ownership'] = 'another_category_level'
predictions = self.model.predict(self.X)
    def test_result_format_predict(self):
self.model.fit(self.X, self.y)
predictions = self.model.predict(self.X)
        self.assertIsInstance(predictions, np.ndarray, msg='Output should be of numpy.ndarray type')
def test_result_format_evaluate(self):
self.model.fit(self.X, self.y)
output = self.model.evaluate(self.X, self.y)
        self.assertIsInstance(output, dict, msg='evaluate() should output a dictionary')
        self.assertEqual(['f1_score', 'log_loss'], list(output.keys()), msg="Output should contain the keys 'f1_score' and 'log_loss'")
def test_result_format_tune_parameters(self):
output = self.model.tune_parameters(self.X, self.y)
self.assertIsInstance(output, dict, msg='tune_parameters() should output a dictionary')
self.assertEqual(['f1_score', 'log_loss'], list(output['scores'].keys()), msg="Output should contain a key 'scores' with f1_score and log_loss")
def test_probability_range(self):
self.model.fit(self.X, self.y)
probabilities = self.model.predict_proba(self.X)
self.assertTrue(0 <= probabilities.min(), msg='Lowest probability should be >= 0')
self.assertTrue(probabilities.max() <= 1, msg='Highest probability should be <= 1')
def test_new_column_error(self):
self.model.fit(self.X, self.y)
self.X['new_column'] = 'new_value'
with self.assertRaises(ValueError) as cm:
predictions = self.model.predict(self.X)
def test_missing_column_error(self):
self.model.fit(self.X, self.y)
self.X = self.X.drop(columns=['zip_code'])
with self.assertRaises(ValueError) as cm:
predictions = self.model.predict(self.X)
# Run unit test
unittest.main(argv=['first-arg-is-ignored'], exit=False)
........
Fitting 5 folds for each of 30 candidates, totalling 150 fits
.
----------------------------------------------------------------------
Ran 9 tests in 755.876s

OK
<unittest.main.TestProgram at 0x1c5fd078a88>