Implementing a Text Classification Model with TensorFlow

Text Classification Model

Posted by Wwt on August 24, 2019

In an early post, Using Pre-trained Word Embeddings in a Keras Model, we showed how to implement a convolutional neural network for text classification with Keras. A CNN-based text classifier is the perfect starter project for learning a new deep learning framework, so in this post we will walk through the same kind of CNN text classification model built on TensorFlow, and record the pitfalls I ran into while learning it.

Model Overview

The model architecture is shown in the figure below:

[Figure: the TextCNN architecture, with embedding, convolution, max-pooling and fully connected layers]

Reading from left to right: the first layer is the word embedding layer, which maps the input words into a low-dimensional space; the second is the convolution layer, which slides kernels of different widths (for example, sizes 3, 4 and 5) over the word matrix to capture n-gram semantics; the third is the pooling layer, where we max-pool each convolved feature map and concatenate the results, capturing the most salient features of the sentence; finally, the concatenated vector is fed into a fully connected layer for the final classification.

Our model follows this architecture closely and uses pre-trained word vectors (glove.6B.100d); you could just as well use other popular pre-trained embeddings such as ELMo or BERT-based vectors.
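To make the shapes concrete, here is a hypothetical layer-by-layer walk-through (the sentence length of 56 and the 128 filters per kernel size are purely illustrative, not prescribed by the model):

# input word ids:   [batch, 56]
# embedding layer:  [batch, 56, 100]     (glove.6B.100d gives embedding_dim = 100)
# conv, kernel 3:   [batch, 54, 128]     ('valid' padding: 56 - 3 + 1 = 54)
# conv, kernel 4:   [batch, 53, 128]
# conv, kernel 5:   [batch, 52, 128]
# global max-pool:  [batch, 128]         (one 128-dim vector per kernel size)
# concatenation:    [batch, 384]         (3 kernel sizes x 128 filters)
# fully connected:  [batch, num_classes]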

The CNN text classification model is implemented in TensorFlow and splits into three parts: data preprocessing, the model module and the training module.

Data Preprocessing

$\color{red}{\text{Of the thousand roads of alchemy (model training), data preprocessing comes first.}}$ Let us first look at how the data is processed.

The first step is to load the data. After loading, we build a vocabulary over it, which the later steps rely on:

def char_mapping(sentences, lower):
    # Build the list of words for each sentence
    chars = [[x.lower() if lower else x for x in s.split()] for s in sentences]
    # Count word frequencies and build the two-way id mappings
    dico = create_dico(chars)
    char_to_id, id_to_char = create_mapping(dico)
    print("Found %i unique words (%i in total)" % (len(dico), sum(len(x) for x in chars)))
    return dico, char_to_id, id_to_char

Next we build a mapping between the vocabulary and the pre-trained embeddings to obtain the pre-trained embedding matrix:

import codecs
import re

import numpy as np

def load_word2vec(emb_path, id_to_word, word_dim):
    """
    Load word embeddings from a pre-trained file.
    The embedding size must match word_dim.
    """
    new_weights = np.zeros((len(id_to_word), word_dim))
    print('Loading pretrained embeddings from {}...'.format(emb_path))
    pre_trained = {}
    emb_invalid = 0
    for i, line in enumerate(codecs.open(emb_path, 'r', 'utf-8')):
        line = line.rstrip().split()
        if len(line) == word_dim + 1:
            pre_trained[line[0]] = np.array(
                [float(x) for x in line[1:]]
            ).astype(np.float32)
        else:
            emb_invalid += 1
    if emb_invalid > 0:
        print('WARNING: %i invalid lines' % emb_invalid)
    c_found = 0
    c_lower = 0
    c_zeros = 0
    n_words = len(id_to_word)
    # Lookup table initialization
    for i in range(n_words):
        word = id_to_word[i]
        if i == 0:
            # Index 0 is reserved (e.g., for padding); initialize it randomly
            new_weights[i] = np.random.uniform(-0.25, 0.25, word_dim)
        if word in pre_trained:
            new_weights[i] = pre_trained[word]
            c_found += 1
        elif word.lower() in pre_trained:
            new_weights[i] = pre_trained[word.lower()]
            c_lower += 1
        elif re.sub(r'\d', '0', word.lower()) in pre_trained:  # replace digits with zeros
            new_weights[i] = pre_trained[
                re.sub(r'\d', '0', word.lower())
            ]
            c_zeros += 1
    print('Loaded %i pretrained embeddings.' % len(pre_trained))
    print('%i / %i (%.4f%%) words have been initialized with '
          'pretrained embeddings.' % (
        c_found + c_lower + c_zeros, n_words,
        100. * (c_found + c_lower + c_zeros) / n_words)
    )
    print('%i found directly, %i after lowercasing, '
          '%i after lowercasing + zero.' % (
        c_found, c_lower, c_zeros
    ))
    return new_weights
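Tying the two steps together might look like this (the sentences, file name and dimension are illustrative):

sentences = ["the cat sat on the mat", "dogs bark loudly"]
dico, char_to_id, id_to_char = char_mapping(sentences, lower=True)
pre_weights = load_word2vec('glove.6B.100d.txt', id_to_char, word_dim=100)
print(pre_weights.shape)   # (vocab_size, 100)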

Then the text is mapped to ids, padded to a fixed length, and iterated over in batches that are fed to the model:

def pad_data(data, length):
    chars = []
    max_length = length
    for line in data:
        if len(line) < max_length:
            # Pad short sequences with the id 0 reserved for padding
            padding = [0] * (max_length - len(line))
            chars.append(line + padding)
        else:
            # Truncate sequences that are too long
            chars.append(line[0:max_length])
    return chars
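A quick sanity check of pad_data with made-up ids and length = 5:

print(pad_data([[4, 9, 2]], 5))           # [[4, 9, 2, 0, 0]]  (padded with zeros)
print(pad_data([[4, 9, 2, 7, 1, 8]], 5))  # [[4, 9, 2, 7, 1]]  (truncated)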
  
def batch_iter(data, batch_size, num_epochs, shuffle=True):
    """
    Generate mini-batches of data for num_epochs epochs.
    """
    data = np.array(data)
    data_size = len(data)
    num_batches_per_epoch = int((len(data)-1)/batch_size) + 1
    for epoch in range(num_epochs):
        # Shuffle the data at the start of each epoch
        if shuffle:
            shuffle_indices = np.random.permutation(np.arange(data_size))
            shuffled_data = data[shuffle_indices]
        else:
            shuffled_data = data
        for batch_num in range(num_batches_per_epoch):
            start_index = batch_num * batch_size
            end_index = min((batch_num + 1) * batch_size, data_size)
            yield shuffled_data[start_index:end_index]
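The iterator is consumed the way the training loop below does it; a sketch, assuming x_train and y_train are already mapped to ids and padded:

batches = batch_iter(list(zip(x_train, y_train)), batch_size=64, num_epochs=10)
for batch in batches:
    x_batch, y_batch = zip(*batch)
    # feed x_batch and y_batch into the model here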

Building the Model

class TextCNN(object):
    """
    文本分类模型
    词嵌入层,卷积层,池化层,分类层
    """
    def __init__(
      self, embeddings,sequence_length, num_classes, vocab_size,
      embedding_size, filter_sizes, num_filters, l2_reg_lambda=0.0):

        # Placeholders for the input, the output and the dropout keep probability
        self.input_x = tf.placeholder(tf.int32, [None, sequence_length], name="input_x")
        self.input_y = tf.placeholder(tf.float32, [None, num_classes], name="input_y")
        self.dropout_keep_prob = tf.placeholder(tf.float32, name="dropout_keep_prob")

        # Keeping track of l2 regularization loss (optional)
        l2_loss = tf.constant(0.0)

        # Embedding layer with random initialization (kept for reference):
        '''
         with tf.device('/cpu:0'), tf.name_scope("embedding"):
            self.W = tf.Variable(
                tf.random_uniform([vocab_size, embedding_size], -1.0, 1.0),
                name="W")
            self.embedded_chars = tf.nn.embedding_lookup(self.W, self.input_x)
            self.embedded_chars_expanded = tf.expand_dims(self.embedded_chars, -1)
        '''
        # Embedding layer initialized with the pre-trained word vectors
        with tf.device('/cpu:0'), tf.name_scope("embedding"):
            self.W = tf.get_variable(
                shape=[vocab_size, embedding_size],
                initializer=tf.constant_initializer(embeddings),
                name="W")
            # Look up the embedding of every token id in the input
            self.embedded_chars = tf.nn.embedding_lookup(self.W, self.input_x)

        # One convolution + global max-pooling branch per kernel size
        pooled_outputs = []
        for kernel_size in filter_sizes:
            with tf.variable_scope("CNN-max-pooling-%s" % kernel_size):
                conv = tf.layers.conv1d(self.embedded_chars, num_filters, kernel_size, name='conv')
                # Global max pooling over the time (sequence) dimension
                gmp = tf.reduce_max(conv, reduction_indices=[1], name='gmp')
                pooled_outputs.append(gmp)
        self.h_pool = tf.concat(pooled_outputs, 1)
       
        num_total_filters = len(filter_sizes) * num_filters
        with tf.name_scope("score"):
            # Fully connected layer followed by dropout and a ReLU activation
            fc = tf.layers.dense(self.h_pool, num_total_filters, name='fc1')
            fc = tf.contrib.layers.dropout(fc, self.dropout_keep_prob)
            fc = tf.nn.relu(fc)

            # Classifier
            self.logits = tf.layers.dense(fc, num_classes, name='fc2')
            self.y_pred_cls = tf.argmax(tf.nn.softmax(self.logits), 1)  # predicted class

        with tf.name_scope("optimize"):
            # 损失函数,交叉熵
            cross_entropy = tf.nn.softmax_cross_entropy_with_logits(logits=self.logits, labels=self.input_y)
            self.loss = tf.reduce_mean(cross_entropy)
            # 优化器
            self.optim = tf.train.AdamOptimizer(learning_rate=self.config.learning_rate).minimize(self.loss)

        with tf.name_scope("accuracy"):
            # 准确率
            correct_pred = tf.equal(tf.argmax(self.input_y, 1), self.y_pred_cls)
            self.acc = tf.reduce_mean(tf.cast(correct_pred, tf.float32))

During the forward pass, the text goes through the embedding layer to get one embedding per word, i.e. a tensor of shape [batch_size, seq_length, embedding_dim]. To convolve over two dimensions the way we do with images, we would need to add one more (channel) dimension, as the commented-out block above does with expand_dims. Here we simply use one-dimensional convolution, followed by global max pooling to pick out the strongest feature per filter.
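To see the two options side by side, here is a small shape sketch (the sizes are illustrative):

import tensorflow as tf

embedded = tf.placeholder(tf.float32, [None, 56, 100])  # [batch, seq_length, embedding_dim]

# Option 1: add a channel dimension and convolve in 2-D, like an image
expanded = tf.expand_dims(embedded, -1)                                  # [batch, 56, 100, 1]
conv2d = tf.layers.conv2d(expanded, filters=128, kernel_size=(3, 100))   # [batch, 54, 1, 128]

# Option 2 (ours): 1-D convolution directly on the embeddings
conv1d = tf.layers.conv1d(embedded, filters=128, kernel_size=3)          # [batch, 54, 128]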

Model Training

# Parameters
# ==================================================
def preprocess():
    # Data Preparation
    # ==================================================

    # Load data
    print("Loading data...")
    x_text, y = data_helpers.load_data_and_labels(FLAGS.positive_data_file, FLAGS.negative_data_file)
    # ... (the rest of preprocessing is omitted in this excerpt:
    # vocabulary building, embedding loading and the train/dev split)
   
  
def train(x_train, y_train, vocab_processor, x_dev, y_dev, pre_weights):
    # Training
    # ==================================================
    with tf.Graph().as_default():
        session_conf = tf.ConfigProto(
          allow_soft_placement=FLAGS.allow_soft_placement,
          log_device_placement=FLAGS.log_device_placement)
        sess = tf.Session(config=session_conf)
        with sess.as_default():
            cnn = TextCNN(
                embeddings=pre_weights,
                sequence_length=x_train.shape[1],
                num_classes=y_train.shape[1],
                vocab_size=len(vocab_processor.vocabulary_),
                embedding_size=FLAGS.embedding_dim,
                filter_sizes=list(map(int, FLAGS.filter_sizes.split(","))),
                num_filters=FLAGS.num_filters,
                l2_reg_lambda=FLAGS.l2_reg_lambda)

            # Global step counter: incremented once per trained batch
            global_step = tf.Variable(0, name="global_step", trainable=False)
            optimizer = tf.train.AdamOptimizer(1e-3)
            grads_and_vars = optimizer.compute_gradients(cnn.loss)
            train_op = optimizer.apply_gradients(grads_and_vars, global_step=global_step)

            # Output directory for models and summaries
            timestamp = str(int(time.time()))
            out_dir = os.path.abspath(os.path.join(os.path.curdir, "runs", timestamp))
            print("Writing to {}\n".format(out_dir))

            # Scalar summaries for loss and accuracy
            loss_summary = tf.summary.scalar("loss", cnn.loss)
            acc_summary = tf.summary.scalar("accuracy", cnn.accuracy)

            # Train summaries
            train_summary_op = tf.summary.merge([loss_summary, acc_summary])
            train_summary_dir = os.path.join(out_dir, "summaries", "train")
            train_summary_writer = tf.summary.FileWriter(train_summary_dir, sess.graph)

            # Dev summaries (used by dev_step below)
            dev_summary_op = tf.summary.merge([loss_summary, acc_summary])
            dev_summary_dir = os.path.join(out_dir, "summaries", "dev")
            dev_summary_writer = tf.summary.FileWriter(dev_summary_dir, sess.graph)

            # Checkpoint directory. Tensorflow assumes this directory already exists so we need to create it
            checkpoint_dir = os.path.abspath(os.path.join(out_dir, "checkpoints"))
            checkpoint_prefix = os.path.join(checkpoint_dir, "model")
            if not os.path.exists(checkpoint_dir):
                os.makedirs(checkpoint_dir)
            saver = tf.train.Saver(tf.global_variables(), max_to_keep=FLAGS.num_checkpoints)

            # Save the vocabulary
            #vocab_processor.save(os.path.join(out_dir, "vocab"))

            # Initialize all variables
            sess.run(tf.global_variables_initializer())

            def train_step(x_batch, y_batch):
                """
                训练过程的一步
                """
                feed_dict = {
                  cnn.input_x: x_batch,
                  cnn.input_y: y_batch,
                  cnn.dropout_keep_prob: FLAGS.dropout_keep_prob
                }
                # Run the graph: feed in the batch and fetch the values we want back
                _, step, summaries, loss, accuracy = sess.run(
                    [train_op, global_step, train_summary_op, cnn.loss, cnn.accuracy],
                    feed_dict)
                time_str = datetime.datetime.now().isoformat()
                print("{}: step {}, loss {:g}, acc {:g}".format(time_str, step, loss, accuracy))
                train_summary_writer.add_summary(summaries, step)

            def dev_step(x_batch, y_batch, writer=None):
                """
                Evaluates model on a dev set
                """
                feed_dict = {
                  cnn.input_x: x_batch,
                  cnn.input_y: y_batch,
                  cnn.dropout_keep_prob: 1.0
                }
               
                step, summaries, loss, accuracy = sess.run(
                    [global_step, dev_summary_op, cnn.loss, cnn.accuracy],
                    feed_dict)
                time_str = datetime.datetime.now().isoformat()
                print("{}: step {}, loss {:g}, acc {:g}".format(time_str, step, loss, accuracy))
                if writer:
                    writer.add_summary(summaries, step)

            # Generate batches
            batches = data_helpers.batch_iter(
                list(zip(x_train, y_train)), FLAGS.batch_size, FLAGS.num_epochs)
            # Training loop. For each batch...
            for batch in batches:
                x_batch, y_batch = zip(*batch)
                train_step(x_batch, y_batch)
                current_step = tf.train.global_step(sess, global_step)
                if current_step % FLAGS.evaluate_every == 0:
                    print("\nEvaluation:")
                    dev_step(x_dev, y_dev, writer=dev_summary_writer)
                    print("")
                if current_step % FLAGS.checkpoint_every == 0:
                    path = saver.save(sess, checkpoint_prefix, global_step=current_step)
                    print("Saved model checkpoint to {}\n".format(path))

def main(argv=None):
    x_train, y_train, vocab_processor, x_dev, y_dev, pre_weights = preprocess()
    train(x_train, y_train, vocab_processor, x_dev, y_dev, pre_weights)

if __name__ == '__main__':
    tf.app.run()

Training in TensorFlow is not as simple as in Keras: it is verbose to write and unfriendly to beginners. We have to prepare the data ourselves and feed it batch by batch into the predefined graph to make it run. Note that in TensorFlow the global step is a counter: it increases by one for every batch trained, so a single epoch spans many steps, the number of which is computed from the dataset size and the batch size. Also, sess.run() receives not only the input data (through feed_dict) but also the ops and tensors whose values we want returned, such as accuracy and loss.
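As a concrete illustration of the step arithmetic (the numbers are made up):

# 10,000 training examples with batch_size = 64:
num_batches_per_epoch = int((10000 - 1) / 64) + 1   # 157 global steps per epoch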

That wraps up this CNN text classification model in TensorFlow. The model is not hard, and beginners should find it easy to follow; once it has taught you TensorFlow's basic operations, basic alchemy should no longer be difficult, and after picking up a few fresh-off-the-press models you can train away happily.

These days BERT sweeps every competition leaderboard, and all kinds of models use BERT up front to extract features; if you cannot use BERT, you can hardly claim to do deep learning. So here is a brief look at how to load a BERT model in TensorFlow.

# Build the BERT model
model = modeling.BertModel(
   config=bert_config,
   is_training=False,
   input_ids=input_ids,
   input_mask=input_mask,
   token_type_ids=segment_ids,
   use_one_hot_embeddings=False)
# Load the pre-trained BERT weights from the checkpoint
tvars = tf.trainable_variables()
(assignment_map, initialized_variable_names) = modeling.get_assignment_map_from_checkpoint(tvars, init_checkpoint)
tf.train.init_from_checkpoint(init_checkpoint, assignment_map)
# Get BERT's sequence output, of shape [batch_size, seq_length, hidden_size]
encoder_last_layer = model.get_sequence_output()

Once you have BERT's output, you can happily feed it into the downstream layers and get on with all kinds of fancy tricks.
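For sentence-level classification, for example, you can take BERT's pooled [CLS] representation instead of the full sequence output; a minimal sketch, assuming num_classes is defined elsewhere:

# Pooled [CLS] output: [batch_size, hidden_size]
pooled_output = model.get_pooled_output()
# A hypothetical classification head on top of BERT
logits = tf.layers.dense(pooled_output, num_classes, name='classifier')
probabilities = tf.nn.softmax(logits, axis=-1)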

References

cnn-text-classification-tf

BERT的简单使用 (Simple usage of BERT)