# Source code for bigdl.models.utils.model_broadcast
#
# Copyright 2016 The BigDL Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import os
import sys
import gc
from tempfile import NamedTemporaryFile
from pyspark.broadcast import Broadcast
from pyspark.broadcast import _from_id
from bigdl.nn.layer import Model
def _from_id_and_type(bid, bigdl_type):
    """Rebuild a ModelBroadcast when a pickled broadcast is restored.

    This is the reconstructor returned by ``ModelBroadcast.__reduce__``.
    ``_from_id`` (from ``pyspark.broadcast``) resolves ``bid`` to a broadcast
    object; only its ``_path`` is reused, so the model itself is loaded later,
    on first access of ``ModelBroadcast.value``.

    :param bid: broadcast id, as produced by ``self._jbroadcast.id()``
    :param bigdl_type: type string later passed to ``Model.loadModel``
    :return: a new ``ModelBroadcast`` that points at the same saved-model path
    """
    registered = _from_id(bid)
    saved_path = registered._path
    return ModelBroadcast(path=saved_path, bigdl_type=bigdl_type)
def broadcast_model(sc, layer):
    """Broadcast a BigDL model through the given SparkContext.

    This is the intended entry point for creating a ``ModelBroadcast``.
    ``sc._pickled_broadcast_vars`` is handed over as the pickle registry.
    ``ModelBroadcast.__reduce__`` adds the broadcast to that registry each
    time it is pickled.

    :param sc: the driver's SparkContext
    :param layer: the BigDL model to broadcast; its ``bigdl_type`` is reused
        when the model is loaded back
    :return: a ``ModelBroadcast`` wrapping ``layer``
    """
    registry = sc._pickled_broadcast_vars
    return ModelBroadcast(sc, layer, registry)
class ModelBroadcast(Broadcast):
    """A PySpark ``Broadcast`` whose value is a BigDL model.

    On the driver, the model is written to a file with ``saveModel`` rather
    than pickled. Wherever ``value`` is read, the model is reloaded from that
    path with ``Model.loadModel`` on first access and then cached.
    """

    def __init__(self, sc=None, layer=None, pickle_registry=None, path=None, bigdl_type="float"):
        """
        Should not be called directly by users -- use ``broadcast_model(sc, layer)``
        instead.

        :param sc: SparkContext on the driver, or None when reconstructing
            from an existing ``path`` (as ``_from_id_and_type`` does)
        :param layer: model to broadcast; when not None, its ``bigdl_type``
            takes precedence over the ``bigdl_type`` argument
        :param pickle_registry: registry forwarded to the base ``Broadcast``;
            ``__reduce__`` adds this broadcast to it when pickled
        :param path: path of an already-saved model, forwarded to the base
            ``Broadcast``
        :param bigdl_type: type string used to load the model when ``layer``
            is None
        """
        if layer is not None:
            self.bigdl_type = layer.bigdl_type
        else:
            self.bigdl_type = bigdl_type
        super(ModelBroadcast, self).__init__(sc, layer, pickle_registry, path)

    def dump(self, value, f):
        """Save ``value`` (a BigDL model) to the path of temp file ``f``.

        :param value: model exposing ``saveModel(path, over_write=...)``
        :param f: temporary file object; only ``f.name`` is used as the save
            path. ``f`` is closed before this method returns or raises.
        :return: ``f.name``, the path the model was saved to
        :raises ValueError: if ``saveModel`` fails; the underlying traceback
            is printed to ``sys.stderr`` first
        """
        try:
            value.saveModel(f.name, over_write=True)
        except Exception as e:
            msg = "Could not serialize broadcast: %s" % e.__class__.__name__
            # Print the traceback of the exception being handled. This is what
            # PySpark's print_exec does, but the stdlib call avoids depending
            # on which pyspark module currently hosts print_exec. The previous
            # version check read ``self.sc``, which this class never assigns,
            # so it raised AttributeError and masked the real failure.
            import traceback
            traceback.print_exc(file=sys.stderr)
            raise ValueError(msg)
        finally:
            # Close the handle on success and failure alike so a failed
            # save does not leak it.
            f.close()
        return f.name

    def _load(self, path):
        """Load the saved model at ``path`` using this broadcast's bigdl_type."""
        return Model.loadModel(path, bigdl_type=self.bigdl_type)

    @property
    def value(self):
        """Return the broadcast model.

        The model is loaded from ``self._path`` on first access and cached in
        ``self._value``. If neither ``_value`` nor a non-None ``_path`` is
        available, accessing ``self._value`` raises AttributeError.
        """
        if not hasattr(self, "_value") and self._path is not None:
            self._value = self._load(self._path)
        return self._value

    def __reduce__(self):
        """Pickle as (broadcast id, bigdl_type); allowed only on the driver.

        Each time the broadcast is pickled it is also added to
        ``self._pickle_registry``. Unpickling goes through
        ``_from_id_and_type``.
        """
        if self._jbroadcast is None:
            raise Exception("Broadcast can only be serialized in driver")
        self._pickle_registry.add(self)
        return _from_id_and_type, (self._jbroadcast.id(), self.bigdl_type)