借助 API 手写一个 Transformer 架构



对本文有任何问题,可加我的个人微信询问:kymjs666

背景

之所以写这个系列文章,其实是源自这个段子:

求求你们了,别再整新玩意了,学不动了。
求求你们了,别再整新玩意儿了
Manus没用过,OpenCode还没装,
Cowork还没捂热,又来个Clawdbot.
前脚Remotion干翻剪映,后脚Pencil打倒Figma,
去年前端已死,今年iOS爆发。
上周嘲笑苹果,这周下单Mac,
每天都革了昨天的命,日日都是AGI...

在当今AI时代,技术几乎是按周来涌现的。对于我这种古老程序员来说,虽然是体验了不少AI产品,甚至自己源码跑了 Dify,Llamaindex这些破玩意,但 AI 大模型仍然是一个神秘的黑盒子。出于本能,还是觉得应该研究一些不变的东西。

大模型的背景

要讲大模型,还是必须讲NLP,这是计算机执行各种语言任务的基础。如中文分词、子词切分、词性标注、文本分类、实体识别、关系抽取、文本摘要、机器翻译、自动问答等大模型常用的功能,都是靠NLP技术完成的。不过这些都是成熟的技术了,在搜索引擎领域都不知道玩了多少年了,不是近几年新创造的东西,但也正是因为NLP技术的进步,让我们从海量文本中提取有用信息、理解语言的深层含义成为可能,才有了大模型训练的可能。

1.1 文本转向量

NLP里面被使用最多的一项技术就是文本向量化。文本转向量的目的是将人类语言转化为计算机可以处理的形式,也就是将文本数据数字化。

将文本数字化以后,就可以将文本中每个”字”(实际上是token,这里叫字是方便理解)放在坐标系里面去,这样后续在查的时候,就能快速知道两个字之间的关系是比较近还是比较远。当有一个字以后,下一个字应该是什么,也就是下文的Transformer架构干的事。

比如字节火山引擎的一系列向量模型,就是做这件事的:如果有一段文本叫”我是张涛”,假设这段文本是4个token,每个字是一个token。那么每个token被向量模型转化后,就变成了一个数组,数组表示这个token在坐标系中的位置坐标,数组的长度就是这个向量模型的维度。比如在平面直角坐标系中,我们可以用两个数字确定一个点(x, y),因为平面是二维的。在空间直角坐标系中,需要三个数字确定一个点(x, y, z),因为空间是三维的。假设我们现在的向量模型是6维的,前面我们说的”我是张涛”这四个token,转化成了四个长度为6的数组,也就是一个4行6列的二维数组,也可以用矩阵表示。

我 = [0, 1, 4, 8, 20, 3]
是 = [10, 7, 4, 18, 20, 3]
张 = [30, 5, 3, 28, 20, 6]
涛 = [5, 6, 2, 38, 20, 7]

经过这样的向量化以后,我们就可以通过公式算出来每个token之间的距离是近还是远了。比如你一定知道空间中两个点之间的距离,(1, 1, 1) 和 (3, 3, 3)这两个点可以通过公式算出来是3√3 -1。算不出来的回去学高中数学去。

而大模型的本质,就是一个token一个token的算,后一个字与前一个字距离近,所以前一个字“我”出来以后,就知道要再生成“是”这个字。

但是上面的逻辑有两个问题需要解决:

  1. 语序对句子的影响,我喜欢你和你喜欢我,这完全是两码事。
  2. 词语在句子中的含义,比如“老六”,在抖音上可能就是老阴比的意思,但是在你外婆跟你介绍他的七个子女的时候,你肯定不会觉得你六舅舅是老阴比。

第一个问题,比较好解决,在所有向量模型训练和向量化时,也同步加入所在文本的位置坐标,这样就能备区分不同语序的能力。第二个问题,则由由注意力机制解决。

1.2 注意力机制

在 Transformer 出现之前,AI 处理文本、语音等时序数据有个致命问题:无法高效处理长文本,且难以捕捉全局关联。类似 Android 开发:就像早期 Android 系统中的 Handler 机制,必须按顺序处理消息队列,前面的任务会阻塞后面的任务。当处理长文本时,前面的信息会随着序列推进逐渐弱化,很难将”它”与”苹果”精准关联;处理几百字的长文案时,更是会出现”梯度消失”,导致模型理解偏差。

为了解决这些问题,才有大佬参考计算机视觉,引入了注意力机制(Attention)的概念,搭建了完全由注意力机制构成的神经网络——Transformer,也就是大语言模型(Large Language Model,LLM)的鼻祖及核心架构。

注意力机制的核心思想为当我们关注一张图片,我们往往无需看清楚全部内容而仅将注意力集中在重点部分即可。而在自然语言处理领域,我们往往也可以通过将重点注意力集中在一个或几个token,从而取得更高效高质的计算效果。

注意力机制有三个核心变量: Query (查询值)、 Key (索引)和 Value (完整内容)。从下面的公式你也能看出来,真正需要理解的主要是 Q 和 K。网上会有大把的文章教你这个公式。

这里我们只用代码实现一下这个公式:

def attention(query, key, value, dropout=None):
    # 获取键向量的维度,键向量的维度和值向量的维度相同
    d_k = query.size(-1) 
    # 计算Q与K的内积并除以根号dk
    # transpose——相当于转置
    scores = torch.matmul(query, key.transpose(-2, -1)) / math.sqrt(d_k)
    # Softmax
    p_attn = scores.softmax(dim=-1)
    if dropout is not None:
        p_attn = dropout(p_attn)
        # 采样
     # 根据计算结果对value进行加权求和
    return torch.matmul(p_attn, value), p_attn

1.2.1 自注意力

自注意力(self-attention)是注意力机制的一种特殊形式,它允许模型在处理一个序列时,关注序列中的其他位置。

当Q=K=V时,就称为自注意力机制(Self-Attention)。这在Transformer中非常重要,因为它让模型能够理解句子内部的关系。

类比:这就像让句子中的每个单词都成为一个搜索关键词,去查询整个句子中其他单词的相关性。

例如,对于句子”我是张涛”:

  • 当”我”作为Query时,会发现”是”和”张涛”与它相关性最高
  • 当”是”作为Query时,会发现”我”和”张涛”与它相关性最高
  • 当”张涛”作为Query时,会发现”我”和”是”与它相关性最高

在代码实现上,其实就是把Q K V的输入全部统一成一个值

# attention 为上文定义的注意力计算函数
attention(x, x, x)

1.2.2 多头注意力

多头注意力(Multi-Head Attention)是注意力机制的另一种变体,它允许模型同时关注序列中的多个不同位置。在多头注意力机制中,我们将输入序列分成多个子序列,然后对每个子序列应用自注意力机制,最后将这些子序列的注意力输出拼接起来。也可以将输入序列重复多次,然后每个序列应用不同的注意力机制,最后再将子序列拼接。

多头注意力机制的优势在于它可以捕捉序列中不同位置之间的不同类型的依赖关系,从而提高模型的表达能力。

单头注意力只能从一个角度理解句子,而多头注意力可以从多个角度同时理解句子。

类比:这就像让多个不同领域的专家同时分析同一份文档:

  • 一位专家关注语法结构
  • 一位专家关注语义关系
  • 一位专家关注长距离依赖

最终将所有专家的分析结果整合起来,得到更全面的理解。

import torch.nn as nn
import torch

class MultiHeadAttention(nn.Module):def __init__(self, args: ModelArgs, is_causal=False):# 构造函数# args: 配置对象super().__init__()# 隐藏层维度必须是头数的整数倍,因为后面我们会将输入拆成头数个矩阵assert args.dim % args.n_heads == 0# 每个头的维度,等于模型维度除以头的总数。
        self.head_dim = args.dim // args.n_heads
        self.n_heads = args.n_heads

        # Wq, Wk, Wv 参数矩阵,每个参数矩阵为 n_embd x dim# 这里通过三个组合矩阵来代替了n个参数矩阵的组合,其逻辑在于矩阵内积再拼接其实等同于拼接矩阵再内积,# 不理解的读者可以自行模拟一下,每一个线性层其实相当于n个参数矩阵的拼接
        self.wq = nn.Linear(args.n_embd, self.n_heads * self.head_dim, bias=False)
        self.wk = nn.Linear(args.n_embd, self.n_heads * self.head_dim, bias=False)
        self.wv = nn.Linear(args.n_embd, self.n_heads * self.head_dim, bias=False)# 输出权重矩阵,维度为 dim x dim(head_dim = dim / n_heads)
        self.wo = nn.Linear(self.n_heads * self.head_dim, args.dim, bias=False)# 注意力的 dropout
        self.attn_dropout = nn.Dropout(args.dropout)# 残差连接的 dropout
        self.resid_dropout = nn.Dropout(args.dropout)
        self.is_causal = is_causal

        # 创建一个上三角矩阵,用于遮蔽未来信息# 注意,因为是多头注意力,Mask 矩阵比之前我们定义的多一个维度if is_causal:
            mask = torch.full((1, 1, args.max_seq_len, args.max_seq_len), float("-inf"))
            mask = torch.triu(mask, diagonal=1)# 注册为模型的缓冲区
            self.register_buffer("mask", mask)def forward(self, q: torch.Tensor, k: torch.Tensor, v: torch.Tensor):# 获取批次大小和序列长度,[batch_size, seq_len, dim]
        bsz, seqlen, _ = q.shape

        # 计算查询(Q)、键(K)、值(V),输入通过参数矩阵层,维度为 (B, T, n_embed) x (n_embed, dim) -> (B, T, dim)
        xq, xk, xv = self.wq(q), self.wk(k), self.wv(v)# 将 Q、K、V 拆分成多头,维度为 (B, T, n_head, dim // n_head),然后交换维度,变成 (B, n_head, T, dim // n_head)# 因为在注意力计算中我们是取了后两个维度参与计算# 为什么要先按B*T*n_head*C//n_head展开再互换1、2维度而不是直接按注意力输入展开,是因为view的展开方式是直接把输入全部排开,# 然后按要求构造,可以发现只有上述操作能够实现我们将每个头对应部分取出来的目标
        xq = xq.view(bsz, seqlen, self.n_heads, self.head_dim)
        xk = xk.view(bsz, seqlen, self.n_heads, self.head_dim)
        xv = xv.view(bsz, seqlen, self.n_heads, self.head_dim)
        xq = xq.transpose(1, 2)
        xk = xk.transpose(1, 2)
        xv = xv.transpose(1, 2)# 注意力计算# 计算 QK^T / sqrt(d_k),维度为 (B, nh, T, hs) x (B, nh, hs, T) -> (B, nh, T, T)
        scores = torch.matmul(xq, xk.transpose(2, 3)) / math.sqrt(self.head_dim)# 掩码自注意力必须有注意力掩码if self.is_causal:assert hasattr(self, 'mask')# 这里截取到序列长度,因为有些序列可能比 max_seq_len 短
            scores = scores + self.mask[:, :, :seqlen, :seqlen]# 计算 softmax,维度为 (B, nh, T, T)
        scores = F.softmax(scores.float(), dim=-1).type_as(xq)# 做 Dropout
        scores = self.attn_dropout(scores)# V * Score,维度为(B, nh, T, T) x (B, nh, T, hs) -> (B, nh, T, hs)
        output = torch.matmul(scores, xv)# 恢复时间维度并合并头。# 将多头的结果拼接起来, 先交换维度为 (B, T, n_head, dim // n_head),再拼接成 (B, T, n_head * dim // n_head)# contiguous 函数用于重新开辟一块新内存存储,因为Pytorch设置先transpose再view会报错,# 因为view直接基于底层存储得到,然而transpose并不会改变底层存储,因此需要额外存储
        output = output.transpose(1, 2).contiguous().view(bsz, seqlen, -1)# 最终投影回残差流。
        output = self.wo(output)
        output = self.resid_dropout(output)return output

1.3 Encoder-Decoder架构

Transformer架构采用了Encoder-Decoder架构,其中Encoder负责对输入序列进行编码,Decoder负责根据Encoder的输出生成输出序列。

1.3.1 Encoder

Encoder由多个相同的 layer 堆叠而成,每个层包含两个子层:多头自注意力层和前馈神经网络层。

  1. 多头自注意力层:多头自注意力层用于捕捉输入序列中不同位置之间的依赖关系。
  2. 前馈神经网络层:前馈神经网络层用于对多头自注意力层的输出进行非线性变换。

多头注意力前面讲过了,前馈层实验更简单,有提供好的api直接使用:

class MLP(nn.Module):
    def __init__(self, dim: int, hidden_dim: int, dropout: float):
        super().__init__()
        # 定义第一层线性变换,从输入维度到隐藏维度
        self.w1 = nn.Linear(dim, hidden_dim, bias=False)
        # 定义第二层线性变换,从隐藏维度到输入维度
        self.w2 = nn.Linear(hidden_dim, dim, bias=False)
        # 定义dropout层,用于防止过拟合
        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        # 前向传播函数
        # 首先,输入x通过第一层线性变换和RELU激活函数
        # 最后,通过第二层线性变换和dropout层
        return self.dropout(self.w2(F.relu(self.w1(x))))
    

这样在每个子层之后,都应用了残差连接和层归一化,以加速模型的训练和提高模型的稳定性。一个最简单的 Encode层的实现是这样的:

class EncoderLayer(nn.Module):
  '''Encoder层'''
    def __init__(self, args):
        super().__init__()
        # 一个 Layer 中有两个 LayerNorm,分别在 Attention 之前和 MLP 之前
        self.attention_norm = LayerNorm(args.n_embd)
        # Encoder 不需要掩码,传入 is_causal=False
        self.attention = MultiHeadAttention(args, is_causal=False)
        self.fnn_norm = LayerNorm(args.n_embd)
        self.feed_forward = MLP(args.dim, args.dim, args.dropout)

    def forward(self, x):
        # Layer Norm
        norm_x = self.attention_norm(x)
        # 自注意力
        h = x + self.attention.forward(norm_x, norm_x, norm_x)
        # 经过前馈神经网络
        out = h + self.feed_forward.forward(self.fnn_norm(h))
        return out
     
# 搭建一个 Encoder,由 N 个 Encoder Layer 组成,在最后会加入一个 Layer Norm 实现规范化:   
class Encoder(nn.Module):
    '''Encoder 块'''
    def __init__(self, args):
        super(Encoder, self).__init__() 
        # 一个 Encoder 由 N 个 Encoder Layer 组成
        self.layers = nn.ModuleList([EncoderLayer(args) for _ in range(args.n_layer)])
        self.norm = LayerNorm(args.n_embd)

    def forward(self, x):
        "分别通过 N 层 Encoder Layer"
        for layer in self.layers:
            x = layer(x)
        return self.norm(x)

1.3.2 Decoder

Decoder也由多个相同的 layer 堆叠而成,每个层包含三个子层:掩码多头自注意力层、多头注意力层和前馈神经网络层。

  1. 掩码多头自注意力层:掩码多头自注意力层用于捕捉输出序列中不同位置之间的依赖关系,同时确保模型在生成输出序列时,只能关注到当前位置之前的位置。
  2. 多头注意力层:多头注意力层用于捕捉输入序列和输出序列之间的依赖关系。
  3. 前馈神经网络层:前馈神经网络层用于对多头注意力层的输出进行非线性变换。

在每个子层之后,也应用了残差连接和层归一化。

class DecoderLayer(nn.Module):
  '''解码层'''
    def __init__(self, args):
        super().__init__()
        # 一个 Layer 中有三个 LayerNorm,分别在 Mask Attention 之前、Self Attention 之前和 MLP 之前
        self.attention_norm_1 = LayerNorm(args.n_embd)
        # Decoder 的第一个部分是 Mask Attention,传入 is_causal=True
        self.mask_attention = MultiHeadAttention(args, is_causal=True)
        self.attention_norm_2 = LayerNorm(args.n_embd)
        # Decoder 的第二个部分是 类似于 Encoder 的 Attention,传入 is_causal=False
        self.attention = MultiHeadAttention(args, is_causal=False)
        self.ffn_norm = LayerNorm(args.n_embd)
        # 第三个部分是 MLP
        self.feed_forward = MLP(args.dim, args.dim, args.dropout)

    def forward(self, x, enc_out):
        # Layer Norm
        norm_x = self.attention_norm_1(x)
        # 掩码自注意力
        x = x + self.mask_attention.forward(norm_x, norm_x, norm_x)
        # 多头注意力
        norm_x = self.attention_norm_2(x)
        h = x + self.attention.forward(norm_x, enc_out, enc_out)
        # 经过前馈神经网络
        out = h + self.feed_forward.forward(self.ffn_norm(h))
        return out
        
# 同样的,搭建一个 Decoder 块:
class Decoder(nn.Module):
    '''解码器'''
    def __init__(self, args):
        super(Decoder, self).__init__() 
        # 一个 Decoder 由 N 个 Decoder Layer 组成
        self.layers = nn.ModuleList([DecoderLayer(args) for _ in range(args.n_layer)])
        self.norm = LayerNorm(args.n_embd)

    def forward(self, x, enc_out):
        "Pass the input (and mask) through each layer in turn."
        for layer in self.layers:
            x = layer(x, enc_out)
        return self.norm(x)

1.4 代码实现Transformer架构

再从完整的结构看一下 Transformer,完成前面的 Encoder、Decoder 搭建以后,就完成了 Transformer 的核心部分,接下来将 Encoder、Decoder 拼接起来再加入 Embedding 层就可以搭建出完整的 Transformer 模型。

class Transformer(nn.Module):
   '''整体模型'''
    def __init__(self, args):
        super().__init__()
        # 必须输入词表大小和 block size
        assert args.vocab_size is not None
        assert args.block_size is not None
        self.args = args
        self.transformer = nn.ModuleDict(dict(
            wte = nn.Embedding(args.vocab_size, args.n_embd),
            wpe = PositionalEncoding(args),
            drop = nn.Dropout(args.dropout),
            encoder = Encoder(args),
            decoder = Decoder(args),
        ))
        # 最后的线性层,输入是 n_embd,输出是词表大小
        self.lm_head = nn.Linear(args.n_embd, args.vocab_size, bias=False)

        # 初始化所有的权重
        self.apply(self._init_weights)

        # 查看所有参数的数量
        print("number of parameters: %.2fM" % (self.get_num_params()/1e6,))

    '''统计所有参数的数量'''
    def get_num_params(self, non_embedding=False):
        # non_embedding: 是否统计 embedding 的参数
        n_params = sum(p.numel() for p in self.parameters())
        # 如果不统计 embedding 的参数,就减去
        if non_embedding:
            n_params -= self.transformer.wte.weight.numel()
        return n_params

    '''初始化权重'''
    def _init_weights(self, module):
        # 线性层和 Embedding 层初始化为正则分布
        if isinstance(module, nn.Linear):
            torch.nn.init.normal_(module.weight, mean=0.0, std=0.02)
            if module.bias is not None:
                torch.nn.init.zeros_(module.bias)
        elif isinstance(module, nn.Embedding):
            torch.nn.init.normal_(module.weight, mean=0.0, std=0.02)
    
    '''前向计算函数'''
    def forward(self, idx, targets=None):
        # 输入为 idx,维度为 (batch size, sequence length, 1);targets 为目标序列,用于计算 loss
        device = idx.device
        b, t = idx.size()
        assert t <= self.args.block_size, f"不能计算该序列,该序列长度为 {t}, 最大序列长度只有 {self.args.block_size}"

        # 通过 self.transformer
        # 首先将输入 idx 通过 Embedding 层,得到维度为 (batch size, sequence length, n_embd)
        print("idx",idx.size())
        # 通过 Embedding 层
        tok_emb = self.transformer.wte(idx)
        print("tok_emb",tok_emb.size())
        # 然后通过位置编码
        pos_emb = self.transformer.wpe(tok_emb) 
        # 再进行 Dropout
        x = self.transformer.drop(pos_emb)
        # 然后通过 Encoder
        print("x after wpe:",x.size())
        enc_out = self.transformer.encoder(x)
        print("enc_out:",enc_out.size())
        # 再通过 Decoder
        x = self.transformer.decoder(x, enc_out)
        print("x after decoder:",x.size())

        if targets is not None:
            # 训练阶段,如果我们给了 targets,就计算 loss
            # 先通过最后的 Linear 层,得到维度为 (batch size, sequence length, vocab size)
            logits = self.lm_head(x)
            # 再跟 targets 计算交叉熵
            loss = F.cross_entropy(logits.view(-1, logits.size(-1)), targets.view(-1), ignore_index=-1)
        else:
            # 推理阶段,我们只需要 logits,loss 为 None
            # 取 -1 是只取序列中的最后一个作为输出
            logits = self.lm_head(x[:, [-1], :]) # note: using list [-1] to preserve the time dim
            loss = None

        return logits, loss

1.5 总结

以下内容由 DeepSeek 生成:

本文详细介绍了AI大模型的核心技术和实现方法,包括NLP基础概念、Transformer架构、预训练语言模型等内容。通过本文的学习,读者可以从零开始,深入理解AI大模型的工作原理和实现方法。

AI大模型是当前人工智能领域的研究热点,它具有强大的语言理解和生成能力,正在深刻改变着我们的生活和工作方式。随着技术的不断发展,AI大模型的性能和应用场景将会不断拓展,为人类社会带来更多的便利和创新。