Source code for bigdl.models.textclassifier.textclassifier

#
# Copyright 2016 The BigDL Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#


import itertools
import re
from optparse import OptionParser

from bigdl.dataset import news20
from bigdl.nn.layer import *
from bigdl.nn.criterion import *
from bigdl.optim.optimizer import *
from bigdl.util.common import *
from bigdl.util.common import Sample
import datetime as dt


def text_to_words(review_text):
    """Tokenize raw text into lowercase, letters-only words.

    Every character that is not an ASCII letter is replaced by a space, so
    digits and punctuation act as word separators. The result is then
    lowercased and split on whitespace.

    :param review_text: the raw text to tokenize.
    :return: list of lowercase words; empty when the text has no letters.
    """
    return re.sub("[^a-zA-Z]", " ", review_text).lower().split()
def analyze_texts(data_rdd):
    """Count every word in the corpus and rank words by frequency.

    :param data_rdd: RDD whose elements carry the raw text at index 0
        (only index 0 is read here).
    :return: a collected list of ``(word, (rank, count))`` tuples, where
        ``rank`` is the 1-based position of the word after sorting by
        descending ``count``.
    """
    def to_ranked_entry(entry):
        # zipWithIndex yields ((word, count), zero_based_position).
        (word, count), position = entry
        return (word, (position + 1, count))

    words = data_rdd.flatMap(
        lambda text_label: text_to_words(text_label[0]))
    counts = words.map(lambda word: (word, 1)) \
        .reduceByKey(lambda a, b: a + b)
    # Negating the count sorts the most frequent words first.
    ranked = counts.sortBy(lambda w_c: - w_c[1]).zipWithIndex()
    return ranked.map(to_ranked_entry).collect()
# Example: pad([1, 2, 3, 4, 5], 0, 6) returns [1, 2, 3, 4, 5, 0]
def pad(l, fill_value, width):
    """Return a copy of ``l`` truncated or right-padded to ``width`` items.

    The input list is never modified: a new list is always returned, both
    when ``l`` is shortened and when it is extended.

    >>> pad([1, 2, 3, 4, 5], 0, 6)
    [1, 2, 3, 4, 5, 0]
    >>> pad([1, 2, 3, 4, 5], 0, 3)
    [1, 2, 3]

    :param l: the list to pad or truncate.
    :param fill_value: value appended until the result has ``width`` items.
    :param width: the target length.
    :return: a new list of length ``width`` (for non-negative ``width``).
    """
    # Slicing always produces a copy, so extending it below cannot leak
    # padding back into the caller's list.
    padded = l[:width]
    shortfall = width - len(padded)
    if shortfall > 0:
        padded.extend([fill_value] * shortfall)
    return padded
def to_vec(token, b_w2v, embedding_dim):
    """Look up the embedding vector of a token.

    :param token: the word to look up.
    :param b_w2v: mapping from word to its embedding vector.
    :param embedding_dim: length of the zero vector used for unknown words.
    :return: ``b_w2v[token]`` when the token is known, otherwise a new list
        of ``embedding_dim`` zeros.
    """
    if token not in b_w2v:
        # Unknown tokens map to an all-zero vector.
        return [0] * embedding_dim
    return b_w2v[token]
def to_sample(vectors, label, embedding_dim, sequence_len=None):
    """Pack a sequence of word vectors and its label into a ``Sample``.

    The vectors are flattened, converted to a float array and reshaped to
    ``[sequence_len, embedding_dim]`` before being handed to
    ``Sample.from_ndarray`` together with ``np.array(label)``.

    :param vectors: iterable of per-token vectors, each of length
        ``embedding_dim``.
    :param label: the label stored alongside the features.
    :param embedding_dim: number of columns of the feature matrix.
    :param sequence_len: optional number of rows to enforce. When omitted,
        the row count is inferred from the data, so the function no longer
        depends on a module-level ``sequence_len`` that only exists when
        this file is run as a script.
    :return: the ``Sample`` built from the feature matrix and the label.
    :raises ValueError: (from numpy) if the flattened data cannot be
        reshaped to the requested shape.
    """
    # flatten nested list
    flat_features = list(itertools.chain.from_iterable(vectors))
    # -1 lets numpy derive the row count from the flattened length.
    rows = -1 if sequence_len is None else sequence_len
    features = np.array(flat_features, dtype='float').reshape(
        [rows, embedding_dim])
    return Sample.from_ndarray(features, np.array(label))
def build_model(class_num):
    """Build the classifier network for the configured model type.

    Besides ``class_num`` this function reads the module-level names
    ``model_type``, ``embedding_dim``, ``sequence_len`` and ``p``, which the
    script entry point of this file assigns before training.

    The feature extractor depends on ``model_type`` (case-insensitive):

    * ``"cnn"``: temporal convolution, ReLU, temporal max pooling, squeeze.
    * ``"lstm"``: recurrent LSTM layer followed by ``Select(2, -1)``.
    * ``"gru"``: recurrent GRU layer followed by ``Select(2, -1)``.

    It is followed by a shared head: ``Linear(256, 128)``, ``Dropout(0.2)``,
    ``ReLU``, ``Linear(128, class_num)`` and ``LogSoftMax``.

    :param class_num: output size of the final linear layer.
    :return: the assembled ``Sequential`` model.
    :raises ValueError: if ``model_type`` is not one of cnn, lstm or gru.
        Previously such a value silently produced a model consisting of the
        dense head only.
    """
    kind = model_type.lower()
    if kind not in ("cnn", "lstm", "gru"):
        raise ValueError(
            "Unsupported model type %r; expected one of: cnn, lstm, gru"
            % (model_type,))

    model = Sequential()
    if kind == "cnn":
        # Pooling size is sequence_len - kernel(5) + 1; presumably this is
        # the convolution's output length, so the pooling spans the whole
        # sequence -- TODO confirm against the BigDL layer docs.
        model.add(TemporalConvolution(embedding_dim, 256, 5)) \
            .add(ReLU()) \
            .add(TemporalMaxPooling(sequence_len - 5 + 1)) \
            .add(Squeeze(2))
    elif kind == "lstm":
        # p is passed as the third LSTM argument (presumably a dropout
        # probability -- verify against the BigDL API).
        model.add(Recurrent()
                  .add(LSTM(embedding_dim, 256, p)))
        model.add(Select(2, -1))
    else:  # kind == "gru"
        model.add(Recurrent()
                  .add(GRU(embedding_dim, 256, p)))
        model.add(Select(2, -1))

    model.add(Linear(256, 128)) \
        .add(Dropout(0.2)) \
        .add(ReLU()) \
        .add(Linear(128, class_num)) \
        .add(LogSoftMax())

    return model
def train(sc, data_path, batch_size,
          sequence_len, max_words, embedding_dim, training_split):
    """Prepare the news20 data set and train the text classifier on it.

    Besides its parameters this function reads the module-level names
    ``max_epoch`` and ``learning_rate``, which the script entry point of
    this file assigns before calling it.

    :param sc: the SparkContext used to parallelize and broadcast data.
    :param data_path: directory passed to ``news20.get_news20``.
    :param batch_size: batch size for both training and validation.
    :param sequence_len: number of tokens every text is padded or truncated
        to.
    :param max_words: upper bound (exclusive) of the frequency-rank slice
        kept as vocabulary.
    :param embedding_dim: dimension requested from
        ``news20.get_glove_w2v``.
    :param training_split: weight of the training part in the random
        train/validation split; the validation part gets
        ``1 - training_split``.
    :return: the value returned by ``optimizer.optimize()`` (presumably the
        trained model -- confirm against the BigDL API). Earlier versions
        of this function discarded it and returned ``None``.
    """
    print('Processing text dataset')
    texts = news20.get_news20(source_dir=data_path)
    data_rdd = sc.parallelize(texts, 2)

    word_to_ic = analyze_texts(data_rdd)

    # Keep only the words at positions [10, max_words) of the
    # frequency-sorted list, i.e. skip the ten most frequent words.
    word_to_ic = dict(word_to_ic[10: max_words])
    bword_to_ic = sc.broadcast(word_to_ic)

    w2v = news20.get_glove_w2v(dim=embedding_dim)
    # Restrict the embeddings to the vocabulary before broadcasting them.
    filtered_w2v = {w: v for w, v in w2v.items() if w in word_to_ic}
    bfiltered_w2v = sc.broadcast(filtered_w2v)

    tokens_rdd = data_rdd.map(lambda text_label:
                              ([w for w in text_to_words(text_label[0]) if
                                w in bword_to_ic.value], text_label[1]))
    # "##" can never be produced by text_to_words (letters only), so the
    # padding token always falls through to the zero vector in to_vec.
    padded_tokens_rdd = tokens_rdd.map(
        lambda tokens_label: (pad(tokens_label[0], "##", sequence_len),
                              tokens_label[1]))
    vector_rdd = padded_tokens_rdd.map(lambda tokens_label:
                                       ([to_vec(w, bfiltered_w2v.value,
                                                embedding_dim)
                                         for w in tokens_label[0]],
                                        tokens_label[1]))
    sample_rdd = vector_rdd.map(
        lambda vectors_label: to_sample(vectors_label[0],
                                        vectors_label[1],
                                        embedding_dim))

    train_rdd, val_rdd = sample_rdd.randomSplit(
        [training_split, 1-training_split])

    optimizer = Optimizer(
        model=build_model(news20.CLASS_NUM),
        training_rdd=train_rdd,
        criterion=ClassNLLCriterion(),
        end_trigger=MaxEpoch(max_epoch),
        batch_size=batch_size,
        optim_method=Adagrad(learningrate=learning_rate,
                             learningrate_decay=0.001))

    optimizer.set_validation(
        batch_size=batch_size,
        val_rdd=val_rdd,
        trigger=EveryEpoch(),
        val_method=[Top1Accuracy()]
    )

    logdir = '/tmp/.bigdl/'
    # NOTE(review): the summary name is prefixed 'adam-' although the
    # optimizer above uses Adagrad; kept unchanged so existing log
    # directories keep their names.
    app_name = 'adam-' + dt.datetime.now().strftime("%Y%m%d-%H%M%S")
    train_summary = TrainSummary(log_dir=logdir, app_name=app_name)
    train_summary.set_summary_trigger("Parameters", SeveralIteration(50))
    val_summary = ValidationSummary(log_dir=logdir, app_name=app_name)
    optimizer.set_train_summary(train_summary)
    optimizer.set_val_summary(val_summary)

    return optimizer.optimize()
if __name__ == "__main__":
    # Script entry point: parse the command line, publish the
    # hyper-parameters as module-level names and run the requested action.
    parser = OptionParser()
    parser.add_option("-a", "--action", dest="action", default="train")
    parser.add_option("-l", "--learning_rate", dest="learning_rate", default="0.05")
    parser.add_option("-b", "--batchSize", dest="batchSize", default="128")
    parser.add_option("-e", "--embedding_dim", dest="embedding_dim", default="300")  # noqa
    parser.add_option("-m", "--max_epoch", dest="max_epoch", default="30")
    parser.add_option("--model", dest="model_type", default="cnn")
    parser.add_option("-p", "--p", dest="p", default="0.0")
    parser.add_option("-d", "--data_path", dest="data_path", default="/tmp/news20/")

    (options, args) = parser.parse_args(sys.argv)
    if options.action == "train":
        # These assignments are intentionally module-level: build_model()
        # and train() read several of them as globals rather than as
        # parameters.
        batch_size = int(options.batchSize)
        embedding_dim = int(options.embedding_dim)
        learning_rate = float(options.learning_rate)
        max_epoch = int(options.max_epoch)
        p = float(options.p)
        model_type = options.model_type
        sequence_len = 500
        max_words = 5000
        training_split = 0.8
        sc = SparkContext(appName="text_classifier",
                          conf=create_spark_conf())
        data_path = options.data_path
        try:
            redire_spark_logs()
            show_bigdl_info_logs()
            init_engine()
            train(sc,
                  data_path,
                  batch_size,
                  sequence_len, max_words, embedding_dim, training_split)
        finally:
            # Always stop the SparkContext, even when initialisation or
            # training fails, so the context is not leaked.
            sc.stop()
    elif options.action == "test":
        # Placeholder: the "test" action is accepted but does nothing.
        pass