pydra.engine.helpers module

Administrative support for the engine framework.

class pydra.engine.helpers.PydraFileLock(lockfile)[source]

Bases: object

Wrapper for filelock’s SoftFileLock that makes it work with asyncio.

pydra.engine.helpers.argstr_formatting(argstr, inputs, value_updates=None)[source]

formatting argstr that have form {field_name}, using values from inputs and updating with value_update if provided

pydra.engine.helpers.copyfile_workflow(wf_path, result)[source]

if file in the wf results, the file will be copied to the workflow directory

pydra.engine.helpers.create_checksum(name, inputs)[source]

Generate a checksum name for a given combination of task name and inputs.

  • name (str) – Task name.

  • inputs (str) – String of inputs.

pydra.engine.helpers.custom_validator(instance, attribute, value)[source]

simple custom validation take into account ty.Union, ty.List, ty.Dict (but only one level depth) adding an additional validator, if allowe_values provided

pydra.engine.helpers.ensure_list(obj, tuple2list=False)[source]

Return a list whatever the input object is.


>>> ensure_list(list("abc"))
['a', 'b', 'c']
>>> ensure_list("abc")
>>> ensure_list(tuple("abc"))
[('a', 'b', 'c')]
>>> ensure_list(tuple("abc"), tuple2list=True)
['a', 'b', 'c']
>>> ensure_list(None)
>>> ensure_list(5.0)
pydra.engine.helpers.execute(cmd, strip=False)[source]

Run the event loop with coroutine.

Uses read_and_display_async() unless a loop is already running, in which case read_and_display() is used.

  • cmd (list or tuple) – The command line to be executed.

  • strip (bool) – TODO


Extract runtime information from a file.


fname (os.pathlike) – The file containing runtime information


runtime – A runtime object containing the collected information.

Return type:



Return the number of CPUs available to the current process or, if that is not available, the total number of CPUs on the system.


n_proc – The number of available CPUs.

Return type:



Get current event loop.

If the loop is closed, a new loop is created and set as the current event loop.


loop – The current event loop

Return type:



Generate hash of object.

pydra.engine.helpers.hash_value(value, tp=None, metadata=None, precalculated=None)[source]

calculating hash or returning values recursively

pydra.engine.helpers.load_and_run(task_pkl, ind=None, rerun=False, submitter=None, plugin=None, **kwargs)[source]

loading a task from a pickle file, settings proper input and running the task

async pydra.engine.helpers.load_and_run_async(task_pkl, ind=None, submitter=None, rerun=False, **kwargs)[source]

loading a task from a pickle file, settings proper input and running the workflow

pydra.engine.helpers.load_result(checksum, cache_locations)[source]

Restore a result from the cache.

  • checksum (str) – Unique identifier of the task to be loaded.

  • cache_locations (list of os.pathlike) – List of cache directories, in order of priority, where the checksum will be looked for.

pydra.engine.helpers.load_task(task_pkl, ind=None)[source]

loading a task from a pickle file, settings proper input for the specific ind


Create a data class given a spec.


spec – TODO

pydra.engine.helpers.output_from_inputfields(output_spec, input_spec)[source]

Collect values from output from input fields. If names_only is False, the output_spec is updated, if names_only is True only the names are returned

  • output_spec – TODO

  • input_spec – TODO


Sort objects by position, following Python indexing conventions.

Ordering is positive positions, lowest to highest, followed by unspecified positions (None) and negative positions, lowest to highest.

>>> position_sort([(None, "d"), (-3, "e"), (2, "b"), (-2, "f"), (5, "c"), (1, "a")])
['a', 'b', 'c', 'd', 'e', 'f']

args (list of (int/None, object) tuples)

Return type:

list of objects


Visit a task object and print its input/output interface.

pydra.engine.helpers.read_and_display(*cmd, strip=False, hide_display=False)[source]

Capture a process’ standard output.

async pydra.engine.helpers.read_and_display_async(*cmd, hide_display=False, strip=False)[source]

Capture standard input and output of a process, displaying them as they arrive.

Works line-by-line.

async pydra.engine.helpers.read_stream_and_display(stream, display)[source]

Read from stream line by line until EOF, display, and capture the lines.

pydra.engine.helpers.record_error(error_path, error)[source]

Write an error file. Path, result=None, task=None, name_prefix=None)[source]

Save a TaskBase object and/or results.

  • task_path (Path) – Write directory

  • result (Result) – Result to pickle and write

  • task (TaskBase) – Task to pickle and write


Calculate the checksum of a task.

input hash, output hash, environment hash


task (TaskBase) – The input task.