fluidimage.executors.exec_async_servers#

Executor async/await using servers#

An executor using async for IO and servers for CPU-bound tasks.

class fluidimage.executors.exec_async_servers.ExecutorAsyncServers(topology, path_dir_result, nb_max_workers=None, nb_items_queue_max=None, sleep_time=0.01, logging_level='info', stop_if_error=False)[source]#

Bases: ExecutorAsync

Executor async/await using servers.

Parameters:
nb_max_workers : None, int

Limits the number of workers working at the same time.

nb_items_queue_max : None, int

Limits the number of items that can be in an output_queue.

sleep_time : None, float

Defines the waiting time (passed to trio.sleep) of a function. Functions await trio.sleep when they have finished working on an item and when there is nothing in their input_queue.
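For illustration, a minimal construction sketch using the signature documented above; the topology object is assumed to be an already built fluidimage topology and the result directory is an arbitrary example path.

    from fluidimage.executors.exec_async_servers import ExecutorAsyncServers

    # topology: an already built fluidimage topology (assumed available)
    executor = ExecutorAsyncServers(
        topology,
        path_dir_result="results",  # example output directory
        nb_max_workers=4,           # at most 4 workers at the same time
        nb_items_queue_max=8,       # at most 8 items per output queue
        sleep_time=0.01,            # waiting time passed to trio.sleep
        logging_level="info",
        stop_if_error=False,
    )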

compute()[source]#

Compute the whole topology.

Begin by executing the one-shot jobs, then execute the multiple-shot jobs implemented as async functions. Warning: one-shot jobs must be ancestors of multiple-shot jobs in the topology.
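Continuing the construction sketch above, computing the whole topology is then a single blocking call:

    # Runs the one-shot jobs first, then the multiple-shot async jobs.
    executor.compute()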

async start_async_works()[source]#

Create a trio nursery and start all async functions.
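The trio pattern behind this method can be sketched as follows (illustrative only, not the library's actual code): every async function is started in one nursery, and the nursery waits for all of them to finish.

    import functools
    import trio

    async def start_all(async_funcs):
        # Start every async function in one nursery; the nursery block
        # exits only when all of them have finished.
        async with trio.open_nursery() as nursery:
            for func in async_funcs:
                nursery.start_soon(func)

    async def say(word):
        await trio.sleep(0.01)
        print(word)

    trio.run(start_all, [functools.partial(say, "hello"), functools.partial(say, "world")])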

async update_has_to_stop()[source]#

Update the "work has to stop" flag by checking whether all the work has been done.

The flag is True if no workers are working and there are no items in any queue.
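The stop condition can be expressed as a simple predicate; the names used here (workers, is_working, queues) are illustrative assumptions, not the exact internals.

    def all_work_done(workers, queues):
        # True when no worker is busy and every queue is empty.
        no_worker_busy = all(not worker.is_working for worker in workers)
        all_queues_empty = all(len(queue) == 0 for queue in queues)
        return no_worker_busy and all_queues_empty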

async async_run_work_cpu(work, worker)[source]#

Meant to be started with trio.start_soon.

Execute the work on an item (key, obj) and put the result in work.output_queue.

Parameters:
work

A work from the topology

worker : servers.WorkerMultiprocessing

A client to communicate with the server worker.
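As an illustration of the pattern (not the library's code), the sketch below off-loads a CPU-bound function and stores the result under the item key; trio.to_thread.run_sync stands in for the server workers, and a plain dict stands in for work.output_queue.

    import trio

    async def run_work_cpu_sketch(func, key, obj, output_queue):
        # Run the CPU-bound work off the event loop, then store the result
        # under the item key (a plain dict stands in for work.output_queue).
        result = await trio.to_thread.run_sync(func, obj)
        output_queue[key] = result

    async def main():
        output_queue = {}
        async with trio.open_nursery() as nursery:
            # started with start_soon, as the docstring above describes
            nursery.start_soon(run_work_cpu_sketch, lambda x: x * x, "item0", 7, output_queue)
        print(output_queue)  # {'item0': 49}

    trio.run(main)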

def_async_func_work_cpu(work)[source]#

Define an asynchronous function launching a CPU work.
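The name suggests a factory returning an async function bound to one work; a hedged sketch of that pattern, with illustrative names and body:

    def def_async_func_work_cpu_sketch(work, process_item):
        # Factory pattern: return an async function bound to one work of the
        # topology; process_item is a placeholder coroutine handling one (key, obj).
        async def async_func(key, obj):
            await process_item(work, key, obj)
        return async_func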

get_available_worker()[source]#

Get a worker available to receive a new job.
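Illustrative sketch only; the worker pool and the availability attribute (is_available) are assumptions made for the example.

    def get_available_worker_sketch(workers):
        # Return the first worker that is not currently busy, or None if all
        # are busy (the caller can then await trio.sleep and retry).
        for worker in workers:
            if worker.is_available:
                return worker
        return None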

Classes

Executor

alias of ExecutorAsyncServers

ExecutorAsyncServers(topology, path_dir_result)

Executor async/await using servers.