From d73c52e15b519af764a83378d4eab19fb31985e0 Mon Sep 17 00:00:00 2001 From: Gustaf Rydholm Date: Fri, 30 Sep 2022 00:33:53 +0200 Subject: Update transformer --- text_recognizer/networks/transformer/attention.py | 80 ++++++++-------------- text_recognizer/networks/transformer/decoder.py | 19 +++-- .../networks/transformer/decoder_block.py | 10 ++- .../networks/transformer/embeddings/rotary.py | 55 +++++++++++---- 4 files changed, 86 insertions(+), 78 deletions(-) diff --git a/text_recognizer/networks/transformer/attention.py b/text_recognizer/networks/transformer/attention.py index fca260d..85f513e 100644 --- a/text_recognizer/networks/transformer/attention.py +++ b/text_recognizer/networks/transformer/attention.py @@ -1,5 +1,5 @@ """Implementes the attention module for the transformer.""" -from typing import Optional, Tuple +from typing import Optional import torch import torch.nn.functional as F @@ -8,7 +8,6 @@ from torch import Tensor, einsum, nn from text_recognizer.networks.transformer.embeddings.rotary import ( RotaryEmbedding, - rotate_half, ) @@ -22,100 +21,75 @@ class Attention(nn.Module): causal: bool = False, dim_head: int = 64, dropout_rate: float = 0.0, - rotary_embedding: Optional[RotaryEmbedding] = None, ) -> None: super().__init__() - self.dim = dim + self.scale = self.dim**-0.5 self.num_heads = num_heads - self.causal = causal self.dim_head = dim_head + + self.causal = causal self.dropout_rate = dropout_rate - self.rotary_embedding = rotary_embedding - self.scale = self.dim**-0.5 - inner_dim = self.num_heads * self.dim_head + # Single key/value head + k_dim = dim_head + v_dim = dim_head + + out_dim = self.num_heads * self.dim_head - self.to_q = nn.Linear(self.dim, inner_dim, bias=False) - self.to_k = nn.Linear(self.dim, inner_dim, bias=False) - self.to_v = nn.Linear(self.dim, inner_dim, bias=False) + self.to_q = nn.Linear(self.dim, out_dim, bias=False) + self.to_k = nn.Linear(self.dim, k_dim, bias=False) + self.to_v = nn.Linear(self.dim, v_dim, bias=False) self.dropout = nn.Dropout(p=self.dropout_rate) # Feedforward - self.fc = nn.Linear(inner_dim, self.dim) + self.fc = nn.Linear(out_dim, self.dim) def forward( self, x: Tensor, context: Optional[Tensor] = None, - input_mask: Optional[Tensor] = None, - context_mask: Optional[Tensor] = None, + mask: Optional[Tensor] = None, + rotary_embedding: Optional[RotaryEmbedding] = None, ) -> Tensor: """Computes the attention.""" - b, n, _, device = *x.shape, x.device + b, device = x.shape[0], x.device q = self.to_q(x) + q = rearrange(q, "b n (h d) -> b h n d", h=self.num_heads) k = self.to_k(context) if context is not None else self.to_k(x) v = self.to_v(context) if context is not None else self.to_v(x) - q, k, v = map( - lambda t: rearrange(t, "b n (h d) -> b h n d", h=self.num_heads), (q, k, v) - ) - if self.rotary_embedding is not None: - embedding = self.rotary_embedding(q) - q, k, v = _apply_rotary_emb(q, k, v, embedding[None, ...]) + if rotary_embedding is not None: + q, k, v = map(lambda t: rotary_embedding.rotate(t), (q, k, v)) - energy = einsum("b h i d, b h j d -> b h i j", q, k) * self.scale + energy = einsum("b h i d, b j d -> b h i j", q, k) * self.scale mask_value = -torch.finfo(energy.dtype).max - energy = apply_input_mask( - b, n, k, energy, input_mask, context, context_mask, mask_value, device - ) + energy = apply_input_mask(b, k, energy, mask, mask_value, device) if self.causal: - energy = apply_causal_mask(energy, input_mask, mask_value, device) + energy = apply_causal_mask(energy, mask, mask_value, device) attn = F.softmax(energy, dim=-1) attn = self.dropout(attn) - out = einsum("b h i j, b h j d -> b h i d", attn, v) + out = einsum("b h i j, b j d -> b h i d", attn, v) out = rearrange(out, "b h n d -> b n (h d)") out = self.fc(out) return out -def _apply_rotary_emb( - q: Tensor, k: Tensor, v: Tensor, freqs: Tensor -) -> Tuple[Tensor, Tensor, Tensor]: - q, k, v = map( - lambda t: (t * freqs.cos()) + (rotate_half(t) * freqs.sin()), (q, k, v) - ) - return q, k, v - - def apply_input_mask( b: int, - n: int, k: Tensor, energy: Tensor, - input_mask: Optional[Tensor], - context: Optional[Tensor], - context_mask: Optional[Tensor], + mask: Optional[Tensor], mask_value: Tensor, device: str, ) -> Tensor: """Applies an input mask.""" - if any(x is not None for x in (input_mask, context_mask)): - q_mask = ( - input_mask - if input_mask is not None - else torch.ones((b, n), device=device).bool() - ) - k_mask = q_mask if context is None else context_mask - k_mask = ( - torch.ones((b, k.shape[-2]), device=device).bool() - if k_mask is None - else k_mask - ) - q_mask = rearrange(q_mask, "b i -> b () i ()") + if mask is not None: + k_mask = torch.ones((b, k.shape[-2]), device=device).bool() + q_mask = rearrange(mask, "b i -> b () i ()") k_mask = rearrange(k_mask, "b j -> b () () j") input_mask = q_mask * k_mask diff --git a/text_recognizer/networks/transformer/decoder.py b/text_recognizer/networks/transformer/decoder.py index c7da226..741f5b3 100644 --- a/text_recognizer/networks/transformer/decoder.py +++ b/text_recognizer/networks/transformer/decoder.py @@ -6,16 +6,23 @@ from torch import Tensor, nn from text_recognizer.networks.transformer.attention import Attention from text_recognizer.networks.transformer.decoder_block import DecoderBlock +from text_recognizer.networks.transformer.embeddings.rotary import RotaryEmbedding from text_recognizer.networks.transformer.ff import FeedForward class Decoder(nn.Module): """Decoder Network.""" - def __init__(self, depth: int, dim: int, block: DecoderBlock) -> None: + def __init__( + self, + depth: int, + dim: int, + block: DecoderBlock, + rotary_embedding: Optional[RotaryEmbedding] = None, + ) -> None: super().__init__() self.depth = depth - self.has_pos_emb = block.has_pos_emb + self.rotary_embedding = rotary_embedding self.blocks = nn.ModuleList([deepcopy(block) for _ in range(self.depth)]) self.ln = nn.LayerNorm(dim) @@ -23,12 +30,14 @@ class Decoder(nn.Module): self, x: Tensor, context: Optional[Tensor] = None, - input_mask: Optional[Tensor] = None, - context_mask: Optional[Tensor] = None, + mask: Optional[Tensor] = None, ) -> Tensor: """Applies the network to the signals.""" for block in self.blocks: x = block( - x=x, context=context, input_mask=input_mask, context_mask=context_mask + x=x, + context=context, + mask=mask, + rotary_embedding=self.rotary_embedding, ) return self.ln(x) diff --git a/text_recognizer/networks/transformer/decoder_block.py b/text_recognizer/networks/transformer/decoder_block.py index b80a078..2dc4ddf 100644 --- a/text_recognizer/networks/transformer/decoder_block.py +++ b/text_recognizer/networks/transformer/decoder_block.py @@ -5,6 +5,7 @@ from typing import Optional, Type from torch import Tensor, nn from text_recognizer.networks.transformer.attention import Attention +from text_recognizer.networks.transformer.embeddings.rotary import RotaryEmbedding from text_recognizer.networks.transformer.ff import FeedForward @@ -25,22 +26,19 @@ class DecoderBlock(nn.Module): self.cross_attn = cross_attn self.ln_ff = deepcopy(norm) self.ff = ff - self.has_pos_emb = self.attn.rotary_embedding is not None def forward( self, x: Tensor, context: Optional[Tensor] = None, - input_mask: Optional[Tensor] = None, - context_mask: Optional[Tensor] = None, + mask: Optional[Tensor] = None, + rotary_embedding: Optional[RotaryEmbedding] = None, ) -> Tensor: """Applies decoder block on input signals.""" - x = x + self.attn(self.ln_attn(x), input_mask=input_mask) + x = x + self.attn(self.ln_attn(x), mask=mask, rotary_embedding=rotary_embedding) x = x + self.cross_attn( x=self.ln_cross_attn(x), context=context, - input_mask=input_mask, - context_mask=context_mask, ) x = x + self.ff(self.ln_ff(x)) return x diff --git a/text_recognizer/networks/transformer/embeddings/rotary.py b/text_recognizer/networks/transformer/embeddings/rotary.py index cc91206..ca0a260 100644 --- a/text_recognizer/networks/transformer/embeddings/rotary.py +++ b/text_recognizer/networks/transformer/embeddings/rotary.py @@ -6,6 +6,9 @@ Stolen from lucidrains: Explanation of roatary: https://blog.eleuther.ai/rotary-embeddings/ """ +from inspect import isfunction + +from einops import rearrange, repeat import torch from torch import Tensor, nn @@ -17,24 +20,48 @@ class RotaryEmbedding(nn.Module): super().__init__() inv_freqs = 1.0 / (10000 ** (torch.arange(0, dim, 2).float() / dim)) self.register_buffer("inv_freqs", inv_freqs) + self.cache = {} + + def rotate(self, t: Tensor, dim: int = -2) -> Tensor: + """Rotate vector.""" + device, n = t.device, t.shape[dim] + freqs = self.forward(lambda: torch.arange(n, device=device), cache_key=n) + return apply_rotary_emb(t, freqs) - def forward(self, x: Tensor) -> Tensor: + def forward(self, t: Tensor, cache_key: int) -> Tensor: """Encodes tensor x with rotary embeddings.""" - n = x.shape[-2] - t = torch.arange(n, device=x.device).type_as(self.inv_freqs) - freqs = torch.einsum("i , j -> i j", t, self.inv_freqs) - emb = torch.cat((freqs, freqs), dim=-1) - return emb[None, :, :] + if cache_key in self.cache: + return self.cache[cache_key] + + if isfunction(t): + t = t() + + freqs = self.inv_freqs + freqs = torch.einsum("..., f -> ... f", t.type(freqs.dtype), freqs) + freqs = repeat(freqs, "... n -> ... (n r)", r=2) + self.cache[cache_key] = freqs + return freqs def rotate_half(x: Tensor) -> Tensor: - if len(x.shape) == 3: - x = x.reshape((x.shape[0], -1, 2, x.shape[-1] // 2)) - else: - x = x.reshape((x.shape[0], x.shape[1], -1, 2, x.shape[-1] // 2)) - x1, x2 = x.unbind(dim=-2) - return torch.cat((-x2, x1), dim=-1) + x = rearrange(x, "... (d r) -> ... d r", r=2) + x1, x2 = x.unbind(dim=-1) + x = torch.stack((-x2, x1), dim=-1) + return rearrange(x, "... d r -> ... (d r)") -def apply_rotary_pos_emb(t: Tensor, freqs: Tensor) -> Tensor: - return (t * freqs.cos()) + (rotate_half(t) * freqs.sin()) +def apply_rotary_emb(t: Tensor, freqs: Tensor, start_index: int = 0) -> Tensor: + freqs = freqs.to(t) + rot_dim = freqs.shape[-1] + end_index = start_index + rot_dim + assert rot_dim <= t.shape[-1], ( + f"feature dimension {t.shape[-1]} is not of sufficient size to rotate" + f"in all the positions {rot_dim}" + ) + t_left, t, t_right = ( + t[..., :start_index], + t[..., start_index:end_index], + t[..., end_index:], + ) + t = (t * freqs.cos()) + (rotate_half(t) * freqs.sin()) + return torch.cat((t_left, t, t_right), dim=-1) -- cgit v1.2.3-70-g09d2