""" Pipeline decorators """
import os
import traceback
import threading
import concurrent.futures as cf
import asyncio
import functools
import logging
import inspect
try:
from numba import jit
except ImportError:
jit = None
from .named_expr import P
def make_function(method, is_global=False):
    """ Make a stand-alone function out of a method by re-executing its source.

    Parameters
    ----------
    method
        a callable
    is_global : bool
        whether to create a function in a global namespace

    Notes
    -----
    A method should not be decorated with any other decorator.
    """
    lines = inspect.getsource(method).split('\n')
    offset = len(lines[0]) - len(lines[0].lstrip())
    # remove the common leading indent; lines not longer than the indent (e.g. blanks) are dropped
    lines = [line[offset:] for line in lines if len(line) > offset]
    # skip any decorator and comment lines that precede 'def' or 'async def'
    start = next((i for i, line in enumerate(lines) if line[:3] in ('def', 'asy')), 0)
    namespace = globals() if is_global else method.__globals__.copy()
    exec('\n'.join(lines[start:]), namespace)  # pylint:disable=exec-used
    # A method with the same name might exist in various classes or modules,
    # so a global function gets a unique, module-qualified name
    unique_name = (method.__module__ + "_" + method.__qualname__).replace('.', '_')
    namespace[unique_name] = namespace[method.__name__]
    return namespace[unique_name]
def _workers_count():
cpu_count = 0
try:
cpu_count = len(os.sched_getaffinity(0))
except AttributeError:
cpu_count = os.cpu_count()
return cpu_count * 4
def _make_action_wrapper_with_args(use_lock=None, no_eval=None):  # pylint: disable=redefined-outer-name
    """ Return a decorator that builds an action wrapper with the given options """
    def _decorator(action_method):
        return _make_action_wrapper(action_method, use_lock=use_lock, no_eval=no_eval)
    return _decorator
def _make_action_wrapper(action_method, use_lock=None, no_eval=None):
@functools.wraps(action_method)
def _action_wrapper(action_self, *args, **kwargs):
""" Call the action method """
if use_lock is not None:
if action_self.pipeline is not None:
if isinstance(use_lock, bool):
_lock_name = '#_lock_' + action_method.__name__
else:
_lock_name = use_lock
if not action_self.pipeline.has_variable(_lock_name):
action_self.pipeline.init_variable(_lock_name, threading.Lock())
action_self.pipeline.get_variable(_lock_name).acquire()
_res = action_method(action_self, *args, **kwargs)
if use_lock is not None:
if action_self.pipeline is not None:
action_self.pipeline.get_variable(_lock_name).release()
return _res
if isinstance(no_eval, str):
no_eval = [no_eval]
_action_wrapper.action = dict(method=action_method, use_lock=use_lock, no_eval=no_eval)
return _action_wrapper
def action(*args, **kwargs):
    """ Decorator for action methods in :class:`~.Batch` classes

    Parameters
    ----------
    use_lock : bool or str
        whether to lock an action when a pipeline is executed. It can be bool or a lock name.
        A pipeline variable with a lock is created in the pipeline during the execution.
    no_eval : str or a sequence of str
        parameters to skip from named expression evaluation.
        A parameter should be passed as a named argument only.

    Examples
    --------
    .. code-block:: python

        @action
        def some_action(self, arg1, arg2):
            ...

        @action(no_eval='dst')
        def calc_offset(self, src, dst=None):
            ...

        @action(use_lock=True)
        def critical_section(self, some_arg, another_arg):
            ...

        @action(use_lock='lock_name')
        def another_critical_section(self, some_arg, another_arg):
            ...
    """
    if len(args) == 1 and len(kwargs) == 0 and callable(args[0]):
        # bare `@action`: the sole positional argument is the decorated method itself
        return _make_action_wrapper(action_method=args[0])
    # `@action(...)`: return a decorator parameterized with the given options
    return _make_action_wrapper_with_args(*args, **kwargs)
def apply_parallel(*args, **kwargs):
    """ Mark class method for transform in its metaclass.

    Decorator writes `kwargs` to the method attribute `apply_kwargs`,
    so they can be extracted and used in metaclass.

    Parameters
    ----------
    args, kwargs
        other parameters passed to `apply_parallel` method of the class
        where this decorator is being used

    Raises
    ------
    ValueError
        if positional arguments are passed (other than the bare-decorator form)

    Notes
    -----
    Redefine the attribute `apply_defaults <.Batch.apply_defaults>` in
    the batch class. This is proposed solely for the purposes of brevity — in
    order to avoid repeated heavily loaded class methods decoration, e.g.
    `@apply_parallel(src='images', target='for')` which in most cases is
    actually equivalent to simple `@apply_parallel` assuming
    that the defaults are redefined for the class whose methods are being
    transformed.

    Note, that if no defaults redefined those from the nearest
    parent class will be used in :class:`~.batch.MethodsTransformingMeta`.
    """
    def mark(method):
        # attach the decorator's kwargs for later extraction by the metaclass
        method.apply_kwargs = kwargs
        return method

    if len(args) == 1 and len(kwargs) == 0 and callable(args[0]):
        # bare `@apply_parallel`: mark the method with empty kwargs
        return mark(args[0])
    if len(args) != 0:
        raise ValueError(f"This decorator accepts only named arguments, got {args}!")
    return mark
def any_action_failed(results):
    """ Return `True` if some parallelized invocations threw exceptions """
    return any(isinstance(res, Exception) for res in results)
def inbatch_parallel(init, post=None, target='threads', _use_self=None, debug=False, **dec_kwargs):
    """ Decorator for parallel methods in :class:`~.Batch` classes

    Parameters
    ----------
    init
        a method name or a callable that returns an iterable for parallelization
        (e.g. a list of indices or items to be passed to a parallelized method)
    post
        a method name or a callable to call after parallel invocations
        (e.g. to assemble the batch)
    target : 'threads', 'mpc', 'async', 'for'
        a parallelization engine
    _use_self : bool
        whether to pass `self` (i.e. whether a decorated callable is a method or a function)
    debug : bool
        If False then inbatch_parallel doesn't process exceptions. Works only with target='for'

    Notes
    -----
    `mpc` can be used with a method that is decorated only by `inbatch_parallel`.
    All other decorators will be ignored.
    """
    # NOTE(review): 'nogil' passes this validation but is never dispatched in
    # `wrapped_method` below, so it raises "Wrong parallelization target" at call time — confirm intent.
    if target not in ['nogil', 'threads', 'mpc', 'async', 'for', 't', 'm', 'a', 'f']:
        raise ValueError("target should be one of 'threads', 'mpc', 'async', 'for'")
    if debug and target not in ['for', 'f']:
        raise ValueError("target should be 'for' for debug=True")

    def inbatch_parallel_decorator(method):
        """ Return a decorator which run a method in parallel """
        # a dotted qualname means `method` is defined inside a class, i.e. it is a method
        use_self = '.' in method.__qualname__ if _use_self is None else _use_self
        mpc_method = method
        if target in {'mpc', 'm'} and use_self:
            # bound methods cannot be pickled for process pools, so rebuild the
            # method as a module-global function; on failure, mpc is disabled
            try:
                mpc_method = make_function(method, is_global=True)
            except Exception:  # pylint:disable=broad-except
                mpc_method = None

        def _check_functions(self):
            """ Check decorator's `init` and `post` parameters """
            if init is None:
                raise ValueError("init cannot be None")
            # a string refers to an attribute of the batch; a callable is used directly;
            # anything else (e.g. a plain iterable) is passed through as-is
            if isinstance(init, str):
                try:
                    init_fn = getattr(self, init)
                except AttributeError as e:
                    raise ValueError("init should refer to a method or property of the class", type(self).__name__,
                                     "returning the list of arguments") from e
            elif callable(init):
                init_fn = init
            else:
                init_fn = init
            if isinstance(post, str):
                try:
                    post_fn = getattr(self, post)
                except AttributeError as e:
                    raise ValueError("post should refer to a method of the class", type(self).__name__) from e
            elif callable(post):
                post_fn = post
            else:
                post_fn = post
            return init_fn, post_fn

        def _call_init_fn(init_fn, args, kwargs):
            # a non-callable init is taken to be the iterable itself
            if callable(init_fn):
                return init_fn(*args, **kwargs)
            return init_fn

        def _call_post_fn(self, post_fn, futures, args, kwargs):
            # collect one result (or exception) per parallel invocation
            all_results = []
            for future in futures:
                try:
                    if isinstance(future, (cf.Future, asyncio.Task)):
                        result = future.result()
                    else:
                        # target='for' stores plain results/exceptions, not futures
                        result = future
                except Exception as exce:  # pylint: disable=broad-except
                    result = exce
                finally:
                    all_results += [result]
            if post_fn is None:
                # no post-processing: fail loudly if anything failed, otherwise return the batch
                if any_action_failed(all_results):
                    all_errors = [error for error in all_results if isinstance(error, Exception)]
                    logging.error("Parallel action failed %s", all_errors)
                    traceback.print_tb(all_errors[0].__traceback__)
                    raise RuntimeError("Parallel action failed")
                return self
            return post_fn(all_results, *args, **kwargs)

        def _prepare_args(self, args, kwargs):
            # evaluate P() named expressions against the batch and remember
            # their positions/names so _make_args can slice them per iteration
            params = []

            def _get_value(value, pos=None, name=None):
                if isinstance(value, P):
                    if pos is not None:
                        params.append(pos)
                    elif name is not None:
                        params.append(name)
                    v = value.get(batch=self, parallel=True)
                    return v
                return value

            _args = []
            for i, v in enumerate(args):
                _args.append(_get_value(v, pos=i))
            _kwargs = {}
            for k, v in kwargs.items():
                _kwargs.update({k: _get_value(v, name=k)})
            return _args, _kwargs, params

        def _make_args(self, iteration, init_args, args, kwargs, params=None):
            """ Make args, kwargs tuple """
            # init may yield ((args, kwargs)), a dict of kwargs, or plain positional value(s)
            if isinstance(init_args, tuple) and len(init_args) == 2 and \
               isinstance(init_args[0], tuple) and isinstance(init_args[1], dict):
                margs, mkwargs = init_args
            elif isinstance(init_args, dict):
                margs = []
                mkwargs = init_args
            else:
                margs = init_args
                mkwargs = {}
            margs = margs if isinstance(margs, (list, tuple)) else [margs]

            # select this iteration's slice of every P() parameter
            if params:
                _args = list(args)
                _kwargs = {**kwargs}
                for k in params:
                    if isinstance(k, str):
                        _kwargs[k] = _kwargs[k][iteration]
                    else:
                        _args[k] = _args[k][iteration]
            else:
                _args = args
                _kwargs = kwargs

            # init-provided args come first, then the call-site args
            if len(args) > 0:
                margs = list(margs) + list(_args)
            if len(kwargs) > 0:
                mkwargs.update(_kwargs)

            if use_self:
                margs = [self] + list(margs)

            return margs, mkwargs

        def wrap_with_threads(self, args, kwargs):
            """ Run a method in parallel threads """
            init_fn, post_fn = _check_functions(self)
            n_workers = kwargs.pop('n_workers', _workers_count())
            with cf.ThreadPoolExecutor(max_workers=n_workers) as executor:
                futures = []
                args, kwargs, params = _prepare_args(self, args, kwargs)
                # decorator-level kwargs are overridden by call-site kwargs
                full_kwargs = {**dec_kwargs, **kwargs}
                for iteration, arg in enumerate(_call_init_fn(init_fn, args, full_kwargs)):
                    margs, mkwargs = _make_args(self, iteration, arg, args, kwargs, params)
                    one_ft = executor.submit(method, *margs, **mkwargs)
                    futures.append(one_ft)
                # NOTE(review): `get` (not `pop`) here, unlike wrap_with_mpc —
                # so 'timeout' also stays in full_kwargs for post_fn; confirm intent
                timeout = kwargs.get('timeout', None)
                cf.wait(futures, timeout=timeout, return_when=cf.ALL_COMPLETED)
            return _call_post_fn(self, post_fn, futures, args, full_kwargs)

        def wrap_with_mpc(self, args, kwargs):
            """ Run a method in parallel processes """
            init_fn, post_fn = _check_functions(self)
            n_workers = kwargs.pop('n_workers', _workers_count())
            with cf.ProcessPoolExecutor(max_workers=n_workers) as executor:
                futures = []
                args, kwargs, params = _prepare_args(self, args, kwargs)
                full_kwargs = {**dec_kwargs, **kwargs}
                for iteration, arg in enumerate(_call_init_fn(init_fn, args, full_kwargs)):
                    margs, mkwargs = _make_args(self, iteration, arg, args, kwargs, params)
                    # submit the picklable module-global variant, not the bound method
                    one_ft = executor.submit(mpc_method, *margs, **mkwargs)
                    futures.append(one_ft)
                timeout = kwargs.pop('timeout', None)
                cf.wait(futures, timeout=timeout, return_when=cf.ALL_COMPLETED)
            return _call_post_fn(self, post_fn, futures, args, full_kwargs)

        def wrap_with_async(self, args, kwargs):
            """ Run a method in parallel with async / await """
            try:
                loop = asyncio.get_event_loop()
            except RuntimeError:
                # this is a new thread where there is no loop
                loop = asyncio.new_event_loop()
                asyncio.set_event_loop(loop)
            else:
                # allow to specify a loop as an action parameter
                loop = kwargs.get('loop', loop)

            if loop.is_running():
                raise RuntimeError('Cannot parallel async methods with a running event loop (e.g. in IPython).')

            init_fn, post_fn = _check_functions(self)
            futures = []
            args, kwargs, params = _prepare_args(self, args, kwargs)
            full_kwargs = {**dec_kwargs, **kwargs}
            for iteration, arg in enumerate(_call_init_fn(init_fn, args, full_kwargs)):
                margs, mkwargs = _make_args(self, iteration, arg, args, kwargs, params)
                futures.append(loop.create_task(method(*margs, **mkwargs)))
            # return_exceptions=True: failures are surfaced through Task.result() in _call_post_fn
            loop.run_until_complete(asyncio.gather(*futures, return_exceptions=True))
            return _call_post_fn(self, post_fn, futures, args, full_kwargs)

        def wrap_with_for(self, debug, args, kwargs):
            """ Run a method sequentially (without parallelism) """
            init_fn, post_fn = _check_functions(self)
            # consume n_workers for signature parity with the parallel targets
            _ = kwargs.pop('n_workers', _workers_count())
            futures = []
            args, kwargs, params = _prepare_args(self, args, kwargs)
            full_kwargs = {**dec_kwargs, **kwargs}
            for iteration, arg in enumerate(_call_init_fn(init_fn, args, full_kwargs)):
                margs, mkwargs = _make_args(self, iteration, arg, args, kwargs, params)
                if debug:
                    # debug mode: let exceptions propagate unhandled
                    one_ft = method(*margs, **mkwargs)
                else:
                    try:
                        one_ft = method(*margs, **mkwargs)
                    except Exception as e:  # pylint: disable=broad-except
                        one_ft = e
                futures.append(one_ft)
            return _call_post_fn(self, post_fn, futures, args, full_kwargs)

        @functools.wraps(method)
        def wrapped_method(*args, **kwargs):
            """ Wrap a method with a required parallel engine """
            if use_self:
                # the first arg is self, not an ordinary arg
                self = args[0]
                args = args[1:]
            else:
                # still need self to preserve the signatures of other functions
                self = None

            # call-site 'target'/'debug' kwargs override the decorator's defaults
            _target = kwargs.pop('target', target)
            _debug = kwargs.pop('debug', debug)

            # coroutine methods always go through the async engine regardless of target
            if asyncio.iscoroutinefunction(method) or _target in ['async', 'a']:
                x = wrap_with_async(self, args, kwargs)
            elif _target in ['threads', 't']:
                x = wrap_with_threads(self, args, kwargs)
            elif _target in ['mpc', 'm']:
                if mpc_method is not None:
                    x = wrap_with_mpc(self, args, kwargs)
                else:
                    raise ValueError('Cannot use MPC with this method', method)
            elif _target in ['for', 'f']:
                x = wrap_with_for(self, _debug, args, kwargs)
            else:
                raise ValueError('Wrong parallelization target:', _target)
            return x

        return wrapped_method

    return inbatch_parallel_decorator
def parallel(*args, use_self=None, **kwargs):
    """ Decorator for a parallel execution of a function

    A thin public alias for :func:`inbatch_parallel` that maps `use_self`
    onto the private `_use_self` parameter.
    """
    return inbatch_parallel(*args, _use_self=use_self, **kwargs)
def njit(nogil=True, parallel=True):  # pylint: disable=redefined-outer-name
    """ Fake njit decorator to use when numba is not installed.

    Accepts (and ignores) the same `nogil` / `parallel` flags as `numba.njit`
    and returns a decorator that calls the method unchanged, logging a warning
    about the missing numba on every call.
    """
    _, _ = nogil, parallel

    def _fake_jit(method):
        """ Return a decorator """
        @functools.wraps(method)
        def _warn_and_call(*args, **kwargs):
            """ Log warning that numba is not installed which causes preformance degradation """
            logging.warning('numba is not installed. This causes a severe performance degradation for method %s',
                            method.__name__)
            return method(*args, **kwargs)
        return _warn_and_call
    return _fake_jit
def mjit(*args, nopython=True, nogil=True, **kwargs):
    """ jit decorator for methods

    Compiles the method body with numba when available; otherwise falls back to
    the plain method and logs a warning.

    Notes
    -----
    This decorator should be applied directly to a method, not another decorator.
    """
    def _jit(method):
        if jit is not None:
            # numba cannot compile bound methods, so rebuild the method as a plain function first
            func = make_function(method)
            func = jit(*args, nopython=nopython, nogil=nogil, **kwargs)(func)
        else:
            func = method
            logging.warning('numba is not installed. This causes a severe performance degradation for method %s',
                            method.__name__)

        @functools.wraps(method)
        def _wrapped_method(self, *args, **kwargs):
            # jitted code cannot use `self`, so it is dropped and None is passed in its place
            _ = self
            return func(None, *args, **kwargs)
        return _wrapped_method

    if len(args) == 1 and (callable(args[0])) and len(kwargs) == 0:
        # bare `@mjit`: the sole positional argument is the method itself
        method = args[0]
        args = tuple()
        return _jit(method)
    return _jit
def deprecated(msg):
    """ Decorator for deprecated functions and methods.

    Each call to the wrapped callable logs `msg` as a warning, then delegates
    to the original callable unchanged.
    """
    def _wrap(func):
        @functools.wraps(func)
        def _warn_then_call(*args, **kwargs):
            logging.warning(msg)
            return func(*args, **kwargs)
        return _warn_then_call
    return _wrap