Pipeline¶
- class Pipeline(dataset=None, config=None, pipeline=None, actions=None, strict=False, proba=None, repeat=None)[source]¶
- ITEM_LENGTH = 40¶
- LINE_LENGTH = 80¶
- PREFIX = '\x1b[1m\x1b[4m'¶
- acquire_lock(name='lock', **kwargs)[source]¶
Acquire lock
- Parameters
name (string) – a lock name
- Returns
- Return type
self - in order to use it in the pipeline chains
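Examples
A minimal sketch of a lock lifecycle in a chain (the lock name and the guarded action are illustrative, assuming the lock was created with init_lock()):
(pipeline
    .init_lock('model_update')                                  # create the lock as a pipeline variable
    .acquire_lock('model_update')                               # block until the lock is free
    .train_model('resnet', x=B('images'), y_true=B('labels'))   # guarded section
    .release_lock('model_update'))                              # let other workers proceed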
- add_namespace(*namespaces)[source]¶
Add namespace to call pipeline actions from
- Parameters
namespace (str or module) – a module name as a string or a module reference
Examples
Add a standard time module and numpy:
dataset.p.add_namespace('time', numpy)
Notes
The current module and dataset module are included by default.
Passing a namespace as a string is necessary for multiprocess executions, e.g. when running a pipeline with prefetch and mpc target.
- append_pipeline(pipeline, proba=None, repeat=None)[source]¶
Add a nested pipeline to the log of future actions
- call(fn, *args, save_to=None, **kwargs)[source]¶
Call any function during pipeline execution
- Parameters
fn (a function, method or callable) – what to call; could be a named expression.
save_to (a named expression or a sequence of named expressions) – A location where function output will be saved to.
Notes
As a function from any namespace (see add_namespace()) can be called within a pipeline, call is convenient with lambdas:
pipeline.call(lambda batch: (image.shape[1] for image in batch.images),
              B(), save_to=V('image_widths'))
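A plain function works the same way; a minimal sketch (count_items and batch_len are illustrative names, not part of the API):
def count_items(batch):
    # assumes the batch supports len()
    return len(batch)

pipeline.call(count_items, B(), save_to=V('batch_len'))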
- create_batch(batch_index, *args, **kwargs)[source]¶
Create a new batch by given indices and execute all lazy actions
- delete_variable(name)[source]¶
Delete a variable. If the variable does not exist, a warning is issued.
- Parameters
name (str) – a name of the variable
- Returns
- Return type
self - in order to use it in the pipeline chains
- discard_batch()[source]¶
Discard the batch (helpful in multiprocessing prefetching to prevent passing the batch back)
- Returns
- Return type
self - in order to use it in the pipeline chains
- execute_for(batch, notifier=None, iteration=None, seed=None, new_loop=False)[source]¶
Run a pipeline for one batch
- Parameters
- Returns
- Return type
a batch - an output from the last action in the pipeline
- classmethod from_pipeline(pipeline, actions=None, proba=None, repeat=None)[source]¶
Create a pipeline from another pipeline
- gather_metrics(metrics_class, *args, save_to=None, **kwargs)[source]¶
Collect metrics for a model
- Parameters
metrics_class (class or str) – a class which calculates metrics (see Metrics). If str:
'class' for ClassificationMetrics
'segmentation' or 'mask' for SegmentationMetricsByPixels
'instance' for SegmentationMetricsByInstances
args – parameters for metrics calculation
kwargs – parameters for metrics calculation
save_to (a named expression) – A location where metrics will be saved to.
Notes
For available metrics see the metrics API.
A mode can be passed to the save_to expression:
'w' saves metrics for the last batch only, which is convenient for metrics evaluation during training.
'u' is more suitable for calculating metrics during testing / validation.
'a' collects the history of batch metrics.
Examples
pipeline = (dataset.test.p
    .init_variable('metrics')
    .init_variable('inferred_masks')
    .import_model('unet', train_pipeline)
    .predict_model('unet', fetches='predictions',
                   feed_dict={'x': B('images')},
                   save_to=V('inferred_masks'))
    .gather_metrics('masks', targets=B('masks'), predictions=V('inferred_masks'),
                    fmt='proba', axis=-1, save_to=V('metrics', mode='u'))
    .run(BATCH_SIZE, notifier=True)
)
metrics = pipeline.get_variable('metrics')
metrics.evaluate(['sensitivity', 'specificity'])
- gen_batch(*args, **kwargs)[source]¶
Generate batches
- Parameters
batch_size (int) – desired number of items in the batch (the actual batch could contain fewer items)
shuffle – specifies the randomization and the order of items (default=False):
False - items go sequentially, one after another as they appear in the index; a random number generator is created with a random entropy
True - items are shuffled randomly before each epoch; a random number generator is created with a random entropy
int - a seed number for a random shuffle; a random number generator is created with the given seed.
n_iters (int) – Number of iterations to make (only one of n_iters and n_epochs should be specified).
n_epochs (int) – Number of epochs required (only one of n_iters and n_epochs should be specified).
drop_last (bool) –
if True, drops the last batch (in each epoch) if it contains fewer than batch_size items.
If False, then the last batch in each epoch could contain repeating indices (which might be a problem) and the very last batch could contain fewer than batch_size items.
See gen_batch() for details.
notifier (str, dict, or instance of Notifier) – configuration of the displayed progress bar, if any. If str or dict, then parameters of Notifier initialization. For more details about notifying capabilities, refer to the Notifier documentation.
prefetch (int) – a number of batches to process in advance (default=0)
target ('threads' or 'mpc') – batch parallelization engine used for prefetching (default='threads'). 'mpc' rarely works well due to complicated and slow Python inter-process communication. Don't use pipeline variables and models in mpc mode as each batch is processed in a separate copy of the pipeline.
reset (list of str, str or bool) –
what to reset to start from scratch:
'iter' - restart the batch iterator
'variables' - re-initialize all pipeline variables
'models' - reset all models
ignore_exceptions (bool) – whether to continue the pipeline when an exception for any batch is caught (default=True). When exceptions are not ignored while prefetching, the pipeline stops at the first one caught; however, all prefetched batches will still be processed in the background.
profile – whether to gather execution statistics:
0 or False - do not gather
1 or True - gather action times
2 or 'detailed' - gather full profiling with cProfile.
- Yields
an instance of the batch class returned by the last action
Examples
for batch in pipeline.gen_batch(C('batch_size'), shuffle=True, n_epochs=2, drop_last=True):
    # do whatever you want
    pass
- get_variable(name, *args, create=False, **kwargs)[source]¶
Return a variable value.
If the variable does not exist, it might be created and initialized (see init_variable below).
- Parameters
name (string) – a name of the variable
create (bool) – whether to create a variable if it does not exist. Default is False.
args – parameters for init_variable() if create is True.
kwargs – parameters for init_variable() if create is True.
- Returns
- Return type
a value of the variable
- Raises
KeyError – if the variable does not exist and create is False.
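Examples
A minimal sketch (the variable name and default are illustrative; with create=True, default is forwarded to init_variable()):
# read a variable, creating it with a default value if it is missing
acc = pipeline.get_variable('accuracy', create=True, default=0)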
- has_variable(name)[source]¶
Check if a variable exists
- Parameters
name (str) – a name of the variable
- Returns
- Return type
True if the variable exists
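Examples
A short sketch (the variable name is illustrative):
# guard against reading a variable that was never initialized
if pipeline.has_variable('metrics'):
    metrics = pipeline.get_variable('metrics')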
- import_model(name, source)[source]¶
Import a model from another pipeline
- Parameters
name (str) – a name with which the model is stored in this pipeline
source – a model or a pipeline to import from
Examples
Import a model instance to the pipeline:
pipeline.import_model('my-model', custom_resnet_model)
Import ‘resnet’ model from train_pipeline and store it in the pipeline under the name ‘my-model’:
pipeline.import_model('my-model', train_pipeline.m('resnet'))
Import ‘my-model’ from train_pipeline and store it as ‘my-model’ in the pipeline:
pipeline.import_model('my-model', train_pipeline)
- property index¶
Return index of the source dataset
- property indices¶
Return the sequence of indices of the source dataset
- init_lock(name='lock', **kwargs)[source]¶
Create a lock as a pipeline variable
- Parameters
name (string) – a lock name
- Returns
- Return type
self - in order to use it in the pipeline chains
Examples
>>> pp = dataset.p.init_lock("model_update")
- init_model(name, model_class=None, mode='dynamic', config=None, source=None)[source]¶
Initialize a static or dynamic model by building or importing it
- Parameters
name (str) – a name for the model (to refer to it later when training or inferring).
model_class (class or named expression) – a model class (might also be specified in the config).
mode ({'static', 'dynamic'}) – model creation mode:
static - the model is created right away, during the pipeline definition
dynamic - the model is created at the first iteration, when the pipeline is run (default)
config (dict or Config) – model configuration parameters, where each key and value could be named expressions.
source – a model or a pipeline to import from
Examples
Build a model:
pipeline.init_model('my-model', MyModel, 'static')
Import a model:
pipeline.init_model('my-model', source=train_pipeline)
Build a model with a config:
(pipeline
    .init_variable('images_shape', [256, 256])
    .init_model('my_model', MyModel, 'static',
                config={'input_shape': V('images_shape')}))

(pipeline
    .init_variable('shape_name', 'images_shape')
    .init_model('my_model', C('model'), 'dynamic',
                config={V('shape_name'): B('images_shape')}))
- init_variable(name, default=None, lock=True, **kwargs)[source]¶
Create a variable if it does not exist. If the variable already exists, do nothing.
- Parameters
name (string) – a name of the variable
default – an initial value for the variable, set when the pipeline is created
lock (bool) – whether to lock a variable before each update (default: True)
- Returns
- Return type
self - in order to use it in the pipeline chains
Examples
>>> pp = (dataset.p
...     .init_variable("iterations", default=0)
...     .init_variable("accuracy", 0)
...     .init_variable("loss_history", [])
...     .load('/some/path', fmt='blosc')
...     .train_resnet())
- init_variables(*variables)[source]¶
Create several variables
- Parameters
variables (dict or tuple) – if tuple, contains variable names which will have None as default values; if dict, then a mapping from variable names to values and init params (see init_variable())
- Returns
- Return type
self - in order to use it in the pipeline chains
Examples
>>> pp = (dataset.p
...     .init_variables({"loss_history": dict(default=[]),
...                      "predictions": dict(default=[])})
...     .init_variables("metrics", "counter", "worst_prediction")
...     .load('/some/path', fmt='blosc')
...     .train_resnet())
- load_model(name, model_class=None, mode='dynamic', *args, **kwargs)[source]¶
Load a model at each iteration
- Parameters
name (str) – a name for the model (to refer to it later when training or inferring).
model_class (class or named expression) – a model class (might also be specified in the config).
mode ({'static', 'dynamic'}) – model creation mode:
static - the model is created right away, during the pipeline definition
dynamic - the model is created at the first iteration, when the pipeline is run (default)
args – model-specific parameters (like paths, formats, etc)
kwargs – model-specific parameters (like paths, formats, etc)
- load_model_now(name, model_class=None, mode='dynamic', *args, batch=None, **kwargs)[source]¶
Load a model immediately
- Parameters
name (str) – a name for the model (to refer to it later when training or inferring).
model_class (class or named expression) – a model class (might also be specified in the config).
mode ({'static', 'dynamic'}) – model creation mode:
static - the model is created right away, during the pipeline definition
dynamic - the model is created at the first iteration, when the pipeline is run (default)
batch (Batch) – (optional) a batch which might be used to evaluate named expressions in other parameters
args – model-specific parameters (like paths, formats, etc)
kwargs – model-specific parameters (like paths, formats, etc)
- load_model_once(mode, name=None, model_class=None, *args, **kwargs)[source]¶
Load a model once before the first iteration
- Parameters
name (str) – a name for the model (to refer to it later when training or inferring).
model_class (class or named expression) – a model class (might also be specified in the config).
mode ({'static', 'dynamic'}) – model creation mode:
static - the model is created right away, during the pipeline definition
dynamic - the model is created at the first iteration, when the pipeline is run (default)
args – model-specific parameters (like paths, formats, etc)
kwargs – model-specific parameters (like paths, formats, etc)
- property model¶
An alias for the present model, if the pipeline has only one model initialized.
- next_batch(*args, n_epochs=None, **kwargs)[source]¶
Get the next batch and execute all lazy actions
Notes
n_epochs is None by default to allow for infinite batch generation.
See also: gen_batch()
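Examples
A minimal sketch (the batch size and shuffle flag are illustrative, assuming positional and keyword arguments are passed on to the batch generator):
batch = pipeline.next_batch(64, shuffle=True)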
- property num_actions¶
Return the number of actions in the pipeline
- predict_model(name, *args, save_to=None, **kwargs)[source]¶
Predict using a model
- Parameters
name (str) – a model name
save_to (a named expression or a sequence of named expressions.) – A location where the model output will be stored.
Notes
All other named parameters are treated as data mappings of any type whose keys and values could be named expressions:
B('name') - a batch class attribute or component name
V('name') - a pipeline variable name
C('name') - a pipeline config option
F(name) - a callable
R('name') - a random value from a distribution 'name'
These expressions are substituted by their actual values. All other values will be used "as is". After substitution these parameters will be sent to model.predict(...).
Examples
>>> pipeline.predict_model('resnet', x=B('images'), y_true=B('labels'),
...                         save_to=B('predicted_labels'))
Call a resnet model predict method with x and y_true arguments:
predictions = resnet.predict(x=batch.images, y_true=batch.labels)
Predictions will be stored in batch.predicted_labels.
>>> (pipeline
...     .init_variable('inferred_masks', default=[])
...     .predict_model('my_model', B.images, fetches='predictions',
...                    save_to=V('inferred_masks')))
Call a my_model model predict method with images as a positional and fetches as a keyword argument:
predictions = my_model.predict(batch.images, fetches='predictions')
Predictions for each batch will be stored in a pipeline variable inferred_masks.
- property profile_info¶
- property random¶
- property random_seed¶
- rebatch(batch_size, merge=None, components=None, batch_class=None)[source]¶
Set the output batch size
- release_lock(name='lock', **kwargs)[source]¶
Release lock
- Parameters
name (string) – a lock name
- Returns
- Return type
self - in order to use it in the pipeline chains
- reset(*args, profile=False, seed=None)[source]¶
Clear all iteration metadata in order to start iterating from scratch
- Parameters
what (list of str, str or bool or None) –
what to reset to start from scratch:
'iter' - restart the batch iterator
'variables' - re-initialize all pipeline variables
'models' - reset all models
profile (bool or {0, 1, 2} or 'detailed') – whether to use the profiler
random – a random state (see make_rng()). If not specified, an RNG will be created with a random entropy.
Examples
pipeline.reset('iter')
pipeline.reset('vars', 'models', profile=True)
pipeline.reset(['iter', 'vars'], random=42)
- save_model(name, *args, **kwargs)[source]¶
Save a model at each iteration
- Parameters
name (str) – a model name
args – model-specific parameters (like paths, formats, etc)
kwargs – model-specific parameters (like paths, formats, etc)
- save_model_once(name, *args, **kwargs)[source]¶
Save a model after the last iteration
- Parameters
name (str) – a model name
args – model-specific parameters (like paths, formats, etc)
kwargs – model-specific parameters (like paths, formats, etc)
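Examples
A short sketch of a save/load pair (the path argument is illustrative; the exact model-specific parameters depend on the model class):
pipeline.save_model_once('unet', path='/some/path/model')
# later, e.g. in an inference pipeline
inference_pipeline.load_model_once('dynamic', 'unet', path='/some/path/model')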
- save_to(dst, value=None)[source]¶
Save a value of a given named expression lazily during pipeline execution
- Parameters
dst (NamedExpression or any data container) – destination
value – an updating value, could be a value of any type or a named expression
- Returns
- Return type
self - in order to use it in the pipeline chains
Notes
This method does not change a value until the pipeline is run, so it should be used in pipeline definition chains only. save_data_to() is imperative and may be used to change a value within actions.
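Examples
A short sketch (the variable and component names are illustrative, assuming the batch has a predictions component):
pipeline.save_to(V('last_predictions'), B('predictions'))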
- set_dataset(dataset)[source]¶
Link the pipeline to a dataset
- Parameters
dataset (Dataset) – a dataset to link to
Notes
This method is a declarative version of pipeline << dataset, so it is executed only when the pipeline is run. It is always run as the first action in the pipeline chain regardless of its actual location.
- set_variable(name, value, mode='w', batch=None)[source]¶
Set a variable value. If the variable does not exist, it will be created; however, a warning will be displayed that the variable was not initialized.
- Parameters
name (str or a named expression) – a variable name
value – an updating value, could be a value of any type or a named expression
mode (str) –
a method to update a variable value, could be one of:
'w' or 'write' to rewrite a variable with a new value. This is the default mode.
'a' or 'append' to append a value to a variable (e.g. if a variable is a list).
'e' or 'extend' to extend a variable with a new value (e.g. if a variable is a list).
'u' or 'update' to update a variable with a new value (e.g. if a variable is a dict).
For sets and dicts 'a' and 'u' do exactly the same.
Notes
Unlike update_variable(), this method sets a new value immediately. So set_variable is imperative and may be used within actions, while update_variable is declarative and should be used in pipeline definition chains.
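Examples
A minimal sketch of imperative use inside a custom action (MyBatch and the 'seen' variable are illustrative, assuming a batch can reach its pipeline via self.pipeline):
from batchflow import Batch, action

class MyBatch(Batch):
    @action
    def count_items(self):
        # imperative: the variable changes right now, during execution
        seen = self.pipeline.get_variable('seen', create=True, default=0)
        self.pipeline.set_variable('seen', seen + len(self))
        return self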
- train_model(name, *args, save_to=None, **kwargs)[source]¶
Train a model
- Parameters
name (str) – a model name
save_to (a named expression or a sequence of named expressions.) – A location where the model output will be stored.
Notes
All other named parameters are treated as data mappings of any type whose keys and values could be named expressions:
B('name') - a batch class attribute or component name
V('name') - a pipeline variable name
C('name') - a pipeline config option
F(name) - a callable
R('name') - a random value from a given distribution
These expressions are substituted by their actual values. All other values will be used "as is". After substitution these parameters will be sent to model.train(...).
Examples
>>> pipeline.train_model('resnet', x=B('images'), y_true=B('masks'))
Would call a resnet model train method with x and y_true arguments:
resnet.train(x=batch.images, y_true=batch.masks)
>>> (pipeline
...     .init_variable('tensor_name', 'x')
...     .train_model('resnet', feed_dict={V('tensor_name'): B('images')}))
Would call a resnet model train method with a feed_dict argument:
resnet.train(feed_dict={'x': batch.images})
- update(expr, value=None)[source]¶
Update a value of a given named expression lazily during pipeline execution
- Parameters
expr (NamedExpression) – an expression
value – an updating value, could be a value of any type or a named expression
- Returns
- Return type
self - in order to use it in the pipeline chains
Notes
This method does not change a value of the variable until the pipeline is run, so it should be used in pipeline definition chains only. set_variable() is imperative and may be used to change a variable value within actions.
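Examples
A short sketch (the variable names are illustrative; mode='a' in the expression appends to a list variable):
pipeline.update(V('loss_history', mode='a'), V('current_loss'))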
- update_variable(name, value=None, mode='w')[source]¶
Update a value of a given variable lazily during pipeline execution
- Parameters
name (str or a named expression) – a variable name
value – an updating value, could be a value of any type or a named expression
mode (str) –
a method to update a variable value, could be one of:
'w' or 'write' to rewrite a variable with a new value. This is the default mode.
'a' or 'append' to append a value to a variable (e.g. if a variable is a list).
'e' or 'extend' to extend a variable with a new value (e.g. if a variable is a list).
'u' or 'update' to update a variable with a new value (e.g. if a variable is a dict).
For sets and dicts 'a' and 'u' do exactly the same.
- Returns
- Return type
self - in order to use it in the pipeline chains
Notes
Unlike set_variable(), this method does not change a value of the variable until the pipeline is run, so it should be used in pipeline definition chains only. set_variable is imperative and may be used to change a variable value within actions.
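Examples
A short sketch (the variable names are illustrative):
(pipeline
    .init_variable('loss_history', default=[])
    .update_variable('loss_history', V('loss'), mode='a'))   # appends lazily, at each run iteration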