Source code for bigdl.models.rnn.rnnexample

#
# Copyright 2016 The BigDL Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#


import itertools
import re
import sys
from optparse import OptionParser

import numpy as np

from bigdl.dataset import base
from bigdl.dataset import sentence
from bigdl.nn.layer import *
from bigdl.nn.criterion import *
from bigdl.optim.optimizer import *
from bigdl.util.common import *
from bigdl.util.common import Sample

def download_data(dest_dir):
    TINYSHAKESPEARE_URL = 'https://raw.githubusercontent.com/karpathy/char-rnn/master/data/tinyshakespeare/input.txt'  # noqa
    file_name = "input.txt"
    file_abs_path = base.maybe_download(file_name, dest_dir, TINYSHAKESPEARE_URL)
    return file_abs_path


def prepare_data(sc, folder, vocabsize, training_split):
    # Read the corpus either from a local folder (downloading it if needed)
    # or directly from HDFS.
    if not folder.startswith('hdfs://'):
        file = download_data(folder)
    else:
        file = folder

    # Split raw lines into sentences, pad each sentence with
    # SENTENCESTART/SENTENCEEND markers and tokenize it into words.
    sentences_rdd = sc.textFile(file) \
        .map(lambda line: sentence.sentences_split(line))
    pad_sent = sentences_rdd.flatMap(lambda x: x). \
        map(lambda sent: sentence.sentences_bipadding(sent))
    tokens = pad_sent.map(lambda pad: sentence.sentence_tokenizer(pad))
    train_tokens, val_tokens = tokens.randomSplit([training_split, 1 - training_split])
    train_tokens.cache()
    val_tokens.cache()

    train_max_len = train_tokens.map(lambda x: len(x)).max()
    print("max length %s" % train_max_len)
    words = train_tokens.flatMap(lambda x: x)
    print("%s words and %s sentences processed in train data"
          % (words.count(), train_tokens.count()))

    val_max_len = val_tokens.map(lambda x: len(x)).max()
    print("val max length %s" % val_max_len)
    val_words = val_tokens.flatMap(lambda x: x)
    print("%s words and %s sentences processed in validation data"
          % (val_words.count(), val_tokens.count()))

    # Keep the vocabsize most frequent words; every other word maps to an
    # extra "unknown" index, hence total_vocab_len = used_vocab_size + 1.
    sort_words = words.map(lambda w: (w, 1)) \
        .reduceByKey(lambda a, b: a + b) \
        .sortBy(lambda w_c: w_c[1])
    vocabulary = np.array(sort_words.map(lambda w: w[0]).collect())

    fre_len = vocabulary.size
    if vocabsize > fre_len:
        length = fre_len
    else:
        length = vocabsize
    discard_vocab = vocabulary[: fre_len - length]
    used_vocab = vocabulary[fre_len - length: fre_len]
    used_vocab_size = used_vocab.size
    index = np.arange(used_vocab_size)
    index2word = dict(enumerate(used_vocab))
    word2index = dict(zip(used_vocab, index))
    total_vocab_len = used_vocab_size + 1
    startIdx = word2index.get("SENTENCESTART")
    endIdx = word2index.get("SENTENCEEND")

    def text2labeled(sent):
        # Input is the sentence without its last word; the label is the same
        # sentence shifted by one word (next-word prediction).
        indexes = [word2index.get(x, used_vocab_size) for x in sent]
        data = indexes[0: -1]
        label = indexes[1: len(indexes)]
        return data, label

    def labeled2onehotformat(labeled_sent):
        # One-hot encode the input words; shift labels to 1-based class indices.
        label = [x + 1 for x in labeled_sent[1]]
        size = len(labeled_sent[0])
        feature_onehot = np.zeros(size * total_vocab_len, dtype='int').reshape(
            [size, total_vocab_len])
        for i, el in enumerate(labeled_sent[0]):
            feature_onehot[i, el] = 1
        return feature_onehot, label

    def padding(features, label, length):
        # Pad features and labels out to the batch-wide maximum sentence length.
        pad_len = length - len(label)
        padded_label = (label + [startIdx] * length)[:length]
        feature_padding = np.zeros((pad_len, total_vocab_len), dtype='int')
        feature_padding[:, endIdx + 1] = np.ones(pad_len)
        padded_features = np.concatenate((features, feature_padding), axis=0)
        return padded_features, padded_label

    sample_rdd = train_tokens.map(lambda sentence_te: text2labeled(sentence_te)) \
        .map(lambda labeled_sent: labeled2onehotformat(labeled_sent)) \
        .map(lambda x: padding(x[0], x[1], train_max_len)) \
        .map(lambda vectors_label: Sample.from_ndarray(vectors_label[0],
                                                       np.array(vectors_label[1]))).cache()

    val_sample_rdd = val_tokens.map(lambda sentence_t: text2labeled(sentence_t)) \
        .map(lambda labeled_sent: labeled2onehotformat(labeled_sent)) \
        .map(lambda x: padding(x[0], x[1], val_max_len)) \
        .map(lambda vectors_label: Sample.from_ndarray(vectors_label[0],
                                                       np.array(vectors_label[1]))).cache()

    return sample_rdd, val_sample_rdd, total_vocab_len


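# Illustrative sketch (not part of the original script): what the inner helpers of
# prepare_data produce for one toy sentence. The function, vocabulary and sentence
# below are hypothetical and never called here; they only mirror text2labeled and
# labeled2onehotformat on a miniature example.
def _encoding_sketch():
    word2index = {"SENTENCESTART": 0, "the": 1, "king": 2, "SENTENCEEND": 3}
    total_vocab_len = len(word2index) + 1                  # extra slot for unknown words
    sent = ["SENTENCESTART", "the", "king", "SENTENCEEND"]
    indexes = [word2index.get(w, len(word2index)) for w in sent]
    data, label = indexes[:-1], indexes[1:]                # predict the next word
    feature_onehot = np.zeros((len(data), total_vocab_len), dtype='int')
    for i, el in enumerate(data):
        feature_onehot[i, el] = 1
    # Labels are shifted to 1-based class indices, matching the 1-based targets
    # expected by CrossEntropyCriterion in the training code below.
    return feature_onehot, [x + 1 for x in label]

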
def build_model(input_size, hidden_size, output_size):
    # A single-layer simple RNN followed by a time-distributed linear decoder
    # that produces one score per vocabulary word at every time step.
    model = Sequential()
    model.add(Recurrent()
              .add(RnnCell(input_size, hidden_size, Tanh())))\
        .add(TimeDistributed(Linear(hidden_size, output_size)))
    model.reset()
    return model


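# Illustrative sketch (an assumption, not part of the original example): how the
# network built above could be exercised on a single dummy sentence. The helper is
# hypothetical and never called here; it assumes create_spark_conf()/init_engine()
# have already been run, as in the __main__ block below.
def _build_model_sketch(total_vocab_len=4001, hidden_size=40, seq_len=5):
    model = build_model(total_vocab_len, hidden_size, total_vocab_len)
    # One batch containing one padded sentence: shape (batch, time, one-hot vocab).
    dummy_input = np.zeros([1, seq_len, total_vocab_len], dtype='float32')
    # Expected output: one score vector over the vocabulary per time step,
    # i.e. shape (1, seq_len, total_vocab_len).
    return model.forward(dummy_input)

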
if __name__ == "__main__":
    parser = OptionParser()
    parser.add_option("-f", "--folder", dest="folder", default="/tmp/rnn")
    parser.add_option("-b", "--batchSize", dest="batchSize", default="12")
    parser.add_option("--learningRate", dest="learningrate", default="0.1")
    parser.add_option("--momentum", dest="momentum", default="0.0")
    parser.add_option("--weightDecay", dest="weight_decay", default="0.0")
    parser.add_option("--dampening", dest="dampening", default="0.0")
    parser.add_option("--hiddenSize", dest="hidden_size", default="40")
    parser.add_option("--vocabSize", dest="vob_size", default="4000")
    parser.add_option("--maxEpoch", dest="max_epoch", default="30")

    (options, args) = parser.parse_args(sys.argv)
    batch_size = int(options.batchSize)
    learningrate = float(options.learningrate)
    momentum = float(options.momentum)
    weight_decay = float(options.weight_decay)
    dampening = float(options.dampening)
    hidden_size = int(options.hidden_size)
    vob_size = int(options.vob_size)
    max_epoch = int(options.max_epoch)
    folder = options.folder
    training_split = 0.8

    # Set up Spark and the BigDL engine.
    sc = SparkContext(appName="simplernn_example",
                      conf=create_spark_conf())
    redire_spark_logs()
    show_bigdl_info_logs()
    init_engine()

    (train_rdd, val_rdd, vob_size) = prepare_data(sc, folder, vob_size, training_split)

    # Train with SGD on a per-time-step cross-entropy loss.
    optimizer = Optimizer(
        model=build_model(vob_size, hidden_size, vob_size),
        training_rdd=train_rdd,
        criterion=TimeDistributedCriterion(CrossEntropyCriterion(), size_average=True),
        batch_size=batch_size,
        optim_method=SGD(learningrate=learningrate, weightdecay=weight_decay,
                         momentum=momentum, dampening=dampening),
        end_trigger=MaxEpoch(max_epoch)
    )

    # Report validation loss after every epoch.
    optimizer.set_validation(
        batch_size=batch_size,
        val_rdd=val_rdd,
        trigger=EveryEpoch(),
        val_method=[Loss(TimeDistributedCriterion(CrossEntropyCriterion(), size_average=True))]
    )

    train_model = optimizer.optimize()
    sc.stop()
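
# Example invocation (a sketch; the jar/zip names and Spark settings are assumptions
# that depend on how BigDL is installed, not something defined in this file):
#
#   spark-submit --master local[4] --driver-memory 4g \
#       --jars bigdl-with-dependencies.jar \
#       --py-files bigdl-python-api.zip \
#       rnnexample.py --folder /tmp/rnn --batchSize 12 --maxEpoch 30
#
# The script downloads the tiny-shakespeare corpus into --folder (unless an hdfs://
# path is given), trains the simple RNN with SGD, and reports validation loss after
# every epoch.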