Source code for fluidimage.executors.multi_exec_subproc

"""Multi executor based on subprocesses

.. autoclass:: MultiExecutorSubproc
   :members:
   :private-members:

"""

import subprocess
import sys
from copy import deepcopy
from time import sleep

from fluiddyn import time_as_str
from fluidimage.topologies.splitters import Splitter
from fluidimage.util import logger

from .base import MultiExecutorBase


[docs]class MultiExecutorSubproc(MultiExecutorBase): """Multi executor based on subprocesses and splitters""" splitter: Splitter def _init_num_expected_results(self): try: splitter_cls = self.topology.Splitter except AttributeError as error: raise ValueError( "MultiExecutorSubproc can only execute " "topologies with a Splitter." ) from error params = deepcopy(self.topology.params) try: params._set_child( "compute_kwargs", attribs={ "executor": "exec_async_seq_for_multi", "nb_max_workers": 1, }, ) except ValueError: params.compute_kwargs.executor = "exec_async_seq_for_multi" params.compute_kwargs.nb_max_workers = 1 try: params.compute_kwargs._set_child( "kwargs_executor", attribs={ "path_log": None, "t_start": self.t_start, "index_process": None, }, ) except ValueError: params.compute_kwargs.kwargs_executor.t_start = self.t_start if hasattr(self.topology, "how_saving"): params.saving.how = self.topology.how_saving if hasattr(self.topology, ""): params.saving.path = self.topology.path_dir_result self.splitter = splitter_cls( params, self.nb_processes, self.topology, self._indices_to_be_computed ) self.num_expected_results = self.splitter.num_expected_results
[docs] def _start_processes(self): splitter = self.splitter path_dir_params = ( self.path_dir_result / f"params_files_{self._unique_postfix}" ) path_dir_params.mkdir(exist_ok=True) if ( hasattr(self.topology, "how_saving") and self.topology.how_saving == "complete" and hasattr(splitter, "save_indices_files") ): splitter.save_indices_files(path_dir_params) for index_process, params_split in enumerate( splitter.iter_over_new_params() ): kwargs_executor = params_split.compute_kwargs.kwargs_executor kwargs_executor.path_log = ( self._log_path.parent / f"process_{index_process:03d}.txt" ) kwargs_executor.index_process = index_process path_params = path_dir_params / f"params{index_process:00d}.xml" params_split._save_as_xml(path_params) process = subprocess.Popen( [ sys.executable, "-m", "fluidimage.run_from_xml", str(path_params), ], text=True, encoding="utf-8", stdout=subprocess.PIPE, stderr=subprocess.PIPE, ) self.processes.append(process) # shift a bit the imports sleep(0.2) logger.info( "%s: %s sequential executors launched in parallel", time_as_str(2), len(self.processes), )
Executor = MultiExecutorSubproc