"""Executor async/await using servers
=====================================
A executor using async for IO and servers for CPU-bounded tasks.
.. autoclass:: ExecutorAsyncServers
:members:
:private-members:
"""
import os
import signal
import numpy as np
import trio
from fluiddyn import time_as_str
from fluidimage.util import log_debug, logger
from .exec_async import ExecutorAsync
from .servers import launch_server
max_items_in_server = 4
[docs]class ExecutorAsyncServers(ExecutorAsync):
"""Executor async/await using servers.
Parameters
----------
nb_max_workers : None, int
Limits the numbers of workers working in the same time.
nb_items_queue_max : None, int
Limits the numbers of items that can be in a output_queue.
sleep_time : None, float
Defines the waiting time (from trio.sleep) of a function. Functions await
"trio.sleep" when they have done a work on an item, and when there is
nothing in there input_queue.
"""
_type_server = "multiprocessing"
def _init_log_path(self):
super()._init_log_path()
path_dir_log = self.path_dir_exceptions
path_dir_log.mkdir(exist_ok=True)
self._log_path = path_dir_log / (path_dir_log.name + ".txt")
def __init__(
self,
topology,
path_dir_result,
nb_max_workers=None,
nb_items_queue_max=None,
sleep_time=0.01,
logging_level="info",
stop_if_error=False,
):
if stop_if_error:
raise NotImplementedError
super().__init__(
topology,
path_dir_result,
nb_max_workers,
nb_items_queue_max,
sleep_time=sleep_time,
logging_level=logging_level,
)
# create nb_max_workers servers
self.workers = []
for ind_worker in range(self.nb_max_workers):
log_path = self._log_path.parent / f"process_{ind_worker:03d}.txt"
self.workers.append(
launch_server(
topology,
log_path,
self._type_server,
sleep_time,
logging_level,
)
)
def signal_handler(sig, frame):
del sig, frame
logger.info("Ctrl+C signal received...")
for worker in self.workers:
worker.terminate()
self._has_to_stop = True
self.nursery.cancel_scope.cancel()
# we need to raise the exception
raise KeyboardInterrupt
signal.signal(signal.SIGINT, signal_handler)
[docs] def compute(self):
"""Compute the whole topology.
Begin by executing one shot jobs, then execute multiple shots jobs
implemented as async functions. Warning, one shot jobs must be ancestors
of multiple shots jobs in the topology.
"""
self._init_compute()
for worker in self.workers:
worker.send(("__t_start__", self.t_start))
self.exec_one_shot_works()
trio.run(self.start_async_works)
self._finalize_compute()
[docs] async def start_async_works(self):
"""Create a trio nursery and start all async functions."""
async with trio.open_nursery() as self.nursery:
for af in reversed(self.async_funcs.values()):
self.nursery.start_soon(af)
self.nursery.start_soon(self.update_has_to_stop)
logger.info("terminate the servers")
for worker in self.workers:
worker.terminate()
[docs] async def update_has_to_stop(self):
"""Work has to stop flag. Check if all works has been done.
Return True if there are no workers in working and if there is no items in
all queues.
"""
while not self._has_to_stop:
result = (
(not any([bool(queue) for queue in self.topology.queues]))
and all(worker.is_unoccupied for worker in self.workers)
and self.nb_working_workers_io == 0
)
if result:
self._has_to_stop = True
log_debug("has_to_stop!")
if self.logging_level == "debug":
log_debug(f"self.topology.queues: {self.topology.queues}")
log_debug(
"[worker.is_unoccupied for worker in self.workers]: "
f"{[worker.is_unoccupied for worker in self.workers]}"
)
log_debug(
"[worker.is_available for worker in self.workers]: "
f"{[worker.is_available for worker in self.workers]}"
)
log_debug(
"[worker.nb_items_to_process for worker in self.workers] "
f"{[worker.nb_items_to_process for worker in self.workers]}"
)
log_debug(
f"self.nb_working_workers_io: {self.nb_working_workers_io}"
)
await trio.sleep(self.sleep_time)
[docs] async def async_run_work_cpu(self, work, worker):
"""Is destined to be started with a "trio.start_soon".
Executes the work on an item (key, obj), and add the result on
work.output_queue.
Parameters
----------
work :
A work from the topology
worker : .servers.WorkerMultiprocessing
A client to communicate with the server worker.
"""
try:
key, obj = work.input_queue.pop_first_item()
except KeyError:
worker.is_available = True
return
if work.check_exception(key, obj):
worker.is_available = True
return
def run_process():
# create a communication channel
parent_conn, child_conn = worker.new_pipe()
# send (work, key, obj, comm) to the server
worker.send_job((work.name, key, obj, child_conn))
worker.is_available = True
# wait for the end of the computation
work_name_received, key_received, result = parent_conn.recv()
assert work.name == work_name_received
assert key == key_received
return result
ret = await trio.to_thread.run_sync(run_process)
if work.output_queue is not None:
work.output_queue[key] = ret
worker.well_done_thanks()
[docs] def def_async_func_work_cpu(self, work):
async def func(work=work):
while True:
while not work.input_queue or (
work.output_queue is not None
and len(work.output_queue) >= self.nb_items_queue_max
):
if self._has_to_stop:
return
await trio.sleep(self.sleep_time)
available_worker = False
while not available_worker:
if self._has_to_stop:
return
available_worker = self.get_available_worker()
await trio.sleep(self.sleep_time)
self.nursery.start_soon(
self.async_run_work_cpu, work, available_worker
)
await trio.sleep(self.sleep_time)
return func
[docs] def get_available_worker(self):
"""Get a worker available to receive a new job"""
available_workers = [
worker
for worker in self.workers
if worker.is_available
and worker.nb_items_to_process < max_items_in_server
]
if not available_workers:
return False
nb_items_to_process_workers = [
worker.nb_items_to_process for worker in available_workers
]
index = np.argmin(nb_items_to_process_workers)
worker = available_workers[index]
worker.is_available = False
return worker
Executor = ExecutorAsyncServers