Matheus Schmitz
LinkedIn
Github Portfolio
# File manipulation imports for Google Colab
# NOTE: this cell depends on the `google.colab` package and on the Drive folder below existing.
from google.colab import drive
# Mount Google Drive under /content/drive.
drive.mount('/content/drive')
import os
# Make the project folder the working directory so the relative paths used
# later in this file ("./simulated_games", "./checkpoints") resolve inside it.
os.chdir("/content/drive/My Drive/Colab Notebooks/MCTS_AI_2048/")
Mounted at /content/drive
import random
import copy
import numpy as np
import time
class Board:
    """State and rules of a 2048 board.

    Attributes:
        size: Side length of the square board.
        start_tiles: Number of tiles placed when the board is created.
        score: Sum of the values of all tiles created by merges so far.
        victory: Set to True by `check_victory` once a 2048 tile is present.
        defeat: Set to True by `check_defeat` once no move changes the board.
        directions: The four move names, in the order used by the search code.
        board_state: `size` x `size` integer array; 0 marks an empty cell.
    """

    def __init__(self, size=4):
        """Create a board of side `size` holding `start_tiles` tiles of value 2."""
        self.size = size
        self.start_tiles = 2
        self.score = 0
        self.victory = False
        self.defeat = False
        self.directions = ['left', 'right', 'up', 'down']
        # Initialize the board
        self.board_state = np.zeros((self.size, self.size), dtype=int)
        for _ in range(self.start_tiles):
            self.add_new_tile(value=2)

    def add_new_tile(self, value=None):
        """Place a tile on a uniformly chosen empty cell.

        Args:
            value: Tile value to place; when None a random 2/4 is drawn with
                `generate_random_value`.

        Does nothing when the board has no empty cell (the previous
        rejection-sampling loop would never terminate in that case).
        """
        empty_cells = np.argwhere(self.board_state == 0)
        if len(empty_cells) == 0:
            return
        row_num, col_num = empty_cells[random.randrange(len(empty_cells))]
        if value is None:
            value = self.generate_random_value()
        self.board_state[row_num, col_num] = value

    @staticmethod
    def generate_random_value():
        """Return 4 with probability 1/10, otherwise 2."""
        rand_int = random.randint(1, 10)
        return 4 if rand_int > 9 else 2

    @staticmethod
    def _slide_left(board_state):
        """Return `(new_state, points)` for a left move without spawning a tile.

        Each row is packed to the left and equal neighbours are merged once,
        scanning left to right; a tile produced by a merge is not merged again
        in the same move. `points` is the sum of the merged tile values.
        The input array is not modified.
        """
        result = np.zeros_like(board_state)
        gained = 0
        for row_idx, row in enumerate(board_state):
            tiles = [x for x in row if x != 0]
            merged = []
            idx = 0
            while idx < len(tiles):
                if idx + 1 < len(tiles) and tiles[idx] == tiles[idx + 1]:
                    merged.append(tiles[idx] * 2)
                    gained += int(tiles[idx] * 2)
                    idx += 2
                else:
                    merged.append(tiles[idx])
                    idx += 1
            if merged:
                result[row_idx, :len(merged)] = merged
        return result, gained

    def merge_left(self):
        """Slide/merge every row to the left, update the score, and spawn a
        new tile if the move changed the board."""
        new_state, gained = self._slide_left(self.board_state)
        changed = not np.array_equal(new_state, self.board_state)
        self.board_state = new_state
        self.score += gained
        # If the move did something (changed the board) add a new tile
        if changed:
            self.add_new_tile()

    def reverse_rows(self):
        """Mirror the board horizontally."""
        self.board_state = self.board_state[:, ::-1]

    def transpose_board(self):
        """Swap rows and columns."""
        self.board_state = self.board_state.transpose()

    # The other three moves are a left move applied to a mirrored and/or
    # transposed board, followed by undoing that transformation.
    def merge_right(self):
        """Slide/merge every row to the right."""
        self.reverse_rows()
        self.merge_left()
        self.reverse_rows()

    def merge_up(self):
        """Slide/merge every column upwards."""
        self.transpose_board()
        self.merge_left()
        self.transpose_board()

    def merge_down(self):
        """Slide/merge every column downwards."""
        self.transpose_board()
        self.merge_right()
        self.transpose_board()

    def check_victory(self):
        """Set `victory` when a 2048 tile is on the board."""
        if 2048 in self.board_state:
            self.victory = True

    def has_moves(self):
        """Return True if at least one of the four moves would change the board.

        The check runs on views of the current state and never spawns tiles
        or touches the score.
        """
        state = self.board_state
        orientations = (
            state,                # left
            state[:, ::-1],       # right
            state.T,              # up
            state.T[:, ::-1],     # down
        )
        return any(
            not np.array_equal(self._slide_left(view)[0], view)
            for view in orientations
        )

    def check_defeat(self):
        """Set `defeat` when no move is available."""
        if not self.has_moves():
            self.defeat = True

    def winner_screen(self):
        """Print the victory banner, final score and board."""
        print("###########")
        print(" YOU WON ")
        print("###########")
        print()
        print(f"Final Score: {self.score} | Max Tile: {self.board_state.max()}")
        print()
        print(self.board_state)
        print()

    def loser_screen(self):
        """Print the defeat banner, final score and board."""
        print("############")
        print(" YOU LOST ")
        print("############")
        print()
        print(f"Final Score: {self.score} | Max Tile: {self.board_state.max()}")
        print()
        print(self.board_state)
        print()
class Game:
    """Drives a single `Board`: applies moves, refreshes the win/lose flags
    and optionally prints progress."""

    def __init__(self):
        self.board = Board()

    def move(self, direction, update_status=True):
        """Apply `direction` to the board and refresh the victory/defeat flags.

        Args:
            direction: One of 'left', 'right', 'up', 'down'. Any other value
                prints a hint and leaves the board untouched.
            update_status: When True, print the score and board afterwards,
                followed by the end-of-game screen if the game is over.
        """
        board = self.board
        actions = (
            ('left', board.merge_left),
            ('right', board.merge_right),
            ('up', board.merge_up),
            ('down', board.merge_down),
        )
        for name, action in actions:
            if direction == name:
                action()
                break
        else:
            # No direction name matched
            print("Invalid move. Choose ['left', 'right', 'up', 'down']")
            print()

        board.check_victory()
        board.check_defeat()

        if not update_status:
            return

        print(f"Current Score: {board.score} | Max Tile: {board.board_state.max()}")
        print(board.board_state)
        print()
        # Victory takes precedence over defeat when both flags are set
        if board.victory:
            board.winner_screen()
        elif board.defeat:
            board.loser_screen()

    def play_randomly(self):
        """Play uniformly random moves until the board is defeated and return
        the final score. Reaching 2048 does not stop the playout."""
        while True:
            self.move(random.choice(self.board.directions), update_status=False)
            if self.board.defeat:
                return self.board.score
def monte_carlo_tree_search(game, num_iterations):
    """Score each direction by random playouts.

    For every entry of `game.board.directions`, `num_iterations` copies of
    `game` are made; each copy plays that direction first and then plays
    randomly to the end. The final scores are summed per direction.

    Returns:
        A list of summed playout scores, in the order of `game.board.directions`.
        `game` itself is never modified.
    """
    def rollout_score(first_move):
        # One playout on a private copy of the game
        trial = copy.deepcopy(game)
        trial.move(first_move, update_status=False)
        return trial.play_randomly()

    return [
        sum(rollout_score(direction) for _ in range(num_iterations))
        for direction in game.board.directions
    ]
import glob
import pandas as pd
# Generate training data ad-infinitum
# NOTE: this loop never terminates on its own; interrupt the kernel to stop it.
# Make sure the output folder exists before the first game is written.
os.makedirs("simulated_games", exist_ok=True)
while True:
    # Initialize a game and define its number (one past the CSVs already on disk)
    game = Game()
    game_number = len(glob.glob("./simulated_games/*.csv")) + 1
    # One row per step: the 16 flattened cell values followed by the score.
    # Rows are collected in a list and turned into a DataFrame once at the end;
    # DataFrame.append was removed in pandas 2.0 and copied the frame on every call.
    game_steps = []
    # Play the game with Monte Carlo Tree Search
    while True:
        # Choose and execute the next move
        move_scores = monte_carlo_tree_search(game, 25)
        next_move_idx = np.argmax(move_scores)
        next_move = game.board.directions[next_move_idx]
        game.move(next_move, update_status=False)
        # Save current board state
        current_state = game.board.board_state.flatten().tolist()
        current_state.append(game.board.score)
        game_steps.append(current_state)
        # Check for end of game.
        # Reaching 2048 is deliberately not treated as terminal here: every game
        # is played until defeat, and no win/loss label is stored in the CSV
        # (the training code later in this file derives the target from the
        # saved boards).
        if game.board.defeat:
            # Save game history (columns are named 0..16, as before)
            df = pd.DataFrame(game_steps)
            df.to_csv(f"simulated_games/game_{game_number}.csv", index=False)
            # Print game-over screen
            game.board.loser_screen()
            break
# Tensor Flow & Keras
import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, Conv2D, Flatten, MaxPool2D, Reshape, Softmax, Dropout, ZeroPadding2D, BatchNormalization
from tensorflow.keras.callbacks import ModelCheckpoint, ReduceLROnPlateau, CSVLogger, EarlyStopping
!pip install livelossplot --quiet
from livelossplot import PlotLossesKeras
# Feature Engineering
from sklearn.preprocessing import MinMaxScaler
# File manipulation
import pandas as pd
import numpy as np
import glob
import os
import datetime
# Auxilary
from tqdm import tqdm
import matplotlib.pyplot as plt
import seaborn as sns
%matplotlib inline
# Function to create number spacing based on their logs
def logspace(start, stop, num):
    """Return a list of `num` values running from `start` to `stop`, evenly
    spaced on a logarithmic scale (both bounds must be positive)."""
    log_points = np.linspace(np.log(start), np.log(stop), num=num)
    return list(map(np.exp, log_points))
def scale(value, max, min):
    """Min-max normalise `value`: `min` maps to 0 and `max` maps to 1.

    The parameter names shadow the builtins but are kept because callers
    pass them as keywords.
    """
    offset = value - min
    span = max - min
    return offset / span
# Path to all simulated games
simulated_games = glob.glob("./simulated_games/simulated_games_0/*.csv")
# Read as df (lazily, one file at a time)
game_dfs = map(pd.read_csv, simulated_games)
# Column '16' is overwritten with the training target: a log-spaced ramp from
# 1e-7 up to 1 over the steps of the game, kept positive when some board of the
# game contains a tile >= 2048 and negated otherwise.
processed_dfs = []
for df in game_dfs:
    reached_2048 = (df.drop(columns='16') >= 2048).any().any()
    df['16'] = logspace(1e-7, 1, df.shape[0])
    if not reached_2048:
        df['16'] = -df['16']
    processed_dfs.append(df)
# Concatenate into a single dataset
df = pd.concat(processed_dfs)
# Late game moves matter more, so give the probabilities an exponential curve
#df['16'] = df['16'].apply(lambda x: x**2)
# Split X (board encodings) and Y (signed target described above)
X = df.drop(columns = '16')
#y = df['16'].apply(scale, max=df['16'].max(), min=df['16'].min())
#X = df.apply(get_board_state, axis=1)
y = df['16']
# Distribution of the target values
fig, ax = plt.subplots(figsize=(10,5))
sns.histplot(y)
plt.show()
# Convert the flattened board data into the input needed for the NN
input_data = []
for _, row in tqdm(X.iterrows(), total=X.shape[0]):
    # 13 binary 4x4 planes per board: plane k marks the cells holding 2**(k+1)
    input_matrix = np.zeros([13,4,4], dtype=int)
    # Column names are the flattened cell positions
    for idx, value in row.items():
        # Empty cells are left as zeros in every plane
        if value == 0:
            continue
        row_idx, col_idx = divmod(int(idx), 4)
        # Plane index for this tile value (2 -> 0, 4 -> 1, ...)
        depth = int(np.log2(value)) - 1
        # Add the binary position to the matrix
        input_matrix[depth][row_idx][col_idx] = 1
    # Once the 13x4x4 matrix is populated, add it to input_data
    input_data.append(input_matrix)
# Convert to numpy array
input_data = np.array(input_data)
100%|██████████| 1652414/1652414 [06:12<00:00, 4434.24it/s]
# Keras Conv2D layers default to channels-last input: [sample, rows (height), columns (width), channels].
# The encoded boards are stored as [sample, plane, row, col], so move the plane axis to the end.
print(f"Input shape before transposing: {input_data.shape}")
input_data = np.transpose(input_data, (0,2,3,1))
print(f"Input shape after transposing: {input_data.shape}")
Input shape before transposing: (1652414, 13, 4, 4) Input shape after transposing: (1652414, 4, 4, 13)
# CNN
# NOTE: earlier versions of this cell contained bare `BatchNormalization()`
# statements between the layers. They built a layer and immediately discarded
# it, so batch normalisation was never part of the model. They are removed here
# instead of being wired in with `model.add(...)`, because adding them would
# change the architecture and the saved weights in ./checkpoints/model_2048.h5
# would no longer load.
model = Sequential()
# Padding of (0,0) adds nothing; this layer only declares the 4x4x13 input shape
model.add(ZeroPadding2D(padding=(0,0), input_shape=(4,4,13)))
# 1x1 convolutions: each cell is transformed independently across its 13 planes
model.add(Conv2D((4*4*13), kernel_size=1, activation='relu', padding='same'))
model.add(Dropout(0.25))
model.add(Conv2D(4*4*13//4, kernel_size=1, activation='relu', padding='same'))
model.add(Conv2D(4*4*13//8, kernel_size=1, activation='relu', padding='same'))
model.add(Flatten())
# No activation is passed, so both Dense layers are linear
model.add(Dense(4*4*13//16))
model.add(Dropout(0.25))
# Single regression output, trained against the signed target with MSE
model.add(Dense(1))
# Compile the model
model.compile(optimizer=tf.keras.optimizers.Adam(learning_rate=0.0001),
              loss='mean_squared_error')
# Model summary
model.summary()
Model: "sequential" _________________________________________________________________ Layer (type) Output Shape Param # ================================================================= zero_padding2d (ZeroPadding (None, 4, 4, 13) 0 2D) conv2d (Conv2D) (None, 4, 4, 208) 2912 dropout (Dropout) (None, 4, 4, 208) 0 conv2d_1 (Conv2D) (None, 4, 4, 52) 10868 conv2d_2 (Conv2D) (None, 4, 4, 26) 1378 flatten (Flatten) (None, 416) 0 dense (Dense) (None, 13) 5421 dropout_1 (Dropout) (None, 13) 0 dense_1 (Dense) (None, 1) 14 ================================================================= Total params: 20,593 Trainable params: 20,593 Non-trainable params: 0 _________________________________________________________________
# Set to True to continue training from the saved weights - loading fails if the network architecture changed
# Set to False to train the neural network from scratch - THIS WILL LOSE ALL PROGRESS AND CAUSE WORSE PREDICTIONS
LOAD_TRAINED_WEIGHTS = True
# Variable to guide the csv log callback; only set to True after a successful load
SUCCESSFUL_WEIGHT_LOAD = False
if LOAD_TRAINED_WEIGHTS and os.path.exists('./checkpoints/model_2048.h5'):
    # Try loading weights. Will fail if the model structure changed
    try:
        # Load best model weights
        model.load_weights('./checkpoints/model_2048.h5')
    except Exception as err:
        # `Exception` rather than a bare `except`, so KeyboardInterrupt still
        # stops the cell; the underlying error is shown instead of hidden.
        print('Could not load weights. Most likely the network architecture changed.')
        print(f'Reason: {err}')
    else:
        # Objective Function
        model.compile(optimizer=tf.keras.optimizers.Adam(learning_rate=0.0001),
                      loss='mean_squared_error')
        # Check model
        model.summary()
        SUCCESSFUL_WEIGHT_LOAD = True
Model: "sequential" _________________________________________________________________ Layer (type) Output Shape Param # ================================================================= zero_padding2d (ZeroPadding (None, 4, 4, 13) 0 2D) conv2d (Conv2D) (None, 4, 4, 208) 2912 dropout (Dropout) (None, 4, 4, 208) 0 conv2d_1 (Conv2D) (None, 4, 4, 52) 10868 conv2d_2 (Conv2D) (None, 4, 4, 26) 1378 flatten (Flatten) (None, 416) 0 dense (Dense) (None, 13) 5421 dropout_1 (Dropout) (None, 13) 0 dense_1 (Dense) (None, 1) 14 ================================================================= Total params: 20,593 Trainable params: 20,593 Non-trainable params: 0 _________________________________________________________________
EPOCHS = 1000
BATCH_SIZE = 128
TRAIN_MODEL = True
if TRAIN_MODEL:
    # Define directory for model checkpoints (no error if it already exists)
    BACKUP_DIR = './checkpoints'
    os.makedirs(BACKUP_DIR, exist_ok=True)
    # Define file to store checkpoint
    BACKUP_FILE = os.path.join(BACKUP_DIR, 'model_2048.h5')
    # Callbacks
    # Keep only the weights of the epoch with the best validation loss
    checkpoint = ModelCheckpoint(BACKUP_FILE,
                                 monitor='val_loss',
                                 save_best_only=True,
                                 save_weights_only=True,
                                 verbose=0)
    # Halve the learning rate after 10 epochs without val_loss improvement
    plateauLRreduce = ReduceLROnPlateau(factor = 0.5,
                                        patience = 10,
                                        monitor='val_loss',
                                        min_lr = 0.000001,
                                        verbose=1)
    # Stop after 25 epochs without val_loss improvement
    stopearly = EarlyStopping(monitor='val_loss',
                              patience=25,
                              verbose=1)
    # Append to the existing log only when training resumes from loaded weights
    logCSV = CSVLogger(filename='log_model_2048.csv',
                       separator=',',
                       append=(LOAD_TRAINED_WEIGHTS and SUCCESSFUL_WEIGHT_LOAD))
    log_dir = "logs/fit/" + datetime.datetime.now().strftime("%Y%m%d-%H%M%S")
    # NOTE(review): this callback is created but not included in
    # `model_callbacks` below, so nothing is written to `log_dir`. Add it to
    # the list if TensorBoard logs are wanted.
    tensorboard_callback = tf.keras.callbacks.TensorBoard(log_dir=log_dir, histogram_freq=1)
    model_callbacks = [checkpoint, plateauLRreduce, stopearly, logCSV, PlotLossesKeras()]
    # Train model and save history
    model_history = model.fit(input_data,
                              y,
                              epochs=EPOCHS,
                              batch_size=BATCH_SIZE,
                              validation_split=0.2,
                              callbacks=model_callbacks,
                              verbose=1,
                              shuffle=True)
Loss training (min: 0.024, max: 0.024, cur: 0.024) validation (min: 0.025, max: 0.025, cur: 0.025) lr lr (min: 0.000, max: 0.000, cur: 0.000) 10328/10328 [==============================] - 69s 7ms/step - loss: 0.0244 - val_loss: 0.0246 - lr: 1.2500e-05 Epoch 00034: early stopping
# Load best parameters from training (the checkpoint with the best val_loss)
LOAD_TRAINED_WEIGHTS = True
# Only set to True after a successful load
SUCCESSFUL_WEIGHT_LOAD = False
if LOAD_TRAINED_WEIGHTS and os.path.exists('./checkpoints/model_2048.h5'):
    # Try loading weights. Will fail if the model structure changed
    try:
        # Load best model weights
        model.load_weights('./checkpoints/model_2048.h5')
    except Exception as err:
        # `Exception` rather than a bare `except`, so KeyboardInterrupt still
        # stops the cell; the underlying error is shown instead of hidden.
        print('Could not load weights. Most likely the network architecture changed.')
        print(f'Reason: {err}')
    else:
        # Objective Function
        model.compile(optimizer=tf.keras.optimizers.Adam(learning_rate=0.0001),
                      loss='mean_squared_error')
        # Check model
        model.summary()
        SUCCESSFUL_WEIGHT_LOAD = True
def predict_normalized_score(board_state):
    """Return the network's prediction for a single 4x4 board.

    The board is encoded exactly like the training data built earlier in this
    file: 13 binary 4x4 planes, where plane k marks the cells holding the tile
    2**(k+1) (2 -> plane 0, 4 -> plane 1, ...) and empty cells stay zero in
    every plane. The previous version used plane `log2(value)` and marked
    empty cells in plane 0, which shifted every tile by one plane relative to
    what the network was trained on.

    Args:
        board_state: 4x4 indexable board of tile values (0 for empty).

    Returns:
        The scalar output of the global `model` for this board.
    """
    # Input matrix for the data
    input_matrix = np.zeros([13,4,4], dtype=int)
    # Encode the board state into input for the NN
    for row_idx in range(4):
        for col_idx in range(4):
            value = board_state[row_idx][col_idx]
            # Empty cells are not encoded (same as in the training data)
            if value == 0:
                continue
            # Plane index of the one-hot encoding for this tile value
            depth = int(np.log2(value)) - 1
            input_matrix[depth][row_idx][col_idx] = 1
    # Batch of one, converted from [sample, plane, row, col] to channels-last
    input_data = np.transpose([input_matrix], (0,2,3,1))
    # predict() returns an array of shape (1, 1); unwrap the single value
    win_proba = model.predict(input_data)[0][0]
    return win_proba
def neural_network_tree_search(game, num_iterations='placeholder'):
    """Score each direction with the neural network.

    For every entry of `game.board.directions`, a copy of `game` plays that
    move and the resulting board is passed to `predict_normalized_score`.
    `num_iterations` is unused; it only mirrors the signature of
    `monte_carlo_tree_search`.

    Returns:
        A list of predicted scores, in the order of `game.board.directions`.
    """
    def score_after(direction):
        # Evaluate the move on a private copy so `game` is left untouched
        trial = copy.deepcopy(game)
        trial.move(direction, update_status=False)
        return predict_normalized_score(trial.board.board_state)

    return [score_after(direction) for direction in game.board.directions]
start = time.time()
# Initialize a game
game = Game()
counter = 1
# Play the game with Neural Network Tree Search
while True:
    # Predict the scores for the next possible moves
    move_scores = neural_network_tree_search(game)
    # If all scores tie, pick a random move.
    # This must be an index, not a direction name: it is used to index
    # `game.board.directions` and `move_scores` below.
    if all(score == move_scores[0] for score in move_scores):
        next_move_idx = random.randrange(len(game.board.directions))
    # Else pick the move with the highest predicted score
    else:
        next_move_idx = np.argmax(move_scores)
    # Convert the move index to the actual move
    next_move = game.board.directions[next_move_idx]
    # Check if the move is valid
    test_game = copy.deepcopy(game)
    test_game.move(next_move, update_status=False)
    # If the new board didn't change, the move is not valid, try something else
    while np.all(test_game.board.board_state == game.board.board_state):
        # Rule out the move that was just tried (which, after a random
        # tie-break, is not necessarily the argmax)
        move_scores[next_move_idx] = -float('inf')
        # Every direction has been ruled out: stop searching. The move below
        # then leaves the board unchanged and check_defeat ends the game.
        if all(score == -float('inf') for score in move_scores):
            break
        # Then pick the best move among the remaining ones
        next_move_idx = np.argmax(move_scores)
        # Convert the move index to the actual move
        next_move = game.board.directions[next_move_idx]
        # Try making the move (test_game still equals game at this point)
        test_game.move(next_move, update_status=False)
    # Then make the move
    game.move(next_move, update_status=False)
    # Check for end of game
    # Reaching 2048 is deliberately not terminal; play continues until defeat.
    #if game.board.victory:
    #    # Print game-over screen
    #    game.board.winner_screen()
    #    break
    if game.board.defeat:
        # Print game-over screen
        game.board.loser_screen()
        break
    # Every 25 plays print the board
    if counter % 25 == 0:
        print(f"{counter}th move:")
        print(game.board.board_state)
    counter += 1
end = time.time()
print(f"Took {(end-start)/60:.2f} minutes")
25th move: [[ 0 0 0 4] [ 2 0 0 8] [ 0 0 2 16] [ 0 8 16 4]] 50th move: [[ 4 16 2 16] [ 0 4 32 8] [ 2 4 8 16] [ 0 0 2 4]] 75th move: [[ 8 16 2 16] [ 4 16 64 8] [16 8 2 0] [ 8 2 0 0]] ############ YOU LOST ############ Final Score: 764 | Max Tile: 64 [[ 2 8 2 4] [ 4 32 64 32] [ 2 16 32 8] [ 4 2 4 2]] Took 0.37 minutes
Matheus Schmitz
LinkedIn
Github Portfolio