Source code for fluidimage.executors.base

"""Base class for executors
===========================

.. autoclass:: ExecutorBase
   :members:
   :private-members:

.. autoclass:: MultiExecutorBase
   :members:
   :private-members:

"""

import atexit
import copy
import os
import signal
import sys
import traceback
from abc import ABC, abstractmethod
from pathlib import Path
from time import sleep, time

from rich.console import Console
from rich.progress import Progress

from fluiddyn import time_as_str
from fluiddyn.io.tee import MultiFile
from fluiddyn.util.paramcontainer import ParamContainer
from fluidimage.config import get_config
from fluidimage.topologies.nb_cpu_cores import nb_cores
from fluidimage.util import (
    get_txt_memory_usage,
    log_error,
    log_memory_usage,
    logger,
    reset_logger,
    safe_eval,
    str_short,
)

config = get_config()

_omp_num_threads_equal_1_at_import = os.environ.get("OMP_NUM_THREADS") == "1"
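
# A minimal usage sketch (hypothetical user script, not part of this module):
# ExecutorBase.__init__ below refuses to run when OMP_NUM_THREADS was not "1"
# at import time, so a script driving a Fluidimage topology typically starts
# with something like::
#
#     import os
#     os.environ["OMP_NUM_THREADS"] = "1"  # must be set before importing fluidimage
#
#     import fluidimage  # noqa: E402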


class ExecutorBase(ABC):
    """Base class for executors.

    Parameters
    ----------

    topology : fluidimage.topologies.base.TopologyBase

      A computational topology.

    path_dir_result : str or pathlib.Path

      The path of the directory where the results have to be saved.

    nb_max_workers : int, optional (None)

    nb_items_queue_max : int, optional (None)

    logging_level : str, optional {"info"}

    sleep_time : number, optional {None}

    stop_if_error : bool, optional {False}

    """

    info_job: dict
    path_job_data: Path
    _path_lockfile: Path
    num_expected_results: int

    def _init_log_path(self):
        unique_postfix = f"{time_as_str()}_{os.getpid()}"
        path_job_data = self.path_dir_result / f"job_{unique_postfix}"
        if path_job_data.exists():
            index = 0
            unique_postfix0 = unique_postfix
            while path_job_data.exists():
                index += 1
                unique_postfix = unique_postfix0 + str(index)
                path_job_data = self.path_dir_result / f"log_{unique_postfix}"
        self._unique_postfix = unique_postfix
        self.path_job_data = path_job_data
        self._path_lockfile = self.path_job_data / "is_running.lock"
        self.path_dir_exceptions = self.path_dir_result / f"log_{unique_postfix}"
        self._log_path = self.path_dir_result / f"log_{unique_postfix}.txt"

    def __init__(
        self,
        topology,
        path_dir_result,
        nb_max_workers=None,
        nb_items_queue_max=None,
        logging_level="info",
        sleep_time=None,
        stop_if_error=False,
        path_log=None,
    ):
        if not _omp_num_threads_equal_1_at_import:
            raise SystemError(
                "For performance reasons, "
                'the environment variable OMP_NUM_THREADS has to be set to "1" '
                "before executing a Fluidimage topology."
            )

        del sleep_time
        self.topology = topology
        self.logging_level = logging_level
        self.stop_if_error = stop_if_error

        path_dir_result = Path(path_dir_result).resolve()
        path_dir_result.mkdir(exist_ok=True)
        self.path_dir_result = path_dir_result

        self._init_log_path()
        if path_log is not None:
            self._log_path = path_log
        self._log_file = open(self._log_path, "w", encoding="utf-8")

        stdout = sys.stdout
        if isinstance(stdout, MultiFile):
            stdout = sys.__stdout__

        stderr = sys.stderr
        if isinstance(stderr, MultiFile):
            stderr = sys.__stderr__

        self._old_stdout = sys.stdout
        self._old_stderr = sys.stderr

        sys.stdout = MultiFile([stdout, self._log_file])
        sys.stderr = MultiFile([stderr, self._log_file])

        if logging_level:
            for handler in logger.handlers:
                logger.removeHandler(handler)

            from fluidimage import config_logging

            config_logging(logging_level, file=self._get_file_object_for_logger())

        if nb_max_workers is None:
            if config is not None:
                try:
                    nb_max_workers = safe_eval(
                        config["topology"]["nb_max_workers"]
                    )
                except KeyError:
                    pass

        # default nb_max_workers
        # Difficult: trade off between overloading and limitation due to input
        # output. The user can do much better for a specific case.
        if nb_max_workers is None:
            if nb_cores < 16:
                nb_max_workers = nb_cores + 2
            else:
                nb_max_workers = nb_cores

        self.nb_max_workers = nb_max_workers

        if nb_items_queue_max is None:
            nb_items_queue_max = max(4 * nb_max_workers, 8)
        self.nb_items_queue_max = nb_items_queue_max

        self._has_to_stop = False

        if sys.platform != "win32":

            def handler_signals(signal_number, stack):
                del stack
                print(
                    f"signal {signal_number} received: set _has_to_stop to True "
                    f"({type(self).__name__})."
                )
                self._has_to_stop = True

            signal.signal(12, handler_signals)

        # Picks up async works
        self.works = [
            work
            for work in self.topology.works
            if work.kind is None or "one shot" not in work.kind
        ]

        self._indices_to_be_computed = None  # to avoid a pylint warning
        self.t_start = None

    def _get_file_object_for_logger(self):
        return sys.stdout

    def _init_compute(self):
        self.t_start = time()
        self._init_num_expected_results()
        if self.num_expected_results == 0:
            return
        self._init_compute_log()
        self._save_job_data()

    def _init_compute_log(self):
        log_memory_usage(time_as_str(2) + ": starting execution. mem usage")
        topology_name = str_short(type(self.topology))
        executor_name = str_short(type(self))
        print("  topology:", topology_name)
        print("  executor:", executor_name)
        print("  nb_cpus_allowed =", nb_cores)
        print("  nb_max_workers =", self.nb_max_workers)
        print("  num_expected_results =", self.num_expected_results)
        print("  path_dir_result =", self.path_dir_result)
        print(
            "Monitoring app can be launched with:\n"
            f"fluidimage-monitor {self.path_dir_result}"
        )
        self.info_job = {
            "topology": topology_name,
            "executor": executor_name,
            "nb_cpus_allowed": nb_cores,
            "nb_max_workers": self.nb_max_workers,
            "path_dir_result": self.path_dir_result,
            "num_expected_results": self.num_expected_results,
        }

    def _save_lock_file(self):
        if self._path_lockfile.exists():
            raise RuntimeError(
                f"File {self._path_lockfile} already exists. It usually "
                "means that this directory is already being used by "
                "another process. Alternatively it might be that an "
                "old lockfile has not been deleted (which is a bug). "
                "If no process uses this directory, the lockfile "
                "can safely be removed."
            )
        else:
            with open(self._path_lockfile, "w", encoding="utf-8") as file:
                file.write(time_as_str() + f"\n{os.getpid()}\n")
            atexit.register(self._release_lock)

    def _release_lock(self):
        if self._path_lockfile.exists():
            self._path_lockfile.unlink(missing_ok=True)

    def _save_job_data(self):
        self.path_job_data = self.path_dir_result / f"job_{self._unique_postfix}"
        self.path_job_data.mkdir(exist_ok=False)
        self._save_lock_file()
        params = self.topology.params
        if isinstance(params, dict):
            params = ParamContainer("params", attribs=params)
        params._save_as_xml(self.path_job_data / "params.xml")
        info_job = ParamContainer("info", attribs=self.info_job)
        info_job._save_as_xml(self.path_job_data / "info.xml")

    def _reset_std_as_default(self):
        sys.stdout = self._old_stdout
        sys.stderr = self._old_stderr
        reset_logger()
        self._log_file.close()

    def _finalize_compute(self):
        log_memory_usage(time_as_str(2) + ": end of `compute`. mem usage")
        self.topology.print_at_exit(time() - self.t_start)
        self._reset_std_as_default()
        self._release_lock()

    def log_in_file(self, *args, sep=" ", end="\n"):
        """Simple write in the log file (without print)"""
        self._log_file.write(sep.join(str(arg) for arg in args) + end)

    def log_in_file_memory_usage(self, txt, color="OKGREEN", end="\n"):
        """Write the memory usage in the log file"""
        self._log_file.write(get_txt_memory_usage(txt, color) + end)

    def exec_one_shot_works(self):
        """Execute all "one shot" functions."""
        for work in self.topology.works:
            if work.kind is not None and "one shot" in work.kind:
                pretty = str_short(work.func_or_cls.__func__)
                print(f'Running "one_shot" job "{work.name}" ({pretty})')
                work.func_or_cls(work.input_queue, work.output_queue)

    def log_exception(self, exception, work_name, key):
        """Log an exception in a file."""
        path_log = self.path_dir_exceptions / f"exception_{work_name}_{key}.txt"

        log_error(
            "error during work "
            f"{work_name} ({key}) (logged in {path_log})"
        )

        self.path_dir_exceptions.mkdir(exist_ok=True)

        try:
            parts = traceback.format_exception(
                etype=type(exception), value=exception, tb=exception.__traceback__
            )
        except TypeError:
            # Python >= 3.10
            parts = traceback.format_exception(exception)

        formatted_exception = "".join(parts)

        with open(path_log, "w", encoding="utf-8") as file:
            file.write(
                f"Exception for work {work_name}, key {key}:\n"
                + formatted_exception
            )

    def _init_num_expected_results(self):
        if hasattr(self.topology, "series"):
            self._init_num_expected_results_series()
        else:
            self._init_num_expected_results_first_queue()

    def _init_num_expected_results_series(self):
        if self.topology.how_saving == "complete" and hasattr(
            self.topology, "compute_indices_to_be_computed"
        ):
            self._indices_to_be_computed = (
                self.topology.compute_indices_to_be_computed()
            )
            self.num_expected_results = len(self._indices_to_be_computed)
        else:
            series = self.topology.series
            self.num_expected_results = len(
                range(series.ind_start, series.ind_stop, series.ind_step)
            )

    def _init_num_expected_results_first_queue(self):
        first_work = self.topology.works[0]
        if first_work.input_queue is not None:
            raise NotImplementedError
        if isinstance(first_work.output_queue, tuple):
            raise NotImplementedError

        # fill the first queue
        first_work.func_or_cls(
            input_queue=None, output_queue=first_work.output_queue
        )
        self._first_queue = copy.copy(first_work.output_queue)

        # split the first queue
        self._keys_first_queue = list(self._first_queue.keys())
        self.num_expected_results = len(self._keys_first_queue)
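
# A minimal sketch of how a concrete executor is typically driven.  Subclasses
# of ExecutorBase are expected to provide a ``compute`` method (see
# MultiExecutorBase.compute below); the class name used here is an assumption
# for illustration only, not an API defined in this module::
#
#     executor = SomeConcreteExecutor(topology, path_dir_result="./results")
#     executor.compute()  # processes all items; logs are written in path_dir_result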


class MultiExecutorBase(ExecutorBase):
    """Manage the multi-executor mode

    This class is not the one that actually computes the topology. The
    topology is split and each slice is computed with an ExecutorAsync.

    Parameters
    ----------

    nb_max_workers : None, int

      Limits the number of workers working at the same time.

    nb_items_queue_max : None, int

      Limits the number of items that can be in an output_queue.

    sleep_time : None, float

      Defines the waiting time (from trio.sleep) of a function. Async
      functions await ``trio.sleep(sleep_time)`` when they have done some
      work on an item, and when there is nothing in their input_queue.

    """

    def __init__(
        self,
        topology,
        path_dir_result,
        nb_max_workers=None,
        nb_items_queue_max=None,
        sleep_time=0.01,
        logging_level="info",
        stop_if_error=False,
    ):
        if stop_if_error:
            raise NotImplementedError

        super().__init__(
            topology,
            path_dir_result,
            nb_max_workers,
            nb_items_queue_max,
            logging_level=logging_level,
        )

        self.sleep_time = sleep_time
        self.nb_processes = self.nb_max_workers
        self.processes = []

        # to avoid a pylint warning
        self.log_paths = None

    def _init_log_path(self):
        super()._init_log_path()
        path_dir_log = self.path_dir_exceptions
        path_dir_log.mkdir(exist_ok=True)
        self._log_path = path_dir_log / (path_dir_log.name + ".txt")

    @abstractmethod
    def _start_processes(self):
        """Start the processes doing the hard work"""

    def compute(self):
        """Compute the topology."""
        self._init_compute()

        if self.num_expected_results == 0:
            print("Nothing to be computed")
            return

        self.log_paths = []

        if sys.platform != "win32":

            def handler_signals(signal_number, stack):
                del stack
                print(
                    f"signal {signal_number} received: set _has_to_stop to True "
                    f"({type(self).__name__})."
                )
                self._has_to_stop = True
                for process in self.processes:
                    os.kill(process.pid, signal_number)

            signal.signal(12, handler_signals)

        self._start_processes()
        self._wait_for_all_processes()
        self._finalize_compute()

    def _poll_return_code(self, process):
        return process.poll()

    def _join_processes(self):
        """Join the processes"""

    def _wait_for_all_processes(self):
        running_processes = {
            idx: process for idx, process in enumerate(self.processes)
        }
        running_processes_updated = {}
        return_codes = {}
        errors = {}

        num_results_vs_idx_process = [0 for idx in range(len(self.processes))]
        paths_len_results = [
            self.path_job_data / f"len_results_{idx:03}.txt"
            for idx in range(len(self.processes))
        ]

        num_results = num_results_previous = 0

        console = Console(file=sys.__stdout__)
        with Progress(console=console) as progress:
            progress_task = progress.add_task(
                "[green]Computation", total=self.num_expected_results
            )

            while running_processes:
                sleep(0.2)
                for idx, process in running_processes.items():
                    ret_code = self._poll_return_code(process)
                    if ret_code is None:
                        running_processes_updated[idx] = process
                    else:
                        return_codes[idx] = ret_code
                        if ret_code != 0:
                            try:
                                error = process.stderr.read()
                            except AttributeError:
                                error = f"{ret_code = }"
                            errors[idx] = error
                            logger.error(error)

                    if paths_len_results[idx].exists():
                        with open(
                            paths_len_results[idx], encoding="utf-8"
                        ) as file:
                            content = file.read()
                            if content:
                                num_results_vs_idx_process[idx] = int(content)

                num_results = sum(num_results_vs_idx_process)
                if num_results != num_results_previous:
                    if num_results_previous == 0:
                        print(f"{time_as_str(2)}: first result detected")
                    num_results_previous = num_results
                    progress.update(progress_task, completed=num_results)

                running_processes, running_processes_updated = (
                    running_processes_updated,
                    running_processes,
                )
                running_processes_updated.clear()

        self._join_processes()

        if errors:
            raise RuntimeError(
                f"{len(errors)} sub-executors failed "
                f"(over {len(self.processes)} processes)."
            )

    def _finalize_compute(self):
        self.topology.results = results = []
        for path in self.path_job_data.glob("results_*.txt"):
            with open(path, encoding="utf-8") as file:
                results.extend(line.strip() for line in file.readlines())

        super()._finalize_compute()
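
# A hedged sketch of how a running multi-executor job can be asked to stop
# cleanly.  The handler registered in ``compute`` reacts to signal 12 (SIGUSR2
# on most Linux systems) by setting ``_has_to_stop`` and forwarding the signal
# to the child processes; the PID below is of course an assumption::
#
#     import os
#     import signal
#
#     os.kill(12345, signal.SIGUSR2)  # 12345: PID of the process running compute()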