# File manipulation imports for Google Colab
from google.colab import drive
drive.mount('/content/drive')
import os
os.chdir("/content/drive/My Drive/Colab Notebooks/Seq2Seq_Translator")
# Imports
import math
import time
import spacy
import torch
import random
import numpy as np
import torch.nn as nn
import torch.optim as optim
import torchtext
from torchtext.datasets import Multi30k
from torchtext.data import Field, BucketIterator
# Setting the device to cuda
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
# Check GPU
!nvidia-smi
# Download the English spaCy model (tokenizer)
!python -m spacy download en
# Download the German spaCy model (tokenizer)
!python -m spacy download de
# Loading the spaCy models
spacy_english = spacy.load('en')
spacy_german = spacy.load('de')
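# Note (environment assumption): the 'en' / 'de' shortcut names only exist on spaCy 2.x.
# If the downloads or loads above fail on a newer spaCy, the full model names can be used instead:
# spacy_english = spacy.load('en_core_web_sm')
# spacy_german = spacy.load('de_core_news_sm')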
# Tokenization function: English
# The '[::-1]' reverses the order of the source tokens; feeding the source sentence
# in reverse is the classic seq2seq trick that makes optimization easier
def tokenize_english(text):
    return [token.text for token in spacy_english.tokenizer(text)][::-1]
# Tokenization function: German
def tokenize_german(text):
    return [token.text for token in spacy_german.tokenizer(text)]
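# Optional sanity check (illustrative sentences, not taken from the dataset): the English
# tokens should come out reversed, the German tokens in their original order
print(tokenize_english('Two men are playing football in the park.'))
print(tokenize_german('Zwei Männer spielen Fußball im Park.'))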
# Source language
SOURCE = Field(tokenize = tokenize_english, init_token = '<sos>', eos_token = '<eos>', lower = True)
# Target language
TARGET = Field(tokenize = tokenize_german, init_token = '<sos>', eos_token = '<eos>', lower = True)
# Loading the train / validation / test splits of the Multi30k dataset
train_data, valid_data, test_data = Multi30k.splits(exts = ('.en', '.de'), fields = (SOURCE, TARGET))
# Inspecting the training data
print(train_data.examples[0].src)
print(train_data.examples[0].trg)
print('Train dataset length: ' + str(len(train_data.examples)))
print('Validation dataset length: ' + str(len(valid_data.examples)))
print('Test dataset length: ' + str(len(test_data.examples)))
# Creating the SOURCE and TARGET vocabularies
SOURCE.build_vocab(train_data, min_freq = 2)
TARGET.build_vocab(train_data, min_freq = 2)
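# Optional check: vocabulary sizes and the index reserved for the padding token
# (tokens appearing fewer than min_freq = 2 times are mapped to <unk>)
print('SOURCE vocabulary size: ' + str(len(SOURCE.vocab)))
print('TARGET vocabulary size: ' + str(len(TARGET.vocab)))
print('TARGET <pad> index: ' + str(TARGET.vocab.stoi[TARGET.pad_token]))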
class Encoder(nn.Module):
    # Constructor method
    def __init__(self, input_dims, emb_dims, hid_dims, n_layers, dropout):
        super().__init__()
        # Model layers
        self.hid_dims = hid_dims
        self.n_layers = n_layers
        self.embedding = nn.Embedding(input_dims, emb_dims)
        self.rnn = nn.LSTM(emb_dims, hid_dims, n_layers, dropout = dropout)
        self.dropout = nn.Dropout(dropout)
    # Forward method for training
    def forward(self, src):
        # Model execution
        embedded = self.dropout(self.embedding(src))
        outputs, (h, cell) = self.rnn(embedded)
        return h, cell
class Decoder(nn.Module):
    # Constructor method
    def __init__(self, output_dims, emb_dims, hid_dims, n_layers, dropout):
        super().__init__()
        # Model layers
        self.output_dims = output_dims
        self.hid_dims = hid_dims
        self.n_layers = n_layers
        self.embedding = nn.Embedding(output_dims, emb_dims)
        self.rnn = nn.LSTM(emb_dims, hid_dims, n_layers, dropout = dropout)
        self.fc_out = nn.Linear(hid_dims, output_dims)
        self.dropout = nn.Dropout(dropout)
    # Forward method for training
    def forward(self, input, h, cell):
        # Model execution
        # The decoder receives one token per batch element, so add a sequence-length dimension of 1
        input = input.unsqueeze(0)
        embedded = self.dropout(self.embedding(input))
        output, (h, cell) = self.rnn(embedded, (h, cell))
        pred = self.fc_out(output.squeeze(0))
        return pred, h, cell
class Seq2Seq(nn.Module):
    # Constructor method
    def __init__(self, encoder, decoder, device):
        super().__init__()
        # Model components
        self.encoder = encoder
        self.decoder = decoder
        self.device = device
    # Forward method for training
    def forward(self, src, trg, teacher_forcing_rate = 0.5):
        # Model execution
        batch_size = trg.shape[1]
        target_length = trg.shape[0]
        target_vocab_size = self.decoder.output_dims
        # Tensor to hold the decoder predictions for every time step
        outputs = torch.zeros(target_length, batch_size, target_vocab_size).to(self.device)
        # Encode the source sentence into the initial hidden and cell states
        h, cell = self.encoder(src)
        # The first decoder input is the <sos> token
        input = trg[0,:]
        for t in range(1, target_length):
            output, h, cell = self.decoder(input, h, cell)
            outputs[t] = output
            top = output.argmax(1)
            # Teacher forcing: with probability teacher_forcing_rate feed the true next token,
            # otherwise feed the model's own prediction
            input = trg[t] if (random.random() < teacher_forcing_rate) else top
        return outputs
# Hyperparameters
batch_size = 32
input_dimensions = len(SOURCE.vocab)
output_dimensions = len(TARGET.vocab)
encoder_embedding_dimensions = 256
decoder_embedding_dimensions = 256
hidden_layer_dimensions = 512
num_layers = 2
encoder_dropout = 0.5
decoder_dropout = 0.5
epochs = 30
grad_clip = 1
lowest_validation_loss = float('inf')
# Data generators
train_iterator, valid_iterator, test_iterator = BucketIterator.splits((train_data, valid_data, test_data),
batch_size = batch_size,
device = device)
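# Optional check: torchtext batches are shaped (sequence_length, batch_size) because the
# Fields above keep the default batch_first = False; the model code relies on this layout
sample_batch = next(iter(train_iterator))
print(sample_batch.src.shape)
print(sample_batch.trg.shape)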
# Encoder instance
encod = Encoder(input_dimensions,
encoder_embedding_dimensions,
hidden_layer_dimensions,
num_layers,
encoder_dropout)
# Decoder instance
decod = Decoder(output_dimensions,
decoder_embedding_dimensions,
hidden_layer_dimensions,
num_layers,
decoder_dropout)
# Model instance
model = Seq2Seq(encod, decod, device).to(device)
# Display the model architecture
model
def initialize_weights(m):
    for name, param in m.named_parameters():
        nn.init.uniform_(param.data, -0.1, 0.1)
# Applying the weight initialization function to the model
model.apply(initialize_weights)
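# Optional: count the trainable parameters of the freshly initialized model
num_params = sum(p.numel() for p in model.parameters() if p.requires_grad)
print('Trainable parameters: ' + str(num_params))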
# Defining the loss function; padding positions in the target are ignored via ignore_index
criterion = nn.CrossEntropyLoss(ignore_index = TARGET.vocab.stoi[TARGET.pad_token])
# Creating an optimizer to update the model weights after each batch
optimizer = optim.Adam(model.parameters())
def train_model(model, iterator, optimizer, criterion, clip):
    # Put the model in training mode
    model.train()
    # Initialize epoch error
    epoch_loss = 0
    # Loop through iterator (data generator)
    for i, batch in enumerate(iterator):
        # Collect source and target data
        src = batch.src
        trg = batch.trg
        # Zero gradients
        optimizer.zero_grad()
        # Predict
        output = model(src, trg)
        # Adjust prediction shape (drop the <sos> position and flatten)
        output_dims = output.shape[-1]
        output = output[1:].view(-1, output_dims)
        trg = trg[1:].view(-1)
        # Calculate loss
        loss = criterion(output, trg)
        # Backpropagate to compute the gradients
        loss.backward()
        # Clip the gradient norm to avoid exploding gradients
        torch.nn.utils.clip_grad_norm_(model.parameters(), clip)
        # Apply weight optimization
        optimizer.step()
        # Accumulate epoch error
        epoch_loss += loss.item()
    return epoch_loss / len(iterator)
def evaluate_model(model, iterator, criterion):
    # Put the model in evaluation mode
    model.eval()
    # Initialize epoch error
    epoch_loss = 0
    # Predicting without computing gradients
    with torch.no_grad():
        # Loop through iterator (data generator)
        for i, batch in enumerate(iterator):
            # Extract source and target data
            src = batch.src
            trg = batch.trg
            # Predict with teacher forcing turned off
            output = model(src, trg, 0)
            # Adjust prediction shape (drop the <sos> position and flatten)
            output_dim = output.shape[-1]
            output = output[1:].view(-1, output_dim)
            trg = trg[1:].view(-1)
            # Model loss
            loss = criterion(output, trg)
            # Accumulate epoch error
            epoch_loss += loss.item()
    return epoch_loss / len(iterator)
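# The checkpoint below is written to 'models/seq2seq.pt'; create the folder first in case
# it does not already exist in the working directory (assumption about the Drive layout)
os.makedirs('models', exist_ok = True)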
# Loop through epochs to train the model
for epoch in range(epochs):
    # Store start time
    start_time = time.time()
    # Training
    train_loss = train_model(model, train_iterator, optimizer, criterion, grad_clip)
    # Validation
    valid_loss = evaluate_model(model, valid_iterator, criterion)
    # Store end time
    end_time = time.time()
    # Checkpoint the best performing model whenever the validation error reaches a new low
    if valid_loss < lowest_validation_loss:
        lowest_validation_loss = valid_loss
        torch.save(model.state_dict(), 'models/seq2seq.pt')
    # Print
    print(f'Epoch: {epoch+1:02} | Time: {np.round(end_time-start_time,0)}s')
    print(f'\t Training error: {train_loss:.4f}')
    print(f'\t Validation Error: {valid_loss:.4f}')
# Loading the best saved model
model.load_state_dict(torch.load('models/seq2seq.pt'))
# Evaluate model
test_loss = evaluate_model(model, test_iterator, criterion)
# Test error
print(f'Test Error: {test_loss:.4f}')
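# Cross-entropy loss can also be reported as perplexity (its exponential), the metric
# commonly quoted for translation and language models; this uses the math import above
print(f'Test Perplexity: {math.exp(test_loss):.4f}')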
def translate_language(model, iterator, num_translations = 5):
    with torch.no_grad():
        # Loop through the iterator
        for i, batch in enumerate(iterator):
            # Translate only the first num_translations examples
            if i < num_translations:
                # Extract SOURCE and TARGET
                # Doing so to compare the predicted translation with the true translation
                src = batch.src
                trg = batch.trg
                # Model prediction (teacher forcing turned off)
                output = model(src, trg, 0)
                # Most likely token at every position
                preds = torch.tensor([[torch.argmax(x).item()] for x in output])
                # Prints (the source tokens are un-reversed with '[::-1]' for readability)
                print('Original English Text: ' + str([SOURCE.vocab.itos[x] for x in src][1:-1][::-1]))
                print('Translated German Text (Expected Output): ' + str([TARGET.vocab.itos[x] for x in trg][1:-1]))
                print('Translated German Text (Model Prediction): ' + str([TARGET.vocab.itos[x] for x in preds][1:-1]))
                print('\n')
# Building a batch-size-1 iterator over the test data so sentences can be translated one at a time
_, _, iterator_translate = BucketIterator.splits((train_data, valid_data, test_data),
batch_size = 1,
device = device)
# Translating a few sample sentences
translate_language(model, iterator_translate)