Multi executors async (fluidimage.executors.multi_exec_async)

class fluidimage.executors.multi_exec_async.MultiExecutorAsync(topology, path_dir_result, nb_max_workers=None, nb_items_queue_max=None, sleep_time=0.01, logging_level='info', stop_if_error=False)

Bases: fluidimage.executors.base.ExecutorBase

Manage the multi-executor mode

This class does not compute the topology itself. The topology is split and each slice is computed by an ExecutorAsync.

Parameters
nb_max_workers : None, int

Limits the number of workers working at the same time.

nb_items_queue_max : None, int

Limits the number of items that can be in an output_queue.

sleep_time : None, float

Defines the waiting time (passed to trio.sleep) of a function. Async functions await “trio.sleep(sleep_time)” after they have processed an item, and when there is nothing in their input_queue.
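The role of sleep_time and nb_items_queue_max can be illustrated with a minimal sketch. It is not fluidimage code: the producer and worker functions, the deque used as a queue and the "done" event are hypothetical, and asyncio from the standard library is used as a stand-in for trio so that the example runs without extra dependencies (with trio, the waiting call would be trio.sleep(sleep_time)). The way the queue limit is enforced here is also an assumption.

```python
import asyncio
from collections import deque


async def producer(queue, done, items, nb_items_queue_max=2, sleep_time=0.01):
    """Hypothetical first work: fill its output queue without exceeding the limit."""
    for item in items:
        # The queue is full: wait before checking again.
        while len(queue) >= nb_items_queue_max:
            await asyncio.sleep(sleep_time)
        queue.append(item)
        # Work done on one item: let the other tasks run.
        await asyncio.sleep(sleep_time)
    done.set()


async def worker(input_queue, results, done, sleep_time=0.01):
    """Hypothetical second work: square every item of its input queue."""
    while input_queue or not done.is_set():
        if not input_queue:
            # Nothing in the input queue: wait before polling again.
            await asyncio.sleep(sleep_time)
            continue
        results.append(input_queue.popleft() ** 2)
        # Work done on one item: let the other tasks run.
        await asyncio.sleep(sleep_time)


async def main():
    queue, results = deque(), []
    done = asyncio.Event()
    await asyncio.gather(
        producer(queue, done, range(5)),
        worker(queue, results, done),
    )
    return results


results = asyncio.run(main())
print(results)
```

A smaller sleep_time makes the tasks poll their queues more often, at the cost of more wake-ups while they are idle.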

compute(self)

Compute the topology.

There are two ways to split self.topology work:

  • If self.topology has a “series” attribute (from seriesOfArray), it creates “self.nb_max_workers” topologies and changes “ind_start” and “ind_stop” of topology.series. The split takes series.ind_step into account.

  • Else, if the first work of the topology has a unique output_queue, it splits that queue into “self.nb_max_workers” slices and creates as many topologies. In these topologies, the first work is removed and the first queue is filled with one partition of the original first queue.

Then it creates as many Executor_await as topologies, gives one topology to each executor, and calls each Executor_await.compute in a process from multiprocessing.
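The two splitting strategies described above can be sketched in plain Python. This is an illustration under stated assumptions, not the fluidimage implementation: the helper names (chunk_sizes, split_series_range, split_queue) are hypothetical, ind_step is assumed to be positive, and the first queue is assumed to behave like a dict.

```python
def chunk_sizes(nb_items, nb_parts):
    """Sizes of nb_parts nearly equal chunks whose sum is nb_items."""
    base, extra = divmod(nb_items, nb_parts)
    return [base + (1 if i < extra else 0) for i in range(nb_parts)]


def split_series_range(ind_start, ind_stop, ind_step, nb_workers):
    """Split range(ind_start, ind_stop, ind_step) into (start, stop) pairs.

    Assumes ind_step > 0. Each pair, used with the same ind_step, yields a
    disjoint part of the original indices.
    """
    indices = range(ind_start, ind_stop, ind_step)
    slices, first = [], 0
    for size in chunk_sizes(len(indices), nb_workers):
        if size:
            # The exclusive stop is one past the last index of the chunk.
            slices.append((indices[first], indices[first + size - 1] + 1))
        first += size
    return slices


def split_queue(queue, nb_workers):
    """Partition a dict-like queue into at most nb_workers smaller dicts."""
    keys = list(queue)
    parts, first = [], 0
    for size in chunk_sizes(len(keys), nb_workers):
        if size:
            parts.append({key: queue[key] for key in keys[first:first + size]})
        first += size
    return parts


# First strategy: one (ind_start, ind_stop) pair per worker.
series_slices = split_series_range(0, 10, 2, 3)

# Second strategy: one partition of the first queue per worker.
first_queue = {"im0": "a", "im1": "b", "im2": "c", "im3": "d", "im4": "e"}
queue_parts = split_queue(first_queue, 2)

print(series_slices)
print(queue_parts)
```

In both cases each worker receives a disjoint part of the work, so the slices can be computed independently in separate processes.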

start_mutiprocess_first_queue(self)

Start the processes, splitting the work with the first queue.

start_multiprocess_series(self)

Start the processes, splitting the work with the series object.

launch_process(self, topology, ind_process)

Launch one process

wait_for_all_processes(self)

Log and wait for all processes to finish.

class fluidimage.executors.multi_exec_async.ExecutorAsyncForMulti(topology, path_dir_result, log_path, sleep_time=0.01, logging_level='info', stop_if_error=False)

Bases: fluidimage.executors.exec_async_sequential.ExecutorAsyncSequential

Slightly modified ExecutorAsync

Classes

ExecutorAsyncForMulti(topology, …[, …])

Slightly modified ExecutorAsync

MultiExecutorAsync(topology, path_dir_result)

Manage the multi-executor mode