# Source code for bigdl.models.lenet.utils
#
# Copyright 2016 The BigDL Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
from bigdl.dataset import mnist
from bigdl.dataset.transformer import *
from bigdl.optim.optimizer import *
def get_mnist(sc, data_type="train", location="/tmp/mnist"):
    """
    Load the MNIST dataset and distribute it as an RDD.

    The data is read through ``mnist.read_data_sets``; per the upstream
    description it is downloaded automatically when it is not already
    present at ``location``.

    :param sc: SparkContext used to parallelize the data.
    :param data_type: "train" for training data and "test" for testing data.
    :param location: Location to store the mnist dataset.
    :return: RDD of (features: ndarray, label: ndarray) pairs.
    """
    features, targets = mnist.read_data_sets(location, data_type)
    feature_rdd = sc.parallelize(features)
    # BigDL targets start from 1, so every label is shifted up by one.
    target_rdd = sc.parallelize(targets + 1)
    # Pair each feature with its label.
    return feature_rdd.zip(target_rdd)
def preprocess_mnist(sc, options):
    """
    Build the normalized training and testing RDDs for MNIST.

    Each split is loaded with ``get_mnist`` from ``options.dataPath``,
    normalized with that split's own mean and standard deviation
    constants from the ``mnist`` module, and converted to ``Sample``
    objects.

    :param sc: SparkContext forwarded to ``get_mnist``.
    :param options: Object exposing a ``dataPath`` attribute.
    :return: Tuple ``(train_data, test_data)`` of RDDs of Sample.
    """

    def _normalize_train(record):
        # record is an (image, label) pair; only the image is normalized.
        image, label = record
        return normalizer(image, mnist.TRAIN_MEAN, mnist.TRAIN_STD), label

    def _normalize_test(record):
        # Same as above, using the test-split statistics.
        image, label = record
        return normalizer(image, mnist.TEST_MEAN, mnist.TEST_STD), label

    def _to_sample(pair):
        return Sample.from_ndarray(pair[0], pair[1])

    raw_train = get_mnist(sc, "train", options.dataPath)
    train_data = raw_train.map(_normalize_train).map(_to_sample)

    raw_test = get_mnist(sc, "test", options.dataPath)
    test_data = raw_test.map(_normalize_test).map(_to_sample)

    return train_data, test_data
def get_end_trigger(options):
    """
    Choose the trigger that ends the optimization from the input options.

    :param options: Object exposing ``endTriggerType`` (a string) and
        ``endTriggerNum``.
    :return: ``MaxEpoch(options.endTriggerNum)`` when ``endTriggerType``
        equals "epoch" ignoring case; ``MaxIteration(options.endTriggerNum)``
        for any other value.
    """
    # Any type other than "epoch" falls back to an iteration-count limit.
    stop_at_epoch = options.endTriggerType.lower() == "epoch"
    trigger_factory = MaxEpoch if stop_at_epoch else MaxIteration
    return trigger_factory(options.endTriggerNum)
def validate_optimizer(optimizer, test_data, options):
    """
    Configure validation and checkpointing on a distributed optimizer.

    Validation is registered with an ``EveryEpoch`` trigger, the
    ``Top1Accuracy`` method and ``options.batchSize`` as batch size.
    Checkpointing is registered with its own ``EveryEpoch`` trigger and
    ``options.checkpointPath`` as destination.

    :param optimizer: Optimizer to configure; it is modified in place.
    :param test_data: RDD passed as the validation data (``val_rdd``).
    :param options: Object exposing ``batchSize`` and ``checkpointPath``.
    """
    validation_settings = {
        "batch_size": options.batchSize,
        "val_rdd": test_data,
        "trigger": EveryEpoch(),
        "val_method": [Top1Accuracy()],
    }
    optimizer.set_validation(**validation_settings)

    # A separate trigger instance is created for checkpointing.
    checkpoint_trigger = EveryEpoch()
    optimizer.set_checkpoint(checkpoint_trigger, options.checkpointPath)