pydra.engine.helpers module

Administrative support for the engine framework.

class pydra.engine.helpers.PydraFileLock(lockfile)[source]

Bases: object

Wrapper for filelock’s SoftFileLock that makes it work with asyncio.

pydra.engine.helpers.argstr_formatting(argstr, inputs, value_updates=None)[source]

Format an argstr containing placeholders of the form {field_name}, using values from inputs and overriding them with value_updates if provided.
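The placeholder substitution described above can be illustrated with a small stand-alone sketch. The name format_argstr_sketch and the use of a plain dict for inputs are assumptions made for illustration; the real function operates on pydra input specs and may treat missing or special values differently.

```python
import re


def format_argstr_sketch(argstr, inputs, value_updates=None):
    """Hypothetical stand-in for illustration, not pydra's implementation."""
    values = dict(inputs)
    if value_updates:
        # Updated values take precedence over the original inputs
        values.update(value_updates)

    def substitute(match):
        # Assumption: a missing or None field is rendered as an empty string
        value = values.get(match.group(1))
        return "" if value is None else str(value)

    formatted = re.sub(r"\{(\w+)\}", substitute, argstr)
    # Collapse whitespace left behind by empty substitutions
    return " ".join(formatted.split())


print(format_argstr_sketch("-o {out_file} -n {n}", {"out_file": "a.txt", "n": 3}))
```

Passing value_updates={"n": 5} to the same call would replace the value of n before substitution.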

pydra.engine.helpers.copyfile_workflow(wf_path, result)[source]

If a file is present in the workflow results, it is copied to the workflow directory.

pydra.engine.helpers.create_checksum(name, inputs)[source]

Generate a checksum name for a given combination of task name and inputs.

  • name (str) – Task name.

  • inputs (str) – String of inputs.

pydra.engine.helpers.custom_validator(instance, attribute, value)[source]

Simple custom validation that takes ty.Union, ty.List, and ty.Dict into account (one level deep only) and adds an additional validator if allowed_values is provided.

pydra.engine.helpers.ensure_list(obj, tuple2list=False)[source]

Return a list whatever the input object is.


>>> ensure_list(list("abc"))
['a', 'b', 'c']
>>> ensure_list("abc")
['abc']
>>> ensure_list(tuple("abc"))
[('a', 'b', 'c')]
>>> ensure_list(tuple("abc"), tuple2list=True)
['a', 'b', 'c']
>>> ensure_list(None)
[]
>>> ensure_list(5.0)
[5.0]

pydra.engine.helpers.execute(cmd, strip=False)[source]

Run the event loop with coroutine.

Uses read_and_display_async() unless a loop is already running, in which case read_and_display() is used.

  • cmd (list or tuple) – The command line to be executed.

  • strip (bool) – TODO
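The dispatch described above (asynchronous runner when no loop is running, blocking runner otherwise) might look roughly like the following sketch. The helpers _run_async and _run_sync are hypothetical stand-ins for read_and_display_async() and read_and_display(); the returned (returncode, stdout, stderr) tuple and the meaning given to strip are assumptions, since the source leaves strip undocumented.

```python
import asyncio
import subprocess
import sys


async def _run_async(cmd):
    # Hypothetical stand-in for read_and_display_async()
    proc = await asyncio.create_subprocess_exec(
        *cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE
    )
    stdout, stderr = await proc.communicate()
    return proc.returncode, stdout.decode(), stderr.decode()


def _run_sync(cmd):
    # Hypothetical stand-in for read_and_display()
    proc = subprocess.run(cmd, capture_output=True)
    return proc.returncode, proc.stdout.decode(), proc.stderr.decode()


def execute_sketch(cmd, strip=False):
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        # No loop is running in this thread: drive the coroutine ourselves
        rc, out, err = asyncio.run(_run_async(cmd))
    else:
        # A loop is already running: fall back to the blocking variant
        rc, out, err = _run_sync(cmd)
    if strip:
        # Assumption: strip removes surrounding whitespace from the output
        out, err = out.strip(), err.strip()
    return rc, out, err


print(execute_sketch([sys.executable, "-c", "print('hi')"], strip=True))
```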


Extract runtime information from a file.


fname (os.pathlike) – The file containing runtime information


runtime – A runtime object containing the collected information.

Return type



Return the number of CPUs available to the current process or, if that is not available, the total number of CPUs on the system.


n_proc – The number of available CPUs.

Return type
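A minimal sketch of the fallback described above, using only the standard library. The name available_cpus_sketch is hypothetical, and the actual helper may consult other sources before falling back to the system-wide count.

```python
import os


def available_cpus_sketch():
    """Hypothetical illustration, not pydra's implementation."""
    # os.sched_getaffinity is only available on some platforms (e.g. Linux);
    # it reports the CPUs the current process is allowed to run on.
    if hasattr(os, "sched_getaffinity"):
        return len(os.sched_getaffinity(0))
    # Otherwise use the total CPU count; cpu_count() can return None
    return os.cpu_count() or 1


print(available_cpus_sketch())
```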



Get current event loop.

If the loop is closed, a new loop is created and set as the current event loop.


loop – The current event loop

Return type
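The behaviour described above can be sketched with the standard asyncio API. The name get_open_loop_sketch is hypothetical; handling of the case where no loop exists at all is an addition made here so the sketch runs across Python versions.

```python
import asyncio
import warnings


def get_open_loop_sketch():
    """Hypothetical illustration, not pydra's implementation."""
    try:
        with warnings.catch_warnings():
            # Some Python versions warn when no current loop is set
            warnings.simplefilter("ignore", DeprecationWarning)
            loop = asyncio.get_event_loop()
    except RuntimeError:
        # Raised when this thread has no current event loop
        loop = None
    if loop is None or loop.is_closed():
        # Replace a missing or closed loop and register the new one
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
    return loop
```

Calling the function again after closing the returned loop yields a fresh, open loop rather than the closed one.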



Generate hash of object.

pydra.engine.helpers.hash_value(value, tp=None, metadata=None, precalculated=None)[source]

Calculate the hash of a value, or return values, recursively.

pydra.engine.helpers.load_and_run(task_pkl, ind=None, rerun=False, submitter=None, plugin=None, **kwargs)[source]

Load a task from a pickle file, set the proper input, and run the task.

async pydra.engine.helpers.load_and_run_async(task_pkl, ind=None, submitter=None, rerun=False, **kwargs)[source]

Load a task from a pickle file, set the proper input, and run the workflow.

pydra.engine.helpers.load_result(checksum, cache_locations)[source]

Restore a result from the cache.

  • checksum (str) – Unique identifier of the task to be loaded.

  • cache_locations (list of os.pathlike) – List of cache directories, in order of priority, where the checksum will be looked for.
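The priority-ordered lookup described above could be sketched as follows. The on-disk layout (a directory named after the checksum containing result.pkl) and the use of the standard pickle module are assumptions for illustration only; the real cache format is not specified here.

```python
import pickle
import tempfile
from pathlib import Path


def load_result_sketch(checksum, cache_locations):
    """Hypothetical illustration, not pydra's implementation."""
    for location in cache_locations:
        # Assumed layout: <cache dir>/<checksum>/result.pkl
        candidate = Path(location) / checksum / "result.pkl"
        if candidate.exists():
            with candidate.open("rb") as f:
                # The first hit wins, so earlier locations have priority
                return pickle.load(f)
    # Nothing cached under this checksum in any location
    return None
```

Because the loop returns on the first match, a result stored in an earlier cache directory shadows one with the same checksum in a later directory.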

pydra.engine.helpers.load_task(task_pkl, ind=None)[source]

Load a task from a pickle file and set the proper input for the specific ind.


Create a data class given a spec.


spec – TODO

pydra.engine.helpers.output_from_inputfields(output_spec, input_spec)[source]

Collect output values from input fields. If names_only is False, the output_spec is updated; if names_only is True, only the names are returned.

  • output_spec – TODO

  • input_spec – TODO


pydra.engine.helpers.position_sort(args)[source]

Sort objects by position, following Python indexing conventions.

Ordering is positive positions, lowest to highest, followed by unspecified positions (None) and negative positions, lowest to highest.

>>> position_sort([(None, "d"), (-3, "e"), (2, "b"), (-2, "f"), (5, "c"), (1, "a")])
['a', 'b', 'c', 'd', 'e', 'f']

args (list of (int/None, object) tuples)

Return type

list of objects
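The ordering rule above (positive positions, then unspecified, then negative) can be reproduced with a short sketch. The name position_sort_sketch is hypothetical, and treating position 0 as positive is an assumption.

```python
def position_sort_sketch(args):
    """Hypothetical illustration, not pydra's implementation."""
    # Non-negative positions, lowest to highest
    positive = sorted(
        (a for a in args if a[0] is not None and a[0] >= 0), key=lambda a: a[0]
    )
    # Unspecified positions keep their input order
    unspecified = [a for a in args if a[0] is None]
    # Negative positions, lowest to highest, so -1 ends up last
    negative = sorted(
        (a for a in args if a[0] is not None and a[0] < 0), key=lambda a: a[0]
    )
    return [obj for _, obj in positive + unspecified + negative]


items = [(None, "d"), (-3, "e"), (2, "b"), (-2, "f"), (5, "c"), (1, "a")]
print(position_sort_sketch(items))  # ['a', 'b', 'c', 'd', 'e', 'f']
```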


Visit a task object and print its input/output interface.

pydra.engine.helpers.read_and_display(*cmd, strip=False, hide_display=False)[source]

Capture a process’ standard output.

async pydra.engine.helpers.read_and_display_async(*cmd, hide_display=False, strip=False)[source]

Capture standard input and output of a process, displaying them as they arrive.

Works line-by-line.

async pydra.engine.helpers.read_stream_and_display(stream, display)[source]

Read from stream line by line until EOF, display, and capture the lines.
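A minimal sketch of the read, display, and capture loop described above, assuming stream is an asyncio.StreamReader and display is a callable that accepts a decoded line. Returning the captured lines as a list of strings is an assumption.

```python
import asyncio


async def read_stream_and_display_sketch(stream, display):
    """Hypothetical illustration, not pydra's implementation."""
    captured = []
    while True:
        line = await stream.readline()
        if not line:
            # readline() returns empty bytes at EOF
            break
        text = line.decode()
        captured.append(text)
        if display is not None:
            # Show each line as soon as it arrives
            display(text)
    return captured
```

In practice the stream would typically be the stdout or stderr of a subprocess created with asyncio.create_subprocess_exec.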

pydra.engine.helpers.record_error(error_path, error)[source]

Write an error file.

pydra.engine.helpers.save(task_path: Path, result=None, task=None, name_prefix=None)[source]

Save a TaskBase object and/or results.

  • task_path (Path) – Write directory

  • result (Result) – Result to pickle and write

  • task (TaskBase) – Task to pickle and write


Calculate the checksum of a task.

Components considered: input hash, output hash, and environment hash.


task (TaskBase) – The input task.