#
# Copyright 2016 The BigDL Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import os
import sys
import glob
from py4j.protocol import Py4JJavaError
from py4j.java_gateway import JavaObject
from py4j.java_collections import ListConverter, JavaArray, JavaList, JavaMap
from pyspark import RDD, SparkContext
from pyspark.serializers import PickleSerializer, AutoBatchedSerializer
from pyspark.sql import DataFrame, SQLContext
from pyspark.mllib.common import callJavaFunc
from pyspark import SparkConf
import numpy as np
import threading
from bigdl.util.engine import prepare_env
INTMAX = 2147483647
INTMIN = -2147483648
DOUBLEMAX = 1.7976931348623157E308
if sys.version >= '3':
long = int
unicode = str
[docs]class SingletonMixin(object):
_lock = threading.RLock()
_instance = None
[docs] @classmethod
def instance(cls,
bigdl_type="float"):
if not cls._instance:
with cls._lock:
if not cls._instance:
cls._instance = cls(bigdl_type)
return cls._instance
[docs]class JavaCreator(SingletonMixin):
__creator_class="com.intel.analytics.bigdl.python.api.PythonBigDL"
[docs] @classmethod
def get_creator_class(cls):
with JavaCreator._lock:
return JavaCreator.__creator_class
[docs] @classmethod
def set_creator_class(cls, cclass):
with JavaCreator._lock:
JavaCreator.__creator_class = cclass
JavaCreator._instance = None
def __init__(self, bigdl_type):
sc = get_spark_context()
jclass = getattr(sc._jvm, JavaCreator.get_creator_class())
if bigdl_type == "float":
self.value = getattr(jclass, "ofFloat")()
elif bigdl_type == "double":
self.value = getattr(jclass, "ofDouble")()
else:
raise Exception("Not supported bigdl_type: %s" % bigdl_type)
[docs]class JavaValue(object):
[docs] def jvm_class_constructor(self):
name = "create" + self.__class__.__name__
print("creating: " + name)
return name
def __init__(self, jvalue, bigdl_type, *args):
self.value = jvalue if jvalue else callBigDlFunc(
bigdl_type, self.jvm_class_constructor(), *args)
self.bigdl_type = bigdl_type
def __str__(self):
return self.value.toString()
[docs]class TestResult():
"""
A testing result used to benchmark the model quality.
"""
def __init__(self, result, total_num, method):
"""
:param result: the validation result. i.e: top1 accuracy percentage.
:param total_num: the total processed records.
:param method: the validation method. i.e: Top1Accuracy
"""
self.result = result
self.total_num = total_num
self.method = method
def __reduce__(self):
return (TestResult, (self.result, self.total_num, self.method))
def __str__(self):
return "Test result: %s, total_num: %s, method: %s" % (
self.result, self.total_num, self.method)
[docs]def get_dtype(bigdl_type):
# Always return float32 for now
return "float32"
[docs]class JTensor(object):
"""
A wrapper to easy our work when need to pass or return Tensor to/from Scala.
>>> import numpy as np
>>> from bigdl.util.common import JTensor
>>> np.random.seed(123)
>>>
"""
def __init__(self, storage, shape, bigdl_type="float"):
if isinstance(storage, bytes) and isinstance(shape, bytes):
self.storage = np.frombuffer(storage, dtype=get_dtype(bigdl_type))
self.shape = np.frombuffer(shape, dtype=np.int32)
else:
self.storage = np.array(storage, dtype=get_dtype(bigdl_type))
self.shape = np.array(shape, dtype=np.int32)
self.bigdl_type = bigdl_type
[docs] @classmethod
def from_ndarray(cls, a_ndarray, bigdl_type="float"):
"""
Convert a ndarray to Tensor which would be used in Java side.
>>> import numpy as np
>>> from bigdl.util.common import JTensor
>>> from bigdl.util.common import callBigDlFunc
>>> np.random.seed(123)
>>> data = np.random.uniform(0, 1, (2, 3)).astype("float32")
>>> result = JTensor.from_ndarray(data)
>>> data_back = result.to_ndarray()
>>> (data == data_back).all()
True
>>> tensor1 = callBigDlFunc("float", "testTensor", JTensor.from_ndarray(data)) # noqa
>>> array_from_tensor = tensor1.to_ndarray()
>>> (array_from_tensor == data).all()
True
"""
if a_ndarray is None:
return None
assert isinstance(a_ndarray, np.ndarray), \
"input should be a np.ndarray, not %s" % type(a_ndarray)
return cls(a_ndarray,
a_ndarray.shape if a_ndarray.shape else (a_ndarray.size),
bigdl_type= bigdl_type)
[docs] def to_ndarray(self):
return np.array(self.storage, dtype=get_dtype(self.bigdl_type)).reshape(self.shape) # noqa
def __reduce__(self):
return JTensor, (self.storage.tostring(), self.shape.tostring(), self.bigdl_type)
def __str__(self):
return "JTensor: storage: %s, shape: %s" % (self.storage, self.shape)
def __repr__(self):
return "JTensor: storage: %s, shape: %s" % (self.storage, self.shape)
[docs]class Sample(object):
def __init__(self, features, label, bigdl_type="float"):
"""
User should always use Sample.from_ndarray to construct Sample.
:param features: a JTensor
:param label: a JTensor
:param bigdl_type: "double" or "float"
"""
self.features = features
self.label = label
self.bigdl_type = bigdl_type
[docs] @classmethod
def from_ndarray(cls, features, label, bigdl_type="float"):
"""
Convert a ndarray of features and label to Sample, which would be used in Java side.
>>> import numpy as np
>>> from bigdl.util.common import callBigDlFunc
>>> from bigdl.util.common import Sample
>>> from numpy.testing import assert_allclose
>>> sample = Sample.from_ndarray(np.random.random((2,3)), np.random.random((2,3)))
>>> sample_back = callBigDlFunc("float", "testSample", sample)
>>> assert_allclose(sample.features.to_ndarray(), sample_back.features.to_ndarray())
>>> assert_allclose(sample.label.to_ndarray(), sample_back.label.to_ndarray())
"""
assert isinstance(features, np.ndarray), \
"features should be a np.ndarray, not %s" % type(features)
if not isinstance(label, np.ndarray): # in case label is a scalar.
label = np.array(label)
return cls(
features=JTensor.from_ndarray(features),
label=JTensor.from_ndarray(label),
bigdl_type=bigdl_type)
def __reduce__(self):
return Sample, (self.features, self.label, self.bigdl_type)
def __str__(self):
return "Sample: features: %s, label: %s," % (self.features, self.label)
def __repr__(self):
return "Sample: features: %s, label: %s" % (self.storage, self.shape)
[docs]class RNG():
"""
generate tensor data with seed
"""
def __init__(self, bigdl_type="float"):
self.bigdl_type = bigdl_type
[docs] def set_seed(self, seed):
callBigDlFunc(self.bigdl_type, "setModelSeed", seed)
_picklable_classes = [
'LinkedList',
'SparseVector',
'DenseVector',
'DenseMatrix',
'Rating',
'LabeledPoint',
'Sample',
'TestResult',
'JTensor'
]
[docs]def init_engine(bigdl_type="float"):
callBigDlFunc(bigdl_type, "initEngine")
[docs]def get_bigdl_conf():
bigdl_conf_file = "spark-bigdl.conf"
bigdl_python_wrapper = "python-api.zip"
def load_conf(conf_str):
return dict(line.split() for line in conf_str.split("\n") if
"#" not in line and line.strip())
for p in sys.path:
if bigdl_conf_file in p and os.path.isfile(p):
with open(p) if sys.version_info < (3,) else open(p, encoding='latin-1') as conf_file: # noqa
return load_conf(conf_file.read())
if bigdl_python_wrapper in p and os.path.isfile(p):
import zipfile
with zipfile.ZipFile(p, 'r') as zip_conf:
content = zip_conf.read(bigdl_conf_file)
if sys.version_info >= (3,):
content = str(content, 'latin-1')
return load_conf(content)
raise Exception("Cannot find spark-bigdl.conf.Pls add it to PYTHONPATH.")
[docs]def to_list(a):
if type(a) is list:
return a
return [a]
[docs]def create_spark_conf():
bigdl_conf = get_bigdl_conf()
sparkConf = SparkConf()
sparkConf.setAll(bigdl_conf.items())
return sparkConf
[docs]def get_spark_context(conf = None):
"""
Get the current active spark context and create one if no active instance
:param conf: combining bigdl configs into spark conf
:return: SparkContext
"""
with SparkContext._lock: # Compatible with Spark1.5.1
if SparkContext._active_spark_context is None:
SparkContext(conf=conf or create_spark_conf())
return SparkContext._active_spark_context
[docs]def get_spark_sql_context(sc):
if "getOrCreate" in SQLContext.__dict__:
return SQLContext.getOrCreate()
else:
return SQLContext(sc) # Compatible with Spark1.5.1
[docs]def callBigDlFunc(bigdl_type, name, *args):
""" Call API in PythonBigDL """
jinstance = JavaCreator.instance(bigdl_type=bigdl_type).value
sc = get_spark_context()
api = getattr(jinstance, name)
return callJavaFunc(sc, api, *args)
def _java2py(sc, r, encoding="bytes"):
if isinstance(r, JavaObject):
clsName = r.getClass().getSimpleName()
# convert RDD into JavaRDD
if clsName != 'JavaRDD' and clsName.endswith("RDD"):
r = r.toJavaRDD()
clsName = 'JavaRDD'
if clsName == 'JavaRDD':
jrdd = sc._jvm.SerDe.javaToPython(r)
return RDD(jrdd, sc)
if clsName == 'DataFrame':
return DataFrame(r, get_spark_sql_context(sc))
if clsName in _picklable_classes:
r = sc._jvm.org.apache.spark.bigdl.api.python.BigDLSerDe.dumps(r)
elif isinstance(r, (JavaArray, JavaList, JavaMap)):
try:
r = sc._jvm.org.apache.spark.bigdl.api.python.BigDLSerDe.dumps(
r)
except Py4JJavaError:
pass # not pickable
if isinstance(r, (bytearray, bytes)):
r = PickleSerializer().loads(bytes(r), encoding=encoding)
return r
[docs]def callJavaFunc(sc, func, *args):
""" Call Java Function """
args = [_py2java(sc, a) for a in args]
result = func(*args)
return _java2py(sc, result)
def _to_java_object_rdd(rdd):
""" Return a JavaRDD of Object by unpickling
It will convert each Python object into Java object by Pyrolite, whenever
the RDD is serialized in batch or not.
"""
rdd = rdd._reserialize(AutoBatchedSerializer(PickleSerializer()))
return \
rdd.ctx._jvm.org.apache.spark.bigdl.api.python.BigDLSerDe.pythonToJava(
rdd._jrdd, True)
def _py2java(sc, obj):
""" Convert Python object into Java """
if isinstance(obj, RDD):
obj = _to_java_object_rdd(obj)
elif isinstance(obj, DataFrame):
obj = obj._jdf
elif isinstance(obj, SparkContext):
obj = obj._jsc
elif isinstance(obj, (list, tuple)):
obj = ListConverter().convert([_py2java(sc, x) for x in obj],
sc._gateway._gateway_client)
elif isinstance(obj, dict):
result = {}
for (key, value) in obj.items():
result[key] = _py2java(sc, value) if isinstance(value, JavaValue) else value # noqa
obj = result
elif isinstance(obj, JavaValue):
obj = obj.value
elif isinstance(obj, JavaObject):
pass
elif isinstance(obj, (int, long, float, bool, bytes, unicode)):
pass
else:
data = bytearray(PickleSerializer().dumps(obj))
obj = sc._jvm.org.apache.spark.bigdl.api.python.BigDLSerDe.loads(data)
return obj
def _test():
import doctest
from pyspark import SparkContext
from bigdl.nn import layer
globs = layer.__dict__.copy()
sc = SparkContext(master="local[2]", appName="test common utility")
globs['sc'] = sc
(failure_count, test_count) = doctest.testmod(globs=globs,
optionflags=doctest.ELLIPSIS)
if failure_count:
exit(-1)
if __name__ == "__main__":
_test()