In self-attention, the query vector Q and the key vector K are compared with a dot product. RoPE rotates each of them by an angle determined by its position before that dot product is taken.
What do we get as a result?

- Relative position is preserved naturally: the difference between the rotations applied to Q and to K is exactly their relative position, so the attention score depends only on that offset (see the sketch right after this list).
- Strong extrapolation: sequences longer than anything seen during training can still be processed, because the rotations come from periodic sinusoids of the position rather than from a learned position table (a quick check follows the full code listing below).
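To make the first point concrete, here is a small self-contained check written for this post (it is not part of the demo script below; rope_rotate is just an illustrative helper). It rotates the same query and key at two different pairs of absolute positions that share the same offset, and the two dot products come out identical up to floating-point error.

# Minimal sketch: the dot product of RoPE-rotated vectors depends only on n - m.
# Uses the standard half-split RoPE formulation with base 10000.
import torch

def rope_rotate(vec, pos, base=10000.0):
    # vec: (D,) with D even; pos: integer position
    d = vec.shape[-1]
    half = d // 2
    theta = base ** (-torch.arange(half, dtype=vec.dtype) / half)   # inverse frequencies
    angle = pos * theta                                             # (half,)
    x1, x2 = vec[:half], vec[half:]
    return torch.cat([x1 * angle.cos() - x2 * angle.sin(),
                      x1 * angle.sin() + x2 * angle.cos()])

torch.manual_seed(0)
q, k = torch.randn(8), torch.randn(8)

# Same relative offset (n - m = 3) at two different absolute positions:
s1 = torch.dot(rope_rotate(q, 2), rope_rotate(k, 5))
s2 = torch.dot(rope_rotate(q, 10), rope_rotate(k, 13))
print(s1.item(), s2.item())   # the two scores match up to float error

This is exactly the property the attention module in the full script below relies on: the score between positions m and n is a function of n - m only.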
# -*- coding: utf-8 -*-
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import jieba
import matplotlib.pyplot as plt
import numpy as np

plt.rcParams['font.sans-serif'] = ['SimHei']   # font that can render Chinese labels
plt.rcParams['axes.unicode_minus'] = False

# ===== Sample texts from "Water Margin" (《水浒传》) =====
text_samples = [
    """张天师祈禳瘟疫,洪太尉误走妖魔。话说大宋天子仁宗皇帝在位年间,京师瘟疫流行,百姓多有染病。天子召张天师入宫祈禳,命洪太尉押送香火,不料误开封印,放出妖魔。""",
    """王教头私走延安府,九纹龙大闹史家村。史进自幼好武,学成十八般武艺,因打死恶霸,被官府缉拿。王进教头见势不妙,离开东京前往延安府,途经史家村。""",
    """史大郎夜走华阴县,鲁提辖拳打镇关西。史进与鲁达结义,路遇镇关西郑屠,见其欺压妇女,鲁达愤然出手,三拳打死郑屠,遂落草为寇。"""
]

# ===== Chinese word segmentation =====
def tokenize_texts(text_list):
    tokenized = []
    for t in text_list:
        words = list(jieba.cut(t))
        words = [w.strip() for w in words if w.strip()]
        tokenized.append(words)
    return tokenized

sentences = tokenize_texts(text_samples)

# ===== Build the vocabulary =====
vocab = {}
for sent in sentences:
    for w in sent:
        if w not in vocab:
            vocab[w] = len(vocab)
vocab["<PAD>"] = len(vocab)   # padding token
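# --- Quick sanity check (added for this write-up; not in the original script) ---
# jieba segments each excerpt into word-level tokens, and the vocabulary maps
# every distinct token to an integer id, with "<PAD>" appended last for padding.
print("tokens of excerpt 1:", sentences[0][:8])
print("vocab size (incl. <PAD>):", len(vocab))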
vocab_size = len(vocab)
embed_dim = 32
seq_len = max(len(s) for s in sentences)

# Convert each sentence to token ids and pad to seq_len
def encode_sentences(sentences, vocab, seq_len):
    data = []
    for s in sentences:
        idxs = [vocab[w] for w in s]
        if len(idxs) < seq_len:
            idxs += [vocab["<PAD>"]] * (seq_len - len(idxs))
        data.append(idxs)
    return torch.tensor(data)

input_ids = encode_sentences(sentences, vocab, seq_len)

# ===== RoPE implementation =====
def apply_rope(x):
    """
    Accepts either
    - (B, T, D) or
    - (B, T, H, D)
    and returns a tensor of the same shape with RoPE applied to the last
    dimension (D must be even).
    """
    orig_shape = x.shape
    if len(orig_shape) == 3:
        # (B, T, D) -> (B, T, 1, D) so both cases share one code path
        x = x.unsqueeze(2)
        squeezed = True
    else:
        squeezed = False   # already (B, T, H, D)

    # Now x.shape = (B, T, H, D)
    bsz, seqlen, nheads, head_dim = x.shape
    assert head_dim % 2 == 0, "head_dim must be even for RoPE"

    device = x.device
    dtype = x.dtype
    half = head_dim // 2

    # theta: (half,) inverse frequencies with base 10000
    theta = 10000 ** (-torch.arange(0, half, device=device, dtype=dtype) / half)
    # seq positions: (seqlen,)
    seq_idx = torch.arange(seqlen, device=device, dtype=dtype)
    # freqs: (seqlen, half) -- rotation angle for each (position, frequency) pair
    freqs = torch.einsum('n,d->nd', seq_idx, theta)

    cos = freqs.cos().view(1, seqlen, 1, half)   # (1, T, 1, half)
    sin = freqs.sin().view(1, seqlen, 1, half)   # (1, T, 1, half)

    x1 = x[..., :half]   # (B, T, H, half)
    x2 = x[..., half:]   # (B, T, H, half)

    x_rotated = torch.cat([
        x1 * cos - x2 * sin,
        x1 * sin + x2 * cos
    ], dim=-1)   # (B, T, H, D)

    if squeezed:
        x_rotated = x_rotated.squeeze(2)   # back to (B, T, D)
    return x_rotated

# ===== Multi-head self-attention with RoPE =====
class MultiHeadSelfAttentionRoPE(nn.Module):
    def __init__(self, embed_dim, num_heads, dropout=0.1):
        super().__init__()
        self.embed_dim = embed_dim
        self.num_heads = num_heads
        self.head_dim = embed_dim // num_heads
        self.dropout = dropout

        self.q_proj = nn.Linear(embed_dim, embed_dim)
        self.k_proj = nn.Linear(embed_dim, embed_dim)
        self.v_proj = nn.Linear(embed_dim, embed_dim)
        self.out_proj = nn.Linear(embed_dim, embed_dim)

        self.last_attn_weights = None

    def forward(self, x):
        B, T, C = x.size()
        q = self.q_proj(x).view(B, T, self.num_heads, self.head_dim)
        k = self.k_proj(x).view(B, T, self.num_heads, self.head_dim)
        v = self.v_proj(x).view(B, T, self.num_heads, self.head_dim)

        # Apply RoPE to queries and keys (values are left unrotated)
        q = apply_rope(q)
        k = apply_rope(k)

        # Scaled dot-product attention
        attn_scores = torch.einsum('bthd,bshd->bhts', q, k) / (self.head_dim ** 0.5)
        attn_weights = F.softmax(attn_scores, dim=-1)
        attn_weights = F.dropout(attn_weights, p=self.dropout, training=self.training)
        self.last_attn_weights = attn_weights.detach()   # keep for visualization

        out = torch.einsum('bhts,bshd->bthd', attn_weights, v)
        out = out.reshape(B, T, C)
        return self.out_proj(out)

# ===== Training =====
embedding = nn.Embedding(vocab_size, embed_dim)
model = MultiHeadSelfAttentionRoPE(embed_dim, num_heads=4, dropout=0.1)
criterion = nn.MSELoss()
optimizer = optim.Adam(list(model.parameters()) + list(embedding.parameters()), lr=1e-3)

epochs = 200
for epoch in range(epochs):
    model.train()
    x = embedding(input_ids)
    target = x.clone()           # toy objective: reconstruct the input embeddings
    out = model(x)
    loss = criterion(out, target)
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()
    if (epoch + 1) % 50 == 0:
        print(f"Epoch {epoch+1}, Loss: {loss.item():.6f}")

# ===== Attention heatmap visualization =====
def plot_attention(attn, sentence_tokens, filename):
    heads = attn.shape[0]
    fig, axes = plt.subplots(1, heads, figsize=(4 * heads, 4))
    if heads == 1:
        axes = [axes]
    for h in range(heads):
        ax = axes[h]
        attn_head = attn[h].numpy()
        im = ax.imshow(attn_head, cmap='viridis')
        ax.set_xticks(np.arange(len(sentence_tokens)))
        ax.set_yticks(np.arange(len(sentence_tokens)))
        ax.set_xticklabels(sentence_tokens, rotation=90)
        ax.set_yticklabels(sentence_tokens)
        ax.set_title(f"Head {h+1}")
        fig.colorbar(im, ax=ax)
    plt.tight_layout()
    plt.savefig(filename)
    plt.close()

model.eval()
with torch.no_grad():
    x = embedding(input_ids)
    _ = model(x)
    attn_weights = model.last_attn_weights   # (batch, heads, seq, seq)

for i, tokens in enumerate(sentences):
    attn = attn_weights[i]
    plot_attention(attn.cpu(), tokens, f"rope_attention_sentence{i+1}.png")

print("RoPE multi-head attention heatmaps saved as rope_attention_sentenceX.png")
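A quick follow-up on the extrapolation claim: nothing in apply_rope or the attention module is tied to the training seq_len, because the rotation angles are computed directly from the positions. The lines below are my addition (not in the original script); they reuse apply_rope, model, embed_dim and seq_len from the listing above and simply check that a sequence four times longer than anything seen during training passes through with the expected shapes.

# Extrapolation sketch: feed a sequence 4x longer than the training length.
with torch.no_grad():
    long_x = torch.randn(1, 4 * seq_len, embed_dim)   # random embeddings, just a shape check
    print(apply_rope(long_x).shape)                   # -> (1, 4*seq_len, embed_dim)
    print(model(long_x).shape)                        # -> (1, 4*seq_len, embed_dim)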