From 2417288c9fe96264da708ce8d13ac7bc2faf83e3 Mon Sep 17 00:00:00 2001 From: Gustaf Rydholm Date: Wed, 17 Nov 2021 22:42:58 +0100 Subject: Add new quantizer --- text_recognizer/networks/quantizer/__init__.py | 0 text_recognizer/networks/quantizer/codebook.py | 96 ++++++++++++++++ text_recognizer/networks/quantizer/kmeans.py | 32 ++++++ text_recognizer/networks/quantizer/quantizer.py | 59 ++++++++++ text_recognizer/networks/quantizer/utils.py | 26 +++++ text_recognizer/networks/vqvae/quantizer.py | 141 ------------------------ 6 files changed, 213 insertions(+), 141 deletions(-) create mode 100644 text_recognizer/networks/quantizer/__init__.py create mode 100644 text_recognizer/networks/quantizer/codebook.py create mode 100644 text_recognizer/networks/quantizer/kmeans.py create mode 100644 text_recognizer/networks/quantizer/quantizer.py create mode 100644 text_recognizer/networks/quantizer/utils.py delete mode 100644 text_recognizer/networks/vqvae/quantizer.py (limited to 'text_recognizer/networks') diff --git a/text_recognizer/networks/quantizer/__init__.py b/text_recognizer/networks/quantizer/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/text_recognizer/networks/quantizer/codebook.py b/text_recognizer/networks/quantizer/codebook.py new file mode 100644 index 0000000..cb9bc59 --- /dev/null +++ b/text_recognizer/networks/quantizer/codebook.py @@ -0,0 +1,96 @@ +"""Codebook module.""" +from typing import Tuple + +import attr +from einops import rearrange +import torch +from torch import nn, Tensor +import torch.nn.functional as F + +from text_recognizer.networks.quantizer.kmeans import kmeans +from text_recognizer.networks.quantizer.utils import ( + ema_inplace, + norm, + sample_vectors, +) + + +@attr.s(eq=False) +class CosineSimilarityCodebook(nn.Module): + """Cosine similarity codebook.""" + + dim: int = attr.ib() + codebook_size: int = attr.ib() + kmeans_init: bool = attr.ib(default=False) + kmeans_iters: int = attr.ib(default=10) + decay: float = attr.ib(default=0.8) + eps: float = attr.ib(default=1.0e-5) + threshold_dead: int = attr.ib(default=2) + + def __attrs_pre_init__(self) -> None: + super().__init__() + + def __attrs_post_init__(self) -> None: + if not self.kmeans_init: + embeddings = norm(torch.randn(self.codebook_size, self.dim)) + else: + embeddings = torch.zeros(self.codebook_size, self.dim) + self.register_buffer("initalized", Tensor([not self.kmeans_init])) + self.register_buffer("cluster_size", torch.zeros(self.codebook_size)) + self.register_buffer("embeddings", embeddings) + + def _initalize_embedding(self, data: Tensor) -> None: + embeddings, cluster_size = kmeans(data, self.codebook_size, self.kmeans_iters) + self.embeddings.data.copy_(embeddings) + self.cluster_size.data.copy_(cluster_size) + self.initalized.data.copy_(Tensor([True])) + + def _replace(self, samples: Tensor, mask: Tensor) -> None: + samples = norm(samples) + modified_codebook = torch.where( + mask[..., None], + sample_vectors(samples, self.codebook_size), + self.embeddings, + ) + self.embeddings.data.copy_(modified_codebook) + + def _replace_dead_codes(self, batch_samples: Tensor) -> None: + if self.threshold_dead == 0: + return + dead_codes = self.cluster_size < self.threshold_dead + if not torch.any(dead_codes): + return + batch_samples = rearrange(batch_samples, "... d -> (...) d") + self._replace(batch_samples, mask=dead_codes) + + def forward(self, x: Tensor) -> Tuple[Tensor, Tensor]: + """Quantizes tensor.""" + shape = x.shape + flatten = rearrange(x, "... d -> (...) d") + flatten = norm(flatten) + + if not self.initalized: + self._initalize_embedding(flatten) + + embeddings = norm(self.embeddings) + dist = flatten @ embeddings.t() + indices = dist.max(dim=-1).indices + one_hot = F.one_hot(indices, self.codebook_size).type_as(x) + indices = indices.view(*shape[:-1]) + + quantized = F.embedding(indices, self.embeddings) + + if self.training: + bins = one_hot.sum(0) + ema_inplace(self.cluster_size, bins, self.decay) + zero_mask = bins == 0 + bins = bins.masked_fill(zero_mask, 1.0) + + embed_sum = flatten.t() @ one_hot + embed_norm = (embed_sum / bins.unsqueeze(0)).t() + embed_norm = norm(embed_norm) + embed_norm = torch.where(zero_mask[..., None], embeddings, embed_norm) + ema_inplace(self.embeddings, embed_norm, self.decay) + self._replace_dead_codes(x) + + return quantized, indices diff --git a/text_recognizer/networks/quantizer/kmeans.py b/text_recognizer/networks/quantizer/kmeans.py new file mode 100644 index 0000000..a34c381 --- /dev/null +++ b/text_recognizer/networks/quantizer/kmeans.py @@ -0,0 +1,32 @@ +"""K-means clustering for embeddings.""" +from typing import Tuple + +from einops import repeat +import torch +from torch import Tensor + +from text_recognizer.networks.quantizer.utils import norm, sample_vectors + + +def kmeans( + samples: Tensor, num_clusters: int, num_iters: int = 10 +) -> Tuple[Tensor, Tensor]: + """Compute k-means clusters.""" + D = samples.shape[-1] + + means = sample_vectors(samples, num_clusters) + + for _ in range(num_iters): + dists = samples @ means.t() + buckets = dists.max(dim=-1).indices + bins = torch.bincount(buckets, minlength=num_clusters) + zero_mask = bins == 0 + bins_min_clamped = bins.masked_fill(zero_mask, 1) + + new_means = buckets.new_zeros(num_clusters, D).type_as(samples) + new_means.scatter_add_(0, repeat(buckets, "n -> n d", d=D), samples) + new_means /= bins_min_clamped[..., None] + new_means = norm(new_means) + means = torch.where(zero_mask[..., None], means, new_means) + + return means, bins diff --git a/text_recognizer/networks/quantizer/quantizer.py b/text_recognizer/networks/quantizer/quantizer.py new file mode 100644 index 0000000..3e8f0b2 --- /dev/null +++ b/text_recognizer/networks/quantizer/quantizer.py @@ -0,0 +1,59 @@ +"""Implementation of a Vector Quantized Variational AutoEncoder. + +Reference: +https://github.com/AntixK/PyTorch-VAE/blob/master/models/vq_vae.py +""" +from typing import Tuple, Type + +import attr +from einops import rearrange +import torch +from torch import nn +from torch import Tensor +import torch.nn.functional as F + + +@attr.s(eq=False) +class VectorQuantizer(nn.Module): + """Vector quantizer.""" + + input_dim: int = attr.ib() + codebook: Type[nn.Module] = attr.ib() + commitment: float = attr.ib(default=1.0) + project_in: nn.Linear = attr.ib(default=None, init=False) + project_out: nn.Linear = attr.ib(default=None, init=False) + + def __attrs_pre_init__(self) -> None: + super().__init__() + + def __attrs_post_init__(self) -> None: + require_projection = self.codebook.dim != self.input_dim + self.project_in = ( + nn.Linear(self.input_dim, self.codebook.dim) + if require_projection + else nn.Identity() + ) + self.project_out = ( + nn.Linear(self.codebook.dim, self.input_dim) + if require_projection + else nn.Identity() + ) + + def forward(self, x: Tensor) -> Tuple[Tensor, Tensor, Tensor]: + """Quantizes latent vectors.""" + H, W = x.shape[-2:] + x = rearrange(x, "b d h w -> b (h w) d") + x = self.project_in(x) + + quantized, indices = self.codebook(x) + + if self.training: + commitment_loss = F.mse_loss(quantized.detach(), x) * self.commitment + quantized = x + (quantized - x).detach() + else: + commitment_loss = torch.tensor([0.0]).type_as(x) + + quantized = self.project_out(quantized) + quantized = rearrange(quantized, "b (h w) d -> b d h w", h=H, w=W) + + return quantized, indices, commitment_loss diff --git a/text_recognizer/networks/quantizer/utils.py b/text_recognizer/networks/quantizer/utils.py new file mode 100644 index 0000000..0502d49 --- /dev/null +++ b/text_recognizer/networks/quantizer/utils.py @@ -0,0 +1,26 @@ +"""Helper functions for quantization.""" +from typing import Tuple + +import torch +from torch import Tensor +import torch.nn.functional as F + + +def sample_vectors(samples: Tensor, num: int) -> Tensor: + """Subsamples a set of vectors.""" + B, device = samples.shape[0], samples.device + if B >= num: + indices = torch.randperm(B, device=device)[:num] + else: + indices = torch.randint(0, B, (num,), device=device)[:num] + return samples[indices] + + +def norm(t: Tensor) -> Tensor: + """Applies L2-normalization.""" + return F.normalize(t, p=2, dim=-1) + + +def ema_inplace(moving_avg: Tensor, new: Tensor, decay: float) -> None: + """Applies exponential moving average.""" + moving_avg.data.mul_(decay).add_(new, alpha=(1 - decay)) diff --git a/text_recognizer/networks/vqvae/quantizer.py b/text_recognizer/networks/vqvae/quantizer.py deleted file mode 100644 index bba9b60..0000000 --- a/text_recognizer/networks/vqvae/quantizer.py +++ /dev/null @@ -1,141 +0,0 @@ -"""Implementation of a Vector Quantized Variational AutoEncoder. - -Reference: -https://github.com/AntixK/PyTorch-VAE/blob/master/models/vq_vae.py -""" -from einops import rearrange -import torch -from torch import nn -from torch import Tensor -import torch.nn.functional as F - - -class EmbeddingEMA(nn.Module): - """Embedding for Exponential Moving Average (EMA).""" - - def __init__(self, num_embeddings: int, embedding_dim: int) -> None: - super().__init__() - weight = torch.zeros(num_embeddings, embedding_dim) - nn.init.kaiming_uniform_(weight, nonlinearity="linear") - self.register_buffer("weight", weight) - self.register_buffer("cluster_size", torch.zeros(num_embeddings)) - self.register_buffer("weight_avg", weight.clone()) - - -class VectorQuantizer(nn.Module): - """The codebook that contains quantized vectors.""" - - def __init__( - self, num_embeddings: int, embedding_dim: int, decay: float = 0.99 - ) -> None: - super().__init__() - self.num_embeddings = num_embeddings - self.embedding_dim = embedding_dim - self.decay = decay - self.embedding = EmbeddingEMA(self.num_embeddings, self.embedding_dim) - - def _discretization_bottleneck(self, latent: Tensor) -> Tensor: - """Computes the code nearest to the latent representation. - - First we compute the posterior categorical distribution, and then map - the latent representation to the nearest element of the embedding. - - Args: - latent (Tensor): The latent representation. - - Shape: - - latent :math:`(B x H x W, D)` - - Returns: - Tensor: The quantized embedding vector. - - """ - # Store latent shape. - b, h, w, d = latent.shape - - # Flatten the latent representation to 2D. - latent = rearrange(latent, "b h w d -> (b h w) d") - - # Compute the L2 distance between the latents and the embeddings. - l2_distance = ( - torch.sum(latent ** 2, dim=1, keepdim=True) - + torch.sum(self.embedding.weight ** 2, dim=1) - - 2 * latent @ self.embedding.weight.t() - ) # [BHW x K] - - # Find the embedding k nearest to each latent. - encoding_indices = torch.argmin(l2_distance, dim=1).unsqueeze(1) # [BHW, 1] - - # Convert to one-hot encodings, aka discrete bottleneck. - one_hot_encoding = torch.zeros( - encoding_indices.shape[0], self.num_embeddings, device=latent.device - ) - one_hot_encoding.scatter_(1, encoding_indices, 1) # [BHW x K] - - # Embedding quantization. - quantized_latent = one_hot_encoding @ self.embedding.weight # [BHW, D] - quantized_latent = rearrange( - quantized_latent, "(b h w) d -> b h w d", b=b, h=h, w=w - ) - if self.training: - self._compute_ema(one_hot_encoding=one_hot_encoding, latent=latent) - - return quantized_latent - - def _compute_ema(self, one_hot_encoding: Tensor, latent: Tensor) -> None: - """Computes the EMA update to the codebook.""" - batch_cluster_size = one_hot_encoding.sum(axis=0) - batch_embedding_avg = (latent.t() @ one_hot_encoding).t() - self.embedding.cluster_size.data.mul_(self.decay).add_( - batch_cluster_size, alpha=1 - self.decay - ) - self.embedding.weight_avg.data.mul_(self.decay).add_( - batch_embedding_avg, alpha=1 - self.decay - ) - new_embedding = self.embedding.weight_avg / ( - self.embedding.cluster_size + 1.0e-5 - ).unsqueeze(1) - self.embedding.weight.data.copy_(new_embedding) - - def _commitment_loss(self, latent: Tensor, quantized_latent: Tensor) -> Tensor: - """Vector Quantization loss. - - The vector quantization algorithm allows us to create a codebook. The VQ - algorithm works by moving the embedding vectors towards the encoder outputs. - - The embedding loss moves the embedding vector towards the encoder outputs. The - .detach() works as the stop gradient (sg) described in the paper. - - Because the volume of the embedding space is dimensionless, it can arbitarily - grow if the embeddings are not trained as fast as the encoder parameters. To - mitigate this, a commitment loss is added in the second term which makes sure - that the encoder commits to an embedding and that its output does not grow. - - Args: - latent (Tensor): The encoder output. - quantized_latent (Tensor): The quantized latent. - - Returns: - Tensor: The combinded VQ loss. - - """ - loss = F.mse_loss(quantized_latent.detach(), latent) - return loss - - def forward(self, latent: Tensor) -> Tensor: - """Forward pass that returns the quantized vector and the vq loss.""" - # Rearrange latent representation s.t. the hidden dim is at the end. - latent = rearrange(latent, "b d h w -> b h w d") - - # Maps latent to the nearest code in the codebook. - quantized_latent = self._discretization_bottleneck(latent) - - loss = self._commitment_loss(latent, quantized_latent) - - # Add residue to the quantized latent. - quantized_latent = latent + (quantized_latent - latent).detach() - - # Rearrange the quantized shape back to the original shape. - quantized_latent = rearrange(quantized_latent, "b h w d -> b d h w") - - return quantized_latent, loss -- cgit v1.2.3-70-g09d2