Config.py
from pathlib import Path
# create the configuration
def get_config():
    return {
        "batch_size": 8,
        "num_epochs": 20,
        "lr": 10**-4,
        "seq_len": 350,
        "d_model": 512,
        "lang_src": "en",
        "lang_tgt": "it",
        "model_folder": "weights",
        "model_basename": "tmodel_",
        "preload": None,
        "tokenizer_file": "tokenizer_{0}.json",
        "experiment_name": "runs/tmodel_"
    }
# build the path of the weights file for a given epoch
def get_weights_file_path(config, epoch: str):
model_folder = config["model_folder"]
model_basename = config["model_basename"]
model_filename = f"{model_basename}{epoch}.pt"
return str(Path('.') / model_folder / model_filename)
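For reference, a quick sketch of how these two helpers fit together (the epoch string "05" is just an illustrative value):

from config import get_config, get_weights_file_path

config = get_config()
print(get_weights_file_path(config, "05"))  # weights/tmodel_05.pt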
Corpus.py
import torch
import torch.nn as nn
from torch.utils.data import Dataset
from typing import Any
class BillingualDataset(Dataset):
def __init__(self, dataset, tokenizer_src, tokenizer_tgt, src_lang, tgt_lang, seq_len):
super().__init__()
self.dataset = dataset
self.tokenizer_src = tokenizer_src
self.tokenizer_tgt = tokenizer_tgt
self.src_lang = src_lang
self.tgt_lang = tgt_lang
self.seq_len = seq_len
self.sos_token = torch.tensor([
tokenizer_src.token_to_id('[SOS]')
], dtype=torch.int64)
self.eos_token = torch.tensor([
tokenizer_src.token_to_id('[EOS]')
], dtype=torch.int64)
self.pad_token = torch.tensor([
tokenizer_src.token_to_id('[PAD]')
], dtype=torch.int64)
def __len__(self):
return len(self.dataset)
def __getitem__(self, index) -> Any:
src_target_pair = self.dataset[index]
src_text = src_target_pair["translation"][self.src_lang]
tgt_text = src_target_pair["translation"][self.tgt_lang]
enc_input_tokens = self.tokenizer_src.encode(src_text).ids
dec_input_tokens = self.tokenizer_tgt.encode(tgt_text).ids
enc_num_padding_tokens = self.seq_len - len(enc_input_tokens) - 2
dec_num_padding_tokens = self.seq_len - len(dec_input_tokens) - 1
if enc_num_padding_tokens < 0 or dec_num_padding_tokens < 0:
raise ValueError("Sentence is too long")
# add SOS and EOS to the source text
encoder_input = torch.cat(
    [
        self.sos_token,
        torch.tensor(enc_input_tokens, dtype=torch.int64),
        self.eos_token,
        torch.tensor([self.pad_token] * enc_num_padding_tokens, dtype=torch.int64)
    ]
)
# add SOS to the decoder input
decoder_input = torch.cat(
    [
        self.sos_token,
        torch.tensor(dec_input_tokens, dtype=torch.int64),
        torch.tensor([self.pad_token] * dec_num_padding_tokens, dtype=torch.int64)
    ]
)
# add EOS to the label
label = torch.cat(
    [
        torch.tensor(dec_input_tokens, dtype=torch.int64),
        self.eos_token,
        torch.tensor([self.pad_token] * dec_num_padding_tokens, dtype=torch.int64)
    ]
)
assert encoder_input.size(0) == self.seq_len
assert decoder_input.size(0) == self.seq_len
assert label.size(0) == self.seq_len
return {
    "encoder_input": encoder_input,  # (seq_len)
    "decoder_input": decoder_input,  # (seq_len)
    "encoder_mask": (encoder_input != self.pad_token).unsqueeze(0).unsqueeze(0).int(),  # (1, 1, seq_len)
    "decoder_mask": (decoder_input != self.pad_token).unsqueeze(0).unsqueeze(0).int() & casual_mask(decoder_input.size(0)),  # (1, 1, seq_len) & (1, seq_len, seq_len)
    "label": label,  # (seq_len)
    "src_text": src_text,
    "tgt_text": tgt_text
}
# mask out future positions so the decoder can only attend to the current and earlier tokens
def casual_mask(size):
    mask = torch.triu(torch.ones(1, size, size), diagonal=1).type(torch.int)
    return mask == 0
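As a quick sanity check (not part of the project files), the mask returned for size 4 keeps only the current and earlier positions:

from corpus import casual_mask

print(casual_mask(4))
# tensor([[[ True, False, False, False],
#          [ True,  True, False, False],
#          [ True,  True,  True, False],
#          [ True,  True,  True,  True]]])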
Model.py
# import libraries
import torch
import math
import torch.nn as nn
# d_model -> size of the embedding vector
# h -> number of heads
class InputEmbeddings(nn.Module):
def __init__(self, d_model: int, vocab_size: int):
super().__init__()
self.d_model = d_model
self.vocab_size = vocab_size
self.embedding = nn.Embedding(vocab_size, d_model)
def forward(self, x):
return self.embedding(x) * math.sqrt(self.d_model)
class PositionalEncoding(nn.Module):
def __init__(self, d_model: int, sen_len: int, dropout: float) -> None:
super().__init__()
self.d_model = d_model
self.sen_len = sen_len
self.dropout = nn.Dropout(dropout)
# create a matrix of shape (sen_len, d_model)
pe = torch.zeros(sen_len, d_model)
# create a vector of shape (sen_len)
position = torch.arange(0, sen_len, dtype=torch.float).unsqueeze(1) # shape: (sen_len, 1)
# formula
div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model))
# apply sin to even positions
pe[:, 0::2] = torch.sin(position * div_term)
# apply cos to odd positions
pe[:, 1::2] = torch.cos(position * div_term)
pe = pe.unsqueeze(0) # shape: (1, sen_len, d_model)
self.register_buffer("pe", pe)
def forward(self, x):
x = x + (self.pe[:, :x.shape[1], :]).requires_grad_(False)
return self.dropout(x)
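A minimal shape check for the positional encoding, using the batch size 8, seq_len 350 and d_model 512 from the config:

import torch
from model import PositionalEncoding

pos = PositionalEncoding(d_model=512, sen_len=350, dropout=0.1)
x = torch.zeros(8, 350, 512)   # (batch, sen_len, d_model)
print(pos(x).shape)            # torch.Size([8, 350, 512])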
class LayerNormalization(nn.Module):
def __init__(self, eps: float = 10**-6) -> None:
super().__init__()
self.eps = eps
# alpha -> Multiplicative
self.alpha = nn.Parameter(torch.ones(1))
# beta -> Additive
self.beta = nn.Parameter(torch.zeros(1))
def forward(self, x):
mean = x.mean(dim = -1, keepdim=True)
std = x.std(dim = -1, keepdim=True)
return self.alpha * (x - mean) / (std + self.eps) + self.beta
class FeedForwardBlock(nn.Module):
def __init__(self, d_model: int, d_ff: int, dropout: float) -> None:
super().__init__()
self.linear_01 = nn.Linear(d_model, d_ff) # w1 and b1
self.dropout = nn.Dropout(dropout)
self.linear_02 = nn.Linear(d_ff, d_model) # w2 and b2
def forward(self, x):
# (Batch, sen_len, d_model) --> (Batch, sen_len, d_ff) --> (Batch, sen_len, d_model)
return self.linear_02(self.dropout(torch.relu(self.linear_01(x))))
class MultiHeadAttention(nn.Module):
def __init__(self, d_model: int, heads: int, dropout: float) -> None:
super().__init__()
self.d_model = d_model
self.heads = heads
assert d_model % heads == 0, "d_model is not divisible by heads"
self.d_k = d_model // heads
# set the query, key and value vector
self.w_q = nn.Linear(d_model, d_model) # w_q
self.w_k = nn.Linear(d_model, d_model) # w_k
self.w_v = nn.Linear(d_model, d_model) # w_v
# output
self.w_o = nn.Linear(d_model, d_model) # w_o
self.dropout = nn.Dropout(dropout)
@staticmethod
def Attention(query, key, value, mask, dropout: nn.Dropout):
d_k = query.shape[-1]
# (Batch, h, sen_len, d_k) --> (Batch, h, sen_len, sen_len)
attention_scores = (query @ key.transpose(-2, -1)) / math.sqrt(d_k)
if mask is not None:
attention_scores.masked_fill_(mask == 0, -1e9)
attention_scores = attention_scores.softmax(dim = -1) # (Batch, h, sen_len, sen_len)
if dropout is not None:
attention_scores = dropout(attention_scores)
return (attention_scores @ value), attention_scores
def forward(self, q, k, v, mask):
# (Batch, sen_len, d_model) --> (Batch, sen_len, d_model)
query = self.w_q(q)
key = self.w_k(k)
value = self.w_v(v)
# (Batch, sen_len, d_model) --> (Batch, sen_len, heads, d_k) --> (Batch, heads, sen_len, d_k)
query = query.view(query.shape[0], query.shape[1], self.heads, self.d_k).transpose(1, 2)
key = key.view(key.shape[0], key.shape[1], self.heads, self.d_k).transpose(1, 2)
value = value.view(value.shape[0], value.shape[1], self.heads, self.d_k).transpose(1, 2)
# call the attention mechanism
x, self.attention_scores = MultiHeadAttention.Attention(query, key, value, mask, self.dropout)
# (Batch, heads, sen_len, d_k) --> (Batch, sen_len, heads, d_k) --> (Batch, sen_len, d_model)
x = x.transpose(1, 2).contiguous().view(x.shape[0], -1, self.heads * self.d_k)
# (Batch, sen_len, d_model) --> (Batch, sen_len, d_model)
return self.w_o(x)
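A small usage sketch for the attention block with illustrative sizes (batch 2, sequence length 10); the all-ones mask means no position is masked out:

import torch
from model import MultiHeadAttention

mha = MultiHeadAttention(d_model=512, heads=8, dropout=0.1)
x = torch.randn(2, 10, 512)
mask = torch.ones(2, 1, 1, 10)   # no positions masked
print(mha(x, x, x, mask).shape)  # torch.Size([2, 10, 512])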
class ResidualConnection(nn.Module):
def __init__(self, dropout: float):
super().__init__()
self.dropout = nn.Dropout(dropout)
self.norm = LayerNormalization()
def forward(self, x, sublayer):
return x + self.dropout(sublayer(self.norm(x)))
class EncoderBlock(nn.Module):
def __init__(self, self_attention_block: MultiHeadAttention, feed_forward_network: FeedForwardBlock, dropout: float):
super().__init__()
self.self_attention_block = self_attention_block
self.feed_forward_block = feed_forward_network
self.residual_connection = nn.ModuleList([ResidualConnection(dropout) for _ in range(2)])
def forward(self, x, src_mask):
x = self.residual_connection[0](x, lambda x: self.self_attention_block(x, x, x, src_mask))
x = self.residual_connection[1](x, self.feed_forward_block)
return x
class Encoder(nn.Module):
def __init__(self, layers: nn.ModuleList):
super().__init__()
self.layers = layers
self.norm = LayerNormalization()
def forward(self, x, mask):
for layer in self.layers:
x = layer(x, mask)
return self.norm(x)
class DecoderBlock(nn.Module):
def __init__(self, self_attention_block: MultiHeadAttention, cross_attention_block: MultiHeadAttention, feed_forward_block: FeedForwardBlock, dropout: float):
super().__init__()
self.self_attention_block = self_attention_block
self.cross_attention_block = cross_attention_block
self.feed_forward_block = feed_forward_block
self.residual_connections = nn.ModuleList([ResidualConnection(dropout) for _ in range(3)])
def forward(self, x, encoder_output, src_mask, tgt_mask):
x = self.residual_connections[0](x, lambda x: self.self_attention_block(x, x, x, tgt_mask))
x = self.residual_connections[1](x, lambda x: self.cross_attention_block(x, encoder_output, encoder_output, src_mask))
x = self.residual_connections[2](x, self.feed_forward_block)
return x
class Decoder(nn.Module):
def __init__(self, layers: nn.ModuleList):
super().__init__()
self.layers = layers
self.norm = LayerNormalization()
def forward(self, x, encoder_output, src_mask, tgt_mask):
for layer in self.layers:
x = layer(x, encoder_output, src_mask, tgt_mask)
return self.norm(x)
class ProjectionLayer(nn.Module):
def __init__(self, d_model: int, vocab_size: int):
super().__init__()
self.linear = nn.Linear(d_model, vocab_size)
def forward(self, x):
# (Batch, sen_len, d_model) --> (Batch, sen_len, vocab_size)
return torch.log_softmax(self.linear(x), dim=-1)
class Transformer(nn.Module):
def __init__(self, encoder: Encoder, decoder: Decoder, src_embed: InputEmbeddings, tgt_embed: InputEmbeddings, src_pos: PositionalEncoding, tgt_pos: PositionalEncoding, proj: ProjectionLayer):
super().__init__()
self.encoder = encoder
self.decoder = decoder
self.src_embed = src_embed
self.tgt_embed = tgt_embed
self.src_pos = src_pos
self.tgt_pos = tgt_pos
self.projection = proj
def encode(self, src, src_mask):
src = self.src_embed(src)
src = self.src_pos(src)
return self.encoder(src, src_mask)
def decode(self, encoder_output, src_mask, tgt, tgt_mask):
tgt = self.tgt_embed(tgt)
tgt = self.tgt_pos(tgt)
return self.decoder(tgt, encoder_output, src_mask, tgt_mask)
def project(self, x):
return self.projection(x)
# build the transformer
# N -> number of encoder and decoder blocks
def build_transformer(src_vocab_size: int, tgt_vocab_size: int, src_seq_len: int, tgt_seq_len: int, d_model: int = 512, N: int = 6, heads: int = 8, dropout: float = 0.1, d_ff: int = 2048) -> Transformer:
# create the embedding layers
src_embed = InputEmbeddings(d_model, src_vocab_size)
tgt_embed = InputEmbeddings(d_model, tgt_vocab_size)
# create the positional encodings
src_pos = PositionalEncoding(d_model, src_seq_len, dropout)
tgt_pos = PositionalEncoding(d_model, tgt_seq_len, dropout)
# create the encoder blocks
encoder_blocks = []
for _ in range(N):
encoder_self_attention_block = MultiHeadAttention(d_model, heads, dropout)
feed_forward_block = FeedForwardBlock(d_model, d_ff, dropout)
encoder_block = EncoderBlock(encoder_self_attention_block, feed_forward_block, dropout)
encoder_blocks.append(encoder_block)
# create the decoder blocks
decoder_blocks = []
for _ in range(N):
decoder_self_attention_block = MultiHeadAttention(d_model, heads, dropout)
decoder_cross_attention_block = MultiHeadAttention(d_model, heads, dropout)
feed_forward_block = FeedForwardBlock(d_model, d_ff, dropout)
decoder_block = DecoderBlock(decoder_self_attention_block, decoder_cross_attention_block, feed_forward_block, dropout)
decoder_blocks.append(decoder_block)
# assemble the encoder and the decoder
encoder = Encoder(nn.ModuleList(encoder_blocks))
decoder = Decoder(nn.ModuleList(decoder_blocks))
# create the projection layer
projection_layer = ProjectionLayer(d_model, tgt_vocab_size)
# create the transformer
transformer = Transformer(encoder, decoder, src_embed, tgt_embed, src_pos, tgt_pos, projection_layer)
# Initialize the parameters
for p in transformer.parameters():
if p.dim() > 1:
nn.init.xavier_uniform_(p)
return transformer
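To make the wiring concrete, a minimal sketch of building the model; the vocabulary sizes below are illustrative placeholders, since the real values come from the trained tokenizers:

from model import build_transformer

model = build_transformer(src_vocab_size=15000, tgt_vocab_size=22000, src_seq_len=350, tgt_seq_len=350)
print(sum(p.numel() for p in model.parameters()))  # total number of trainable parameters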
Train.py
import torch
import warnings
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader, random_split
from torch.utils.tensorboard import SummaryWriter
from tqdm import tqdm
from datasets import load_dataset
from tokenizers import Tokenizer
from tokenizers.models import WordLevel
from tokenizers.trainers import WordLevelTrainer
from tokenizers.pre_tokenizers import Whitespace
from pathlib import Path
from corpus import BillingualDataset, casual_mask
from model import build_transformer
from config import get_weights_file_path, get_config
def get_all_sentences(dataset, lang):
for item in dataset:
yield item['translation'][lang]
# build the tokenizer
def get_or_build_tokenizer(config, dataset, lang):
# tokenizer path
tokenizer_path = Path(config["tokenizer_file"].format(lang))
if not tokenizer_path.exists():
    tokenizer = Tokenizer(WordLevel(unk_token='[UNK]'))
    tokenizer.pre_tokenizer = Whitespace()
    trainer = WordLevelTrainer(special_tokens=["[UNK]", "[PAD]", "[SOS]", "[EOS]"], min_frequency=2)
    tokenizer.train_from_iterator(get_all_sentences(dataset, lang), trainer=trainer)
    tokenizer.save(str(tokenizer_path))
else:
    tokenizer = Tokenizer.from_file(str(tokenizer_path))
return tokenizer
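Once built (or loaded from its JSON file), the tokenizer can be exercised directly; a minimal sketch, assuming this module is saved as train.py so it can be imported:

from datasets import load_dataset
from config import get_config
from train import get_or_build_tokenizer

config = get_config()
dataset_raw = load_dataset("opus_books", f"{config['lang_src']}-{config['lang_tgt']}", split="train")
tokenizer_src = get_or_build_tokenizer(config, dataset_raw, config["lang_src"])
ids = tokenizer_src.encode("How are you?").ids
print(ids, tokenizer_src.decode(ids))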
# get the dataset
def get_dataset(config):
dataset_raw = load_dataset("opus_books", f"{config['lang_src']}-{config['lang_tgt']}", split="train")
# build the tokenizer
tokenizer_src = get_or_build_tokenizer(config, dataset_raw, config["lang_src"])
tokenizer_tgt = get_or_build_tokenizer(config, dataset_raw, config["lang_tgt"])
# keep 90% of the data for training and 10% for validation
train_ds_size = int(0.9 * len(dataset_raw))
val_ds_size = len(dataset_raw) - train_ds_size
train_ds_raw, val_ds_raw = random_split(dataset_raw, [train_ds_size, val_ds_size])
train_ds = BillingualDataset(train_ds_raw, tokenizer_src, tokenizer_tgt, config["lang_src"], config["lang_tgt"], config["seq_len"])
val_ds = BillingualDataset(val_ds_raw, tokenizer_src, tokenizer_tgt, config["lang_src"], config["lang_tgt"], config["seq_len"])
max_len_src = 0
max_len_tgt = 0
for item in dataset_raw:
src_ids = tokenizer_src.encode(item["translation"][config["lang_src"]]).ids
tgt_ids = tokenizer_tgt.encode(item["translation"][config["lang_tgt"]]).ids
max_len_src = max(max_len_src, len(src_ids))
max_len_tgt = max(max_len_tgt, len(tgt_ids))
print(f"Max length of source sentence: {max_len_src}")
print(f"Max length of target sentence: {max_len_tgt}")
# create the data loaders
train_dataloader = DataLoader(
    train_ds,
    batch_size=config["batch_size"],
    shuffle=True
)
val_dataloader = DataLoader(
    val_ds,
    batch_size=1,
    shuffle=True
)
return train_dataloader, val_dataloader, tokenizer_src, tokenizer_tgt
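For orientation, a sketch of pulling one batch from the training loader and checking the tensor shapes (batch size 8 and seq_len 350 come from the config; this assumes the module is importable as train.py):

from config import get_config
from train import get_dataset

config = get_config()
train_dataloader, val_dataloader, tokenizer_src, tokenizer_tgt = get_dataset(config)
batch = next(iter(train_dataloader))
print(batch["encoder_input"].shape)  # torch.Size([8, 350])
print(batch["encoder_mask"].shape)   # torch.Size([8, 1, 1, 350])
print(batch["decoder_mask"].shape)   # torch.Size([8, 1, 350, 350])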
def get_model(config, vocab_src_len, vocab_tgt_len):
# build the model
model = build_transformer(vocab_src_len, vocab_tgt_len, config["seq_len"], config["seq_len"], config["d_model"])
return model
# the model training loop
def train_model(config):
# define the device
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")
# create the model folder
Path(config["model_folder"]).mkdir(parents=True, exist_ok=True)
# get the data loaders
train_dataloader, val_dataloader, tokenizer_src, tokenizer_tgt = get_dataset(config)
# build the model using the tokenizer vocabulary sizes
model = get_model(config, tokenizer_src.get_vocab_size(), tokenizer_tgt.get_vocab_size())
# move the model to the device
model = model.to(device)
# Tensorboard
writer = SummaryWriter(config["experiment_name"])
# set the optimizer
optimizer = torch.optim.Adam(model.parameters(), lr=config["lr"], eps=1e-9)
initial_epoch = 0
global_step = 0
if config["preload"]:
model_filename = get_weights_file_path(config, config["preload"])
print(f"Preloading model: {model_filename}")
state = torch.load(model_filename)
intial_epoch = state["epoch"] + 1
optimizer.load_state_dict(state["optimizer_state_dict"])
global_step = state["global_step"]
# set the loss function
loss_fn = nn.CrossEntropyLoss(ignore_index=tokenizer_src.token_to_id("[PAD]"), label_smoothing=0.1).to(device)
for epoch in range(initial_epoch, config["num_epochs"]):
model.train()
batch_iterator = tqdm(train_dataloader, desc=f"Processing epoch {epoch:02d}")
for batch in batch_iterator:
encoder_input = batch["encoder_input"].to(device) # (Batch_size, seq_len)
decoder_input = batch["decoder_input"].to(device) # (Batch_size, seq_len)
encoder_mask = batch["encoder_mask"].to(device) # (Batch_size, 1, 1, seq_len)
decoder_mask = batch["decoder_mask"].to(device) # (Batch_size, 1, seq_len, seq_len)
# run the tensors through the transformer
encoder_output = model.encode(encoder_input, encoder_mask)  # (Batch_size, seq_len, d_model)
decoder_output = model.decode(encoder_output, encoder_mask, decoder_input, decoder_mask)  # (Batch_size, seq_len, d_model)
proj_output = model.project(decoder_output)  # (Batch_size, seq_len, tgt_vocab_size)
label = batch['label'].to(device) # (Batch, seq_len)
# (Batch_size, seq_len, tgt_vocab_size) --> (Batch_size * seq_len, tgt_vocab_size)
loss = loss_fn(proj_output.view(-1, tokenizer_tgt.get_vocab_size()), label.view(-1))
batch_iterator.set_postfix({"loss": f"{loss.item():6.3f}"})
# log the loss
writer.add_scalar("train loss", loss.item(), global_step)
writer.flush()
# backpropagate the loss
loss.backward()
# update the weights
optimizer.step()
optimizer.zero_grad()
global_step += 1
# save the model
model_filename = get_weights_file_path(config, f"{epoch:02d}")
torch.save(
    {
        "epoch": epoch,
        "model_state_dict": model.state_dict(),
        "optimizer_state_dict": optimizer.state_dict(),
        "global_step": global_step
    },
    model_filename
)
if __name__ == "__main__":
warnings.filterwarnings("ignore")
config = get_config()
train_model(config)
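After training, a saved checkpoint can be restored from the keys written above; a minimal sketch, assuming the final epoch (19) was saved and the modules are importable as config.py and train.py:

import torch
from config import get_config, get_weights_file_path
from train import get_model, get_dataset

config = get_config()
_, _, tokenizer_src, tokenizer_tgt = get_dataset(config)
model = get_model(config, tokenizer_src.get_vocab_size(), tokenizer_tgt.get_vocab_size())
state = torch.load(get_weights_file_path(config, "19"), map_location="cpu")
model.load_state_dict(state["model_state_dict"])
model.eval()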
Project Structure