pydra.engine package

The core of the workflow engine.

class pydra.engine.AuditFlag(value)[source]

Bases: Flag

Auditing flags.

ALL = 3[source]

Track provenance and resource utilization.

NONE = 0[source]

Do not track provenance or monitor resources.

PROV = 1[source]

Track provenance only.

RESOURCE = 2[source]

Monitor resource utilization only.
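Because AuditFlag derives from Flag, its members can be combined and tested with bitwise operators. The following is a minimal sketch using a stand-in enum that only mirrors the values documented above, so it runs without pydra installed. The helper names tracks_provenance and monitors_resources are hypothetical and not part of pydra.

```python
from enum import Flag


class AuditFlag(Flag):
    """Stand-in mirroring the documented values (not imported from pydra)."""

    NONE = 0
    PROV = 1
    RESOURCE = 2
    ALL = 3


def tracks_provenance(flags: AuditFlag) -> bool:
    """Hypothetical helper: True if the PROV bit is set."""
    return bool(flags & AuditFlag.PROV)


def monitors_resources(flags: AuditFlag) -> bool:
    """Hypothetical helper: True if the RESOURCE bit is set."""
    return bool(flags & AuditFlag.RESOURCE)


# Combining the two single-purpose flags yields the same value as ALL.
combined = AuditFlag.PROV | AuditFlag.RESOURCE
```

Testing individual bits this way avoids comparing against every possible combination when deciding whether a given kind of auditing is enabled.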

class pydra.engine.ShellCommandTask(audit_flags: pydra.utils.messenger.AuditFlag = AuditFlag.NONE, cache_dir=None, input_spec: pydra.engine.specs.SpecInfo | None = None, cont_dim=None, messenger_args=None, messengers=None, name=None, output_spec: pydra.engine.specs.SpecInfo | None = None, rerun=False, strip=False, environment=<pydra.engine.environments.Native object>, **kwargs)[source]

Bases: TaskBase

Wrap a shell command as a task element.

DEFAULT_COPY_COLLATION = 2[source]

property cmdline

Get the actual command line that will be submitted. Returns a list if the task has a state.

command_args(root=None)[source]

Get the command line arguments.

get_bindings(root: str | None = None) → dict[str, tuple[str, str]][source]

Return bindings necessary to run task in an alternative root.

This is primarily intended for contexts where a task is going to be run in a container with mounted volumes.

Parameters:

root (str)

Returns:

bindings – Mapping from paths in the host environment to the target environment

Return type:

dict
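A mapping of this shape can be translated into container volume arguments. The sketch below is illustrative only: it assumes each tuple holds a target path followed by an access mode such as "ro" or "rw", which the signature above does not spell out. The function name bindings_to_volume_args and the sample paths are hypothetical.

```python
from typing import Dict, List, Tuple

# Host path -> (target path, access mode); the tuple layout is an assumption.
Bindings = Dict[str, Tuple[str, str]]


def bindings_to_volume_args(bindings: Bindings) -> List[str]:
    """Build `-v host:target:mode` arguments from a bindings mapping."""
    args: List[str] = []
    # Sort by host path so the generated command line is deterministic.
    for host_path, (target_path, mode) in sorted(bindings.items()):
        args.extend(["-v", f"{host_path}:{target_path}:{mode}"])
    return args


# Made-up example paths, shaped like dict[str, tuple[str, str]].
example_bindings: Bindings = {
    "/data/in": ("/mnt/pydra/data/in", "ro"),
    "/scratch/out": ("/mnt/pydra/scratch/out", "rw"),
}
volume_args = bindings_to_volume_args(example_bindings)
```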

input_spec = None

output_spec = None

class pydra.engine.Submitter(plugin: str | Type[Worker] = 'cf', **kwargs)[source]

Bases: object

Send a task to the execution backend.

close()[source]

Close submitter.

Does not close a previously running loop.

async expand_runnable(runnable, wait=False, rerun=False)[source]

This coroutine handles state expansion.

Removes any states from runnable. If wait is False (the default), aggregates all worker execution coroutines and returns them. If wait is True, waits for all coroutines to complete or raise an error, then returns None.

Parameters:
  • runnable (pydra Task) – Task instance (Task, Workflow)

  • wait (bool (False)) – Await all futures before completing

Returns:

futures – Coroutines for TaskBase execution.

Return type:

set or None
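The two return modes described above can be illustrated with plain asyncio. This is a generic sketch of the pattern, not pydra's implementation: run_one and expand are hypothetical stand-ins for a worker coroutine and for the state expansion step.

```python
import asyncio


async def run_one(value: int) -> int:
    """Hypothetical worker coroutine standing in for one task execution."""
    await asyncio.sleep(0)
    return value * 2


async def expand(values, wait=False):
    """Schedule one job per value; return the futures, or await them all."""
    futures = {asyncio.ensure_future(run_one(v)) for v in values}
    if wait:
        # Block until every job has finished, then return nothing.
        await asyncio.gather(*futures)
        return None
    # Hand the pending futures back to the caller to await later.
    return futures


async def main():
    pending = await expand([1, 2, 3])
    # A set has no defined order, so sort the gathered results.
    results = sorted(await asyncio.gather(*pending))
    nothing = await expand([1, 2, 3], wait=True)
    return results, nothing


results, nothing = asyncio.run(main())
```

Returning the futures lets a caller batch jobs from several expansions before awaiting them, while wait=True suits callers that only need the side effects.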

async expand_workflow(wf, rerun=False)[source]

Expand and execute a stateless Workflow. This method is only reached by Workflow._run_task.

Parameters:

wf (Workflow) – Workflow Task object

Returns:

wf – The computed workflow

Return type:

pydra.engine.core.Workflow

async submit_from_call(runnable, rerun, environment)[source]

This coroutine should only be called once per Submitter call, and serves as the bridge between synchronous and asynchronous code.

There are 4 potential paths based on the type of runnable:

  0) Workflow has a different plugin than the submitter
  1) Workflow without State
  2) Task without State
  3) (Workflow or Task) with State

Once Python 3.10 is the minimum supported version, this should probably be refactored to use structural pattern matching.
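The four paths can be pictured as a simple dispatch on the runnable's properties. The sketch below is an assumption-laden illustration: the Runnable dataclass, the choose_path function, and the order in which the conditions are checked are all hypothetical and are not taken from pydra's source.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class Runnable:
    """Hypothetical stand-in for a pydra Task or Workflow."""

    is_workflow: bool
    has_state: bool
    plugin: Optional[str] = None


def choose_path(runnable: Runnable, submitter_plugin: str) -> int:
    """Return the documented path number (0-3) for a runnable."""
    # Path 0: a workflow that requests a different plugin than the submitter.
    if runnable.is_workflow and runnable.plugin and runnable.plugin != submitter_plugin:
        return 0
    # Path 3: anything with state, whether Task or Workflow (assumed precedence).
    if runnable.has_state:
        return 3
    # Path 1: a workflow without state.
    if runnable.is_workflow:
        return 1
    # Path 2: a plain task without state.
    return 2
```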

class pydra.engine.Workflow(name, audit_flags: AuditFlag = AuditFlag.NONE, cache_dir=None, cache_locations=None, input_spec: List[str] | Dict[str, Type[Any]] | SpecInfo | None = None, cont_dim=None, messenger_args=None, messengers=None, output_spec: List[str] | Dict[str, type] | SpecInfo | BaseSpec | None = None, rerun=False, propagate_rerun=True, **kwargs)[source]

Bases: TaskBase

A composite task with the structure of a computational graph.

add(task)[source]

Add a task to the workflow.

Parameters:

task (TaskBase) – The task to be added.

property checksum

Calculates the unique checksum of the task. Used to create a specific directory name for each task that is run, and to create the node checksums needed for graph checksums (before the tasks have inputs, etc.).

create_connections(task, detailed=False)[source]

Add and connect a particular task to existing nodes in the workflow.

Parameters:
  • task (TaskBase) – The task to be added.

  • detailed (bool) – If True, add_edges_description is run for self.graph to add detailed descriptions of the connections (input/output field names)

create_dotfile(type='simple', export=None, name=None, output_dir=None)[source]

Create a graph dotfile and optionally export it to other formats.

property graph_sorted

Get a sorted graph representation of the workflow.

property lzin[source]

property nodes

Get the list of node names.

set_output(connections: Tuple[str, LazyField] | List[Tuple[str, LazyField]])[source]

Set the outputs of the workflow by linking them with lazy outputs of tasks.

Parameters:

connections (tuple[str, LazyField] or list[tuple[str, LazyField]] or None) – A single tuple or a list of tuples, each linking the name of an output to a lazy output of a task in the workflow.
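Since connections may be either one tuple or a list of tuples, a caller-side view of the two accepted shapes can help. The sketch below normalizes both into a list; it uses a stand-in LazyField dataclass and a hypothetical normalize_connections helper, and is not pydra's own handling of the argument.

```python
from dataclasses import dataclass
from typing import List, Tuple, Union


@dataclass(frozen=True)
class LazyField:
    """Stand-in for a lazy task output (not pydra's LazyField)."""

    task_name: str
    field: str


Connection = Tuple[str, LazyField]


def normalize_connections(
    connections: Union[Connection, List[Connection]]
) -> List[Connection]:
    """Accept one (name, lazy field) tuple or a list of them; return a list."""
    is_single = (
        isinstance(connections, tuple)
        and len(connections) == 2
        and isinstance(connections[0], str)
    )
    if is_single:
        return [connections]
    return list(connections)


# Both call shapes end up as the same list of (output name, lazy field) pairs.
single = normalize_connections(("out", LazyField("add2", "out")))
several = normalize_connections(
    [("out", LazyField("add2", "out")), ("log", LazyField("add2", "stderr"))]
)
```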

Submodules