
class Pipeline(dataset=None, config=None, pipeline=None, actions=None, strict=False, proba=None, repeat=None)[source]
PREFIX = '\x1b[1m\x1b[4m'
acquire_lock(name='lock', **kwargs)[source]

Acquire lock


name (string) – a lock name


Return type

self - in order to use it in the pipeline chains


Add namespace to call pipeline actions from


namespace (str or module) – a module name as a string or a module reference


Add a standard time module and numpy:

dataset.p.add_namespace('time', numpy)


The current module and dataset module are included by default.

Passing a namespace as a string is necessary for multiprocess executions, e.g. when running a pipeline with prefetch and mpc target.
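
The reason a string is needed for multiprocess runs can be sketched with the standard library alone: a module name can be pickled and sent to another process, while a module object cannot. The helpers resolve_namespace and is_picklable below are hypothetical names used for illustration, not part of this API.

```python
import importlib
import pickle
import time
import types


def resolve_namespace(namespace):
    """Return a module for either a module object or an importable module name."""
    if isinstance(namespace, types.ModuleType):
        return namespace
    return importlib.import_module(namespace)


def is_picklable(obj):
    """Report whether obj survives pickling, as it must to cross a process boundary."""
    try:
        pickle.dumps(obj)
    except (TypeError, pickle.PicklingError):
        return False
    return True


# A module name can travel between processes; the module object itself cannot.
name_ok = is_picklable("time")      # True
module_ok = is_picklable(time)      # False
resolved = resolve_namespace("time")  # the same object as the imported time module
```

A worker process that receives the string can import the module on its own side, which is what resolve_namespace imitates here.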

append_pipeline(pipeline, proba=None, repeat=None)[source]

Add a nested pipeline to the log of future actions

assign_variable(name, value)[source]

Assign a value to a variable

call(fn, *args, save_to=None, **kwargs)[source]

Call any function during pipeline execution

  • fn (a function, method or callable to call.) – Could be a named expression.

  • save_to (a named expression or a sequence of named expressions) – A location where function output will be saved to.


As a function from any namespace (see add_namespace()) can be called within a pipeline, call is convenient with lambdas:

    .call(lambda batch: [image.shape[1] for image in batch.images], B(), save_to=V('image_widths'))
classmethod concat(pipe1, pipe2)[source]

Create a new pipeline concatenating two given pipelines

create_batch(batch_index, *args, **kwargs)[source]

Create a new batch by given indices and execute all lazy actions


Delete a variable. Same as delete_variable(name).


Delete all variables


Delete a variable. If the variable does not exist, a warning will be issued.


name (str) – a name of the variable


Return type

self - in order to use it in the pipeline chains


Discard the batch (helpful in multiprocessing prefetching to prevent passing the batch back)


Return type

self - in order to use it in the pipeline chains

execute_for(batch, notifier=None, iteration=None, seed=None, new_loop=False)[source]

Run a pipeline for one batch

  • batch – an input batch

  • notifier – a notifier instance

  • iteration (int) – a pipeline iteration this batch is used at

  • seed (SeedSequence) – a numpy SeedSequence to use when executing

  • new_loop (bool) – whether to create a new async loop.


Return type

a batch - an output from the last action in the pipeline

classmethod from_pipeline(pipeline, actions=None, proba=None, repeat=None)[source]

Create a pipeline from another pipeline

gather_metrics(metrics_class, *args, save_to=None, **kwargs)[source]

Collect metrics for a model

  • metrics_class (class or str) –

    A class which calculates metrics (see Metrics)

    If str:

    • 'class' for ClassificationMetrics

    • 'segmentation' or 'mask' for SegmentationMetricsByPixels

    • 'instance' for SegmentationMetricsByInstances

  • args

  • kwargs – Parameters for metrics calculation

  • save_to (a named expression) – A location where metrics will be saved to.


For available metrics see metrics API.

A mode can be passed to save_to expression:

  • ‘w’ saves metrics for the last batch only which is convenient for metrics evaluation during training.

  • ‘u’ is more suitable to calculate metrics during testing / validation.

  • ‘a’ collects the history of batch metrics.


pipeline = (dataset.test.p
    .import_model('unet', train_pipeline)
    .predict_model('unet', fetches='predictions', feed_dict={'x': B('images')},
                   save_to=V('inferred_masks'))
    .gather_metrics('mask', targets=B('masks'), predictions=V('inferred_masks'),
                    fmt='proba', axis=-1, save_to=V('metrics', mode='u'))
    .run(BATCH_SIZE, notifier=True)
)

metrics = pipeline.get_variable('metrics')
metrics.evaluate(['sensitivity', 'specificity'])
gen_batch(*args, **kwargs)[source]

Generate batches

  • batch_size (int) – desired number of items in the batch (the actual batch could contain fewer items)

  • shuffle (bool or int) –

    specifies the randomization and the order of items (default=False):

    • False - items go sequentially, one after another as they appear in the index; a random number generator is created with a random entropy

    • True - items are shuffled randomly before each epoch; a random number generator is created with a random entropy

    • int - a seed number for a random shuffle; a random number generator is created with the given seed.

  • n_iters (int) – Number of iterations to make (only one of n_iters and n_epochs should be specified).

  • n_epochs (int) – Number of epochs required (only one of n_iters and n_epochs should be specified).

  • drop_last (bool) –

    if True, drops the last batch (in each epoch) if it contains fewer than batch_size items.

    If False, then the last batch in each epoch could contain repeating indices (which might be a problem) and the very last batch could contain fewer than batch_size items.

    See gen_batch() for details.

  • notifier (str, dict, or instance of .Notifier) – Configuration of displayed progress bar, if any. If str or dict, then parameters of .Notifier initialization. For more details about notifying capabilities, refer to .Notifier documentation.

  • prefetch (int) – a number of batches to process in advance (default=0)

  • target ('threads' or 'mpc') – batch parallelization engine used for prefetching (default='threads'). 'mpc' rarely works well because Python's inter-process communication is complicated and slow. Don't use pipeline variables and models in mpc mode, as each batch is processed in a separate copy of the pipeline.

  • reset (list of str, str or bool) –

    what to reset to start from scratch:

    • ’iter’ - restart the batch iterator

    • ’variables’ - re-initialize all pipeline variables

    • ’models’ - reset all models

  • ignore_exceptions (bool) – whether to continue the pipeline when an exception for any batch is caught (default=True). When exceptions are not ignored while prefetching, the pipeline is stopped when the first one is caught; however, all prefetched batches will still be processed in the background.

  • profile –

    whether to gather execution statistics:

    • 0 or False - do not gather

    • 1 or True - gather action times

    • 2 or ‘detailed’ - gather full profiling with cProfile


an instance of the batch class returned by the last action


for batch in pipeline.gen_batch(C('batch_size'), shuffle=True, n_epochs=2, drop_last=True):
    ...  # do whatever you want
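
The interplay of batch_size, shuffle, n_epochs and drop_last described above can be sketched in plain Python. This is an illustration of the documented behaviour only, not the library's implementation: gen_index_batches is a hypothetical helper, and it uses the standard random module where the text speaks of a random number generator.

```python
import random


def gen_index_batches(n_items, batch_size, shuffle=False, n_epochs=1, drop_last=False):
    """Yield lists of item positions, one list per batch."""
    # An int is treated as a seed; True and False fall back to fresh random entropy.
    is_seed = isinstance(shuffle, int) and not isinstance(shuffle, bool)
    rng = random.Random(shuffle if is_seed else None)
    do_shuffle = is_seed or shuffle is True

    carry = []  # items left over from the previous epoch when drop_last is False
    for _ in range(n_epochs):
        order = list(range(n_items))
        if do_shuffle:
            rng.shuffle(order)
        stream = carry + order
        full = len(stream) - len(stream) % batch_size
        for start in range(0, full, batch_size):
            yield stream[start:start + batch_size]
        # With drop_last the short remainder is discarded at the end of each epoch;
        # otherwise it is carried over and completed with items from the next epoch.
        carry = [] if drop_last else stream[full:]
    if carry:
        yield carry  # only the very last batch may be short


dropped = list(gen_index_batches(5, 3, n_epochs=2, drop_last=True))
# [[0, 1, 2], [0, 1, 2]]
kept = list(gen_index_batches(5, 3, n_epochs=2, drop_last=False))
# [[0, 1, 2], [3, 4, 0], [1, 2, 3], [4]]
```

In the second result the batch [3, 4, 0] spans two epochs, which is how a batch can end up with repeating indices once shuffling is on.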
gen_rebatch(*args, **kwargs)[source]

Generate batches for rebatch operation

get_action_name(action, add_index=False)[source]

Return a pretty action name


Return a probability of the last action


Return a repeat count of the last action


Return a method by the name

get_model_by_name(name, batch=None)[source]

Retrieve a model by its name

get_variable(name, *args, create=False, **kwargs)[source]

Return a variable value.

If the variable does not exist, it might be created and initialized (see init_variable below).

  • name (string) – a name of the variable

  • create (bool) – whether to create a variable if it does not exist. Default is False.

  • args – parameters for init_variable() if create is True.

  • kwargs – parameters for init_variable() if create is True.


Return type

a value of the variable
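
The relationship between get_variable, its create flag and init_variable can be sketched with a small dictionary-backed class. VariableStore is a hypothetical stand-in, and raising KeyError for a missing variable is an assumption made for the sketch; the text above does not say what happens in that case.

```python
class VariableStore:
    """A hypothetical stand-in for a pipeline's variable directory."""

    def __init__(self):
        self._values = {}

    def init_variable(self, name, default=None):
        # Create the variable only if it is missing; an existing value is left untouched.
        self._values.setdefault(name, default)
        return self  # returning self allows chaining

    def has_variable(self, name):
        return name in self._values

    def get_variable(self, name, *args, create=False, **kwargs):
        if not self.has_variable(name):
            if not create:
                # Assumption: the behaviour for a missing variable is not documented here.
                raise KeyError(name)
            # Extra arguments are forwarded to init_variable, as the parameter list describes.
            self.init_variable(name, *args, **kwargs)
        return self._values[name]


store = VariableStore().init_variable("iterations", default=0)
iterations = store.get_variable("iterations")                           # 0
history = store.get_variable("loss_history", create=True, default=[])   # []
```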




Check if a variable exists


name (str) – a name of the variable


Return type

True if the variable exists

import_model(name, source)[source]

Import a model from another pipeline

  • name (str) – a name with which the model is stored in this pipeline

  • source – a model or a pipeline to import from


Import a model instance to the pipeline:

pipeline.import_model('my-model', custom_resnet_model)

Import ‘resnet’ model from train_pipeline and store it in the pipeline under the name ‘my-model’:

pipeline.import_model('my-model', train_pipeline.m('resnet'))

Import ‘my-model’ from train_pipeline and store it as ‘my-model’ in the pipeline:

pipeline.import_model('my-model', train_pipeline)
property index

Return index of the source dataset

property indices

Return the sequence of indices of the source dataset

init_lock(name='lock', **kwargs)[source]

Create a lock as a pipeline variable


name (string) – a lock name


Return type

self - in order to use it in the pipeline chains


>>> pp = dataset.p
init_model(name, model_class=None, mode='dynamic', config=None, source=None)[source]

Initialize a static or dynamic model by building or importing it

  • name (str) – a name for the model (to refer to it later when training or inferring).

  • model_class (class or named expression) – a model class (might also be specified in the config).

  • mode ({'static', 'dynamic'}) –

    model creation mode:

    • static - the model is created right now, during the pipeline definition

    • dynamic - the model is created at the first iteration when the pipeline is run (default)

  • config (dict or Config) – model configuration parameters, where each key and value could be named expressions.

  • source – a model or a pipeline to import from


Build a model:

pipeline.init_model('my-model', MyModel, 'static')

Import a model:

pipeline.init_model('my-model', source=train_pipeline)

Build a model with a config:

  .init_variable('images_shape', [256, 256])
  .init_model('my_model', MyModel, 'static', config={'input_shape': V('images_shape')})

  .init_variable('shape_name', 'images_shape')
  .init_model('my_model', C('model'), 'dynamic', config={V('shape_name'): B('images_shape')})
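
The difference between the 'static' and 'dynamic' modes can be sketched as eager versus deferred construction. ModelSlot and make_model are hypothetical names for illustration; the real pipeline builds dynamic models at the first iteration, which the sketch imitates with the first call to get().

```python
class ModelSlot:
    """A hypothetical holder that builds a model eagerly or on first use."""

    def __init__(self, factory, mode="dynamic"):
        if mode not in ("static", "dynamic"):
            raise ValueError(f"unknown mode: {mode!r}")
        self._factory = factory
        # 'static': build right now, at definition time; 'dynamic': postpone.
        self._model = factory() if mode == "static" else None

    def get(self):
        if self._model is None:
            self._model = self._factory()  # built once, on first use
        return self._model


built = []


def make_model():
    built.append("model")
    return {"weights": [0.0]}


static_slot = ModelSlot(make_model, mode="static")
dynamic_slot = ModelSlot(make_model, mode="dynamic")
builds_after_definition = len(built)   # 1: only the static model exists so far

dynamic_slot.get()
dynamic_slot.get()
builds_after_first_use = len(built)    # 2: the dynamic model was built exactly once
```

Deferring construction is what lets a dynamic model's config depend on values that only exist at run time, such as a batch attribute.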
init_variable(name, default=None, lock=True, **kwargs)[source]

Create a variable if it does not exist. If the variable exists, do nothing.

  • name (string) – a name of the variable

  • default – an initial value for the variable set when pipeline is created

  • lock (bool) – whether to lock a variable before each update (default: True)


Return type

self - in order to use it in the pipeline chains


>>> pp = (dataset.p
            .init_variable("iterations", default=0)
            .init_variable("accuracy", 0)
            .init_variable("loss_history", [])
            .load('/some/path', fmt='blosc'))

Create several variables


variables (dict or tuple) – if tuple, contains variable names which will have None as default values; if dict, a mapping from variable names to values and init params (see init_variable())


Return type

self - in order to use it in the pipeline chains


>>> pp = (dataset.p
            .init_variables({"loss_history": dict(default=[]),
                             "predictions": dict(default=[])})
            .init_variables("metrics", "counter", "worst_prediction")
            .load('/some/path', fmt='blosc'))

Join one or several pipelines

load_model(name, model_class=None, mode='dynamic', *args, **kwargs)[source]

Load a model at each iteration

  • name (str) – a name for the model (to refer to it later when training or inferring).

  • model_class (class or named expression) – a model class (might also be specified in the config).

  • mode ({'static', 'dynamic'}) –

    model creation mode:

    • static - the model is created right now, during the pipeline definition

    • dynamic - the model is created at the first iteration when the pipeline is run (default)

  • args – model-specific parameters (like paths, formats, etc)

  • kwargs – model-specific parameters (like paths, formats, etc)

load_model_now(name, model_class=None, mode='dynamic', *args, batch=None, **kwargs)[source]

Load a model immediately

  • name (str) – a name for the model (to refer to it later when training or inferring).

  • model_class (class or named expression) – a model class (might also be specified in the config).

  • mode ({'static', 'dynamic'}) –

    model creation mode:

    • static - the model is created right now, during the pipeline definition

    • dynamic - the model is created at the first iteration when the pipeline is run (default)

  • batch (Batch) – (optional) a batch which might be used to evaluate named expressions in other parameters

  • args – model-specific parameters (like paths, formats, etc)

  • kwargs – model-specific parameters (like paths, formats, etc)

load_model_once(mode, name=None, model_class=None, *args, **kwargs)[source]

Load a model once before the first iteration

  • name (str) – a name for the model (to refer to it later when training or inferring).

  • model_class (class or named expression) – a model class (might also be specified in the config).

  • mode ({'static', 'dynamic'}) –

    model creation mode:

    • static - the model is created right now, during the pipeline definition

    • dynamic - the model is created at the first iteration when the pipeline is run (default)

  • args – model-specific parameters (like paths, formats, etc)

  • kwargs – model-specific parameters (like paths, formats, etc)

m(name, batch=None)[source]

A shorter alias for get_model_by_name()

merge(*pipelines, fn=None, components=None, batch_class=None)[source]

Merge pipelines

property model

An alias for the current model, if the pipeline has only one model initialized.

next_batch(*args, n_epochs=None, **kwargs)[source]

Get the next batch and execute all lazy actions


n_epochs is None by default to allow for infinite batch generation.


property num_actions

Return the number of actions in the pipeline

predict_model(name, *args, save_to=None, **kwargs)[source]

Predict using a model

  • name (str) – a model name

  • save_to (a named expression or a sequence of named expressions.) – A location where the model output will be stored.


All other named parameters are treated as data mappings of any type whose keys and values could be named expressions:

  • B(‘name’) - a batch class attribute or component name

  • V(‘name’) - a pipeline variable name

  • C(‘name’) - a pipeline config option

  • F(name) - a callable

  • R(‘name’) - a random value from a distribution ‘name’

These expressions are substituted by their actual values. All other values will be used “as is”. After substitution, these parameters will be sent to model.predict(…).


>>> pipeline
        .predict_model('resnet', x=B('images'), y_true=B('labels'), save_to=B('predicted_labels'))

Call a resnet model predict method with x and y_true arguments: predictions = resnet.predict(x=batch.images, y_true=batch.labels)

Predictions will be stored in batch.predicted_labels.

>>> pipeline
    .init_variable('inferred_masks', default=[])

Call a my_model model predict method with images as a positional argument and fetches as a keyword argument: predictions = my_model.predict(B.images, fetches='predictions')

Predictions for each batch will be stored in a pipeline variable inferred_masks.

print(*args, **kwargs)[source]

Print a value during pipeline execution

property profile_info
property random
property random_seed
rebatch(batch_size, merge=None, components=None, batch_class=None)[source]

Set the output batch size
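
What changing the output batch size amounts to can be sketched as regrouping a stream of incoming batches. The regroup generator below is a hypothetical illustration on plain lists, not the library's rebatch logic; emitting a shorter final batch from the leftover items is an assumption.

```python
def regroup(batches, batch_size):
    """Regroup an iterable of batches (lists) into batches of batch_size items."""
    buffer = []
    for batch in batches:
        buffer.extend(batch)
        # Emit as many full output batches as the buffer currently holds.
        while len(buffer) >= batch_size:
            yield buffer[:batch_size]
            buffer = buffer[batch_size:]
    if buffer:
        yield buffer  # assumption: leftover items form a final, shorter batch


even = list(regroup([[1, 2, 3], [4, 5, 6], [7, 8]], 4))   # [[1, 2, 3, 4], [5, 6, 7, 8]]
uneven = list(regroup([[1, 2, 3]], 2))                    # [[1, 2], [3]]
```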

release_lock(name='lock', **kwargs)[source]

Release lock


name (string) – a lock name


Return type

self - in order to use it in the pipeline chains
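
How named locks and chainable acquire and release calls fit together can be sketched with threading.Lock. LockRegistry and the lock name "model_update" are hypothetical; unlike the pipeline methods, which are recorded in a chain of actions, the calls in this sketch take effect immediately.

```python
import threading


class LockRegistry:
    """A hypothetical holder of named locks with chainable methods."""

    def __init__(self):
        self._locks = {}

    def init_lock(self, name="lock"):
        # Create the lock only once; repeated calls keep the existing lock.
        self._locks.setdefault(name, threading.Lock())
        return self

    def acquire_lock(self, name="lock"):
        self._locks[name].acquire()
        return self

    def release_lock(self, name="lock"):
        self._locks[name].release()
        return self

    def is_locked(self, name="lock"):
        return self._locks[name].locked()


registry = LockRegistry().init_lock("model_update")
registry.acquire_lock("model_update")
held = registry.is_locked("model_update")          # True
registry.release_lock("model_update")
released = not registry.is_locked("model_update")  # True
```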

reset(*args, profile=False, seed=None)[source]

Clear all iteration metadata in order to start iterating from scratch

  • what (list of str, str or bool or None) –

    what to reset to start from scratch:

    • ’iter’ - restart the batch iterator

    • ’variables’ - re-initialize all pipeline variables

    • ’models’ - reset all models

  • profile (bool or {0, 1, 2} or 'detailed') – whether to use profiler

  • seed – a random state (see make_rng()). If not specified, RNG will be created with a random entropy.



pipeline.reset('vars', 'models', profile=True)

pipeline.reset(['iter', 'vars'], seed=42)
run(*args, **kwargs)[source]

Execute all lazy actions for each batch in the dataset


run_later(*args, **kwargs)[source]

Define params to execute pipeline later

run_now(*args, **kwargs)[source]

Execute pipeline immediately

save_model(name, *args, **kwargs)[source]

Save a model at each iteration

  • name (str) – a model name

  • args – model-specific parameters (like paths, formats, etc)

  • kwargs – model-specific parameters (like paths, formats, etc)

save_model_now(name, *args, batch=None, **kwargs)[source]

Save a model immediately

  • name (str) – a model name

  • batch (Batch) – (optional) a batch which might be used to evaluate named expressions in other parameters

  • args – model-specific parameters (like paths, formats, etc)

  • kwargs – model-specific parameters (like paths, formats, etc)

save_model_once(name, *args, **kwargs)[source]

Save a model after the last iteration

  • name (str) – a model name

  • args – model-specific parameters (like paths, formats, etc)

  • kwargs – model-specific parameters (like paths, formats, etc)

save_to(dst, value=None)[source]

Save a value of a given named expression lazily during pipeline execution

  • dst (NamedExpression or any data container) – destination

  • value – an updating value, could be a value of any type or a named expression


Return type

self - in order to use it in the pipeline chains


This method does not change a value of the variable until the pipeline is run. So it should be used in pipeline definition chains only. save_data_to() is imperative and may be used to change variable value within actions.

set_config(config, clear=False)[source]

Update pipeline’s config

  • config (dict) – configuration parameters

  • clear (bool) – whether to clear the current config


Link the pipeline to a dataset


dataset (Dataset) – a dataset to link to


This method is a declarative version of pipeline << dataset, so it is executed only when the pipeline is run.

It is always run as the first action in the pipeline chain regardless of its actual location.

set_variable(name, value, mode='w', batch=None)[source]

Set a variable value. If the variable does not exist, it will be created; however, a warning will be displayed that the variable was not initialized.

  • name (str or a named expression) – a variable name

  • value – an updating value, could be a value of any type or a named expression

  • mode (str) –

    a method to update a variable value, could be one of:

    • ’w’ or ‘write’ to rewrite a variable with a new value. This is a default mode.

    • ’a’ or ‘append’ to append a value to a variable (e.g. if a variable is a list).

    • ’e’ or ‘extend’ to extend a variable with a new value (e.g. if a variable is a list).

    • ’u’ or ‘update’ to update a variable with a new value (e.g. if a variable is a dict).

    For sets and dicts ‘a’ and ‘u’ do exactly the same.


Unlike update_variable() this method sets a new value immediately. So set_variable is imperative and may be used within actions, while update_variable is declarative and should be used in pipeline definition chains.
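
The four update modes listed above can be sketched on built-in containers. apply_update is a hypothetical helper showing one plausible reading of the mode descriptions, not the library's code.

```python
def apply_update(current, value, mode="w"):
    """Return the variable's new value after applying value in the given mode."""
    if mode in ("w", "write"):
        return value
    if mode in ("a", "append"):
        if isinstance(current, (dict, set)):
            current.update(value)  # for sets and dicts 'a' behaves like 'u'
        else:
            current.append(value)
        return current
    if mode in ("e", "extend"):
        current.extend(value)
        return current
    if mode in ("u", "update"):
        current.update(value)
        return current
    raise ValueError(f"unknown mode: {mode!r}")


written = apply_update([1], [2, 3], "w")           # [2, 3]
appended = apply_update([1], [2, 3], "a")          # [1, [2, 3]]
extended = apply_update([1], [2, 3], "e")          # [1, 2, 3]
updated = apply_update({"a": 1}, {"b": 2}, "u")    # {'a': 1, 'b': 2}
```

The contrast between appended and extended is the usual list.append versus list.extend distinction: the first stores the new value as a single element, the second merges its items.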

train_model(name, *args, save_to=None, **kwargs)[source]

Train a model

  • name (str) – a model name

  • save_to (a named expression or a sequence of named expressions.) – A location where the model output will be stored.


All other named parameters are treated as data mappings of any type whose keys and values could be named expressions:

  • B(‘name’) - a batch class attribute or component name

  • V(‘name’) - a pipeline variable name

  • C(‘name’) - a pipeline config option

  • F(name) - a callable

  • R(‘name’) - a random value from a given distribution

These expressions are substituted by their actual values. All other values will be used “as is”. After substitution, these parameters will be sent to model.train(…).


>>> pipeline.train_model('resnet', x=B('images'), y_true=B('masks'))

Would call a resnet model train method with x and y_true arguments: resnet.train(x=batch.images, y_true=batch.masks)

>>> pipeline
       .init_variable('tensor_name', 'x')
       .train_model('resnet', feed_dict={V('tensor_name'): B('images')})

Would call a resnet model train method with a feed_dict argument: resnet.train(feed_dict={'x': batch.images})
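
The substitution step described above, where named expressions in both keys and values are replaced by actual values, can be sketched as a recursive walk. The B, V and C classes below are minimal stand-ins defined only for this sketch, and evaluate is a hypothetical helper; none of them are the library's own objects.

```python
from dataclasses import dataclass
from types import SimpleNamespace


@dataclass(frozen=True)
class B:
    """Stand-in for a batch attribute or component name."""
    name: str


@dataclass(frozen=True)
class V:
    """Stand-in for a pipeline variable name."""
    name: str


@dataclass(frozen=True)
class C:
    """Stand-in for a pipeline config option."""
    name: str


def evaluate(expr, batch, variables, config):
    """Replace stand-in expressions by their values; pass anything else through as is."""
    if isinstance(expr, B):
        return getattr(batch, expr.name)
    if isinstance(expr, V):
        return variables[expr.name]
    if isinstance(expr, C):
        return config[expr.name]
    if isinstance(expr, dict):
        # Keys as well as values may be expressions.
        return {evaluate(k, batch, variables, config): evaluate(v, batch, variables, config)
                for k, v in expr.items()}
    if isinstance(expr, (list, tuple)):
        return type(expr)(evaluate(item, batch, variables, config) for item in expr)
    return expr


batch = SimpleNamespace(images=[[0, 1], [1, 0]])
kwargs = {"feed_dict": {V("tensor_name"): B("images")}, "fetches": "loss"}
resolved = evaluate(kwargs, batch, variables={"tensor_name": "x"}, config={})
# {'feed_dict': {'x': [[0, 1], [1, 0]]}, 'fetches': 'loss'}
```

The stand-ins are frozen dataclasses so that they are hashable and can serve as dictionary keys, mirroring the feed_dict example above.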

update(expr, value=None)[source]

Update a value of a given named expression lazily during pipeline execution

  • expr (NamedExpression) – an expression

  • value – an updating value, could be a value of any type or a named expression


Return type

self - in order to use it in the pipeline chains


This method does not change a value of the variable until the pipeline is run. So it should be used in pipeline definition chains only. set_variable is imperative and may be used to change variable value within actions.


Update pipeline’s config

  • config (dict) – configuration parameters

  • clear (bool) – whether to clear the current config

update_variable(name, value=None, mode='w')[source]

Update a value of a given variable lazily during pipeline execution

  • name (str or a named expression) – a variable name

  • value – an updating value, could be a value of any type or a named expression

  • mode (str) –

    a method to update a variable value, could be one of:

    • ’w’ or ‘write’ to rewrite a variable with a new value. This is a default mode.

    • ’a’ or ‘append’ to append a value to a variable (e.g. if a variable is a list).

    • ’e’ or ‘extend’ to extend a variable with a new value (e.g. if a variable is a list).

    • ’u’ or ‘update’ to update a variable with a new value (e.g. if a variable is a dict).

    For sets and dicts ‘a’ and ‘u’ do exactly the same.


Return type

self - in order to use it in the pipeline chains


Unlike set_variable() this method does not change a value of the variable until the pipeline is run. So it should be used in pipeline definition chains only. set_variable is imperative and may be used to change variable value within actions.
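
The declarative versus imperative distinction can be sketched with a toy class that records updates and applies them only on run. TinyPipeline is a hypothetical illustration and shares nothing with the real class beyond the method names.

```python
class TinyPipeline:
    """A hypothetical pipeline that separates recording an update from applying it."""

    def __init__(self):
        self.variables = {}
        self._actions = []

    def set_variable(self, name, value):
        # Imperative: the value changes immediately.
        self.variables[name] = value

    def update_variable(self, name, value):
        # Declarative: only remember what to do; nothing changes yet.
        self._actions.append((name, value))
        return self  # returning self allows chaining

    def run(self):
        for name, value in self._actions:
            self.set_variable(name, value)
        return self


pipeline = TinyPipeline().update_variable("epoch", 1)
before_run = dict(pipeline.variables)   # {}
pipeline.run()
after_run = dict(pipeline.variables)    # {'epoch': 1}
```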

v(name, *args, **kwargs)[source]

A shorter alias for get_variable()