Source code for fluidimage.executors.exec_async_multiproc
"""Executor async/await + multiprocessing
=========================================
A executor using async for IO and multiprocessing for CPU bounded tasks.
.. autoclass:: ExecutorAsyncMultiproc
:members:
:private-members:
"""
import time
from multiprocessing import Event, Pipe, Process
import trio
from fluidimage.util import log_debug, logger
from .exec_async import ExecutorAsync
def exec_work_and_comm(func, obj, child_conn, event):
# log_debug(f"process ({key}) started")
event.set()
# pylint: disable=W0703
try:
result = func(obj)
except Exception as error:
result = error
# log_debug(f"in process, send result ({key}): {result}")
child_conn.send(result)
[docs]class ExecutorAsyncMultiproc(ExecutorAsync):
"""Async executor using multiprocessing to launch CPU-bounded tasks"""
[docs] async def async_run_work_cpu(self, work):
"""Is destined to be started with a "trio.start_soon".
Executes the work on an item (key, obj), and add the result on
work.output_queue.
Parameters
----------
work :
A work from the topology
"""
self.nb_working_workers_cpu += 1
try:
key, obj = work.input_queue.pop_first_item()
except KeyError:
self.nb_working_workers_cpu -= 1
return
if work.check_exception(key, obj):
self.nb_working_workers_cpu -= 1
return
arg = work.prepare_argument(key, obj)
t_start = time.time()
self.log_in_file_memory_usage(
f"{time.time() - self.t_start:.2f} s. Launch work "
+ work.name_no_space
+ f" ({key}). mem usage"
)
parent_conn, child_conn = Pipe()
event = Event()
def run_process():
# we do this complicate thing because there may be a strange bug
def start_process_and_check(index_attempt):
process = Process(
target=exec_work_and_comm,
args=(work.func_or_cls, arg, child_conn, event),
)
process.daemon = True
process.start()
# check whether the process has really started (possible bug!)
if not event.wait(1):
log_debug(
f"problem: process {work.name_no_space} ({key}) "
f"has not really started... (attempt {index_attempt})"
)
process.terminate()
return False
return process
really_started = False
for index_attempt in range(10):
process = start_process_and_check(index_attempt)
if process:
really_started = True
break
if not really_started:
raise RuntimeError(
f"A process {work.name_no_space} ({key}) "
"has not started after 10 attempts"
)
# todo: use parent_conn.poll to implement a timeout
# log_debug(f"waiting for result ({key})")
result = parent_conn.recv()
# log_debug(f"result ({key}) received")
process.join(10 * self.sleep_time)
if process.exitcode != 0:
logger.error("process.exitcode: %s", process.exitcode)
process.terminate()
return result
ret = await trio.to_thread.run_sync(run_process)
if isinstance(ret, Exception):
self.log_exception(ret, work.name_no_space, key)
if self.stop_if_error:
raise ret
else:
self.log_in_file(
f"work {work.name_no_space} ({key}) "
f"done in {time.time() - t_start:.3f} s"
)
if work.output_queue is not None:
work.output_queue[key] = ret
self.nb_working_workers_cpu -= 1
Executor = ExecutorAsyncMultiproc