Source code for matorage.data.config

# Copyright 2020-present Tae Hwan Jung
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import copy
import json
import tables
import hashlib
import tempfile
from minio import Minio
from functools import reduce

from matorage.nas import NAS
from matorage.utils import check_nas, logger
from matorage.config import StorageConfig
from matorage.data.metadata import DataMetadata
from matorage.data.attribute import DataAttribute

_KB = 1024
"""The size of a Kilobyte in bytes"""

_MB = 1024 * _KB
"""The size of a Megabyte in bytes"""

[docs]class DataConfig(StorageConfig):
    """
    Dataset configuration class. This class extends ``StorageConfig``.

    Args:
        endpoint (:obj:`string`, **required**):
            S3 object storage endpoint, or the NAS folder path if a NAS setting is used.
        access_key (:obj:`string`, optional, defaults to `None`):
            Access key for the object storage endpoint. (Optional if you need anonymous access.)
        secret_key (:obj:`string`, optional, defaults to `None`):
            Secret key for the object storage endpoint. (Optional if you need anonymous access.)
        secure (:obj:`boolean`, optional, defaults to `False`):
            Set this value to True to enable secure (HTTPS) access. (Unlike the original MinIO client, this defaults to False.)
        max_object_size (:obj:`integer`, optional, defaults to `10MB`):
            One object file is split into chunks of at most `max_object_size` bytes and stored.
        dataset_name (:obj:`string`, **required**):
            Dataset name.
        attributes (:obj:`list`, **required**):
            List of ``DataAttribute`` describing the data attributes.
        additional (:obj:`dict`, optional, defaults to ``{}``):
            Parameters for an additional description of the dataset. The keys and values of the dictionary can be specified freely.
        compressor (:obj:`dict`, optional, defaults to :code:`{"complevel" : 0, "complib" : "zlib"}`):
            Data compressor option. It is a dict with ``complevel`` and ``complib`` as keys.
            For further reference, read `pytable's Filter <http://www.pytables.org/usersguide/libref/helper_classes.html#tables.Filters>`_.

            - complevel (:obj:`integer`, defaults to 0) : compression level (0-9). The larger the number, the stronger the compression.
            - complib (:obj:`string`, defaults to 'zlib') : compression library. Choose from zlib, lzo, bzip2, blosc.

    Examples::

        from matorage import DataConfig, DataAttribute

        data_config = DataConfig(
            endpoint='127.0.0.1:9000',
            access_key='minio',
            secret_key='miniosecretkey',
            dataset_name='mnist',
            additional={
                "framework" : "pytorch",
                "mode" : "training"
            },
            compressor={
                "complevel" : 0,
                "complib" : "zlib"
            },
            attributes=[
                ('image', 'float32', (28, 28)),
                ('target', 'int64', (1, ))
            ]
        )

        data_config.to_json_file('data_config.json')
        data_config2 = DataConfig.from_json_file('data_config.json')

    If you have a NAS (network-attached storage) setup, you can save/load faster by using a NAS folder path as the endpoint.

    Examples::

        from matorage import DataConfig

        # NAS example
        data_config = DataConfig(
            endpoint='~/shared',
            dataset_name='mnist',
            additional={
                "framework" : "pytorch",
                "mode" : "training"
            },
            compressor={
                "complevel" : 0,
                "complib" : "zlib"
            },
            attributes=[
                ('image', 'float32', (28, 28)),
                ('target', 'int64', (1, ))
            ]
        )

    """

    def __init__(self, **kwargs):
        super(DataConfig, self).__init__(**kwargs)
        self.type = "dataset"

        self.dataset_name = kwargs.pop("dataset_name", None)
        self.additional = kwargs.pop("additional", {})
        self.attributes = kwargs.pop("attributes", None)
        self.compressor = kwargs.pop("compressor", {"complevel": 0, "complib": "zlib"})
        self.max_object_size = kwargs.pop("max_object_size", 10 * _MB)

        self.bucket_name = self._hashmap_transfer()

        self._check_all()

        self.metadata = DataMetadata(**self.__dict__)

    def _check_all(self):
        """
        Check that all class variables are valid.
        """
        self._check_bucket()

        if self.attributes is None:
            raise ValueError("attributes is empty")
        if isinstance(self.attributes, tuple):
            self.attributes = DataAttribute(
                name=self.attributes[0],
                type=self.attributes[1],
                shape=self.attributes[2],
            )
        if isinstance(self.attributes, DataAttribute):
            self.attributes = [self.attributes]

        for i, attr in enumerate(self.attributes):
            if isinstance(attr, tuple):
                self.attributes[i] = DataAttribute(attr[0], attr[1], attr[2])

        attribute_names = set()
        for attribute in self.attributes:
            assert isinstance(attribute.type, tables.atom.Atom)
            if attribute.name in attribute_names:
                raise KeyError(
                    "{} already exists in {}".format(attribute.name, attribute_names)
                )
            else:
                attribute_names.add(attribute.name)

        # To convert `self.attributes`'s shape to be flattened
        self.flatten_attributes = copy.deepcopy(self.attributes)
        self._convert_type_flatten()

        if self.compressor["complevel"] < 0 or 9 < self.compressor["complevel"]:
            raise ValueError(
                "Compressor level {} must be an integer in 0-9".format(
                    self.compressor["complevel"]
                )
            )
        if self.compressor["complib"] not in ("zlib", "lzo", "bzip2", "blosc"):
            raise ValueError(
                "compressor mode {} is not valid. select from "
                "zlib, lzo, bzip2, blosc".format(self.compressor["complib"])
            )

    def _check_bucket(self):
        """
        Check whether the bucket exists; if it does not, a new bucket will be created.
        If the bucket and its metadata sub-folder exist, get the metadata (attributes, compressor) from there.
        """
        aws_region_list = [
            'us-east-1', 'us-east-2', 'us-west-1', 'us-west-2',
            'eu-west-1', 'eu-west-2', 'ca-central-1', 'eu-central-1',
            'sa-east-1', 'cn-north-1',
            'ap-southeast-1', 'ap-southeast-2', 'ap-northeast-1', 'ap-northeast-2',
        ]
        if "amazonaws.com" in self.endpoint:
            if self.region not in aws_region_list:
                raise AssertionError(
                    'AWS endpoint should have a region argument from {}'.format(
                        aws_region_list
                    )
                )
            if f"s3.{self.region}.amazonaws.com" not in self.endpoint:
                raise AssertionError(
                    'Endpoint has to be {}'.format(
                        f"s3.{self.region}.amazonaws.com"
                    )
                )

        _client = (
            Minio(
                self.endpoint,
                access_key=self.access_key,
                secret_key=self.secret_key,
                secure=self.secure,
                region=self.region,
            )
            if not check_nas(self.endpoint)
            else NAS(self.endpoint)
        )
        if _client.bucket_exists(self.bucket_name):
            objects = _client.list_objects(self.bucket_name, prefix="metadata/")

            _metadata = None
            for obj in objects:
                _metadata = _client.get_object(self.bucket_name, obj.object_name)
                break
            if not _metadata:
                return

            metadata_dict = json.loads(_metadata.read().decode("utf-8"))
            if self.endpoint != metadata_dict["endpoint"]:
                raise ValueError(
                    "Already created endpoint ({}) doesn't match current endpoint ({})."
                    " It may cause a permission denied error".format(
                        metadata_dict["endpoint"], self.endpoint
                    )
                )

            self.compressor = metadata_dict["compressor"]
            self.attributes = [
                DataAttribute(**item) for item in metadata_dict["attributes"]
            ]
        else:
            logger.info(
                "{} {} does not exist!".format(self.dataset_name, str(self.additional))
            )

    def _convert_type_flatten(self):
        for attribute in self.flatten_attributes:
            attribute.shape = (reduce(lambda x, y: x * y, attribute.shape),)

    def _hashmap_transfer(self):
        """
        Get a unique bucket name from `dataset_name` and `additional`.

        Returns:
            :obj:`str`:
        """
        if not isinstance(self.dataset_name, str):
            raise ValueError(
                "dataset_name {} is empty or not str type".format(self.dataset_name)
            )
        if not isinstance(self.additional, dict):
            raise TypeError("additional is not dict type")

        key = self.dataset_name + json.dumps(self.additional, indent=4, sort_keys=True)
        return self.type + hashlib.md5(key.encode("utf-8")).hexdigest()

[docs]    def to_dict(self):
        """
        Serializes this instance to a Python dictionary.

        Returns:
            :obj:`Dict[str, any]`: Dictionary of all the attributes that make up this configuration instance.
        """
        output = copy.deepcopy(self.__class__.__base__(**self.__dict__).__dict__)
        output["dataset_name"] = self.metadata.dataset_name
        output["additional"] = self.metadata.additional
        output["attributes"] = [_attribute.to_dict() for _attribute in self.attributes]
        output["compressor"] = self.metadata.compressor
        return output

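    # A rough sketch of what `to_dict` above returns: the base `StorageConfig`
    # fields plus the dataset-specific entries taken from the metadata, with each
    # attribute serialized via `DataAttribute.to_dict()`. The exact attribute keys
    # and all values shown here are assumptions for illustration only:
    #
    #   {
    #       "endpoint": "127.0.0.1:9000",
    #       "dataset_name": "mnist",
    #       "additional": {"framework": "pytorch", "mode": "training"},
    #       "attributes": [{"name": "image", "type": "float32", "shape": [28, 28]}, ...],
    #       "compressor": {"complevel": 0, "complib": "zlib"}
    #   }
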
[docs]    @classmethod
    def from_json_file(cls, json_file):
        """
        Constructs a `DataConfig` from the path to a JSON file of parameters.

        Args:
            json_file (:obj:`string`):
                Path to the JSON file containing the parameters.

        Returns:
            :class:`DataConfig`: An instance of a configuration object
        """
        config_dict = cls._dict_from_json_file(json_file)
        config_dict["attributes"] = [
            DataAttribute(**item) for item in config_dict["attributes"]
        ]
        return cls(**config_dict)

    def set_indexer(self, index):
        self.metadata.indexer.update(index)

    def set_files(self, files):
        self.metadata.filetype.append(files)

    @property
    def get_length(self):
        """
        Get length of dataset in ``DataConfig``

        Returns:
            :obj:`integer`: length of dataset
        """
        keys = list(self.metadata.indexer.keys())
        if len(keys) == 0:
            return 0
        else:
            return keys[-1]

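# A minimal sketch of how `set_indexer` and `get_length` interact, assuming the
# indexer maps the cumulative end index of each stored object to a descriptor,
# so the most recently inserted key is the dataset length. The descriptor values
# below are hypothetical placeholders:
#
#   config.set_indexer({10000: {"name": "object-0"}})
#   config.set_indexer({20000: {"name": "object-1"}})
#   config.get_length  # -> 20000 (the last inserted key)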