pydra.engine.workers module

Execution workers.

class pydra.engine.workers.ConcurrentFuturesWorker(n_procs=None)[source]

Bases: Worker

A worker to execute in parallel using Python’s concurrent futures.

close()[source]

Finalize the internal pool of tasks.

async exec_as_coro(runnable, rerun=False, environment=None)[source]

Run a task (coroutine wrapper).

plugin_name = 'cf'
run_el(runnable, rerun=False, environment=None, **kwargs)[source]

Run a task.

class pydra.engine.workers.DaskWorker(**kwargs)[source]

Bases: Worker

A worker to execute in parallel using Dask.distributed. This is an experimental implementation with limited testing.

close()[source]

Finalize the internal pool of tasks.

async exec_dask(runnable, rerun=False)[source]

Run a task (coroutine wrapper).

plugin_name = 'dask'
run_el(runnable, rerun=False, **kwargs)[source]

Run a task.

class pydra.engine.workers.DistributedWorker(loop=None, max_jobs=None)[source]

Bases: Worker

Base Worker for distributed execution.

async fetch_finished(futures)[source]

Awaits asyncio’s asyncio.Task until one is finished.

Limits number of submissions based on py:attr:DistributedWorker.max_jobs.

Parameters:

futures (set of asyncio awaitables) – Task execution coroutines or asyncio asyncio.Task

Returns:

pending – Pending asyncio asyncio.Task.

Return type:

set

max_jobs

Maximum number of concurrently running jobs.

class pydra.engine.workers.PsijLocalWorker(**kwargs)[source]

Bases: PsijWorker

A worker to execute tasks using PSI/J on the local machine.

plugin_name = 'psij-local'
subtype = 'local'
class pydra.engine.workers.PsijSlurmWorker(**kwargs)[source]

Bases: PsijWorker

A worker to execute tasks using PSI/J using SLURM.

plugin_name = 'psij-slurm'
subtype = 'slurm'
class pydra.engine.workers.PsijWorker(**kwargs)[source]

Bases: Worker

A worker to execute tasks using PSI/J.

close()[source]

Finalize the internal pool of tasks.

async exec_psij(runnable, rerun=False)[source]

Run a task (coroutine wrapper).

Raises:

Exception – If stderr is not empty.

Return type:

None

make_job(spec, attributes)[source]

Create a PSI/J job.

Parameters:
  • spec (psij.JobSpec) – PSI/J job specification.

  • attributes (any) – Job attributes.

Returns:

PSI/J job.

Return type:

psij.Job

make_spec(cmd=None, arg=None)[source]

Create a PSI/J job specification.

Parameters:
  • cmd (str, optional) – Executable command. Defaults to None.

  • arg (list, optional) – List of arguments. Defaults to None.

Returns:

PSI/J job specification.

Return type:

psij.JobSpec

run_el(interface, rerun=False, **kwargs)[source]

Run a task.

class pydra.engine.workers.SGEWorker(loop=None, max_jobs=None, poll_delay=1, qsub_args=None, write_output_files=True, max_job_array_length=50, indirect_submit_host=None, max_threads=None, poll_for_result_file=True, default_threads_per_task=1, polls_before_checking_evicted=60, collect_jobs_delay=30, default_qsub_args='', max_mem_free=None)[source]

Bases: DistributedWorker

A worker to execute tasks on SLURM systems.

async check_for_results_files(jobid, threads_requested)[source]
async get_output_by_task_pkl(task_pkl)[source]
async get_tasks_to_run(task_qsub_args, mem_free)[source]
plugin_name = 'sge'
run_el(runnable, rerun=False)[source]

Worker submission API.

async submit_array_job(sargs, tasks_to_run, error_file)[source]
class pydra.engine.workers.SerialWorker(**kwargs)[source]

Bases: Worker

A worker to execute linearly.

close()[source]

Return whether the task is finished.

async exec_serial(runnable, rerun=False, environment=None)[source]
async fetch_finished(futures)[source]

Awaits asyncio’s asyncio.Task until one is finished.

Parameters:

futures (set of asyncio awaitables) – Task execution coroutines or asyncio asyncio.Task

Returns:

pending – Pending asyncio asyncio.Task.

Return type:

set

plugin_name = 'serial'
run_el(interface, rerun=False, environment=None, **kwargs)[source]

Run a task.

class pydra.engine.workers.SlurmWorker(loop=None, max_jobs=None, poll_delay=1, sbatch_args=None)[source]

Bases: DistributedWorker

A worker to execute tasks on SLURM systems.

plugin_name = 'slurm'
run_el(runnable, rerun=False, environment=None)[source]

Worker submission API.

class pydra.engine.workers.Worker(loop=None)[source]

Bases: object

A base class for execution of tasks.

close()[source]

Close this worker.

async fetch_finished(futures)[source]

Awaits asyncio’s asyncio.Task until one is finished.

Parameters:

futures (set of asyncio awaitables) – Task execution coroutines or asyncio asyncio.Task

Returns:

pending – Pending asyncio asyncio.Task.

Return type:

set

run_el(interface, **kwargs)[source]

Return coroutine for task execution.