fluidimage.executors.multi_exec_async#

Multi executors async#

class fluidimage.executors.multi_exec_async.MultiExecutorAsync(topology, path_dir_result, nb_max_workers=None, nb_items_queue_max=None, sleep_time=0.01, logging_level='info', stop_if_error=False)[source]#

Bases: MultiExecutorBase

Manage the multi-executor mode

This class is not the one whose really compute the topology. The topology is split and each slice is computed with an ExecutorAsync

Parameters:
nb_max_workersNone, int

Limits the numbers of workers working in the same time.

nb_items_queue_maxNone, int

Limits the numbers of items that can be in a output_queue.

sleep_timeNone, float

Defines the waiting time (from trio.sleep) of a function. Async functions await trio.sleep(sleep_time) when they have done a work on an item, and when there is nothing in their input_queue.

_start_processes()[source]#

There are two ways to split self.topology work:

  • If first self.topology has “series” attribute (from seriesOfArray), it creates “self.nb_max_workers” topologies and changes “ind_start” and “ind_stop” of topology.series. The split considers series.ind_step.

  • Else, if the first work of the topology has an unique output_queue, it splits that queue in “self.nb_max_worker” slices and create as many topologies. On these last, the first work will be removed and the first queue will be filled with a partition of the first queue Then create as many Executer_await as topologies, give each topology to each executors, and call each Executor_await.compute in a process from multiprocessing.

_start_multiprocess_first_queue()[source]#

Start the processes spitting the work with the first queue

_start_multiprocess_series()[source]#

Start the processes spitting the work with the series object

init_and_compute(topology_this_process, log_path, idx_process)[source]#

Create an executor and start it in a process

launch_process(topology, idx_process)[source]#

Launch one process

_join_processes()[source]#

Join the processes

class fluidimage.executors.multi_exec_async.ExecutorAsyncForMulti(topology, path_dir_result, nb_max_workers=1, sleep_time=0.01, logging_level='info', stop_if_error=False, path_log=None, t_start=None, index_process=None)[source]#

Bases: ExecutorAsyncSeqForMulti

Slightly modified ExecutorAsync

Classes

Executor

alias of MultiExecutorAsync

ExecutorAsyncForMulti(topology, path_dir_result)

Slightly modified ExecutorAsync

MultiExecutorAsync(topology, path_dir_result)

Manage the multi-executor mode