Computer Vision and Natural Language Processing for Automatic Image Captioning
Data Sources:
Flickr 8K - https://forms.illinois.edu/sec/1713398
PATH_IMAGES = 'Flickr_8K/Images/'
# File manipulation imports for Google Colab
from google.colab import drive
drive.mount('/content/drive')
import os
os.chdir("/content/drive/My Drive/Colab Notebooks/Image_Captioning/")
# Disable h5 file locking in google drive
os.environ["HDF5_USE_FILE_LOCKING"] = "FALSE"
!pip install -q livelossplot
# Py Data Stack
import numpy as np
import pandas as pd
# Visualization
import matplotlib
import matplotlib.pyplot as plt
%matplotlib inline
import seaborn as sns
from livelossplot import PlotLossesKeras
# NLP
import re
# Computer Vision
import cv2
# Tensorflow & Keras
import tensorflow
from tensorflow.keras.preprocessing import image
from tensorflow.keras.preprocessing.sequence import pad_sequences
from tensorflow.keras.utils import to_categorical
from tensorflow.keras.models import Model, load_model
from tensorflow.keras.applications.xception import Xception, preprocess_input, decode_predictions
from tensorflow.keras.layers import Add
from tensorflow.keras.layers import Input, Dense, Dropout, Embedding, LSTM
from tensorflow.keras.callbacks import ModelCheckpoint, ReduceLROnPlateau, CSVLogger, EarlyStopping
# Scikit-Learn
from sklearn.model_selection import train_test_split
# Others
import pickle
import collections
from random import shuffle
from tqdm import tqdm
# Get rid of warnings
import sys
import warnings
if not sys.warnoptions:
warnings.simplefilter("ignore")
warnings.filterwarnings("ignore", category=FutureWarning)
# Read the captions from Flickr_8K/captions.txt
df_captions = pd.read_csv('Flickr_8K/captions.txt', sep=',')
print(f'df_captions.shape: {df_captions.shape}')
# Set the image name as index
df_captions.set_index('image', inplace=True)
df_captions.tail(3)
# Seems like there are multiple alternative captions per image...
SAMPLE_IMAGE = df_captions.sample(1).index[0]
df_captions[df_captions.index == SAMPLE_IMAGE]
# Plot an (IMAGE, CAPTIONS) pair
img = cv2.imread(PATH_IMAGES + SAMPLE_IMAGE)
img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
plt.imshow(img)
plt.axis('off')
plt.show()
print('Captions:')
for caption in df_captions[df_captions.index == SAMPLE_IMAGE]['caption']:
print(caption)
The captions need some cleaning: they contain capitalized letters and trailing dots (.), and may have other issues such as special characters.
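A minimal sketch of the kind of cleaning intended (the sample string is illustrative; the actual clean_caption function used on the dataset is defined further below):
# Illustrative cleaning: lowercase, then replace any non-letter runs with a space
import re

raw_caption = "A child in a pink dress is climbing up a set of stairs ."
print(re.sub("[^a-z]+", " ", raw_caption.lower()).strip())
# -> a child in a pink dress is climbing up a set of stairs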
DOWNSAMPLE = False
FRACTION = 0.05
if DOWNSAMPLE:
all_imgs = df_captions.groupby('image').groups.keys()
sampled_imgs = pd.DataFrame(all_imgs).sample(frac=FRACTION).values.flatten()
df_captions = df_captions[df_captions.index.isin(sampled_imgs)]
print(f'df_captions.shape: {df_captions.shape}')
SAMPLE_IMAGE = df_captions.sample(1).index[0]
# 70/15/15 split (train / validation / test)
idx_train, idx_test = train_test_split(df_captions.index, test_size=0.3)
idx_test, idx_valid = train_test_split(idx_test, test_size=0.5)
print(f'Train set size: {len(idx_train)}')
print(f'Valid set size: {len(idx_valid)}')
print(f'Test set size: {len(idx_test)}')
Xception: Deep Learning with Depthwise Separable Convolutions
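As a brief side illustration (not part of the captioning pipeline), the depthwise separable convolutions that Xception is built on factorize a standard convolution into a per-channel spatial filter followed by a 1x1 pointwise convolution, which greatly reduces the parameter count. A minimal sketch comparing the two:
# Illustrative only: parameter counts of a standard vs. a depthwise separable convolution
from tensorflow.keras.layers import Conv2D, SeparableConv2D

demo_input = Input(shape=(299, 299, 3))
standard_conv = Model(demo_input, Conv2D(64, (3, 3))(demo_input))
separable_conv = Model(demo_input, SeparableConv2D(64, (3, 3))(demo_input))
print('Standard Conv2D parameters: ', standard_conv.count_params())   # 3*3*3*64 + 64 = 1792
print('SeparableConv2D parameters:', separable_conv.count_params())   # 3*3*3 + 3*64 + 64 = 283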
# Load the Xception model without its head (the final classification layers), keeping only the feature extraction layers
FEATURE_EXTRACTOR = Xception(include_top=False, pooling='avg', weights='imagenet', input_shape=(299, 299, 3))
FEATURE_EXTRACTOR.summary()
def xception_preprocess(img):
# Load the image
img = image.load_img(img, target_size=(299, 299))
img = image.img_to_array(img)
# Convert the tensor from 3D to 4D (add a batch dimension)
img = np.expand_dims(img, axis=0)
# Preprocess with Xception's default preprocess function
img = preprocess_input(img)
return img
# Testing the preprocess function
img = xception_preprocess(PATH_IMAGES + SAMPLE_IMAGE)
print(f'img.shape: {img.shape}')
plt.imshow(img[0])
plt.axis('off')
plt.show()
The preprocessing makes the image look darker: Xception's preprocess_input scales pixel values to the [-1, 1] range, so matplotlib clips the negative values when displaying the array directly.
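A small sketch (assuming the [-1, 1] scaling above) to view the preprocessed image without that clipping:
# Rescale from [-1, 1] back to [0, 1] purely for display
plt.imshow((img[0] + 1.0) / 2.0)
plt.axis('off')
plt.show()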
def extract_img_features(img):
img = xception_preprocess(img)
feature_vector = FEATURE_EXTRACTOR.predict(img)
feature_vector = feature_vector.reshape((-1,))
return feature_vector
# Test the function
print(extract_img_features(PATH_IMAGES+SAMPLE_IMAGE)[:10])
print('Vector shape:', extract_img_features(PATH_IMAGES+SAMPLE_IMAGE).shape)
# Extract features from images
### CELL PROCESSING TIME IS ABOUT 1 HOUR IF NOT LOADING FROM DISK ###
LOAD_FEATURES_FROM_DISK = True
if LOAD_FEATURES_FROM_DISK:
try:
with open('Flickr_8K/img_features_dict.pkl', 'rb') as file:
img_features_dict = pickle.load(file)
print('Loaded image features from disk.')
except FileNotFoundError as fnf_error:
print(fnf_error)
except:
print('Could not load image features from disk!')
else:
print('Extracting image features from Xception.')
print('Estimated processing time: 1 hour.')
img_features_dict = {}
# Loop through all images
for img in tqdm(list(idx_train) + list(idx_valid) + list(idx_test)):
try:
img_features_dict[img] = extract_img_features(PATH_IMAGES + img)
except:
pass
# Save result to disk
with open('Flickr_8K/img_features_dict.pkl', 'wb') as file:
pickle.dump(img_features_dict, file)
# Function to clean data
def clean_caption(sentence):
sentence = sentence.lower()
# Replace all non-letter characters with spaces
sentence = re.sub("[^a-z]+", " ", sentence)
# Only return the sentence if it's longer than 1 character
if len(sentence) > 1:
return sentence
else:
pass
# Test the function
clean_caption("BIG 123 foot ain't real '>.<" )
# Apply to all captions
df_captions['caption_clean'] = df_captions['caption'].map(clean_caption)
df_captions.head(3)
# Check if we generated any NaN's
print('df_captions.shape:', df_captions.shape)
df_captions[df_captions.isnull().any(axis=1)]
# Keep only the non-NaN rows
df_captions = df_captions[~df_captions.isnull().any(axis=1)]
print('New shape: ', df_captions.shape)
# Because there are duplicate indexes, we first need to convert to a dictionary
# whose values are lists of alternative captions
img_captions = {}
for img, caption in df_captions.iterrows():
# Initiate the dict elements for each image
if img_captions.get(img) is None:
img_captions[img] = []
# Append all alternate image captions under the same image key
img_captions[img].append(caption['caption_clean'])
# Check result
img_captions[SAMPLE_IMAGE]
# Create a set to store the vocabulary
vocab = set()
# Then loop through the dict with images and captions and append to the vocab
for key, value in img_captions.items():
[vocab.update(sentence.split()) for sentence in value]
# Unique words
print(f'Unique vocabulary words: {len(vocab)}')
# Total words
total_words = []
for key, value in img_captions.items():
[total_words.append(word) for sentence in value for word in sentence.split()]
print(f'Total number of words: {len(total_words)}')
# Create a word counter
counter = collections.Counter(total_words)
counter.most_common(10)
Complete Vocabulary
# Plot Word Occurrences
fig, axes = plt.subplots(ncols=3, nrows=1, figsize=plt.figaspect(0.25))
fig.sca(axes[0])
plt.plot([x for x in sorted(counter.values(), reverse=True)])
plt.title('Word Occurrences')
plt.xlabel('No. of Words with more than X occurrences')
plt.ylabel('Word Occurrences')
fig.sca(axes[1])
sns.kdeplot([x for x in sorted(counter.values(), reverse=True)], shade=True, ax=axes[1])
plt.xlabel('Word Occurrences')
plt.title('Probability Density Function')
fig.sca(axes[2])
sns.kdeplot([x for x in sorted(counter.values(), reverse=True)], shade=True, cumulative=True, ax=axes[2])
plt.xlabel('Word Occurrences')
plt.title('Cumulative Density Function')
plt.tight_layout()
# Extracting the quantiles
quantiles = pd.Series(counter).quantile(np.arange(0, 1.01, 0.01), interpolation='higher')
# Plotting the quantiles
quantiles.plot(kind='line', color='steelblue')
# Demark the quantiles in increments of 0.05 and 0.25
plt.scatter(x=quantiles.index[::5], y=quantiles.values[::5], c='orange', label='Quantiles with 0.05 intervals')
plt.scatter(x=quantiles.index[::25], y=quantiles.values[::25], c='red', label='Quantiles with 0.25 intervals')
# Titles, labels, legend
plt.title('Quantiles and their Values')
plt.xlabel('Quantile')
plt.ylabel('Number of Occurrences')
plt.legend(loc='best')
# Annotate the 0th, 25th, 50th, 75th and 100th percentiles
for x, y in zip(quantiles.index[::25], quantiles.values[::25]):
plt.annotate(f'({x} , {y})', xy=(x, y), xytext=(x - 0.1, y + max(quantiles.values) * 0.05), fontweight='bold')
plt.show()
This is peculiar... it looks like over half of the words have two or fewer occurrences.
That doesn't bode well for the success of our model.
I'll try removing words below a certain frequency threshold.
Reduced Vocabulary
# Convert the counter to a dictionary, so we can remove low frequency words
freq_cnt = dict(counter)
# Then sort the dictionary by word frequency
freq_cnt = sorted(freq_cnt.items(), reverse=True, key=lambda x: x[1])
# Define a threshold for cutting
THRESHOLD = quantiles[0.5] # Keeping only the 50% most frequent words
# Remove infrequent words
freq_cnt = [word_tuple for word_tuple in freq_cnt if word_tuple[1] > THRESHOLD]
vocab = set([word_tuple[0] for word_tuple in freq_cnt])
print(f'Unique vocabulary words: {len(vocab)}')
# Plot Word Occurrences
fig, axes = plt.subplots(ncols=3, nrows=1, figsize=plt.figaspect(0.25))
fig.sca(axes[0])
plt.plot([x[1] for x in freq_cnt])
plt.title('Word Occurrences')
plt.xlabel('No. of Words with more than X occurrences')
plt.ylabel('Word Occurrences')
fig.sca(axes[1])
sns.kdeplot([x[1] for x in freq_cnt], shade=True, ax=axes[1])
plt.xlabel('Word Occurrences')
plt.title('Probability Density Function')
fig.sca(axes[2])
sns.kdeplot([x[1] for x in freq_cnt], shade=True, cumulative=True, ax=axes[2])
plt.xlabel('Word Occurrences')
plt.title('Cumulative Density Function')
plt.tight_layout()
# Extracting the quantiles
quantiles = pd.Series([x[1] for x in freq_cnt]).quantile(np.arange(0, 1.01, 0.01), interpolation='higher')
# Plotting the quantiles
quantiles.plot(kind='line', color='steelblue')
# Demark the quantiles in increments of 0.05 and 0.25
plt.scatter(x=quantiles.index[::5], y=quantiles.values[::5], c='orange', label='Quantiles with 0.05 intervals')
plt.scatter(x=quantiles.index[::25], y=quantiles.values[::25], c='red', label='Quantiles with 0.25 intervals')
# Titles, labels, legend
plt.title('Quantiles and their Values')
plt.xlabel('Quantile')
plt.ylabel('Number of Occurrences')
plt.legend(loc='best')
# Annotate the 0th, 25th, 50th, 75th and 100th percentiles
for x, y in zip(quantiles.index[::25], quantiles.values[::25]):
plt.annotate(f'({x} , {y})', xy=(x, y), xytext=(x - 0.1, y + max(quantiles.values) * 0.05), fontweight='bold')
plt.show()
# Map words and indexes (index 0 is reserved for the padding value used with mask_zero)
word_to_index = {}
index_to_word = {}
for idx, word in enumerate(vocab, start=1):
    word_to_index[word] = idx
    index_to_word[idx] = word
# Add tokens for sequence start and end
word_to_index['<start>'] = len(vocab) + 1
word_to_index['<end>'] = len(vocab) + 2
index_to_word[len(vocab) + 1] = '<start>'
index_to_word[len(vocab) + 2] = '<end>'
# Determine the max caption length
max_len = 0
for key, value in img_captions.items():
for caption in value:
max_len = max(max_len, len(caption.split()))
print(f'Max caption length: {max_len}')
# Demark the <start> and <end> of caption sequences
captions_train = {}
for img in idx_train:
captions_train[img] = []
for caption in img_captions[img]:
formatted_caption = '<start> ' + caption + ' <end>'
captions_train[img].append(formatted_caption)
captions_valid = {}
for img in idx_valid:
captions_valid[img] = []
for caption in img_captions[img]:
formatted_caption = '<start> ' + caption + ' <end>'
captions_valid[img].append(formatted_caption)
captions_test = {}
for img in idx_test:
captions_test[img] = []
for caption in img_captions[img]:
formatted_caption = '<start> ' + caption + ' <end>'
captions_test[img].append(formatted_caption)
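A quick check that the <start> and <end> tokens were added as expected (the printed caption varies with the sampled image):
# Inspect the first formatted caption of an arbitrary training image
sample_train_img = next(iter(captions_train))
print(captions_train[sample_train_img][0])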
Using pretrained word embeddings from GloVe.
# Number of embedding dimensions to use {50, 100, 200 or 300}
EMBEDDING_DIM = 50
# Open the downloaded glove model
with open(f'glove.6B/glove.6B.{EMBEDDING_DIM}d.txt', encoding='utf8') as GLOVE:
# Map word to their Glove embeddings
word_to_embedding = {}
# Loop through the Glove txt file to get the desired word embeddings
for line in GLOVE:
# Each line has EMBEDDING_DIM + 1 elements: the word followed by its embedding (51 values for the 50d file)
values = line.split()
# Extract the word
word = values[0]
# Extract the embedding
word_embedding = np.array(values[1:], dtype='float')
# Map word to embedding
word_to_embedding[word] = word_embedding
# Testing the embedding
word_to_embedding['hippopotamus']
# Updated vocabulary size: words + <start>/<end> tokens + 1 for the padding index 0
VOCAB_SIZE = len(word_to_index) + 1
print(f'Vocabulary size: {VOCAB_SIZE}')
# Extract from Glove only the word vectors for the words in the used vocab
embedding_matrix = np.zeros((VOCAB_SIZE, EMBEDDING_DIM))
# Loop through the words in the word_to_index mapping
for word, idx in word_to_index.items():
embedding_vector = word_to_embedding.get(word)
# If the embedding_vector for the given word exists in glove, add it to the embedding_matrix
if embedding_vector is not None:
embedding_matrix[idx] = embedding_vector
print(f'embedding_matrix.shape: {embedding_matrix.shape}')
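A small sanity check of how much of the vocabulary GloVe actually covers; words without a GloVe vector keep an all-zero row in embedding_matrix:
# Count vocabulary words (including <start>/<end>) that have a pretrained GloVe vector
covered = sum(1 for word in word_to_index if word in word_to_embedding)
print(f'{covered} / {len(word_to_index)} vocabulary words have a GloVe embedding')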
Image inputs
# Receive the feature engineered images, which have shape (2048, )
layer_img_1 = Input(shape=(2048,))
# Dropout layer for regularization
layer_img_2 = Dropout(0.3)(layer_img_1)
# Dense layer to be trained
layer_img_3 = Dense(256, activation='relu')(layer_img_2)
Text inputs
# Receive the captions, with shape (max_len,)
layer_txt_1 = Input(shape=(max_len,))
# Embedding layer
layer_txt_2 = Embedding(input_dim=VOCAB_SIZE, output_dim=EMBEDDING_DIM, mask_zero=True)(layer_txt_1)
# Dropout layer for regularization
layer_txt_3 = Dropout(0.3)(layer_txt_2)
# LSTM layer to be trained
layer_txt_4 = LSTM(256)(layer_txt_3)
Decoder
# Associate images with captions for the model to learn
decoder_1 = Add()([layer_img_3, layer_txt_4])
# Dense layer to be trained
decoder_2 = Dense(256, activation='relu')(decoder_1)
# Model output (softmax probabilities)
outputs = Dense(VOCAB_SIZE, activation='softmax')(decoder_2)
Model assembly
model = Model(inputs = [layer_img_1, layer_txt_1],
outputs = outputs )
model.summary()
Freeze the embedding layer weights (set to the GloVe vectors)
# Set the embedding layer weights as those obtained from glove and disable training
model.layers[2].set_weights([embedding_matrix])
model.layers[2].trainable = False
Compile model
# Compile model
model.compile(loss='categorical_crossentropy', optimizer='adam', metrics=['accuracy'])
def data_loader(img_captions_dict, img_features, word_to_index, max_len, batch_size):
# X1 = image input, X2 = caption input, y = label
X1, X2, y = [], [], []
# Counter
n = 0
# Infinite loop generator: as long as the NN asks for data we'll give it
while True:
# Shuffle the image_captions_dict before each iteration
dict_as_list = list(img_captions_dict.items())
shuffle(dict_as_list)
img_captions_dict = dict(dict_as_list)
# Extract image index and caption
for img_idx, captions_list in img_captions_dict.items():
n += 1
# Extract the engineered features for the image
try:
xception_features = img_features[img_idx]
except:
continue
# Loop though possible captions
for caption in captions_list:
# Convert words to indexes
seq = [word_to_index[word] for word in caption.split() if word in word_to_index]
# For each word in the caption
for i in range(1,len(seq)): # start at 1 because of '<start>'
# Previous words in the sentence are used as prediction inputs
xi = seq[0:i]
# The current word has to be predicted
yi = seq[i]
# Pad xi with zeros
xi = pad_sequences([xi], maxlen=max_len, value=0, padding='post')[0]
# Convert the label word to categorical using one hot encoding
yi = to_categorical([yi], num_classes = VOCAB_SIZE)[0]
# Append the image features, the partial caption, and the target word to the lists
X1.append(xception_features)
X2.append(xi)
y.append(yi)
# Input to the NN based on batch size
if n == batch_size:
yield ([np.array(X1), np.array(X2)], np.array(y))
# And reset the lists
X1, X2, y = [], [], []
n = 0
# Hyperparameters
EPOCHS = 300 # Each epoch takes 75 seconds, estimated training time = 6 hours 15 minutes
BATCH_SIZE = 32
STEPS_TRAIN = len(captions_train) // BATCH_SIZE
STEPS_VALID = len(captions_valid) // BATCH_SIZE
# Data loaders
generator_train = data_loader(captions_train,
img_features_dict,
word_to_index,
max_len,
BATCH_SIZE)
generator_valid = data_loader(captions_valid,
img_features_dict,
word_to_index,
max_len,
BATCH_SIZE)
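Before training, a quick sanity check of the generator output (a sketch that pulls a single batch from the training loader and prints its shapes):
# One batch: [image feature vectors, padded caption prefixes], one-hot next-word targets
(batch_X1, batch_X2), batch_y = next(generator_train)
print(batch_X1.shape, batch_X2.shape, batch_y.shape)  # (rows, 2048), (rows, max_len), (rows, VOCAB_SIZE)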
# Set to true to continue training from previously saved weights - THIS WILL GIVE AN ERROR IF THE MODEL ARCHITECTURE WAS CHANGED
# Set to false to train the neural network from scratch - THIS WILL LOSE ALL PROGRESS AND CAUSE WORSE PREDICTIONS
LOAD_TRAINED_WEIGHTS = False
if LOAD_TRAINED_WEIGHTS and os.path.exists('callbacks/model.h5'):
# Try loading weights. Will fail if the model structure changed
try:
# Load best model weights
model.load_weights('callbacks/model.h5')
# Compile model
model.compile(loss='categorical_crossentropy', optimizer='adam', metrics=['accuracy'])
# Check model
model.summary()
# Variable to guide the csv log callback
SUCCESSFUL_WEIGHT_LOAD = True
except:
SUCCESSFUL_WEIGHT_LOAD = False
print('Could not load weights. Most likely the network architecture changed.')
else:
SUCCESSFUL_WEIGHT_LOAD = False
if LOAD_TRAINED_WEIGHTS:
print('Could not locate file with weights.')
%%time
TRAIN_MODEL = True
if TRAIN_MODEL:
# Define directory for model checkpoints
CALLBACKS_DIR = 'callbacks/'
if not os.path.exists(CALLBACKS_DIR):
os.mkdir(CALLBACKS_DIR)
# Callbacks
checkpoint = ModelCheckpoint(os.path.join(CALLBACKS_DIR, 'model.h5'),
monitor = 'loss',
save_best_only = True,
save_weights_only = True,
verbose = 0)
plateauLRreduce = ReduceLROnPlateau(factor = 0.5,
patience = 10,
monitor = 'val_loss',
min_lr = 0.0000001,
verbose = 1)
stopearly = EarlyStopping(monitor = 'val_loss',
patience = 25,
verbose = 1)
logCSV = CSVLogger(filename = os.path.join(CALLBACKS_DIR, 'log_model.csv'),
separator = ',',
append = (LOAD_TRAINED_WEIGHTS & SUCCESSFUL_WEIGHT_LOAD))
model_callbacks = [checkpoint, plateauLRreduce, stopearly, logCSV, PlotLossesKeras()]
# Train model and save history
model_history = model.fit(generator_train,
validation_data = generator_valid,
epochs = EPOCHS,
steps_per_epoch = STEPS_TRAIN,
validation_steps = STEPS_VALID,
verbose = 1,
callbacks = model_callbacks,
use_multiprocessing=True,
workers = -1)
else:
if SUCCESSFUL_WEIGHT_LOAD:
print('Skipping model training. Using loaded weights without further training.')
else:
print('Skipping model training. Yet, could not load trained weights. THIS IS AN UNTRAINED MODEL UNSUITABLE FOR MAKING PREDICTIONS.')
# Read the log file
log_model_1 = pd.read_csv('callbacks/log_model.csv')
# Create figure
fig, axs = plt.subplots(nrows=1, ncols=3, figsize=(20,5))
# Accuracy
fig.sca(axs[0])
plt.plot(log_model_1.index, log_model_1.accuracy, label='train', color='blue')
plt.plot(log_model_1.index, log_model_1.val_accuracy, label='valid', color='orange')
plt.legend()
axs[0].set_ylim([0, 1])
axs[0].title.set_text('Accuracy')
axs[0].set_xlabel('Epoch')
# Loss
fig.sca(axs[1])
plt.plot(log_model_1.index, log_model_1.loss, label='train', color='blue')
plt.plot(log_model_1.index, log_model_1.val_loss, label='valid', color='orange')
plt.legend()
axs[1].title.set_text('Loss')
axs[1].set_xlabel('Epoch')
# Learning Rate
fig.sca(axs[2])
axs[2].set_yscale('log')
g2 = plt.plot(log_model_1.index, log_model_1.lr, color='green')
axs[2].title.set_text('Learning Rate')
axs[2].set_xlabel('Epoch')
plt.show()
# Load best model weights
model.load_weights('./callbacks/model.h5')
# Objective Function
model.compile(loss='categorical_crossentropy', optimizer='adam', metrics=['accuracy'])
# Check model
model.summary()
def predict_caption(img):
# Get the image features from Xception
xception_features = img_features_dict[img].reshape((1, 2048))
# Demark the text start
in_text = '<start>'
# Loop
for i in range(max_len):
# Split caption in words and get their indexes (tokens)
sequence = [word_to_index[w] for w in in_text.split() if w in word_to_index]
# Pad the yet-unpredicted latter parts of the caption
sequence = pad_sequences([sequence], maxlen=max_len, padding='post')
# Predict probabilities based on the image and currently predicted caption
ypred = model.predict([xception_features, sequence])
# Select highest probability word (as index)
ypred = ypred.argmax()
# Convert the index back to a word; fall back to <end> if the padding index (0) is ever predicted
word = index_to_word.get(ypred, '<end>')
in_text += (' ' + word)
# If the model predicted the end of the sequence, then stop
if word == '<end>':
break
# Drop the <start> token and, if present, the trailing <end> token
final_caption = in_text.split()[1:]
if final_caption and final_caption[-1] == '<end>':
    final_caption = final_caption[:-1]
# Then merge the caption back into a sentence
final_caption = ' '.join(final_caption)
return final_caption
# Predicting captions for 5 test images
# Loop through 5 images
for i in range(5):
# Randomly select an image index
img = np.random.choice(idx_test)
# Predict the caption
caption = predict_caption(img)
# Plot
plt.title(caption, fontdict={'fontsize': 20, 'weight': 'bold'})
img_arr = plt.imread(PATH_IMAGES + img)  # avoid shadowing the keras 'image' module
plt.imshow(img_arr)
plt.axis('off')
plt.show()
print()
%%shell
jupyter nbconvert --to html Image_Captioning.ipynb