Transformer大语言模型架构原理学习笔记

1. 模型架构与优化方法

在架构里面,我们做的就是把输入的x映射到输出的y,这个映射过程就是模型架构。但是如果模型里面的参数或者是这个模型是随机的,那么这个模型就没有意义,所以我们需要优化方法来优化模型。

就笔者的理解,模型内部实际上就是一堆矩阵(线性变换),与输入x进行矩阵乘法,得到输出y。通过训练,我们希望模型内部的矩阵能够尽量使得输入x映射到输出y。

但是完全的映射是不能的,因为输入和输出都是离散的,所以我们需要一个函数来衡量模型的预测结果与真实结果之间的差距,然后通过优化方法来最小化这个函数,从而优化模型内部的矩阵。

这个函数就是所谓的损失函数,常见的损失函数有均方误差、交叉熵等。(交叉熵是分类任务常用的损失函数,均方误差是回归任务常用的损失函数)

优化方法有梯度下降、牛顿法等。

这里Transformer模型架构的学习我采用了交叉熵(原因参见下文)作为损失函数,采用梯度下降作为优化方法(这个算法成本最低,其他的都还不懂)。

2. 大语言模型架构

Transformer大语言模型是一种基于自注意力机制的深度神经网络模型,用于处理自然语言处理任务。Transformer模型的主要架构包括编码器和解码器,以及它们之间的连接。

Transformer大语言模型的主要特点包括:

  • 自注意力机制:Transformer模型使用自注意力机制来计算序列中每个元素与其他元素之间的关系,从而捕捉序列中的长距离依赖关系。自注意力机制通过计算查询、键和值向量之间的点积来计算注意力权重,然后使用这些权重对值向量进行加权求和,得到每个元素的上下文表示。

  • 位置编码:Transformer模型使用位置编码来为序列中的每个元素添加位置信息,以便模型能够区分序列中的不同位置。位置编码通常使用正弦和余弦函数来生成。

  • 前馈神经网络:Transformer模型中的每个编码器和解码器都包含一个前馈神经网络,用于对输入进行非线性变换。前馈神经网络由两个线性层和一个非线性激活函数组成。

  • 多头注意力:Transformer模型使用多头注意力机制来捕捉序列中每个元素与其他元素之间的关系。多头注意力机制通过将输入序列分成多个子序列,并使用不同的查询、键和值向量来计算注意力权重,从而捕捉序列中的不同特征。

  • 残差连接:Transformer模型使用残差连接来缓解深层神经网络中的梯度消失问题。残差连接通过将输入直接添加到前馈神经网络的输出中,使得梯度能够直接传播到更深的层。

  • 层归一化:Transformer模型使用层归一化来稳定深层神经网络的训练过程。层归一化通过对每个输入序列的每个元素进行归一化,使得输入的分布更加稳定。

3. 模型架构原理

模型架构可以先看下面的流程图

在整体记录前,我们需要知道这个输入 x 是啥。

首先x不可能是用户输入的文本,因为文本无法映射到模型内部的矩阵,那么我们就需要分词,将一段连续的文本分割成一个个的词,然后通过词嵌入(Embedding)将词映射到模型内部的矩阵。

注意词很难独立映射到模型(Embedding)内部的矩阵,在分完词后,我们需要用一个 wordToIdIdToWord 来记录词和id之间的映射关系。这里的 id 是一个数字,所有的词会映射到独立的数字,我们用连续的数字映射不同的词,那样就可以用一个二维矩阵来表示所有的词(每一行向量都有自己的词相对应)。注意这里分完词后的词表大小,我们用 vocab_size 来表示,这个东西很重要。

然后我们就可以将词映射到模型内部的矩阵了,这个矩阵就是词嵌入矩阵,我们用 E 来表示。这个矩阵的维度是 [vocab_size, dim_model],其中 dim_model 是嵌入向量的维度(就是每个词所对应的向量的维度)。

我们拿到用户输入的文本后,先通过分词将文本转换成一个词的列表,然后通过 wordToId 将词列表转换成 id 列表(注意在原本创建词列表时需要多四个符号,分别是 unk 表示不知道,pad 表示填充,bos 表示文本开始,eos 表示文本结束),在id列表的前后加上 boseosid 列表的长度也为 seq_len

这里的id列表就是我们的输入x了。

到这里,你已经明白了怎么将输入的文本变成模型可以处理的输入了,那么,模型是如何预测的呢?

这里看到第一个架构

通过中间的Model,将输入x映射到输出y,这里的Model就是我们要设计和训练的东西了。

而输出y是啥呢,我学习到,这里应该会是一个logit数组(即通过逻辑回归得到的数组),通过 Softmax 函数将logit数组映射到概率分布,后通过概率分布取最大概率的id,通过 IdToWord 将id映射到词,这样就可以得到预测的文本了。

所以我们训练时的损失函数就应该是 CrossEntropy Loss,即交叉熵损失函数。公式如下

其中 N 是样本数量,M 是类别数量,y_{ij} 是样本 i 的第 j 类真实标签,y’_{ij} 是样本 i 的第 j 类预测概率。

我们训练数据实际上会是一大段一大段的文本,我们通过滑动窗口的方式,将文本分成很多个样本,每个样本的长度是 seq_len,然后通过上面的流程图,将样本映射到输出y,这里的y会是[window_size, vocab_size]形状的,然后通过交叉熵损失函数计算损失,然后通过梯度下降优化方法来优化模型内部的矩阵。

以上就是大语言模型的基础架构了。就可以拿回之前的流程图。

这样就清晰些了吧~

Transformer流程图

接下来是训练流程和输出流程

4. 训练流程

4.1 分词

当我们拿到一段文本时,我们需要将文本分词,将文本转换成词的列表。这里我们使用jieba分词。(jieba分词是Python中一个常用的中文分词库,它支持多种分词模式,包括精确模式、全模式和搜索引擎模式等。)

实际上,我们训练时会用一大段一大段的txt文件,我们要计划好在哪里分词,哪里插入对应的符号。

由于本人能力有限,只能在网上爬几十本小说来训练,我们就需要批量读取文本,这里先放代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
from torch.utils.data import DataLoader
import os
import re
import json
import jieba
from typing import List, Dict


class DataProcessor:
"""文本处理与词表构建模块"""

def __init__(self, file_paths=None, max_vocab_size=None):
print(f"Initializing DataProcessor...")

# 默认参数不使用可变列表
if file_paths is None:
file_paths = []

# 特殊 token
self.pad = '<pad>'
self.unk = '<unk>'
self.bos = '<bos>'
self.eos = '<eos>'
self.special_tokens = [self.pad, self.unk, self.bos, self.eos]

# 数据存储
self.file_paths = file_paths
self.texts: List[str] = []
self.word_freq: Dict[str, int] = {}

# 初始化特殊 token 词频
for tok in self.special_tokens:
self.word_freq[tok] = 0

# 加载文本并统计词频
self._load_and_process_data()

# 根据词频自动构建 vocab
self.build_vocab(max_vocab_size)

print(f"Loaded {len(self.texts)} paragraphs, vocab size={len(self.vocab)}")

# -----------------------------
# 数据加载与预处理
# -----------------------------
def _load_and_process_data(self):
"""加载文件、清洗文本、分词并统计词频"""

for file_path in self.file_paths:
try:
print(f"Loading file: {file_path}")
with open(file_path, 'r', encoding='utf-8') as f:
content = f.read()

paragraphs = [p.strip() for p in content.split("\n\n") if p.strip()]

for idx, paragraph in enumerate(paragraphs):

paragraph = self._clean_text(paragraph)
if not paragraph:
continue

# 保存段落文本
self.texts.append(paragraph)

# 分词
words = jieba.lcut(paragraph)

# 统计词频
for w in words:
self.word_freq[w] = self.word_freq.get(w, 0) + 1

# 每段增加一次 BOS/EOS
self.word_freq[self.bos] += 1
self.word_freq[self.eos] += 1

if idx % 1000 == 0:
print(f"Processed {idx} paragraphs")

except Exception as e:
print(f"Error processing file {file_path}: {e}")

# -----------------------------
# 文本清洗
# -----------------------------
def _clean_text(self, text: str) -> str:
"""清理全角空格、零宽字符、多余空白等"""
text = text.replace('\u3000', ' ')
text = text.replace('\u200b', '').replace('\u200d', '')
text = re.sub(r'\s+', ' ', text)
return text.strip()

# -----------------------------
# 词表构建
# -----------------------------
def build_vocab(self, max_vocab_size=None):
"""按词频排序构建词表,可限制最大词表大小"""

sorted_words = sorted(
self.word_freq.items(),
key=lambda x: x[1],
reverse=True
)

# 最终 vocab:特殊词 + 高频词
words = [w for w, _ in sorted_words if w not in self.special_tokens]

if max_vocab_size:
words = words[:max_vocab_size - len(self.special_tokens)]

self.vocab = self.special_tokens + words

# -----------------------------
# 工具函数
# -----------------------------
def get_vocab(self):
return self.vocab

def get_texts(self):
return self.texts

def get_word_frequency(self, word):
return self.word_freq.get(word, 0)

def get_total_words(self):
return sum(self.word_freq.values())

def get_sorted_vocab_by_freq(self):
return sorted(self.vocab, key=lambda x: self.word_freq.get(x, 0), reverse=True)

# -----------------------------
# 保存函数
# -----------------------------
def save_vocab(self, path="./vocab/vocab.txt"):
os.makedirs(os.path.dirname(path), exist_ok=True)
with open(path, 'w', encoding='utf-8') as f:
for w in self.vocab:
f.write(w + "\n")
print(f"Saved vocab to {path}")

def save_texts(self, path="./vocab/processed_texts.txt"):
os.makedirs(os.path.dirname(path), exist_ok=True)
with open(path, 'w', encoding='utf-8') as f:
for t in self.texts:
f.write(t + "\n")
print(f"Saved processed texts to {path}")

def save_word_freq(self, path="./vocab/vocab_freq.json"):
os.makedirs(os.path.dirname(path), exist_ok=True)
with open(path, 'w', encoding='utf-8') as f:
json.dump(self.word_freq, f, ensure_ascii=False, indent=4)
print(f"Saved word freq to {path}")

这个分词的工作流程

这里的分词很好看懂,接下来就是用这个分词创建数据集

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
import torch
import bisect
from torch.utils.data import Dataset


class TextDataset(Dataset):
"""
Transformer/GPT 语言模型数据集
支持:
- 分词 -> 数字化
- 滑动窗口序列
- input 与 target 序列对
"""

def __init__(self, data_processor, seq_length=512, path=None):
self.data_processor = data_processor
self.seq_length = seq_length

if path:
self.load(path)
return

# -----------------------
# 构建词表映射
# -----------------------
vocab = data_processor.get_vocab()

self.id_to_vocab = {i + 1: w for i, w in enumerate(vocab)}
self.vocab_to_id = {w: i + 1 for i, w in enumerate(vocab)}

# pad → id = 0
self.id_to_vocab[0] = data_processor.pad
self.vocab_to_id[data_processor.pad] = 0

self.vocab_size = len(self.vocab_to_id)

# -----------------------
# 从文本构建连续 token 流
# -----------------------
self.tokens = self.build_token_stream(data_processor.get_texts())

# 数据集大小(滑动窗口)
self.size = len(self.tokens) - seq_length - 1

# ----------------------------------------------------------------------
def build_token_stream(self, texts):
"""将所有文本串联成一个长 token 序列"""
all_tokens = []

for text in texts:
words = [self.data_processor.bos]
words.extend(jieba.lcut(text))
words.append(self.data_processor.eos)

ids = [self.vocab_to_id.get(w, self.vocab_to_id[self.data_processor.unk]) for w in words]
all_tokens.extend(ids)

return all_tokens

# ----------------------------------------------------------------------
def __len__(self):
return self.size

# ----------------------------------------------------------------------
def __getitem__(self, idx):
# input: tokens[idx: idx+seq_length]
# target: tokens[idx+1: idx+seq_length+1]
x = self.tokens[idx : idx + self.seq_length]
y = self.tokens[idx + 1 : idx + self.seq_length + 1]
return torch.tensor(x, dtype=torch.long), torch.tensor(y, dtype=torch.long)

# ----------------------------------------------------------------------
def save(self, path):
print("Saving dataset...")
torch.save(self, path)

# ----------------------------------------------------------------------
def load(self, path):
print("Loading dataset...")
data = torch.load(path)
self.__dict__.update(data.__dict__)

这个数据集的工作流程

这样,就可以通过下面代码

1
2
3
4
5
6
7
8
9
10
11
12
file_paths = []
dir_path = "./data" # 数据集文件夹路径

for name in os.listdir(dir_path):
file_paths.append(os.path.join(dir_path, name))

data_processor = DataProcessor(file_paths)

text_dataset = TextDataset(
data_processor=data_processor,
seq_length=128 # 滑动窗口长度,根据自己服务器内存大小调整
)

创建数据集结束,到这里我们获得 x 了

4.2 EMbedding

这个英语单词是嵌入的意思,就是将单词转换为向量,这里的向量实际上也算是模型的参数,是需要在训练中学习的,这个过程就是词向量的训练。

所以这个应该也需要设置一个模型,我们命名为 Embedding,代码如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
class PositionalEncoding(nn.Module):
def __init__(self, d_model, max_len=512):
"""
位置编码类的初始化函数
参数:
d_model: 模型的维度
max_len: 序列的最大长度,默认为512
"""
super(PositionalEncoding, self).__init__()

# 创建一个位置编码矩阵,初始值为0
pe = torch.zeros(max_len, d_model)
# 创建位置张量,并增加一个维度
position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
# 计算位置编码的除数项,用于交替计算sin和cos
div_term = torch.exp(torch.arange(0, d_model, 2).float() *
(-math.log(10000.0) / d_model))

# 使用sin函数计算偶数位置
pe[:, 0::2] = torch.sin(position * div_term)
# 使用cos函数计算奇数位置
pe[:, 1::2] = torch.cos(position * div_term)
# 增加维度并转置位置编码矩阵
pe = pe.unsqueeze(0).transpose(0, 1)

# 将位置编码矩阵注册为buffer,这样它会被视为模型的一部分,但不会作为参数更新
self.register_buffer('pe', pe)

def forward(self, x):
"""
前向传播函数
参数:
x: 输入张量,形状为(seq_len, batch_size, d_model)
返回:
添加了位置编码的输入张量
"""
return x + self.pe[:x.size(1), :].transpose(0, 1)


class WordEmbeddingModel(nn.Module):
def __init__(self, vocab_size, embedding_dim, max_seq_length=512, dropout=0.1):
"""
初始化词嵌入模型
参数:
vocab_size (int): 词汇表大小
embedding_dim (int): 词嵌入维度
max_seq_length (int): 最大序列长度,默认为512
dropout (float): dropout比率,默认为0.1
"""
super(WordEmbeddingModel, self).__init__()
self.embedding_dim = embedding_dim

# 初始化词嵌入层
self.embedding = nn.Embedding(vocab_size, embedding_dim)
# 初始化位置编码层
self.pos_encoding = PositionalEncoding(embedding_dim, max_seq_length)
# 初始化dropout层
self.dropout = nn.Dropout(dropout)
# 初始化层归一化层
self.layer_norm = nn.LayerNorm(embedding_dim)

def forward(self, x):
"""
前向传播过程
参数:
x (torch.Tensor): 输入张量,形状为(batch_size, sequence_length)
返回:
torch.Tensor: 经过嵌入、位置编码和归一化后的张量
"""
# x形状: (batch_size, sequence_length)
# seq_length = x.size(1)

# 词嵌入 + 缩放(Transformer标准做法)
embedded = self.embedding(x) * math.sqrt(self.embedding_dim) # (batch_size, sequence_length, embedding_dim)

# 位置编码
embedded = self.pos_encoding(embedded)

# 层归一化和dropout
embedded = self.layer_norm(embedded)
embedded = self.dropout(embedded)

return embedded # (batch_size, sequence_length, embedding_dim)

这里需要加上位置编码,需要记录每个词的位置因为每个词在不同的语境意思不同,就需要加一个位置编码来区分不同位置。这个有公式如下

最后出来的 X_ 就是词嵌入向量。又或者说 embedded, 形状是 (batch_size, sequence_length, embedding_dim)。

4.3 Attention

注意力层用于计算输入序列中每个元素对输出序列中每个元素的影响程度,从而生成更准确的输出。注意力机制的核心思想是,对于输出序列中的每个元素,模型都会计算输入序列中每个元素对该元素的影响程度,并根据这些影响程度对输入序列进行加权求和,得到该元素的输出。

简而言之就是找到上下文之间的关系,那么ATTENTION就被提出,专门用来找这个关系

单头注意力机制流程图,我们可以先从单头注意力机制来实现,然后再扩展到多头注意力机制。

有单头注意力机制代码如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
class AttentionLayer(nn.Module):
"""单头因果自注意力层(GPT 风格)"""
def __init__(self, k_dim, v_dim, emb_dim):
super(AttentionLayer, self).__init__()
self.k_dim = k_dim
self.v_dim = v_dim
self.emb_dim = emb_dim

# Q, K, V 的线性变换
self.query = nn.Linear(emb_dim, k_dim)
self.key = nn.Linear(emb_dim, k_dim)
self.value = nn.Linear(emb_dim, v_dim)

def forward(self, x):
"""
x shape: (batch_size, seq_len, emb_dim)
"""

# 得到 Q, K, V
Q = self.query(x) # (batch, seq, k_dim)
K = self.key(x) # (batch, seq, k_dim)
V = self.value(x) # (batch, seq, v_dim)

# Scaled Dot-Product Attention
attention_scores = torch.matmul(Q, K.transpose(-2, -1)) # (batch, seq, seq)
attention_scores = attention_scores / math.sqrt(self.k_dim)

# 构造因果 Mask(下三角矩阵)
# 保证每个 token 只能看到自己和过去的 token
mask = torch.tril(torch.ones_like(attention_scores)).bool()
attention_scores = attention_scores.masked_fill(mask == 0, float('-inf'))

# softmax 得到注意力权重
attention_weights = torch.softmax(attention_scores, dim=-1) # (batch, seq, seq)

# 加权求和 V
attention_output = torch.matmul(attention_weights, V) # (batch, seq, v_dim)

return attention_output

由流程图看,先通过 K, Q, V 的线性变换得到 k, q, v,然后计算得分矩阵 S,然后应用因果 Mask,然后缩放,然后 softmax 得到注意力权重,最后加权求和得到输出。

但是单头注意力往往不够,一句话里面的意思具有多重性,事物也有不同的特征,故需要不同的注意力头来捕获不同的特征才能得到更好的效果,所以需要多头注意力机制。

多头注意力机制流程图

多头注意力不必要把多个单独的 Attention 串联起来,而是可以并行计算,最后再拼接起来,所以速度会快很多。我们通过定义三个矩阵来表示总的 K Q V,将三个矩阵分割开就变成了多个 K Q V,然后分别计算,最后拼接起来。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
class MultiHeadAttentionLayer(nn.Module):
"""多头自注意力(Multi-Head Self-Attention)层"""

def __init__(self, num_heads, emb_dim, dropout=0.1):
"""
多头注意力机制
Args:
num_heads: 注意力头数量 h
emb_dim: 输入/输出 embedding 维度 d_model
"""
super(MultiHeadAttentionLayer, self).__init__()

self.num_heads = num_heads
self.emb_dim = emb_dim
self.head_dim = emb_dim // num_heads # 每个头的维度 d_k 或 d_v

assert emb_dim % num_heads == 0, "emb_dim 必须能被 num_heads 整除"

# Q, K, V 的线性映射:从 d_model → d_model
# 然后再 reshape 成多个注意力头
self.W_q = nn.Linear(emb_dim, emb_dim)
self.W_k = nn.Linear(emb_dim, emb_dim)
self.W_v = nn.Linear(emb_dim, emb_dim)

# 将 h 个 head 拼接后,再映射回 d_model
self.W_o = nn.Linear(emb_dim, emb_dim)

self.dropout = nn.Dropout(dropout)
self.layer_norm = nn.LayerNorm(emb_dim)

def forward(self, x, mask=None):
"""
Args:
x: (batch_size, seq_len, d_model)
mask: 可选的掩码 (seq_len, seq_len) 或 (batch, 1, seq_len, seq_len)
Returns:
output: (batch_size, seq_len, d_model)
"""
batch_size, seq_len, _ = x.shape

# 残差连接的输入
residual = x

# ---------- Step 1: 线性投影并拆分成多个头 ----------
# 得到形状 (batch, seq_len, h, head_dim) → 转置成 (batch, h, seq_len, head_dim)
Q = self.W_q(x).view(batch_size, seq_len, self.num_heads, self.head_dim).transpose(1, 2)
K = self.W_k(x).view(batch_size, seq_len, self.num_heads, self.head_dim).transpose(1, 2)
V = self.W_v(x).view(batch_size, seq_len, self.num_heads, self.head_dim).transpose(1, 2)

# ---------- Step 2: QK^T 计算注意力分数 ----------
# 形状变成 (batch, h, seq_len, seq_len)
attention_scores = torch.matmul(Q, K.transpose(-2, -1)) / math.sqrt(self.head_dim)

# ---------- Step 3: 应用掩码(masking) ----------
if mask is not None:
# mask=0 的地方填入 -inf,使 softmax=0
attention_scores = attention_scores.masked_fill(mask == 0, float('-inf'))
else:
# 因果 mask (GPT 使用):下三角为1,上三角为0
causal_mask = torch.tril(torch.ones(seq_len, seq_len, device=x.device)).bool()
attention_scores = attention_scores.masked_fill(~causal_mask, float('-inf'))

# ---------- Step 4: softmax 得注意力权重 ----------
attention_weights = torch.softmax(attention_scores, dim=-1)
attention_weights = self.dropout(attention_weights)

# ---------- Step 5: 加权求和得到注意力输出 ----------
# 每个头的输出: (batch, h, seq_len, head_dim)
attention_output = torch.matmul(attention_weights, V)

# ---------- Step 6: 合并多个头 ----------
# 转回 (batch, seq_len, h * head_dim = d_model)
attention_output = attention_output.transpose(1, 2).contiguous().view(
batch_size, seq_len, self.emb_dim
)

# ---------- Step 7: 输出线性层 ----------
attention_output = self.W_o(attention_output)
attention_output = self.dropout(attention_output)

# ---------- Step 8: 残差连接 + LayerNorm ----------
output = self.layer_norm(attention_output + residual)

return output

前馈神经网络(Feed Forward Neural Network)由两个线性层和一个 ReLU 激活函数组成。第一个线性层将输入映射到更高维度,第二个线性层将映射后的结果映射回原始维度。中间的 ReLU 激活函数引入非线性,使模型能够学习更复杂的模式。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
class FeedForwardNetwork(nn.Module):
"""Transformer 前馈全连接网络(Position-wise Feed Forward Network, FFN)"""

def __init__(self, emb_dim, expansion_factor=4, dropout=0.1):
super(FeedForwardNetwork, self).__init__()

# 输入/输出维度都是 emb_dim,隐藏层维度扩大 expansion_factor 倍(通常是 4×)
self.emb_dim = emb_dim
self.hidden_dim = emb_dim * expansion_factor

# 前馈网络 FFN = Linear → GELU(ReLU) → Dropout → Linear → Dropout
self.network = nn.Sequential(
nn.Linear(emb_dim, self.hidden_dim), # 第 1 个全连接层(升维)
nn.GELU(), # 激活函数(可改为 ReLU)
nn.Dropout(dropout), # Dropout 防止过拟合
nn.Linear(self.hidden_dim, emb_dim), # 第 2 个全连接层(降回 emb_dim)
nn.Dropout(dropout) # 输出后再做一次 Dropout
)

# 残差连接后的 LayerNorm
self.layer_norm = nn.LayerNorm(emb_dim)

def forward(self, x):
# x: (batch_size, seq_len, emb_dim)

residual = x # 残差连接(将输入保留下来)
output = self.network(x) # 通过两层全连接+激活函数的 FFN

# 加上残差再 LayerNorm(Transformer 标准结构)
output = self.layer_norm(output + residual)

return output # 输出形状与输入一致: (batch_size, seq_len, emb_dim)

这个前馈神经网络(Feed Forward Neural Network)比较简单,主要是要看使用了 残差连接避免梯度消失

将前面组合成完整的 Transformer 层

1
2
3
4
5
6
7
8
9
10
11
12
13
14
class TransformerBlock(nn.Module):
"""完整的Transformer块"""

def __init__(self, num_heads, emb_dim, dropout=0.1):
super(TransformerBlock, self).__init__()
self.attention = MultiHeadAttentionLayer(num_heads, emb_dim, dropout)
self.feed_forward = FeedForwardNetwork(emb_dim, dropout=dropout)

def forward(self, x, mask=None):
# 自注意力子层
x = self.attention(x, mask)
# 前馈网络子层
x = self.feed_forward(x) # (batch_size, seq_length, emb_dim)
return x

4.4 输出预测层

通过前面的铺垫,我们获得了完整语义,完整语境,现在就要来说接下来是啥话了。我们需要通过一个全连接层来输出预测 logit,通过 SoftMax 就可以拿到预测概率然后进行选词。

不过前面的参数量较大,我们可以选择用一个新的 FFN 来表示,也可以共享第一层里面的输出层的参数。

这里我使用第二种,不过给出第一种的代码。

第一种

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
class OutputLayer(nn.Module):
"""输出预测层"""

def __init__(self, emb_dim, vocab_size, dropout=0.1):
super(OutputLayer, self).__init__()

# 输出层:emb_dim -> vocab_size
self.linear = nn.Linear(emb_dim, vocab_size)

# Dropout
self.dropout = nn.Dropout(dropout)

def forward(self, x):
# x: (batch_size, seq_len, emb_dim)

# Dropout
x = self.dropout(x)

# 输出层 # (batch_size, seq_len, vocab_size)
output = self.linear(x)

return output

第二种

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
class LinearLayer(nn.Module):
"""权重共享输出层 - 大幅减少参数"""

def __init__(self, emb_dim, vocab_size, dropout=0.1):
super(LinearLayer, self).__init__()

# 注意:这个层需要与词嵌入层共享权重
self.output = nn.Linear(emb_dim, vocab_size, bias=False)
self.dropout = nn.Dropout(dropout)

def forward(self, x):
x = self.dropout(x)
return self.output(x)

def tie_weights(self, embedding_layer):
"""与词嵌入层共享权重"""
self.output.weight = embedding_layer.weight

这个是普通的线性层,没有那么多讲究,维度正确就行

4.5 组合成完整的 Transformer 模型(GTP2架构)

跟着流程图搭建model

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
class Model(nn.Module):
"""
完整的 Transformer 语言模型(GPT 类架构,Decoder-only)。

主要组成:
- 词嵌入(含位置编码)
- 多个 Transformer Block(自注意力 + 前馈网络)
- 输出线性层 + 权重共享(tie weights)

输入:
x: (batch_size, seq_length) —— 输入 token id 序列
mask: (batch_size, 1, 1, seq_length) 或 None —— 可选的注意力掩码

输出:
logits: (batch_size, seq_length, vocab_size)
"""

def __init__(self, num_heads, num_transformer_blocks, emb_dim, seq_length, vocab_size, dropout=0.1):
super(Model, self).__init__()

self.emb_dim = emb_dim
self.seq_length = seq_length
self.vocab_size = vocab_size
self.num_heads = num_heads
self.num_transformer_blocks = num_transformer_blocks

# ===============================
# 1. 词 + 位置编码 Embedding 模块
# ===============================
# 输出形状:(batch_size, seq_length, emb_dim)
self.embedding = WordEmbeddingModel(
vocab_size,
emb_dim,
max_seq_length=seq_length,
dropout=dropout
)

# ===============================
# 2. N 个 Transformer Block 堆叠
# ===============================
# 每个 Block 包含:
# - 多头自注意力(含残差 + LayerNorm)
# - 前馈网络 FFN(含残差 + LayerNorm)
#
# 输入 / 输出形状均为 (batch_size, seq_length, emb_dim)
self.transformer_blocks = nn.ModuleList(
[TransformerBlock(num_heads, emb_dim, dropout) for _ in range(num_transformer_blocks)]
)

# ===============================
# 3. 输出层(预测词分布)
# ===============================
# 将最后的 embedding 映射到 vocab_size
# 输出 logits: (batch_size, seq_length, vocab_size)
self.linear = LinearLayer(emb_dim, vocab_size, dropout)

# 权重共享(tie weights)
# 输出层权重 = 输入词嵌入权重
self.linear.tie_weights(self.embedding.embedding)

# ---------------------------------------------------
# 权重初始化(未自动启用,需要调用 model.apply(model._init_weights))
# ---------------------------------------------------
def _init_weights(self, module):
"""遵循 GPT/Transformer 标准初始化方式"""
if isinstance(module, nn.Linear):
# 正态初始化
torch.nn.init.normal_(module.weight, mean=0.0, std=0.02)
if module.bias is not None:
torch.nn.init.zeros_(module.bias)

elif isinstance(module, nn.Embedding):
torch.nn.init.normal_(module.weight, mean=0.0, std=0.02)

elif isinstance(module, nn.LayerNorm):
torch.nn.init.zeros_(module.bias)
torch.nn.init.ones_(module.weight)

# ---------------------------------------------------
# 前向传播
# ---------------------------------------------------
def forward(self, x, mask=None):
"""
x: 输入 token id 序列,形状 (batch_size, seq_length)
mask: 注意力掩码(可选),用于遮盖未来 token(因果掩码)

返回:
logits: (batch_size, seq_length, vocab_size)
"""

# 1. 词嵌入 (embedding + position embedding)
# 输出形状: (batch_size, seq_length, emb_dim)
x = self.embedding(x)

# 2. 依次通过 Transformer Blocks
for block in self.transformer_blocks:
x = block(x, mask)
# 每个 block 输出形状: (batch_size, seq_length, emb_dim)

# 3. 输出层 — 映射到词表大小
logits = self.linear(x) # (batch_size, seq_length, vocab_size)

return logits

# ---------------------------------------------------
# 保存模型参数
# ---------------------------------------------------
def save(self, path):
"""保存模型参数到文件"""
torch.save(self.state_dict(), path)

# ---------------------------------------------------
# 加载模型参数
# ---------------------------------------------------
def load(self, path):
"""从文件中加载模型参数"""
if os.path.exists(path):
self.load_state_dict(torch.load(path))

到这里,模型搭建完毕

4.6 损失函数

由于是预测模型,我们用 交叉熵 损失函数

1
2
3
4
5
crossLoss = nn.CrossEntropyLoss(ignore_index=0)

def Loss(y_pred, y_true):
y_pred = y_pred.transpose(1, 2)
return crossLoss(y_pred, y_true)

注意要调整维度,设置ignore_index(忽略掉pad)

4.7 训练

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
# ============================
# 选择设备(GPU 优先)
# ============================
device = "cuda" if torch.cuda.is_available() else "cpu"

# ============================
# 加载数据集文件路径
# ============================
file_paths = []
dir_path = "./data"

for name in os.listdir(dir_path):
file_paths.append(os.path.join(dir_path, name))

# ============================
# 预处理训练数据(构建词表 + 分词 + 转 ID)
# ============================
data_processor = DataProcessor(file_paths)

# ============================
# 构建训练集
# ============================
text_dataset = TextDataset(
data_processor=data_processor,
seq_length=128
)

# ============================
# 创建测试数据处理器(复用词表)
# ============================
test_data_processor = DataProcessor()
test_data_processor.loadbutnottextWithdataprocessor(data_processor)

# 加载测试文本,将其处理成 tokens
test_data_processor.process_text("test_data\神明将世,看见血条的我杀疯了.txt")

# ============================
# 构建测试集
# ============================
test_dataset = TextDataset(
data_processor=test_data_processor,
seq_length=128
)

# ============================
# 初始化模型
# ============================
net = Model(
num_heads=8,
num_transformer_blocks=6,
emb_dim=256,
seq_length=128,
vocab_size=text_dataset.vocab_size,
dropout=0.1
)

# 初始化模型参数
net.apply(net._init_weights)

# 尝试加载已有模型(微调)
net.load("./model/model.pt")

# ============================
# 数据索引,用于随机采样
# ============================
all_indices = np.arange(len(text_dataset))
test_indices = np.arange(len(test_dataset))

# 每个 epoch 从训练集中随机抽 8000 个样本
num_samples_per_epoch = 8000


# =====================================
# 训练步骤(前向 + 反向 + 更新参数)
# =====================================
def train_step(X, y, loss_fn, optimizer, net):
"""
一个训练 step:
- 前向传播
- 反向传播
- 更新模型参数
"""
X = X.to(device)
y = y.to(device)

optimizer.zero_grad()
y_pred = net(X) # (batch, seq, vocab_size)

loss = loss_fn(y_pred, y)
loss.backward()

optimizer.step()
return loss.item()


# =====================================
# 测试步骤(计算 loss 和准确率)
# =====================================
def test_step(X, y, loss_fn, net):
"""
评估步骤:
- 仅前向传播
- 计算 loss 和准确率
"""
X = X.to(device)
y = y.to(device)

y_pred = net(X)
loss = loss_fn(y_pred, y)

# 取预测 token(最大概率)
preds = y_pred.argmax(dim=-1) # (batch, seq)

# 忽略 PAD=0 的位置
mask = (y != 0)

correct = (preds == y) & mask
acc = correct.sum().item()
total = mask.sum().item()

return loss.item(), acc, total


# =====================================
# 主训练循环
# =====================================
if __name__ == "__main__":
best_accuracy = 0

# Adam 优化器
optimizer = torch.optim.Adam(net.parameters(), lr=0.001)

# 当 loss 无改进时自动降低学习率
scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(
optimizer, mode='min', factor=0.1, patience=2
)

net = net.to(device)

for epoch in range(10):

# ================
# 训练集采样
# ================
random_indices = np.random.choice(
all_indices, size=num_samples_per_epoch, replace=False
)

train_loader = DataLoader(
text_dataset,
batch_size=8,
sampler=SubsetRandomSampler(random_indices),
shuffle=False
)

# ====== 开始训练 ======
loss = 0
net.train()

idx = 0
for X, y in train_loader:
loss += train_step(X, y, Loss, optimizer, net)

if idx % 5 == 0:
print(f"Epoch {epoch}, Batch {idx}, Loss: {loss/(idx+1):.5f}")
idx += 1

print(f"Epoch {epoch}, Train Loss: {loss/len(train_loader):.5f}")

# ================
# 测试集采样
# ================
test_random_indices = np.random.choice(test_indices, size=1000, replace=False)
test_loader = DataLoader(
test_dataset,
batch_size=8,
sampler=SubsetRandomSampler(test_random_indices),
shuffle=False
)

# ====== 开始测试 ======
net.eval()
with torch.no_grad():
test_loss = 0
total_correct = 0
total_tokens = 0

idx = 0
for X, y in test_loader:
batch_loss, batch_correct, batch_tokens = test_step(X, y, Loss, net)

test_loss += batch_loss
total_correct += batch_correct
total_tokens += batch_tokens

if idx % 5 == 0:
print(f"Epoch {epoch}, Batch {idx}, Test Loss: {test_loss/(idx+1):.5f}, "
f"Test Accuracy: {total_correct/total_tokens:.5f}")
idx += 1

print(f"Epoch {epoch}, Test Loss: {test_loss/len(test_loader):.5f}, "
f"Test Accuracy: {total_correct/total_tokens:.5f}")

# ====== 模型保存(根据 accuracy 提升) ======
if total_correct/total_tokens > best_accuracy:
best_accuracy = total_correct/total_tokens
torch.save(net.state_dict(), "model/model.pt")
print("Model improved. Saved.")
else:
print("No improvement. Skipping save.")
scheduler.step(loss)

5. 生成流程

生成流程比较简单,就拿到用户输入,然后分解成token,然后传入模型跑出 logit,通过SoftMax拿到概率生成即可

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
class Generate:
"""
文本生成类(基础版本:贪婪搜索)
功能:
- 加载模型
- 加载词表
- 对文本分词并转换为 token id
- 使用贪婪搜索逐 token 生成
"""

def __init__(self, device):
# 初始化 Transformer 模型
self.model = Model(
num_heads=CONFIG["num_heads"],
num_transformer_blocks=CONFIG["num_transformer_blocks"],
emb_dim=CONFIG["emb_dim"],
seq_length=CONFIG["seq_length"],
vocab_size=CONFIG["vocab_size"],
dropout=CONFIG["dropout"]
)

# 加载已经训练好的权重
self.model.load("./model/model.pt")
self.device = device

# 移动模型到 CPU/GPU
self.model = self.model.to(self.device)
self.model.eval() # 推理时必须 eval()

# 加载词表映射
self.load_id_to_vocab_and_vocab_to_id()

# 特殊 token ID
self.unk_id = self.vocab_to_id["<unk>"]
self.eos_id = self.vocab_to_id["<eos>"]
self.pad_id = self.vocab_to_id["<pad>"]

def load_id_to_vocab_and_vocab_to_id(self):
"""
加载词表映射:
- id_to_vocab: id -> 词
- vocab_to_id: 词 -> id
"""
import json
with open("./model/id_to_vocab.json", "r", encoding="utf-8") as f:
self.id_to_vocab = json.load(f)

# json 读出来键是字符串,所以要转 int
self.id_to_vocab = {int(k): v for k, v in self.id_to_vocab.items()}

# 构建反向词表
self.vocab_to_id = {v: k for k, v in self.id_to_vocab.items()}

def get_output(self, input):
"""
使用模型预测下一个 token(贪婪策略:取最大概率项)
input: (1, seq_length)
return: 最后一个位置的预测 token id
"""
with torch.no_grad():
self.model.eval()

output = self.model(input) # (1, seq_len, vocab_size)
output = output[:, -1, :] # 只取最后一个 token 的输出
output_id = output.argmax(dim=-1) # 取最大概率的 token
return output_id.item()

def tokenize_text(self, text):
"""
将输入文本分词 → 转 token id
"""
if isinstance(text, str):
import jieba
words = jieba.lcut(text)
else:
words = text

# 将词映射到 ID,没有的映射到 <unk>
token_ids = [self.vocab_to_id.get(word, self.unk_id) for word in words]
return token_ids

def generate_greedy(self, text, max_length=100):
"""
贪婪搜索文本生成:
每一步都选概率最大的 token
"""
print("question:", text)

# 分词 → id
input_tokens = self.tokenize_text(text)

# 序列长度补齐到固定长度(左侧 pad)
if len(input_tokens) > CONFIG["seq_length"]:
input_tokens = input_tokens[-CONFIG["seq_length"]:]
else:
padding = [self.pad_id] * (CONFIG["seq_length"] - len(input_tokens))
input_tokens = padding + input_tokens

# 转 tensor
input_tensor = torch.tensor([input_tokens], dtype=torch.long).to(self.device)

print("<answer>", "我理解你的问题:", end="")
generated_text = ""

# 逐 token 生成
for step in range(max_length):
next_token_id = self.get_output(input_tensor)
next_token = self.id_to_vocab.get(next_token_id, "<unk>")

if next_token_id == self.eos_id:
break

print(next_token, end="")
generated_text += next_token

# 滑动窗口:去掉最左 token,加上新 token
new_input = input_tensor[0, 1:].tolist() + [next_token_id]
input_tensor = torch.tensor([new_input], dtype=torch.long).to(self.device)

print("<answer>")
return generated_text



# ---------------- 高级版本:采样生成 -------------------

class AdvancedGenerate(Generate):
"""
基于采样的文本生成类:
- 温度 temperature 控制随机性
- top-k 限制采样范围,使文本更合理
"""

def __init__(self, device):
super().__init__(device)

def get_output_with_sampling(self, input_tensor, temperature=1.0, top_k=50):
"""
使用温度采样 + Top-k 选择下一 token
"""
with torch.no_grad():
self.model.eval()

input_tensor = input_tensor.to(self.device)
output = self.model(input_tensor) # (1, seq_len, vocab_size)
next_token_logits = output[:, -1, :] # 取最后一个 token 的预测分布

# 温度缩放
next_token_logits = next_token_logits / temperature

# top-k 筛选低概率 token
if top_k is not None:
threshold = torch.topk(next_token_logits, top_k)[0][..., -1, None]
indices_to_remove = next_token_logits < threshold
next_token_logits[indices_to_remove] = -float('Inf')

# softmax 得到概率
probs = torch.softmax(next_token_logits, dim=-1)

# 按概率采样
next_token_id = torch.multinomial(probs, num_samples=1)
return next_token_id.item()

def generate_with_sampling(self, text, max_length=100, temperature=0.8, top_k=50):
"""
使用 top-k + 温度采样生成文本
"""
input_tokens = self.tokenize_text(text)

# 左 pad 保持长度
if len(input_tokens) > CONFIG["seq_length"]:
input_tokens = input_tokens[-CONFIG["seq_length"]:]
else:
padding = [self.pad_id] * (CONFIG["seq_length"] - len(input_tokens))
input_tokens = padding + input_tokens

input_tensor = torch.tensor([input_tokens], dtype=torch.long).to(self.device)

print("<answer>", "我理解你的问题:", end="")

generated_text = ""

for step in range(max_length):
next_token_id = self.get_output_with_sampling(
input_tensor,
temperature=temperature,
top_k=top_k
)

next_token = self.id_to_vocab.get(next_token_id, "<unk>")

if next_token_id == self.eos_id:
break

print(next_token, end="")
generated_text += next_token

# 滑动窗口更新输入
new_input = input_tensor[0, 1:].tolist() + [next_token_id]
input_tensor = torch.tensor([new_input], dtype=torch.long).to(self.device)

print("<answer>")
return generated_text

结束

本文章为作者的学习笔记,仅供参考,知识来自论文,B站讲解,Deepseek,ChatGpt和豆包。感谢指正。