pydra.engine.helpers module

Administrative support for the engine framework.

class pydra.engine.helpers.PydraFileLock(lockfile)[source]

Bases: object

Wrapper for filelock’s SoftFileLock that makes it work with asyncio.
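The idea of a lock file that cooperates with asyncio can be sketched as follows. This is a minimal illustration using only the standard library, not the actual PydraFileLock code: the class name AsyncSoftLock, the poll_interval parameter, and the demo helper are all hypothetical, and the real class wraps filelock's SoftFileLock instead of calling os.open directly.

```python
import asyncio
import os
import tempfile


class AsyncSoftLock:
    """Hypothetical stand-in: a soft lock file acquired without blocking the loop."""

    def __init__(self, lockfile, poll_interval=0.01):
        self.lockfile = str(lockfile)
        self.poll_interval = poll_interval  # assumed retry delay, in seconds

    async def __aenter__(self):
        while True:
            try:
                # O_EXCL makes creation fail if the lock file already exists.
                fd = os.open(self.lockfile, os.O_CREAT | os.O_EXCL | os.O_WRONLY)
            except FileExistsError:
                # Yield to the event loop instead of blocking while we wait.
                await asyncio.sleep(self.poll_interval)
            else:
                os.close(fd)
                return self

    async def __aexit__(self, exc_type, exc, tb):
        os.remove(self.lockfile)
        return False


async def demo():
    """Run two coroutines that compete for the same lock file."""
    events = []
    with tempfile.TemporaryDirectory() as tmp:
        lockfile = os.path.join(tmp, "task.lock")

        async def worker(name):
            async with AsyncSoftLock(lockfile):
                events.append(f"{name} in")
                await asyncio.sleep(0.02)
                events.append(f"{name} out")

        await asyncio.gather(worker("a"), worker("b"))
        lock_left_behind = os.path.exists(lockfile)
    return events, lock_left_behind
```

Awaiting asyncio.sleep between attempts lets other coroutines make progress while the lock is held elsewhere, which is the property a blocking acquire would not give.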

pydra.engine.helpers.allowed_values_validator(_, attribute, value)[source]

Check that the value is in allowed_values.

pydra.engine.helpers.argstr_formatting(argstr, inputs, value_updates=None)[source]

Format an argstr containing fields of the form {field_name}, using values from inputs and overriding them with value_updates if provided.
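The substitution described above can be sketched with plain str.format. This is only an illustration under simplifying assumptions: inputs is treated as a dict here, whereas the real function receives a task inputs object, and the name format_argstr_sketch is hypothetical.

```python
def format_argstr_sketch(argstr, inputs, value_updates=None):
    """Fill {field_name} placeholders; value_updates take precedence over inputs."""
    values = dict(inputs)  # assumption: inputs behaves like a mapping
    if value_updates:
        values.update(value_updates)
    return argstr.format(**values)


formatted = format_argstr_sketch(
    "-i {in_file} -n {n}",
    {"in_file": "a.txt", "n": 1},
    value_updates={"n": 3},
)
```

Here the value of n from value_updates replaces the one taken from inputs.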

pydra.engine.helpers.copyfile_workflow(wf_path: PathLike, result)[source]

If a file is in the workflow results, copy it to the workflow directory.

pydra.engine.helpers.create_checksum(name, inputs)[source]

Generate a checksum name for a given combination of task name and inputs.

  • name (str) – Task name.

  • inputs (str) – String of inputs.

pydra.engine.helpers.ensure_list(obj, tuple2list=False)[source]

Return a list whatever the input object is.


>>> ensure_list(list("abc"))
['a', 'b', 'c']
>>> ensure_list("abc")
['abc']
>>> ensure_list(tuple("abc"))
[('a', 'b', 'c')]
>>> ensure_list(tuple("abc"), tuple2list=True)
['a', 'b', 'c']
>>> ensure_list(None)
[]
>>> ensure_list(5.0)
[5.0]

pydra.engine.helpers.execute(cmd, strip=False)[source]

Run the event loop with a coroutine.

Uses read_and_display_async() unless a loop is already running, in which case read_and_display() is used.

  • cmd (list or tuple) – The command line to be executed.

  • strip (bool) – TODO
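The choice between the asynchronous and the blocking path can be sketched as below. This is a hedged illustration, not the library's implementation: run_async, run_sync, and execute_sketch are hypothetical names, the (return code, stdout, stderr) return shape is an assumption, and the strip argument is left out.

```python
import asyncio
import subprocess
import sys


async def run_async(cmd):
    """Asynchronous path: run the command and collect its output."""
    proc = await asyncio.create_subprocess_exec(
        *cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE
    )
    stdout, stderr = await proc.communicate()
    return proc.returncode, stdout.decode(), stderr.decode()


def run_sync(cmd):
    """Blocking path, used when an event loop is already running."""
    done = subprocess.run(cmd, capture_output=True)
    return done.returncode, done.stdout.decode(), done.stderr.decode()


def execute_sketch(cmd):
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        # No loop is running in this thread, so one can be started safely.
        return asyncio.run(run_async(cmd))
    # A loop is already running; starting another would fail.
    return run_sync(cmd)


async def call_from_inside_loop(cmd):
    """Helper showing the fallback when called from a coroutine."""
    return execute_sketch(cmd)
```

asyncio.run cannot be called while another loop is running in the same thread, which is why the blocking variant is needed as a fallback.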


pydra.engine.helpers.from_list_if_single(obj)[source]

Convert a list to a single item if its length is 1.


pydra.engine.helpers.gather_runtime_info(fname)[source]

Extract runtime information from a file.

fname (os.pathlike) – The file containing runtime information.

Returns:

runtime – A runtime object containing the collected information.

Return type:

Runtime

pydra.engine.helpers.get_available_cpus()[source]

Return the number of CPUs available to the current process or, if that is not available, the total number of CPUs on the system.

Returns:

n_proc – The number of available CPUs.

Return type:

int
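The fallback from per-process CPUs to the system total can be sketched with the standard library alone. This is an assumption-laden illustration: the name available_cpus_sketch is hypothetical, and the real function may consult other sources before falling back.

```python
import os


def available_cpus_sketch():
    """Prefer the CPUs this process may use; otherwise count all CPUs."""
    if hasattr(os, "sched_getaffinity"):
        # Available on Linux: the set of CPUs the current process is bound to.
        return len(os.sched_getaffinity(0))
    # Elsewhere: system-wide count; os.cpu_count() may return None.
    return os.cpu_count() or 1


n_proc = available_cpus_sketch()
```

The two numbers differ when the process is restricted to a subset of CPUs, for example on a cluster node or inside a container.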



pydra.engine.helpers.get_open_loop()[source]

Get current event loop.

If the loop is closed, a new loop is created and set as the current event loop.

Returns:

loop – The current event loop.

Return type:

asyncio.EventLoop
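Replacing a closed loop with a fresh one can be sketched as follows. The function name open_loop_sketch is hypothetical, and the handling of a missing current loop is an assumption added so the sketch behaves the same across Python versions.

```python
import asyncio


def open_loop_sketch():
    """Return a usable event loop, creating a new one if needed."""
    try:
        loop = asyncio.get_event_loop()
    except RuntimeError:
        # Some Python versions raise when no current loop has been set.
        loop = None
    if loop is None or loop.is_closed():
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
    return loop


first = open_loop_sketch()
first.close()
second = open_loop_sketch()  # the closed loop is replaced by a new one
second_is_open = not second.is_closed()
second.close()
```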


pydra.engine.helpers.load_and_run(task_pkl, ind=None, rerun=False, submitter=None, plugin=None, **kwargs)[source]

Load a task from a pickle file, set the proper input, and run the task.

async pydra.engine.helpers.load_and_run_async(task_pkl, ind=None, submitter=None, rerun=False, **kwargs)[source]

Load a task from a pickle file, set the proper input, and run the workflow.

pydra.engine.helpers.load_result(checksum, cache_locations)[source]

Restore a result from the cache.

  • checksum (str) – Unique identifier of the task to be loaded.

  • cache_locations (list of os.pathlike) – List of cache directories, in order of priority, where the checksum will be looked for.
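Searching the cache directories in priority order can be sketched as below. This is an illustration under stated assumptions: the layout of one directory per checksum, the file name _result.pkl, and the use of plain pickle are all hypothetical and chosen only to make the sketch runnable.

```python
import pickle
import tempfile
from pathlib import Path

RESULT_FILE = "_result.pkl"  # hypothetical file name, for illustration only


def load_result_sketch(checksum, cache_locations):
    """Return the first cached result found for checksum, or None."""
    for location in cache_locations or []:
        result_file = Path(location) / checksum / RESULT_FILE
        if result_file.exists():
            with result_file.open("rb") as fp:
                return pickle.load(fp)
    return None


def demo():
    """Store a result in the second cache directory and look it up."""
    with tempfile.TemporaryDirectory() as first, tempfile.TemporaryDirectory() as second:
        task_dir = Path(second) / "task_abc123"
        task_dir.mkdir()
        with (task_dir / RESULT_FILE).open("wb") as fp:
            pickle.dump({"out": 42}, fp)
        found = load_result_sketch("task_abc123", [first, second])
        missing = load_result_sketch("task_missing", [first, second])
    return found, missing
```

Because the loop returns on the first hit, directories listed earlier in cache_locations take priority over later ones.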

pydra.engine.helpers.load_task(task_pkl, ind=None)[source]

Load a task from a pickle file and set the proper input for the specific ind.


pydra.engine.helpers.make_klass(spec)[source]

Create a data class given a spec.


spec – TODO

pydra.engine.helpers.output_from_inputfields(output_spec, input_spec)[source]

Collect output values from input fields. If names_only is False, the output_spec is updated; if names_only is True, only the names are returned.

  • output_spec – TODO

  • input_spec – TODO

pydra.engine.helpers.parse_copyfile(fld: Attribute, default_collation=CopyCollation.any)[source]

Get the copy mode from the ‘copyfile’ value of a field attribute.


pydra.engine.helpers.parse_format_string(fmtstr)[source]

Parse an argstr format string and return all keywords used in it.
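Extracting the keywords of a format string can be sketched with string.Formatter from the standard library. This is one possible approach, not necessarily the one the library uses; the name format_keywords_sketch and the choice to reduce attribute or index lookups to their top-level name are assumptions.

```python
from string import Formatter


def format_keywords_sketch(fmtstr):
    """Return the set of field names referenced in a format string."""
    keywords = set()
    for _, field_name, _, _ in Formatter().parse(fmtstr):
        if field_name:
            # Keep only the top-level name: "a.b" or "a[0]" becomes "a".
            keywords.add(field_name.split(".")[0].split("[")[0])
    return keywords


keywords = format_keywords_sketch("-i {in_file} --out {out_file:s} -v")
```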


pydra.engine.helpers.position_sort(args)[source]

Sort objects by position, following Python indexing conventions.

Ordering is positive positions, lowest to highest, followed by unspecified positions (None) and negative positions, lowest to highest.

>>> position_sort([(None, "d"), (-3, "e"), (2, "b"), (-2, "f"), (5, "c"), (1, "a")])
['a', 'b', 'c', 'd', 'e', 'f']

args (list of (int/None, object) tuples)

Return type:

list of objects
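The ordering rule can be sketched as three groups that are concatenated. This is a minimal reimplementation for illustration only; the name position_sort_sketch is hypothetical, and treating position 0 as a positive position is an assumption.

```python
def position_sort_sketch(args):
    """Order objects: positive positions, then None, then negative positions."""
    positive = sorted(
        (a for a in args if a[0] is not None and a[0] >= 0), key=lambda a: a[0]
    )
    unspecified = [a for a in args if a[0] is None]
    negative = sorted(
        (a for a in args if a[0] is not None and a[0] < 0), key=lambda a: a[0]
    )
    # Drop the positions and keep only the objects.
    return [obj for _, obj in positive + unspecified + negative]


ordered = position_sort_sketch(
    [(None, "d"), (-3, "e"), (2, "b"), (-2, "f"), (5, "c"), (1, "a")]
)
```

Negative positions end up last because, as with Python indexing, they count from the end of the sequence.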


pydra.engine.helpers.print_help(obj)[source]

Visit a task object and print its input/output interface.

pydra.engine.helpers.read_and_display(*cmd, strip=False, hide_display=False)[source]

Capture a process’ standard output.

async pydra.engine.helpers.read_and_display_async(*cmd, hide_display=False, strip=False)[source]

Capture the standard output and standard error of a process, displaying them as they arrive.

Works line-by-line.

async pydra.engine.helpers.read_stream_and_display(stream, display)[source]

Read from stream line by line until EOF, display, and capture the lines.
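Reading a stream line by line while both displaying and capturing it can be sketched as follows. The names read_and_collect_sketch and demo are hypothetical, display is assumed to be any callable that accepts one line of bytes, and returning the joined bytes is an assumption about the output shape.

```python
import asyncio


async def read_and_collect_sketch(stream, display):
    """Read lines until EOF, pass each to display, and return everything read."""
    lines = []
    while True:
        line = await stream.readline()
        if not line:  # readline() returns empty bytes at EOF
            break
        lines.append(line)
        display(line)
    return b"".join(lines)


async def demo():
    """Feed a StreamReader by hand so no subprocess is needed."""
    reader = asyncio.StreamReader()
    reader.feed_data(b"first\nsecond\n")
    reader.feed_eof()
    shown = []
    captured = await read_and_collect_sketch(reader, shown.append)
    return captured, shown
```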

pydra.engine.helpers.record_error(error_path, error)[source]

Write an error file.

pydra.engine.helpers.save(task_path: Path, result=None, task=None, name_prefix=None)[source]

Save a TaskBase object and/or results.

  • task_path (Path) – Write directory

  • result (Result) – Result to pickle and write

  • task (TaskBase) – Task to pickle and write