tud_lbm.pipeline.parallel_runner
================================

.. py:module:: tud_lbm.pipeline.parallel_runner

.. autoapi-nested-parse::

   Parallel simulation runner for parameter sweeps.

   Executes multiple SimulationConfig objects in parallel using process pooling,
   with progress tracking and error aggregation.

   Example usage::

       from config.adapter_toml import TomlAdapter
       from config.array_expansion import expand_config
       from runner.parallel_runner import run_parallel_simulations

       adapter = TomlAdapter()
       config_dict = adapter.load_raw("config_parallel.toml")
       configs, metadata = expand_config(config_dict)

       results = run_parallel_simulations(
           configs,
           max_workers=4,
           verbose=True,
       )

       for result in results:
           print(f"Simulation {result.index}: {result.status}")


Classes
-------

.. autoapisummary::

   tud_lbm.pipeline.parallel_runner.SimulationResult


Functions
---------

.. autoapisummary::

   tud_lbm.pipeline.parallel_runner.run_single_simulation
   tud_lbm.pipeline.parallel_runner.run_parallel_simulations
   tud_lbm.pipeline.parallel_runner.print_result_line
   tud_lbm.pipeline.parallel_runner.generate_plots
   tud_lbm.pipeline.parallel_runner.save_sweep_log


Module Contents
---------------

.. py:class:: SimulationResult

   Result of a single simulation execution.

   .. attribute:: index

      Position in the parameter sweep (0-based).

   .. attribute:: config

      The :class:`SimulationConfig` that was run.

   .. attribute:: status

      One of ``"success"``, ``"failed"``, ``"skipped"``.

   .. attribute:: output_dir

      Path to results directory (if successful).

   .. attribute:: parameters

      Dict of parameter values for this simulation (if array sweep).

   .. attribute:: error

      Exception message (if ``status == "failed"``).

   .. attribute:: duration

      Elapsed time in seconds (if completed).


   .. py:attribute:: index
      :type: int


   .. py:attribute:: config
      :type: config.simulation_config.SimulationConfig


   .. py:attribute:: status
      :type: str


   .. py:attribute:: output_dir
      :type: str | None
      :value: None


   .. py:attribute:: parameters
      :type: dict[str, Any] | None
      :value: None


   .. py:attribute:: error
      :type: str | None
      :value: None


   .. py:attribute:: duration
      :type: float
      :value: 0.0


.. py:function:: run_single_simulation(index: int, config: config.simulation_config.SimulationConfig, parameters: dict[str, Any] | None = None, setup_fn: collections.abc.Callable | None = None, run_fn: collections.abc.Callable | None = None) -> SimulationResult

   Execute a single simulation in a worker process.

   This function is pickled and executed in a separate process pool.

   :param index: Position in the sweep (for identification).
   :param config: The :class:`SimulationConfig` to execute.
   :param parameters: Dict of parameter values (used for logging/naming).
   :param setup_fn: Callable that builds setup from config; defaults to
                    :func:`setup.simulation_setup.build_setup`.
   :param run_fn: Callable that runs the simulation; defaults to
                  the functional runner from :mod:`runner.run`.

   :returns: :class:`SimulationResult` with status and output path.


.. py:function:: run_parallel_simulations(configs: list[config.simulation_config.SimulationConfig], *, max_workers: int | None = None, parameters_list: list[dict[str, Any]] | None = None, verbose: bool = False, continue_on_error: bool = True, progress_callback: collections.abc.Callable[[int, int], None] | None = None, setup_fn: collections.abc.Callable | None = None, run_fn: collections.abc.Callable | None = None) -> list[SimulationResult]

   Execute multiple simulations in parallel.

   :param configs: List of :class:`SimulationConfig` objects to execute.
   :param max_workers: Max number of worker processes. Defaults to CPU count.
   :param parameters_list: Optional list of dicts mapping parameter names to values
                           (for sweep identification/logging). Should match length of *configs*.
   :param verbose: If True, print progress and status updates.
   :param continue_on_error: If False, stop execution on first failure.
   :param progress_callback: Optional ``callable(completed, total)`` for progress tracking.
   :param setup_fn: Optional custom setup function (default: ``build_setup``).
   :param run_fn: Optional custom run function (default: ``run``).

   :returns: List of :class:`SimulationResult` objects, one per config.

   :raises RuntimeError: If any simulation fails and *continue_on_error* is False.


.. py:function:: print_result_line(result: SimulationResult, completed: int, total: int) -> None

   Print a single progress line for a completed simulation.


.. py:function:: generate_plots(results: list[SimulationResult], *, verbose: bool) -> None

   Generate plots for all successful simulations that request them.


.. py:function:: save_sweep_log(results: list[SimulationResult], output_dir: str | pathlib.Path) -> None

   Save a JSON manifest describing the parameter sweep.

   :param results: List of :class:`SimulationResult` objects.
   :param output_dir: Directory to save ``sweep_manifest.json``.
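
The manifest layout written by ``save_sweep_log`` is not described in this reference. As a rough illustration of what a JSON sweep manifest could contain, the sketch below uses a hypothetical stand-in dataclass (``DemoResult``) that mirrors the documented ``SimulationResult`` fields except ``config``, and a hypothetical helper (``write_manifest``). The key names (``total``, ``succeeded``, ``failed``, ``runs``) and the ``tau`` sweep parameter are assumptions made for the example, not the library's actual schema.

```python
import json
import tempfile
from dataclasses import asdict, dataclass
from pathlib import Path
from typing import Any, Optional


@dataclass
class DemoResult:
    """Hypothetical stand-in for SimulationResult (the ``config`` field is omitted)."""

    index: int
    status: str  # "success", "failed", or "skipped"
    output_dir: Optional[str] = None
    parameters: Optional[dict[str, Any]] = None
    error: Optional[str] = None
    duration: float = 0.0


def write_manifest(results: list[DemoResult], output_dir: "str | Path") -> Path:
    """Write an assumed manifest layout to ``sweep_manifest.json`` and return its path."""
    out = Path(output_dir)
    out.mkdir(parents=True, exist_ok=True)
    manifest = {
        "total": len(results),
        "succeeded": sum(r.status == "success" for r in results),
        "failed": sum(r.status == "failed" for r in results),
        # Sort by sweep index so the file is stable regardless of completion order.
        "runs": [asdict(r) for r in sorted(results, key=lambda r: r.index)],
    }
    path = out / "sweep_manifest.json"
    path.write_text(json.dumps(manifest, indent=2), encoding="utf-8")
    return path


# Results may arrive out of order when workers finish at different times.
demo_results = [
    DemoResult(1, "failed", parameters={"tau": 0.8}, error="diverged", duration=0.4),
    DemoResult(0, "success", output_dir="results/run_000", parameters={"tau": 0.6}, duration=1.2),
]
manifest_path = write_manifest(demo_results, tempfile.mkdtemp())
loaded = json.loads(manifest_path.read_text(encoding="utf-8"))
print(loaded["succeeded"], loaded["failed"])  # prints: 1 1
```

Sorting by ``index`` before serializing keeps the manifest reproducible across runs, since a process pool gives no guarantee about completion order.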