tud_lbm.pipeline.parallel_runner
Parallel simulation runner for parameter sweeps.
Executes multiple SimulationConfig objects in parallel using process pooling, with progress tracking and error aggregation.
Example usage:
from config.adapter_toml import TomlAdapter
from config.array_expansion import expand_config
from runner.parallel_runner import run_parallel_simulations
adapter = TomlAdapter()
config_dict = adapter.load_raw("config_parallel.toml")
configs, metadata = expand_config(config_dict)
results = run_parallel_simulations(
    configs,
    max_workers=4,
    verbose=True,
)
for result in results:
    print(f"Simulation {result.index}: {result.status}")
Classes
SimulationResult | Result of a single simulation execution.
Functions
run_single_simulation | Execute a single simulation in a worker process.
run_parallel_simulations | Execute multiple simulations in parallel.
print_result_line | Print a single progress line for a completed simulation.
generate_plots | Generate plots for all successful simulations that request them.
save_sweep_log | Save a JSON manifest describing the parameter sweep.
Module Contents
- class tud_lbm.pipeline.parallel_runner.SimulationResult
Result of a single simulation execution.
- tud_lbm.pipeline.parallel_runner.run_single_simulation(index: int, config: config.simulation_config.SimulationConfig, parameters: dict[str, Any] | None = None, setup_fn: collections.abc.Callable | None = None, run_fn: collections.abc.Callable | None = None) → SimulationResult
Execute a single simulation in a worker process.
This function is pickled and executed in a separate worker process from the pool.
- Parameters:
index – Position in the sweep (for identification).
config – The SimulationConfig to execute.
parameters – Dict of parameter values (used for logging/naming).
setup_fn – Callable that builds setup from config; defaults to setup.simulation_setup.build_setup().
run_fn – Callable that runs the simulation; defaults to the functional runner from runner.run.
- Returns:
SimulationResult with status and output path.
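Because the call is pickled for a worker process, a custom setup_fn or run_fn presumably has to be picklable as well. The sketch below is a quick pre-flight check using only the standard library; is_picklable is a hypothetical helper, not part of this module, and it assumes the pool relies on the standard pickle protocol.

```python
import math
import pickle


def is_picklable(obj) -> bool:
    """Return True if the standard pickle module can serialize obj.

    Hypothetical helper for illustration; not part of parallel_runner.
    """
    try:
        pickle.dumps(obj)
    except (pickle.PicklingError, AttributeError, TypeError):
        return False
    return True


# A function importable by name (here one from the stdlib) pickles by reference.
print(is_picklable(math.sqrt))

# A lambda has no importable name, so pickling it fails.
print(is_picklable(lambda cfg: cfg))
```

In practice this suggests defining custom callables at module level rather than as lambdas or nested functions.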
- tud_lbm.pipeline.parallel_runner.run_parallel_simulations(configs: list[config.simulation_config.SimulationConfig], *, max_workers: int | None = None, parameters_list: list[dict[str, Any]] | None = None, verbose: bool = False, continue_on_error: bool = True, progress_callback: collections.abc.Callable[[int, int], None] | None = None, setup_fn: collections.abc.Callable | None = None, run_fn: collections.abc.Callable | None = None) → list[SimulationResult]
Execute multiple simulations in parallel.
- Parameters:
configs – List of SimulationConfig objects to execute.
max_workers – Max number of worker processes. Defaults to CPU count.
parameters_list – Optional list of dicts mapping parameter names to values (for sweep identification/logging). Should match length of configs.
verbose – If True, print progress and status updates.
continue_on_error – If False, stop execution on first failure.
progress_callback – Optional callable(completed, total) for progress tracking.
setup_fn – Optional custom setup function (default: build_setup).
run_fn – Optional custom run function (default: run).
- Returns:
List of SimulationResult objects, one per config.
- Raises:
RuntimeError – If any simulation fails and continue_on_error is False.
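The interplay of continue_on_error, progress_callback and the one-result-per-config return value can be illustrated with a small stand-in. This is a sketch under stated assumptions, not the module's implementation: it uses a thread pool so it runs anywhere without pickling concerns (the real runner uses processes), and SketchResult, run_sweep and the "success"/"failed" status strings are hypothetical names chosen for the example.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed
from dataclasses import dataclass
from typing import Optional


@dataclass
class SketchResult:
    """Hypothetical stand-in for SimulationResult; field names are assumptions."""
    index: int
    status: str
    error: Optional[str] = None


def run_sweep(jobs, *, max_workers=None, continue_on_error=True,
              progress_callback=None):
    """Run zero-argument callables concurrently; return results in input order."""
    results = [None] * len(jobs)
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # Map each future back to its position in the sweep.
        futures = {pool.submit(job): i for i, job in enumerate(jobs)}
        for completed, future in enumerate(as_completed(futures), start=1):
            i = futures[future]
            try:
                future.result()
                results[i] = SketchResult(i, "success")
            except Exception as exc:
                results[i] = SketchResult(i, "failed", str(exc))
                if not continue_on_error:
                    # Fail fast: drop jobs that have not started, then raise.
                    for pending in futures:
                        pending.cancel()
                    raise RuntimeError(f"Simulation {i} failed: {exc}") from exc
            if progress_callback is not None:
                progress_callback(completed, len(jobs))
    return results


def ok():
    return None


def boom():
    raise ValueError("diverged")


progress = []
results = run_sweep(
    [ok, boom, ok],
    max_workers=2,
    progress_callback=lambda done, total: progress.append((done, total)),
)
for r in results:
    print(f"Simulation {r.index}: {r.status}")
```

With continue_on_error left at True the failed job is recorded and the sweep finishes; passing continue_on_error=False to the same sketch raises RuntimeError at the first failure instead.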
- tud_lbm.pipeline.parallel_runner.print_result_line(result: SimulationResult, completed: int, total: int) → None
Print a single progress line for a completed simulation.
- tud_lbm.pipeline.parallel_runner.generate_plots(results: list[SimulationResult], *, verbose: bool) → None
Generate plots for all successful simulations that request them.
- tud_lbm.pipeline.parallel_runner.save_sweep_log(results: list[SimulationResult], output_dir: str | pathlib.Path) → None
Save a JSON manifest describing the parameter sweep.
- Parameters:
results – List of SimulationResult objects.
output_dir – Directory to save sweep_manifest.json.
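The actual manifest schema is not documented here. As a rough illustration of what writing such a file could look like, the sketch below serializes a list of result records to sweep_manifest.json in a given directory. The layout (total, succeeded, simulations), the SketchResult fields, and save_manifest_sketch itself are assumptions made for the example.

```python
import json
import tempfile
from dataclasses import asdict, dataclass, field
from pathlib import Path
from typing import Any, Optional


@dataclass
class SketchResult:
    """Hypothetical stand-in for SimulationResult; field names are assumptions."""
    index: int
    status: str
    parameters: dict[str, Any] = field(default_factory=dict)
    output_path: Optional[str] = None


def save_manifest_sketch(results, output_dir):
    """Write an assumed manifest layout to <output_dir>/sweep_manifest.json."""
    output_dir = Path(output_dir)
    output_dir.mkdir(parents=True, exist_ok=True)
    manifest = {
        "total": len(results),
        "succeeded": sum(r.status == "success" for r in results),
        "simulations": [asdict(r) for r in results],
    }
    path = output_dir / "sweep_manifest.json"
    path.write_text(json.dumps(manifest, indent=2), encoding="utf-8")
    return path


with tempfile.TemporaryDirectory() as tmp:
    manifest_path = save_manifest_sketch(
        [
            SketchResult(0, "success", {"tau": 0.6}, "out/run_0"),
            SketchResult(1, "failed", {"tau": 0.8}),
        ],
        tmp,
    )
    # Read the file back while the temporary directory still exists.
    loaded = json.loads(manifest_path.read_text(encoding="utf-8"))

print(loaded["succeeded"], "of", loaded["total"], "succeeded")
```

Keeping the swept parameter values next to each status and output path is what lets a manifest of this kind be used to locate a specific run later.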