Source code for fluidimage.executors.exec_async_sequential

"""Executor async/await sequential
==================================

A executor using async for IO but launching CPU-bounded tasks sequentially.

.. autoclass:: ExecutorAsyncSequential
   :members:
   :private-members:

"""

import time

from .exec_async import ExecutorAsync


[docs]class ExecutorAsyncSequential(ExecutorAsync): """Async executor launching CPU-bounded tasks sequentially"""
[docs] async def async_run_work_cpu(self, work): """Executes the work on an item (key, obj), and add the result on work.output_queue. Parameters ---------- work : A work from the topology key : hashable The key of the dictionary item to be process obj : object The value of the dictionary item to be process """ self.nb_working_workers_cpu += 1 try: key, obj = work.input_queue.pop_first_item() except KeyError: self.nb_working_workers_cpu -= 1 return if work.check_exception(key, obj): self.nb_working_workers_cpu -= 1 return t_start = time.time() self.log_in_file_memory_usage( f"{time.time() - self.t_start:.2f} s. Launch work " + work.name_no_space + f" ({key}). mem usage" ) arg = work.prepare_argument(key, obj) # pylint: disable=W0703 try: # here we do something very bad from the async point of view: # we launch a potentially long blocking function: ret = work.func_or_cls(arg) except Exception as error: self.log_exception(error, work.name_no_space, key) if self.stop_if_error: raise ret = error else: self.log_in_file( f"work {work.name_no_space} ({key}) " f"done in {time.time() - t_start:.3f} s" ) if work.output_queue is not None: work.output_queue[key] = ret self.nb_working_workers_cpu -= 1
Executor = ExecutorAsyncSequential