|
马上注册,结交更多好友,享用更多功能^_^
您需要 登录 才可以下载或查看,没有账号?立即注册
x
本帖最后由 糖逗 于 2021-12-28 19:52 编辑
一、数据预处理
char_vocab_path = "E:/.../1.NLP/zh-nlp-demo-master/data/char_vocabs.txt" # 字典文件
# Corpus locations.
train_data_path = "E:/.../1.NLP/地址识别项目/data/train.conll"  # training data
test_data_path = "E:/.../1.NLP/地址识别项目/data/dev.conll"  # evaluation data

# Reserved tokens that are placed in front of the character vocabulary:
#   <PAD>       padding token
#   <UNK>       low-frequency or out-of-vocabulary character
# Other conventional markers, listed for reference only (not used here):
#   <GO>/<SOS>  sentence start
#   <EOS>       sentence end
#   [SEP]       separator between two sentences
#   [MASK]      placeholder for a masked-out token
special_words = ['<PAD>', '<UNK>']

# Tag dictionary (described by the author as "BIO" tags): one tag per row,
# and the row index of the table is used as the tag id.
import pandas as pd

store = pd.read_table(r"E:\工作\7.理论学习\1.NLP\地址识别项目\data\mytag.dic", header=None)
# Rows 0 and 24 are overwritten by hand.
# NOTE(review): presumably this repairs entries that were mis-read from the
# file -- confirm against mytag.dic.
store.loc[0, 0] = "O"
store.loc[24, 0] = "B-prov"
store1 = store.to_dict()
idx2label = store1[0]
# Invert the id -> tag table to obtain tag -> id.
label2idx = {tag: tag_id for tag_id, tag in idx2label.items()}
print(label2idx)

# Character vocabulary: one character per line, special tokens first.
with open(char_vocab_path, "r", encoding="utf8") as fo:
    char_vocabs = [line.strip() for line in fo]
char_vocabs = special_words + char_vocabs
# Two-way mapping between characters and their integer ids.
idx2vocab = dict(enumerate(char_vocabs))
vocab2idx = {char: idx for idx, char in idx2vocab.items()}
二、数据统计描述
import pandas as pd
temp = pd.read_table(r"E:\...\1.NLP\地址识别项目\data\train.txt", header = None)
temp["长度"] = temp.loc[:, 0].apply(lambda x: len(str(x)))
print(temp.head())
print(max(temp["长度"]))
import matplotlib.pyplot as plt
import numpy as np
%matplotlib inline
plt.hist(np.array(temp["长度"]), bins=10, rwidth=0.9, density=True)
三、模型定义
import tensorflow as tf
import tensorflow_addons as tfa
print(tf.__version__)
print(tfa.__version__)
from tensorflow import keras
from tensorflow.keras import layers, models
from tensorflow.keras import backend as K
class CRF(layers.Layer):
    """Keras layer that converts per-token scores into a CRF training loss.

    The layer owns a ``(label_size, label_size)`` transition matrix and, when
    called, returns the summed negative log-likelihood of the gold tag
    sequences as computed by ``tfa.text.crf_log_likelihood``.
    """

    def __init__(self, label_size):
        """Create the transition matrix.

        Args:
            label_size: Number of distinct tags; the transition matrix is
                initialised uniformly at random with shape
                ``(label_size, label_size)``.
        """
        super(CRF, self).__init__()
        self.trans_params = tf.Variable(
            tf.random.uniform(shape=(label_size, label_size)), name="transition")

    @tf.function
    def call(self, inputs, labels, seq_lens):
        """Return the negative log-likelihood summed over the batch.

        Args:
            inputs: Per-token tag scores passed to ``crf_log_likelihood`` as
                its unary potentials.
            labels: Gold tag ids aligned with ``inputs``.
            seq_lens: Length of every sequence in the batch.

        Returns:
            A scalar tensor: ``sum(-log_likelihood)`` over the batch.
        """
        # crf_log_likelihood also hands back the transition parameters it
        # used. That value is discarded on purpose: rebinding
        # ``self.trans_params`` to it would replace the layer's tf.Variable
        # attribute with whatever object the helper returned from inside a
        # traced function.
        log_likelihood, _ = tfa.text.crf_log_likelihood(
            inputs, labels, seq_lens,
            transition_params=self.trans_params)
        loss = tf.reduce_sum(-log_likelihood)
        return loss
from transformers import TFBertForTokenClassification
# Training and architecture hyper-parameters.
EPOCHS = 20
BATCH_SIZE = 64
EMBED_DIM = 128
HIDDEN_SIZE = 64
MAX_LEN = 55
VOCAB_SIZE = len(vocab2idx)
CLASS_NUMS = len(label2idx)

# The training graph takes three inputs: character ids, gold tag ids and
# the per-sample sequence length. The gold tags are an *input* because the
# CRF layer computes the loss inside the model.
inputs = layers.Input(shape=(MAX_LEN,), dtype='int32')
targets = layers.Input(shape=(MAX_LEN,), dtype='int32')
seq_lens = layers.Input(shape=(), dtype='int32')

PRETRAINED_MODEL_NAME = r"D:\bert_model\bert-base-chinese"  # Chinese BERT checkpoint
# Disabled alternative encoder, kept for reference:
#x = TFBertForTokenClassification.from_pretrained(PRETRAINED_MODEL_NAME, num_labels = 100)(inputs)

# Embedding -> BiLSTM -> Dense yields one score per tag for every position.
x = layers.Embedding(input_dim=VOCAB_SIZE, output_dim=EMBED_DIM, mask_zero=True)(inputs)
x = layers.Bidirectional(layers.LSTM(HIDDEN_SIZE, return_sequences=True))(x)
print(x.shape)
logits = layers.Dense(CLASS_NUMS)(x)

# The model's only output is the CRF loss value.
loss = CRF(label_size=CLASS_NUMS)(logits, targets, seq_lens)
model = models.Model(inputs=[inputs, targets, seq_lens], outputs=loss)

# summary() prints the table itself and returns None, so wrapping it in
# print() only appended a stray "None" line.
model.summary()

# The model output already is the loss, so the Keras loss function simply
# forwards the prediction and ignores y_true.
model.compile(loss=lambda y_true, y_pred: y_pred, optimizer='adam')
四、数据处理
from tensorflow.keras.preprocessing import sequence
import numpy as np
# Read a two-column corpus file and convert it to id sequences.
def read_corpus(corpus_path, vocab2idx, label2idx):
    """Load a character/tag corpus and map characters and tags to ids.

    Every non-blank line holds one character and its tag separated by
    whitespace. A blank (or whitespace-only) line ends the current sentence.
    The final sentence is returned even when the file does not end with a
    blank line, and runs of blank lines do not produce empty sentences.

    Args:
        corpus_path: Path of the UTF-8 encoded corpus file.
        vocab2idx: Mapping from character to id. Characters missing from the
            mapping are replaced by ``vocab2idx['<UNK>']``.
        label2idx: Mapping from tag to id. Tags missing from the mapping are
            replaced by id 0.

    Returns:
        A ``(datas, labels)`` tuple of two parallel lists; each element is the
        list of character ids / tag ids of one sentence.

    Raises:
        ValueError: If a non-blank line does not consist of exactly two
            whitespace-separated fields.
    """
    datas, labels = [], []
    sent_, tag_ = [], []

    def _flush():
        # Convert the sentence collected so far and store it.
        datas.append([
            vocab2idx[char] if char in vocab2idx else vocab2idx['<UNK>']
            for char in sent_
        ])
        labels.append([label2idx.get(label, 0) for label in tag_])

    with open(corpus_path, encoding='utf-8') as fr:
        for line_no, line in enumerate(fr, start=1):
            fields = line.split()
            if fields:
                if len(fields) != 2:
                    raise ValueError(
                        "%s:%d: expected '<char> <tag>', got %r"
                        % (corpus_path, line_no, line))
                sent_.append(fields[0])
                tag_.append(fields[1])
            elif sent_:
                # Sentence boundary; ignore boundaries with nothing collected.
                _flush()
                sent_, tag_ = [], []

    # The last sentence may not be followed by a blank line.
    if sent_:
        _flush()
    return datas, labels
# Load the training split and the evaluation split as lists of id sequences.
train_datas, train_labels = read_corpus(train_data_path, vocab2idx, label2idx)
test_datas, test_labels = read_corpus(test_data_path, vocab2idx, label2idx)

# Training tensors: pad at the end so that every row has MAX_LEN entries.
train_datas = sequence.pad_sequences(train_datas, padding='post', maxlen=MAX_LEN)
train_labels = sequence.pad_sequences(train_labels, padding='post', maxlen=MAX_LEN)
# NOTE(review): every sample is given length MAX_LEN, not its real length,
# so the CRF loss also covers the padded positions. The inference code uses
# the same convention; change both together if real lengths are wanted.
train_seq_lens = np.full(len(train_labels), MAX_LEN)
# Dummy targets: the model output already is the loss, y is ignored.
labels = np.ones(len(train_labels))
# Disabled one-hot conversion, kept for reference:
# train_labels = keras.utils.to_categorical(train_labels, CLASS_NUMS)

# Evaluation tensors, built the same way.
test_datas = sequence.pad_sequences(test_datas, padding="post", maxlen=MAX_LEN)
test_labels = sequence.pad_sequences(test_labels, padding="post", maxlen=MAX_LEN)
test_seq_lens = np.full(len(test_labels), MAX_LEN)

print(np.shape(train_datas), np.shape(train_labels))
五、模型训练
# 训练
import os
# Uncomment to set CUDA_VISIBLE_DEVICES to "-1" before training:
#os.environ["CUDA_VISIBLE_DEVICES"] = "-1"

# Train with 10% of the samples held out for validation. The epoch count
# comes from the EPOCHS constant instead of a repeated literal.
history = model.fit(
    x=[train_datas, train_labels, train_seq_lens],
    y=labels,
    validation_split=0.1,
    batch_size=BATCH_SIZE,
    epochs=EPOCHS,
)

# fit() returns a History object, which is not subscriptable; the per-epoch
# values live in its ``history`` dict.
loss = history.history['loss']
val_loss = history.history['val_loss']
print('loss:', loss)
print('val_loss:', val_loss)

import matplotlib.pyplot as plt
from matplotlib.ticker import MaxNLocator

# Plot the training loss against the 1-based epoch number; the x range
# follows the number of recorded epochs.
plt.plot(list(range(1, len(loss) + 1)), loss)
plt.gca().xaxis.set_major_locator(MaxNLocator(integer=True))
六、模型效果查看
trans_params = model.get_layer('crf').get_weights()[0]
# Sub-model that maps the character-id input to the output of the layer
# named 'dense', i.e. the per-tag scores that are fed to the CRF layer.
sub_model = models.Model(
    outputs=model.get_layer('dense').output,
    inputs=model.get_layer('input_1').input,
)
def predict(model, inputs, input_lens):
    """Decode the best tag sequence for a batch of padded id sequences.

    Args:
        model: Not used by the body; the module-level ``sub_model`` and
            ``trans_params`` are used instead.
        inputs: Batch of character ids passed to ``sub_model.predict``.
        input_lens: Sequence lengths passed to ``tfa.text.crf_decode``.

    Returns:
        The decoded tag ids (first element of the ``crf_decode`` result).
    """
    # Per-tag scores from the network without the CRF layer.
    emission_scores = sub_model.predict(inputs)
    # Viterbi decoding with the trained transition matrix; the second
    # element of the result (the path score) is not needed.
    decoded = tfa.text.crf_decode(emission_scores, trans_params, input_lens)
    return decoded[0]
# Re-read the evaluation corpus. This rebinds test_datas/test_labels to the
# raw, unpadded id lists; neither name is used again in this section.
test_datas, test_labels = read_corpus(test_data_path, vocab2idx, label2idx)

# The network input is MAX_LEN wide, so derive the padding length from that
# constant instead of repeating the literal 55 in two places.
maxlen = MAX_LEN
sentence = "北京市西城区阜成门外大街0号万通金融中心0-0层"
sent_chars = list(sentence)
# Map characters to ids, falling back to <UNK> for unknown characters.
sent2id = [vocab2idx[word] if word in vocab2idx else vocab2idx['<UNK>'] for word in sent_chars]
# Cut to maxlen and right-pad with id 0 to form a batch of one sample.
sent2id_new = np.array([sent2id[:maxlen] + [0] * (maxlen - len(sent2id))])
# NOTE(review): the full padded width is passed as the length, matching how
# the training lengths are built; change both together if real lengths are
# wanted.
test_lens = np.array([maxlen])
pred_seq = predict(model, sent2id_new, test_lens)
print(pred_seq)
# Flatten the (1, maxlen) result and keep one tag per input character.
y_label = pred_seq.numpy().reshape(1, -1)[0]
y_ner = [idx2label[i] for i in y_label][0:len(sent_chars)]
print(y_ner)
# Parse a predicted tag sequence into named entities.
def get_valid_nertag(input_data, result_tags):
    """Group tagged items into ``(span, entity_type)`` pairs.

    A tag starting with ``"B"`` opens an entity of the type after the first
    ``"-"``. A tag starting with ``"I"`` extends the open entity when its type
    matches. The tag ``"O"`` closes the open entity. An ``"I"`` tag whose type
    differs from the open entity also closes it, so a span never covers an
    item that was tagged with another type; the mismatched item itself is
    not reported, just like an ``"I"`` tag that follows ``"O"``. Any other tag
    is ignored.

    Args:
        input_data: Sliceable sequence (string or list) of the tagged items.
        result_tags: One tag per item of ``input_data``.

    Returns:
        A list of ``(input_data[start:end], entity_type)`` tuples in order of
        appearance.
    """
    result_words = []
    start, end = 0, 1  # half-open span of the entity being collected
    tag_label = "O"  # type of the open entity; "O" means none is open
    for i, tag in enumerate(result_tags):
        if tag.startswith("B"):
            # A new entity starts; emit the one that was still open.
            if tag_label != "O":
                result_words.append((input_data[start:end], tag_label))
            tag_label = tag.split("-")[1]
            start, end = i, i + 1
        elif tag.startswith("I"):
            if tag.split("-")[1] == tag_label:
                end += 1
            elif tag_label != "O":
                # Continuation of a different type: close the open entity
                # here. Leaving it open would let a later matching "I" tag
                # grow `end` by one and misalign the span with the items
                # that actually carry this type.
                result_words.append((input_data[start:end], tag_label))
                tag_label = "O"
        elif tag == "O":
            if tag_label != "O":
                result_words.append((input_data[start:end], tag_label))
            tag_label = "O"
            start, end = i, i + 1
    # Emit an entity that runs up to the end of the sequence.
    if tag_label != "O":
        result_words.append((input_data[start:end], tag_label))
    return result_words
# Extract the entities of the example sentence and print each one as
# "<text> <type>".
result_words = get_valid_nertag(sent_chars, y_ner)
for entity_chars, entity_type in result_words:
    print("".join(entity_chars), entity_type)
数据:
代码参考:https://www.cnblogs.com/huanghaocs/p/14673020.html
预测背景说明:https://tianchi.aliyun.com/compe ... 03.9.493e2448u7nhbg
|
|