# Copyright 2020-present Tae Hwan Jung
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
_KB = 1024
"""The size of a Kilobyte in bytes"""
_MB = 1024 * _KB
"""The size of a Megabyte in bytes"""
import io
import ast
import h5py
import json
import numpy as np
import tensorflow as tf
from tensorflow.python.keras.optimizer_v2.optimizer_v2 import OptimizerV2
from matorage.optimizer.manager import Manager
[docs]class OptimizerManager(Manager):
"""
Optimizer Manager Tensorflow classes. This class overrides ``Manager``.
Note:
Unlike Dataset, optimizer weight is loaded entirely into cpu memory.
Therefore, the `HDF5_CORE` driver using the memory option is default setting.
Args:
config (:obj:`matorage.OptimizerConfig`, **require**):
A OptimizerConfig instance object
num_worker_threads (:obj:`int`, optional, defaults to `4`):
Number of backend storage worker to upload or download.
multipart_upload_size (:obj:`integer`, optional, defaults to `5 * 1024 * 1024`):
size of the incompletely uploaded object.
You can sync files faster with `multipart upload in MinIO. <https://github.com/minio/minio-py/blob/master/minio/api.py#L1795>`_
This is because MinIO clients use multi-threading, which improves IO speed more
efficiently regardless of Python's Global Interpreter Lock(GIL).
Examples::
from matorage.tensorflow import OptimizerManager
optimizer_config = OptimizerConfig(
endpoint='127.0.0.1:9000',
access_key='minio',
secret_key='miniosecretkey',
optimizer_name='testoptimizer',
additional={
"version" : "1.0.1"
}
)
optimizer_manager = OptimizerManager(config=optimizer_config)
optimizer_manager.save(model.optimizer, step=100)
"""
def __init__(self, config, num_worker_threads=4, multipart_upload_size=5 * _MB):
super(OptimizerManager, self).__init__(
config, num_worker_threads, multipart_upload_size
)
def _get_step(self, optimizer):
if len(optimizer.weights) > 0:
return optimizer.weights[0].numpy()
else:
return None
def _set_metadata(self, metadata, optimizer, step):
assert isinstance(optimizer, OptimizerV2)
metadata["optimizer"].update(
{
str(step): {
"framework": "tensorflow",
"config": str(optimizer.get_config()),
"param_groups": [],
}
}
)
def _set_scheduler(self, metadata, scheduler, step):
raise NotImplementedError()
def _save_optimizer(self, step, optimizer):
assert isinstance(optimizer, OptimizerV2)
assert isinstance(self.config.metadata, dict)
assert str(step) in self.config.metadata["optimizer"]
for i in range(1, len(optimizer.weights)):
param_name = optimizer.weights[i].name
param_value = optimizer.weights[i].numpy()
self._save_param(step, group="", name=param_name, weight=param_value)
self.config.metadata["optimizer"][str(step)]["param_groups"].append(
param_name
)
def _load_optimizer(self, step, layers, optimizer):
assert isinstance(optimizer, OptimizerV2)
assert isinstance(self.config.metadata, dict)
step = str(step)
if step not in self.config.metadata["optimizer"]:
raise KeyError(
"Available only in {}".format(
list(self.config.metadata["optimizer"].keys())
)
)
_ordered_weight = {}
for layer in layers:
name = layer.object_name
layer_image = self._client.get_object(
bucket_name=self.config.bucket_name, object_name=name
).read()
layer_image = h5py.File(io.BytesIO(layer_image), "r")
name = name[name.find("/") + 1 :]
_ordered_weight[name] = layer_image[self.type][:]
new_weight = [np.array(int(step))]
for param_name in self.config.metadata["optimizer"][str(step)]["param_groups"]:
new_weight.append(_ordered_weight[param_name])
_metadata_config = ast.literal_eval(
self.config.metadata["optimizer"][str(step)]["config"]
)
optimizer.from_config(_metadata_config)
optimizer.set_weights(new_weight)
[docs] def save(self, optimizer):
"""
save weight of optimizer
Args:
optimizer (:obj:`tf.keras.optimizers.Optimizer`, **require**):
Tensorflow optimizer.
Examples::
>>> model = Model()
# model training...
>>> optimizer_manager.save(model.optimizer)
"""
super(OptimizerManager, self).save(optimizer)
[docs] def load(self, optimizer, step):
"""
load weight of optimizer
Args:
optimizer (:obj:`tf.keras.optimizers.Optimizer`, **require**):
Tensorflow optimizer.
step (:obj:`integer`, **require**):
optimizer step.
Examples::
>>> model = Model()
>>> optimizer_manager.load(model.optimizer, step=938)
"""
super(OptimizerManager, self).load(optimizer, step)