Source code for fluidimage.executors.exec_async_seq_for_multi

"""Sequential executor for multi executor

.. autoclass:: ExecutorAsyncSeqForMulti
   :members:
   :private-members:

"""

import sys
from pathlib import Path
from time import time

import trio

from fluiddyn import time_as_str

from .exec_async_sequential import ExecutorAsyncSequential


class ExecutorAsyncSeqForMulti(ExecutorAsyncSequential):
    """Slightly modified ExecutorAsyncSequential.

    Sequential asynchronous executor meant to run as one process of a multi
    executor. On top of its base class, it

    - uses the log path given by the caller (``path_log``),
    - redirects ``sys.stdout`` to the executor log file,
    - when the topology has a ``results`` attribute, regularly appends the
      names of the new results to ``results_<index_process>.txt`` and writes
      how many results have been saved to ``len_results_<index_process>.txt``.

    Parameters
    ----------
    topology
        Topology to execute (forwarded to the base class).
    path_dir_result
        Result directory (forwarded to the base class).
    nb_max_workers : int
        Forwarded to the base class.
    sleep_time : float
        Forwarded to the base class.
    logging_level : str
        Forwarded to the base class.
    stop_if_error : bool
        Has to be false: stopping on error is not implemented here.
    path_log : str or path-like
        Path of the log file. Required in practice: its parent directory is
        used for the exceptions and to locate the job data directory.
    t_start : float, optional
        Start time as returned by ``time.time()``, used to compute the
        elapsed time reported at exit. Defaults to the construction time.
    index_process : int, optional
        Index of this process, used in the names of the results files.
        Required when the topology has a ``results`` attribute.

    Raises
    ------
    NotImplementedError
        If ``stop_if_error`` is true.
    """

    def __init__(
        self,
        topology,
        path_dir_result,
        nb_max_workers=1,
        sleep_time=0.01,
        logging_level="info",
        stop_if_error=False,
        path_log=None,
        t_start=None,
        index_process=None,
    ):
        if stop_if_error:
            raise NotImplementedError(
                "stop_if_error not implemented for ExecutorAsyncSeqForMulti"
            )

        # Has to be set before calling the base class __init__ because
        # `_init_log_path` (overridden below) reads it.
        # NOTE(review): assumes the base __init__ calls `_init_log_path`.
        self._log_path = path_log
        super().__init__(
            topology,
            path_dir_result,
            nb_max_workers=nb_max_workers,
            nb_items_queue_max=8,
            sleep_time=sleep_time,
            logging_level=logging_level,
            path_log=path_log,
        )

        # Fall back to "now" so that `_finalize_compute` can always compute
        # an elapsed time, even when the caller did not provide `t_start`.
        self.t_start = time() if t_start is None else t_start
        self.index_process = index_process

        # No need to correctly set num_expected_results for this class
        self.num_expected_results = None

        if hasattr(self.topology, "results"):
            # Register the coroutine that periodically saves result names.
            self.async_funcs["_save_topology_results"] = (
                self._save_topology_results
            )

            # The job data directory is a sibling of the log directory whose
            # name is the log directory name with its first 3 characters
            # replaced by "job" (presumably "log..." -> "job..." - confirm
            # against the code creating these directories).
            path_log_dir = Path(self._log_path).parent
            path_job_data = path_log_dir.with_name("job" + path_log_dir.name[3:])

            self._path_results = (
                path_job_data / f"results_{self.index_process:03}.txt"
            )
            self._path_num_results = (
                self._path_results.parent
                / f"len_results_{self.index_process:03}.txt"
            )
            self._len_saved_results = 0

        # From now on, what is printed by this process goes to the log file.
        # `self._log_file` is not created in this class; it is expected to be
        # provided by the base class.
        sys.stdout = self._log_file

    def _get_file_object_for_logger(self):
        """Return the file object in which the logger has to write."""
        return self._log_file

    def _init_log_path(self):
        """Set the exceptions directory to the directory of the log file."""
        self.path_dir_exceptions = Path(self._log_path).parent

    def _init_compute(self):
        """Prepare the computation: start time string, log and results files."""
        self.time_start_str = time_as_str()
        self._init_compute_log()
        # `_path_results` only exists when the topology has `results`
        # (see `__init__`).
        if hasattr(self, "_path_results"):
            self._path_results.touch()
            with open(self._path_num_results, "w", encoding="utf-8") as file:
                file.write("0\n")

    def _init_num_expected_results(self):
        """Do nothing: `num_expected_results` is not needed for this class."""

    def _finalize_compute(self):
        """Restore the standard streams and write the final information.

        Appends the exit text of the topology to the log file and saves the
        names of the results that have not been saved yet.
        """
        self._reset_std_as_default()

        txt = self.topology.make_text_at_exit(time() - self.t_start)
        with open(self._log_path, "a", encoding="utf-8") as file:
            file.write(txt)

        if hasattr(self.topology, "results"):
            self._save_results_names()

    @staticmethod
    def _get_result_name(result):
        """Return the short name (str) to be saved for one result.

        Strings are interpreted as paths (only the last component is kept),
        objects with a ``name`` attribute give this attribute and anything
        else is converted with ``str``.
        """
        if isinstance(result, str):
            return Path(result).name
        if hasattr(result, "name"):
            return result.name
        return str(result)

    def _save_results_names(self):
        """Save the names of the results produced since the last call.

        Overwrites the "len_results" file with the total number of results
        handled so far, appends one line per new result to the "results"
        file and flushes the log file if it is still open.
        """
        results = self.topology.results
        new_results = results[self._len_saved_results :]
        self._len_saved_results = len(results)

        with open(self._path_num_results, "w", encoding="utf-8") as file:
            file.write(f"{self._len_saved_results}\n")

        if new_results:
            # Convert each result on its own so that a list mixing strings,
            # path-like objects and other objects is handled correctly.
            names = [self._get_result_name(result) for result in new_results]
            with open(self._path_results, "a", encoding="utf-8") as file:
                file.write("\n".join(names) + "\n")

        if not self._log_file.closed:
            self._log_file.flush()

    async def _save_topology_results(self):
        """Save the names of the new results every second until stopped."""
        while not self._has_to_stop:
            self._save_results_names()
            await trio.sleep(1.0)


# Module-level alias exposing the executor class of this module under the
# generic name ``Executor`` (presumably so that executors can be retrieved
# from their module with a uniform name - confirm against the loading code).
Executor = ExecutorAsyncSeqForMulti