tud_lbm.pipeline.parallel_runner

Parallel simulation runner for parameter sweeps.

Executes multiple SimulationConfig objects in parallel using a process pool, with progress tracking and error aggregation.

Example usage:

from config.adapter_toml import TomlAdapter
from config.array_expansion import expand_config
from runner.parallel_runner import run_parallel_simulations

adapter = TomlAdapter()
config_dict = adapter.load_raw("config_parallel.toml")
configs, metadata = expand_config(config_dict)

results = run_parallel_simulations(
    configs,
    max_workers=4,
    verbose=True,
)

for result in results:
    print(f"Simulation {result.index}: {result.status}")

Classes

SimulationResult

Result of a single simulation execution.

Functions

run_single_simulation(→ SimulationResult)

Execute a single simulation in a worker process.

run_parallel_simulations(→ list[SimulationResult])

Execute multiple simulations in parallel.

print_result_line(→ None)

Print a single progress line for a completed simulation.

generate_plots(→ None)

Generate plots for all successful simulations that request them.

save_sweep_log(→ None)

Save a JSON manifest describing the parameter sweep.

Module Contents

class tud_lbm.pipeline.parallel_runner.SimulationResult

Result of a single simulation execution.

index

Position in the parameter sweep (0-based).

config

The SimulationConfig that was run.

status

One of "success", "failed", "skipped".

output_dir

Path to results directory (if successful).

parameters

Dict of parameter values for this simulation (if array sweep).

error

Exception message (if status == "failed").

duration

Elapsed time in seconds (if completed).

index: int
config: config.simulation_config.SimulationConfig
status: str
output_dir: str | None = None
parameters: dict[str, Any] | None = None
error: str | None = None
duration: float = 0.0
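
The field list above maps naturally onto a dataclass. The sketch below is a stand-in, not the library's own definition: it mirrors the documented fields (with config typed as Any so the example stays self-contained) and shows one way to tally outcomes by status. The summarize helper and the "out/..." paths are hypothetical.

```python
from collections import Counter
from dataclasses import dataclass
from typing import Any, Optional


@dataclass
class SimulationResult:
    """Stand-in mirroring the documented fields; not the library's class."""

    index: int
    config: Any  # a SimulationConfig in the real module
    status: str  # "success", "failed", or "skipped"
    output_dir: Optional[str] = None
    parameters: Optional[dict] = None
    error: Optional[str] = None
    duration: float = 0.0


def summarize(results):
    """Hypothetical helper: count results per status."""
    return dict(Counter(r.status for r in results))


results = [
    SimulationResult(0, config=None, status="success", output_dir="out/0", duration=1.5),
    SimulationResult(1, config=None, status="failed", error="diverged"),
    SimulationResult(2, config=None, status="success", output_dir="out/2", duration=2.0),
]
counts = summarize(results)
failed = [r.index for r in results if r.status == "failed"]
```

Filtering on status before touching output_dir or error avoids reading fields that are only populated for one outcome.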
tud_lbm.pipeline.parallel_runner.run_single_simulation(index: int, config: config.simulation_config.SimulationConfig, parameters: dict[str, Any] | None = None, setup_fn: collections.abc.Callable | None = None, run_fn: collections.abc.Callable | None = None) → SimulationResult

Execute a single simulation in a worker process.

This function is pickled and executed in a worker process from the process pool, so its arguments (including any custom setup_fn and run_fn) must be picklable.

Parameters:
  • index – Position in the sweep (for identification).

  • config – The SimulationConfig to execute.

  • parameters – Dict of parameter values (used for logging/naming).

  • setup_fn – Callable that builds setup from config; defaults to setup.simulation_setup.build_setup().

  • run_fn – Callable that runs the simulation; defaults to the functional runner from runner.run.

Returns:

SimulationResult with status and output path.
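
A worker of this kind typically reports a failure as data rather than letting the exception escape the process. The sketch below illustrates that pattern under stated assumptions: run_one is a hypothetical stand-in, it returns a plain dict instead of a SimulationResult, and the call shapes setup_fn(config) and run_fn(setup) are guesses, not the documented signatures.

```python
import time
from typing import Any, Callable


def run_one(index: int, config: Any,
            setup_fn: Callable[[Any], Any],
            run_fn: Callable[[Any], str]) -> dict:
    """Run setup_fn then run_fn; report a failure as data instead of raising."""
    start = time.perf_counter()
    try:
        setup = setup_fn(config)    # assumed: builds a setup object from config
        output_dir = run_fn(setup)  # assumed: returns the results directory
        status, error = "success", None
    except Exception as exc:
        output_dir, status, error = None, "failed", str(exc)
    return {
        "index": index,
        "status": status,
        "output_dir": output_dir,
        "error": error,
        "duration": time.perf_counter() - start,
    }


# Hypothetical stand-ins for the real setup and run callables.
def fake_setup(config):
    if config["tau"] <= 0.5:
        raise ValueError("tau must exceed 0.5")
    return config


def fake_run(setup):
    return f"results/tau_{setup['tau']}"


ok = run_one(0, {"tau": 0.8}, fake_setup, fake_run)
bad = run_one(1, {"tau": 0.4}, fake_setup, fake_run)
```

The stand-in callables are defined at module level on purpose: the standard pickle module cannot serialize lambdas or nested functions, so those would not survive the trip to a worker process.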

tud_lbm.pipeline.parallel_runner.run_parallel_simulations(configs: list[config.simulation_config.SimulationConfig], *, max_workers: int | None = None, parameters_list: list[dict[str, Any]] | None = None, verbose: bool = False, continue_on_error: bool = True, progress_callback: collections.abc.Callable[[int, int], None] | None = None, setup_fn: collections.abc.Callable | None = None, run_fn: collections.abc.Callable | None = None) → list[SimulationResult]

Execute multiple simulations in parallel.

Parameters:
  • configs – List of SimulationConfig objects to execute.

  • max_workers – Max number of worker processes. Defaults to CPU count.

  • parameters_list – Optional list of dicts mapping parameter names to values (for sweep identification/logging). Should have the same length as configs.

  • verbose – If True, print progress and status updates.

  • continue_on_error – If False, stop execution on the first failure and raise RuntimeError.

  • progress_callback – Optional callable(completed, total) for progress tracking.

  • setup_fn – Optional custom setup function (default: build_setup).

  • run_fn – Optional custom run function (default: run).

Returns:

List of SimulationResult objects, one per config.

Raises:

RuntimeError – If any simulation fails and continue_on_error is False.
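
The interplay of max_workers, continue_on_error, and progress_callback can be sketched with the standard concurrent.futures module. This is an illustration of the general pattern, not the module's actual implementation: run_all, its (status, value) tuples, and the executor_cls parameter are all hypothetical, and the real function returns SimulationResult objects.

```python
import concurrent.futures as cf


def run_all(tasks, worker, *, max_workers=None, continue_on_error=True,
            progress_callback=None, executor_cls=cf.ProcessPoolExecutor):
    """Run worker(i, task) for each task; return results in input order."""
    results = [None] * len(tasks)
    with executor_cls(max_workers=max_workers) as pool:
        # Map each future back to its position so completion order does not matter.
        futures = {pool.submit(worker, i, t): i for i, t in enumerate(tasks)}
        for done, fut in enumerate(cf.as_completed(futures), start=1):
            i = futures[fut]
            try:
                results[i] = ("success", fut.result())
            except Exception as exc:
                results[i] = ("failed", str(exc))
                if not continue_on_error:
                    # Drop work that has not started yet, then abort the sweep.
                    for pending in futures:
                        pending.cancel()
                    raise RuntimeError(f"simulation {i} failed: {exc}") from exc
            if progress_callback is not None:
                progress_callback(done, len(tasks))
    return results
```

With the default ProcessPoolExecutor, worker must be a picklable module-level function, and on platforms that spawn worker processes the call should sit under an `if __name__ == "__main__":` guard. Passing executor_cls=cf.ThreadPoolExecutor is a convenient way to try the control flow without those constraints.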

tud_lbm.pipeline.parallel_runner.print_result_line(result: SimulationResult, completed: int, total: int) → None

Print a single progress line for a completed simulation.
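
The exact output format is not documented here. As a rough idea of what a per-simulation progress line could contain, the following sketch builds one from the result fields and the completed/total counters; format_result_line and its layout are hypothetical.

```python
def format_result_line(index, status, duration, completed, total, error=None):
    """Hypothetical layout: counter, simulation index, status, elapsed time."""
    line = f"[{completed}/{total}] simulation {index}: {status} ({duration:.1f}s)"
    if error:
        # Append the failure message only when there is one.
        line += f" - {error}"
    return line


line = format_result_line(3, "success", 12.34, completed=4, total=10)
failed_line = format_result_line(5, "failed", 0.2, completed=5, total=10, error="diverged")
```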

tud_lbm.pipeline.parallel_runner.generate_plots(results: list[SimulationResult], *, verbose: bool) → None

Generate plots for all successful simulations that request them.

tud_lbm.pipeline.parallel_runner.save_sweep_log(results: list[SimulationResult], output_dir: str | pathlib.Path) → None

Save a JSON manifest describing the parameter sweep.

Parameters:
  • results – List of SimulationResult objects.

  • output_dir – Directory to save sweep_manifest.json.
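
The manifest schema is not specified on this page. The sketch below shows one plausible way to write such a file with the standard library; write_manifest, the "total" and "runs" keys, and the record layout are assumptions, and only the sweep_manifest.json file name comes from the documentation above.

```python
import json
import tempfile
from pathlib import Path


def write_manifest(records, output_dir):
    """Write a list of per-run dicts to <output_dir>/sweep_manifest.json."""
    out = Path(output_dir)
    out.mkdir(parents=True, exist_ok=True)
    path = out / "sweep_manifest.json"
    manifest = {"total": len(records), "runs": records}  # hypothetical schema
    path.write_text(json.dumps(manifest, indent=2), encoding="utf-8")
    return path


records = [
    {"index": 0, "status": "success", "parameters": {"tau": 0.8}},
    {"index": 1, "status": "failed", "parameters": {"tau": 0.4}},
]
with tempfile.TemporaryDirectory() as tmp:
    path = write_manifest(records, tmp)
    loaded = json.loads(path.read_text(encoding="utf-8"))
```

Accepting either a str or a pathlib.Path, as the documented signature does, is handled here by wrapping the argument in Path once at the top.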