|
马上注册,结交更多好友,享用更多功能^_^
您需要 登录 才可以下载或查看,没有账号?立即注册
x
本帖最后由 糖逗 于 2021-12-28 19:52 编辑
一、数据预处理
# Input locations: character vocabulary, training set and evaluation set.
char_vocab_path = "E:/.../1.NLP/zh-nlp-demo-master/data/char_vocabs.txt"  # vocabulary file
train_data_path = "E:/.../1.NLP/地址识别项目/data/train.conll"  # training data
test_data_path = "E:/.../1.NLP/地址识别项目/data/dev.conll"  # evaluation data

# Special tokens placed in front of the vocabulary.
#   <PAD>: padding symbol (ends up at index 0 because it is listed first)
#   <UNK>: stands in for rare characters or characters missing from the vocabulary
# Other conventional markers, not used here:
#   <GO>/<SOS> sentence start, <EOS> sentence end,
#   [SEP] separator between two sentences, [MASK] masked-out token.
special_words = ['<PAD>', '<UNK>']

# Tag inventory ("BIO" scheme), read from a one-column dictionary file.
import pandas as pd

store = pd.read_table(r"E:\工作\7.理论学习\1.NLP\地址识别项目\data\mytag.dic", header=None)
# Rows 0 and 24 are overwritten by hand.
# NOTE(review): presumably these repair entries that were read incorrectly
# from the file -- confirm against mytag.dic.
store.loc[0, 0] = "O"
store.loc[24, 0] = "B-prov"
store1 = store.to_dict()
idx2label = store1[0]
# Invert the mapping: tag string -> row index.
label2idx = {label: idx for idx, label in idx2label.items()}
print(label2idx)

# Read the character vocabulary, one entry per line.
with open(char_vocab_path, "r", encoding="utf8") as fo:
    char_vocabs = [line.strip() for line in fo]
char_vocabs = special_words + char_vocabs

# Two-way mapping between characters and their integer ids.
idx2vocab = dict(enumerate(char_vocabs))
vocab2idx = {char: idx for idx, char in idx2vocab.items()}
复制代码
二、数据统计描述
- import pandas as pd
- temp = pd.read_table(r"E:\...\1.NLP\地址识别项目\data\train.txt", header = None)
- temp["长度"] = temp.loc[:, 0].apply(lambda x: len(str(x)))
- print(temp.head())
- print(max(temp["长度"]))
- import matplotlib.pyplot as plt
- import numpy as np
- %matplotlib inline
- plt.hist(np.array(temp["长度"]), bins=10, rwidth=0.9, density=True)
复制代码
三、模型定义
- import tensorflow as tf
- import tensorflow_addons as tfa
- print(tf.__version__)
- print(tfa.__version__)
- from tensorflow import keras
- from tensorflow.keras import layers, models
- from tensorflow.keras import backend as K
class CRF(layers.Layer):
    """CRF loss layer built on ``tfa.text.crf_log_likelihood``.

    The layer owns a ``(label_size, label_size)`` transition matrix and, when
    called, returns the summed negative log-likelihood of the given tags.

    Args:
        label_size: Number of distinct tags.
        **kwargs: Forwarded to ``layers.Layer`` (for example ``name``).
    """

    def __init__(self, label_size, **kwargs):
        super().__init__(**kwargs)
        self.trans_params = tf.Variable(
            tf.random.uniform(shape=(label_size, label_size)), name="transition")

    @tf.function
    def call(self, inputs, labels, seq_lens):
        """Return the negative log-likelihood summed over the batch.

        Args:
            inputs: Per-position tag scores
                (assumed ``(batch, seq, label_size)`` -- see the tfa docs).
            labels: Gold tag indices.
            seq_lens: One sequence length per example.

        Returns:
            A scalar tensor: ``sum(-log_likelihood)``.
        """
        # crf_log_likelihood also hands back the transition parameters; they
        # are deliberately not stored on self, so the attribute always stays
        # the Variable created in __init__.
        log_likelihood, _ = tfa.text.crf_log_likelihood(
            inputs, labels, seq_lens,
            transition_params=self.trans_params)
        return tf.reduce_sum(-log_likelihood)
- from transformers import TFBertForTokenClassification
# Hyper-parameters.
EPOCHS = 20
BATCH_SIZE = 64
EMBED_DIM = 128
HIDDEN_SIZE = 64
MAX_LEN = 55
VOCAB_SIZE = len(vocab2idx)
CLASS_NUMS = len(label2idx)

# Three model inputs: character ids, gold tag ids and one length per example.
inputs = layers.Input(shape=(MAX_LEN,), dtype='int32')
targets = layers.Input(shape=(MAX_LEN,), dtype='int32')
seq_lens = layers.Input(shape=(), dtype='int32')

PRETRAINED_MODEL_NAME = r"D:\bert_model\bert-base-chinese"  # Chinese BERT checkpoint (not used below)
# Alternative encoder, kept disabled:
# x = TFBertForTokenClassification.from_pretrained(PRETRAINED_MODEL_NAME, num_labels = 100)(inputs)

# Embedding -> BiLSTM -> per-position tag scores -> CRF loss.
x = layers.Embedding(input_dim=VOCAB_SIZE, output_dim=EMBED_DIM, mask_zero=True)(inputs)
x = layers.Bidirectional(layers.LSTM(HIDDEN_SIZE, return_sequences=True))(x)
print(x.shape)
logits = layers.Dense(CLASS_NUMS)(x)
loss = CRF(label_size=CLASS_NUMS)(logits, targets, seq_lens)
model = models.Model(inputs=[inputs, targets, seq_lens], outputs=loss)

# summary() prints the table itself and returns None, so it is not wrapped
# in print() (that would emit a stray "None").
model.summary()

# The model output already is the loss value, so the Keras loss function
# simply passes y_pred through and ignores y_true.
model.compile(loss=lambda y_true, y_pred: y_pred, optimizer='adam')
复制代码
四、数据处理
- from tensorflow.keras.preprocessing import sequence
- import numpy as np
- # 读取训练语料
def read_corpus(corpus_path, vocab2idx, label2idx):
    """Read a two-column corpus file and convert it to index sequences.

    Each non-blank line holds ``<char> <label>`` separated by whitespace; a
    blank (or whitespace-only) line ends the current sentence. The final
    sentence is kept even when the file does not end with a blank line, and
    runs of blank lines do not produce empty sentences.

    Args:
        corpus_path: Path of the UTF-8 encoded corpus file.
        vocab2idx: Mapping from character to id; characters not in it are
            mapped to ``vocab2idx['<UNK>']``.
        label2idx: Mapping from label to id; labels not in it are mapped to 0.

    Returns:
        ``(datas, labels)``: two parallel lists holding, per sentence, the
        list of character ids and the list of label ids.

    Raises:
        ValueError: If a non-blank line does not have exactly two fields.
    """
    datas, labels = [], []
    sent_ids, tag_ids = [], []
    with open(corpus_path, encoding='utf-8') as fr:
        # Iterate the file lazily instead of loading every line up front.
        for line_no, line in enumerate(fr, start=1):
            fields = line.split()
            if not fields:
                # Sentence boundary: store the sentence collected so far.
                if sent_ids:
                    datas.append(sent_ids)
                    labels.append(tag_ids)
                    sent_ids, tag_ids = [], []
                continue
            if len(fields) != 2:
                raise ValueError(
                    f"{corpus_path}:{line_no}: expected '<char> <label>', got {line!r}")
            char, label = fields
            sent_ids.append(vocab2idx[char] if char in vocab2idx else vocab2idx['<UNK>'])
            tag_ids.append(label2idx[label] if label in label2idx else 0)
    # Keep a trailing sentence that is not followed by a blank line.
    if sent_ids:
        datas.append(sent_ids)
        labels.append(tag_ids)
    return datas, labels
# Load the training set.
train_datas, train_labels = read_corpus(train_data_path, vocab2idx, label2idx)
# Load the evaluation set.
test_datas, test_labels = read_corpus(test_data_path, vocab2idx, label2idx)

# Record the true sentence lengths (capped at MAX_LEN) before padding, so the
# CRF likelihood is computed over real characters only, not over padding.
train_seq_lens = np.array([min(len(sent), MAX_LEN) for sent in train_datas])
test_seq_lens = np.array([min(len(sent), MAX_LEN) for sent in test_datas])

# Pad at the end and, for over-long sentences, also cut at the end; the
# pad_sequences default (truncating='pre') would drop the head instead.
train_datas = sequence.pad_sequences(train_datas, maxlen=MAX_LEN, padding='post', truncating='post')
train_labels = sequence.pad_sequences(train_labels, maxlen=MAX_LEN, padding='post', truncating='post')
test_datas = sequence.pad_sequences(test_datas, maxlen=MAX_LEN, padding='post', truncating='post')
test_labels = sequence.pad_sequences(test_labels, maxlen=MAX_LEN, padding='post', truncating='post')

# Dummy targets for model.fit: the model outputs its own loss, so the values
# of y are never compared with anything.
labels = np.ones(len(train_labels))
print(np.shape(train_datas), np.shape(train_labels))
复制代码
五、模型训练
# Training.
import os
# To force CPU execution, set os.environ["CUDA_VISIBLE_DEVICES"] = "-1" here.

history = model.fit(
    x=[train_datas, train_labels, train_seq_lens],
    y=labels,
    validation_split=0.1,
    batch_size=BATCH_SIZE,
    epochs=EPOCHS,
)

# model.fit returns a History object; the per-epoch numbers are in its
# ``history`` dict (the object itself cannot be subscripted).
loss = history.history['loss']
val_loss = history.history['val_loss']
print('loss:', loss)
print('val_loss:', val_loss)

import matplotlib.pyplot as plt
from matplotlib.ticker import MaxNLocator

# Plot the training loss per epoch; the x range follows the number of
# recorded epochs instead of a hard-coded count.
plt.plot(list(range(1, len(loss) + 1)), loss)
plt.gca().xaxis.set_major_locator(MaxNLocator(integer=True))
复制代码
六、模型效果查看
# Transition matrix learned by the CRF layer. The layer is located by type
# rather than by its auto-generated name, because Keras may add a numeric
# suffix to such names when the model is rebuilt in the same session.
crf_layer = next(layer for layer in model.layers if isinstance(layer, CRF))
trans_params = crf_layer.get_weights()[0]

# Sub-model from the character-id input to the Dense-layer logits; it reuses
# the tensors created when the model was defined instead of looking layers up
# by the names 'input_1' and 'dense'.
sub_model = models.Model(inputs=inputs, outputs=logits)
def predict(model, inputs, input_lens):
    """Decode the best tag sequence for a batch of encoded sentences.

    Args:
        model: Not used by the body; the scores come from the module-level
            ``sub_model``.
        inputs: Batch handed to ``sub_model.predict``.
        input_lens: Sequence lengths handed to ``tfa.text.crf_decode``.

    Returns:
        The decoded tag indices (first element of the ``crf_decode`` result).
    """
    # Viterbi decoding with the module-level transition matrix ``trans_params``.
    decoded = tfa.text.crf_decode(sub_model.predict(inputs), trans_params, input_lens)
    return decoded[0]
# NOTE(review): this reloads the evaluation set as raw (unpadded) lists and
# the result is not used below -- kept so the module-level names stay bound
# as before.
test_datas, test_labels = read_corpus(test_data_path, vocab2idx, label2idx)

maxlen = 55
sentence = "北京市西城区阜成门外大街0号万通金融中心0-0层"
sent_chars = list(sentence)
# Encode the sentence; characters outside the vocabulary become <UNK>.
sent2id = [vocab2idx[word] if word in vocab2idx else vocab2idx['<UNK>'] for word in sent_chars]
# Cut to maxlen and right-pad with 0 to a fixed width.
sent2id_new = np.array([sent2id[:maxlen] + [0] * (maxlen - len(sent2id))])
# Decode over the real characters only, not over the padding.
test_lens = np.array([min(len(sent2id), maxlen)])
pred_seq = predict(model, sent2id_new, test_lens)
print(pred_seq)
y_label = pred_seq.numpy().reshape(1, -1)[0]
# Map ids back to tag strings and drop the positions that are padding.
y_ner = [idx2label[i] for i in y_label][0:len(sent_chars)]
print(y_ner)
- # 对预测结果进行命名实体解析和提取
def get_valid_nertag(input_data, result_tags):
    """Group a BIO tag sequence into entity spans.

    A ``B-<type>`` tag opens an entity and each directly following
    ``I-<type>`` tag of the same type extends it. Any other tag (``O``, a new
    ``B-``, an ``I-`` of a different type, or an unrecognised tag) closes the
    open entity, so a span never covers tokens that do not belong to it.

    Args:
        input_data: Sliceable sequence of tokens, aligned with ``result_tags``.
        result_tags: Iterable of tag strings, one per token.

    Returns:
        A list of ``(input_data[start:end], type)`` tuples in order of
        appearance.

    Raises:
        IndexError: If a tag starting with ``B`` or ``I`` contains no ``-``.
    """
    result_words = []
    start = 0          # index of the first token of the open entity
    seen = 0           # number of tags consumed so far
    tag_label = "O"    # type of the open entity; "O" means none is open
    for i, tag in enumerate(result_tags):
        seen = i + 1
        if tag.startswith("I") and tag.split("-")[1] == tag_label:
            # Continuation of the open entity: nothing to close.
            continue
        if tag_label != "O":
            # The open entity ends right before this token.
            result_words.append((input_data[start:i], tag_label))
            tag_label = "O"
        if tag.startswith("B"):
            tag_label = tag.split("-")[1]
            start = i
    if tag_label != "O":
        # An entity that runs to the end of the sequence.
        result_words.append((input_data[start:seen], tag_label))
    return result_words
# Extract the entities from the predicted tags and print each one as text
# followed by its type.
result_words = get_valid_nertag(sent_chars, y_ner)
for entity_chars, entity_type in result_words:
    print("".join(entity_chars), entity_type)
复制代码
数据:
代码参考:https://www.cnblogs.com/huanghaocs/p/14673020.html
预测背景说明:https://tianchi.aliyun.com/compe ... 03.9.493e2448u7nhbg
|
|