Source code for fluidimage.executors

"""Executors of computational topologies
========================================

The executors are used to execute a topology.

From the user point of view, the executor is chosen from the method
:func:`fluidimage.topologies.base.TopologyBase.compute`. The default executor
is :class:`fluidimage.executors.multi_exec_async.MultiExecutorAsync`.

There are many executors with different computational strategies. Depending on
the computational topology and the hardware, it can be more efficient to chose
an executor compared to another.

.. autosummary::
   :toctree:

   base
   exec_sequential
   exec_async
   exec_async_sequential
   multi_exec_async
   multi_exec_subproc
   exec_async_seq_for_multi
   exec_async_multiproc
   exec_async_servers
   servers

.. autofunction:: get_entry_points

.. autofunction:: get_executor_names

.. autofunction:: import_executor_class

"""

import importlib
import os
import sys

if sys.version_info < (3, 10):
    from importlib_metadata import EntryPoint, entry_points
else:
    from importlib.metadata import EntryPoint, entry_points

import trio


def afterfork():
    trio._core._thread_cache.THREAD_CACHE._idle_workers.clear()


if hasattr(os, "register_at_fork"):
    os.register_at_fork(after_in_child=afterfork)

from .base import ExecutorBase

# on Windows (and MacOS), one cannot use "multi_exec_async"
# because the OS does not support forks (or not fully) and
# multiprocessing works differently than on Linux
# see https://docs.python.org/3/library/multiprocessing.html#contexts-and-start-methods
supported_multi_executors = ["multi_exec_subproc"]
if sys.platform == "linux":
    supported_multi_executors.insert(0, "multi_exec_async")


_entry_points = None


[docs]def get_entry_points(reload=False): """Discover the executors installed""" global _entry_points if _entry_points is None or reload: _entry_points = entry_points(group="fluidimage.executors") if not _entry_points: raise RuntimeError( "No executor entry point were found, which indicates an installation issue." ) return _entry_points
[docs]def get_executor_names(): """Get available executor names""" return set(entry_point.name for entry_point in get_entry_points())
def _get_module_fullname_from_name(name): """Get the module name from an executor name Parameters ---------- name : str Name of an executor. """ entry_points = get_entry_points() selected_entry_points = entry_points.select(name=name) if len(selected_entry_points) == 0: raise ValueError(f"Cannot find an executor for {name = }. {entry_points}") elif len(selected_entry_points) > 1: logging.warning( f"{len(selected_entry_points)} plugins were found for {name = }" ) return selected_entry_points[name].value
[docs]def import_executor_class(name): """Import an executor class. Parameters ---------- name : str Executor name. Returns ------- The corresponding executor class. """ if isinstance(name, EntryPoint): module_fullname = name.value name = name.name else: module_fullname = _get_module_fullname_from_name(name) mod = importlib.import_module(module_fullname) return mod.Executor
__all__ = ["ExecutorBase", "get_executor_names", "import_executor_class"]