""" Contains basic Batch classes """
# pylint: disable=ungrouped-imports
import os
import traceback
import threading
import warnings
import functools
import dill
try:
import blosc
except ImportError:
pass
import numpy as np
try:
import pandas as pd
except ImportError:
from . import _fake as pd
try:
import feather
except ImportError:
pass
try:
import dask.dataframe as dd
except ImportError:
from . import _fake as dd
from .dsindex import DatasetIndex, FilesIndex
# renaming the apply_parallel decorator is needed, as the Batch.apply_parallel method shares the same namespace
# and can also serve as a decorator
from .decorators import action, inbatch_parallel, any_action_failed, apply_parallel as apply_parallel_
from .components import create_item_class, BaseComponents
from .named_expr import P, R
from .utils_random import make_rng
class MethodsTransformingMeta(type):
""" A metaclass to transform all class methods in the way described below:
1. Methods decorated with `@apply_parallel` are wrapped with `apply_parallel` method.
2. Add the original version of the method (i.e. unwrapped) to a class
namespace using name with underscores: `'_{}_'.format(name)`. This
is necessary in order to allow inner calls of untransformed versions
(e.g. `ImagesBatch.scale` calls `ImagesBatch.crop` under the hood).
"""
def __new__(cls, name, bases, namespace):
namespace_ = namespace.copy()
for object_name, object_ in namespace.items():
transform_kwargs = getattr(object_, 'apply_kwargs', None)
if transform_kwargs is not None:
namespace_[object_name] = cls.use_apply_parallel(object_, **transform_kwargs)
disclaimer = f"This is an untransformed version of `{object_.__qualname__}`.\n\n"
object_.__doc__ = disclaimer + (object_.__doc__ or '')
object_.__name__ = '_' + object_name + '_'
object_.__qualname__ = '.'.join(object_.__qualname__.split('.')[:-1] + [object_.__name__])
namespace_[object_.__name__] = object_
return super().__new__(cls, name, bases, namespace_)
@classmethod
def use_apply_parallel(cls, method, **apply_kwargs):
""" Wrap passed `method` in accordance with `all` arg value """
@functools.wraps(method)
def apply_parallel_wrapper(self, *args, **kwargs):
transform = self.apply_parallel
method_ = method.__get__(self, type(self)) # bind the method to the batch instance
kwargs_full = {**self.apply_defaults, **apply_kwargs, **kwargs}
return transform(method_, *args, **kwargs_full)
return action(apply_parallel_wrapper)
class Batch(metaclass=MethodsTransformingMeta):
""" The core Batch class
Note that if a method is wrapped with the `@apply_parallel` decorator,
then inner calls (i.e. from other methods) should use the version
of that method with underscores (for example, if there is a decorated
`method`, then you need to call `_method_` from inside `other_method`).
The same applies to all child classes of :class:`batch.Batch`.
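Examples
--------
A minimal sketch of this convention. The subclass and its methods are purely
illustrative, and `apply_parallel` is assumed to be the decorator imported
from the library's `decorators` module::

    class MyBatch(Batch):
        components = ('images',)

        @apply_parallel
        def invert(self, item, **kwargs):
            _ = kwargs
            return -item

        @apply_parallel
        def invert_twice(self, item, **kwargs):
            _ = kwargs
            # the inner call uses the untransformed `_invert_` version
            return self._invert_(self._invert_(item))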
"""
components = None
# Class-specific defaults for :meth:`.Batch.apply_parallel`
apply_defaults = dict(target='threads',
post='_assemble',
src=None,
dst=None,
)
def __init__(self, index, dataset=None, pipeline=None, preloaded=None, copy=False, *args, **kwargs):
_ = args
if self.components is not None and not isinstance(self.components, tuple):
raise TypeError("components should be a tuple of strings with components names")
self.index = index
self._preloaded_lock = threading.Lock()
self._preloaded = preloaded
self._copy = copy
self._local = threading.local()
self._data_named = None
self._data = None
self._dataset = dataset
self.pipeline = pipeline
self.iteration = None
self._attrs = None
self.create_attrs(**kwargs)
def create_attrs(self, **kwargs):
""" Create attributes from kwargs """
self._attrs = list(kwargs.keys())
for attr, value in kwargs.items():
setattr(self, attr, value)
def get_attrs(self):
""" Return additional attrs as kwargs """
if self._attrs is None:
return {}
return {attr: getattr(self, attr, None) for attr in self._attrs}
@property
def data(self):
""": tuple or named components - batch data """
try:
if self._data is None and self._preloaded is not None:
# load data the first time it's requested
with self._preloaded_lock:
if self._data is None and self._preloaded is not None:
self.load(src=self._preloaded)
res = self._data if self.components is None else self._data_named
except Exception as exc:
print("Exception:", exc)
traceback.print_tb(exc.__traceback__)
raise
return res
@data.setter
def data_setter(self, value):
""": tuple or named components - batch data """
self._data = value
@property
def dataset(self):
""": Dataset - a dataset the batch has been taken from """
if self.pipeline is not None:
return self.pipeline.dataset
return self._dataset
@property
def pipeline(self):
""": Pipeline - a pipeline the batch is being used in """
return self._local.pipeline
@pipeline.setter
def pipeline(self, value):
""" Store the pipeline in a thread-local storage """
self._local.pipeline = value
@property
def random(self):
""" A random number generator :class:`numpy.random.Generator`.
Use it instead of `np.random` for reproducibility.
Examples
--------
::
x = self.random.normal(0, 1)
"""
# if RNG is set for the batch (e.g. in @inbatch_parallel), use it
if hasattr(self._local, 'random'):
return self._local.random
# otherwise use RNG from the pipeline
if self.pipeline is not None and self.pipeline.random is not None:
return self.pipeline.random
# if there is none (e.g. when the batch is created manually), make a random one
self._local.random = make_rng(self.random_seed)
return self._local.random
@property
def random_seed(self):
""" : SeedSequence for random number generation """
# if RNG is set for the batch (e.g. in @inbatch_parallel), use it
if hasattr(self._local, 'random_seed'):
return self._local.random_seed
if self.pipeline is not None and self.pipeline.random_seed is not None:
return self.pipeline.random_seed
# if there is none (e.g. when the batch is created manually), make a random seed
self._local.random_seed = np.random.SeedSequence()
return self._local.random_seed
@random_seed.setter
def random_seed(self, value):
""" : SeedSequence for random number generation """
self._local.random_seed = value
self._local.random = make_rng(value)
def __copy__(self):
dump_batch = dill.dumps(self)
restored_batch = dill.loads(dump_batch)
return restored_batch
def deepcopy(self):
""" Return a deep copy of the batch. """
return self.__copy__()
@classmethod
def from_data(cls, index=None, data=None):
""" Create a batch from data given """
# this is roughly equivalent to self.data = data
if index is None:
index = np.arange(len(data))
return cls(index, preloaded=data)
@classmethod
def merge(cls, batches, batch_size=None, components=None, batch_class=None):
""" Merge several batches to form a new batch of a given size
Parameters
----------
batches : tuple of batches
batch_size : int or None
if `None`, just merge all batches into one batch (the rest will be `None`),
if `int`, then make one batch of `batch_size` and a batch with the rest of data.
components : str, tuple or None
if `None`, all components from initial batches will be created,
if `str` or `tuple`, then create these components in new batches.
batch_class : Batch or None
if `None`, created batches will be of the same class as initial batch,
if a `Batch` subclass, created batches will be of that class.
Returns
-------
batch, rest : tuple of two batches
Raises
------
ValueError
If component is `None` in some batches and not `None` in others.
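Examples
--------
A hedged usage sketch (the batch variables are illustrative)::

    # make one batch of 64 items; `rest` holds the leftover items
    batch, rest = Batch.merge((batch1, batch2), batch_size=64)

    # merge everything into a single batch; `rest` is None
    batch, rest = Batch.merge((batch1, batch2, batch3))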
"""
batch_class = batch_class or cls
def _make_index(data):
return DatasetIndex(data.shape[0]) if data is not None and data.shape[0] > 0 else None
def _make_batch(data):
index = _make_index(data[0])
batch = batch_class.from_data(index, tuple(data)) if index is not None else None
if batch is not None:
batch.components = tuple(components)
_ = batch.data
return batch
if batch_size is not None:
break_point = len(batches) - 1
last_batch_len = 0
cur_size = 0
for i, b in enumerate(batches):
cur_batch_len = len(b)
if cur_size + cur_batch_len >= batch_size:
break_point = i
last_batch_len = batch_size - cur_size
break
cur_size += cur_batch_len
last_batch_len = cur_batch_len
if components is None:
components = batches[0].components or (None,)
elif isinstance(components, str):
components = (components, )
new_data = list(None for _ in components)
rest_data = list(None for _ in components)
for i, comp in enumerate(components):
none_components_in_batches = [b.get(component=comp) is None for b in batches]
if np.all(none_components_in_batches):
continue
if np.any(none_components_in_batches):
raise ValueError(f'Component {comp} is None in some batches')
if batch_size is None:
new_comp = [b.get(component=comp) for b in batches]
else:
last_batch = batches[break_point]
new_comp = [b.get(component=comp) for b in batches[:break_point]] + \
[last_batch.get(component=comp)[:last_batch_len]]
new_data[i] = cls.merge_component(comp, new_comp)
if batch_size is not None:
rest_comp = [last_batch.get(component=comp)[last_batch_len:]] + \
[b.get(component=comp) for b in batches[break_point + 1:]]
rest_data[i] = cls.merge_component(comp, rest_comp)
new_batch = _make_batch(new_data)
rest_batch = _make_batch(rest_data)
return new_batch, rest_batch
@classmethod
def merge_component(cls, component=None, data=None):
""" Merge the same component data from several batches """
_ = component
if isinstance(data[0], np.ndarray):
return np.concatenate(data)
raise TypeError("Unknown data type", type(data[0]))
def as_dataset(self, dataset=None, copy=False):
""" Makes a new dataset from batch data
Parameters
----------
dataset
an instance or a subclass of Dataset
copy : bool
whether to copy batch data to allow for further inplace transformations
Returns
-------
an instance of a class specified by `dataset` arg, preloaded with this batch data
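Examples
--------
A usage sketch, assuming `Dataset` is the library's dataset class::

    new_dataset = batch.as_dataset(Dataset)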
"""
dataset = dataset or self._dataset
if dataset is None:
raise ValueError('dataset can be an instance of Dataset (sub)class or the class itself, but not None')
if isinstance(dataset, type):
dataset_class = dataset
attrs = {}
else:
dataset_class = dataset.__class__
attrs = dataset.get_attrs()
return dataset_class(self.index, batch_class=type(self), preloaded=self._data, copy=copy, **attrs)
@property
def indices(self):
""": numpy array - an array with the indices """
if isinstance(self.index, DatasetIndex):
return self.index.indices
return self.index
def __len__(self):
return len(self.index)
@property
def size(self):
""": int - number of items in the batch """
return len(self)
@action
def add_components(self, components, init=None):
""" Add new components
Parameters
----------
components : str or list
new component names
init : array-like
initial component data
Raises
------
ValueError
If a component or an attribute with the given name already exists
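Examples
--------
A usage sketch (component names and arrays are illustrative)::

    batch.add_components('labels')
    batch.add_components(['masks', 'weights'], init=[masks_array, weights_array])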
"""
if isinstance(components, str):
components = (components,)
init = (init,)
elif isinstance(components, (tuple, list)):
components = tuple(components)
if init is None:
init = (None,) * len(components)
else:
init = tuple(init)
for comp, value in zip(components, init):
if hasattr(self, comp):
raise ValueError(f"An attribute '{comp}' already exists")
if self.components is not None and comp in self.components:
raise ValueError(f"A components '{comp}' already exists")
if self.components is None:
self.components = tuple([comp])
if self._data is not None:
warnings.warn("All batch data is erased")
else:
self.components = self.components + tuple([comp])
setattr(self, comp, value)
return self
def __getattr__(self, name):
if self.components is not None and name in self.components: # pylint: disable=unsupported-membership-test
return getattr(self.data, name, None)
raise AttributeError(f"{name} not found in class {self.__class__.__name__}")
def __setattr__(self, name, value):
if self.components is not None:
if name == "_data":
super().__setattr__(name, value)
if self._data is not None:
if isinstance(self._data, BaseComponents):
self._data_named = self._data
else:
self._data_named = create_item_class(self.components, self._data)
return
if name in self.components: # pylint: disable=unsupported-membership-test
# preload data if needed
_ = self.data
if self._data_named is None or self._data_named.components != self.components:
self._data_named = create_item_class(self.components, self._data)
setattr(self._data_named, name, value)
# update _data with new component values
super().__setattr__('_data', self._data_named.data)
return
super().__setattr__(name, value)
def __getstate__(self):
state = self.__dict__.copy()
state['_local'] = state['_local'] is not None
state['_preloaded_lock'] = True
return state
def __setstate__(self, state):
state['_preloaded_lock'] = threading.Lock() if state['_preloaded_lock'] else None
state['_local'] = threading.local() if state['_local'] else None
for k, v in state.items():
# this ensures that all hidden objects are reconstructed upon unpickling
setattr(self, k, v)
@property
def array_of_nones(self):
"""1-D ndarray: ``NumPy`` array with ``None`` values."""
return np.array([None] * len(self.index))
def get(self, item=None, component=None):
""" Return an item from the batch or the component """
if item is None:
if component is None:
res = self.data
else:
res = getattr(self, component)
else:
if component is None:
res = self[item]
else:
res = getattr(self[item], component)
return res
def __getitem__(self, item):
return self.data[item] if self.data is not None else None
def __iter__(self):
for item in self.indices:
yield self[item]
@property
def items(self):
""": list - batch items """
return [[self[ix]] for ix in self.indices]
def run_once(self, *args, **kwargs):
""" Init function for no parallelism
Useful for async action-methods (will wait till the method finishes)
"""
_ = self.data, args, kwargs
return [[]]
def get_errors(self, all_res):
""" Return a list of errors from a parallel action """
all_errors = [error for error in all_res if isinstance(error, Exception)]
return all_errors if len(all_errors) > 0 else None
@action
def do_nothing(self, *args, **kwargs):
""" An empty action (might be convenient in complicated pipelines) """
_ = args, kwargs
return self
@action
def apply_parallel(self, func, init=None, post=None, src=None, dst=None, *args,
p=None, target='for', requires_rng=False, rng_seeds=None, **kwargs):
""" Apply a function to each item in the container, returned by `init`, and assemble results by `post`.
Depending on the `target` parameter, different parallelization engines may be used: for, threads, MPC, async.
Roughly, under the hood we perform the following:
- compute parameters, individual for each worker. Currently, these are:
- `p` to indicate whether the function should be applied
- worker id and a seed for random generator, if required
- call `init` function, which outputs a container of items, passed directly to the `func`.
The simplest example is an `init` function that returns batch indices, and then the function is applied to each of them.
- wrap the `func` call into parallelization engine of choice.
- compute results of `func` calls for each item, returned by `init`
- assemble results by `post` function, e.g. stack the obtained numpy arrays.
In the simplest possible case of `init=None`, `src='images'`, `dst='images_transformed'`, `post=None`,
this method is almost equivalent to::
container = [func(item, *args, **kwargs) for item in self.images]
self.images_transformed = container
If `src` is a list and `dst` is a list, then this function is applied recursively to each pair of src, dst.
If `src` is a tuple, then this tuple is used as a whole.
This allows to make functions that work on multiple components.
Parameters
----------
func : callable
A function to apply to each item from the source.
Should accept `src` and `dst` parameters, or be written in a way that accepts variable args.
target : str
Parallelization engine:
- 'f', 'for' for executing each worker sequentially, like in a for-loop.
- 't', 'threads' for using threads.
- 'm', 'mpc' for using processes. Note the bigger overhead for process initialization.
- 'a', 'async' for asynchronous execution.
init : str, callable or container
Function to init data for individual workers: must return a container of items.
If 'data', then use `src` components as the init.
If other str, then must be a name of the attribute of the batch to use as the init.
If callable (or if any of the above resolved to a callable), then the result of calling it is used as the init.
Note that in this case the callable should accept `src` and `dst` parameters; `kwargs` are also passed.
Otherwise, the object is used as the init directly, e.g. a np.ndarray.
post : str or callable
Function to apply to the results of function evaluation on each item.
Must accept `src` and `dst` parameters, as well as `kwargs`.
src : str, sequence, list of str
The source to get data from:
- None
- str - a component name, e.g. 'images' or 'masks'
- tuple or list of str - several component names
- sequence - data as a numpy-array, data frame, etc
dst : str or array
the destination to put the result in, can be:
- None - in this case dst is set to be same as src
- str - a component name, e.g. 'images' or 'masks'
- tuple or list of str, e.g. ['images', 'masks']
If not provided, uses `src`.
p : float or None
Probability of applying func to an element in the batch.
requires_rng : bool
Whether the `func` requires an RNG. Should be used to correctly initialize seeds for reproducibility.
If True, then a pre-initialized RNG will be passed to the function call as `rng` keyword parameter.
args, kwargs
Other parameters passed to ``func``.
Notes
-----
apply_parallel does the following (but in parallel)::
for item in range(len(batch)):
self.dst[item] = func(self.src[item], *args, **kwargs)
`apply_parallel(func, src=['images', 'masks'])` is equivalent to
`apply_parallel(func, src=['images', 'masks'], dst=['images', 'masks'])`,
which in turn is equivalent to two subsequent calls::
images = func(images)
masks = func(masks)
However, named expressions will be evaluated only once before the first call.
Whereas `apply_parallel(func, src=('images', 'masks'))` (i.e. when `src` takes a tuple of component names,
not the list as in the previous example) passes both components data into `func` simultaneously::
images, masks = func((images, masks))
Examples
--------
::
apply_parallel(make_masks_fn, src='images', dst='masks')
apply_parallel(apply_mask, src=('images', 'masks'), dst='images_with_masks')
apply_parallel(rotate, src=['images', 'masks'], dst=['images', 'masks'], p=.2)
apply_parallel(MyBatch.some_static_method, p=.5)
apply_parallel(B.some_method, src='features', p=.5)
TODO: move logic of applying `post` function from `inbatch_parallel` here, as well as remove `use_self` arg.
"""
#pylint: disable=keyword-arg-before-vararg
# Parse parameters: fill with class-wide defaults
init = init or self.apply_defaults.get('init', None)
post = post or self.apply_defaults.get('post', None)
target = target or self.apply_defaults.get('target', None)
# Prepare parameters, individual for each worker: probability of applying, RNG seed, id
if isinstance(p, float):
p = P(R('binomial', 1, p, seed=self.random)).get(batch=self)
if requires_rng and rng_seeds is None:
rng_seeds = P(R('integers', 0, 9223372036854775807, seed=self.random)).get(batch=self)
worker_ids = P(np.arange(len(self), dtype=np.int32))
# Case of list `src`: recursively call for each pair of src/dst
if isinstance(src, list) and not (dst is None or isinstance(dst, list) and len(src) == len(dst)):
raise ValueError("src and dst must have equal length")
if isinstance(src, list) and (dst is None or isinstance(dst, list) and len(src) == len(dst)):
if dst is None:
dst = src
for src_, dst_ in zip(src, dst):
self.apply_parallel(func=func, init=init, post=post, src=src_, dst=dst_,
*args, p=p, target=target, rng_seeds=rng_seeds, **kwargs)
return self
# Actual computation
if init is None or init is False or init == 'data':
if isinstance(src, str):
init = self.get(component=src)
elif isinstance(src, (tuple, list)):
init = list((x,) for x in zip(*[self.get(component=s) for s in src]))
else:
init = src
elif isinstance(init, str):
# No hasattr check: if it is False, then an error would (and should) be raised
init = getattr(self, init)
if callable(init):
init = init(src=src, dst=dst, p=p, target=target, **kwargs)
# Compute result. Unbind the method to pass self explicitly
parallel = inbatch_parallel(init=init, post=post, target=target, src=src, dst=dst)
transform = parallel(type(self)._apply_once)
result = transform(self, *args, func=func, p=p, src=src, dst=dst,
apply_parallel_id=worker_ids, apply_parallel_seeds=rng_seeds, **kwargs)
return result
def _apply_once(self, item, *args, func=None, p=None, apply_parallel_id=None, apply_parallel_seeds=None, **kwargs):
""" Apply a function to each item in the batch.
Parameters
----------
item
An item from `init` function.
func : callable
A function to apply to each item.
p : None or {0, 1}
Whether to apply func to an element in the batch. If not specified, counts as 1.
Created and distributed to individual items by :meth:`.apply_parallel`.
apply_parallel_id : None, int
Index of the current item in the overall `init`.
Created and distributed to individual items by :meth:`.apply_parallel`.
apply_parallel_seeds : None, int
If provided, then the seed to create RNG for this given worker.
If provided, then supplied to a function call as `rng` keyword parameter.
Created and distributed to individual items by :meth:`.apply_parallel`.
args, kwargs
Other parameters passed to ``func``.
"""
_ = apply_parallel_id
if p is None or p == 1:
if apply_parallel_seeds is not None:
rng = np.random.default_rng(np.random.SFC64(apply_parallel_seeds))
kwargs['rng'] = rng
return func(item, *args, **kwargs)
return item
def _get_file_name(self, ix, src):
""" Get full path file name corresponding to the current index.
Parameters
----------
src : str, FilesIndex or None
if None, the full path to the indexed item will be returned.
if FilesIndex, it must contain the same index values as self.index.
if str, behavior depends on whether self.index.dirs is True. If self.index.dirs is True,
then src will be appended to the end of the full paths from self.index. Otherwise, if
self.index.dirs is False, then src is treated as a directory name and the basenames
from self.index will be appended to the end of src.
Examples
--------
Let folder "/some/path/*.dcm" contain files "001.png", "002.png", etc. Then if self.index
was built as
>>> index = FilesIndex(path="/some/path/*.png", no_ext=True)
Then _get_file_name(ix, src="augmented_images/") will return filenames:
"augmented_images/001.png", "augmented_images/002.png", etc.
Let folder "/some/path/*" contain folders "001", "002", etc. Then if self.index
was built as
>>> index = FilesIndex(path="/some/path/*", dirs=True)
Then _get_file_name(ix, src="masks.png") will return filenames:
"/some/path/001/masks.png", "/some/path/002/masks.png", etc.
If you have two directories "images/*.png" and "labels/*.png" with identical filenames,
you can build two instances of FilesIndex and use the first one to build your Dataset
>>> index_images = FilesIndex(path="/images/*.png", no_ext=True)
>>> index_labels = FilesIndex(path="/labels/*.png", no_ext=True)
>>> dset = Dataset(index=index_images, batch_class=Batch)
Then use _get_file_name(ix, src=index_labels) to reach the corresponding files
in the second directory.
"""
if not isinstance(self.index, FilesIndex):
raise ValueError("File locations must be specified to dump/load data")
if isinstance(src, str):
if self.index.dirs:
fullpath = self.index.get_fullpath(ix)
file_name = os.path.join(fullpath, src)
else:
file_name = os.path.basename(self.index.get_fullpath(ix))
file_name = os.path.join(os.path.abspath(src), file_name)
elif isinstance(src, FilesIndex):
try:
file_name = src.get_fullpath(ix)
except KeyError as e:
raise KeyError(f"File {ix} is not indexed in the received index") from e
elif src is None:
file_name = self.index.get_fullpath(ix)
else:
raise ValueError("Src must be either str, FilesIndex or None")
return file_name
def _assemble_component(self, result, *args, component, **kwargs):
""" Assemble one component after parallel execution.
Parameters
----------
result : sequence, np.ndarray
Values to put into ``component``
component : str
Component to assemble.
"""
_ = args, kwargs
try:
new_items = np.stack(result)
except ValueError as e:
message = str(e)
if "must have the same shape" in message:
new_items = np.empty(len(result), dtype=object)
new_items[:] = result
else:
raise e
if hasattr(self, component):
setattr(self, component, new_items)
else:
self.add_components(component, new_items)
def _assemble(self, all_results, *args, dst=None, **kwargs):
""" Assembles the batch after a parallel action.
Parameters
----------
all_results : sequence
Results after inbatch_parallel.
dst : str, sequence, np.ndarray
Components to assemble
Returns
-------
self
"""
_ = args
if any_action_failed(all_results):
all_errors = self.get_errors(all_results)
print(all_errors[0])
traceback.print_tb(all_errors[0].__traceback__)
raise RuntimeError("Could not assemble the batch") from all_errors[0]
if dst is None:
dst_default = kwargs.get('dst_default', 'src')
if dst_default == 'src':
dst = kwargs.get('src')
elif dst_default == 'components':
dst = self.components
if not isinstance(dst, (list, tuple, np.ndarray)):
dst = [dst]
if len(dst) == 1:
all_results = [all_results]
else:
all_results = list(zip(*all_results))
for component, result in zip(dst, all_results):
self._assemble_component(result, component=component, **kwargs)
return self
@apply_parallel_(init='indices', post='_assemble', target='f', dst_default='components')
def _load_blosc(self, ix, src=None, dst=None):
""" Load data from a blosc packed file """
file_name = self._get_file_name(ix, src)
with open(file_name, 'rb') as f:
data = dill.loads(blosc.decompress(f.read()))
components = tuple(dst or self.components)
try:
item = tuple(data[i] for i in components)
except Exception as e:
raise KeyError('Cannot find components in the corresponding file', file_name) from e
return item
@apply_parallel_(init='indices', target='f')
def _dump_blosc(self, ix, dst=None, components=None):
""" Save blosc packed data to file """
file_name = self._get_file_name(ix, dst)
with open(file_name, 'w+b') as f:
if self.components is None:
components = (None,)
item = (self[ix],)
else:
components = tuple(components or self.components)
item = self[ix].as_tuple(components)
data = dict(zip(components, item))
f.write(blosc.compress(dill.dumps(data)))
def _load_table(self, src, fmt, dst=None, post=None, *args, **kwargs):
""" Load a data frame from table formats: csv, hdf5, feather """
if fmt == 'csv':
_data = pd.read_csv(src, *args, **kwargs)
elif fmt == 'feather':
_data = feather.read_dataframe(src, *args, **kwargs)
elif fmt == 'hdf5':
_data = pd.read_hdf(src, *args, **kwargs)
# Put into this batch only part of it (defined by index)
if isinstance(_data, pd.DataFrame):
_data = _data.loc[self.indices]
elif isinstance(_data, dd.DataFrame):
# dask.DataFrame.loc supports advanced indexing only with lists
_data = _data.loc[list(self.indices)].compute()
if callable(post):
_data = post(_data, src=src, fmt=fmt, dst=dst, **kwargs)
self.load(src=_data, dst=dst)
@action(use_lock='__dump_table_lock')
def _dump_table(self, dst, fmt='feather', components=None, *args, **kwargs):
""" Save batch data to table formats
Parameters
----------
dst : str
a path to dump into
fmt : str
format: feather, hdf5, csv
components : str or tuple
one or several component names
"""
filename = dst
components = tuple(components or self.components)
data_dict = {}
for comp in components:
comp_data = self.get(component=comp)
if isinstance(comp_data, pd.DataFrame):
data_dict.update(comp_data.to_dict('series'))
elif isinstance(comp_data, np.ndarray):
if comp_data.ndim > 1:
columns = [comp + str(i) for i in range(comp_data.shape[1])]
# split a multi-dimensional component into one column per feature
comp_dict = zip(columns, (comp_data[:, i] for i in range(comp_data.shape[1])))
data_dict.update(comp_dict)
else:
data_dict.update({comp: comp_data})
else:
data_dict.update({comp: comp_data})
_data = pd.DataFrame(data_dict)
if fmt == 'feather':
feather.write_dataframe(_data, filename, *args, **kwargs)
elif fmt == 'hdf5':
_data.to_hdf(filename, *args, **kwargs) # pylint:disable=no-member
elif fmt == 'csv':
_data.to_csv(filename, *args, **kwargs) # pylint:disable=no-member
else:
raise ValueError(f'Unknown format {fmt}')
return self
def _load_from_source(self, dst, src):
""" Load data from a memory object (tuple, ndarray, pd.DataFrame, etc) """
if dst is None:
self._data = create_item_class(self.components, source=src, indices=self.indices,
crop=True, copy=self._copy)
else:
if isinstance(dst, str):
dst = (dst,)
src = (src,)
source = create_item_class(dst, source=src, indices=self.indices, crop=True, copy=self._copy)
for comp in dst:
setattr(self, comp, getattr(source, comp))
@action
def load(self, *args, src=None, fmt=None, dst=None, **kwargs):
""" Load data from another array or a file.
Parameters
----------
src :
a source (e.g. an array or a file name)
fmt : str
a source format, one of None, 'blosc', 'csv', 'hdf5', 'feather'
dst : None or str or tuple of str
components to load `src` to
**kwargs :
other parameters to pass to format-specific loaders
Notes
-----
Loading creates new components if necessary.
Examples
--------
Load data from a pandas dataframe's columns into all batch components::
batch.load(src=dataframe)
Load data from dataframe's columns `features` and `labels` into components `features` and `labels`::
batch.load(src=dataframe, dst=('features', 'labels'))
Load a dataframe into a component `features`::
batch.load(src=dataframe, dst='features')
Load data from a dict into components `images` and `masks`::
batch.load(src=dict(images=images_array, masks=masks_array), dst=('images', 'masks'))
Load data from a tuple into components `images` and `masks`::
batch.load(src=(images_array, masks_array), dst=('images', 'masks'))
Load data from an array into a component `images`::
batch.load(src=images_array, dst='images')
Load data from a CSV file columns into components `features` and `labels`::
batch.load(fmt='csv', src='/path/to/file.csv', dst=('features', 'labels'), index_col=0)
"""
_ = args
if dst is not None:
self.add_components(np.setdiff1d(dst, self.components).tolist())
if fmt is None:
self._load_from_source(src=src, dst=dst)
elif fmt == 'blosc':
self._load_blosc(src=src, dst=dst, **kwargs)
elif fmt in ['csv', 'hdf5', 'feather']:
self._load_table(src=src, fmt=fmt, dst=dst, **kwargs)
else:
raise ValueError("Unknown format " + fmt)
return self
@action
def dump(self, *args, dst=None, fmt=None, components=None, **kwargs):
""" Save data to another array or a file.
Parameters
----------
dst :
a destination (e.g. an array or a file name)
fmt : str
a destination format, one of None, 'blosc', 'csv', 'hdf5', 'feather'
components : None or str or tuple of str
components to dump
*args :
other parameters are passed to format-specific writers
**kwargs :
other parameters are passed to format-specific writers
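Examples
--------
Usage sketches (paths, arrays and component names are illustrative)::

    # dump the `features` component to a CSV file
    batch.dump(dst='/path/to/file.csv', fmt='csv', components='features')

    # put the `images` component into a preallocated array indexed by batch indices
    batch.dump(dst=images_array, components='images')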
"""
components = [components] if isinstance(components, str) else components
if fmt is None:
if components is not None and len(components) > 1:
raise ValueError("Only one component can be dumped into a memory array: components =", components)
components = components[0] if components is not None else None
dst[self.indices] = self.get(component=components)
elif fmt == 'blosc':
self._dump_blosc(dst, components=components)
elif fmt in ['csv', 'hdf5', 'feather']:
self._dump_table(dst, fmt, components, *args, **kwargs)
else:
raise ValueError("Unknown format " + fmt)
return self
@action
def save(self, *args, **kwargs):
""" Save batch data to a file (an alias for dump method)"""
return self.dump(*args, **kwargs)
@apply_parallel_
def to_array(self, comp, dtype=np.float32, channels='last', **kwargs):
""" Converts batch components to np.ndarray format
Parameters
----------
src : str
Component to get images from. Default is 'images'.
dst : str
Component to write images to. Default is 'images'.
dtype : str or np.dtype
Data type
channels : None, 'first' or 'last'
the dimension for channels axis
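Examples
--------
A hedged usage sketch (component names are illustrative)::

    batch.to_array(src='images', dst='images', dtype=np.uint8, channels='last')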
"""
_ = kwargs
comp = np.array(comp)
if len(comp.shape) == 2:
# a special treatment for 2d arrays with images - add a new dimension for channels
if channels == 'first':
comp = comp[np.newaxis, :, :]
elif channels == 'last':
comp = comp[:, :, np.newaxis]
else:
# we assume that channels is 'last' by default
# so move channels from the last to the first axis if needed
if channels == 'first':
comp = np.moveaxis(comp, -1, 0)
if dtype is not None:
comp = comp.astype(dtype)
return comp