Matheus Schmitz
LinkedIn
Github Portfolio
My goal here is straightforward: I want to see if I can create a model capable of removing watermarks from images.
I'm using a modified version of Pix2Pix. Reference Pix2Pix implementation from TensorFlow: https://www.tensorflow.org/tutorials/generative/pix2pix
# File manipulation imports for Google Colab
from google.colab import drive
drive.mount('/content/drive')
import os
os.chdir("/content/drive/My Drive/Colab Notebooks/Watermark_GAN/")
!pip install --upgrade --quiet albumentations
import torch
import torch.nn as nn
import torch.nn.functional as F
class ConvBlock(nn.Module):
def __init__(self, in_channels, out_channels, use_act=True, **kwargs):
super().__init__()
self.cnn = nn.Conv2d(in_channels, out_channels, **kwargs, bias=False, padding_mode='reflect')
self.bn = nn.BatchNorm2d(out_channels)
self.act = nn.PReLU() if use_act else nn.Identity()
def forward(self, x):
return self.act(self.bn(self.cnn(x)))
class ResidualBlock(nn.Module):
def __init__(self, in_channels):
super().__init__()
self.survival_prob = 0.8
self.block1 = ConvBlock(
in_channels,
in_channels,
kernel_size=3,
stride=1,
padding=1,
use_act=False
)
self.block2 = ConvBlock(
in_channels,
in_channels,
kernel_size=3,
stride=1,
padding=1,
use_act=True
)
def stochastic_depth(self, x):
'''
Apply stochastic depth: during training the entire residual branch is randomly dropped, and activations are adjusted accordingly at inference time.
At test time, instead of fully dropping (or keeping) the branch, we scale its activations by the survival probability to keep the outgoing signal in line with the values seen during training.
https://github.com/aleju/papers/blob/master/neural-nets/Deep_Networks_with_Stochastic_Depth.md
'''
if self.training:
return torch.bernoulli(torch.tensor(self.survival_prob)) * x
return x * self.survival_prob
def forward(self, x):
out = self.block1(x)
out = self.block2(out)
return self.stochastic_depth(out) + x
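# Quick shape sanity check (a minimal sketch; _res_block and _res_x are throwaway names):
# both convolutions use kernel_size=3, stride=1, padding=1, so the block preserves the input shape.
_res_block = ResidualBlock(64)
_res_x = torch.randn(2, 64, 32, 32)
assert _res_block(_res_x).shape == _res_x.shape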
class Block(nn.Module):
def __init__(self, in_channels, out_channels, stride=2):
super().__init__()
self.conv = nn.Sequential(
nn.Conv2d(in_channels, out_channels, 3, stride, 1, bias=False, padding_mode='reflect'),
nn.BatchNorm2d(out_channels),
nn.PReLU()
)
def forward(self, x):
return self.conv(x)
class Generator(nn.Module):
def __init__(self, in_channels, features=64, num_residuals=9):
super().__init__()
self.initial_down = nn.Sequential(
nn.Conv2d(in_channels, features, 7, 1, 3, bias=True, padding_mode='reflect'),
nn.PReLU()
) # /1
self.down1 = Block(features, features*2, stride=2) # /2
self.down2 = Block(features*2, features*4, stride=2) # /4
self.down3 = Block(features*4, features*8, stride=2) # /8
self.down4 = Block(features*8, features*16, stride=2) # /16
self.residuals = nn.Sequential(*[ResidualBlock(features*16) for _ in range(num_residuals)]) # /16
self.up1 = Block(features*16, features*8, stride=1) # /16
self.up2 = Block(features*8*2, features*4, stride=1) # /8
self.up3 = Block(features*4*2, features*2, stride=1) # /4
self.up4 = Block(features*2*2, features, stride=1) # /2
self.final_conv = nn.Sequential(
Block(features*2, features, stride=1), # /1
Block(features, features, stride=1), # /1
nn.Conv2d(features, in_channels, kernel_size=7, stride=1, padding=3, padding_mode='reflect'), # /1
nn.Tanh(),
)
def forward(self, x):
d1 = self.initial_down(x) # out: size/1
d2 = self.down1(d1) # out: size/2
d3 = self.down2(d2) # out: size/4
d4 = self.down3(d3) # out: size/8
d5 = self.down4(d4) # out: size/16
residuals = self.residuals(d5) + d5 # out: size/16
up1 = self.up1(F.interpolate(residuals, scale_factor=2, mode='nearest')) # out: size/8
up2 = self.up2(F.interpolate(torch.cat([up1, d4], dim=1), scale_factor=2, mode='nearest')) # out: size/4
up3 = self.up3(F.interpolate(torch.cat([up2, d3], dim=1), scale_factor=2, mode='nearest')) # out: size/2
up4 = self.up4(F.interpolate(torch.cat([up3, d2], dim=1), scale_factor=2, mode='nearest')) # out: size/1
return self.final_conv(torch.cat([up4, d1], dim=1)) # out: size/1
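A quick shape check (a minimal sketch with throwaway names) confirms that the symmetric down/upsampling path maps a 3x256x256 input back to a 3x256x256 output:
_gen_check = Generator(in_channels=3)
_gen_x = torch.randn(1, 3, 256, 256)
assert _gen_check(_gen_x).shape == _gen_x.shape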
import torch
import torch.nn as nn
from torch.nn.utils import spectral_norm
class CNNBlock(nn.Module):
def __init__(self, in_channels, out_channels, stride):
super().__init__()
self.conv = nn.Sequential(
spectral_norm(nn.Conv2d(
in_channels, out_channels, 3, stride, 1, bias=False, padding_mode='reflect'
)),
nn.PReLU()
)
def forward(self, x):
return self.conv(x)
class Discriminator(nn.Module):
def __init__(self, in_channels=3, features=[64, 128, 256, 512, 512, 1024]):
super().__init__()
self.initial = nn.Sequential(
spectral_norm(
nn.Conv2d(
in_channels*2, # we concatenate the {true/fake} image and the label on the channels axis.
# This is because we don't merely want to learn whether an image is real or fake, but whether it is real/fake given the conditioning input (the label)
features[0],
kernel_size=3,
stride=1,
padding=1,
padding_mode='reflect'
)
),
nn.PReLU(),
)
layers = []
in_channels = features[0]
for idx, feature in enumerate(features[1:]):
layers.append(CNNBlock(in_channels, feature, stride=1 if idx==len(features)-2 else 2))
in_channels = feature
layers.append(
nn.Sequential(
nn.Conv2d(
in_channels, 1, kernel_size=3, stride=1, padding=1, padding_mode='reflect'
)
)
)
self.model = nn.Sequential(*layers)
def forward(self, x, y):
x = torch.cat([x,y], dim=1)
x = self.initial(x)
x = self.model(x)
return x
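The discriminator is a conditional PatchGAN: it scores the concatenation of the input image and the {real/generated} target, and outputs a grid of per-patch logits rather than a single scalar. A quick sketch (throwaway names) of the output shape for 256x256 inputs:
_disc_check = Discriminator(in_channels=3)
_disc_x = torch.randn(1, 3, 256, 256)
_disc_y = torch.randn(1, 3, 256, 256)
print(_disc_check(_disc_x, _disc_y).shape)  # torch.Size([1, 1, 16, 16]), one logit per patch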
import torch
import torch.nn as nn
import torchvision
class VGG19(nn.Module):
def __init__(self, requires_grad=False):
super().__init__()
vgg_pretrained_features = torchvision.models.vgg19(pretrained=True).features
self.slice1 = nn.Sequential()
self.slice2 = nn.Sequential()
self.slice3 = nn.Sequential()
self.slice4 = nn.Sequential()
self.slice5 = nn.Sequential()
# One feature map for each scale, each collected right before a ReLU activation (pre-activation features are denser), as per the ESRGAN paper
for x in range(3):
self.slice1.add_module(str(x), vgg_pretrained_features[x])
for x in range(3, 8):
self.slice2.add_module(str(x), vgg_pretrained_features[x])
for x in range(8, 17):
self.slice3.add_module(str(x), vgg_pretrained_features[x])
for x in range(17, 26):
self.slice4.add_module(str(x), vgg_pretrained_features[x])
for x in range(26, 35):
self.slice5.add_module(str(x), vgg_pretrained_features[x])
if not requires_grad:
for param in self.parameters():
param.requires_grad = False
def forward(self, x):
h_relu1 = self.slice1(x)
h_relu2 = self.slice2(h_relu1)
h_relu3 = self.slice3(h_relu2)
h_relu4 = self.slice4(h_relu3)
h_relu5 = self.slice5(h_relu4)
out = [h_relu1, h_relu2, h_relu3, h_relu4, h_relu5]
return out
class VGGLoss(nn.Module):
def __init__(self):
super().__init__()
self.vgg = VGG19().cuda()
self.criterion = nn.L1Loss()
self.weights = [1.0/32, 1.0/16, 1.0/8, 1.0/4, 1.0]
def forward(self, x, y):
x_vgg, y_vgg = self.vgg(x), self.vgg(y)
loss = 0
for i in range(len(x_vgg)):
loss += self.weights[i] * self.criterion(x_vgg[i], y_vgg[i].detach()) # gradient descent only on generated images (x) not on ground truth (y)
return loss
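A minimal usage sketch for the perceptual loss (this assumes a CUDA device, since VGGLoss moves VGG19 to the GPU; the pretrained weights are downloaded on first use):
_vgg_check = VGGLoss()
_fake = torch.rand(1, 3, 256, 256, device='cuda')
_real = torch.rand(1, 3, 256, 256, device='cuda')
print(_vgg_check(_fake, _real))  # scalar tensor: weighted L1 distance across the five VGG feature scales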
DEVICE = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
TRAIN_DIR = 'Urban 100/X4 Urban100/X4/HIGH x4 URban100/'
VAL_DIR = 'Urban 100/X4 Urban100/X4/HIGH x4 URban100/'
CHECKPOINT_GEN = 'generator.pth'
CHECKPOINT_DISC = 'discriminator.pth'
SAVE_MODEL = True
LOAD_MODEL = True
LEARNING_RATE_GEN = 5e-4
LEARNING_RATE_DISC = 5e-5
NUM_EPOCHS = 1000
BATCH_SIZE = 8
NUM_WORKERS = 4
ADVERSARIAL_LOSS_WEIGHT = 1
L1_LOSS_WEIGHT = 100
PERCEPTUAL_LOSS_WEIGHT = 1
IMG_MEAN = [0.5, 0.5, 0.5]
IMG_STD = [0.5, 0.5, 0.5]
EPS_LABEL_SMOOTHING = 0.05
from torch.utils.data import Dataset, DataLoader
from PIL import Image
import cv2
import numpy as np
import os
import copy
import albumentations as A
from albumentations.pytorch import ToTensorV2
import random
from albumentations.augmentations.transforms import PixelDropout
transform_watermark = A.Compose([
A.RandomCrop(256,256),
A.HorizontalFlip(),
A.VerticalFlip(),
A.Sharpen(),
*[A.RandomSunFlare(num_flare_circles_lower=5, num_flare_circles_upper=10, src_radius=20) for _ in range(5)],
A.AdvancedBlur(),
A.ColorJitter(),
ToTensorV2(),
])
transform_training = A.Compose([
A.RandomCrop(256,256),
A.HorizontalFlip(),
A.ColorJitter(p=0.2),
A.PixelDropout(),
#ToTensorV2(),
])
transform_validation = A.Compose([
A.RandomCrop(256,256),
ToTensorV2()
])
transform_normalize = A.Compose([
A.Normalize(mean=IMG_MEAN, std=IMG_STD, max_pixel_value=255.0,),
ToTensorV2(),
])
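# Quick check of the watermark augmentation pipeline (a minimal sketch; _wm_probe is a throwaway name):
# ToTensorV2 returns a channels-first tensor, which is why __getitem__ below permutes it back to HWC.
_wm_probe = transform_watermark(image=np.zeros((300, 300, 3), dtype=np.uint8))['image']
print(_wm_probe.shape)  # torch.Size([3, 256, 256])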
class UrbanDataset(Dataset):
def __init__(self, root_dir, is_training=True, shuffle=True):
self.root_dir = root_dir
self.is_training = is_training
self.list_files = os.listdir(self.root_dir)
self._watermark = np.array(Image.open("watermark.jpg"))
if shuffle:
random.shuffle(self.list_files)
def __len__(self):
return len(self.list_files)
def __getitem__(self, index):
# Load images
img_file = self.list_files[index]
img_path = os.path.join(self.root_dir, img_file)
watermark = copy.deepcopy(self._watermark)
img = np.array(Image.open(img_path))
# Augment
watermark = transform_watermark(image=watermark)['image'].permute(1,2,0).numpy()
if self.is_training:
img = transform_training(image=img)['image']
img = transform_validation(image=img)['image'].permute(1,2,0).numpy()
# Store the target image before watermark is applied
target_image = copy.deepcopy(img)
# Crop watermark to fit the image if needed
if watermark.shape[0] > img.shape[0]:
watermark = watermark[:img.shape[0],:]
if watermark.shape[1] > img.shape[1]:
watermark = watermark[:, :img.shape[1]]
# Get dimensions
h_watermark, w_watermark, _ = watermark.shape
h_img, w_img, _ = img.shape
# Pick a random section of the image to paste the watermark on
y_start_max = h_img - h_watermark
top_y = int(np.random.uniform(low=0, high=y_start_max))
x_start_max = w_img - w_watermark
left_x = int(np.random.uniform(low=0, high=x_start_max))
bottom_y = top_y + h_watermark
right_x = left_x + w_watermark
# Cut a slice from the image with size matching the watermark, then overlap both
roi = img[top_y:bottom_y, left_x:right_x]
result = cv2.addWeighted(roi, 1, watermark, np.random.uniform(low=0.5, high=1), 0)
# Finally, paste the merged sliced/watermark back on the image
img[top_y:bottom_y, left_x:right_x] = result
input_image = img
# Normalize
target_image = transform_normalize(image=target_image)['image']
input_image = transform_normalize(image=input_image)['image']
# Convert to tensor
#target_image = torch.from_numpy(target_image)
#input_image = torch.from_numpy(input_image)
# PyTorch expects the last two dimensions to be height and width (channels first)
assert target_image.shape[-1] > 3, 'PyTorch expects the last two dimensions to be height and width'
assert target_image.shape[-2] > 3, 'PyTorch expects the last two dimensions to be height and width'
assert input_image.shape[-1] > 3, 'PyTorch expects the last two dimensions to be height and width'
assert input_image.shape[-2] > 3, 'PyTorch expects the last two dimensions to be height and width'
return input_image, target_image
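# A quick way to eyeball one synthesized pair (a minimal sketch with throwaway names;
# assumes the Urban100 folder and watermark.jpg are in place):
_ds_check = UrbanDataset(TRAIN_DIR, is_training=False)
_wm_input, _clean_target = _ds_check[0]
print(_wm_input.shape, _clean_target.shape)  # both torch.Size([3, 256, 256])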
def denormalize(x: torch.Tensor, mean, std):
# 3, H, W, B
ten = x.clone().permute(1, 2, 3, 0)
for t, m, s in zip(ten, mean, std):
t.mul_(s).add_(m)
# B, 3, H, W
return torch.clamp(ten, 0, 1).permute(3, 0, 1, 2)
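As a sanity check of the round trip (a minimal sketch with throwaway names): normalizing with mean 0.5 and std 0.5 maps values from [0, 1] to [-1, 1], and denormalize should map them back:
_clean01 = torch.rand(2, 3, 8, 8)        # pretend batch already scaled to [0, 1]
_model_space = (_clean01 - 0.5) / 0.5    # mirrors what A.Normalize(mean=0.5, std=0.5) produces after rescaling to [0, 1]
_recovered = denormalize(_model_space, mean=IMG_MEAN, std=IMG_STD)
assert torch.allclose(_recovered, _clean01, atol=1e-6)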
import torch
from torchvision.utils import save_image
def save_some_examples(gen, val_loader, epoch, folder):
if not os.path.exists(folder):
os.makedirs(folder)
x, y = next(iter(val_loader))
x, y = x.to(DEVICE), y.to(DEVICE)
gen.eval()
with torch.no_grad():
y_fake = gen(x)
x = denormalize(x, mean=IMG_MEAN, std=IMG_STD)
y = denormalize(y, mean=IMG_MEAN, std=IMG_STD)
y_fake = denormalize(y_fake, mean=IMG_MEAN, std=IMG_STD)
save_image(y_fake, folder + f"/epoch_{epoch}_y_gen.png")
save_image(x, folder + f"/epoch_{epoch}_input.png")
save_image(y, folder + f"/epoch_{epoch}_label.png")
gen.train()
def save_checkpoint(model, optimizer, filename="my_checkpoint.pth.tar"):
print("=> Saving checkpoint")
checkpoint = {
"state_dict": model.state_dict(),
"optimizer": optimizer.state_dict(),
}
torch.save(checkpoint, filename)
def load_checkpoint(checkpoint_file, model, optimizer, lr):
print("=> Loading checkpoint")
checkpoint = torch.load(checkpoint_file, map_location=DEVICE)
model.load_state_dict(checkpoint["state_dict"])
optimizer.load_state_dict(checkpoint["optimizer"])
# Reset the learning rate explicitly; otherwise the optimizer silently keeps the learning rate
# stored in the old checkpoint, which can lead to many hours of debugging
for param_group in optimizer.param_groups:
param_group["lr"] = lr
!pip install --quiet adabelief-pytorch
from adabelief_pytorch import AdaBelief
import torch
#from utils import save_checkpoint, load_checkpoint, save_some_examples
import torch.nn as nn
import torch.optim as optim
# import config
# from dataset import UrbanDataset
# from generator_model import Generator
# from discriminator_model import Discriminator
from torch.utils.data import DataLoader
from tqdm import tqdm
from torchvision.utils import save_image
import time
def train_model(epoch, disc, gen, loader, opt_disc, opt_gen, l1, bce, vgg_loss, g_scaler, d_scaler, verbose=True):
#loop = tqdm(loader, leave=True)
start_time = time.time()
for idx, (x, y) in enumerate(loader):
x, y = x.to(DEVICE), y.to(DEVICE)
# # Discriminator is too good, only train it every n-th step
# nth_step = 10
# if idx%nth_step == 0:
# Train Discriminator with automatic mixed precision
with torch.cuda.amp.autocast():
y_fake = gen(x)
D_real = disc(x, y)
D_fake = disc(x, y_fake.detach())
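# Label smoothing: real targets become 1 - EPS_LABEL_SMOOTHING and fake targets become EPS_LABEL_SMOOTHING, keeping the discriminator from growing overconfident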
D_real_loss = bce(D_real, torch.ones_like(D_real) - EPS_LABEL_SMOOTHING)
D_fake_loss = bce(D_fake, torch.zeros_like(D_fake) + EPS_LABEL_SMOOTHING)
D_loss = (D_real_loss + D_fake_loss) / 2
opt_disc.zero_grad()
d_scaler.scale(D_loss).backward()
d_scaler.step(opt_disc)
d_scaler.update()
# Train Generator with automatic mixed precision
with torch.cuda.amp.autocast():
D_fake = disc(x, y_fake)
loss_adversarial = ADVERSARIAL_LOSS_WEIGHT * bce(D_fake, torch.ones_like(D_fake))
loss_l1 = L1_LOSS_WEIGHT * l1(y_fake, y)
loss_vgg = PERCEPTUAL_LOSS_WEIGHT * vgg_loss(y_fake, y)
G_loss = loss_adversarial + loss_l1 + loss_vgg
opt_gen.zero_grad()
g_scaler.scale(G_loss).backward()
g_scaler.step(opt_gen)
g_scaler.update()
duration = time.time() - start_time
if verbose:
print(f"Epoch: {epoch} | Disc Real Loss: {D_real_loss:.4f} | Disc Fake Loss: {D_fake_loss:.4f} | Adv. Loss: {loss_adversarilal:.4f} | VGG Loss: {loss_vgg:.4f} | L1 Loss: {loss_l1:.4f} | Time: {duration:.2f} seconds")
def main():
disc = Discriminator(in_channels=3).to(DEVICE)
gen = Generator(in_channels=3).to(DEVICE)
opt_disc = AdaBelief(disc.parameters(), lr=LEARNING_RATE_DISC, betas=(0.5, 0.999), print_change_log=False)
opt_gen = AdaBelief(gen.parameters(), lr=LEARNING_RATE_GEN, betas=(0.5, 0.999), print_change_log=False)
BCE_LOSS = nn.BCEWithLogitsLoss()
L1_LOSS = nn.L1Loss()
VGG_LOSS = VGGLoss()
if LOAD_MODEL:
load_checkpoint(CHECKPOINT_GEN, gen, opt_gen, LEARNING_RATE_GEN)
load_checkpoint(CHECKPOINT_DISC, disc, opt_disc, LEARNING_RATE_DISC)
train_dataset = UrbanDataset(TRAIN_DIR, is_training=True)
train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True, num_workers=NUM_WORKERS)
val_dataset = UrbanDataset(VAL_DIR, is_training=False)
val_loader = DataLoader(val_dataset, batch_size=1, shuffle=True)
# Train with float16 instead of float32
g_scaler = torch.cuda.amp.GradScaler()
d_scaler = torch.cuda.amp.GradScaler()
for epoch in range(NUM_EPOCHS):
train_model(epoch, disc, gen, train_loader, opt_disc, opt_gen, L1_LOSS, BCE_LOSS, VGG_LOSS, g_scaler, d_scaler)
if SAVE_MODEL and epoch % 10 == 0:
save_checkpoint(gen, opt_gen, filename=CHECKPOINT_GEN)
save_checkpoint(disc, opt_disc, filename=CHECKPOINT_DISC)
save_some_examples(gen, val_loader, epoch, folder='evaluation')
# Run main and train the model
main()
import matplotlib.pyplot as plt
import matplotlib.image as mpimg
# Plot watermarked inputs, the model's generated (watermark-free) outputs, and the ground-truth labels
examples = [118, 128, 138, 148, 158]
num_images = len(examples)
fig, axs = plt.subplots(ncols=3, nrows=num_images, figsize=(25,num_images*3))
images_path = 'evaluation/'
# Loop through axes and plot the selected epochs
for axs_row, epoch in zip(range(axs.shape[0]), examples):
# Input
img = mpimg.imread(images_path + f'epoch_{epoch}_input.png')
axs[axs_row][0].imshow(img)
axs[axs_row][0].set_xticks([])
axs[axs_row][0].set_yticks([])
axs[axs_row][0].set_title('Input Image')
# GAN generation
img = mpimg.imread(images_path + f'epoch_{epoch}_y_gen.png')
axs[axs_row][1].imshow(img)
axs[axs_row][1].set_xticks([])
axs[axs_row][1].set_yticks([])
axs[axs_row][1].set_title('Generated Image')
# Label
img = mpimg.imread(images_path + f'epoch_{epoch}_label.png')
axs[axs_row][2].imshow(img)
axs[axs_row][2].set_xticks([])
axs[axs_row][2].set_yticks([])
axs[axs_row][2].set_title('Label')
plt.tight_layout(w_pad=-80)
# Plot watermarked inputs, the model's generated (watermark-free) outputs, and the ground-truth labels
examples = [183, 184, 185, 186, 187]
num_images = len(examples)
fig, axs = plt.subplots(ncols=3, nrows=num_images, figsize=(25,num_images*3))
images_path = 'evaluation/'
# Loop through axes and plot the selected epochs
for axs_row, epoch in zip(range(axs.shape[0]), examples):
# Input
img = mpimg.imread(images_path + f'epoch_{epoch}_input.png')
axs[axs_row][0].imshow(img)
axs[axs_row][0].set_xticks([])
axs[axs_row][0].set_yticks([])
axs[axs_row][0].set_title('Input Image')
# GAN generation
img = mpimg.imread(images_path + f'epoch_{epoch}_y_gen.png')
axs[axs_row][1].imshow(img)
axs[axs_row][1].set_xticks([])
axs[axs_row][1].set_yticks([])
axs[axs_row][1].set_title('Generated Image')
# Label
img = mpimg.imread(images_path + f'epoch_{epoch}_label.png')
axs[axs_row][2].imshow(img)
axs[axs_row][2].set_xticks([])
axs[axs_row][2].set_yticks([])
axs[axs_row][2].set_title('Label')
plt.tight_layout(w_pad=-80)
It seems the model has already learned reasonably well how to remove watermarks when the background has a lot of color gradation, but it still produces lackluster results when the background consists of a single color.
Matheus Schmitz
LinkedIn
Github Portfolio