Source code for batchflow.dsindex

""" DatasetIndex """
import os
import math
import glob
from collections.abc import Iterable
import warnings
import numpy as np

from .base import Baseset
from .notifier import Notifier
from .utils_random import make_rng


class DatasetIndex(Baseset):
    """ Stores an index for a dataset.

    The index should be 1-d array-like, e.g. a numpy array, pandas Series, etc.

    Parameters
    ----------
    index : int, 1-d array-like or callable
        Defines the structure of DatasetIndex.

    Examples
    --------
    >>> index = DatasetIndex(all_item_ids)
    >>> index.split([0.8, 0.2])
    >>> item_pos = index.get_pos(item_id)
    """
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self._pos = self.build_pos()
        self._random_state = None
    @classmethod
    def from_index(cls, *args, **kwargs):
        """ Create index from another index. """
        return cls(*args, **kwargs)
    @classmethod
    def concat(cls, *index_list):
        """ Create index by concatenating other indices.

        Parameters
        ----------
        index_list : list
            Indices to be concatenated. Each item is expected to contain
            an `index` property with a 1-d sequence of indices.

        Returns
        -------
        DatasetIndex
            Contains one common index.
        """
        return type(index_list[0])(np.concatenate([i.index for i in index_list]))
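
    # Usage sketch for `concat` (illustrative, not part of the original module);
    # assumes the `indices` property inherited from Baseset exposes the stored array:
    #
    # >>> a, b = DatasetIndex([1, 2]), DatasetIndex([3, 4])
    # >>> DatasetIndex.concat(a, b).indices
    # array([1, 2, 3, 4])
    #
    # The result takes the class of the first index, so concatenating subclass
    # instances preserves their type.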
    def __add__(self, other):
        if not isinstance(other, DatasetIndex):
            other = DatasetIndex(other)
        return self.concat(self, other)
    @staticmethod
    def build_index(index):
        """ Check index type and structure.

        Parameters
        ----------
        index : int, 1-d array-like or callable
            Defines the content of DatasetIndex

            - 1-d array-like
              Content is a numpy array.
            - int
              Content is numpy.arange() of the given length.
            - callable
              Content is the return value of the given function (should be 1-d array-like).

        Raises
        ------
        TypeError
            If `index` is not 1-dimensional.
        ValueError
            If `index` is empty.

        Returns
        -------
        numpy.array
            Index to be stored in the class instance.
        """
        if callable(index):
            _index = index()
        else:
            _index = index

        if isinstance(_index, DatasetIndex):
            _index = _index.indices
        elif isinstance(_index, int):
            _index = np.arange(_index)
        else:
            # index should allow for advanced indexing (i.e. subsetting)
            _index = np.asarray(_index)
            if np.shape(_index) == ():
                _index = _index.reshape(1)

        if len(_index) == 0:
            raise ValueError("Index cannot be empty")
        if len(_index.shape) > 1:
            raise TypeError("Index should be 1-dimensional")
        if len(np.unique(_index)) != len(_index):
            warnings.warn("Index contains non-unique elements")

        return _index
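
    # The three accepted forms of `index`, as normalized by `build_index` above
    # (a minimal sketch; outputs follow directly from the code):
    #
    # >>> DatasetIndex.build_index(5)                # int -> np.arange(5)
    # array([0, 1, 2, 3, 4])
    # >>> DatasetIndex.build_index([3, 1, 2])        # array-like -> np.asarray
    # array([3, 1, 2])
    # >>> DatasetIndex.build_index(lambda: [7, 8])   # callable -> its return value
    # array([7, 8])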
    def build_pos(self):
        """ Create a dictionary with positions in the index. """
        if self.indices is None:
            return {}
        return dict(zip(self.indices, np.arange(len(self))))
    def get_pos(self, index):
        """ Return the position of an item in the index.

        Parameters
        ----------
        index : int, str, slice or Iterable
            Items to return positions of.

            - int, str
              Return the position of that item in the DatasetIndex.
            - slice, Iterable
              Return positions of multiple items specified by the argument.

        Returns
        -------
        int, slice or numpy.array
            Positions of the specified items in the DatasetIndex.

        Examples
        --------
        Create a DatasetIndex that holds an index of images and get the position of one of them

        >>> DatasetIndex(['image_0', 'image_1']).get_pos('image_1')
        """
        if isinstance(index, slice):
            start = self._pos[index.start] if index.start is not None else None
            stop = self._pos[index.stop] if index.stop is not None else None
            pos = slice(start, stop, index.step)
        elif isinstance(index, str):
            pos = self._pos[index]
        elif isinstance(index, Iterable):
            pos = np.asarray([self._pos[ix] for ix in index])
        else:
            pos = self._pos[index]
        return pos
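
    # A short illustration of `get_pos` with scalar and iterable arguments
    # (extends the docstring example; outputs follow from the code above):
    #
    # >>> ix = DatasetIndex(['image_0', 'image_1', 'image_2'])
    # >>> ix.get_pos('image_1')
    # 1
    # >>> ix.get_pos(['image_0', 'image_2'])
    # array([0, 2])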
    def subset_by_pos(self, pos):
        """ Return a subset of the index by the given positions in the index.

        Parameters
        ----------
        pos : int, slice, list or numpy.array
            Positions of items to include in the subset.

        Returns
        -------
        numpy.array
            Subset of DatasetIndex.index.
        """
        return self.index[pos]
    def create_subset(self, index):
        """ Return a new index object based on the subset of indices given. """
        return type(self)(index)
    def split(self, shares=0.8, shuffle=False):
        """ Split the index into train, test and validation subsets.
        Shuffles the index if necessary.

        Subsets are available as `.train`, `.test` and `.validation`, respectively.

        Parameters
        ----------
        shares : float or tuple of floats
            Train, test and validation shares.
        shuffle : bool or seed
            Specifies the order of items (see :meth:`~.DatasetIndex.shuffle`).

        Notes
        -----
        If a tuple of 3 floats is passed, then the validation subset is always present.

        Examples
        --------
        Split into train / test in a 80/20 ratio

        >>> index.split()

        Split into train / test / validation in a 60/30/10 ratio

        >>> index.split([0.6, 0.3])

        Split into train / test / validation in a 50/30/20 ratio

        >>> index.split([0.5, 0.3, 0.2])

        Use 1 sample as validation and split the rest evenly into train / test

        >>> index.split([0.5, 0.5, 0])
        """
        train_share, test_share, valid_share = self.calc_split(shares)
        order = self.shuffle(shuffle)

        # pylint: disable=attribute-defined-outside-init
        if valid_share > 0:
            validation_pos = order[:valid_share]
            self.validation = self.create_subset(self.subset_by_pos(validation_pos))
        if test_share > 0:
            test_pos = order[valid_share : valid_share + test_share]
            self.test = self.create_subset(self.subset_by_pos(test_pos))
        if train_share > 0:
            train_pos = order[valid_share + test_share:]
            self.train = self.create_subset(self.subset_by_pos(train_pos))
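
    # What `split` produces (illustrative; exact subset sizes depend on
    # `calc_split`, which is defined in the Baseset parent class):
    #
    # >>> ix = DatasetIndex(10)
    # >>> ix.split([0.8, 0.2])
    # >>> len(ix.train), len(ix.test)    # expected to be (8, 2) for these shares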
    def shuffle(self, shuffle, iter_params=None):
        """ Permute indices.

        Parameters
        ----------
        shuffle : bool or seed
            Specifies the order of items

            - if `False`, items go sequentially, one after another, as they appear in the index.
            - if `True`, items are shuffled randomly before each epoch.
            - see :func:`~.make_rng` for seed specifications.

        Returns
        -------
        ndarray
            A permuted order for indices.
        """
        if iter_params is None:
            iter_params = self.get_default_iter_params()

        if iter_params['_order'] is None:
            order = np.arange(len(self))
        else:
            order = iter_params['_order']

        rng = make_rng(shuffle)
        if rng is not None:
            iter_params['_random_state'] = rng
            order = rng.permutation(order)
        return order
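
    # Ordering semantics of `shuffle` (a sketch; assumes `make_rng` returns None
    # for `False` and a seeded generator for an integer seed, as the code implies):
    #
    # >>> ix = DatasetIndex(5)
    # >>> ix.shuffle(False)    # sequential order is kept
    # array([0, 1, 2, 3, 4])
    # >>> ix.shuffle(42)       # reproducible permutation for a fixed seed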
    def next_batch(self, batch_size, shuffle=False, n_iters=None, n_epochs=None, drop_last=False,
                   iter_params=None):
        """ Return the next batch.

        Parameters
        ----------
        batch_size : int
            Desired number of items in the batch (the actual batch could contain fewer items).
        shuffle : bool or seed
            Specifies the order of items (see :meth:`~.DatasetIndex.shuffle`).
        n_iters : int
            Number of iterations to make (only one of `n_iters` and `n_epochs` should be specified).
        n_epochs : int
            Number of epochs required (only one of `n_iters` and `n_epochs` should be specified).
        drop_last : bool
            If `True`, drops the last batch (in each epoch) if it contains fewer than `batch_size` items.

            If `False`, then the last batch in each epoch could contain repeating indices (which might
            be a problem) and the very last batch could contain fewer than `batch_size` items.

            For instance, `next_batch(3, shuffle=False, n_epochs=2, drop_last=False)` for a dataset
            with 4 items returns indices [0, 1, 2], [3, 0, 1], [2, 3],
            while `next_batch(3, shuffle=False, n_epochs=2, drop_last=True)` returns indices
            [0, 1, 2], [0, 1, 2].

            Take into account that `next_batch(3, shuffle=True, n_epochs=2, drop_last=False)` could
            return batches [3, 0, 1], [2, 0, 2], [1, 3]. Here the second batch contains two items
            with the same index "2". This might become a problem if some action uses `batch.get_pos()`
            or `batch.index.get_pos()` methods, so that one of the identical items will be missed.
            However, there is nothing to worry about if you don't iterate over batch items explicitly
            (i.e. `for item in batch`) or implicitly (through `batch[ix]`).

        Raises
        ------
        StopIteration
            When `n_epochs` has been reached and there are no batches left in the dataset.
        ValueError
            When `n_epochs` and `n_iters` have been passed at the same time.
            When the batch size exceeds the dataset size.

        Examples
        --------
        ::

            for i in range(MAX_ITER):
                index_batch = index.next_batch(BATCH_SIZE, shuffle=True, n_epochs=2, drop_last=True)
                # do whatever you want

        """
        if batch_size > len(self):
            raise ValueError("Batch size cannot be larger than the dataset size.")
        if n_iters is not None and n_epochs is not None:
            raise ValueError("Only one of n_iters and n_epochs should be specified.")

        if iter_params is None:
            iter_params = self._iter_params

        # The previous iteration was the last one to perform, so stop iterating
        if iter_params['_stop_iter']:
            if 'notifier' in iter_params:
                iter_params['notifier'].close()
            raise StopIteration("Dataset is over. No more batches left.")

        if iter_params['_order'] is None:
            iter_params['_order'] = self.shuffle(shuffle, iter_params)

        num_items = len(iter_params['_order'])
        rest_items = None
        if iter_params['_start_index'] + batch_size >= num_items:
            rest_items = np.copy(iter_params['_order'][iter_params['_start_index']:])
            rest_of_batch = iter_params['_start_index'] + batch_size - num_items
            if rest_of_batch > 0:
                if drop_last:
                    rest_items = None
                    rest_of_batch = batch_size
                iter_params['_start_index'] = 0
                iter_params['_n_epochs'] += 1
                iter_params['_order'] = self.shuffle(shuffle, iter_params)
        else:
            rest_of_batch = batch_size

        new_items = iter_params['_order'][iter_params['_start_index'] :
                                          iter_params['_start_index'] + rest_of_batch]
        if rest_items is None:
            batch_items = new_items
        else:
            batch_items = np.concatenate((rest_items, new_items))

        if n_iters is not None and iter_params['_n_iters'] >= n_iters or \
           n_epochs is not None and iter_params['_n_epochs'] >= n_epochs:
            if 'notifier' in iter_params:
                iter_params['notifier'].close()
            if n_iters is not None or drop_last and (rest_items is None or len(rest_items) < batch_size):
                raise StopIteration("Dataset is over. No more batches left.")
            iter_params['_stop_iter'] = True
            iter_params['_n_iters'] += 1
            return self.create_batch(rest_items, pos=True)

        iter_params['_n_iters'] += 1
        iter_params['_start_index'] += rest_of_batch
        return self.create_batch(batch_items, pos=True)
    def gen_batch(self, batch_size, shuffle=False, n_iters=None, n_epochs=None, drop_last=False,
                  notifier=False, iter_params=None):
        """ Generate batches.

        Parameters
        ----------
        batch_size : int
            Desired number of items in the batch (the actual batch could contain fewer items).
        shuffle : bool or seed
            Specifies the order of items (see :meth:`~.DatasetIndex.shuffle`).
        n_iters : int
            Number of iterations to make (only one of `n_iters` and `n_epochs` should be specified).
        n_epochs : int
            Number of epochs required (only one of `n_iters` and `n_epochs` should be specified).
        drop_last : bool
            If `True`, drops the last batch (in each epoch) if it contains fewer than `batch_size` items.

            If `False`, then the last batch in each epoch could contain repeating indices (which might
            be a problem) and the very last batch could contain fewer than `batch_size` items.

            For instance, `gen_batch(3, shuffle=False, n_epochs=2, drop_last=False)` for a dataset
            with 4 items returns indices [0, 1, 2], [3, 0, 1], [2, 3],
            while `gen_batch(3, shuffle=False, n_epochs=2, drop_last=True)` returns indices
            [0, 1, 2], [0, 1, 2].

            Take into account that `gen_batch(3, shuffle=True, n_epochs=2, drop_last=False)` could
            return batches [3, 0, 1], [2, 0, 2], [1, 3]. Here the second batch contains two items
            with the same index "2". This might become a problem if some action uses `batch.get_pos()`
            or `batch.index.get_pos()` methods, so that one of the identical items will be missed.
            However, there is nothing to worry about if you don't iterate over batch items explicitly
            (i.e. `for item in batch`) or implicitly (through `batch[ix]`).
        notifier : str, dict, or instance of `.Notifier`
            Configuration of the displayed progress bar, if any.
            If str or dict, then parameters of `.Notifier` initialization.
            For more details about notifying capabilities, refer to the `.Notifier` documentation.

        Yields
        ------
        An instance of the same class with a subset of indices.

        Raises
        ------
        ValueError
            When `n_epochs` and `n_iters` have been passed at the same time.

        Examples
        --------
        ::

            for index_batch in index.gen_batch(BATCH_SIZE, shuffle=True, n_epochs=2, drop_last=True):
                # do whatever you want

        """
        iter_params = iter_params or self.get_default_iter_params()

        if n_iters is not None:
            total = n_iters
        elif n_epochs is None:
            total = None
        elif drop_last:
            total = len(self) // batch_size * n_epochs
        else:
            total = math.ceil(len(self) * n_epochs / batch_size)
        iter_params.update({'_total': total})

        if notifier:
            if not isinstance(notifier, Notifier):
                notifier = Notifier(**(notifier if isinstance(notifier, dict) else {'bar': notifier}),
                                    total=None, batch_size=batch_size, n_iters=n_iters,
                                    n_epochs=n_epochs, drop_last=drop_last,
                                    length=len(self._dataset.index))
                notifier.local = True
            else:
                notifier.local = False

                if notifier.total is None:
                    notifier.compute_total(total=None, batch_size=batch_size, n_iters=n_iters,
                                           n_epochs=n_epochs, drop_last=drop_last,
                                           length=len(self._dataset.index))
                    notifier.make_bar()
            iter_params['notifier'] = notifier

        while True:
            if n_epochs is not None and iter_params['_n_epochs'] >= n_epochs:
                return
            try:
                batch = self.next_batch(batch_size, shuffle, n_iters, n_epochs, drop_last, iter_params)
            except StopIteration:
                return
            if 'notifier' in iter_params:
                notifier.update()
            yield batch
    def create_batch(self, index, pos=True, as_array=False, *args, **kwargs):
        """ Create a batch from given indices.

        Parameters
        ----------
        index : int, slice, list, numpy.array or DatasetIndex
            If `pos` is `True`, then `index` should contain positions of items in the current index
            to be returned as a separate batch.
            If `pos` is `False`, then `index` should contain indices to be returned as a separate
            batch (so the expected batch is just the very same index).
        pos : bool
            Whether `index` contains positions (`True`) or indices (`False`).
        as_array : bool
            Whether to return an array or an instance of DatasetIndex.

        Returns
        -------
        DatasetIndex or numpy.array
            Part of the initial DatasetIndex, specified by `index`.

        Examples
        --------
        Create a DatasetIndex with the first 100 natural numbers, then get a batch
        with every second item

        >>> DatasetIndex(100).create_batch(index=2*numpy.arange(50))
        """
        _ = args, kwargs
        if isinstance(index, DatasetIndex):
            _index = index.indices
        else:
            _index = index

        if pos:
            batch = self.subset_by_pos(_index)
        else:
            batch = _index

        if not as_array:
            batch = self.create_subset(batch)
        return batch
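
    # Positions vs. indices in `create_batch` (illustrative, derived from the
    # docstring and the code above):
    #
    # >>> ix = DatasetIndex([10, 20, 30, 40])
    # >>> ix.create_batch([0, 2], pos=True, as_array=True)               # select by position
    # array([10, 30])
    # >>> ix.create_batch(np.array([10, 30]), pos=False, as_array=True)  # pass indices as-is
    # array([10, 30])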
class FilesIndex(DatasetIndex):
    """ Index with the list of files or directories matching the given path pattern.

    Examples
    --------
    Create a sorted index of files in a directory:

    >>> fi = FilesIndex(path='/path/to/data/files/*', sort=True)

    Create an index of directories through all subdirectories:

    >>> fi = FilesIndex(path='/path/to/data/archive*/patient*', dirs=True)

    Create an index of files in several directories, ignoring file extensions:

    >>> fi = FilesIndex(path=['/path/to/archive/2016/*', '/path/to/current/file/*'], no_ext=True)

    To get the path to a file, call `get_fullpath(index_id)`:

    >>> path = fi.get_fullpath(some_id)

    Split into train / test / validation in a 80/15/5 ratio

    >>> fi.split([0.8, 0.15])

    Get the position of a customer in the index

    >>> item_pos = fi.get_pos(customer_id)
    """
    def __init__(self, *args, **kwargs):
        self._paths = None
        self.dirs = False
        super().__init__(*args, **kwargs)

    @property
    def paths(self):
        return self._paths
    @classmethod
    def concat(cls, *index_list):
        """ Create index by concatenating other indices.

        Parameters
        ----------
        index_list : list
            Indices to be concatenated. Each item is expected to contain
            an `index` property with a 1-d sequence of indices.

        Returns
        -------
        FilesIndex
            Contains one common index with the merged paths.
        """
        paths = {}
        for index in index_list:
            paths.update(index.paths)
        return type(index_list[0])(index=np.concatenate([i.index for i in index_list]), paths=paths)
    def build_index(self, index=None, path=None, *args, **kwargs):
        """ Build index from a path string or an index given. """
        if path is None:
            _index = self.build_from_index(index, *args, **kwargs)
        else:
            _index = self.build_from_path(path, *args, **kwargs)

        if len(_index) != len(self._paths):
            raise ValueError("Index contains non-unique elements, which leads to path collision")
        return _index
    def build_from_index(self, index, paths, dirs=None):
        """ Build index from another index for the indices given. """
        if isinstance(index, DatasetIndex):
            index = index.indices
        else:
            index = DatasetIndex(index).indices

        if isinstance(paths, dict):
            self._paths = dict((file, paths[file]) for file in index)
        else:
            self._paths = dict((file, paths[pos]) for pos, file in np.ndenumerate(index))
        self.dirs = dirs
        return index
    def build_from_path(self, path, dirs=False, no_ext=False, sort=False):
        """ Build index from a path/glob or a sequence of paths/globs. """
        if isinstance(path, str):
            paths = [path]
        else:
            paths = path
        if len(paths) == 0:
            raise ValueError("`path` must contain at least one entry.")

        _all_index = []
        _all_paths = {}
        for one_path in paths:
            _index, _paths = self.build_from_one_path(one_path, dirs, no_ext)
            _all_index.append(_index)
            _all_paths.update(_paths)

        _all_index = np.ravel(_all_index)
        if sort:
            _all_index.sort()

        self._paths = _all_paths
        self.dirs = dirs
        return _all_index
    def build_from_one_path(self, path, dirs=False, no_ext=False):
        """ Build index from a path/glob. """
        if not isinstance(path, str):
            raise TypeError(f'Each path must be a string, instead got {path}')

        check_fn = os.path.isdir if dirs else os.path.isfile
        pathlist = glob.iglob(path, recursive=True)
        _full_index = np.asarray([self.build_key(fname, no_ext) for fname in pathlist if check_fn(fname)])

        if len(_full_index):
            _index = _full_index[:, 0]
            _paths = _full_index[:, 1]
        else:
            warnings.warn(f"No items to index in {path}")
            _index, _paths = np.empty(0), np.empty(0)

        _paths = dict(zip(_index, _paths))
        return _index, _paths
    @staticmethod
    def build_key(fullpathname, no_ext=False):
        """ Create an index item from a full path name. """
        key_name = os.path.basename(fullpathname)
        if no_ext:
            dot_position = key_name.rfind('.')
            dot_position = dot_position if dot_position > 0 else len(key_name)
            key_name = key_name[:dot_position]
        return key_name, fullpathname
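
    # Behaviour of `build_key` (values follow directly from the code above):
    #
    # >>> FilesIndex.build_key('/data/img_001.png')
    # ('img_001.png', '/data/img_001.png')
    # >>> FilesIndex.build_key('/data/img_001.png', no_ext=True)
    # ('img_001', '/data/img_001.png')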
    def get_fullpath(self, key):
        """ Return the full path name for an item in the index. """
        return self._paths[key]
    def create_subset(self, index):
        """ Return a new FilesIndex based on the subset of indices given. """
        return type(self).from_index(index=index, paths=self._paths, dirs=self.dirs)