Using data from "The Ryerson Audio-Visual Database of Emotional Speech and Song (RAVDESS)" by Livingstone & Russo is licensed under CC BY-NA-SC 4.0.
https://zenodo.org/record/1188976
The file is Audio_Speech_Actors_01-24
Ryerson Audio-Visual Database of Emotional Speech and Song (RAVDESS): Speech audio-only files (16 bit, 48 kHz .wav) from the RAVDESS. The full dataset of speech and song, audio and video (24.8 GB), is available from Zenodo. Construction and perceptual validation of the RAVDESS is described in the authors' Open Access paper in PLoS ONE.
Files
This portion of the RAVDESS contains 1440 files: 60 trials per actor x 24 actors = 1440. The RAVDESS contains 24 professional actors (12 female, 12 male), vocalizing two lexically-matched statements in a neutral North American accent. Speech includes calm, happy, sad, angry, fearful, surprise, and disgust expressions. Each expression is produced at two levels of emotional intensity (normal, strong), with an additional neutral expression.
File naming convention
Each of the 1440 files has a unique filename. The filename consists of a 7-part numerical identifier (e.g., 03-02-06-01-02-01-12.wav). These identifiers define the stimulus characteristics:
Filename identifiers
Modality (01 = full-AV, 02 = video-only, 03 = audio-only).
Vocal channel (01 = speech, 02 = song).
Emotion (01 = neutral, 02 = calm, 03 = happy, 04 = sad, 05 = angry, 06 = fearful, 07 = disgust, 08 = surprised).
Emotional intensity (01 = normal, 02 = strong). NOTE: There is no strong intensity for the 'neutral' emotion.
Statement (01 = "Kids are talking by the door", 02 = "Dogs are sitting by the door").
Repetition (01 = 1st repetition, 02 = 2nd repetition).
Actor (01 to 24. Odd numbered actors are male, even numbered actors are female).
Filename example: 03-02-06-01-02-01-12.wav
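As a quick sanity check, the identifier can be decoded programmatically. A minimal sketch (the field names simply mirror the list above):
# Decode a RAVDESS filename into its 7 identifier fields
fields = ['modality', 'vocal_channel', 'emotion', 'intensity', 'statement', 'repetition', 'actor']
parts = '03-02-06-01-02-01-12.wav'.replace('.wav', '').split('-')
print(dict(zip(fields, parts)))  # {'modality': '03', 'vocal_channel': '02', 'emotion': '06', ...}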
How to cite the RAVDESS
Academic citation
If you use the RAVDESS in an academic publication, please use the following citation: Livingstone SR, Russo FA (2018) The Ryerson Audio-Visual Database of Emotional Speech and Song (RAVDESS): A dynamic, multimodal set of facial and vocal expressions in North American English. PLoS ONE 13(5): e0196391. https://doi.org/10.1371/journal.pone.0196391.
All other attributions
If you use the RAVDESS in a form other than an academic publication, such as in a blog post, school project, or non-commercial product, please use the following attribution: "The Ryerson Audio-Visual Database of Emotional Speech and Song (RAVDESS)" by Livingstone & Russo is licensed under CC BY-NC-SA 4.0.
# OS Manipulation
import os
from glob import glob
import argparse
from scipy.io import wavfile
from tqdm import tqdm
import pickle
import watermark
# Basic Python
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import math
import seaborn as sns
import statistics
from scipy import stats
import pylab
# Audio Manipulation
import librosa
import librosa.display
import kapre
import python_speech_features as psf
from python_speech_features import mfcc, logfbank
from librosa.core import resample, to_mono
from kapre.time_frequency import Melspectrogram
from kapre.utils import Normalization2D
from kapre.augmentation import AdditiveNoise
from playsound import playsound
# TensorFlow
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
from tensorflow.keras import Sequential
from tensorflow.keras.layers import Conv1D, Conv2D, MaxPooling2D, MaxPooling1D, Flatten, LSTM, Bidirectional
from tensorflow.keras.layers import Dropout, Dense, TimeDistributed, Input, Permute, Reshape
from tensorflow.keras.models import Model
from tensorflow.keras.models import load_model
from tensorflow.keras.regularizers import l2
from tensorflow.keras.callbacks import CSVLogger, ModelCheckpoint
from tensorflow.keras.utils import to_categorical
# Sklearn
from sklearn.utils.class_weight import compute_class_weight
from sklearn.preprocessing import LabelEncoder, OneHotEncoder
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score
from sklearn.metrics import roc_auc_score, roc_curve, auc, confusion_matrix, plot_confusion_matrix, precision_recall_curve
# Package versions
%load_ext watermark
%watermark -v -iv
'''
# cuDNN was having problems with the Kapre custom layers (Melspectrogram)
# This made the bug go away... that's all I know
physical_devices = tf.config.experimental.list_physical_devices('GPU')
tf.config.experimental.set_memory_growth(physical_devices[0], True)
'''
# Check source directory
len(os.listdir('Audio_Speech_Actors_01-24/'))
%%time
# Looping through 24 folders, each with 60 samples, for a total of 1440 audio files.
# The folders are named 'Actor_01' through 'Actor_24'
# Defining the root directory name
root_dir = 'Audio_Speech_Actors_01-24'
# Dictionaries to receive the outputs
files = {}
sampling_rate = {}
# Loop through all directories
for itens in range(1, len(os.listdir(root_dir))+1):
    # Define the folder path for that specific actor
    if len(str(itens)) == 1:
        audio_dir = root_dir + '/Actor_' + str('0') + str(itens)
    else:
        audio_dir = root_dir + '/Actor_' + str(itens)
    # Extract the path to all the directory's files
    audio_files = glob(audio_dir + '/*.wav')
    # Store the path and sampling rate for each '.wav' file in the folder
    for i in range(len(audio_files)):
        x = audio_files[i]
        audio, sfreq = librosa.load(audio_files[i], sr = None)
        files[x] = len(audio) / sfreq
        sampling_rate[x] = sfreq
# Check if all sampling rates are equal
all(value==48000 for value in sampling_rate.values())
# Put everything into a dataframe
audio_df = pd.DataFrame()
for keys, values in files.items():
    audio_df.at[keys,'file_length'] = values
audio_df
# Dictionary with emotions
emotion_dict = {'01': 'neutral',
'02': 'calm',
'03': 'happy',
'04': 'sad',
'05': 'angry',
'06': 'fearful',
'07': 'disgust',
'08': 'surprised'}
# Length of the string indicating the audio file path
len(audio_df.index[0])
# Encoding example
audio_df.index[0][35:]
String positions vs. encoding: characters 41:43 of each path hold the two-digit emotion ID.
# Create column with emotion IDs
for i, row in audio_df.iterrows():
    audio_df.at[i, 'emotion_id'] = i[41:43]
# Create column with emotion labels
audio_df['emotion'] = audio_df['emotion_id'].map(emotion_dict)
audio_df
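Since the hard-coded string positions above depend on the exact folder layout, a more robust alternative (shown here only as a cross-check) is to parse the filename itself:
# Cross-check: derive the emotion ID from the filename instead of fixed string positions
emotion_id_alt = [os.path.basename(p).split('-')[2] for p in audio_df.index]
(pd.Series(emotion_id_alt, index=audio_df.index) == audio_df['emotion_id']).all()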
# Longest audio file
a = audio_df['file_length'].argmax()
b = audio_df.iloc[a].name
print(f'{audio_df.iloc[a]}')
playsound(b)
# Shortest audio file
a = audio_df['file_length'].argmin()
b = audio_df.iloc[a].name
print(f'{audio_df.iloc[a]}')
playsound(b)
# Play a random audio
b = np.random.choice(audio_df.index)
a = audio_df[audio_df.index == b]
print(a.index)
print(a.columns)
print(a.values)
playsound(b)
# Dataset balance by emotion
sns.countplot(x = 'emotion', data = audio_df)
Neutral has half as many samples because it has no strong-intensity version, only the normal one.
# Emotion balance by audio length
# Reindexing to keep the original label order
emotion_length = audio_df.groupby('emotion').sum().reindex(emotion_dict.values())
emotion_length
# Emotion balance by audio length
sns.barplot(x = emotion_length.index, y = emotion_length.file_length)
# Plot class distribution
fig, ax = plt.subplots()
ax.set_title('Class Distribution')
ax.pie(emotion_length.file_length, labels = emotion_length.index,
autopct='%1.1f%%')
ax.axis('equal')
plt.show()
np.random.choice(audio_df.index)
Reference video on digital audio processing I: https://www.youtube.com/watch?v=Z7YM-HAz-IY
Reference video on digital audio processing II: https://www.youtube.com/watch?v=RMfeYitdO-c&feature=youtu.be
Speech Processing for Machine Learning: https://haythamfayek.com/2016/04/21/speech-processing-for-machine-learning.html
Reference on getting from audio to mel spectrogram: https://towardsdatascience.com/getting-to-know-the-mel-spectrogram-31bca3e2d9d0
The mel spectrogram is the result of the following pipeline:
1) Separate into windows: sample the input with windows of size n_fft=2048, making hops of size hop_length=512 to move to the next window.
2) Compute the FFT (Fast Fourier Transform) for each window to transform from the time domain to the frequency domain.
3) Generate a mel scale: take the entire frequency spectrum and separate it into n_mels=128 evenly spaced frequencies. Evenly spaced here means not by distance on the frequency axis, but by distance as it is heard by the human ear.
4) Generate the spectrogram: for each window, decompose the magnitude of the signal into its components, corresponding to the frequencies in the mel scale.
# Plot the Melspectrogram of a random audio file
ORIGINAL_SR = 48000
N_MELS = 128
# Read a random file
signal, rate = librosa.load(np.random.choice(audio_df.index), sr = ORIGINAL_SR)
# Calculate the melspectrogram
melSpec = librosa.feature.melspectrogram(y=signal, sr=rate, n_mels=N_MELS)
# Convert to log scale (decibels)
melSpec_dB = librosa.power_to_db(melSpec, ref=np.max)
# Plot
plt.figure(figsize=(10, 5))
librosa.display.specshow(melSpec_dB, x_axis='time', y_axis='mel', sr=rate, fmax=rate/2)  # fmax = Nyquist frequency, matching the melspectrogram above
plt.colorbar(format='%+1.0f dB')
plt.title("MelSpectrogram")
plt.tight_layout()
plt.show()
The mel spectrogram seems to show dead space at the start and end of the audio file.
Plotting a 3x3 grid of nine mel spectrograms to check whether this is really the case.
# Plotting multiple Melspectrograms
# Number of plots
nrows, ncols = 3, 3
# Create figure with suplots
fig, ax = plt.subplots(nrows, ncols, figsize=(16,8))
# Define index of subplot
i = 1
# Loop through each row x column pair
for x in range(nrows):
    for y in range(ncols):
        # Define the row x column location (aka the subplot)
        plt.subplot(nrows, ncols, i)
        # Pick a random file
        random_file = np.random.choice(audio_df.index)
        # Read the random file
        signal, rate = librosa.load(random_file, sr = ORIGINAL_SR)
        # Calculate the melspectrogram
        melSpec = librosa.feature.melspectrogram(y=signal, sr=rate, n_mels=N_MELS)
        # Convert to log scale (decibels)
        melSpec_dB = librosa.power_to_db(melSpec, ref=np.max)
        # Plot melspectrogram (fmax = Nyquist frequency)
        librosa.display.specshow(melSpec_dB, x_axis='time', y_axis='mel', sr=rate, fmax=rate/2)
        # Plot emotion label
        plt.gca().set_title(audio_df[audio_df.index == random_file].emotion.values[0])
        i += 1
plt.tight_layout()
plt.show()
It is indeed the case that there is a lot of empty signal before the first second.
I'll employ a signal envelope to attempt to remove some of the dead space from the audio.
# First plotting the signal as is
# Dictionary to store the original signal samples
signals_o = {}
# Extracting sample data
# For each class (emotion) sample one audio file to plot
for e in list(emotion_dict.values()):
    # Pick a random index (file path) from the given emotion
    wav_name = np.random.choice(audio_df[audio_df.emotion == e].index)
    # Signal and rate
    signal, rate = librosa.load(wav_name, sr = ORIGINAL_SR)
    # Append to the original signals dictionary
    signals_o[e] = signal
# Function to plot signals
def plot_signals(signals):
    # Define number of rows and columns
    nrows = 2
    ncols = 4
    # Create figure and axes with subplots
    fig, axes = plt.subplots(nrows=nrows, ncols=ncols, sharex=False, sharey=True, figsize=(20,5))
    # Set plot name
    fig.suptitle('Signal Sample per Emotion', size=16)
    # Set index for the first dictionary key
    i = 0
    # Loop through each row x column pair
    for x in range(nrows):
        for y in range(ncols):
            axes[x,y].set_title(list(signals.keys())[i])
            axes[x,y].plot(list(signals.values())[i])
            axes[x,y].get_xaxis().set_visible(False)
            axes[x,y].get_yaxis().set_visible(False)
            i += 1
# Plotting the original signals
plot_signals(signals_o)
# Creating a function to calculate the signal envelope
# Then use it to remove dead space from the audio
# From empirical testing I found a threshold of about 0.001 to be ideal. Half that (0.0005) works too.
def envelope(signal, rate, threshold=0.001):
    mask = []
    # pd.Series allows for an easy rolling window
    # Need to take abs values, otherwise all negative parts of the audio wave would be removed
    y = pd.Series(signal).apply(np.abs)
    # Window size will be a 10th of a second
    y_mean = y.rolling(window = int(rate/10), min_periods = 1, center = True).mean()
    for mean in y_mean:
        if mean > threshold:
            mask.append(True) # Appending True means keep the signal
        else:
            mask.append(False)
    return mask
# Plotting the filtered signals
signals_f = {}
# For each class (emotion) sample one audio file to plot
for e in list(emotion_dict.values()):
    # Pick a random index (file path) from the given emotion
    wav_name = np.random.choice(audio_df[audio_df.emotion == e].index)
    # Signal and rate
    signal, rate = librosa.load(wav_name, sr = ORIGINAL_SR)
    # Create a mask to envelope the signal
    mask = envelope(signal, rate, threshold=0.001)
    # Apply the envelope
    signal = signal[mask]
    # Append to the filtered signals dictionary
    signals_f[e] = signal
plot_signals(signals_f)
# List duration of the filtered samples
for i in range(len(signals_f)):
    print(f'Length of sample {i+1} is {list(signals_f.values())[i].size/ORIGINAL_SR:.2f} seconds')
# Leaving this utility function here as a reminder that I could downsample my audio data,
# because 48 kHz is quite a high sampling rate and 16 kHz would most likely work fine.
# But initially I will not be downsampling, because my dataset isn't big to start with,
# and I don't want to lose any more data (some is already lost with the signal envelope).
def downsample_mono(path, sr):
    # Read the file at the original rate, as it results in better mono conversion than using an already downsampled rate
    rate, wav = wavfile.read(path)
    # Convert to float32, as it's required by librosa for the mono conversion / resampling
    wav = wav.astype(np.float32, order='F')
    # Check if the audio is stereo (for mono, shape[1] does not exist and indexing raises IndexError)
    try:
        wav.shape[1]
        # If the audio is stereo, convert it to mono with librosa's to_mono
        # The input needs to be transposed because wavfile returns stereo as (n, 2) but librosa expects (2, n)
        wav = to_mono(wav.T)
    except IndexError:
        pass
    # Use librosa's resample to downsample to the desired rate
    wav = resample(wav, rate, sr)
    # Convert back to int16, because writing the float data (still at int16 scale) as a float wav sounds awful
    wav = wav.astype(np.int16)
    return sr, wav
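A quick usage sketch of the utility above, just to show the interface (not part of the pipeline, since I'm keeping the original 48 kHz):
# Example only: downsample one random file to 16 kHz mono
sr16, wav16 = downsample_mono(np.random.choice(audio_df.index), 16000)
print(sr16, wav16.shape, wav16.dtype)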
wav_name[35:55]
def save_sample(sample, rate, wav_name):
    fn = wav_name[35:55]
    dst_path = os.path.join(TARGET_ROOT, fn + '.wav')
    if os.path.exists(dst_path):
        return
    wavfile.write(dst_path, rate, sample)
SOURCE_ROOT = 'Audio_Speech_Actors_01-24'
TARGET_ROOT = 'dataset_cleaned'
# Ensure target root exists
if not os.path.exists(TARGET_ROOT):
    os.mkdir(TARGET_ROOT)
# List all emotions
classes = list(emotion_dict.values())
classes
# Recursive path on the source folder to extract all wav file paths
all_paths = glob(f'{SOURCE_ROOT}/**', recursive=True)
len(all_paths)
# Keep only .wav file paths (discarding folder paths)
wav_paths = [x for x in all_paths if '.wav' in x]
len(wav_paths)
# Dictionary for the filtered signals
signals = {}
# Applying the envelope to all audio files
# Loop through all wav_paths
for wav_name in tqdm(wav_paths):
    # Signal and rate
    signal, rate = librosa.load(wav_name, sr = ORIGINAL_SR)
    # Create a mask to envelope the signal
    mask = envelope(signal, rate, threshold=0.001)
    # Apply the envelope
    signal = signal[mask]
    # Append the signal to the dictionary with all filtered signals
    signals[wav_name] = signal
    # Save sample
    save_sample(signal, rate, wav_name)
# Check a random sample
b = np.random.choice(os.listdir(TARGET_ROOT))
c = os.path.join(TARGET_ROOT, b)
# Signal and rate
signal, rate = librosa.load(c, sr = ORIGINAL_SR)
print(f'File: {b}')
print(f'Signal length: {signal.shape[0]}')
print(f'Rate: {rate}')
playsound(c)
# List to gather the length of each filtered audio file
filtered_audio_len = []
# Loop through the dictionary of filtered signals and add the size of the signal for each audio sample
for key, value in signals.items():
    filtered_audio_len.append(value.size)
# Adding the filtered audio lengths to my dataframe
cnt = 0
for i, row in enumerate(audio_df.iterrows()):
    audio_df.at[audio_df.index[i], 'filtered_length'] = np.array(filtered_audio_len[cnt])/ORIGINAL_SR
    cnt += 1
audio_df
print('STATISTICS FOR THE FILTERED AUDIO FILES')
print(f'Median sample size: {statistics.median(audio_df.filtered_length):.2f} seconds')
print(f'Mean sample size: {statistics.mean(audio_df.filtered_length):.2f} seconds')
print(f'Stdev of sample size: {statistics.stdev(audio_df.filtered_length):.2f} seconds')
print(f'Maximum of sample size: {max(filtered_audio_len)/ORIGINAL_SR:.2f} seconds')
print(f'Minimum of sample size: {min(filtered_audio_len)/ORIGINAL_SR:.2f} seconds')
print('STATISTICS FOR THE ORIGINAL AUDIO FILES')
print(f'Median sample size: {statistics.median(audio_df.file_length):.2f} seconds')
print(f'Mean sample size: {statistics.mean(audio_df.file_length):.2f} seconds')
print(f'Stdev of sample size: {statistics.stdev(audio_df.file_length):.2f} seconds')
print(f'Maximum of sample size: {max(audio_df.file_length):.2f} seconds')
print(f'Minimum of sample size: {min(audio_df.file_length):.2f} seconds')
# Histogram comparing the original and filtered audio datasets
fig, ax = plt.subplots(2, 1, figsize=(16,8), sharey=True, sharex=True)
# Histogram of original audio length
sns.distplot(audio_df.file_length, ax=ax[0], label='Original Audio')
# Histogram of filtered_audio_len
sns.distplot(audio_df.filtered_length, ax=ax[1], label='Filtered Audio')
I need to make all samples on the filtered dataset the same size, so they can be fed to a neural network.
# Checking different percentiles to assess the best length to choose
print(f' 1st percentile: {np.percentile(audio_df.filtered_length, 1):.2f} seconds')
print(f' 5th percentile: {np.percentile(audio_df.filtered_length, 5):.2f} seconds')
print(f'10th percentile: {np.percentile(audio_df.filtered_length, 10):.2f} seconds')
print(f'50th percentile: {np.percentile(audio_df.filtered_length, 50):.2f} seconds')
print(f'90th percentile: {np.percentile(audio_df.filtered_length, 90):.2f} seconds')
print(f'95th percentile: {np.percentile(audio_df.filtered_length, 95):.2f} seconds')
print(f'99th percentile: {np.percentile(audio_df.filtered_length, 99):.2f} seconds')
# Check percentiles of specific audio lengths
print(f' Percentile of files above 1.0 seconds: {stats.percentileofscore(audio_df.filtered_length, 1.0)}')
print(f' Percentile of files above 1.5 seconds: {stats.percentileofscore(audio_df.filtered_length, 1.5)}')
print(f' Percentile of files above 2.0 seconds: {stats.percentileofscore(audio_df.filtered_length, 2.0)}')
print(f' Percentile of files above 2.5 seconds: {stats.percentileofscore(audio_df.filtered_length, 2.5)}')
print(f' Percentile of files above 3.0 seconds: {stats.percentileofscore(audio_df.filtered_length, 3.0)}')
There is a good case to be made for choosing the mean or median as the target audio length, as that would imply the least manipulation (padding or cutting) of the audio data.
Yet as long as all classes (emotions) get roughly the same amount of zero-padding, I think it makes more sense to choose a longer audio length, as it reduces the loss of data.
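To quantify that trade-off, a quick check of how many files would need zero-padding versus trimming for a few candidate target lengths (using the filtered_length column computed above):
# Share of files that would be padded vs trimmed for each candidate target length
for target in [1.5, 2.0, 2.5]:
    padded = (audio_df.filtered_length < target).mean()
    print(f'{target:.1f} s target -> {padded:.0%} padded, {1 - padded:.0%} trimmed')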
# Emotion balance by audio length
# Reindexing to keep the original label order
emotion_length = audio_df.groupby('emotion').sum().reindex(emotion_dict.values())
emotion_length
# Emotion balance by audio length
sns.barplot(x = emotion_length.index, y = emotion_length.filtered_length)
# Plot class distribution
fig, ax = plt.subplots()
ax.set_title('Class Distribution')
ax.pie(emotion_length.filtered_length, labels = emotion_length.index,
autopct='%1.1f%%')
ax.axis('equal')
plt.show()
There is a good class balance in the cleaned dataset, with the notable exception of the undersampled neutral class.
All classes seem to have lost around 50% of audio length, although that's not necessarily a 50% loss of data as most of the audio removed with the envelope was empty space.
I will opt to move ahead with a 2 second audio length, which means that around 80% of the samples will need some zero-padding and 20% will need some cutting.
# Hyperparameters
DT = 2.0 # Delta time
AUDIO_LEN = ORIGINAL_SR * DT
# Create a new dataframe indexed at the cleaned audio files
df = audio_df.copy()
# Create new index mapping to the dataset_cleaned path
new_index = [TARGET_ROOT + i[34:] for i in audio_df.index]
# Map the index to the filenames on the dataset_cleaned folder
df.index = new_index
df
# Longest audio file
a = df['filtered_length'].argmax()
b = df.iloc[a].name
print(f'{df.iloc[a]}')
playsound(b)
# Shortest audio file
a = df['filtered_length'].argmin()
b = df.iloc[a].name
print(f'{df.iloc[a]}')
playsound(b)
# Plotting multiple Melspectrograms - now showing the audio after applying the signal envelope
# Number of plots
nrows, ncols = 3, 3
# Create figure with suplots
fig, ax = plt.subplots(nrows, ncols, figsize=(16,8))
# Define index of subplot
i = 1
# Loop through each row x column pair
for x in range(nrows):
    for y in range(ncols):
        # Define the row x column location (aka the subplot)
        plt.subplot(nrows, ncols, i)
        # Pick a random file
        random_file = np.random.choice(df.index)
        # Read the random file
        signal, rate = librosa.load(random_file, sr = ORIGINAL_SR)
        # Calculate the melspectrogram
        melSpec = librosa.feature.melspectrogram(y=signal, sr=rate, n_mels=N_MELS)
        # Convert to log scale (decibels)
        melSpec_dB = librosa.power_to_db(melSpec, ref=np.max)
        # Plot melspectrogram (fmax = Nyquist frequency)
        librosa.display.specshow(melSpec_dB, x_axis='time', y_axis='mel', sr=rate, fmax=rate/2)
        # Plot emotion label
        plt.gca().set_title(df[df.index == random_file].emotion.values[0])
        i += 1
plt.tight_layout()
plt.show()
The time windows are of varying length, which is exactly what I'm about to fix!
# Recursive path on the source folder (dataset_cleaned) to extract all wav file paths
all_paths = glob(f'{TARGET_ROOT}/**', recursive=True)
len(all_paths)
# Keep only .wav file paths (discarding folder paths)
wav_paths = [x for x in all_paths if '.wav' in x]
len(wav_paths)
FINAL_ROOT = 'dataset_final'
# Ensure target root exists
if not os.path.exists(FINAL_ROOT):
    os.mkdir(FINAL_ROOT)
wav_paths[0][16:36]
def save_sample_final(sample, rate, wav_name):
    fn = wav_name[16:36]
    dst_path = os.path.join(FINAL_ROOT, fn + '.wav')
    if os.path.exists(dst_path):
        return
    wavfile.write(dst_path, rate, sample)
# Dictionary with final signals
signals_final = {}
# Loop through all audio files
for wav_name in tqdm(wav_paths):
    # Signal and rate
    signal, rate = librosa.load(wav_name, sr = ORIGINAL_SR)
    # Resample to 16000 with librosa
    #signal = resample(signal, orig_sr=rate, target_sr=16000)
    #rate = 16000
    #AUDIO_LEN = 32000
    # Downsampling did not improve performance
    # Check if the audio is shorter than (or equal to) AUDIO_LEN (2 seconds)
    if signal.shape[0] <= AUDIO_LEN:
        # Create an array the size of AUDIO_LEN filled with zeros
        sample = np.zeros(shape=(int(AUDIO_LEN)), dtype=np.float32)
        # Overwrite the start of the zeros array with the data from wav_name
        sample[:signal.shape[0]] = signal
        # Dictionary with final signals
        signals_final[wav_name] = sample
        # Save
        save_sample_final(sample, rate, wav_name)
    # Else the audio is longer than AUDIO_LEN (2 seconds)
    elif signal.shape[0] > AUDIO_LEN:
        # Calculate how much longer it is
        extra_len = signal.shape[0] - AUDIO_LEN
        # Split the difference in half (and take only the integer part)
        half_dif = extra_len//2
        # Define start and stop indexes to cut the audio
        start = int(half_dif)
        stop = int(AUDIO_LEN + half_dif)
        # Use the indexes to keep the middle portion of the audio, cutting the edges (beginning and end)
        sample = signal[start:stop]
        # Dictionary with final signals
        signals_final[wav_name] = sample
        # Save
        save_sample_final(sample, rate, wav_name)
# Check if all files got converted
len(os.listdir(FINAL_ROOT))
# Check a random sample
b = np.random.choice(os.listdir(FINAL_ROOT))
c = os.path.join(FINAL_ROOT, b)
# Signal and rate
signal, rate = librosa.load(c, sr = ORIGINAL_SR)
print(f'File: {b}')
print(f'Signal length: {signal.shape[0]}')
print(f'Rate: {rate}')
with open(c, 'r'):
    playsound(c)
audio_df.index[0][34:]
# Create new index mapping to the dataset_final path
new_index = [FINAL_ROOT + i[34:] for i in audio_df.index]
new_index[0:3]
# Map the index to the filenames on the dataset_final folder
df.index = new_index
df
This is the whole point of this project: using Kapre's special NN layer called Melspectrogram, which allows the mel spectrogram to be computed on the fly from the raw signal inside the network.
This way there is less need for feature engineering before feeding audio data to the neural networks.
The usual approach is to preprocess the audio, extract MFCC features, and train the NN on those features, so the dataset ends up looking like a 'classic' ML dataset (see the sketch below).
This approach instead creates the mel spectrogram inside the model and lets the NN extract features by itself (in a sense learning its own filterbank).
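For comparison, here is a rough sketch of that usual offline approach with python_speech_features (already imported above); the exact parameter values are just illustrative:
# Offline feature extraction sketch: MFCCs (and log filterbank energies) computed ahead of training
sig_ex, rate_ex = librosa.load(np.random.choice(df.index), sr=ORIGINAL_SR)
mfcc_feat = mfcc(sig_ex, samplerate=rate_ex, numcep=13, nfilt=26, nfft=2048)
fbank_feat = logfbank(sig_ex, samplerate=rate_ex, nfilt=26, nfft=2048)
print(mfcc_feat.shape, fbank_feat.shape)  # (n_frames, 13) and (n_frames, 26)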
Kapre Melspectrogram parameters (summarised from the Kapre docs):
sr: sampling rate of the audio. Default 22050.
n_mels: number of mel bands. Default 128.
fmin: lowest frequency to consider. Default 0.0.
fmax: highest frequency to consider [scalar]. Default None; if None, it is inferred as sr / 2.
power_melgram: 2.0 for a power spectrogram, 1.0 for an amplitude spectrogram. Default 1.0.
return_decibel_melgram: whether to return the melgram in decibels. Recommended True, although it is not by default. Default False.
trainable_fb: if True, the frequency-to-mel matrix is initialised with mel frequencies but is trainable; if False, it is initialised and then frozen. Default False.
**kwargs: passed on to Spectrogram, such as n_dft, n_hop, padding, trainable_kernel, image_data_format.
Notes: the expected input shape is (audio_channel, audio_length), e.g. (1, 44100) for a mono signal or (2, 44100) for a stereo signal; audio_channel can be any positive integer.
Returns: a Keras layer with output shape (None, n_channel, n_mels, n_time) if 'channels_first', or (None, n_mels, n_time, n_channel) if 'channels_last'.
TimeDistributed expects its input sequence as (batch, time, features, channels).
The default configuration in Keras is channels_last, so Kapre will output (batch, n_mels, n_time, n_channel).
Therefore Kapre's output has to be reshaped to match the input TimeDistributed expects, which means a Permute((2, 1, 3)). Permute takes only three arguments because batch refers to each individual sample that passes through and always sits at position 0 as None.
I had to prefix many Keras layers with layers. because my model-builder functions below are also named Conv1D, Conv2D, and LSTM, which shadow the directly imported layer classes and were otherwise causing errors.
Using n_hop = SR/100 means splitting the audio into frames every 1/100th of a second (10 ms), which is the most common choice of hop size in speech processing, so I just went along with it.
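A minimal shape check of this Melspectrogram -> Permute step (assuming the same Kapre version and the default channels_last image format used in the models below):
# Kapre outputs (batch, n_mels, n_time, n_channel); Permute((2, 1, 3)) turns it into (batch, n_time, n_mels, n_channel)
chk_in = layers.Input(shape=(1, int(48000 * 2.0)))
chk_mel = Melspectrogram(n_dft=2048, n_hop=480, sr=48000, n_mels=128, padding='same')(chk_in)
chk_perm = layers.Permute((2, 1, 3))(chk_mel)
print(chk_mel.shape, chk_perm.shape)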
def Conv1D(N_CLASSES=8, SR=48000, DT=2.0):
    i = layers.Input(shape=(1, int(SR*DT)), name='input')
    x = Melspectrogram(n_dft=2048, n_hop=SR/100,
                       padding='same', sr=SR, n_mels=128,
                       fmin=0.0, fmax=SR/2, power_melgram=2.0,
                       return_decibel_melgram=True, trainable_fb=False,
                       trainable_kernel=False,
                       name='melbands')(i)
    x = AdditiveNoise(power=0.05)(x)
    x = Normalization2D(str_axis='batch', name='batch_norm')(x)
    x = layers.Permute((2,1,3), name='permute')(x)
    x = TimeDistributed(layers.Conv1D(8, kernel_size=(4), activation='tanh'), name='td_conv_1d_tanh')(x)
    x = layers.MaxPooling2D(pool_size=(2,2), name='max_pool_2d_1')(x)
    x = TimeDistributed(layers.Conv1D(16, kernel_size=(4), activation='relu'), name='td_conv_1d_relu_1')(x)
    x = layers.MaxPooling2D(pool_size=(2,2), name='max_pool_2d_2')(x)
    x = TimeDistributed(layers.Conv1D(32, kernel_size=(4), activation='relu'), name='td_conv_1d_relu_2')(x)
    x = layers.MaxPooling2D(pool_size=(2,2), name='max_pool_2d_3')(x)
    x = TimeDistributed(layers.Conv1D(64, kernel_size=(4), activation='relu'), name='td_conv_1d_relu_3')(x)
    x = layers.MaxPooling2D(pool_size=(2,2), name='max_pool_2d_4')(x)
    x = TimeDistributed(layers.Conv1D(128, kernel_size=(4), activation='relu'), name='td_conv_1d_relu_4')(x)
    x = layers.GlobalMaxPooling2D(name='global_max_pooling_2d')(x)
    x = layers.Dropout(rate=0.1, name='dropout')(x)
    x = layers.Dense(64, activation='relu',
                     activity_regularizer=l2(0.005),
                     kernel_regularizer=l2(0.005), name='dense')(x)
    o = layers.Dense(N_CLASSES, activation='softmax', name='softmax')(x)
    model = Model(inputs=i, outputs=o, name='1d_convolution')
    opt = tf.keras.optimizers.Adam(learning_rate=0.0003)
    model.compile(optimizer=opt,
                  loss='categorical_crossentropy',
                  metrics=['accuracy'])
    return model
def Conv2D(N_CLASSES=8, SR=48000, DT=2.0):
    i = layers.Input(shape=(1, int(SR*DT)), name='input')
    x = Melspectrogram(n_dft=2048, n_hop=SR/100,
                       padding='same', sr=SR, n_mels=128,
                       fmin=0.0, fmax=SR/2, power_melgram=2.0,
                       return_decibel_melgram=True, trainable_fb=False,
                       trainable_kernel=False,
                       name='melbands')(i)
    x = AdditiveNoise(power=0.35)(x)
    x = Normalization2D(str_axis='batch', name='batch_norm')(x)
    x = layers.Conv2D(8, kernel_size=(7,7), activation='tanh', padding='same', name='conv2d_tanh')(x)
    x = layers.MaxPooling2D(pool_size=(2,2), padding='same', name='max_pool_2d_1')(x)
    x = layers.Conv2D(16, kernel_size=(5,5), activation='relu', padding='same', name='conv2d_relu_1')(x)
    x = layers.MaxPooling2D(pool_size=(2,2), padding='same', name='max_pool_2d_2')(x)
    x = layers.Conv2D(16, kernel_size=(3,3), activation='relu', padding='same', name='conv2d_relu_2')(x)
    x = layers.MaxPooling2D(pool_size=(2,2), padding='same', name='max_pool_2d_3')(x)
    x = layers.Conv2D(32, kernel_size=(3,3), activation='relu', padding='same', name='conv2d_relu_3')(x)
    x = layers.MaxPooling2D(pool_size=(2,2), padding='same', name='max_pool_2d_4')(x)
    x = layers.Conv2D(32, kernel_size=(3,3), activation='relu', padding='same', name='conv2d_relu_4')(x)
    x = layers.Flatten(name='flatten')(x)
    x = layers.Dropout(rate=0.5, name='dropout')(x)
    x = layers.Dense(64, activation='relu',
                     activity_regularizer=l2(0.02),
                     kernel_regularizer=l2(0.02), name='dense')(x)
    o = layers.Dense(N_CLASSES, activation='softmax', name='softmax')(x)
    model = Model(inputs=i, outputs=o, name='2d_convolution')
    opt = tf.keras.optimizers.Adam(learning_rate=0.0003)
    model.compile(optimizer=opt,
                  loss='categorical_crossentropy',
                  metrics=['accuracy'])
    return model
def LSTM(N_CLASSES=8, SR=48000, DT=2.0):
    i = layers.Input(shape=(1, int(SR*DT)), name='input')
    x = Melspectrogram(n_dft=2048, n_hop=SR/100,
                       padding='same', sr=SR, n_mels=128,
                       fmin=0.0, fmax=SR/2, power_melgram=2.0,
                       return_decibel_melgram=True, trainable_fb=False,
                       trainable_kernel=False,
                       name='melbands')(i)
    x = AdditiveNoise(power=0.25)(x)
    x = Normalization2D(str_axis='batch', name='batch_norm')(x)
    x = layers.Permute((2,1,3), name='permute')(x)
    x = TimeDistributed(layers.Reshape((-1,)), name='reshape')(x)
    s = TimeDistributed(layers.Dense(64, activation='tanh'),
                        name='td_dense_tanh')(x)
    x = layers.Bidirectional(layers.LSTM(32, return_sequences=True),
                             name='bidirectional_lstm')(s)
    x = layers.concatenate([s, x], axis=2, name='skip_connection')
    x = layers.Dense(64, activation='relu', name='dense_1_relu')(x)
    x = layers.MaxPooling1D(name='max_pool_1d')(x)
    x = layers.Dense(32, activation='relu', name='dense_2_relu')(x)
    x = layers.Flatten(name='flatten')(x)
    x = layers.Dropout(rate=0.5, name='dropout')(x)
    x = layers.Dense(32, activation='relu',
                     activity_regularizer=l2(0.01),
                     kernel_regularizer=l2(0.01), name='dense_3_relu')(x)
    o = layers.Dense(N_CLASSES, activation='softmax', name='softmax')(x)
    model = Model(inputs=i, outputs=o, name='long_short_term_memory')
    opt = tf.keras.optimizers.Adam(learning_rate=0.0003)
    model.compile(optimizer=opt,
                  loss='categorical_crossentropy',
                  metrics=['accuracy'])
    return model
MODEL_TYPE = 'Conv1D' # Conv1D, Conv2D or LSTM
AUDIO_DIR= 'dataset_final/' # Directory with audio files
N_CLASSES = 8
BATCH_SIZE = 16
DELTA_TIME = 2.0 # Time in seconds to sample audio
SAMPLING_RATE = 48000 # Sample rate
NFILT = 128 # Number of mels; usually a power of 2, and 128 is the most common upper value
NFEAT = 64 # Standard is half of NFILT
NFFT = 2048 # Round (rate * frame_length) up to the nearest power of 2; the default frame_length is 0.025, so 48000*0.025 = 1200 -> 2048 (quick check below)
N_EPOCH = 30
MODEL_PATH = os.path.join('models', MODEL_TYPE + '.model')
PICKLE_PATH = os.path.join('pickles', MODEL_TYPE + '.pickle')
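Quick arithmetic check for the NFFT choice above:
# NFFT check: round 48000 * 0.025 = 1200 up to the next power of two
frame_len = int(SAMPLING_RATE * 0.025)
print(frame_len, 2 ** math.ceil(math.log2(frame_len)))  # 1200 2048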
# Recursive path on the source folder to extract all wav file paths
wav_paths = glob(f'{AUDIO_DIR}/**', recursive=True)
# Keep only .wav file paths (discarding folder paths)
wav_paths = [x for x in wav_paths if '.wav' in x]
len(wav_paths)
# Check the alphabetical order in which labels will get encoded (i.e. angry will get encoding 0)
sorted(df.emotion.unique())
# Encode class labels with LabelEncoder()
le = LabelEncoder()
le.fit(classes)
labels = df.emotion
labels = le.transform(labels)
df.emotion[50]
labels[50]
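To make the encoding order explicit, a small check of the full label-to-integer mapping produced by the fitted encoder:
# LabelEncoder sorts classes alphabetically; show the resulting label -> integer mapping
dict(zip(le.classes_, le.transform(le.classes_)))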
# Train-test splits (paths only)
wav_train, wav_test, label_train, label_test = train_test_split(df.index.values, labels, test_size=0.2)
# Train-valid splits (paths only)
wav_valid, wav_test, label_valid, label_test = train_test_split(wav_test, label_test, test_size=0.5)
# Check if there are enough samples to meet the batch_size
assert len(wav_train) >= BATCH_SIZE, 'number of train samples must be >= batch_size'
# Instead of feeding the NN the full dataset as one big in-memory matrix, this Keras Sequence batches the data on the fly and can shuffle samples between epochs
class BatchData(tf.keras.utils.Sequence):
    def __init__(self, file_paths, labels, sr, dt, n_classes, batch_size=16, shuffle=True):
        # Hyperparameters
        self.file_paths = file_paths
        self.labels = labels
        self.sr = sr
        self.dt = dt
        self.n_classes = n_classes
        self.batch_size = batch_size
        self.shuffle = shuffle
        self.indexes = np.arange(len(self.file_paths))
        # Shuffle
        if self.shuffle:
            np.random.shuffle(self.indexes)

    def __len__(self):
        return int(np.floor(len(self.file_paths) / self.batch_size))

    def __getitem__(self, index):
        # Set a sequence of indexes the size of batch_size
        indexes = self.indexes[index*self.batch_size:(index+1)*self.batch_size]
        # Get the file_paths and labels for the indexes
        file_paths = [self.file_paths[k] for k in indexes]
        labels = [self.labels[k] for k in indexes]
        # Create arrays to receive the data and labels
        X = np.empty((self.batch_size, 1, int(self.sr*self.dt)), dtype=np.float32) # n_samples, n_channels, signal
        Y = np.empty((self.batch_size, self.n_classes), dtype=np.float32) # n_samples, n_classes (probability prediction for each class)
        # Extract the signal, rate and label for each sample
        for i, (path, label) in enumerate(zip(file_paths, labels)):
            signal, rate = librosa.load(path, sr=self.sr)
            X[i,] = signal.reshape(1, -1) # reshape to (n_channels, signal); audio is mono, so n_channels=1
            Y[i,] = to_categorical(label, num_classes=self.n_classes) # convert label to one-hot categorical
        return X, Y

    def on_epoch_end(self):
        self.indexes = np.arange(len(self.file_paths))
        # Shuffle
        if self.shuffle:
            np.random.shuffle(self.indexes)
# Instance an object of the data batcher class for each of train, test, valid
train_batch = BatchData(wav_train, label_train, SAMPLING_RATE, DELTA_TIME, len(set(label_train)), batch_size=BATCH_SIZE)
test_batch = BatchData(wav_test, label_test, SAMPLING_RATE, DELTA_TIME, len(set(label_test)), batch_size=BATCH_SIZE, shuffle=False)
valid_batch = BatchData(wav_valid, label_valid, SAMPLING_RATE, DELTA_TIME, len(set(label_valid)), batch_size=BATCH_SIZE)
Note: it was necessary to set shuffle=False on the test_batch, as shuffling was messing with the evaluation metrics: the predicted probabilities were getting shuffled and no longer matched the label indexes.
# Dictionary with hyperparameters for training
params = {'N_CLASSES': len(set(df.emotion_id)),
'SR': SAMPLING_RATE,
'DT': DELTA_TIME}
# List available models and check if chosen model was built
models = {'Conv1D': Conv1D(**params),
'Conv2D': Conv2D(**params),
'LSTM': LSTM(**params)}
assert MODEL_TYPE in models.keys(), f'{MODEL_TYPE} not an available model'
# Instance model object
model = models[MODEL_TYPE]
model.summary()
# Instance checkpoint object
cp = ModelCheckpoint(f'models/{MODEL_TYPE}.h5', monitor='val_loss',
save_best_only=True, save_weights_only=False,
mode='auto', save_freq='epoch', verbose=1)
# Log path
log_path = os.path.join('logs', f'{MODEL_TYPE}_history.csv')
# Instance logger object
csv_logger = CSVLogger(log_path, append=False)
# Train model
model.fit(train_batch, validation_data=valid_batch,
epochs=N_EPOCH, verbose=1,
callbacks=[csv_logger, cp])
log_csvs = sorted(os.listdir('logs/'))
print(log_csvs)
plt_labels = list(models.keys())
colors = ['r', 'g', 'b']
fig, ax = plt.subplots(1, 3, sharey=True, figsize=(16,5))
for i, (fn, plt_label, c) in enumerate(zip(log_csvs, plt_labels, colors)):
    csv_path = os.path.join('logs', fn)
    dataf = pd.read_csv(csv_path)
    ax[i].set_title(plt_label, size=16)
    ax[i].plot(dataf.accuracy, color=c, label='train')
    ax[i].plot(dataf.val_accuracy, ls='--', color=c, label='validation')
    ax[i].legend(loc='upper left')
    ax[i].tick_params(axis='both', which='major', labelsize=12)
    ax[i].set_ylim([0,1.0])
fig.text(0.5, 0.02, 'Epochs', ha='center', size=14)
fig.text(0.08, 0.5, 'Accuracy', va='center', rotation='vertical', size=14)
plt.show()
MODEL_TYPE = 'LSTM' # Conv1D, Conv2D or LSTM
MODEL_FN = f'models/{MODEL_TYPE}.h5'
# Load model (with custom Kapre objects)
model = load_model(MODEL_FN, custom_objects={'Melspectrogram':Melspectrogram,
'AdditiveNoise':AdditiveNoise,
'Normalization2D':Normalization2D})
# Shuffle must be false for the test batch, otherwise when it's called by model.predict() it will shuffle the order of indexes
# Thus making it so that the order in the array of indexes (test_batch.labels) does not match the order of the predicted probabilities
test_batch.shuffle = False
model_eval = model.evaluate(test_batch)
print(f'Test dataset loss: {model_eval[0]}')
print(f'Test dataset accuracy: {model_eval[1]}')
Loss and accuracy are better on the test set because the noise layer (Kapre AdditiveNoise) as well as the regularization elements (L2, Dropout) are not active during prediction. So essentially the model was trained on a harder task than the one it is evaluated on.
# label encoder sequence -> {'angry', 'calm', 'disgust', 'fearful', 'happy', 'neutral', 'sad', 'surprised'}
emotion_dict
# Check predicted probabilities
pred_probs = model.predict(test_batch)
pred_probs[0:2]
# True encoding
true_encoding = test_batch.labels
true_encoding[0:10]
# Sparse Categorical Cross Entropy
scce = tf.keras.losses.SparseCategoricalCrossentropy()
scce(true_encoding, pred_probs).numpy()
model.evaluate() and the loss manually calculated from model.predict() do not match. I need to investigate this; I'm probably messing something up in the manual calculations, especially because model.evaluate() returns results in line with those obtained on valid_batch during training.
# Predicted encoding
pred_encoding = np.argmax(pred_probs, axis=1)
pred_encoding[0:10]
# Manually calculate accuracy
sum((pred_encoding == true_encoding) / len(true_encoding))
With 8 classes a random guess would give roughly 0.125 accuracy, so it looks like the data is getting shuffled or misaligned somewhere along the way; the check below pulls the labels straight from the batches to test this.
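One way to rule out (or confirm) a shuffling/misalignment problem is to take the labels from the exact batches fed to the model, so predictions and ground truth are aligned by construction. A debugging sketch:
# Debugging sketch: iterate the Sequence explicitly so labels come from the same batches as the predictions
y_true_chk, y_pred_chk = [], []
for bi in range(len(test_batch)):
    X_batch, Y_batch = test_batch[bi]
    probs = model.predict_on_batch(X_batch)
    y_true_chk.append(np.argmax(Y_batch, axis=1))
    y_pred_chk.append(np.argmax(probs, axis=1))
y_true_chk = np.concatenate(y_true_chk)
y_pred_chk = np.concatenate(y_pred_chk)
print(f'Batch-aligned accuracy: {(y_true_chk == y_pred_chk).mean():.3f}')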
# True labels
true_labels = le.inverse_transform(test_batch.labels)
true_labels[0:10]
# Predicted labels
pred_labels = le.inverse_transform(pred_encoding)
pred_labels[0:10]
# Instance an one hot encoder object
onehot_encoder = OneHotEncoder(sparse=False)
# {'angry', 'calm', 'disgust', 'fearful', 'happy', 'neutral', 'sad', 'surprised'}
true_onehot = onehot_encoder.fit_transform(true_encoding.reshape(-1, 1))
true_onehot[0:3]
# Predicted one-hot (use transform, not fit_transform, so the columns keep the same category order as the true labels)
pred_onehot = onehot_encoder.transform(pred_encoding.reshape(-1, 1))
pred_onehot[0:3]
# Categorical Cross Entropy
cce = tf.keras.losses.CategoricalCrossentropy()
cce(true_onehot, pred_probs).numpy()
# Data for the plots
fpr = dict()
tpr = dict()
roc_auc = dict()
for i in range(N_CLASSES):
    fpr[i], tpr[i], _ = roc_curve(true_onehot[:, i], pred_probs[:, i])
    roc_auc[i] = auc(fpr[i], tpr[i])
# Compute micro-average ROC curve and ROC area
fpr["micro"], tpr["micro"], _ = roc_curve(true_onehot.ravel(), pred_probs.ravel())
roc_auc["micro"] = auc(fpr["micro"], tpr["micro"])
# ROC-AUC Plot: Micro and Macro Scores
# First aggregate all false positive rates
all_fpr = np.unique(np.concatenate([fpr[i] for i in range(N_CLASSES)]))
# Then interpolate all ROC curves at these points
mean_tpr = np.zeros_like(all_fpr)
for i in range(N_CLASSES):
    mean_tpr += np.interp(all_fpr, fpr[i], tpr[i])
# Finally average it and compute AUC
mean_tpr /= N_CLASSES
fpr["macro"] = all_fpr
tpr["macro"] = mean_tpr
roc_auc["macro"] = auc(fpr["macro"], tpr["macro"])
# Plot all ROC curves
plt.figure(figsize=(10,10))
plt.plot(fpr["micro"], tpr["micro"],
label='micro-average ROC curve (area = {0:0.2f})'
''.format(roc_auc["micro"]),
color='deeppink', linestyle=':', linewidth=4)
plt.plot(fpr["macro"], tpr["macro"],
label='macro-average ROC curve (area = {0:0.2f})'
''.format(roc_auc["macro"]),
color='navy', linestyle=':', linewidth=4)
plt.plot([0, 1], [0, 1], 'k--', lw=2)
plt.xlim([0.0, 1.0])
plt.ylim([0.0, 1.05])
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.title('Multi Class ROC')
legend = plt.legend()
legend._legend_box.align = "right"
plt.legend(loc="lower right")
plt.show()
# ROC-AUC Plot: Class Scores
plt.figure(figsize=(10,10))
colors = []
cm = pylab.get_cmap('nipy_spectral')
for i in range(N_CLASSES):
    color = cm(1.*i/N_CLASSES)
    colors.append(color)
unique_labels = np.unique(classes)
for i, color in zip(range(N_CLASSES), colors):
    plt.plot(fpr[i], tpr[i], color=color, lw=2,
             label='{0} (area = {1:0.2f})'.format(unique_labels[i], roc_auc[i]))
plt.plot([0, 1], [0, 1], 'k--', lw=2)
plt.xlim([0.0, 1.0])
plt.ylim([0.0, 1.05])
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.title('Multi Class ROC')
legend = plt.legend()
legend._legend_box.align = "right"
plt.legend(loc="lower right")
plt.show()
conf_mat = confusion_matrix(true_labels, pred_labels, labels=unique_labels)
df_cm = pd.DataFrame(conf_mat, index = unique_labels,
columns = unique_labels)
plt.figure(figsize = (10,8))
plt.title('Confusion Matrix', fontsize = 24, pad = 20, fontweight='bold')
sns.heatmap(df_cm, annot=True, cmap='viridis')
plt.ylabel('True Emotion', fontsize = 18, labelpad = 20)
plt.xlabel('Predicted Emotion', fontsize = 18, labelpad = 20)
plt.show()
# Metrics
precision = precision_score(true_encoding, pred_encoding, average = 'macro')
recall = recall_score(true_encoding, pred_encoding, average = 'macro')
f1_sc = f1_score(true_encoding, pred_encoding, average = 'macro')
accuracy_sc = accuracy_score(true_encoding, pred_encoding)
roc_auc_macro = roc_auc['macro']
print('Model Performance Metrics:')
print(f'Precision = {precision:.5f}')
print(f'Recall = {recall:.5f}')
print(f'F1 Score = {f1_sc:.5f}')
print(f'Accuracy = {accuracy_sc:.5f}')
print(f'ROC-AUC = {roc_auc_macro:.5f}')