""" Contains basic Batch classes """
import os
import traceback
import threading
import warnings
import functools
import dill
import numpy as np
from .dsindex import DatasetIndex, FilesIndex
# the `apply_parallel` decorator is imported under a different name, since the `Batch.apply_parallel` method
# occupies the same name and can itself serve as a decorator
from .decorators import action, inbatch_parallel, any_action_failed, apply_parallel as apply_parallel_
from .components import create_item_class, BaseComponents
from .named_expr import P, R
from .utils_random import make_rng
from .utils_import import make_delayed_import
blosc = make_delayed_import('blosc')
pd = make_delayed_import('pandas')
feather = make_delayed_import('feather')
dd = make_delayed_import('dask.dataframe') # import dask.dataframe as dd
class MethodsTransformingMeta(type):
    """ A metaclass to transform all class methods in the way described below:
        1. Methods decorated with `@apply_parallel` are wrapped with `apply_parallel` method.
        2. Add the original version of the method (i.e. unwrapped) to a class
           namespace using name with underscores: `'_{}_'.format(name)`. This
           is necessary in order to allow inner calls of untransformed versions
           (e.g. `ImagesBatch.scale` calls `ImagesBatch.crop` under the hood).
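        Examples
        --------
        An illustrative sketch of the resulting namespace (the batch class and its method
        are hypothetical; the decorator is assumed to be imported as ``apply_parallel``)::
            class MyBatch(Batch):
                @apply_parallel
                def double(self, item):
                    return item * 2
            # `MyBatch.double` is now an action that applies the method to items in parallel,
            # while `MyBatch._double_` keeps the original, untransformed single-item version.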
    """
    def __new__(cls, name, bases, namespace):
        namespace_ = namespace.copy()
        for object_name, object_ in namespace.items():
            transform_kwargs = getattr(object_, 'apply_kwargs', None)
            if transform_kwargs is not None:
                namespace_[object_name] = cls.use_apply_parallel(object_, **transform_kwargs)
                disclaimer = f"This is an untransformed version of `{object_.__qualname__}`.\n\n"
                object_.__doc__ = disclaimer + (object_.__doc__ or '')
                object_.__name__ = '_' + object_name + '_'
                object_.__qualname__ = '.'.join(object_.__qualname__.split('.')[:-1] + [object_.__name__])
                namespace_[object_.__name__] = object_
        return super().__new__(cls, name, bases, namespace_)
    @classmethod
    def use_apply_parallel(cls, method, **apply_kwargs):
        """ Wrap passed `method` in accordance with `all` arg value """
        @functools.wraps(method)
        def apply_parallel_wrapper(self, *args, **kwargs):
            transform = self.apply_parallel
            # Bind the method to the instance:
            method_ = method.__get__(self, type(self))
            kwargs_full = {**self.apply_defaults, **apply_kwargs, **kwargs}
            return transform(method_, *args, **kwargs_full)
        return action(apply_parallel_wrapper)
class Batch(metaclass=MethodsTransformingMeta):
    """ The core Batch class
    Note that if any method is wrapped with the `@apply_parallel` decorator,
    then inner calls of that method (i.e. from other methods) should use its
    underscored version. For example, if there is a decorated `method`, you
    need to call `_method_` from inside `other_method`.
    The same applies to all child classes of :class:`batch.Batch`.
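    Examples
    --------
    An illustrative sketch of such an inner call (`crop` and `scale` are hypothetical
    methods of a subclass)::
        @apply_parallel
        def crop(self, item):
            ...
        @apply_parallel
        def scale(self, item):
            # the inner call uses the untransformed single-item version of `crop`
            return self._crop_(item)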
    """
    components = None
    # Class-specific defaults for :meth:`.Batch.apply_parallel`
    apply_defaults = dict(target='threads',
                          post='_assemble',
                          src=None,
                          dst=None,
                          )
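    # A subclass may override these defaults, e.g. (the values below are purely illustrative):
    # apply_defaults = dict(target='for', post='_assemble', src='images', dst='images')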
    def __init__(self, index, dataset=None, pipeline=None, preloaded=None, copy=False, *args, **kwargs):
        _ = args
        if self.components is not None and not isinstance(self.components, tuple):
            raise TypeError("components should be a tuple of strings with components names")
        self.index = index
        self._preloaded_lock = threading.Lock()
        self._preloaded = preloaded
        self._copy = copy
        self._local = threading.local()
        self._data_named = None
        self._data = None
        self._dataset = dataset
        self.pipeline = pipeline
        self.iteration = None
        self._attrs = None
        self.create_attrs(**kwargs)
    def create_attrs(self, **kwargs):
        """ Create attributes from kwargs """
        self._attrs = list(kwargs.keys())
        for attr, value in kwargs.items():
            setattr(self, attr, value) 
    def get_attrs(self):
        """ Return additional attrs as kwargs """
        if self._attrs is None:
            return {}
        return {attr: getattr(self, attr, None) for attr in self._attrs} 
    @property
    def data(self):
        """: tuple or named components - batch data """
        try:
            if self._data is None and self._preloaded is not None:
                # load data the first time it's requested
                with self._preloaded_lock:
                    if self._data is None and self._preloaded is not None:
                        self.load(src=self._preloaded)
            res = self._data if self.components is None else self._data_named
        except Exception as exc: # noqa: BLE001, blind-except
            print("Exception:", exc)
            traceback.print_tb(exc.__traceback__)
            raise
        return res
    @data.setter
    def data_setter(self, value):
        """: tuple or named components - batch data """
        self._data = value
    @property
    def dataset(self):
        """: Dataset - a dataset the batch has been taken from """
        if self.pipeline is not None:
            return self.pipeline.dataset
        return self._dataset
    @property
    def pipeline(self):
        """: Pipeline - a pipeline the batch is being used in """
        return self._local.pipeline
    @pipeline.setter
    def pipeline(self, value):
        """ Store the pipeline in a thread-local storage """
        self._local.pipeline = value
    @property
    def random(self):
        """ A random number generator :class:`numpy.random.Generator`.
        Use it instead of `np.random` for reproducibility.
        Examples
        --------
        ::
            x = self.random.normal(0, 1)
        """
        # if RNG is set for the batch (e.g. in @inbatch_parallel), use it
        if hasattr(self._local, 'random'):
            return self._local.random
        # otherwise use RNG from the pipeline
        if self.pipeline is not None and self.pipeline.random is not None:
            return self.pipeline.random
        # if there is none (e.g. when the batch is created manually), make a random one
        self._local.random = make_rng(self.random_seed)
        return self._local.random
    @property
    def random_seed(self):
        """ : SeedSequence for random number generation """
        # if RNG is set for the batch (e.g. in @inbatch_parallel), use it
        if hasattr(self._local, 'random_seed'):
            return self._local.random_seed
        if self.pipeline is not None and self.pipeline.random_seed is not None:
            return self.pipeline.random_seed
        # if there is none (e.g. when the batch is created manually), make a random seed
        self._local.random_seed = np.random.SeedSequence()
        return self._local.random_seed
    @random_seed.setter
    def random_seed(self, value):
        """ : SeedSequence for random number generation """
        self._local.random_seed = value
        self._local.random = make_rng(value)
    def __copy__(self):
        dump_batch = dill.dumps(self)
        restored_batch = dill.loads(dump_batch)
        return restored_batch
    def deepcopy(self):
        """ Return a deep copy of the batch. """
        return self.__copy__() 
    @classmethod
    def from_data(cls, index=None, data=None):
        """ Create a batch from data given """
        # this is roughly equivalent to self.data = data
        if index is None:
            index = np.arange(len(data))
        return cls(index, preloaded=data) 
    @classmethod
    def merge(cls, batches, batch_size=None, components=None, batch_class=None):
        """ Merge several batches to form a new batch of a given size
        Parameters
        ----------
        batches : tuple of batches
        batch_size : int or None
            if `None`, just merge all batches into one batch (the rest will be `None`),
            if `int`, then make one batch of `batch_size` and a batch with the rest of data.
        components : str, tuple or None
            if `None`, all components from initial batches will be created,
            if `str` or `tuple`, then create these components in new batches.
        batch_class : Batch or None
            if `None`, created batches will be of the same class as initial batch,
            if `Batch`, created batches will be of that class.
        Returns
        -------
        batch, rest : tuple of two batches
        Raises
        ------
        ValueError
            If component is `None` in some batches and not `None` in others.
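        Examples
        --------
        A minimal sketch; `batch1` and `batch2` stand for any two batches of the same class::
            merged, rest = Batch.merge((batch1, batch2), batch_size=10)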
        """
        batch_class = batch_class or cls
        def _make_index(data):
            return DatasetIndex(data.shape[0]) if data is not None and data.shape[0] > 0 else None
        def _make_batch(data):
            index = _make_index(data[0])
            batch = batch_class.from_data(index, tuple(data)) if index is not None else None
            if batch is not None:
                batch.components = tuple(components)
                _ = batch.data
            return batch
        if batch_size is not None:
            break_point = len(batches) - 1
            last_batch_len = 0
            cur_size = 0
            for i, b in enumerate(batches):
                cur_batch_len = len(b)
                if cur_size + cur_batch_len >= batch_size:
                    break_point = i
                    last_batch_len = batch_size - cur_size
                    break
                cur_size += cur_batch_len
                last_batch_len = cur_batch_len
        if components is None:
            components = batches[0].components or (None,)
        elif isinstance(components, str):
            components = (components, )
        new_data = list(None for _ in components)
        rest_data = list(None for _ in components)
        for i, comp in enumerate(components):
            none_components_in_batches = [b.get(component=comp) is None for b in batches]
            if np.all(none_components_in_batches):
                continue
            if np.any(none_components_in_batches):
                raise ValueError(f'Component {comp} is None in some batches')
            if batch_size is None:
                new_comp = [b.get(component=comp) for b in batches]
            else:
                last_batch = batches[break_point]
                new_comp = [b.get(component=comp) for b in batches[:break_point]] + \
                           [last_batch.get(component=comp)[:last_batch_len]]
            new_data[i] = cls.merge_component(comp, new_comp)
            if batch_size is not None:
                rest_comp = [last_batch.get(component=comp)[last_batch_len:]] + \
                            [b.get(component=comp) for b in batches[break_point + 1:]]
                rest_data[i] = cls.merge_component(comp, rest_comp)
        new_batch = _make_batch(new_data)
        rest_batch = _make_batch(rest_data)
        return new_batch, rest_batch 
    @classmethod
    def merge_component(cls, component=None, data=None):
        """ Merge the same component data from several batches """
        _ = component
        if isinstance(data[0], np.ndarray):
            return np.concatenate(data)
        raise TypeError("Unknown data type", type(data[0])) 
    def as_dataset(self, dataset=None, copy=False):
        """ Makes a new dataset from batch data
        Parameters
        ----------
        dataset
            an instance or a subclass of Dataset
        copy : bool
            whether to copy batch data to allow for further inplace transformations
        Returns
        -------
        an instance of a class specified by `dataset` arg, preloaded with this batch data
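        Examples
        --------
        An illustrative call (`Dataset` stands for the :class:`.Dataset` class or one of its subclasses)::
            new_dataset = batch.as_dataset(Dataset, copy=True)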
        """
        dataset = dataset or self._dataset
        if dataset is None:
            raise ValueError('dataset can be an instance of Dataset (sub)class or the class itself, but not None')
        if isinstance(dataset, type):
            dataset_class = dataset
            attrs = {}
        else:
            dataset_class = dataset.__class__
            attrs = dataset.get_attrs()
        return dataset_class(self.index, batch_class=type(self), preloaded=self._data, copy=copy, **attrs) 
    @property
    def indices(self):
        """: numpy array - an array with the indices """
        if isinstance(self.index, DatasetIndex):
            return self.index.indices
        return self.index
    def __len__(self):
        return len(self.index)
    @property
    def size(self):
        """: int - number of items in the batch """
        return len(self)
    @action
    def add_components(self, components, init=None):
        """ Add new components
        Parameters
        ----------
        components : str or list
            new component names
        init : array-like
            initial component data
        Raises
        ------
        ValueError
            If a component or an attribute with the given name already exists
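        Examples
        --------
        Illustrative calls (component names and data are arbitrary)::
            batch.add_components('labels', np.zeros(len(batch)))
            batch.add_components(['images', 'masks'])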
        """
        if isinstance(components, str):
            components = (components,)
            init = (init,)
        elif isinstance(components, (tuple, list)):
            components = tuple(components)
            if init is None:
                init = (None,) * len(components)
            else:
                init = tuple(init)
        for comp, value in zip(components, init):
            if hasattr(self, comp):
                raise ValueError(f"An attribute '{comp}' already exists")
            if self.components is not None and comp in self.components:
                raise ValueError(f"A components '{comp}' already exists")
            if self.components is None:
                self.components = tuple([comp])
                if self._data is not None:
                    warnings.warn("All batch data is erased")
            else:
                self.components = self.components + tuple([comp])
            setattr(self, comp, value)
        return self 
    def __getattr__(self, name):
        if self.components is not None and name in self.components:
            return getattr(self.data, name, None)
        raise AttributeError(f"{name} not found in class {self.__class__.__name__}")
    def __setattr__(self, name, value):
        if self.components is not None:
            if name == "_data":
                super().__setattr__(name, value)
                if self._data is not None:
                    if isinstance(self._data, BaseComponents):
                        self._data_named = self._data
                    else:
                        self._data_named = create_item_class(self.components, self._data)
                return
            if name in self.components:
                # preload data if needed
                _ = self.data
                if self._data_named is None or self._data_named.components != self.components:
                    self._data_named = create_item_class(self.components, self._data)
                setattr(self._data_named, name, value)
                # update _data with new component values
                super().__setattr__('_data', self._data_named.data)
                return
        super().__setattr__(name, value)
    def __getstate__(self):
        state = self.__dict__.copy()
        state['_local'] = state['_local'] is not None
        state['_preloaded_lock'] = True
        return state
    def __setstate__(self, state):
        state['_preloaded_lock'] = threading.Lock() if state['_preloaded_lock'] else None
        state['_local'] = threading.local() if state['_local'] else None
        for k, v in state.items():
            # this warrants that all hidden objects are reconstructed upon unpickling
            setattr(self, k, v)
    @property
    def array_of_nones(self):
        """1-D ndarray: ``NumPy`` array with ``None`` values."""
        return np.array([None] * len(self.index))
    def get(self, item=None, component=None):
        """ Return an item from the batch or the component """
        if item is None:
            if component is None:
                res = self.data
            else:
                res = getattr(self, component)
        else:
            if component is None:
                res = self[item]
            else:
                res = getattr(self[item], component)
        return res 
    def __getitem__(self, item):
        return self.data[item] if self.data is not None else None
    def __iter__(self):
        for item in self.indices:
            yield self[item]
    @property
    def items(self):
        """: list - batch items """
        return [[self[ix]] for ix in self.indices]
    def run_once(self, *args, **kwargs):
        """ Init function for no parallelism
        Useful for async action-methods (will wait till the method finishes)
        """
        _ = self.data, args, kwargs
        return [[]] 
    def get_errors(self, all_res):
        """ Return a list of errors from a parallel action """
        all_errors = [error for error in all_res if isinstance(error, Exception)]
        return all_errors if len(all_errors) > 0 else None 
    @action
    def do_nothing(self, *args, **kwargs):
        """ An empty action (might be convenient in complicated pipelines) """
        _ = args, kwargs
        return self 
    @action
    def apply_parallel(self, func, init=None, post=None, src=None, dst=None, *args,
                       p=None, target='for', requires_rng=False, rng_seeds=None, **kwargs):
        """ Apply a function to each item in the container, returned by `init`, and assemble results by `post`.
        Depending on the `target` parameter, different parallelization engines may be used: for, threads, MPC, async.
        Roughly, under the hood we perform the following:
            - compute parameters, individual for each worker. Currently, these are:
                - `p` to indicate whether the function should be applied
                - worker id and a seed for random generator, if required
            - call `init` function, which outputs a container of items, passed directly to the `func`.
            The simplest example is an `init` function that returns batch indices, so that `func` is applied to each of them.
            - wrap the `func` call into parallelization engine of choice.
            - compute results of `func` calls for each item, returned by `init`
            - assemble results by `post` function, e.g. stack the obtained numpy arrays.
        In the simplest possible case of `init=None`, `src=images`, `dst=images_transformed`, `post=None`,
        this function is almost equivalent to:
            container = [func(item, *args, **kwargs) for item in self.images]
            self.images_transformed = container
        If `src` is a list and `dst` is a list, then this function is applied recursively to each pair of src, dst.
        If `src` is a tuple, then this tuple is used as a whole.
        This allows to make functions that work on multiple components.
        Parameters
        ----------
        func : callable
            A function to apply to each item from the source.
            Should accept `src` and `dst` parameters, or be written in a way that accepts variable args.
        target : str
            Parallelization engine:
                - 'f', 'for' for executing each worker sequentially, like in a for-loop.
                - 't', 'threads' for using threads.
                - 'm', 'mpc' for using processes. Note the bigger overhead for process initialization.
                - 'a', 'async' for asynchronous execution.
        init : str, callable or container
            Function to init data for individual workers: must return a container of items.
            If 'data', then use `src` components as the init.
            If other str, then must be a name of the attribute of the batch to use as the init.
            If a callable, or if any of the above resolves to a callable, then the result of calling it is used as the init.
            Note that in this case the callable should accept `src` and `dst` parameters; `kwargs` are passed to it as well.
            If none of the above, then the object is used directly as the container, e.g. an np.ndarray.
        post : str or callable
            Function to apply to the results of function evaluation on each item.
            Must accept `src` and `dst` parameters, as well as `kwargs`.
        src : str, sequence, list of str
            The source to get data from:
            - None
            - str - a component name, e.g. 'images' or 'masks'
            - tuple or list of str - several component names
            - sequence - data as a numpy-array, data frame, etc
        dst : str or array
            the destination to put the result in, can be:
            - None - in this case dst is set to be same as src
            - str - a component name, e.g. 'images' or 'masks'
            - tuple or list of str, e.g. ['images', 'masks']
            If not provided, uses `src`.
        p : float or None
            Probability of applying func to an element in the batch.
        requires_rng : bool
            Whether the `func` requires RNG. Should be used for correctly initialized seeds for reproducibility.
            If True, then a pre-initialized RNG will be passed to the function call as `rng` keyword parameter.
        args, kwargs
            Other parameters passed to ``func``.
        Notes
        -----
        apply_parallel does the following (but in parallel)::
            for item in range(len(batch)):
                self.dst[item] = func(self.src[item], *args, **kwargs)
        `apply_parallel(func, src=['images', 'masks'])` is equivalent to
        `apply_parallel(func, src=['images', 'masks'], dst=['images', 'masks'])`,
        which in turn equals two consecutive calls::
            images = func(images)
            masks = func(masks)
        However, named expressions will be evaluated only once before the first call.
        Whereas `apply_parallel(func, src=('images', 'masks'))` (i.e. when `src` takes a tuple of component names,
        not the list as in the previous example) passes both components data into `func` simultaneously::
            images, masks = func((images, masks))
        Examples
        --------
        ::
            apply_parallel(make_masks_fn, src='images', dst='masks')
            apply_parallel(apply_mask, src=('images', 'masks'), dst='images_with_masks')
            apply_parallel(rotate, src=['images', 'masks'], dst=['images', 'masks'], p=.2)
            apply_parallel(MyBatch.some_static_method, p=.5)
            apply_parallel(B.some_method, src='features', p=.5)
        TODO: move logic of applying `post` function from `inbatch_parallel` here, as well as remove `use_self` arg.
        """
        # Parse parameters: fill with class-wide defaults
        init = init or self.apply_defaults.get('init', None)
        post = post or self.apply_defaults.get('post', None)
        target = target or self.apply_defaults.get('target', None)
        # Prepare parameters, individual for each worker: probability of applying, RNG seed, id
        if isinstance(p, float):
            p = P(R('binomial', 1, p, seed=self.random)).get(batch=self)
        if requires_rng and rng_seeds is None:
            rng_seeds = P(R('integers', 0, 9223372036854775807, seed=self.random)).get(batch=self)
        worker_ids = P(np.arange(len(self), dtype=np.int32))
        # Case of list `src`: recursively call for each pair of src/dst
        if isinstance(src, list):
            if not (dst is None or (isinstance(dst, list) and len(src) == len(dst))):
                raise ValueError("src and dst must have equal length")
            if dst is None:
                dst = src
            for src_, dst_ in zip(src, dst):
                # pass the leading parameters positionally so that extra `args` cannot clash with them
                self.apply_parallel(func, init, post, src_, dst_, *args,
                                    p=p, target=target, rng_seeds=rng_seeds, **kwargs)
            return self
        # Actual computation
        if init is None or init is False or init == 'data':
            if isinstance(src, str):
                init = self.get(component=src)
            elif isinstance(src, (tuple, list)):
                init = list((x,) for x in zip(*[self.get(component=s) for s in src]))
            else:
                init = src
        elif isinstance(init, str):
            # No hasattr check: if it is False, then an error would (and should) be raised
            init = getattr(self, init)
            if callable(init):
                init = init(src=src, dst=dst, p=p, target=target, **kwargs)
        # Compute result. Unbind the method to pass self explicitly
        parallel = inbatch_parallel(init=init, post=post, target=target, src=src, dst=dst)
        transform = parallel(type(self)._apply_once)  # noqa: SLF001; private-member-access
        result = transform(self, *args, func=func, p=p, src=src, dst=dst,
                           apply_parallel_id=worker_ids, apply_parallel_seeds=rng_seeds, **kwargs)
        return result 
    def _apply_once(self, item, *args, func=None, p=None, apply_parallel_id=None, apply_parallel_seeds=None, **kwargs):
        """ Apply a function to each item in the batch.
        Parameters
        ----------
        item
            An item from `init` function.
        func : callable
            A function to apply to each item.
        p : None or {0, 1}
            Whether to apply func to an element in the batch. If not specified, counts as 1.
            Created and distributed to individual items by :meth:`.apply_parallel`.
        apply_parallel_id : None, int
            Index of the current item in the overall `init`.
            Created and distributed to individual items by :meth:`.apply_parallel`.
        apply_parallel_seeds : None, int
            If provided, then the seed to create RNG for this given worker.
            If provided, then supplied to a function call as `rng` keyword parameter.
            Created and distributed to individual items by :meth:`.apply_parallel`.
        args, kwargs
            Other parameters passed to ``func``.
        """
        _ = apply_parallel_id
        if p is None or p == 1:
            if apply_parallel_seeds is not None:
                rng = np.random.default_rng(np.random.SFC64(apply_parallel_seeds))
                kwargs['rng'] = rng
            return func(item, *args, **kwargs)
        return item
    def _get_file_name(self, ix, src):
        """ Get full path file name corresponding to the current index.
        Parameters
        ----------
        src : str, FilesIndex or None
            if None, the full path to the indexed item will be returned.
            if FilesIndex, it must contain the same index values as self.index.
            if str, the behavior depends on whether self.index.dirs is True. If self.index.dirs is True,
            then src will be appended to the end of the full paths from self.index. Otherwise,
            src is treated as a directory name and the basenames from self.index
            will be appended to the end of src.
        Examples
        --------
        Let folder "/some/path/*.dcm" contain files "001.png", "002.png", etc. Then if self.index
        was built as
        >>> index = FilesIndex(path="/some/path/*.png", no_ext=True)
        Then _get_file_name(ix, src="augmented_images/") will return filenames:
        "augmented_images/001.png", "augmented_images/002.png", etc.
        Let folder "/some/path/*" contain folders "001", "002", etc. Then if self.index
        was built as
        >>> index = FilesIndex(path="/some/path/*", dirs=True)
        Then _get_file_name(ix, src="masks.png") will return filenames:
        "/some/path/001/masks.png", "/some/path/002/masks.png", etc.
        If you have two directories "images/*.png", "labels/*png" with identical filenames,
        you can build two instances of FilesIndex and use the first one to biuld your Dataset
        >>> index_images = FilesIndex(path="/images/*.png", no_ext=True)
        >>> index_labels = FilesIndex(path="/labels/*.png", no_ext=True)
        >>> dset = Dataset(index=index_images, batch_class=Batch)
        Then build dataset using the first one
        _get_file_name(ix, src=index_labels) to reach corresponding files in the second path.
        """
        if not isinstance(self.index, FilesIndex):
            raise ValueError("File locations must be specified to dump/load data")
        if isinstance(src, str):
            if self.index.dirs:
                fullpath = self.index.get_fullpath(ix)
                file_name = os.path.join(fullpath, src)
            else:
                file_name = os.path.basename(self.index.get_fullpath(ix))
                file_name = os.path.join(os.path.abspath(src), file_name)
        elif isinstance(src, FilesIndex):
            try:
                file_name = src.get_fullpath(ix)
            except KeyError as e:
                raise KeyError(f"File {ix} is not indexed in the received index") from e
        elif src is None:
            file_name = self.index.get_fullpath(ix)
        else:
            raise ValueError("Src must be either str, FilesIndex or None")
        return file_name
    def _assemble_component(self, result, *args, component, **kwargs):
        """ Assemble one component after parallel execution.
        Parameters
        ----------
        result : sequence, np.ndarray
            Values to put into ``component``
        component : str
            Component to assemble.
        """
        _ = args, kwargs
        try:
            new_items = np.stack(result)
        except ValueError as e:
            message = str(e)
            if "must have the same shape" in message:
                new_items = np.empty(len(result), dtype=object)
                new_items[:] = result
            else:
                raise e
        if hasattr(self, component):
            setattr(self, component, new_items)
        else:
            self.add_components(component, new_items)
    def _assemble(self, all_results, *args, dst=None, **kwargs):
        """ Assembles the batch after a parallel action.
        Parameters
        ----------
        all_results : sequence
            Results after inbatch_parallel.
        dst : str, sequence, np.ndarray
            Components to assemble
        Returns
        -------
        self
        """
        _ = args
        if any_action_failed(all_results):
            all_errors = self.get_errors(all_results)
            print(all_errors[0])
            traceback.print_tb(all_errors[0].__traceback__)
            raise RuntimeError("Could not assemble the batch") from all_errors[0]
        if dst is None:
            dst_default = kwargs.get('dst_default', 'src')
            if dst_default == 'src':
                dst = kwargs.get('src')
            elif dst_default == 'components':
                dst = self.components
        if not isinstance(dst, (list, tuple, np.ndarray)):
            dst = [dst]
        if len(dst) == 1:
            all_results = [all_results]
        else:
            all_results = list(zip(*all_results))
        for component, result in zip(dst, all_results):
            self._assemble_component(result, component=component, **kwargs)
        return self
    @apply_parallel_(init='indices', post='_assemble', target='f', dst_default='components')
    def _load_blosc(self, ix, src=None, dst=None):
        """ Load data from a blosc packed file """
        file_name = self._get_file_name(ix, src)
        with open(file_name, 'rb') as f:
            data = dill.loads(blosc.decompress(f.read()))
            components = tuple(dst or self.components)
            try:
                item = tuple(data[i] for i in components)
            except Exception as e: # noqa: BLE001, blind-except
                raise KeyError('Cannot find components in the corresponding file', file_name) from e
        return item
    @apply_parallel_(init='indices', target='f')
    def _dump_blosc(self, ix, dst=None, components=None):
        """ Save blosc packed data to file """
        file_name = self._get_file_name(ix, dst)
        with open(file_name, 'w+b') as f:
            if self.components is None:
                components = (None,)
                item = (self[ix],)
            else:
                components = tuple(components or self.components)
                item = self[ix].as_tuple(components)
            data = dict(zip(components, item))
            f.write(blosc.compress(dill.dumps(data)))
    def _load_table(self, src, fmt, dst=None, post=None, *args, **kwargs):
        """ Load a data frame from table formats: csv, hdf5, feather """
        if fmt == 'csv':
            _data = pd.read_csv(src, *args, **kwargs)
        elif fmt == 'feather':
            _data = feather.read_dataframe(src, *args, **kwargs)
        elif fmt == 'hdf5':
            _data = pd.read_hdf(src, *args, **kwargs)
        # Put into this batch only part of it (defined by index)
        if isinstance(_data, pd.DataFrame):
            _data = _data.loc[self.indices]
        elif isinstance(_data, dd.DataFrame):
            # dask.DataFrame.loc supports advanced indexing only with lists
            _data = _data.loc[list(self.indices)].compute()
        if callable(post):
            _data = post(_data, src=src, fmt=fmt, dst=dst, **kwargs)
        self.load(src=_data, dst=dst)
    @action(use_lock='__dump_table_lock')
    def _dump_table(self, dst, fmt='feather', components=None, *args, **kwargs):
        """ Save batch data to table formats
        Parameters
        ----------
        dst : str
            a path to dump into
        fmt : str
            a format: 'feather', 'hdf5' or 'csv'
        components : str or tuple
            one or several component names
        """
        filename = dst
        components = tuple(components or self.components)
        data_dict = {}
        for comp in components:
            comp_data = self.get(component=comp)
            if isinstance(comp_data, pd.DataFrame):
                data_dict.update(comp_data.to_dict('series'))
            elif isinstance(comp_data, np.ndarray):
                if comp_data.ndim > 1:
                    # split a multi-dimensional component into separate columns, one per feature
                    columns = [comp + str(i) for i in range(comp_data.shape[1])]
                    data_dict.update(dict(zip(columns, (comp_data[:, i] for i in range(comp_data.shape[1])))))
                else:
                    data_dict.update({comp: comp_data})
            else:
                data_dict.update({comp: comp_data})
        _data = pd.DataFrame(data_dict)
        if fmt == 'feather':
            feather.write_dataframe(_data, filename, *args, **kwargs)
        elif fmt == 'hdf5':
            _data.to_hdf(filename, *args, **kwargs)
        elif fmt == 'csv':
            _data.to_csv(filename, *args, **kwargs)
        else:
            raise ValueError(f'Unknown format {fmt}')
        return self
    def _load_from_source(self, dst, src):
        """ Load data from a memory object (tuple, ndarray, pd.DataFrame, etc) """
        if dst is None:
            self._data = create_item_class(self.components, source=src, indices=self.indices,
                                           crop=True, copy=self._copy)
        else:
            if isinstance(dst, str):
                dst = (dst,)
                src = (src,)
            source = create_item_class(dst, source=src, indices=self.indices, crop=True, copy=self._copy)
            for comp in dst:
                setattr(self, comp, getattr(source, comp))
    @action
    def load(self, *args, src=None, fmt=None, dst=None, **kwargs):
        """ Load data from another array or a file.
        Parameters
        ----------
        src :
            a source (e.g. an array or a file name)
        fmt : str
            a source format, one of None, 'blosc', 'csv', 'hdf5', 'feather'
        dst : None or str or tuple of str
            components to load `src` to
        **kwargs :
            other parameters to pass to format-specific loaders
        Notes
        -----
        Loading creates new components if necessary.
        Examples
        --------
        Load data from a pandas dataframe's columns into all batch components::
            batch.load(src=dataframe)
        Load data from dataframe's columns `features` and `labels` into components `features` and `labels`::
            batch.load(src=dataframe, dst=('features', 'labels'))
        Load a dataframe into a component `features`::
            batch.load(src=dataframe, dst='features')
        Load data from a dict into components `images` and `masks`::
            batch.load(src=dict(images=images_array, masks=masks_array), dst=('images', 'masks'))
        Load data from a tuple into components `images` and `masks`::
            batch.load(src=(images_array, masks_array), dst=('images', 'masks'))
        Load data from an array into a component `images`::
            batch.load(src=images_array, dst='images')
        Load data from a CSV file columns into components `features` and `labels`::
            batch.load(fmt='csv', src='/path/to/file.csv', dst=('features', 'labels'), index_col=0)
        """
        _ = args
        if dst is not None:
            self.add_components(np.setdiff1d(dst, self.components).tolist())
        if fmt is None:
            self._load_from_source(src=src, dst=dst)
        elif fmt == 'blosc':
            self._load_blosc(src=src, dst=dst, **kwargs)
        elif fmt in ['csv', 'hdf5', 'feather']:
            self._load_table(src=src, fmt=fmt, dst=dst, **kwargs)
        else:
            raise ValueError("Unknown format " + fmt)
        return self 
    @action
    def dump(self, *args, dst=None, fmt=None, components=None, **kwargs):
        """ Save data to another array or a file.
        Parameters
        ----------
        dst :
            a destination (e.g. an array or a file name)
        fmt : str
            a destination format, one of None, 'blosc', 'csv', 'hdf5', 'feather'
        components : None or str or tuple of str
            components to dump
        *args :
            other parameters are passed to format-specific writers
        **kwargs :
            other parameters are passed to format-specific writers
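        Examples
        --------
        Illustrative calls (the path and component names are arbitrary; `some_array` is a hypothetical
        array indexable by the batch indices)::
            batch.dump(dst='/path/to/batch.feather', fmt='feather', components='images')
            batch.dump(dst=some_array, components='labels')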
        """
        components = [components] if isinstance(components, str) else components
        if fmt is None:
            if components is not None and len(components) > 1:
                raise ValueError("Only one component can be dumped into a memory array: components =", components)
            components = components[0] if components is not None else None
            dst[self.indices] = self.get(component=components)
        elif fmt == 'blosc':
            self._dump_blosc(dst, components=components)
        elif fmt in ['csv', 'hdf5', 'feather']:
            self._dump_table(dst, fmt, components, *args, **kwargs)
        else:
            raise ValueError("Unknown format " + fmt)
        return self 
    @action
    def save(self, *args, **kwargs):
        """ Save batch data to a file (an alias for dump method)"""
        return self.dump(*args, **kwargs) 
    @apply_parallel_
    def to_array(self, comp, dtype=np.float32, channels='last', **kwargs):
        """ Converts batch components to np.ndarray format
        Parameters
        ----------
        src : str
            Component to get images from. Default is 'images'.
        dst : str
            Component to write images to. Default is 'images'.
        dtype : str or np.dtype
            Data type
        channels : None, 'first' or 'last'
            the dimension for channels axis
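        Examples
        --------
        An illustrative call (component names are arbitrary)::
            batch.to_array(src='images', dst='images', channels='first', dtype=np.uint8)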
        """
        _ = kwargs
        comp = np.array(comp)
        if len(comp.shape) == 2:
            # a special treatment for 2d arrays with images - add a new dimension for channels
            if channels == 'first':
                comp = comp[np.newaxis, :, :]
            elif channels == 'last':
                comp = comp[:, :, np.newaxis]
        else:
            # we assume that channels is 'last' by default
            # so move channels from the last to the first axis if needed
            if channels == 'first':
                comp = np.moveaxis(comp, -1, 0)
        if dtype is not None:
            comp = comp.astype(dtype)
        return comp