fluidimage.executors.exec_async#

Executor async/await#

This executor uses async/await with the trio library to run the topology tasks concurrently.

A single executor (in one process) is used. If the CPU-bound tasks are limited by the Python GIL, the threads cannot use the CPU at the same time.

This means that the work runs on one CPU at a time, unless the topology uses compiled code that releases the GIL. In that case, the GIL is bypassed and the computation can use several CPUs at a time.
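A typical use, sketched with hypothetical inputs (here `topology` is an already-built fluidimage topology and `path_dir_result` a writable output directory; the keyword values are only examples)::

    from fluidimage.executors.exec_async import ExecutorAsync

    # `topology` and `path_dir_result` are assumed to exist already:
    # a built fluidimage topology and a writable result directory.
    executor = ExecutorAsync(
        topology,
        path_dir_result,
        nb_max_workers=4,
        sleep_time=0.01,
        logging_level="info",
    )
    executor.compute()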

class fluidimage.executors.exec_async.ExecutorAsync(topology, path_dir_result, nb_max_workers=None, nb_items_queue_max=None, sleep_time=0.01, logging_level='info', stop_if_error=False, path_log=None)[source]#

Bases: ExecutorBase

Executor async/await.

The work is performed in a single process.

compute()[source]#

Compute the whole topology.

It begins by executing the “one shot” jobs, then executes the “multiple shot” jobs implemented as async functions. Warning: the “one shot” jobs must be ancestors of the “multiple shot” jobs in the topology.
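Schematically, this order of operations could be sketched as follows (the method name exec_one_shot_works is hypothetical; only start_async_works is documented below)::

    import trio

    def compute_sketch(executor):
        # 1. Run the synchronous "one shot" jobs first
        #    (hypothetical method name).
        executor.exec_one_shot_works()
        # 2. Then run the async "multiple shot" jobs in a trio event loop.
        trio.run(executor.start_async_works)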

async start_async_works()[source]#

Create a trio nursery and start all async functions.
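This corresponds to the standard trio pattern in which a nursery only closes once every task started in it has finished; a minimal sketch, assuming async_funcs is a mapping of async callables (see define_functions below)::

    import trio

    async def start_async_works_sketch(async_funcs):
        # Start every async function in one nursery; the `async with`
        # block exits only when all the tasks have completed.
        async with trio.open_nursery() as nursery:
            for async_func in async_funcs.values():
                nursery.start_soon(async_func)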

define_functions()[source]#

Define sync and async functions.

Define the sync (“one shot”) functions and the async (“multiple shot”) functions, and store them in self.async_funcs.

The behavior of the executor is mostly defined here. To sum up: each “multiple shot” function waits for items to become available in its input_queue and processes them as soon as they are available.
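For instance, a “multiple shot” function for a purely synchronous work could look like the following sketch (the queue and work attribute names are assumptions, not the actual fluidimage API, and the stop condition handled by update_has_to_stop is omitted)::

    import trio

    async def async_func_sketch(work, sleep_time=0.01):
        # Poll the input queue and process items as soon as they appear.
        while True:
            if not work.input_queue:          # assumed dict-like queue
                await trio.sleep(sleep_time)  # yield control to other tasks
                continue
            key, obj = work.input_queue.popitem()
            result = work.func_or_cls(obj)    # assumed callable attribute
            if work.output_queue is not None:
                work.output_queue[key] = result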

def_async_func_work_io(work)[source]#

Define an asynchronous function launching an I/O work.

def_async_func_work_cpu(work)[source]#

Define an asynchronous function launching a CPU work.

async async_run_work_io(work)[source]#

Meant to be started with trio’s start_soon (from within a nursery).

Executes the work on an item (key, obj) and adds the result to work.output_queue.

Parameters:
work

A work from the topology

async async_run_work_cpu(work)[source]#

Meant to be started with trio’s start_soon (from within a nursery).

Executes the work on an item (key, obj) and adds the result to work.output_queue.

Parameters:
work

A work from the topology
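Both methods follow the same pattern. A hedged sketch of processing one item, using trio.to_thread.run_sync so that a blocking work does not stall the event loop (the work attribute names are assumptions, and the actual implementation may dispatch the work differently)::

    import trio

    async def async_run_work_sketch(work, key, obj):
        # Run the (possibly blocking) work in a worker thread so that
        # the trio event loop stays responsive, then store the result.
        result = await trio.to_thread.run_sync(work.func_or_cls, obj)
        if work.output_queue is not None:
            work.output_queue[key] = result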

async update_has_to_stop()[source]#

Update the “work has to stop” flag by checking whether all the work has been done.

Returns True if no worker is working and there are no items left in any queue.
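The condition can be expressed roughly as follows (a sketch; nb_working_workers and topology.queues are assumed names)::

    def has_to_stop_sketch(topology, nb_working_workers):
        # True when every queue is empty and no worker is still running.
        all_queues_empty = all(len(queue) == 0 for queue in topology.queues)
        return all_queues_empty and nb_working_workers == 0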

Classes

Executor

alias of ExecutorAsync

ExecutorAsync(topology, path_dir_result[, ...])

Executor async/await.