summaryrefslogtreecommitdiff
path: root/text_recognizer/networks/transformer
diff options
context:
space:
mode:
Diffstat (limited to 'text_recognizer/networks/transformer')
-rw-r--r--text_recognizer/networks/transformer/attention.py80
-rw-r--r--text_recognizer/networks/transformer/decoder.py19
-rw-r--r--text_recognizer/networks/transformer/decoder_block.py10
-rw-r--r--text_recognizer/networks/transformer/embeddings/rotary.py55
4 files changed, 86 insertions, 78 deletions
diff --git a/text_recognizer/networks/transformer/attention.py b/text_recognizer/networks/transformer/attention.py
index fca260d..85f513e 100644
--- a/text_recognizer/networks/transformer/attention.py
+++ b/text_recognizer/networks/transformer/attention.py
@@ -1,5 +1,5 @@
"""Implementes the attention module for the transformer."""
-from typing import Optional, Tuple
+from typing import Optional
import torch
import torch.nn.functional as F
@@ -8,7 +8,6 @@ from torch import Tensor, einsum, nn
from text_recognizer.networks.transformer.embeddings.rotary import (
RotaryEmbedding,
- rotate_half,
)
@@ -22,100 +21,75 @@ class Attention(nn.Module):
causal: bool = False,
dim_head: int = 64,
dropout_rate: float = 0.0,
- rotary_embedding: Optional[RotaryEmbedding] = None,
) -> None:
super().__init__()
-
self.dim = dim
+ self.scale = self.dim**-0.5
self.num_heads = num_heads
- self.causal = causal
self.dim_head = dim_head
+
+ self.causal = causal
self.dropout_rate = dropout_rate
- self.rotary_embedding = rotary_embedding
- self.scale = self.dim**-0.5
- inner_dim = self.num_heads * self.dim_head
+ # Single key/value head
+ k_dim = dim_head
+ v_dim = dim_head
+
+ out_dim = self.num_heads * self.dim_head
- self.to_q = nn.Linear(self.dim, inner_dim, bias=False)
- self.to_k = nn.Linear(self.dim, inner_dim, bias=False)
- self.to_v = nn.Linear(self.dim, inner_dim, bias=False)
+ self.to_q = nn.Linear(self.dim, out_dim, bias=False)
+ self.to_k = nn.Linear(self.dim, k_dim, bias=False)
+ self.to_v = nn.Linear(self.dim, v_dim, bias=False)
self.dropout = nn.Dropout(p=self.dropout_rate)
# Feedforward
- self.fc = nn.Linear(inner_dim, self.dim)
+ self.fc = nn.Linear(out_dim, self.dim)
def forward(
self,
x: Tensor,
context: Optional[Tensor] = None,
- input_mask: Optional[Tensor] = None,
- context_mask: Optional[Tensor] = None,
+ mask: Optional[Tensor] = None,
+ rotary_embedding: Optional[RotaryEmbedding] = None,
) -> Tensor:
"""Computes the attention."""
- b, n, _, device = *x.shape, x.device
+ b, device = x.shape[0], x.device
q = self.to_q(x)
+ q = rearrange(q, "b n (h d) -> b h n d", h=self.num_heads)
k = self.to_k(context) if context is not None else self.to_k(x)
v = self.to_v(context) if context is not None else self.to_v(x)
- q, k, v = map(
- lambda t: rearrange(t, "b n (h d) -> b h n d", h=self.num_heads), (q, k, v)
- )
- if self.rotary_embedding is not None:
- embedding = self.rotary_embedding(q)
- q, k, v = _apply_rotary_emb(q, k, v, embedding[None, ...])
+ if rotary_embedding is not None:
+ q, k, v = map(lambda t: rotary_embedding.rotate(t), (q, k, v))
- energy = einsum("b h i d, b h j d -> b h i j", q, k) * self.scale
+ energy = einsum("b h i d, b j d -> b h i j", q, k) * self.scale
mask_value = -torch.finfo(energy.dtype).max
- energy = apply_input_mask(
- b, n, k, energy, input_mask, context, context_mask, mask_value, device
- )
+ energy = apply_input_mask(b, k, energy, mask, mask_value, device)
if self.causal:
- energy = apply_causal_mask(energy, input_mask, mask_value, device)
+ energy = apply_causal_mask(energy, mask, mask_value, device)
attn = F.softmax(energy, dim=-1)
attn = self.dropout(attn)
- out = einsum("b h i j, b h j d -> b h i d", attn, v)
+ out = einsum("b h i j, b j d -> b h i d", attn, v)
out = rearrange(out, "b h n d -> b n (h d)")
out = self.fc(out)
return out
-def _apply_rotary_emb(
- q: Tensor, k: Tensor, v: Tensor, freqs: Tensor
-) -> Tuple[Tensor, Tensor, Tensor]:
- q, k, v = map(
- lambda t: (t * freqs.cos()) + (rotate_half(t) * freqs.sin()), (q, k, v)
- )
- return q, k, v
-
-
def apply_input_mask(
b: int,
- n: int,
k: Tensor,
energy: Tensor,
- input_mask: Optional[Tensor],
- context: Optional[Tensor],
- context_mask: Optional[Tensor],
+ mask: Optional[Tensor],
mask_value: Tensor,
device: str,
) -> Tensor:
"""Applies an input mask."""
- if any(x is not None for x in (input_mask, context_mask)):
- q_mask = (
- input_mask
- if input_mask is not None
- else torch.ones((b, n), device=device).bool()
- )
- k_mask = q_mask if context is None else context_mask
- k_mask = (
- torch.ones((b, k.shape[-2]), device=device).bool()
- if k_mask is None
- else k_mask
- )
- q_mask = rearrange(q_mask, "b i -> b () i ()")
+ if mask is not None:
+ k_mask = torch.ones((b, k.shape[-2]), device=device).bool()
+ q_mask = rearrange(mask, "b i -> b () i ()")
k_mask = rearrange(k_mask, "b j -> b () () j")
input_mask = q_mask * k_mask
diff --git a/text_recognizer/networks/transformer/decoder.py b/text_recognizer/networks/transformer/decoder.py
index c7da226..741f5b3 100644
--- a/text_recognizer/networks/transformer/decoder.py
+++ b/text_recognizer/networks/transformer/decoder.py
@@ -6,16 +6,23 @@ from torch import Tensor, nn
from text_recognizer.networks.transformer.attention import Attention
from text_recognizer.networks.transformer.decoder_block import DecoderBlock
+from text_recognizer.networks.transformer.embeddings.rotary import RotaryEmbedding
from text_recognizer.networks.transformer.ff import FeedForward
class Decoder(nn.Module):
"""Decoder Network."""
- def __init__(self, depth: int, dim: int, block: DecoderBlock) -> None:
+ def __init__(
+ self,
+ depth: int,
+ dim: int,
+ block: DecoderBlock,
+ rotary_embedding: Optional[RotaryEmbedding] = None,
+ ) -> None:
super().__init__()
self.depth = depth
- self.has_pos_emb = block.has_pos_emb
+ self.rotary_embedding = rotary_embedding
self.blocks = nn.ModuleList([deepcopy(block) for _ in range(self.depth)])
self.ln = nn.LayerNorm(dim)
@@ -23,12 +30,14 @@ class Decoder(nn.Module):
self,
x: Tensor,
context: Optional[Tensor] = None,
- input_mask: Optional[Tensor] = None,
- context_mask: Optional[Tensor] = None,
+ mask: Optional[Tensor] = None,
) -> Tensor:
"""Applies the network to the signals."""
for block in self.blocks:
x = block(
- x=x, context=context, input_mask=input_mask, context_mask=context_mask
+ x=x,
+ context=context,
+ mask=mask,
+ rotary_embedding=self.rotary_embedding,
)
return self.ln(x)
diff --git a/text_recognizer/networks/transformer/decoder_block.py b/text_recognizer/networks/transformer/decoder_block.py
index b80a078..2dc4ddf 100644
--- a/text_recognizer/networks/transformer/decoder_block.py
+++ b/text_recognizer/networks/transformer/decoder_block.py
@@ -5,6 +5,7 @@ from typing import Optional, Type
from torch import Tensor, nn
from text_recognizer.networks.transformer.attention import Attention
+from text_recognizer.networks.transformer.embeddings.rotary import RotaryEmbedding
from text_recognizer.networks.transformer.ff import FeedForward
@@ -25,22 +26,19 @@ class DecoderBlock(nn.Module):
self.cross_attn = cross_attn
self.ln_ff = deepcopy(norm)
self.ff = ff
- self.has_pos_emb = self.attn.rotary_embedding is not None
def forward(
self,
x: Tensor,
context: Optional[Tensor] = None,
- input_mask: Optional[Tensor] = None,
- context_mask: Optional[Tensor] = None,
+ mask: Optional[Tensor] = None,
+ rotary_embedding: Optional[RotaryEmbedding] = None,
) -> Tensor:
"""Applies decoder block on input signals."""
- x = x + self.attn(self.ln_attn(x), input_mask=input_mask)
+ x = x + self.attn(self.ln_attn(x), mask=mask, rotary_embedding=rotary_embedding)
x = x + self.cross_attn(
x=self.ln_cross_attn(x),
context=context,
- input_mask=input_mask,
- context_mask=context_mask,
)
x = x + self.ff(self.ln_ff(x))
return x
diff --git a/text_recognizer/networks/transformer/embeddings/rotary.py b/text_recognizer/networks/transformer/embeddings/rotary.py
index cc91206..ca0a260 100644
--- a/text_recognizer/networks/transformer/embeddings/rotary.py
+++ b/text_recognizer/networks/transformer/embeddings/rotary.py
@@ -6,6 +6,9 @@ Stolen from lucidrains:
Explanation of roatary:
https://blog.eleuther.ai/rotary-embeddings/
"""
+from inspect import isfunction
+
+from einops import rearrange, repeat
import torch
from torch import Tensor, nn
@@ -17,24 +20,48 @@ class RotaryEmbedding(nn.Module):
super().__init__()
inv_freqs = 1.0 / (10000 ** (torch.arange(0, dim, 2).float() / dim))
self.register_buffer("inv_freqs", inv_freqs)
+ self.cache = {}
+
+ def rotate(self, t: Tensor, dim: int = -2) -> Tensor:
+ """Rotate vector."""
+ device, n = t.device, t.shape[dim]
+ freqs = self.forward(lambda: torch.arange(n, device=device), cache_key=n)
+ return apply_rotary_emb(t, freqs)
- def forward(self, x: Tensor) -> Tensor:
+ def forward(self, t: Tensor, cache_key: int) -> Tensor:
"""Encodes tensor x with rotary embeddings."""
- n = x.shape[-2]
- t = torch.arange(n, device=x.device).type_as(self.inv_freqs)
- freqs = torch.einsum("i , j -> i j", t, self.inv_freqs)
- emb = torch.cat((freqs, freqs), dim=-1)
- return emb[None, :, :]
+ if cache_key in self.cache:
+ return self.cache[cache_key]
+
+ if isfunction(t):
+ t = t()
+
+ freqs = self.inv_freqs
+ freqs = torch.einsum("..., f -> ... f", t.type(freqs.dtype), freqs)
+ freqs = repeat(freqs, "... n -> ... (n r)", r=2)
+ self.cache[cache_key] = freqs
+ return freqs
def rotate_half(x: Tensor) -> Tensor:
- if len(x.shape) == 3:
- x = x.reshape((x.shape[0], -1, 2, x.shape[-1] // 2))
- else:
- x = x.reshape((x.shape[0], x.shape[1], -1, 2, x.shape[-1] // 2))
- x1, x2 = x.unbind(dim=-2)
- return torch.cat((-x2, x1), dim=-1)
+ x = rearrange(x, "... (d r) -> ... d r", r=2)
+ x1, x2 = x.unbind(dim=-1)
+ x = torch.stack((-x2, x1), dim=-1)
+ return rearrange(x, "... d r -> ... (d r)")
-def apply_rotary_pos_emb(t: Tensor, freqs: Tensor) -> Tensor:
- return (t * freqs.cos()) + (rotate_half(t) * freqs.sin())
+def apply_rotary_emb(t: Tensor, freqs: Tensor, start_index: int = 0) -> Tensor:
+ freqs = freqs.to(t)
+ rot_dim = freqs.shape[-1]
+ end_index = start_index + rot_dim
+ assert rot_dim <= t.shape[-1], (
+ f"feature dimension {t.shape[-1]} is not of sufficient size to rotate"
+ f"in all the positions {rot_dim}"
+ )
+ t_left, t, t_right = (
+ t[..., :start_index],
+ t[..., start_index:end_index],
+ t[..., end_index:],
+ )
+ t = (t * freqs.cos()) + (rotate_half(t) * freqs.sin())
+ return torch.cat((t_left, t, t_right), dim=-1)