基于bert预训练高中知识点的单轮对话机器人
基于bert预训练的单轮对话机器人
·
代码和数据集下载链接:
基于bert预训练的高中知识点单轮对话机器人-自然语言处理文档类资源-CSDN下载
代码主要流程

对话机器人工作流程

1 清洗数据
import os
import numpy as np
import pandas as pd

# Parse the raw question dumps of every subject into
# {subject: {sub_course: [[question_text, [kg, ...]], ...]}}.
dicts = {}
error_num = 0          # number of records that could not be parsed
uppath = "原始数据\\"
outpath = "数据生成\\"
subjects = ["高中地理", "高中历史", "高中生物", "高中政治"]
error = []             # the unparsable records themselves, kept for inspection
for subject in subjects:
    dicts[subject] = {}
    for i in os.listdir(os.path.join(uppath, subject, "origin\\")):  # each sub-course csv
        dicts[subject][i.split(".")[0]] = []
        # read the "item" column; bracket indexing is safer than attribute access
        data = pd.read_csv(os.path.join(uppath, subject, "origin\\", i))["item"]
        for j in data:
            j = j.split("\n")
            if len(j) == 4:  # [title, question text, "知识点:" marker, knowledge points]
                questions = j[1].replace("知识点:", "")  # clean the question text
                kg = j[3].split(",")  # several knowledge points separated by commas
                dicts[subject][i.split(".")[0]].append([questions, kg])
            elif len(j) > 4:
                # question / knowledge-point bodies span several lines; the
                # "[题目]" / "[知识点:]" marker lines switch the collection state
                questions = []
                questions_key = False
                kg = []
                kg_key = False
                for tmp in j:
                    if "[题目]" == tmp:
                        questions_key = True  # following lines belong to the question
                        continue
                    if "[知识点:]" == tmp:
                        questions_key = False
                        kg_key = True
                        continue
                    if questions_key:
                        questions.append(tmp)
                    if kg_key:
                        kg.append(tmp)
                if len(kg) > 0:
                    questions = ''.join(questions).replace("知识点:", "")
                    kg = "".join(kg).split(",")  # several knowledge points
                    dicts[subject][i.split(".")[0]].append([questions, kg])
                else:
                    error_num += 1
                    error.append([subject, i, j])
            else:
                error_num += 1
                error.append([subject, i, j])

# idiomatic directory creation instead of the mkdir/else-1 conditional-expression trick
os.makedirs(outpath, exist_ok=True)

# flatten to one row per (subject, sub-course, knowledge point, question);
# this could also be folded into the cleaning loop above
data_new = []
for km1 in dicts:                       # main subject
    for km2 in dicts[km1]:              # sub-course
        for data in dicts[km1][km2]:    # [question, [kg, ...]]
            for kg in data[1]:          # one row per knowledge point
                data_new.append([km1, km2, kg, data[0]])
data_new = pd.DataFrame(data_new)
data_new.columns = ["km1", "km2", "kg", "question"]
data_new.to_csv(outpath + "km.csv", index=False)
2 构建实体表和关系表
2.1 构建知识点实体表和关系表
import pandas as pd
import numpy as np
import os
import random
# 设置随机数种子
random.seed(2)
def long_num_str(data):
    """Render *data* as a string with a trailing tab.

    The tab presumably keeps long hash ids from being mangled as numbers by
    downstream CSV consumers — TODO confirm against the Neo4j import step.
    """
    return f"{data}\t"
outpath = "数据生成\\"
data = pd.read_csv(outpath + "km.csv")

# ---- entity table: one row per distinct value of every column ----
entity = None
for i in data:  # iterate column names
    names = list(set(data[i]))          # de-duplicate the column (plain indexing, no eval)
    ids = [hash(j) for j in names]      # entity id = hash of the name
    labels = [i for j in names]         # every entity in the column shares the column name as label
    frame = pd.DataFrame(np.array([ids, names, labels]).transpose())
    # DataFrame.append was removed in pandas 2.0 -> use pd.concat
    entity = frame if entity is None else pd.concat([entity, frame], ignore_index=True)
entity.columns = [":ID", "name", ":LABEL"]
entity[":ID"] = entity[":ID"].map(long_num_str)  # keep ids as text
os.makedirs(outpath, exist_ok=True)
entity.to_csv(outpath + 'entity_km.csv', index=False)

# ---- relation table: each pair of adjacent columns forms a parent->child edge ----
relation = None
for i in [[data.keys()[tmp], data.keys()[tmp + 1]] for tmp in range(len(data.keys()) - 1)]:
    # i is ['km1', 'km2'], ['km2', 'kg'], ['kg', 'question'] in turn
    tmp = data[i]
    tmp = tmp.drop_duplicates(subset=i[1], keep="first", inplace=False)  # de-dup on the child column
    start = [hash(j) for j in tmp[i[0]]]  # equal values hash equally, matching the entity ids
    end = [hash(j) for j in tmp[i[1]]]
    # relation name: "column1->column2"
    name = [i[0] + '->' + i[1] for j in range(len(tmp))]
    frame = pd.DataFrame(np.array([start, name, end, name]).transpose())
    relation = frame if relation is None else pd.concat([relation, frame], ignore_index=True)
relation.columns = [":START_ID", "name", ":END_ID", ":TYPE"]
relation[':START_ID'] = relation[':START_ID'].map(long_num_str)
relation[':END_ID'] = relation[':END_ID'].map(long_num_str)
relation.to_csv(outpath + "relation_km.csv", index=False)
2.2 构建古诗词实体表和关系表
import pandas as pd
import numpy as np
import os
import random
# 设置随机数种子
random.seed(2)
def long_num_str(data):
    """Stringify *data* and append a tab.

    Presumably protects long hash ids from numeric reinterpretation in the
    CSV consumers downstream — TODO confirm.
    """
    return str(data) + "\t"
# ---- entity table for the poem data ----
poems = pd.read_csv('原始数据/古诗/诗句.csv')
poets = pd.read_csv('爬虫清洗后数据/poets.csv')
poets = dict(zip(poets.poet, poets.introduce))
introduce = []
outpath = "数据生成\\"
for i in poems.author:
    # poets was only partially cleaned, so some authors have no introduction
    introduce.append(poets[i] if i in poets else np.nan)
poems["introduce"] = introduce  # new column

# the 'tag' column stores a python-list literal; eval() turns it back into a list.
# NOTE(review): eval on file contents is only acceptable for trusted local data.
max_len = max([len(eval(i)) for i in poems['tag'] if isinstance(i, str)])
tag = []
for i in poems["tag"]:
    if isinstance(i, str):  # non-empty cell
        i = eval(i)
        i += [np.nan] * (max_len - len(i))  # pad every row to the same width
    else:
        i = [np.nan] * max_len
    tag.append(i)
# add padded tag-0..tag-N columns
poems[["tag-" + str(i) for i in range(max_len)]] = pd.DataFrame(tag)
# drop the original list-valued tag column
poems = poems[['author', 'introduce', 'title', 'content', 'type', 'translate', 'class'] + ['tag-' + str(i) for i in range(max_len)]]

entity = None
for i in poems:
    names = list(set(poems[i]))  # plain indexing instead of eval(f"poems['{i}']")
    ids = [hash(j) for j in names]
    if 'tag-' in i:
        i = 'tag'  # every tag-* column shares the label 'tag'
    labels = [i for j in names]
    frame = pd.DataFrame(np.array([ids, names, labels]).transpose())
    # DataFrame.append was removed in pandas 2.0 -> use pd.concat
    entity = frame if entity is None else pd.concat([entity, frame], ignore_index=True)
entity.columns = [':ID', 'name', ':LABEL']
entity[':ID'] = entity[':ID'].map(long_num_str)
os.makedirs(outpath, exist_ok=True)
poems.to_csv(outpath + 'poems.csv', index=False)  # persist the widened poem table
entity.to_csv(outpath + 'entity_poems.csv', index=False)

# ---- relation table ----
relation = None
poems = pd.read_csv(outpath + 'poems.csv')
# NOTE(review): range(7) hard-codes the tag column count — presumably max_len; verify.
for i in [['author', 'title'], ['author', 'introduce'], ['title', 'content'], ['title', 'translate'],
          ['type', 'title'], ['class', 'title']] + [['tag-' + str(i), 'title'] for i in range(7)]:
    tmp = poems[i]
    tmp = tmp.dropna()  # drop rows where either endpoint is missing
    tmp = tmp.drop_duplicates(subset=[i[0], i[1]], keep='first', inplace=False)
    start = [hash(j) for j in tmp[i[0]]]
    end = [hash(j) for j in tmp[i[1]]]
    if 'tag-' in i[0]:
        i[0] = 'tag'
    name = [i[0] + "->" + i[1] for j in range(len(tmp))]
    frame = pd.DataFrame(np.array([start, name, end, name]).transpose())
    relation = frame if relation is None else pd.concat([relation, frame], ignore_index=True)
relation.columns = [':START_ID', 'name', ':END_ID', ':TYPE']
relation[':START_ID'] = relation[':START_ID'].map(long_num_str)  # ids back to text
relation[':END_ID'] = relation[':END_ID'].map(long_num_str)
relation.to_csv(outpath + 'relation_poems.csv', index=False)
2.3 分别整合实体表和关系表
import pandas as pd
import numpy as np
import os
path = "数据生成\\"
file_list = ["km", "poems"]
entity_list = None
relation_list = None
for i in file_list:
    ent = pd.read_csv(path + "entity_" + i + ".csv", dtype=str)
    rel = pd.read_csv(path + "relation_" + i + ".csv", dtype=str)
    # DataFrame.append was removed in pandas 2.0 -> use pd.concat
    entity_list = ent if entity_list is None else pd.concat([entity_list, ent], ignore_index=True)
    relation_list = rel if relation_list is None else pd.concat([relation_list, rel], ignore_index=True)
# de-duplicate across both sources
entity_com = entity_list.drop_duplicates(subset=":ID", keep="first", inplace=False)
relation_com = relation_list.drop_duplicates(subset=[':START_ID', 'name', ':END_ID', ':TYPE'], keep="first", inplace=False)
# save
os.makedirs(path + "实体表和关系表\\", exist_ok=True)
entity_com.to_csv(path + "实体表和关系表\\entity.csv", index=False)
relation_com.to_csv(path + "实体表和关系表\\relation.csv", index=False)
3 准备训练数据
3.1 ner训练数据和第二次意图识别训练数据
# 构建ner实体数据集
import pandas as pd
import os
path = "数据生成/实体表和关系表/"
outpath = "数据生成/命名实体识别和意图识别数据/"
data = pd.read_csv(path + "entity.csv")
# keep only the entity labels the NER model should learn
index = data[":LABEL"].isin(["km1", "km2", "kg", "author", "title", "tag", "type", "class"])
data = data[index]
ner_data = data[["name", ":LABEL"]]
ner_data.columns = ['name', 'label']
# save
os.makedirs(outpath, exist_ok=True)
# ner_data.to_csv(outpath + "ner_entity.csv", index= False)
import pandas as pd
path = "数据生成/命名实体识别和意图识别数据/"
data = pd.read_csv(path + "ner_entity.csv")
data_new = {}
# one C-level pass removes all bracket/quote punctuation
_strip = str.maketrans('', '', '()【】“”《》()_')
for _, row in data.iterrows():
    name = row['name'].translate(_strip)
    # split compound names into atomic entities on every separator character;
    # fixed: collect n (after the '/' split) — the original collected m,
    # leaving '/'-joined names unsplit
    name_list = [n for j in name.split('·') for k in j.split('、')
                 for q in k.split('、') for p in q.split(',')
                 for m in p.split('・') for n in m.split('/')]
    label = row['label']
    for name in name_list:
        if len(name) > 10:  # drop overly long entities
            continue
        # one label key maps to many entity surface forms
        data_new.setdefault(label, []).append(name)
from random import choice
import pickle as pk
# 输入实体前句和实体后句 返回 (x_intent, y_intent, x_ner, y_ner)
def build_seq(data, label, seq1, seq2, intent, model=1, num=0, output=None):
    """Generate one intent sample plus NER samples for one template.

    data   -- dict mapping entity label -> list of entity surface forms
    label  -- entity label to instantiate (e.g. 'km1')
    seq1   -- sentence text before the entity slot
    seq2   -- sentence text after the entity slot
    intent -- intent id assigned to the generated sentences
    model  -- 1: one NER sample per known entity; 2: sample *num* entities
    output -- optional [x_intent, y_intent, x_ner, y_ner] accumulators to
              extend in place; when None the four lists are returned.
    """
    seqs_intent, labels_intent, seqs_ner, labels_ner = [], [], [], []
    # intent sample keeps the slot as the literal label token,
    # e.g. ['老', '师', 'km1', '有', '哪', '些', ...]
    seqs_intent.append(list(seq1) + [label] + list(seq2))
    labels_intent.append(intent)

    def _add(entity):
        # one NER sample: characters of the filled sentence plus BIES/O tags.
        # Plain concatenation instead of ('%s' % ...) so templates containing
        # a literal '%' no longer crash the formatting.
        seqs_ner.append(list(seq1 + entity + seq2))
        if len(entity) >= 2:
            tags = ['B-' + label] + ['I-' + label] * (len(entity) - 2) + ['E-' + label]
        else:
            tags = ['S-' + label]  # single-character entity
        labels_ner.append(['O'] * len(seq1) + tags + ['O'] * len(seq2))

    if model == 1:  # exhaustive: every entity of this label
        for entity in data[label]:
            _add(entity)
    if model == 2:  # random: num entities drawn with replacement
        for _ in range(num):
            _add(choice(data[label]))

    if output is None:
        return seqs_intent, labels_intent, seqs_ner, labels_ner
    # extend the shared accumulators (x_intent, y_intent, x_ner, y_ner)
    output[0] += seqs_intent
    output[1] += labels_intent
    output[2] += seqs_ner
    output[3] += labels_ner
# 通过 seq1_list, seq2_list 遍历调用 build_seq
def create_dataset(seqs, output, label, intent, keys=1, num=100):
    """Drive build_seq over every (prefix, suffix) template combination.

    keys == 1: enumerate every entity for every template pair.
    keys != 1: enumerate exhaustively only for the first pair, then draw
    *num* random entities for each remaining pair (model 2).
    Results are appended to *output* ([x_intent, y_intent, x_ner, y_ner]).
    """
    seq1_list, seq2_list = seqs
    if keys == 1:
        for prefix in seq1_list:
            for suffix in seq2_list:
                build_seq(data=data_new, label=label, seq1=prefix, seq2=suffix,
                          intent=intent, model=1, num=10, output=output)
    else:
        first_pair = True
        mode = 1
        for prefix in seq1_list:
            for suffix in seq2_list:
                # only the very first template pair is exhaustive
                if not first_pair:
                    mode = 2
                first_pair = False
                build_seq(data=data_new, label=label, seq1=prefix, seq2=suffix,
                          intent=intent, model=mode, num=num, output=output)
# manually assemble x_intent, y_intent, x_ner, y_ner
x_intent, y_intent, x_ner, y_ner = [], [], [], []
output = [x_intent, y_intent, x_ner, y_ner]
# covers the NER dataset plus the intent_1 intent datasets (intent ids 0-17 below)
# km1
label = 'km1'
seq1_list = ['老师','','请问','你好']
seq2_list = ['有哪些重要的课程','有哪些重要的课','什么课比较重要','需要学什么课',
'有哪些课']
create_dataset([seq1_list,seq2_list],output,label,0)
seq1_list = ['老师','','请问','你好']
seq2_list = ['有哪些重要的知识点','有哪些知识点需要注意','什么知识点比较重要',
'需要学什么那些知识点', '有哪些知识点','涉及到哪些知识点','包含哪些知识点']
create_dataset([seq1_list,seq2_list],output,label,1)
seq1_list = ['老师','','请问','你好']
seq2_list = ['有哪些重要的例题需要掌握','有哪些例题需要注意','什么例题需要掌握',
'有哪些例题']
create_dataset([seq1_list,seq2_list],output,label,2)
# km2
label = 'km2'
seq1_list = ['老师','','请问','你好']
seq2_list = ['是哪个学科的课程','是什么学科的']
create_dataset([seq1_list,seq2_list],output,label,3)
seq1_list = ['老师','','请问','你好']
seq2_list = ['有哪些重要的知识点','有哪些知识点需要注意','什么知识点比较重要',
'需要学什么那些知识点', '有哪些知识点','涉及到哪些知识点','包含哪些知识点']
create_dataset([seq1_list,seq2_list],output,label,4)
seq1_list = ['老师','','请问','你好']
seq2_list = ['有哪些重要的例题需要掌握','有哪些例题需要注意','什么例题需要掌握',
'有哪些例题']
create_dataset([seq1_list,seq2_list],output,label,5)
# kg: keys=2 samples entities randomly because there are many knowledge points
label = 'kg'
seq1_list = ['老师','','请问','你好']
seq2_list = ['是什么学科的','是哪个学科的知识点']
create_dataset([seq1_list,seq2_list],output,label,6,keys = 2)
seq1_list = ['老师','','请问','你好']
seq2_list = ['是什么课程的','是哪个课程需要学习的知识点','是哪门课的']
create_dataset([seq1_list,seq2_list],output,label,7,keys = 2)
seq1_list = ['老师','','请问','你好']
seq2_list = ['有哪些重要的例题需要掌握','有哪些例题需要注意','什么例题需要掌握',
'有哪些例题']
create_dataset([seq1_list,seq2_list],output,label,8,keys = 2)
# poem-specific templates
# author
label = 'author'
seq1_list = ['老师','','请问','你好']
seq2_list = ['写过哪些诗句啊','有哪些诗','有哪些有名的诗句',
]
create_dataset([seq1_list,seq2_list],output,label,9,keys = 2,num = 500)
seq1_list = ['老师','','请问','你好']
seq2_list = ['是谁啊','这个的生平怎么样','有哪些经历',
]
create_dataset([seq1_list,seq2_list],output,label,10,keys = 2,num = 500)
# title
label = 'title'
seq1_list = ['老师','','请问','你好']
seq2_list = ['是谁写的啊'
]
create_dataset([seq1_list,seq2_list],output,label,11,keys = 2,num = 500)
seq1_list = ['老师','','请问','你好']
seq2_list = ['的诗句有哪些'
]
create_dataset([seq1_list,seq2_list],output,label,12,keys = 2,num = 500)
seq1_list = ['老师','','请问','你好']
seq2_list = ['的翻译是'
]
create_dataset([seq1_list,seq2_list],output,label,13,keys = 2,num = 500)
seq1_list = ['老师','','请问','你好']
seq2_list = ['是什么类型的古诗'
]
create_dataset([seq1_list,seq2_list],output,label,14,keys = 2,num = 500)
seq1_list = ['老师','','请问','你好']
seq2_list = ['出现在几年级的课程'
]
create_dataset([seq1_list,seq2_list],output,label,15,keys = 2,num = 500)
# tag
label = 'tag'
seq1_list = ['老师','','请问','你好']
seq2_list = ['的古诗有哪些'
]
create_dataset([seq1_list,seq2_list],output,label,16,keys = 2,num = 100)
# class
label = 'class'
seq1_list = ['我们','','请问我们']
seq2_list = ['学习的古诗有哪些'
]
create_dataset([seq1_list,seq2_list],output,label,17,keys = 1,num = 100)
len(x_intent),len(y_intent),len(x_ner),len(y_ner)  # notebook-style size check
# save
# pk.dump([x_intent, y_intent, x_ner, y_ner], open("数据生成/命名实体识别和意图识别数据/data_intent1_ner.pkl", 'wb'))
3.2 构建第一次意图识别数据
# chitchat corpus: the file alternates question/answer lines, stride 2 keeps questions only
with open("corpus/conversation_test.txt", "r", encoding="utf-8") as f:  # close the handle (was leaked)
    corpus = [list(i.strip()) for i in f.readlines()][::2]
# de-duplicate while keeping the original character lists; storing the list as
# the dict value removes the original str()/eval() round-trip
corpus_new = {}
for i in corpus:  # i is a list of characters
    if str(i) not in corpus_new:  # str(i) because a list is not hashable
        corpus_new[str(i)] = i
corpus = list(corpus_new.values())

# robot personal-attribute templates
import xml.etree.ElementTree as et
robot_data = et.parse("robot_template/robot_template.xml").findall("temp")
# light cleaning of every <q> under <question>
robot_data = [j.text.replace('(.*)', '').replace('.*', '') for i in robot_data for j in i.find('question').findall('q')]
# questions that need no further expansion
data_new = [i for i in robot_data if '[' not in i]
# questions containing "[a|b]"-style alternatives need expansion
data_need_deal = [i.replace('+', '').replace('*', '') for i in robot_data if '[' in i]
data_need_deal_new = []
for i in data_need_deal:
    # split "pre[x|y]post" into segments ['pre', 'x|y', 'post']
    tmp = []
    for j in i.split(']'):
        if '[' not in j:
            if len(j) != 0:
                tmp.append(j)
        else:
            for k in j.split('['):
                if len(k) != 0:
                    tmp.append(k)
    data_need_deal_new.append(tmp)
def build(seq, m=0, tmp='', res=None):
    """Expand '|'-separated alternatives into full sentences, depth-first.

    seq -- list of segments; a segment may contain 'a|b|c' alternatives
    m   -- current segment index (recursion cursor)
    tmp -- sentence accumulated so far
    res -- list the finished sentences are appended to; callers pass it in.
           Defaults to a fresh list per call — the original mutable default
           'res=[]' leaked results across calls.
    """
    if res is None:
        res = []
    if m < len(seq):  # more segments to consume
        if "|" in seq[m]:
            for alt in seq[m].split('|'):  # branch on every alternative
                build(seq, m + 1, tmp + alt, res)
        else:
            build(seq, m + 1, tmp + seq[m], res)  # plain segment: concatenate
    else:
        res.append(tmp)  # all segments consumed: one complete sentence
for i in data_need_deal_new:
    res = []
    build(seq=i, res=res)
    data_new += res  # expanded robot questions join the plain ones
data_new = [list(i) for i in data_new]  # to character lists, like the other sources
# merge graph-question, robot-attribute and chitchat data as intent-0 classes 0/1/2
data_x = x_intent + data_new + corpus
data_y = [0] * len(x_intent) + [1] * len(data_new) + [2] * len(corpus)
# raw string: the backslashes in the path are no longer invalid escape sequences,
# and `with` closes the dump file (was leaked)
with open(r"数据生成\命名实体识别和意图识别数据\data_intent0.pkl", "wb") as f:
    pk.dump([data_x, data_y], f)
4 ner模型
4.1 ner模型训练
import torch
import pickle as pk
import numpy as np
import random
from torch import nn
from torchcrf import CRF
from tqdm import tqdm
from sklearn.model_selection import train_test_split
from transformers import BertModel, BertPreTrainedModel, WEIGHTS_NAME, BertConfig, AdamW, BertTokenizer, get_linear_schedule_with_warmup
config_class, tokenizer_class = BertConfig, BertTokenizer
config = config_class.from_pretrained('../prev_trained_model') # BERT config
tokenizer = tokenizer_class.from_pretrained('../prev_trained_model')
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
[x_intent, y_intent, x_ner, y_ner] = pk.load(open("../../../data/数据生成/命名实体识别和意图识别数据/data_intent1_ner.pkl", 'rb'))
# x_ner sample: ['老', '师', '高', '中', '历', '史', '有', '哪', '些', '重', '要', '的', '课', '程']
# y_ner sample: ['O', 'O', 'B-km1', 'I-km1', 'I-km1', 'E-km1', 'O', 'O', 'O', 'O', 'O', 'O', 'O', 'O']
X_train, X_test, y_train, y_test = train_test_split(x_ner, y_ner)
train_data = [(X_train[i], y_train[i]) for i in range(len(X_train))]
test_data = [(X_test[i], y_test[i]) for i in range(len(X_test))]
tag2label = ['O'] + list(set([j for i in y_ner for j in i]) - set('O')) # 'O' must sit at index 0 (default/ignore tag)
tag2label = dict(zip(tag2label, range(len(tag2label)))) # tag name -> integer id, e.g. 'B-km1', 'I-km1', 'E-km1'
# training hyper-parameters plus the data splits, bundled for batch_yield/eval
parameter = { 'batch_size': 32,
'epoch': 10,
'dropout': 0.5,
'lr': 0.001,
'tag2label': tag2label,
'num_tag': len(tag2label),
'd_model': 768,
'shuffle': True,
'device': device,
'train_data': train_data,
'test_data': test_data,
}
# pk.dump(parameter, open('ner_parameter.pkl','wb'))
print(train_data[:5])  # sanity-check a few samples
def list2torch(ins):
    """Convert a (nested) python list to a torch tensor via numpy."""
    arr = np.array(ins)
    return torch.from_numpy(arr)
def batch_yield(parameter, shuffle=True, isTrain=True):
    """Yield padded (seqs, labels, done, epoch) NER batches.

    seqs are BERT token ids padded with 0 ([PAD]); labels are tag ids padded
    with -1 so the two stay aligned one-to-one.  The 4th element is None for
    an ordinary batch and the epoch number on the last batch of an epoch;
    the final sentinel is (None, None, True, None).
    """
    if isTrain:
        epochs = parameter['epoch']
        data = parameter['train_data']
    else:
        epochs = 1
        data = parameter['test_data']
    batch_size = parameter['batch_size']
    tag2label = parameter['tag2label']
    for train_epoch in range(epochs):
        if shuffle:
            random.shuffle(data)
        # pre-chunk so the last batch (full or partial) can carry the epoch
        # marker; the original crashed on max([]) whenever
        # len(data) % batch_size == 0 left an empty tail batch
        batches = [data[pos:pos + batch_size] for pos in range(0, len(data), batch_size)]
        for b_idx, batch in enumerate(tqdm(batches)):
            # BERT's own vocabulary maps characters to ids
            seqs = [tokenizer.convert_tokens_to_ids(seq) for seq, _ in batch]
            labels = [[tag2label[t] for t in label] for _, label in batch]
            max_len = max(len(s) for s in seqs)
            # 0 pads inputs; -1 pads labels (0 would collide with tag 'O')
            seqs = [s + [0] * (max_len - len(s)) for s in seqs]
            labels = [l + [-1] * (max_len - len(l)) for l in labels]
            epoch_mark = train_epoch if b_idx == len(batches) - 1 else None
            yield list2torch(seqs), list2torch(labels), False, epoch_mark
    yield None, None, True, None
class bert_crf(BertPreTrainedModel):
    """BERT encoder + linear projection to per-token tag scores.

    forward() returns emission scores only; callers use self.crf for the
    CRF log-likelihood (training) and decoding (inference).
    """

    def __init__(self, config, parameter):
        super(bert_crf, self).__init__(config)
        self.num_labels = config.num_labels
        self.bert = BertModel(config)
        self.dropout = nn.Dropout(config.hidden_dropout_prob)
        # hidden size (768) in, one logit per NER tag out
        self.fc = nn.Linear(parameter['d_model'], parameter['num_tag'])
        self.init_weights()
        self.crf = CRF(parameter['num_tag'], batch_first=True)

    def forward(self, input_ids, attention_mask=None, token_type_ids=None, labels=None):
        # bert output [0]: last hidden state, (batch, seq_len, hidden)
        hidden = self.bert(input_ids, attention_mask, token_type_ids)[0]
        hidden = self.dropout(hidden)
        return self.fc(hidden)  # per-token tag emission scores
# fine-tune BERT + CRF on the generated NER data
from torch import optim
model = bert_crf.from_pretrained("../prev_trained_model", config=config, parameter=parameter).to(parameter['device'])
full_finetuning = True
if full_finetuning:
    param_optimizer = list(model.named_parameters())
    no_decay = ['bias', 'LayerNorm.bias', 'LayerNorm.weight']  # exempt from weight decay
    optimizer_grouped_parameters = [
        {'params': [p for n, p in param_optimizer if not any(nd in n for nd in no_decay)], 'weight_decay': 0.01},
        {'params': [p for n, p in param_optimizer if any(nd in n for nd in no_decay)], 'weight_decay': 0.0}]
else:
    # train only the classification head
    param_optimizer = list(model.fc.named_parameters())
    optimizer_grouped_parameters = [{'params': [p for n, p in param_optimizer]}]
# optimizer
optimizer = AdamW(optimizer_grouped_parameters, lr=3e-5, correct_bias=False)
# linear warmup for one epoch, then linear decay
train_steps_per_epoch = len(train_data) // parameter['batch_size']
scheduler = get_linear_schedule_with_warmup(optimizer, num_warmup_steps=train_steps_per_epoch,
                                            num_training_steps=parameter['epoch'] * train_steps_per_epoch)
# training
model.train()
loss_cal = []
min_loss = float('inf')
train_yield = batch_yield(parameter)
while 1:
    seqs, labels, done, epoch = next(train_yield)
    if done:
        break
    out = model(seqs.long().to(parameter['device']))
    # CRF forward returns log-likelihood, so negate for a loss.
    # NOTE(review): padded labels are -1 and no mask is passed — confirm
    # torchcrf tolerates this, otherwise build a mask from labels != -1.
    loss = -model.crf(out, labels.long().to(parameter['device']))
    optimizer.zero_grad()
    loss.backward()
    nn.utils.clip_grad_norm_(parameters=model.parameters(), max_norm=5)  # gradient clipping
    optimizer.step()
    scheduler.step()
    loss_cal.append(loss.item())
    if epoch is not None:  # epoch boundary: report mean loss, checkpoint on improvement
        loss_cal = sum(loss_cal) / len(loss_cal)
        if loss_cal < min_loss:
            min_loss = loss_cal
            # torch.save(model.state_dict(), "ner_model.h5")
        print(f'epoch[{epoch+1}/{parameter["epoch"]}], loss:{loss_cal:.4f}')  # fixed: missing closing ']'
        loss_cal = [loss.item()]
4.2 ner模型评估
import torch
from torch import nn
from torchcrf import CRF
import pickle as pk
import numpy as np
from operator import itemgetter
import random
from tqdm import tqdm
import pandas as pd
from transformers import BertModel, BertPreTrainedModel, WEIGHTS_NAME, BertConfig, AdamW, BertTokenizer, get_linear_schedule_with_warmup
config_class, tokenizer_class = BertConfig, BertTokenizer
config = config_class.from_pretrained('../prev_trained_model') # BERT config
tokenizer = tokenizer_class.from_pretrained('../prev_trained_model')
class bert_crf(BertPreTrainedModel):
    """BERT backbone with a linear tag-emission head for NER evaluation.

    Only emission scores come out of forward(); the attached self.crf
    handles decoding.
    """

    def __init__(self, config, parameter):
        super(bert_crf, self).__init__(config)
        self.num_labels = config.num_labels
        self.bert = BertModel(config)
        self.dropout = nn.Dropout(config.hidden_dropout_prob)
        dim_in = parameter['d_model']     # 768, matches BERT hidden size
        dim_out = parameter['num_tag']
        self.fc = nn.Linear(dim_in, dim_out)
        self.init_weights()
        self.crf = CRF(dim_out, batch_first=True)

    def forward(self, input_ids, attention_mask=None, token_type_ids=None, labels=None):
        encoded = self.bert(input_ids, attention_mask, token_type_ids)[0]  # (batch, seq, hidden)
        encoded = self.dropout(encoded)
        return self.fc(encoded)
def load_model(config, isEval=False):
    """Rebuild bert_crf with its saved hyper-parameters and trained weights.

    isEval=True picks GPU when available; otherwise CPU is enough for
    single-query prediction.  Returns (model, parameter).
    """
    # `with` closes the parameter file (the original leaked the handle)
    with open('ner_parameter.pkl', 'rb') as fh:
        parameter = pk.load(fh)
    if isEval:
        parameter['device'] = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    else:
        parameter['device'] = torch.device('cpu')
    model = bert_crf(config, parameter).to(parameter['device'])
    # load the fine-tuned weights
    model.load_state_dict(torch.load('ner_model.h5', map_location=parameter['device']))
    model.eval()
    return model, parameter
def list2torch(ins):
    """Wrap *ins* (a possibly nested list) into a torch tensor."""
    return torch.from_numpy(np.array(ins))
def batch_yield(parameter, shuffle=True, isTrain=True):
    """Yield padded (seqs, labels, done, epoch) NER batches for train/eval.

    seqs: BERT token ids padded with 0; labels: tag ids padded with -1 so
    each label stays aligned with its token.  The 4th element marks the
    last batch of an epoch with the epoch number (None otherwise); the
    stream ends with the sentinel (None, None, True, None).
    """
    if isTrain:
        epochs = parameter['epoch']
        data = parameter['train_data']
    else:
        epochs = 1
        data = parameter['test_data']
    batch_size = parameter['batch_size']
    tag2label = parameter['tag2label']
    for train_epoch in range(epochs):
        if shuffle:
            random.shuffle(data)
        print(data[:5])  # debug: peek at a few samples each epoch
        # pre-chunk so the last batch (full or partial) carries the epoch
        # marker; the original crashed on max([]) when
        # len(data) % batch_size == 0 produced an empty tail batch
        batches = [data[pos:pos + batch_size] for pos in range(0, len(data), batch_size)]
        for b_idx, batch in enumerate(tqdm(batches)):
            # BERT's own vocabulary maps characters to ids
            seqs = [tokenizer.convert_tokens_to_ids(seq) for seq, _ in batch]
            labels = [[tag2label[t] for t in label] for _, label in batch]
            max_len = max(len(s) for s in seqs)
            # 0 pads inputs; -1 pads labels (0 would collide with tag 'O')
            seqs = [s + [0] * (max_len - len(s)) for s in seqs]
            labels = [l + [-1] * (max_len - len(l)) for l in labels]
            epoch_mark = train_epoch if b_idx == len(batches) - 1 else None
            yield list2torch(seqs), list2torch(labels), False, epoch_mark
    yield None, None, True, None
def eval_model():
    """Score the NER model on the test split.

    Accumulates per-tag counts (tp+fp, tp+fn, tp), merges the B-/I-/E-/S-
    variants of each entity type, and returns a DataFrame with columns
    pred/real/common/p/r/f1 indexed by entity type plus an 'all' row.
    """
    count_table = {}
    test_yield = batch_yield(parameter=parameter, isTrain=False)
    while 1:
        seqs, labels, done, _ = next(test_yield)
        if done:
            break
        emissions = model(seqs.long().to(parameter['device']))
        predict = np.array(model.crf.decode(emissions))
        labels = np.array(labels)
        right = (labels == predict)
        for i in range(1, parameter['num_tag']):
            pred = len(predict[(predict == i) & (labels != -1)])  # tp+fp (padding excluded)
            real = len(labels[labels == i])                       # tp+fn
            common = len(labels[right & (labels == i)])           # tp
            if i not in count_table:
                count_table[i] = {'pred': pred, 'real': real, 'common': common}
            else:
                count_table[i]['pred'] += pred
                count_table[i]['real'] += real
                count_table[i]['common'] += common
    # merge B-/I-/E-/S- tags belonging to the same entity type
    count_pandas = {}
    name = list(parameter['tag2label'].keys())[1:]  # skip 'O' at index 0
    for ind, tag in enumerate(name):
        ent = tag.split('-')[1]
        ind += 1  # count_table keys start at 1
        if ent in count_pandas:
            count_pandas[ent][0] += count_table[ind]['pred']
            count_pandas[ent][1] += count_table[ind]['real']
            count_pandas[ent][2] += count_table[ind]['common']
        else:
            count_pandas[ent] = [count_table[ind]['pred'],
                                 count_table[ind]['real'],
                                 count_table[ind]['common']]
    # overall totals (computed before the 'all' key is inserted)
    count_pandas['all'] = [sum(v[0] for v in count_pandas.values()),
                           sum(v[1] for v in count_pandas.values()),
                           sum(v[2] for v in count_pandas.values())]
    name = count_pandas.keys()
    count_pandas = pd.DataFrame(count_pandas.values())
    count_pandas.columns = ['pred', 'real', 'common']
    count_pandas['p'] = count_pandas['common'] / count_pandas['pred']
    count_pandas['r'] = count_pandas['common'] / count_pandas['real']
    count_pandas['f1'] = 2 * count_pandas['p'] * count_pandas['r'] / (count_pandas['p'] + count_pandas['r'])
    count_pandas.index = list(name)
    # count_pandas.to_csv('eval_ner_model.csv')
    return count_pandas
4.3 ner调用接口
import torch
from torch import nn
from torchcrf import CRF
import pickle as pk
import numpy as np
from operator import itemgetter
from transformers import BertModel, BertPreTrainedModel, WEIGHTS_NAME, BertConfig, BertTokenizer, get_linear_schedule_with_warmup
import os
import sys
# resolve model assets relative to this file so the API works from any cwd
path = os.path.abspath(os.path.dirname(__file__))
config_class, tokenizer_class = BertConfig, BertTokenizer
# config = config_class.from_pretrained('../prev_trained_model') # BERT config
config = config_class.from_pretrained(os.path.abspath(os.path.join(path, '..', 'prev_trained_model'))) # BERT config
# tokenizer = tokenizer_class.from_pretrained('../prev_trained_model')
tokenizer = tokenizer_class.from_pretrained(os.path.abspath(os.path.join(path, '..', 'prev_trained_model')))
class bert_crf(BertPreTrainedModel):
    """Serving copy of the NER network: BERT encoder + tag-emission head.

    forward() yields emission scores; self.crf.decode turns them into tag
    index sequences at inference time.
    """

    def __init__(self, config, parameter):
        super(bert_crf, self).__init__(config)
        self.num_labels = config.num_labels
        self.bert = BertModel(config)
        self.dropout = nn.Dropout(config.hidden_dropout_prob)
        n_tags = parameter['num_tag']
        self.fc = nn.Linear(parameter['d_model'], n_tags)  # 768 -> tag count
        self.init_weights()
        self.crf = CRF(n_tags, batch_first=True)

    def forward(self, input_ids, attention_mask=None, token_type_ids=None, labels=None):
        sequence_output = self.bert(input_ids, attention_mask, token_type_ids)[0]
        sequence_output = self.dropout(sequence_output)
        return self.fc(sequence_output)  # (batch, seq_len, num_tag)
def load_model(config, isEval=False):
    """Load bert_crf hyper-parameters and trained weights relative to *path*.

    isEval=True allows GPU; plain prediction runs on CPU.  Returns
    (model, parameter).
    """
    # `with` closes the parameter file (the original leaked the handle)
    with open(os.path.join(path, 'ner_parameter.pkl'), 'rb') as fh:
        parameter = pk.load(fh)
    if isEval:
        parameter['device'] = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    else:
        parameter['device'] = torch.device('cpu')  # CPU is enough for serving
    model = bert_crf(config, parameter).to(parameter['device'])
    # load the fine-tuned weights
    model.load_state_dict(torch.load(os.path.join(path, 'ner_model.h5'), map_location=parameter['device']))
    model.eval()
    return model, parameter
def list2torch(ins):
    """Turn a python list (possibly nested) into a torch tensor."""
    as_array = np.array(ins)
    return torch.from_numpy(as_array)
def keyword_predict(input):
    """Run NER over *input* and return [[entity_text, entity_type, [char_indices]], ...].

    Decodes BIESO tags with the CRF, then stitches B/I/E runs (or single S
    tags) back into complete entities; inconsistent fragments are dropped.
    """
    input = list(input)
    ind2key = dict(zip(parameter['tag2label'].values(), parameter['tag2label'].keys()))
    input_id = tokenizer.convert_tokens_to_ids(input)
    print("input_id", input_id)
    emissions = model(list2torch([input_id]).long().to(parameter['device']))
    predict = model.crf.decode(emissions)[0]
    predict = itemgetter(*predict)(ind2key)  # tag indices -> tag names
    # each working entry: [text_so_far, [tags], [char indices], finished?]
    keys_list = []
    for ind, tag in enumerate(predict):
        if tag == 'O':
            continue
        head = tag[0]
        if head == 'S':
            # single-character entity; discard any unfinished entity first
            if not (len(keys_list) == 0 or keys_list[-1][-1]):
                del keys_list[-1]
            keys_list.append([input[ind], [tag], [ind], True])
        elif head == 'B':
            # a new entity begins; discard any unfinished one
            if not (len(keys_list) == 0 or keys_list[-1][-1]):
                del keys_list[-1]
            keys_list.append([input[ind], [tag], [ind], False])
        elif head == 'I':
            # extend the open entity only if its type matches
            if len(keys_list) > 0 and not keys_list[-1][-1] and keys_list[-1][1][0].split('-')[1] == tag.split('-')[1]:
                keys_list[-1][0] += input[ind]
                keys_list[-1][1] += [tag]
                keys_list[-1][2] += [ind]
            else:
                if len(keys_list) > 0:
                    del keys_list[-1]  # stray I tag invalidates the last entry
        elif head == 'E':
            # close the open entity if its type matches
            if len(keys_list) > 0 and not keys_list[-1][-1] and keys_list[-1][1][0].split('-')[1] == tag.split('-')[1]:
                keys_list[-1][0] += input[ind]
                keys_list[-1][1] += [tag]
                keys_list[-1][2] += [ind]
                keys_list[-1][3] = True
            else:
                if len(keys_list) > 0:
                    del keys_list[-1]
    # flatten to [text, entity type, indices]
    keys_list = [[entry[0], entry[1][0].split('-')[1], entry[2]] for entry in keys_list]
    return keys_list
def keyword_predict_long_text(input):
    """NER for texts longer than BERT's 512-token limit.

    Splits the text into ~512-char windows with a 25-char backward overlap
    so an entity cut at a window boundary is seen whole in the next window.
    Entities falling inside an overlap may be reported twice.
    """
    max_len = 512
    if len(input) < max_len:
        return keyword_predict(input)
    keys_list = []
    pad = 25
    max_len -= pad
    for i in range((len(input) // max_len) + 1):
        if i == 0:
            input_slice = input[i * max_len: (i + 1) * max_len]
        else:
            # slide back by pad chars so boundary entities are not split
            input_slice = input[i * max_len - pad: (i + 1) * max_len]
        if not input_slice:
            # length divisible by the window left an empty tail — the
            # original passed '' into keyword_predict here
            continue
        keys_list += keyword_predict(input_slice)
    return keys_list
# build the inference model once at import time (CPU is enough for serving)
model, parameter = load_model(config, isEval=False)
# res = keyword_predict_long_text('你好,请问高中历史有哪些重要的知识点') # '老', '师', '高', '中', '历', '史', '有', '哪', '些', '重', '要', '的', '课', '程']
# print(res)
5 intent模型
5.1 intent_0模型训练
import torch
import os
import random
import numpy as np
import pandas as pd
import pickle as pk
from collections import defaultdict
from tqdm import tqdm
from sklearn.model_selection import train_test_split
from transformers import BertModel, BertPreTrainedModel, WEIGHTS_NAME, BertConfig, AdamW, BertTokenizer, get_linear_schedule_with_warmup
config_class, tokenizer_class = BertConfig, BertTokenizer
config = config_class.from_pretrained('../prev_trained_model') # BERT config
tokenizer = tokenizer_class.from_pretrained('../prev_trained_model')
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
[x_intent, y_intent] = pk.load(open("../../../data/数据生成/命名实体识别和意图识别数据/data_intent0.pkl", 'rb'))
X_train, X_test, y_train, y_test = train_test_split(x_intent, y_intent)
output_size = len(set(i for i in y_intent)) # number of distinct intent classes
train_data = [(X_train[i], y_train[i]) for i in range(len(X_train))]
test_data = [(X_test[i], y_test[i]) for i in range(len(X_test))]
# hyper-parameters plus data splits, bundled for batch_yield
parameter = {
'epoch': 300,
'batch_size': 32,
'd_model': 768,
'dropout': 0.5,
'device': device,
'lr': 0.001,
'output_size': output_size,
'train_data': train_data,
'test_data': test_data,
}
# pk.dump(parameter, open('bert_intent0_parameter.pkl','wb'))
print(train_data[:5])  # sanity-check a few samples
def list2torch(ins):
    """Numpy-backed conversion of a python list to a torch tensor."""
    return torch.from_numpy(np.asarray(ins))
def batch_yield(parameter, shuffle=True, isTrain=True):
    """Yield (seqs, labels, done, epoch) batches for intent classification.

    seqs are BERT token ids padded with 0 to the longest sequence in the
    batch; labels are scalar intent ids (no padding needed).  The 4th
    element is the epoch number on the last batch of an epoch and None
    otherwise; the stream ends with the sentinel (None, None, True, None).
    """
    if isTrain:
        epochs = parameter['epoch']
        data = parameter['train_data']
    else:
        epochs = 1
        data = parameter['test_data']
    batch_size = parameter['batch_size']
    for train_epoch in range(epochs):
        if shuffle:
            random.shuffle(data)
        # pre-chunk so the last batch (full or partial) carries the epoch
        # marker; the original crashed on max([]) whenever
        # len(data) % batch_size == 0 left an empty tail batch
        batches = [data[pos:pos + batch_size] for pos in range(0, len(data), batch_size)]
        for b_idx, batch in enumerate(tqdm(batches)):
            # BERT's own vocabulary
            seqs = [tokenizer.convert_tokens_to_ids(seq) for seq, _ in batch]
            labels = [label for _, label in batch]
            max_len = max(len(s) for s in seqs)
            seqs = [s + [0] * (max_len - len(s)) for s in seqs]
            epoch_mark = train_epoch if b_idx == len(batches) - 1 else None
            yield list2torch(seqs), list2torch(labels), False, epoch_mark
    yield None, None, True, None
from torch import nn
class bert_intent(BertPreTrainedModel):
    """BERT + linear classifier over the pooled [CLS] representation."""

    def __init__(self, config, parameter):
        super(bert_intent, self).__init__(config)
        self.num_labels = config.num_labels
        self.bert = BertModel(config)
        self.dropout = nn.Dropout(config.hidden_dropout_prob)
        # pooled hidden size (768) in, one logit per intent class out
        self.fc = nn.Linear(parameter['d_model'], parameter['output_size'])
        self.init_weights()

    def forward(self, input_ids, attention_mask=None, token_type_ids=None, labels=None):
        bert_out = self.bert(input_ids, attention_mask, token_type_ids)
        # bert_out[0]: (batch, seq_len, hidden); bert_out[1]: pooled (batch,
        # hidden) [CLS] output — BERT trains it to summarize the sequence
        pooled = self.dropout(bert_out[1])
        return self.fc(pooled)
# Fine-tuning setup.
from torch import optim
# Load the pretrained BERT weights, attach the intent head, move to device.
model = bert_intent.from_pretrained("../prev_trained_model",config=config,parameter = parameter).to(parameter['device'])
full_finetuning = True
if full_finetuning:
    param_optimizer = list(model.named_parameters())
    # Parameters exempt from weight decay (both groups ARE updated).
    no_decay = ['bias', 'LayerNorm.bias', 'LayerNorm.weight']
    optimizer_grouped_parameters = [
        {'params': [p for n, p in param_optimizer if not any(nd in n for nd in no_decay)], 'weight_decay': 0.01},  # decayed
        {'params': [p for n, p in param_optimizer if any(nd in n for nd in no_decay)], 'weight_decay': 0.0}]  # not decayed
else:
    # Head-only fine-tuning: just the classifier layer.
    param_optimizer = list(model.fc.named_parameters())
    optimizer_grouped_parameters = [{'params': [p for n, p in param_optimizer]}]
# Cross-entropy over intent logits.
criterion = nn.CrossEntropyLoss()
# AdamW from transformers (note: overrides parameter['lr']).
optimizer = AdamW(optimizer_grouped_parameters, lr=3e-5, correct_bias=False)
# Linear warmup for ~one epoch, then linear decay to zero.
train_steps_per_epoch = len(train_data) // parameter['batch_size']
scheduler = get_linear_schedule_with_warmup(optimizer, num_warmup_steps=train_steps_per_epoch, num_training_steps=parameter['epoch']*train_steps_per_epoch)
# Training loop: consume batch_yield until its sentinel, checkpoint on best
# epoch-average loss.
model.train()
loss_cal = []                # per-batch losses of the current epoch
min_loss = float('inf')      # best (lowest) epoch-average loss so far
train_yield = batch_yield(parameter)
while 1:
    seqs, labels, keys, epoch = next(train_yield)
    if keys:  # sentinel: all epochs consumed
        break
    out = model(seqs.long().to(parameter['device']))
    labels = labels.long().to(parameter['device'])
    loss = criterion(out, labels)
    optimizer.zero_grad()
    loss.backward()
    # Gradient clipping keeps fine-tuning stable.
    nn.utils.clip_grad_norm_(parameters=model.parameters(), max_norm=5)
    optimizer.step()
    scheduler.step()
    loss_cal.append(loss.item())
    if epoch is not None:  # last batch of the epoch: report / checkpoint
        mean_loss = sum(loss_cal) / len(loss_cal)
        if mean_loss < min_loss:
            min_loss = mean_loss
            # torch.save(model.state_dict(), "bert_intent0_model.h5")
        # Fix: the closing ']' was missing from the progress message.
        print(f'epoch[{epoch+1}/{parameter["epoch"]}], loss:{mean_loss:.4f}')
        # Fix: start the next epoch's accumulator empty instead of
        # re-counting this epoch's final batch.
        loss_cal = []
5.2 intent_1模型训练
import torch
import os
import random
import numpy as np
import pandas as pd
import pickle as pk
from collections import defaultdict
from tqdm import tqdm
from sklearn.model_selection import train_test_split
from transformers import BertModel, BertPreTrainedModel, WEIGHTS_NAME, BertConfig, AdamW, BertTokenizer, get_linear_schedule_with_warmup
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
# Pre-built intent/NER dataset: token sequences and their labels.
[x_intent, y_intent, x_ner, y_ner] = pk.load(open("../../../data/数据生成/命名实体识别和意图识别数据/data_intent1_ner.pkl", 'rb'))
# NOTE(review): no random_state, so the split differs between runs.
X_train, X_test, y_train, y_test = train_test_split(x_intent, y_intent)
# Number of distinct intent labels.
# Fix: set(y_intent) directly instead of a redundant generator expression.
output_size = len(set(y_intent))
# Pair each sequence with its label (zip is the idiomatic form).
train_data = list(zip(X_train, y_train))
test_data = list(zip(X_test, y_test))
# Hyper-parameters and data bundled into one dict (also pickled for inference).
parameter = {
    'epoch': 300,
    'batch_size': 32,
    'd_model': 768,          # BERT hidden size; must match the pretrained model
    'dropout': 0.5,          # NOTE(review): unused — the model uses config.hidden_dropout_prob
    'device': device,
    'lr': 0.001,             # NOTE(review): unused — AdamW below is built with lr=3e-5
    'output_size': output_size,
    'train_data': train_data,
    'test_data': test_data,
}
# pk.dump(parameter, open('bert_intent1_parameter.pkl','wb'))
print(train_data[:5])  # sanity-check a few (tokens, label) pairs
def list2torch(ins):
    """Convert a (nested) Python list of numbers into a torch tensor."""
    as_array = np.array(ins)
    return torch.from_numpy(as_array)
def batch_yield(parameter, shuffle=True, isTrain=True):
    """Generator over padded (seqs, labels) batches.

    Yields 4-tuples (seqs, labels, done, epoch):
      - seqs/labels: tensors of token ids (0-padded) and labels
      - done: True only on the final sentinel yield (seqs/labels are None)
      - epoch: the epoch index on the last batch of each epoch, else None

    Fix: the original unconditionally yielded a trailing "leftover" batch,
    which crashed on max([]) (and emitted an empty batch) whenever
    len(data) was an exact multiple of batch_size. Batches are collected
    per epoch so the epoch marker rides on the real last batch.
    """
    if isTrain:
        epochs = parameter['epoch']
        data = parameter['train_data']
    else:
        epochs = 1
        data = parameter['test_data']
    for train_epoch in range(epochs):
        if shuffle:
            random.shuffle(data)  # in-place, as in the original
        batches = []
        seqs, labels = [], []
        for seq, label in tqdm(data):
            # Map tokens to ids with BERT's own vocabulary.
            seqs.append(tokenizer.convert_tokens_to_ids(seq))
            labels.append(label)
            if len(seqs) == parameter['batch_size']:
                batches.append((seqs, labels))
                seqs, labels = [], []
        if seqs:  # leftover partial batch
            batches.append((seqs, labels))
        for idx, (batch_seqs, batch_labels) in enumerate(batches):
            max_len = max(len(s) for s in batch_seqs)
            padded = [s + [0] * (max_len - len(s)) for s in batch_seqs]
            is_last = idx == len(batches) - 1
            yield list2torch(padded), list2torch(batch_labels), False, (train_epoch if is_last else None)
    yield None, None, True, None
config_class, tokenizer_class = BertConfig, BertTokenizer
config = config_class.from_pretrained('../prev_trained_model')  # BERT model config
tokenizer = tokenizer_class.from_pretrained('../prev_trained_model')  # BERT vocabulary/tokenizer
from torch import nn
class bert_intent(BertPreTrainedModel):
    """BERT + linear head for single-label intent classification.

    Projects BERT's pooled [CLS] output to `parameter['output_size']`
    intent logits.
    """
    def __init__(self, config, parameter):
        super(bert_intent, self).__init__(config)
        self.num_labels = config.num_labels
        self.bert = BertModel(config)
        self.dropout = nn.Dropout(config.hidden_dropout_prob)
        embedding_dim = parameter['d_model']    # BERT hidden size (768)
        output_size = parameter['output_size']  # number of intent classes
        self.fc = nn.Linear(embedding_dim, output_size)
        self.init_weights()
    def forward(self, input_ids, attention_mask=None, token_type_ids=None, labels=None):
        """Return raw intent logits of shape (batch, output_size)."""
        bert_out = self.bert(input_ids, attention_mask, token_type_ids)
        out = bert_out[1]
        # bert_out[0]: (batch, seq_len, hidden) token-level states
        # bert_out[1]: (batch, hidden) pooled [CLS] output — BERT trains this
        # vector to summarize the whole sequence.
        out = self.dropout(out)
        res = self.fc(out)  # embedding_dim == hidden_size == 768
        return res
# Fine-tuning setup.
from torch import optim
# Load the pretrained BERT weights, attach the intent head, move to device.
model = bert_intent.from_pretrained("../prev_trained_model",config=config,parameter = parameter).to(parameter['device'])
full_finetuning = True
if full_finetuning:
    param_optimizer = list(model.named_parameters())
    # Parameters exempt from weight decay (both groups ARE updated).
    no_decay = ['bias', 'LayerNorm.bias', 'LayerNorm.weight']
    optimizer_grouped_parameters = [
        {'params': [p for n, p in param_optimizer if not any(nd in n for nd in no_decay)], 'weight_decay': 0.01},  # decayed
        {'params': [p for n, p in param_optimizer if any(nd in n for nd in no_decay)], 'weight_decay': 0.0}]  # not decayed
else:
    # Head-only fine-tuning: just the classifier layer.
    param_optimizer = list(model.fc.named_parameters())
    optimizer_grouped_parameters = [{'params': [p for n, p in param_optimizer]}]
# Cross-entropy over intent logits.
criterion = nn.CrossEntropyLoss()
# AdamW from transformers (note: overrides parameter['lr']).
optimizer = AdamW(optimizer_grouped_parameters, lr=3e-5, correct_bias=False)
# Linear warmup for ~one epoch, then linear decay to zero.
train_steps_per_epoch = len(train_data) // parameter['batch_size']
scheduler = get_linear_schedule_with_warmup(optimizer, num_warmup_steps=train_steps_per_epoch, num_training_steps=parameter['epoch']*train_steps_per_epoch)
# Training loop: consume batch_yield until its sentinel, checkpoint on best
# epoch-average loss.
model.train()
loss_cal = []                # per-batch losses of the current epoch
min_loss = float('inf')      # best (lowest) epoch-average loss so far
train_yield = batch_yield(parameter)
while 1:
    seqs, labels, keys, epoch = next(train_yield)
    if keys:  # sentinel: all epochs consumed
        break
    out = model(seqs.long().to(parameter['device']))
    labels = labels.long().to(parameter['device'])
    loss = criterion(out, labels)
    optimizer.zero_grad()
    loss.backward()
    # Gradient clipping keeps fine-tuning stable.
    nn.utils.clip_grad_norm_(parameters=model.parameters(), max_norm=5)
    optimizer.step()
    scheduler.step()
    loss_cal.append(loss.item())
    if epoch is not None:  # last batch of the epoch: report / checkpoint
        mean_loss = sum(loss_cal) / len(loss_cal)
        if mean_loss < min_loss:
            min_loss = mean_loss
            # torch.save(model.state_dict(), "bert_intent1_model.h5")
        # Fix: the closing ']' was missing from the progress message.
        print(f'epoch[{epoch+1}/{parameter["epoch"]}], loss:{mean_loss:.4f}')
        # Fix: start the next epoch's accumulator empty instead of
        # re-counting this epoch's final batch.
        loss_cal = []
5.3 intent 模型评估
import torch
import os
import random
import numpy as np
import pandas as pd
import pickle as pk
from collections import defaultdict
from tqdm import tqdm
from torch import nn
from transformers import BertModel, BertPreTrainedModel, WEIGHTS_NAME, BertConfig, AdamW, BertTokenizer, get_linear_schedule_with_warmup
config_class, tokenizer_class = BertConfig, BertTokenizer
config = config_class.from_pretrained('../prev_trained_model')  # BERT model config
tokenizer = tokenizer_class.from_pretrained('../prev_trained_model')  # BERT vocabulary/tokenizer
class bert_intent(BertPreTrainedModel):
    """BERT + linear head for single-label intent classification.

    Same architecture as used at training time; redefined here so the
    evaluation script can rebuild the model from saved weights.
    """
    def __init__(self, config, parameter):
        super(bert_intent, self).__init__(config)
        self.num_labels = config.num_labels
        self.bert = BertModel(config)
        self.dropout = nn.Dropout(config.hidden_dropout_prob)
        embedding_dim = parameter['d_model']    # BERT hidden size (768)
        output_size = parameter['output_size']  # number of intent classes
        self.fc = nn.Linear(embedding_dim, output_size)
        self.init_weights()
    def forward(self, input_ids, attention_mask=None, token_type_ids=None, labels=None):
        """Return raw intent logits of shape (batch, output_size)."""
        bert_out = self.bert(input_ids, attention_mask, token_type_ids)
        out = bert_out[1]
        # bert_out[0]: (batch, seq_len, hidden) token-level states
        # bert_out[1]: (batch, hidden) pooled [CLS] output — BERT trains this
        # vector to summarize the whole sequence.
        out = self.dropout(out)
        res = self.fc(out)  # embedding_dim == hidden_size == 768
        return res
def load_model(config, intent, isEval=False):
    """Load the trained bert_intent model `intent` (0 or 1) and its
    pickled parameter dict.

    isEval=True picks GPU when available; otherwise inference runs on CPU.
    Returns (model, parameter) with the model in eval mode.
    """
    # Fix: close the pickle file deterministically (the original leaked the
    # open file handle).
    with open(f'bert_intent{intent}_parameter.pkl', 'rb') as f:
        parameter = pk.load(f)
    if isEval:
        parameter['device'] = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    else:
        parameter['device'] = torch.device('cpu')  # prediction is fine on CPU
    model = bert_intent(config, parameter).to(parameter['device'])
    # Restore the fine-tuned weights.
    model.load_state_dict(torch.load(f'bert_intent{intent}_model.h5', map_location=parameter['device']))
    model.eval()
    return model, parameter
def list2torch(ins):
    """Convert a (nested) Python list of numbers into a torch tensor."""
    as_array = np.array(ins)
    return torch.from_numpy(as_array)
def batch_yield(parameter, shuffle=True, isTrain=True):
    """Generator over padded (seqs, labels) batches.

    Yields 4-tuples (seqs, labels, done, epoch):
      - seqs/labels: tensors of token ids (0-padded) and labels
      - done: True only on the final sentinel yield (seqs/labels are None)
      - epoch: the epoch index on the last batch of each epoch, else None

    Fix: the original unconditionally yielded a trailing "leftover" batch,
    which crashed on max([]) (and emitted an empty batch) whenever
    len(data) was an exact multiple of batch_size. Batches are collected
    per epoch so the epoch marker rides on the real last batch.
    """
    if isTrain:
        epochs = parameter['epoch']
        data = parameter['train_data']
    else:
        epochs = 1
        data = parameter['test_data']
    for train_epoch in range(epochs):
        if shuffle:
            random.shuffle(data)  # in-place, as in the original
        batches = []
        seqs, labels = [], []
        for seq, label in tqdm(data):
            # Map tokens to ids with BERT's own vocabulary.
            seqs.append(tokenizer.convert_tokens_to_ids(seq))
            labels.append(label)
            if len(seqs) == parameter['batch_size']:
                batches.append((seqs, labels))
                seqs, labels = [], []
        if seqs:  # leftover partial batch
            batches.append((seqs, labels))
        for idx, (batch_seqs, batch_labels) in enumerate(batches):
            max_len = max(len(s) for s in batch_seqs)
            padded = [s + [0] * (max_len - len(s)) for s in batch_seqs]
            is_last = idx == len(batches) - 1
            yield list2torch(padded), list2torch(batch_labels), False, (train_epoch if is_last else None)
    yield None, None, True, None
def eval_model(intent):  # 0: first-stage intent model, 1: second-stage
    """Evaluate intent model `intent` on its test split.

    Returns a DataFrame with per-class rows (plus an 'all' row) holding
    pred (tp+fp), real (tp+fn), common (tp), precision p, recall r and f1.
    """
    try:
        model, parameter = load_model(config, intent=intent, isEval=True)
    except (FileNotFoundError, OSError):
        # Fix: the original used a bare `except:` that printed and then fell
        # through, crashing with NameError on `parameter`. Fail loudly.
        print('intent输入错误')
        raise
    count_table = {}
    test_yield = batch_yield(parameter=parameter, isTrain=False)
    while 1:
        seqs, labels, keys, _ = next(test_yield)
        if keys:  # sentinel: test data exhausted
            break
        predict = model(seqs.long().to(parameter['device']))
        # Argmax over logits -> predicted class per sample.
        predicted_prob, predict = torch.max(predict, dim=1)
        labels = labels.long().to(parameter['device'])
        right = (labels == predict)
        for i in range(parameter['output_size']):
            if i not in count_table:
                count_table[i] = {'pred': 0, 'real': 0, 'common': 0}
            count_table[i]['pred'] += len(predict[predict == i])            # tp + fp
            count_table[i]['real'] += len(labels[labels == i])              # tp + fn
            count_table[i]['common'] += len(labels[right & (labels == i)])  # tp
    # Per-class rows. (The original's "already present" branch was dead code:
    # each key is visited exactly once.)
    count_pandas = {i: [count_table[i]['pred'], count_table[i]['real'], count_table[i]['common']]
                    for i in range(parameter['output_size'])}
    # Micro totals across all classes.
    count_pandas['all'] = [sum(v[0] for v in count_pandas.values()),
                           sum(v[1] for v in count_pandas.values()),
                           sum(v[2] for v in count_pandas.values())]
    name = list(count_pandas.keys())
    count_pandas = pd.DataFrame(count_pandas.values(), columns=['pred', 'real', 'common'])
    count_pandas['p'] = count_pandas['common'] / count_pandas['pred']
    count_pandas['r'] = count_pandas['common'] / count_pandas['real']
    count_pandas['f1'] = 2 * count_pandas['p'] * count_pandas['r'] / (count_pandas['p'] + count_pandas['r'])
    count_pandas.index = name
    count_pandas.fillna(0, inplace=True)  # empty classes divide 0/0 -> NaN
    # count_pandas.to_csv(f'eval_intent{intent}_model.csv')
    return count_pandas
5.4 intent接口调用
import torch
import os
import random
import numpy as np
import pandas as pd
import pickle as pk
from collections import defaultdict
from tqdm import tqdm
from torch import nn
from transformers import BertModel, BertPreTrainedModel, WEIGHTS_NAME, BertConfig, AdamW, BertTokenizer, \
get_linear_schedule_with_warmup
# Resolve everything relative to this file so the module works from any CWD.
path = os.path.abspath(os.path.dirname(__file__))
config_class, tokenizer_class = BertConfig, BertTokenizer
config = config_class.from_pretrained(os.path.abspath(os.path.join(path, '..', 'prev_trained_model')))  # BERT model config
tokenizer = tokenizer_class.from_pretrained(os.path.abspath(os.path.join(path, '..', 'prev_trained_model')))  # BERT vocabulary/tokenizer
class bert_intent(BertPreTrainedModel):
    """BERT + linear head for single-label intent classification.

    Same architecture as at training time; redefined here so the inference
    module can rebuild the model from saved weights.
    """
    def __init__(self, config, parameter):
        super(bert_intent, self).__init__(config)
        self.num_labels = config.num_labels
        self.bert = BertModel(config)
        self.dropout = nn.Dropout(config.hidden_dropout_prob)
        embedding_dim = parameter['d_model']    # BERT hidden size (768)
        output_size = parameter['output_size']  # number of intent classes
        self.fc = nn.Linear(embedding_dim, output_size)
        self.init_weights()
    def forward(self, input_ids, attention_mask=None, token_type_ids=None, labels=None):
        """Return raw intent logits of shape (batch, output_size)."""
        bert_out = self.bert(input_ids, attention_mask, token_type_ids)
        out = bert_out[1]
        # bert_out[0]: (batch, seq_len, hidden) token-level states
        # bert_out[1]: (batch, hidden) pooled [CLS] output — BERT trains this
        # vector to summarize the whole sequence.
        out = self.dropout(out)
        res = self.fc(out)  # embedding_dim == hidden_size == 768
        return res
def load_model(config, intent, isEval=False):
    """Load the trained bert_intent model `intent` (0 or 1) and its
    pickled parameter dict, resolving files relative to this module.

    isEval=True picks GPU when available; otherwise inference runs on CPU.
    Returns (model, parameter) with the model in eval mode.
    """
    parameter_path = f'bert_intent{intent}_parameter.pkl'
    state_dict_path = f'bert_intent{intent}_model.h5'
    # Fix: close the pickle file deterministically (the original leaked the
    # open file handle).
    with open(os.path.join(path, parameter_path), 'rb') as f:
        parameter = pk.load(f)
    if isEval:
        parameter['device'] = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    else:
        parameter['device'] = torch.device('cpu')  # prediction is fine on CPU
    model = bert_intent(config, parameter).to(parameter['device'])
    # Restore the fine-tuned weights.
    model.load_state_dict(torch.load(os.path.join(path, state_dict_path), map_location=parameter['device']))
    model.eval()
    return model, parameter
def list2torch(ins):
    """Convert a (nested) Python list of numbers into a torch tensor."""
    as_array = np.array(ins)
    return torch.from_numpy(as_array)
def intent_predict(input, model, parameter):
    """Predict the intent id for one question string.

    NOTE: the parameter name `input` shadows the builtin but is kept for
    caller compatibility.
    Returns the argmax tensor (shape [1]) over the intent logits.
    """
    max_len = 512  # BERT's maximum sequence length
    # Fix: use max_len instead of repeating the hard-coded 512.
    if len(input) > max_len:
        input = input[:max_len]
    # Character-level tokens, mapped through BERT's own vocabulary.
    tokens = list(input)
    input_id = tokenizer.convert_tokens_to_ids(tokens)
    # Wrap in a list: the model expects a 2-D [batch, seq] tensor.
    predict = model(list2torch([input_id]).long().to(parameter['device']))
    predicted_prob, predict = torch.max(predict, dim=1)
    return predict
# Pre-load both intent classifiers once at import time (CPU inference).
# 0: first-stage intent (which answering layer to use)
intent0_model, parameter0 = load_model(config, intent=0, isEval=False)
# 1: second-stage intent (which knowledge-graph traversal to run)
intent1_model, parameter1 = load_model(config, intent=1, isEval=False)
6 基于机器人个人属性、闲聊语料库、网络搜索回答api
import logging
import os
# Directory of this file, for locating data files relative to the module.
path = os.path.abspath(os.path.dirname(__file__))
# Logging base class shared by all answer layers.
class BaseLayer:
    """Common base: a shared logger plus the search_answer(question)
    interface that each answering layer overrides."""

    def __init__(self, log=True):
        # Root logger; pass log=False to suppress warning-level output.
        self.logger = logging.getLogger()
        if not log:
            self.close_log()

    def close_log(self):
        """Raise the log level so warnings are silenced."""
        self.logger.setLevel(logging.ERROR)

    def print_log(self, msg):
        """Emit msg at warning level."""
        self.logger.warning(msg)

    def search_answer(self, question):
        """Abstract hook: subclasses return an answer string or None."""
        return None
# 基于机器人个人属性的问答
import xml.etree.ElementTree as et
from random import choice
import re
class template(BaseLayer):
    """Answers questions about the robot's own attributes.

    Question regexes and answer templates live in an XML file; a matching
    question picks a random answer and fills its {placeholders} from the
    robot's info block.
    """

    def __init__(self):
        super(template, self).__init__()
        self.template = self.load_template_file()
        self.robot_info = self.load_robot_info()
        # Each <temp> node holds a question list and an answer list.
        self.temps = self.template.findall("temp")
        # Currently unused fallback nodes:
        # self.default_answer = self.template.find("default")
        # self.exceed_answer = self.template.find("exceed")

    def load_template_file(self):
        """Parse and return the robot template XML tree."""
        xml_path = os.path.abspath(os.path.join(path, '../data/robot_template/robot_template.xml'))
        return et.parse(xml_path)

    def load_robot_info(self):
        """Read <robot_info> children into a {tag: text} dict."""
        info_node = self.template.find("robot_info")
        return {child.tag: child.text for child in info_node}

    def search_answer(self, question):
        """Return a formatted answer for the first template whose question
        regex matches `question`, else None."""
        for temp in self.temps:
            for q in temp.find("question").findall("q"):
                if re.search(q.text, question):
                    # Random answer, then fill {placeholders} from robot info.
                    answers = [a.text for a in temp.find("answer").findall("a")]
                    return choice(answers).format(**self.robot_info)
        return None
# 预处理语料库
import jieba
import re
def load_seq_qa():
    """Load the chit-chat corpus: even lines are questions, odd lines the
    answers that follow them.

    Returns (q_list, a_list) of jieba-tokenized sentences.
    """
    q_list, a_list = [], []
    corpus_path = os.path.abspath(os.path.join(path, "../data/corpus/conversation_test.txt"))
    with open(corpus_path, "r", encoding="utf-8") as f:
        for i, line in enumerate(f):
            # Strip digits. Fix: '[0-9]+' instead of '[0-9]*', which also
            # matched the empty string at every position (same result,
            # pointless work).
            line = re.sub('[0-9]+', '', line.strip())
            line = jieba.lcut(line)
            if i % 2 == 0:
                q_list.append(line)   # question line
            else:
                a_list.append(line)   # answer line
    return q_list, a_list
def build_vocab():
    """Build a word -> id dict over all corpus question words.

    Ids 0/1/2 are reserved for <pad>/<start>/<end>; real words start at 3.
    Fix: words are sorted before numbering so the mapping is reproducible
    across runs (plain set iteration order varies with hash randomization).
    """
    q_list, _ = load_seq_qa()
    words = sorted({w for q in q_list for w in q})
    word_dict = {w: idx for idx, w in enumerate(words, start=3)}
    word_dict['<pad>'] = 0
    word_dict['<start>'] = 1
    word_dict['<end>'] = 2
    return word_dict
def build_words_in_question():
    """Inverted index over the corpus: word -> list of question indices
    that contain it (special tokens map to empty lists)."""
    vocab = build_vocab()
    q_list, _ = load_seq_qa()
    inverse_index_dict = {word: [] for word in vocab}
    for q_idx, words in enumerate(q_list):
        for word in words:
            inverse_index_dict[word].append(q_idx)
    return inverse_index_dict  # {word: [q_idx, q_idx, ...]}
from collections import Counter
import numpy as np
import jieba
class CorpusSearch(BaseLayer):
    """Retrieval QA over the chat corpus: inverted-index candidate lookup
    followed by bag-of-words cosine similarity."""

    def __init__(self):
        super(CorpusSearch, self).__init__()
        self.question_list, self.answer_list = load_seq_qa()
        self.words_in_question = build_words_in_question()
        self.THRESHOLD = 0.7  # minimum similarity to accept an answer

    def cosin_sim(self, a, b):
        """Cosine similarity between two tokenized sentences (word counts).

        Returns the similarity rounded to 4 decimals.
        Fix: returns 0.0 when either sentence is empty — the original
        divided by a zero norm and returned nan.
        """
        a_words = Counter(a)
        b_words = Counter(b)
        # Union of both sentences' words (order is irrelevant as long as both
        # vectors use the same one).
        vocab = set(a_words) | set(b_words)
        a_vec = np.array([a_words.get(w, 0) for w in vocab])
        b_vec = np.array([b_words.get(w, 0) for w in vocab])
        a_norm = np.sqrt(np.sum(np.square(a_vec)))
        b_norm = np.sqrt(np.sum(np.square(b_vec)))
        if a_norm == 0 or b_norm == 0:
            return 0.0
        cos_sim = np.dot(a_vec, b_vec) / (a_norm * b_norm)
        return round(cos_sim, 4)

    def search_answer(self, question):
        """Return the answer of the most similar corpus question when its
        similarity exceeds THRESHOLD, else None."""
        q_words = jieba.lcut(question)
        candidates = []
        for w in q_words:
            # Question indices where this word occurs.
            candidates += self.words_in_question.get(w, [])
        if not candidates:  # no query word appears in the corpus
            return None
        # Score only the 3 questions sharing the most words with the query.
        top = Counter(candidates).most_common(3)
        scored = [(idx, self.cosin_sim(q_words, self.question_list[idx])) for idx, _ in top]
        best_idx, best_sim = max(scored, key=lambda x: x[1])
        if best_sim > self.THRESHOLD:
            return "".join(self.answer_list[best_idx])  # fix: 'answewr' typo
        return None
# 基于网络回答
import requests
class InterNet(BaseLayer):
    """Last-resort layer: asks the public ownthink chatbot API."""

    def __init__(self):
        super(InterNet, self).__init__()
        # Fix: "Interner" -> "Internet" typo in the startup log message.
        self.print_log("Internet layer is ready")

    def search_answer(self, question):
        """POST the question to the API; return its text answer or None."""
        url = 'https://api.ownthink.com/bot?appid=xiaosi&userid=user&spoken='
        try:
            text = requests.post(url + question).json()
            if 'message' in text and text['message'] == 'success':
                return text['data']['info']['text']
            return None
        # Fix: narrow the bare `except:` so KeyboardInterrupt/SystemExit are
        # not swallowed; network/JSON/shape errors still degrade to None.
        except Exception:
            return None
7 neo4j搜索回答
from py2neo import Graph
# Make the sibling neo4j_config module importable regardless of CWD.
import os
import sys
path = os.path.abspath(os.path.dirname(__file__))
sys.path.append(path)
from neo4j_config import neo4j_ip, username, password
from pyhanlp import *
class GraphSearch():
    """Cypher lookups over the curriculum/poetry knowledge graph.

    Each forintentN method answers intent class N from the second-stage
    intent classifier. `entity` is (name, node_label) produced by NER.

    Fix: node *names* (user-derived text) are now passed as Cypher
    parameters ($name) instead of being %-interpolated into the query
    (quote breakage / injection). Node *labels* cannot be parameterized in
    Cypher, so they are still interpolated — they come from the closed NER
    tag set, not free text.
    """
    def __init__(self):
        self.graph = Graph(neo4j_ip, auth=(username, password))

    def _names(self, sql, key, **params):
        """Run `sql` with `params` and collect column `key` from every row."""
        return [record[key] for record in self.graph.run(sql, **params).data()]

    # intent 0: subject (km1) -> its courses (km2)
    def forintent0(self, entity):
        sql = 'match p = (n:km1)-[]->(m:km2) where n.name = $name return m.name'
        return self._names(sql, 'm.name', name=entity[0])

    # intent 1: subject/course -> knowledge points (kg)
    def forintent1(self, entity):
        if entity[1] == 'km1':  # subject: two hops down to kg
            sql = "match p = (n:%s)-[]->()-[]->(m:kg) where n.name = $name return m.name limit 10" % entity[1]
        else:  # course (km2): one hop
            sql = "match p = (n:%s)-[]->(m:kg) where n.name = $name return m.name limit 10" % entity[1]
        return self._names(sql, 'm.name', name=entity[0])

    # intent 2: subject/course/knowledge point -> example questions
    def forintent2(self, entity):
        if entity[1] == 'km1':
            sql = "match p = (n:%s)-[]->()-[]->()-[]->(m:question) where n.name = $name return m.name limit 10" % entity[1]
        elif entity[1] == 'km2':
            sql = "match p = (n:%s)-[]->()-[]->(m:question) where n.name = $name return m.name limit 10" % entity[1]
        else:
            sql = "match p = (n:%s)-[]->(m:question) where n.name = $name return m.name limit 10" % entity[1]
        return self._names(sql, 'm.name', name=entity[0])

    # intent 3: course (km2) -> its subject (km1)
    def forintent3(self, entity):
        sql = "match p = (n:km1)-[]->(m:%s) where m.name = $name return n.name" % entity[1]
        return self._names(sql, 'n.name', name=entity[0])

    # intent 4: course -> knowledge points (same traversal as intent 1)
    def forintent4(self, entity):
        return self.forintent1(entity)

    # intent 5: course -> example questions (same traversal as intent 2)
    def forintent5(self, entity):
        return self.forintent2(entity)

    # intent 6: knowledge point -> subject (km1)
    def forintent6(self, entity):
        sql = "match p = (n:km1)-[]->()-[]->(m:%s) where m.name = $name return n.name " % entity[1]
        return self._names(sql, 'n.name', name=entity[0])

    # intent 7: knowledge point -> course (km2)
    def forintent7(self, entity):
        sql = "match p = (n:km2)-[]->(m:%s) where m.name = $name return n.name " % entity[1]
        return self._names(sql, 'n.name', name=entity[0])

    # intent 8: knowledge point -> example questions
    def forintent8(self, entity):
        sql = "match p = (n:%s)-[]->(m:question) where n.name = $name return m.name limit 10" % entity[1]
        return self._names(sql, 'm.name', name=entity[0])

    # intent 9: author -> poem titles
    def forintent9(self, entity):
        sql = "match p = (n:%s)-[]->(m:title) where n.name = $name return m.name limit 10" % entity[1]
        return self._names(sql, 'm.name', name=entity[0])

    # intent 10: author -> biography (single node expected)
    def forintent10(self, entity):
        sql = "match p = (n:%s)-[]->(m:introduce) where n.name = $name return m.name" % entity[1]
        res = self.graph.run(sql, name=entity[0]).data()
        # Raises IndexError for an unknown author; the Flask caller treats
        # any exception as "no answer".
        return res[0]['m.name']

    # intent 11: poem title -> author
    def forintent11(self, entity):
        sql = "match p = (n:author)-[]->(m:%s) where m.name = $name return n.name" % entity[1]
        return self._names(sql, 'n.name', name=entity[0])

    # intent 12: poem title -> full text
    def forintent12(self, entity):
        sql = "match p = (n:title)-[]->(m:content) where n.name = $name return m.name"
        return self._names(sql, 'm.name', name=entity[0])

    # intent 13: poem title -> translation
    def forintent13(self, entity):
        sql = "match p = (n:title)-[]->(m:translate) where n.name = $name return m.name"
        return self._names(sql, 'm.name', name=entity[0])

    # intent 14: poem title -> genre tags
    def forintent14(self, entity):
        sql = "match p = (n:title)<-[]-(m:tag) where n.name = $name return m.name"
        return self._names(sql, 'm.name', name=entity[0])

    # intent 15: poem title -> the class/lesson it appears in
    def forintent15(self, entity):
        sql = "match p = (n:class)-[]->(m:title) where m.name = $name return n.name"
        return self._names(sql, 'n.name', name=entity[0])

    # intent 16: genre tag -> poem titles
    def forintent16(self, entity):
        sql = "match p = (n:title)<-[]-(m:tag) where m.name = $name return n.name limit 10"
        return self._names(sql, 'n.name', name=entity[0])

    # intent 17: class/lesson -> poem titles
    def forintent17(self, entity):
        sql = "match p = (n:class)-[]->(m:title) where n.name = $name return m.name limit 10"
        return self._names(sql, 'm.name', name=entity[0])
8 flask部署
import json
import os
import sys
from operator import itemgetter
# NER: extract entity mentions from the question
from utils.model.ner_model.ner_predict import keyword_predict_long_text
# Two-stage intent classification
from utils.model.intent_model.intent_predict import intent0_model, intent1_model, parameter0, parameter1, intent_predict
# Knowledge-graph search (neo4j)
from utils.neo4j_graph import GraphSearch
from flask_cors import cross_origin
from flask import Flask, request, redirect, url_for
# Robot-attribute / chit-chat corpus / internet fallback layers
from utils.template_corpus_internet import template, CorpusSearch, InterNet
# Stage-1 intents: which answering layer to use.
intent0_tag = {'0': '基于知识图谱问答',
               '1': '基于机器人个人属性回答',
               '2': '基于语料库回答'}
# Stage-2 intents: which graph traversal (GraphSearch.forintentN) to run.
intent1_tag = {
    '0': '询问科目有哪些课程',
    '1': '询问科目有哪些知识点',
    '2': '询问科目有哪些例题',
    '3': '询问课程是什么学科的',
    '4': '询问课程有哪些知识点',
    '5': '询问课程有哪些例题',
    '6': '询问是哪个学科的知识点',
    '7': '询问是哪个课程的知识点',
    '8': '询问这个知识点有哪些例题需要掌握',
    '9': '询问作者有哪些诗句',
    '10': '询问作者个人简介',
    '11': '询问诗是谁写的',
    '12': '询问诗的诗句',
    '13': '询问诗的翻译',
    '14': '询问诗的类型',
    '15': '询问诗来自那篇课文',
    '16': '询问这种类型的古诗有哪些',
    '17': '询问这个年级学过哪些诗',
}
app = Flask(__name__)
# All answer layers are constructed once at startup.
template_model = template()
corpus_search_model = CorpusSearch()
InterNet_model = InterNet()
graphSearch_model = GraphSearch()
def takelong(ins):
    """Sort key: length of an entity's surface text (its first element)."""
    entity_name = ins[0]
    return len(entity_name)
def rebuildins(ins, entity_list):
    """Replace each recognized entity span in `ins` with its entity label.

    ins: the raw question string (indexed by character position).
    entity_list: items shaped [surface, label, ..., span] where the last
    element holds the inclusive (start, end) character span.
    Returns the characters/labels ordered by position — a tuple, or a bare
    string when only one piece remains (operator.itemgetter semantics).
    """
    pieces = {}
    remaining = set(range(len(ins)))
    for ent in entity_list:
        span = ent[-1]
        # Drop every character the entity covers; its label stands in for
        # the whole span at the start position.
        remaining -= set(range(span[0], span[-1] + 1))
        pieces[span[0]] = ent[1]
    for idx in remaining:
        pieces[idx] = ins[idx]
    ordered = sorted(pieces)
    return itemgetter(*ordered)(pieces)
@app.route('/test', methods=['GET', 'POST'])
@cross_origin()  # allow cross-origin calls from the web front end
def my_service():
    """Single-turn QA endpoint.

    Pipeline: NER -> entity-masked question -> stage-1 intent (answer
    source) -> knowledge graph / robot template / corpus answer, falling
    back to the internet API when nothing matched.
    """
    if request.method == 'POST':
        data = json.loads(request.get_data().decode())
        question = data['question']
        print('question:', question)
        entity_list = keyword_predict_long_text(question)
        entity_list.sort(key=takelong, reverse=True)  # longest entity first
        print('entity_list', entity_list)
        # Replace entity text with entity labels before intent detection.
        new_question = rebuildins(question, entity_list)
        print('new_question:', new_question)
        intent0 = intent_predict(new_question, intent0_model, parameter0)[0]
        print('第一次意图识别:', intent0)
        answer = None
        if intent0 == 0:  # knowledge-graph question
            intent1 = intent_predict(new_question, intent1_model, parameter1)[0]
            print('第二次意图识别:', intent1)
            try:
                # Fix: dispatch through getattr instead of eval() on a
                # formatted string, and narrow the bare except (missing
                # entities / empty graph results degrade to "no answer").
                handler = getattr(graphSearch_model, f'forintent{int(intent1)}')
                answer = handler(entity_list[0])
            except Exception:
                answer = None
        if intent0 == 1:  # question about the robot itself
            answer = template_model.search_answer(question)
        if intent0 == 2:  # chit-chat
            answer = corpus_search_model.search_answer(question)
        if answer is None or len(answer) == 0:
            answer = InterNet_model.search_answer(question)
        return json.dumps(answer, ensure_ascii=False)
if __name__ == '__main__':
    # threaded=True lets the dev server handle concurrent requests.
    app.run(host='0.0.0.0', port=8081, threaded=True)
9 用户输入问题模块
import requests
import json
# Minimal console client: POST questions to the local service until the
# user submits an empty line.
while 1:
    question = input('请输入:')
    if len(question) > 0:
        data = {
            'question': question,
        }
        url = 'http://127.0.0.1:8081/test'
        answer = requests.post(url, data=json.dumps(data)).json()
        print(answer)
    else:
        # Empty input exits the loop.
        print('退出')
        break
DAMO开发者矩阵,由阿里巴巴达摩院和中国互联网协会联合发起,致力于探讨最前沿的技术趋势与应用成果,搭建高质量的交流与分享平台,推动技术创新与产业应用链接,围绕“人工智能与新型计算”构建开放共享的开发者生态。
更多推荐

所有评论(0)