本文共 19698 字,大约阅读时间需要 65 分钟。
这是官方NLP From Scratch的一个教程(3/3),原,在这个项目中,我们将搭建神经网络,将法语翻译成英语,本文是其详细的注解
编码器网络将输入序列压缩为一个向量,而解码器网络将该向量展开为一个新序列
改进此模型,使用注意机制,该机制可让解码器学习将注意力集中在输入序列的特定范围内需要加载名为’data/eng-fra.txt’的文件
编码方式:将一种语言中的每个单词表示为一个单向矢量:
这其实也是one-hot编码,但与之前所有字母中可能存在的几十个字符相比,单词有很多,因此编码向量要大得多。 这里作弊并整理数据以使每种语言仅使用几千个单词
首先创建函数:
def readLangs(lang1, lang2, reverse=False)
读文件进来,发现有一些:\u202f这种符号,所以再创建函数:
def normalizeString(s)
进行标准化,看一下re库的这两个函数用法:
s = 'I love you!?,'s = re.sub(r"([.!?])", r" \1", s)print(s)s = re.sub(r"[^a-zA-Z.!?]+", r"6", s)print(s)
I love you ! ?,I6love6you!?6
生成的pairs
就是:
[...['plastic surgery alone will not make you any less ugly .', 'la chirurgie plastique seule ne vous rendra pas moins laid .']...]
如果需要其他语言 → 英语,可以用reversed()
函数,返回的是一个反转的迭代器,需要使用list()
函数,原理如下:
x = ['plastic surgery alone will not make you any less ugly .', 'la chirurgie plastique seule ne vous rendra pas moins laid .']pairs = list(reversed(x))print(pairs)
['la chirurgie plastique seule ne vous rendra pas moins laid .', 'plastic surgery alone will not make you any less ugly .']
没有list
就是:
希望为每个单词创建一个唯一的索引,定义一个类class Lang
,该类具有单词→索引(word2index
)和索引→单词(index2word
)字典,以及每个要使用的单词word2count
的计数,以便以后替换稀有词
在这里,最大长度为 10 个字(包括结尾的标点符号),并且过滤翻译成“i’m”或“he’s”等形式的句子(考虑到前面已替换掉撇号的情况)
关于startswith(str, beg=0,end=len(string))
函数,用于检查字符串是否是以指定子字符串开头,如果是则返回 True,否则返回 False。如果参数 beg 和 end 指定值,则在指定范围内检查
这里直接把原文prepareData函数写道主函数里面,原文默认英语→其他语言,但是它两次的结果演示都是其他语言 → 英语,所以默认reverse=True
才对:
print("Counted words:")print(input_lang.name, input_lang.n_words)print(output_lang.name, output_lang.n_words)print(random.choice(pairs))
Counted words:fra 4345eng 2803['il fait une promenade .', 'he is taking a walk .']
序列到序列网络或 seq2seq 网络或编码器解码器网络是由两个称为编码器和解码器的 RNN 组成的模型,编码器读取输入序列并输出单个向量,而解码器读取该向量以产生输出序列
与使用单个 RNN 进行序列预测(每个输入对应一个输出)不同,seq2seq 模型使我们摆脱了序列长度和顺序的限制,这使其非常适合在两种语言之间进行翻译
例如:Je ne suis pas le chat noir(我不是黑猫)单个词的百度翻译:
输入句子中的大多数单词在输出句子中具有直接翻译,但是顺序略有不同,例如 “chat noir”和“black cat”,由于采用“ ne / pas”结构,因此在输入句子中还有一个单词。 直接从输入单词的序列中产生正确的翻译将是困难的
使用 seq2seq 模型,编码器创建单个矢量,在理想情况下,该矢量将输入序列的“含义”编码为单个矢量-句子的某些 N 维空间中的单个点
先来看一下门控循环单元(gated recurrent unit, GRU)
参考此
LSTM门控网络结构过于复杂与冗余
GRU将遗忘门和输入门合并成更新门,同时将记忆单元与隐藏层合并成了重置门,进而让整个结构运算变得更加简化且性能得以增强。 每个隐藏单元都有单独的重置和更新门当重置门接近于0时,隐藏状态被迫忽略先前的隐藏状态,仅用当前输入进行复位
这有效地使隐藏状态可以丢弃将来以后发现不相关的任何信息,从而允许更紧凑的表示另一方面,更新门控制从前一个隐藏状态将有多少信息转移到当前隐藏状态。这类似于LSTM网络中的记忆单元,并有助于RNN记住长期信息
运算过程如下:
对于每一对,我们将需要一个输入张量(输入句子中单词的索引)和目标张量(目标句子中单词的索引),创建这些向量时,将 EOS 令牌附加到两个序列上
对于pairs[0]
,即['j ai ans .', 'i m .']
,由tensorFromPair(pair)
函数返回的input_tensor
为:
tensor([[2], [3], [4], [5], [1]], device='cuda:0')
其大小为torch.Size([5, 1]),target_tensor
为:
tensor([[2], [3], [4], [1]], device='cuda:0')
其大小为torch.Size([4, 1])
seq2seq 网络的编码器是 RNN,它为输入句子中的每个单词输出一些值。 对于每个输入字,编码器输出一个向量和一个隐藏状态,并将隐藏状态用于下一个输入字
解码器是另一个 RNN,它采用编码器输出矢量并输出单词序列来创建翻译
在最简单的 seq2seq 解码器中,我们仅使用编码器的最后一个输出。 最后的输出有时称为上下文向量,因为它对整个序列的上下文进行编码。 该上下文向量用作解码器的初始隐藏状态,在解码的每个步骤中,为解码器提供输入令牌和隐藏状态。 初始输入令牌是字符串开始,第一个隐藏状态是上下文向量(编码器的最后一个隐藏状态)
如果仅上下文向量在编码器和解码器之间传递,则该单个向量承担对整个句子进行编码的负担,Attention使解码器网络可以针对解码器自身输出的每一步,“专注”于编码器输出的不同部分。 首先,我们计算一组Attention权重。 将这些与编码器输出向量相乘以创建加权组合。 结果(在代码中称为attn_applied)应包含有关输入序列特定部分的信息,从而帮助解码器选择正确的输出字
计算注意力权重的方法是使用另一个前馈层attn,并使用解码器的输入和隐藏状态作为输入。 由于训练数据中包含各种大小的句子,因此要实际创建和训练该层,我们必须选择可以应用的最大句子长度(输入长度,用于编码器输出)。最大长度的句子将使用所有注意权重,而较短的句子将仅使用前几个
关于: torch.bmm()是tensor中的一个相乘操作,类似于矩阵中的A*B,input(p,m,n) * mat2(p,n,a) ->output(p,m,a),前一个矩阵的列等于后面矩阵的行才可以相乘,例如:import torchx = torch.rand(2,3,6)y = torch.rand(2,6,7)print(torch.bmm(x,y).size())
torch.Size([2, 3, 7])
为了进行训练,我们通过编码器运行输入语句,并跟踪每个输出和最新的隐藏状态。 然后,为解码器提供SOS_TOKEN令牌作为其第一个输入,并将编码器的最后一个隐藏状态作为其第一个隐藏状态
“教师强制”的概念是使用实际目标输出作为每个下一个输入,而不是使用解码器的猜测作为下一个输入。 使用教师强制会导致其收敛更快,但是当使用受过训练的网络时,可能会显示不稳定
您可以观察到以教师为主导的网络的输出,这些输出阅读的是连贯的语法,但却偏离了正确的翻译-直观地,它学会了代表输出语法,并且一旦老师说了最初的几个单词就可以“理解”含义,但是,它还没有正确地学习如何从翻译中创建句子
由于 PyTorch 的 autograd 具有给我们的自由,我们可以通过简单的 if 语句随意选择是否使用教师强迫。 调高teacher_forcing_ratio
以使用更多功能
再看一下:
x = torch.rand(2,3,4)print(x)topv, topi = x.topk(1)print(topv, topi)
tensor([[[0.9181, 0.4981, 0.9914, 0.2432], [0.6078, 0.0700, 0.5814, 0.1134], [0.5261, 0.4351, 0.7423, 0.5492]], [[0.4094, 0.0585, 0.6755, 0.5136], [0.2214, 0.1035, 0.8093, 0.9017], [0.3362, 0.2959, 0.6774, 0.8712]]])tensor([[[0.9914], [0.6078], [0.7423]], [[0.6755], [0.9017], [0.8712]]]) tensor([[[2], [0], [2]], [[2], [3], [3]]])
关于
x = torch.rand(1, 2560)topv, topi = x.topk(1)print(topi)print(topi.squeeze()) # 将输入张量形状中的1 去除并返回,如果输入是形如(A×1×B×1×C×1×D),那么输出形状就为: (A×B×C×D)print(topi.squeeze().detach()) # 返回一个新的Tensor,从当前图中脱离出来,该tensor不会要求更新梯度,也就是梯度在这中断了print(topi.squeeze().detach().item())
tensor([[1217]])tensor(1217)tensor(1217)1217
关于绘图,原代码把纵轴的间隔设成了0.2:
import matplotlib.pyplot as pltplt.switch_backend('agg')import matplotlib.ticker as tickerimport numpy as npdef showPlot(points): plt.figure() fig, ax = plt.subplots() # this locator puts ticks at regular intervals loc = ticker.MultipleLocator(base=0.2) ax.yaxis.set_major_locator(loc) plt.plot(points)
训练所得曲线如图所示:
0.> je suis mauvais aux echecs .= i m terrible at chess .< i am a chess chess .1.> c est un tres bon joueur de tennis .= he is a good tennis player .< he is a good tennis player . 2.> elle porte de l ombre a paupiere .= she s wearing eye shadow .< she s wearing her birth . 3.> je suis ravi de votre travail .= i m very pleased with your work .< i m pleased your your work . 4.> je suis creatif .= i m creative .< i m thorough .
注意机制的一个有用特性是其高度可解释的输出。 因为它用于加权输入序列的特定编码器输出,所以我们可以想象一下在每个时间步长上网络最关注的位置。
可以简单地运行plt.matshow(attentions)
以将注意力输出显示为矩阵,其中列为输入步骤,行为输出步骤
input = elle a cinq ans de moins que moi .output = she s five years younger than me .input = elle est trop petit .output = she s too short . input = je ne crains pas de mourir .output = i m not scared to die . input = c est un jeune directeur plein de talent .output = he s a talented young person .
import unicodedataimport redef normalizeString(s): # 将 Unicode 字符转换为 ASCII Ascii = [] for c in unicodedata.normalize('NFD', s): if unicodedata.category(c) != 'Mn': Ascii.append(c) # 将ASCII列表转化为字符串,并将所有内容都转换为小写,并修剪大多数标点符号 s = ''.join(Ascii).lower().strip() s = re.sub(r"([.!?])", r" \1", s) # 在.!?前面加上空格 s = re.sub(r"[^a-zA-Z.!?]+", r" ", s) # 只保留a-zA-Z.!?并在其后加空格 return sSOS_token = 0 # Start of sentenceEOS_token = 1 # End of sentenceclass Lang: def __init__(self, name): self.name = name self.word2index = { } self.word2count = { } self.index2word = { 0: "SOS", 1: "EOS"} self.n_words = 2 def addSentence(self, sentence): for word in sentence.split(' '): self.addWord(word) def addWord(self, word): if word not in self.word2index: self.word2index[word] = self.n_words self.index2word[self.n_words] = word self.word2count[word] = 1 self.n_words += 1 else: self.word2count[word] += 1def readLangs(lang1='eng', lang2='fra', reverse=False): # 默认英语→其他语言,可以用reverse反转 print('Reading lines...') file = open(path + 'eng-fra.txt', encoding='utf-8') lines = file.read().strip().split('\n') pairs = [[normalizeString(s) for s in l.split('\t')]for l in lines] if reverse: pairs = [list(reversed(i)) for i in pairs] input_lang = Lang(lang2) # 初始化class里面的name output_lang = Lang(lang1) else: input_lang = Lang(lang1) output_lang = Lang(lang2) return input_lang, output_lang, pairsMAX_LENGTH = 10eng_prefixes = ( "i am ", "i m ", "he is", "he s ", "she is", "she s ", "you are", "you re ", "we are", "we re ", "they are", "they re ")def filterPair(pairs): pair_filter = [] for p in pairs: if len(p[0].split(' ')) < MAX_LENGTH and len(p[1].split(' ')) < MAX_LENGTH and p[1].startswith(eng_prefixes): pair_filter.append(p) return pair_filterdef tensorFromSentence(lang, sentence): indexes = [] for word in sentence.split(' '): indexes.append(lang.word2index[word]) indexes.append(EOS_token) return torch.tensor(indexes, dtype=torch.long, device=device).view(-1, 1)def tensorFromPair(pair): input_tensor = tensorFromSentence(input_lang, pair[0]) target_tensor = tensorFromSentence(output_lang, pair[1]) return (input_tensor, target_tensor)import torchdevice = torch.device("cuda" if torch.cuda.is_available() else "cpu")import torch.nn as nnclass EncoderRNN(nn.Module): def __init__(self, input_size, hidden_size): # input_lang.n_words(4345), hidden_size=256 super(EncoderRNN, self).__init__() self.hidden_size = hidden_size self.embedding = nn.Embedding(input_size, hidden_size) self.gru = nn.GRU(hidden_size, hidden_size) # embedding_dim=hidden_size, hidden_size, num_layers=1, bidirectional=False def forward(self, input, hidden): # input.size()= torch.Size([1]) embedded = self.embedding(input).view(1, 1, -1) # 1 * hidden_size → 1 * 1 * hidden_size:torch.Size([1, 1, 256]) output = embedded output, hidden = self.gru(output, hidden) return output, hidden def initHidden(self): return torch.zeros(1, 1, self.hidden_size, device=device) # num_layers * num_directions, batch, hidden_size# 简单解码器import torch.nn.functional as F# class DecoderRNN(nn.Module):# def __init__(self, hidden_size, output_size):# super(DecoderRNN,self).__init__()# self.hidden_size = hidden_size## self.embedding = nn.Embedding(output_size, hidden_size)# self.gru = nn.GRU(hidden_size, hidden_size)# self.out = nn.Linear(hidden_size, output_size)# self.softmax = nn.LogSoftmax(dim=1)## def forward(self, input, hidden):# output = self.embedding(input).view(1, 1, -1)# output = F.relu(output)# output, hidden = self.gru(output, hidden)# output = self.softmax(self.out(output[0]))# return output, hidden## def initHidden(self):# return torch.zeros(1, 1, self.hidden_size, device=device)# Attention 解码器class AttnDecoderRNN(nn.Module): def __init__(self, hidden_size, output_size, dropout_p=0.1, max_length=MAX_LENGTH): super(AttnDecoderRNN, self).__init__() self.hidden_size = hidden_size self.output_size = output_size self.dropout_p = dropout_p self.max_length = max_length self.embedding = nn.Embedding(self.output_size, self.hidden_size) self.dropout = nn.Dropout(self.dropout_p) self.attn = nn.Linear(self.hidden_size * 2, self.max_length) self.attn_combine = nn.Linear(self.hidden_size * 2, self.hidden_size) self.gru = nn.GRU(self.hidden_size, self.hidden_size) self.out = nn.Linear(self.hidden_size, self.output_size) def forward(self, input, hidden, encoder_outputs): embedded = self.embedding(input).view(1, 1, -1) # 1 * 1 * 256 embedded = self.dropout(embedded) attn_weights = F.softmax( self.attn(torch.cat((embedded[0], hidden[0]), 1)), dim=1 # (1 * 256与1 * 256) → Linear 1 * 10 ) attn_applied = torch.bmm(attn_weights.unsqueeze(0),encoder_outputs.unsqueeze(0)) # 1 * 1 * 10, 1 * max_length * encoder.hidden_size → 1 * 1 * 256 output = torch.cat((embedded[0], attn_applied[0]), 1) # 1 * 512 output = self.attn_combine(output).unsqueeze(0) # 1 * 1* 256 output = F.relu(output) output, hidden = self.gru(output, hidden) output = F.log_softmax(self.out(output[0]), dim=1) return output, hidden, attn_weights def initHidden(self): return torch.zeros(1, 1, self.hidden_size, device=device)from torch import optimteacher_forcing_ratio = 0.5def train(input_tensor, target_tensor, encoder, decoder, encoder_optimizer, decoder_optimizer, criterion, max_length=MAX_LENGTH): encoder_hidden = encoder.initHidden() encoder_optimizer.zero_grad() decoder_optimizer.zero_grad() input_length = input_tensor.size()[0] target_length = target_tensor.size()[0] encoder_outputs = torch.zeros(max_length, encoder.hidden_size, device=device) loss = 0 for ei in range(input_length): encoder_output, encoder_hidden = encoder(input_tensor[ei], encoder_hidden) encoder_outputs[ei] = encoder_output[0][0] decoder_input = torch.tensor([SOS_token], device=device) decoder_hidden = encoder_hidden use_teacher_forcing = True if random.random() < teacher_forcing_ratio else False if use_teacher_forcing: # Learn from reference for di in range(target_length): decoder_output, decoder_hidden, decoder_attention = decoder(decoder_input, decoder_hidden, encoder_outputs) loss += criterion(decoder_output, target_tensor[di]) # 1 * output_size decoder_input = target_tensor[di] # Teacher forcing else: # Learn from model for di in range(target_length): decoder_output, decoder_hidden, decoder_attention = decoder(decoder_input, decoder_hidden, encoder_outputs) loss += criterion(decoder_output, target_tensor[di]) topv, topi = decoder_output.topk(1) decoder_input = topi.detach() # detach from history as input if decoder_input.item() == EOS_token: break loss.backward() encoder_optimizer.step() decoder_optimizer.step() return loss.item() / target_lengthimport randomimport matplotlib.pyplot as pltdef trainIters(encoder, decoder): start = time.time() n_iters = 75000 print_every = 1000 plot_every = 100 print_loss_total = 0 # Reset every print_every plot_loss_total = 0 # Reset every plot_every encoder_optimizer = optim.SGD(encoder.parameters(), lr=0.01) decoder_optimizer = optim.SGD(decoder.parameters(), lr=0.01) training_pairs = [tensorFromPair(random.choice(pairs)) for i in range(75000)] # n_iters criterion = nn.NLLLoss() plot_losses = [] for iter in range(1, n_iters + 1): training_pair = training_pairs[iter - 1] input_tensor = training_pair[0] target_tensor = training_pair[1] loss = train(input_tensor, target_tensor, encoder, decoder, encoder_optimizer, decoder_optimizer, criterion) print_loss_total += loss plot_loss_total += loss if iter % print_every == 0: print_loss_avg = print_loss_total / print_every print_loss_total = 0 print('{} 当前iter={} 完成进度{:.3f}% Loss:{:.4f}'.format(timeSince(start, iter / n_iters), iter, iter / n_iters * 100, print_loss_avg)) if iter % plot_every == 0: plot_loss_avg = plot_loss_total / plot_every plot_loss_total = 0 plot_losses.append(plot_loss_avg) torch.save(encoder.state_dict(), '... your path\\model_encoder.pth') torch.save(decoder.state_dict(), '... your path\\model_decoder.pth') plt.figure() plt.plot(plot_losses) plt.show()import timeimport mathdef asMinutes(s): m = math.floor(s / 60) s -= m * 60 return '%dm %ds' % (m, s)def timeSince(since, percent): #用于在给定当前时间和进度%的情况下打印经过的时间和估计的剩余时间 now = time.time() s = now - since es = s / (percent) rs = es - s return '运行时间 %s (估计的剩余时间 %s)' % (asMinutes(s), asMinutes(rs))def evaluate(encoder, decoder, sentence): max_length = MAX_LENGTH encoder.load_state_dict(torch.load('... your path\\model_encoder.pth')) decoder.load_state_dict(torch.load('... your path\\model_decoder.pth')) with torch.no_grad(): input_tensor = tensorFromSentence(input_lang, sentence) input_length = input_tensor.size()[0] encoder_hidden = encoder.initHidden() encoder_outputs = torch.zeros(max_length, encoder.hidden_size, device=device) for ei in range(input_length): encoder_output, encoder_hidden = encoder(input_tensor[ei], encoder_hidden) encoder_outputs[ei] += encoder_output[0][0] decoder_input = torch.tensor([[SOS_token]], device=device) decoder_hidden = encoder_hidden decoder_words = [] decoder_attentions = torch.zeros(max_length, max_length) for di in range(max_length): decoder_output, decoder_hidden, decoder_attention = decoder(decoder_input, decoder_hidden, encoder_outputs) decoder_attentions[di] = decoder_attention.data topv, topi = decoder_output.data.topk(1) if topi.item() == EOS_token: decoder_words.append('') break else: decoder_words.append(output_lang.index2word[topi.item()]) decoder_input = topi.detach() return decoder_words, decoder_attentions[:di+1]def evaluateRandomly(encoder, decoder, n=5): for i in range(n): pair = random.choice(pairs) print(str(i) + '.') print('>', pair[0]) print('=', pair[1]) decoder_words, decoder_attentions = evaluate(encoder1, attn_decoder1, pair[0]) print('<', ' '.join(decoder_words))import matplotlib.ticker as tickerdef showAttention(input_sentence, output_words, attentions): # 用色条设置图形 fig = plt.figure() ax = fig.add_subplot(111) cax = ax.matshow(attentions.numpy()) # colormap, cmap='bone' fig.colorbar(cax) # 设置轴 ax.set_xticklabels([''] + input_sentence.split(' ') + [' '], rotation=90) ax.set_yticklabels([''] + output_words) # Show label at every tick ax.xaxis.set_major_locator(ticker.MultipleLocator(1)) ax.yaxis.set_major_locator(ticker.MultipleLocator(1)) plt.show()def evaluateAndShowAttention(input_sentence): output_words, attentions = evaluate(encoder1, attn_decoder1, input_sentence) print('input =', input_sentence) print('output =', ' '.join(output_words)) showAttention(input_sentence, output_words, attentions)if __name__ == '__main__': path = '... your path\\' # prepareData input_lang, output_lang, pairs = readLangs(reverse=True) print('Read %d sentence pairs' % len(pairs)) pairs = filterPair(pairs) print('Trimmed to %d sentence pairs' % len(pairs)) for pair in pairs: input_lang.addSentence(pair[0]) output_lang.addSentence(pair[1]) hidden_size = 256 encoder1 = EncoderRNN(input_lang.n_words, hidden_size).to(device) attn_decoder1 = AttnDecoderRNN(hidden_size, output_lang.n_words, dropout_p=0.1).to(device) # 以下为训练 # trainIters(encoder1, attn_decoder1) # 以下为测试1 # sentence = 'vous etes avides .' # decoder_words, decoder_attentions = evaluate(encoder1, attn_decoder1, sentence) # print(' '.join(decoder_words)) # 以下为测试2 # evaluateRandomly(encoder1, attn_decoder1, n=5) # 以下为注意力可视化1 # output_words, attentions = evaluate(encoder1, attn_decoder1, "je suis trop froid .") # plt.matshow(attentions.numpy()) # plt.show() # 以下为注意力可视化2 evaluateAndShowAttention("elle a cinq ans de moins que moi .") evaluateAndShowAttention("elle est trop petit .") evaluateAndShowAttention("je ne crains pas de mourir .") evaluateAndShowAttention("c est un jeune directeur plein de talent .")
转载地址:http://fwtrn.baihongyu.com/