# Source code for batchflow.research.experiment

#pylint:disable=logging-fstring-interpolation
""" Experiment and corresponding classes. """

import os
import sys
from copy import copy, deepcopy
import itertools
import traceback
import contextlib
import warnings
from collections import OrderedDict
import time

from .. import Config, Pipeline, spawn_seed_sequence, make_rng, make_seed_sequence
from ..decorators import parallel
from ..named_expr import eval_expr
from ..utils import to_list

from .domain import ConfigAlias
from .named_expr import E, O, EC
from .utils import generate_id, must_execute, parse_name, MultiOut
from .profiler import ExecutorProfiler
from .storage import BaseResearchStorage

class PipelineWrapper:
    """ Make callable or generator from `batchflow.pipeline`.

    Parameters
    ----------
    pipeline : Pipeline
        pipeline defined with `.run_later`
    mode : 'generator', 'func' or 'execute_for', optional
        the way to use pipeline, by default 'generator':
            - 'generator': pipeline will be used as generator of batches
            - 'func': execute pipeline with `.run`
            - 'execute_for': execute pipeline with `.execute_for` with batch
    variables : str, list, optional
        variables to return from call
    """
    def __init__(self, pipeline, mode='generator', variables=None):
        if mode not in ['generator', 'func', 'execute_for']:
            raise ValueError(f'Unknown PipelineWrapper mode: {mode}')
        if isinstance(pipeline, str):
            # A string refers to an instance attribute: parse it into an
            # (instance_name, attr_name) pair to be resolved later.
            pipeline = parse_name(pipeline)
        self.pipeline = pipeline
        self.mode = mode
        self.variables = to_list(variables or [])
        self.config = None  # set lazily on the first __call__

    def __call__(self, config, batch=None, **kwargs):
        """ Execute pipeline.

        Parameters
        ----------
        config : Config
            `Config` for pipeline, defined at the first pipeline execution.
        batch : Batch, optional
            `Batch` to use with `execute_for` method, by default None

        Returns
        -------
        tuple or generator
            return depends on the mode:
                - 'generator': generator
                - 'func': the pipeline object and its variables values
                - 'execute_for': the processed batch and its variables values
        """
        if self.config is None:
            # First call: freeze config (with call-time kwargs) and attach it.
            self.config = {**config, **kwargs}
            self.pipeline.set_config(self.config)

        if self.mode == 'generator':
            return self.generator()
        if self.mode == 'func':
            return (self.pipeline.run(), *self._get_vars_values())
        batch = self.pipeline.execute_for(batch)
        return (batch, *self._get_vars_values()) # if self.mode == 'execute_for'

    def generator(self):
        """ Generator returns batches from pipeline. Generator will stop when StopIteration will be raised. """
        self.reset()
        while True:
            try:
                yield (self.pipeline.next_batch(), *self._get_vars_values())
            except StopIteration:
                return

    def _get_vars_values(self):
        """ Current values of the tracked pipeline variables. """
        return [self.pipeline.v(var) for var in self.variables if var is not None]

    def __getattr__(self, attr):
        # Guard against infinite recursion when 'pipeline' is not yet in
        # __dict__ (e.g. during unpickling, before __setstate__ runs).
        if 'pipeline' not in self.__dict__:
            raise AttributeError(attr)
        return getattr(self.pipeline, attr)

    def reset(self):
        """ Reset pipeline state: variables and batch generator. """
        self.pipeline.reset('iter', 'vars')

    def __copy__(self):
        """ Create copy of the pipeline with the same mode. """
        if isinstance(self.pipeline, (list, tuple)):
            # Still an unresolved (instance_name, attr_name) reference: share it.
            return PipelineWrapper(self.pipeline, self.mode, self.variables)

        # Adding an empty pipeline yields a fresh Pipeline object.
        return PipelineWrapper(self.pipeline + Pipeline(), self.mode, self.variables)

    def __getstate__(self):
        return self.__dict__

    def __setstate__(self, d):
        self.__dict__.update(d)

class InstanceCreator:
    """ Instance class to use in each experiment in research. Will be initialized at the start of
    the experiment execution.

    Parameters
    ----------
    name : str
        name of the instance to use in research.
    creator : class
        class of the instance.
    root : bool, optional
        does instance is the same for all branches or not, by default False.
    args : list
        args for instance initialization, by default None.
    kwargs, other_kwargs : dict
        kwargs for instance initialization.
    """
    def __init__(self, name, creator, root=False, args=None, kwargs=None, **other_kwargs):
        self.name = name
        self.creator = creator
        self.root = root
        self.args = args if args is not None else []
        self.kwargs = kwargs if kwargs is not None else {}
        self.other_kwargs = other_kwargs

    def __call__(self, experiment, *args, **kwargs):
        """ Create instance of the creator. """
        # Merge stored and call-time arguments, then resolve named expressions
        # against the experiment before instantiation.
        merged_args = eval_expr([*self.args, *args], experiment=experiment)
        merged_kwargs = eval_expr({**self.kwargs, **kwargs}, experiment=experiment)
        extra_kwargs = eval_expr(self.other_kwargs, experiment=experiment)
        return self.creator(*merged_args, **extra_kwargs, **merged_kwargs)

class ExecutableUnit:
    """ Class to represent callables and generators executed in experiment.

    Parameters
    ----------
    name : str
        name of the unit.
    func : callable or tuple of str, optional
        callable itself or tuple which consists of instance name and its attribute to call, by default None.
        `func` and `generator` can't be defined simultaneously.
    generator : generator or tuple of str, optional
        generator itself or tuple which consists of instance name and its attribute to call, by default None.
        `func` and `generator` can't be defined simultaneously.
    root : bool, optional
        does unit is the same for all branches or not, by default False.
    when : str, int or list of ints, optional
        iterations of the experiment to execute unit, by default 1.
            - If `'last'`, unit will be executed just at last iteration (if `iteration + 1 == n_iters` or
              `StopIteration` was raised).
            - If positive int, pipeline will be executed each `when` iterations.
            - If str, must be `'#{it}'` or `'last'` where it is int, the pipeline will be executed at this
              iteration (zero-based).
            - If list, must be list of int or str described above.
    args, kwargs : optional
        args and kwargs for unit call, by default None.
    save_to : str, list or None, optional
        names to save output from unit. Must be None if `save_output_dict` is True. By default None.
    save_output_dict : bool, optional
        if the output is a dict, use its keys as names of variables to store in results.
        If True, `save_to` must be None. By default False.
    """
    def __init__(self, name, func=None, generator=None, root=False, when=1,
                 args=None, kwargs=None, save_to=None, save_output_dict=False, **other_kwargs):
        self.name = name
        self.callable = func
        self.generator = generator

        self.root = root
        self.when = when

        # Normalize `when` to a list so must_execute always sees a list.
        if isinstance(self.when, (int, str)):
            self.when = [self.when]

        self.args = [] if args is None else args
        self.kwargs = {} if kwargs is None else kwargs
        self.other_kwargs = other_kwargs

        self.config = None
        self.experiment = None
        self.output = None # the last output of the unit.
        self.iterator = None # created iterator

        self.iteration = 0

        self.save_to = save_to
        self.save_output_dict = save_output_dict

        if self.save_to is not None and self.save_output_dict:
            raise ValueError('save_to is not None and save_output_dict is True.')


    def set_unit(self, config, experiment):
        """ Set config and experiment instance for the unit. """
        self.config = config
        self.experiment = experiment

    def transform_method(self):
        """ Transform `callable` or `generator` from tuples of str to instance attributes. """
        attr = 'callable' if self.callable is not None else 'generator'
        src = getattr(self, attr)
        if isinstance(src, (tuple, list)):
            setattr(self, attr, getattr(self.experiment.instances[src[0]], src[1]))
        if isinstance(src, PipelineWrapper) and isinstance(src.pipeline, (tuple, list)):
            pipeline = src.pipeline
            src.pipeline = getattr(self.experiment.instances[pipeline[0]], pipeline[1])

    def __call__(self, iteration, n_iters, last=False):
        """ Call unit: execute callable or get the next item from generator.

        Parameters
        ----------
        iteration : int
            current iteration of the experiment.
        n_iters : int or None
            total number of iterations of the experiment. `None` means that experiment will be executed until
            `StopIteration` for at least one executable unit.
        last : bool, optional
            does it is the last iteration or not, by default False. `last` is `True` when StopIteration was raised
            for one of the previously executed units or `iteration + 1 == n_iters` when `n_iters` is not None.

        Returns
        -------
        object
            output of the wrapped unit
        """
        if iteration == 0:
            self.transform_method()

        if self.must_execute(iteration, n_iters, last):
            total = (n_iters - 1) if n_iters is not None else None
            self.experiment.logger.debug(f"Execute '{self.name}' [{iteration}/{total}]")

            self.iteration = iteration
            args = eval_expr(self.args, experiment=self.experiment)
            kwargs = eval_expr(self.kwargs, experiment=self.experiment)
            other_kwargs = eval_expr(self.other_kwargs, experiment=self.experiment)

            # Single timing anchor; the original reassigned start_time in each
            # branch to (almost) the same instant, which was redundant.
            start_time = time.time()
            if self.callable is not None:
                self.output = self.callable(*args, **kwargs, **other_kwargs)
            else:
                if self.iterator is None:
                    self.iterator = self.generator(*args, **kwargs, **other_kwargs)
                self.output = next(self.iterator)

            if self.save_to or self.save_output_dict:
                self.save_output()

            eval_time = time.time() - start_time
            return self.output, eval_time

        return None, None

    def must_execute(self, iteration, n_iters=None, last=False):
        """ Returns does unit must be executed for the current iteration. """
        return must_execute(iteration, self.when, n_iters, last)

    def save_output(self):
        """ Save output of the unit into the experiment results storage. """
        if self.save_output_dict:
            dst = self.output.keys()
            src = self.output.values()
        else:
            if not isinstance(self.save_to, (list, tuple)):
                src = [self.output]
                dst = [self.save_to]
            else:
                src = self.output
                dst = self.save_to

            if len(src) != len(dst):
                raise ValueError(f'Length of src and dst must be the same but src={src} and dst={dst}')

        for _src, _dst in zip(src, dst):
            if _dst is not None:
                self.experiment.storage.update_variable(_dst, _src)

    @property
    def src(self):
        """ Return wrapped source (callable or generator) of the unit. """
        attr = 'callable' if self.callable is not None else 'generator'
        return getattr(self, attr)

    def __copy__(self):
        """ Create copy of the unit. """
        attrs = ['name', 'callable', 'generator', 'root', 'when', 'args', 'kwargs', 'save_to', 'save_output_dict']
        params = {attr if attr != 'callable' else 'func': copy(getattr(self, attr)) for attr in attrs}
        new_unit = ExecutableUnit(**params, **copy(self.other_kwargs))
        return new_unit

    def __getattr__(self, key):
        # Guard against infinite recursion when __dict__ is not yet populated
        # (e.g. during unpickling, before __setstate__ runs): `src` reads
        # self.callable, which would re-enter __getattr__.
        if 'callable' not in self.__dict__:
            raise AttributeError(key)
        return getattr(self.src, key)

    def __getitem__(self, key):
        return self.src[key]

    def __getstate__(self):
        return self.__dict__

    def __setstate__(self, d):
        self.__dict__.update(d)

class Experiment:
    """ Experiment description which consists of lists of instances to create and actions to execute.
    Each action defines executable unit (callable or generator) and corresponding execution parameters.
    Actions will be executed in the order defined by list. Actions can be defined as attributes of some
    instance (e.g., see `name` of `:meth:.add_callable`).

    Parameters
    ----------
    instance_creators : list, optional
        list of instance_creators, by default None. Can be extended by `:meth:.add_instance`.
    actions : list, optional
        list of actions, by default None. Can be extended by `:meth:.add_executable_unit` and other methods.
    namespaces : list, optional
        list of namespaces, by default None. If None, then global namespace will be added.
    """
    def __init__(self, instance_creators=None, actions=None, namespaces=None):
        if instance_creators is not None:
            self.instance_creators = OrderedDict(instance_creators)
        else:
            self.instance_creators = OrderedDict()

        if actions is None:
            self.actions = OrderedDict() # unit_name : (instance_name, attr_name) or callable
        else:
            self.actions = actions

        self._namespaces = namespaces if namespaces is not None else []

        # should experiment be executed or not. Becomes False when Exceptions
        # was raised and all units for these iterations were executed.
        self._is_alive = True
        self._is_failed = False # was an exception raised or not
        self.last = False
        self.outputs = {}
        self.storage = None
        self.has_dump = False # does unit has any dump actions or not

        self.name = None # name of the executor/research
        self.dump_results = None
        self.loglevel = None
        self.monitor = None

        self.id = None #pylint:disable=invalid-name
        self.index = None
        self.config_alias = None
        self.config = None
        self.executor = None
        self.research = None
        self.instances = None
        self.logger = None
        self.iteration = None
        self.random_seed = None
        self.random = None
        self.profiler = None
        self.stdout_file = None
        self.stderr_file = None

    @property
    def is_alive(self):
        return self._is_alive

    @is_alive.setter
    def is_alive(self, value):
        # Latching: once False, stays False.
        self._is_alive = self._is_alive and value

    @property
    def is_failed(self):
        return self._is_failed

    @is_failed.setter
    def is_failed(self, value):
        # Latching: once True, stays True.
        self._is_failed = self._is_failed or value
[docs] def add_executable_unit(self, name, src=None, mode='func', when=1, save_to=None, dump=None, args=None, **kwargs): """ Add executable unit to experiment. Parameters ---------- name : str name of unit to use inside of the research. Can be `'instance_name.attr'` to refer to instance attr. src : callable or generator, optional callable or generator to wrap into ExecutableUnit, by default None. mode : str, optional type of src ('func' or 'generator'), by default 'func' when : int, str or list, optional iterations to execute callable (see `when` of `:class:ExecutableUnit`), by default 1. save_to : str or list, optional dst to save output of the unit (if needed), by default None. dump : int, str or list, optional iterations to dump results (see `when` of `:class:ExecutableUnit`), by default 1. args : list, optional args to execute unit, by default None. kwargs : dict kwargs to execute unit. Returns ------- Research """ if not isinstance(name, str) and hasattr(name, '__name__'): src = name name = src.__name__ if src is None: kwargs[mode] = parse_name(name) else: kwargs[mode] = src name = self.add_postfix(name) self.actions[name] = ExecutableUnit(name=name, args=args, when=when, save_to=save_to, **kwargs) if dump is not None: self.dump(save_to, when=dump) return self
[docs] def add_callable(self, name, func=None, args=None, when=1, save_to=None, dump=None, **kwargs): """ Add callable to experiment. Parameters ---------- name : str name of callable to use inside of the research. Can be `'instance_name.method'` to refer to instance method. func : callable, optional callable to add into experiment, by default None. args : list, optional args to execute callable, by default None. when : int, str or list, optional iterations to execute callable (see `when` of `:class:ExecutableUnit`), by default 1. save_to : str or list, optional dst to save output of the callable (if needed), by default None. dump : int, str or list, optional iterations to dump results (see `when` of `:class:ExecutableUnit`), by default 1. root : bool, optional does unit is the same for all branches or not, by default False. kwargs : dict kwargs to execute callable. Returns ------- Research """ return self.add_executable_unit(name, src=func, mode='func', when=when, save_to=save_to, dump=dump, args=args, **kwargs)
[docs] def add_generator(self, name, generator=None, args=None, **kwargs): """ Add generator to experiment. Parameters ---------- name : str name of generator to use inside of the research. Can be `'instance_name.method'` to refer to instance method. generator : generator, optional generator to add into experiment, by default None. args : list, optional args to create iterator, by default None. when : int, str or list, optional iterations to get item from generator (see `when` of `:class:ExecutableUnit`), by default 1. save_to : NamedExpression, optional dst to save generated item (if needed), by default None. root : bool, optional does unit is the same for all branches or not, by default False. kwargs : dict kwargs to create iterator. Returns ------- Research """ return self.add_executable_unit(name, src=generator, mode='generator', args=args, **kwargs)
[docs] def add_instance(self, name, creator, root=False, **kwargs): """ Add instance of some class into research. Parameters ---------- name : str instance name. creator : class class which instance will be used to get attributes. root : bool, optional does instances is the same for all branches or not, by default False. Returns ------- Experiment """ self.instance_creators[name] = InstanceCreator(name, creator, root, **kwargs) self.add_callable(f'init_{name}', _create_instance, experiments=E(all=root), root=root, item_name=name, when="%0") return self
[docs] def add_pipeline(self, name, root=None, branch=None, run=False, variables=None, dump=None, when=1, **kwargs): """ Add pipeline to experiment. Parameters ---------- name : str name of pipeline to use inside of the research. Can be `'instance_name.attribute'` to refer to instance attribute. root : batchflow.Pipeline, optional a pipeline to execute, by default None. It must contain `run` action with `lazy=True` or `run_later`. Only if `branch` is None, `root` may contain parameters that can be defined by config. from domain. branch : Pipeline, optional a parallelized pipeline to execute, by default None. Several copies of branch pipeline will be executed in parallel per each batch received from the root pipeline. May contain parameters that can be defined by domain, all branch pipelines will correspond to different experiments and will have different configs from domain. run : bool, optional if False then `.next_batch()` will be applied to pipeline, else `.run()` , by default False. dump : int, str or list, optional iterations to dump results (see `when` of `:class:ExecutableUnit`), by default 1. variables : str, list or None, optional variables of pipeline to save. when : int, str or list, optional iterations to execute (see `when` of `:class:ExecutableUnit`), by default 1. 
Returns ------- Research """ save_to = [None] + to_list(variables or []) if branch is None: mode = 'func' if run else 'generator' pipeline = PipelineWrapper(root if root is not None else name, mode=mode, variables=variables) self.add_executable_unit(name, src=pipeline, mode=mode, config=EC(full=True), when=when, save_to=save_to, **kwargs) else: root = PipelineWrapper(root, mode='generator') branch = PipelineWrapper(branch, mode='execute_for', variables=variables) self.add_generator(f'{name}_root', generator=root, config=EC(full=True), when=when, **kwargs) self.add_callable(f'{name}', func=branch, config=EC(full=True), batch=O(f'{name}_root')[0], save_to=save_to, when=when, **kwargs) if variables is not None: if dump is not None: self.dump(variables, dump) return self
[docs] def add_namespace(self, namespace): """ Add namespace to experiment. """ self._namespaces += [namespace] return self
@property def _all_namespaces(self): common_namespaces = [sys.modules["__main__"], sys.modules["builtins"]] namespaces = [sys.modules[namespace] if isinstance(namespace, str) else namespace for namespace in self._namespaces] return common_namespaces + namespaces
[docs] def get_method(self, name): """ Return a method by the name """ for namespace in self._all_namespaces: if hasattr(namespace, name): return getattr(namespace, name) if isinstance(namespace, dict) and name in namespace: return namespace[name] return None
def __getattr__(self, name): method = self.get_method(name) if method is None: warnings.warn(f'Method {name} was not found in any namespace.') return None return _explicit_call(method, name, self)
[docs] def save(self, src, dst, when=1, save_output_dict=False, copy=False): #pylint:disable=redefined-outer-name """ Save something to research results. Parameters ---------- src : NamedExpression value to save. dst : str name in the results. when : int, str or list, optional iterations to execute (see `when` of `:class:ExecutableUnit`), by default 1. copy : bool, optional copy value or not, by default False """ name = '__save_results' if dst is None else f'__save_results_{dst}' name = self.add_postfix(name) self.add_callable(name, _get_input, x=src, copy=copy, when=when, save_to=dst, experiment=E(), save_output_dict=save_output_dict) return self
[docs] def dump(self, variable=None, when='last'): """ Dump current results to the storage and clear it. Parameters ---------- variable : str, optional names in results to dump, by default None. None means that all results will be dumped. when : int, str or list, optional iterations to execute (see `when` of `:class:ExecutableUnit`), by default 1. Returns ------- Research """ self.has_dump = True name = '__dump_results' if variable is None else f'__dump_results_{variable}' name = self.add_postfix(name) self.add_callable(name, _dump_results, when=when, variable=variable, experiment=E()) return self
[docs] def add_postfix(self, new_name): """ Add postfix for conincided unit name. """ n_actions = sum(self._has_postfix(new_name, unit_name) for unit_name in self.actions) return new_name if n_actions == 0 else f"{new_name}_{n_actions}"
def _has_postfix(self, new_name, unit_item): postfix = unit_item[len(new_name):] return (unit_item == new_name) or (len(postfix) >= 2 and postfix[0] == '_' and postfix[1:].isdigit()) @property def only_callables(self): """ Check if experiment has only callables. """ for unit in self.actions.values(): if unit.callable is None: return False return True @property def results(self): return self.storage.results
[docs] def copy(self): """ Create copy of the experiment. Is needed to create experiments for branches. """ instance_creators = copy(self.instance_creators) actions = OrderedDict([(name, copy(unit)) for name, unit in self.actions.items()]) new_experiment = Experiment(instance_creators=instance_creators, actions=actions) new_experiment.has_dump = self.has_dump return new_experiment
def __getitem__(self, key): return self.actions[key] def __copy__(self): return self.copy() def __deepcopy__(self, memo): _ = memo return self.copy()
[docs] def init(self, index, config, executor=None): """ Create all instances of units to start experiment. """ self.index = index self.executor = executor self.research = executor.research seed = spawn_seed_sequence(self.executor) self.random_seed = seed self.random = make_rng(seed) if self.research is not None: self.id = config.pop_config('id').config()['id'] else: self.id = generate_id(config, self.random) self.pop_index_keys(config) self.config_alias = config self.config = config.config() # Get attributes from research or kwargs of executor params = ['loglevel', 'name', 'monitor', 'debug', 'profile', 'redirect_stdout', 'redirect_stderr', 'dump_results'] for attr in params: value = getattr(self.executor, attr) setattr(self, attr, value) storage_class = self.executor.storage.experiment_storage_class self.storage = storage_class(self, loglevel=self.loglevel) self.logger = self.storage.logger self.profiler = self.storage.profiler self.instances = OrderedDict() root_experiment = executor.experiments[0] if len(executor.experiments) > 0 else None for name in self.actions: if self.actions[name].root and root_experiment is not None: self.actions[name] = root_experiment.actions[name] else: self.actions[name].set_unit(config=config, experiment=self)
[docs] def pop_index_keys(self, config): """ Remove auxilary keys used to create prefix. """ for key in config.keys(): if key[0] == '#': config.pop_config(key) config.pop_config('_prefix')
[docs] def create_stream(self, name, *streams): """ Create contextmanager to redirect stdout/stderr. """ streams = [stream for stream in streams if not isinstance(stream, contextlib.nullcontext)] if len(streams) > 0: if name == 'stdout': return contextlib.redirect_stdout(MultiOut(*streams)) return contextlib.redirect_stderr(MultiOut(*streams)) # 'stderr' return contextlib.nullcontext()
[docs] def call(self, name, iteration, n_iters=None): """ Execute one iteration of the experiment. """ context_manager_out = self.create_stream( 'stdout', self.storage.stdout_file, self.executor.storage.stdout_file ) context_manager_err = self.create_stream( 'stderr', self.storage.stderr_file, self.executor.storage.stderr_file ) with context_manager_out, context_manager_err: if self.is_alive or name.startswith('__'): if self.profiler: self.profiler.enable() self.last = self.last or (iteration + 1 == n_iters) self.iteration = iteration exception = (StopIteration, KeyboardInterrupt) if self.debug else Exception try: self.outputs[name], unit_time = self.actions[name](iteration, n_iters, last=self.last) except exception as e: #pylint:disable=broad-except self.is_failed = True self.last = True if isinstance(e, StopIteration): self.logger.info(f"Stop '{name}' [{iteration}/{n_iters}]") else: ex_traceback = e.__traceback__ msg = ''.join(traceback.format_exception(e.__class__, e, ex_traceback)) self.logger.error(f"Fail '{name}' [{iteration}/{n_iters}]: Exception\n{msg}") if self.monitor: self.monitor.fail_item_execution(name, self, msg) if self.is_failed and ((list(self.actions.keys())[-1] == name) or (not self.executor.finalize)): self.is_alive = False if self.profiler: self.profiler.disable(iteration, name, unit_time=unit_time, experiment=self.id)
[docs] def show_profile_info(self, **kwargs): return self.profiler.show_profile_info(**kwargs)
@property def profile_info(self): return self.profiler.profile_info def __str__(self): repr = '' spacing = ' ' * 4 if len(self.instance_creators) > 0: repr += "instances:\n" for name, creator in self.instance_creators.items(): repr += spacing + f"{name}(\n" repr += 2 * spacing + f"root={creator.root},\n" repr += ''.join([spacing * 2 + f"{key}={value}\n" for key, value in creator.kwargs.items()]) repr += spacing + ")\n" repr += '\n' if len(self.actions) > 0: repr += "units:\n" attrs = ['callable', 'generator', 'root', 'when', 'args'] for name, action in self.actions.items(): repr += spacing + f"{name}(\n" repr += ''.join([spacing * 2 + f"{key}={getattr(action, key)}\n" for key in attrs]) kwargs = {**action.kwargs, **action.other_kwargs} repr += spacing * 2 + f"kwargs={kwargs}\n" + spacing + ")\n" return repr def __getstate__(self): return self.__dict__ def __setstate__(self, d): self.__dict__.update(d)
class Executor:
    """ Executor of experiments in branches.

    Parameters
    ----------
    experiment : Experiment
        experiment scheme.
    research : Research, optional
        Research instance to get meta information (if needed), by default None.
    worker : Worker, optional
        Worker instance, by default None.
    configs : list, optional
        configs for different branches, by default None.
    executor_config : Config of dict, optional
        config for the all experiments, will be appended to each experiment config, by default None.
    branches_configs : list, optional
        configs for each branch, will be appended to each experiment config, by default None.
    target : str, optional
        how to parallelize branches ('threads' or 'for'), by default 'threads'.
    n_iters : int, optional
        the number of iterations to execute, by default None.
    task_name : str, optional
        name of the task, by default None
    """
    def __init__(self, experiment, research=None, worker=None, configs=None, executor_config=None,
                 branches_configs=None, target='threads', n_iters=None, task_name=None, **kwargs):
        if configs is None:
            if branches_configs is None:
                self.n_branches = 1
            else:
                self.n_branches = len(branches_configs)
        else:
            if branches_configs is not None and len(configs) != len(branches_configs):
                raise ValueError('`configs` and `branches_configs` must be of the same length.')
            self.n_branches = len(configs)

        self.configs = configs or [Config() for _ in range(self.n_branches)]
        self.executor_config = Config(executor_config or {})
        self.branches_configs = branches_configs or [Config() for _ in range(self.n_branches)]
        self.branches_configs = [Config(config) for config in self.branches_configs]
        self.n_iters = n_iters
        self.research = research
        self.experiment_template = experiment
        self.task_name = task_name or 'Task'
        self.target = target

        self.set_params(kwargs)

        self.worker = worker
        if worker is not None:
            seed = spawn_seed_sequence(worker)
        else:
            seed = make_seed_sequence()
        self.random_seed = seed
        self.random = make_rng(seed)

        if self.research is not None:
            self.storage = self.research.storage
        else:
            storage = 'local' if self.dump_results else 'memory'
            self.storage = BaseResearchStorage(self, storage=storage)

        self.create_experiments()

        self.pid = None
        self.common_stdout = None
        self.common_stderr = None

    def set_params(self, kwargs):
        """ Set params of executor. Is used to get attributes from research or from kwargs. """
        defaults = {
            'loglevel': 'debug',
            'name': 'executor',
            'monitor': None,
            'debug': False,
            'profile': False,
            'redirect_stdout': True,
            'redirect_stderr': True,
            'dump_results': False,
            'finalize': False
        }
        for attr, default in defaults.items():
            if self.research:
                value = getattr(self.research, attr)
            else:
                value = kwargs.get(attr, default)
            setattr(self, attr, value)

    def create_experiments(self):
        """ Initialize experiments. """
        self.experiments = []
        for index, (config, branch_config) in enumerate(zip(self.configs, self.branches_configs)):
            full_config = ConfigAlias(config) + ConfigAlias(branch_config) + ConfigAlias(self.executor_config)
            experiment = self.experiment_template.copy()
            experiment.init(index, full_config, self)
            experiment.logger.info(f"{self.task_name}[{index}] has been started with config:\n {repr(config)}")
            self.experiments.append(experiment)

    def run(self):
        """ Run experiments. """
        self.storage.create_redirection_streams()
        with self.storage.stdout_file, self.storage.stderr_file:
            self.pid = os.getpid() if self.research and self.research.parallel else None
            iterations = range(self.n_iters) if self.n_iters else itertools.count()

            if self.research:
                for experiment in self.experiments:
                    self.research.monitor.start_experiment(experiment)

            for iteration in iterations:
                for unit_name, unit in self.experiment_template.actions.items():
                    # Root units (or a single branch) run once; otherwise run per branch.
                    if unit.root or len(self.experiments) == 1:
                        self.call_root(iteration, unit_name)
                    else:
                        self.parallel_call(iteration, unit_name, target=self.target, debug=self.debug) #pylint:disable=unexpected-keyword-arg
                if not any(experiment.is_alive for experiment in self.experiments):
                    break
                if self.research:
                    for experiment in self.experiments:
                        if experiment.is_alive:
                            self.research.monitor.execute_iteration(experiment)

            for index, experiment in enumerate(self.experiments):
                if self.research:
                    self.research.monitor.stop_experiment(experiment)
                experiment.logger.info(f"{self.task_name}[{index}] has been finished.")
            self.close()

    def close(self):
        """ Close storages. """
        for experiment in self.experiments:
            experiment.storage.close()
        self.storage.close_files()
        if self.research is None:
            self.storage.close()

    @parallel(init='_parallel_init_call')
    def parallel_call(self, experiment, iteration, unit_name):
        """ Parallel call of the unit 'unit_name' """
        if self.finalize or (not experiment.is_failed) or unit_name.startswith('__'):
            experiment.call(unit_name, iteration, self.n_iters)

    def _parallel_init_call(self, iteration, unit_name):
        """ Auxilary method to call before '_parallel_call'. """
        _ = iteration, unit_name
        return self.experiments

    def call_root(self, iteration, unit_name):
        """ Call root executable unit. """
        # TODO: experiment must be alive if error was in the branch after all roots
        if self.finalize or (not self.experiments[0].is_failed) or unit_name.startswith('__'):
            self.experiments[0].call(unit_name, iteration, self.n_iters)
        for experiment in self.experiments[1:]:
            if self.finalize or (not experiment.is_failed) or unit_name.startswith('__'):
                # Broadcast root output and state to all other branches.
                experiment.outputs[unit_name] = self.experiments[0].outputs[unit_name]
                for attr in ['_is_alive', '_is_failed', 'iteration']:
                    setattr(experiment, attr, getattr(self.experiments[0], attr))

    @property
    def profiler(self):
        """ Executor profiler. """
        if self.experiments[0].profile:
            return ExecutorProfiler(self.experiments)
        return None

    @property
    def profile_info(self):
        """ Profile info. """
        if self.profiler:
            return self.profiler.profile_info
        return None

    def show_profile_info(self, **kwargs):
        """ Show profiling results (delegates to the profiler). """
        return self.profiler.show_profile_info(**kwargs)

def _create_instance(experiments, item_name):
    """ Create one shared instance and attach it to every experiment. """
    if not isinstance(experiments, list):
        experiments = [experiments]
    instance = experiments[0].instance_creators[item_name](experiments[0])
    for e in experiments:
        e.instances[item_name] = instance

def _get_input(x, copy, *args, **kwargs): #pylint:disable=redefined-outer-name
    """ Return `x` (optionally deep-copied); used by `Experiment.save`. """
    _ = args, kwargs
    return deepcopy(x) if copy else x

def _dump_results(variable, experiment):
    """ Callable to dump results. """
    experiment.storage.dump_results(variable)

def _explicit_call(method, name, experiment):
    """ Add unit into research by explicit call in research-pipeline. """
    def _method(*args, **kwargs):
        return experiment.add_executable_unit(name, src=method, args=args, **kwargs)
    return _method