"""Write a human-readable physical parameter overview to disk.
Generates ``physical_parameters.txt`` in the run directory whenever a
:class:`~tud_lbm.io.SimulationIO` is created with a config. Unlike the
saved TOML (machine-readable, round-trippable), this file is intended to
be read directly by a human and includes derived quantities such as
kinematic viscosity.
Public API::
from tud_lbm.io.physical_parameters import write_physical_parameters
write_physical_parameters(config, "/path/to/run/physical_parameters.txt")
"""
from __future__ import annotations
import math
from datetime import datetime
from datetime import timezone
from pathlib import Path
from typing import TYPE_CHECKING
import numpy as np
if TYPE_CHECKING:
from tud_lbm.config.simulation_config import SimulationConfig
_CS2 = 1.0 / 3.0 # Speed of sound squared for D2Q9/D3Q19
def _nu(tau: float) -> float:
"""Kinematic viscosity from relaxation time: nu = cs2 * (tau - 0.5)."""
return _CS2 * (tau - 0.5)
def _section(title: str) -> str:
return f"\n{title}\n" + "-" * len(title)
def _row(label: str, value: object, indent: int = 2) -> str:
pad = " " * indent
return f"{pad}{label:<26}{value}"
def _add_simulation_section(lines: list[str], config: SimulationConfig) -> None:
lines.append(_section("Simulation"))
if config.simulation_name:
lines.append(_row("Name:", config.simulation_name))
lines.append(_row("Type:", config.sim_type))
lines.append(_row("Lattice:", config.lattice_type))
def _add_grid_section(lines: list[str], config: SimulationConfig) -> None:
lines.append(_section("Grid"))
shape = config.grid_shape
display_shape = shape[:2] if shape[2] == 1 else shape
lines.append(_row("Shape:", " x ".join(str(n) for n in display_shape)))
lines.append(_row("Timesteps:", config.nt))
lines.append(_row("Save interval:", config.save_interval))
def _add_collision_section(lines: list[str], config: SimulationConfig) -> None:
lines.append(_section("Collision"))
tau = config.tau
lines.append(_row("Collision scheme:", config.collision_scheme))
lines.append(_row("tau:", tau))
lines.append(_row("nu (kinematic viscosity):", f"{_nu(tau):.6g} [cs2*(tau-0.5)]"))
def _get_setup_contact_line_length(config: SimulationConfig) -> float | None:
"""Calculate the distance between the two contact lines at setup."""
if config.init_type == "init_from_file":
return _get_contact_line_length_from_file(config)
init = config.initialisation
if not init or not isinstance(init, dict):
return None
try:
centres = init.get("centres", [])
radii = init.get("radii", [])
if not centres or not radii:
return None
nx = float(config.grid_shape[0])
ny = float(config.grid_shape[1])
min_dim = min(nx, ny)
fx, fy = float(centres[0][0]), float(centres[0][1])
r = float(radii[0])
_r = r * min_dim
# Compute distance to the closest bounding wall (0, nx) or (0, ny)
dist_x = min(fx * nx, (1.0 - fx) * nx)
dist_y = min(fy * ny, (1.0 - fy) * ny)
wall_dist = min(dist_x, dist_y)
val = _r**2 - wall_dist**2
if val > 0:
return 2.0 * (val**0.5)
except (IndexError, ValueError, TypeError):
pass
return None
def _contact_line_length_from_rho(rho: np.ndarray, rho_mean: float) -> float | None:
"""Return setup contact-line spacing from a rho field using wall-row transitions."""
try:
row = np.asarray(rho[:, 0, 0, 0, 0], dtype=float)
mask = (row < float(rho_mean)).astype(np.int32)
diff = np.diff(mask)
left_hits = np.nonzero(diff == -1)[0]
right_hits = np.nonzero(diff == 1)[0]
if left_hits.size == 0 or right_hits.size == 0:
return None
idx_left = int(left_hits[0])
idx_right = int(right_hits[-1] + 1)
if idx_left + 1 >= row.size or idx_right - 1 < 0 or idx_right >= row.size:
return None
denom_left = row[idx_left + 1] - row[idx_left]
denom_right = row[idx_right - 1] - row[idx_right]
if math.isclose(denom_left, 0.0) or math.isclose(denom_right, 0.0):
return None
x_left = idx_left + ((rho_mean - row[idx_left]) / denom_left)
x_right = idx_right - ((rho_mean - row[idx_right]) / denom_right)
length = float(x_right - x_left)
if length > 0.0:
return length
return None # noqa: TRY300
except (IndexError, TypeError, ValueError):
return None
def _get_contact_line_length_from_file(config: SimulationConfig) -> float | None:
"""Load rho from NPZ and estimate setup contact-line spacing for init_from_file."""
init = config.initialisation if isinstance(config.initialisation, dict) else {}
npz_path = config.init_dir or init.get("npz_path")
if not npz_path:
return None
try:
with np.load(npz_path) as data:
if "rho" not in data:
return None
rho = np.asarray(data["rho"])
if config.rho_l is not None and config.rho_v is not None:
rho_mean = 0.5 * (float(config.rho_l) + float(config.rho_v))
else:
rho_mean = float(np.mean(rho))
return _contact_line_length_from_rho(rho, rho_mean)
except (KeyError, OSError, TypeError, ValueError):
return None
def _ensure_single_gravity_force_source(config: SimulationConfig) -> None:
"""Reject configs that define both gravity force variants simultaneously."""
if config.gravity_force is not None and config.gravity_masked_force is not None:
msg = "Only one gravity force can be applied: set either gravity_force or gravity_masked_force, not both."
raise ValueError(msg)
def _resolve_gravity_value(config: SimulationConfig) -> float | None:
"""Resolve gravity from config.g or known force dictionaries."""
_ensure_single_gravity_force_source(config)
if config.g is not None:
return float(config.g)
for force_name in ("gravity_force", "gravity_masked_force"):
force_dict = getattr(config, force_name, None)
if force_dict and isinstance(force_dict, dict) and "force_g" in force_dict:
return float(force_dict["force_g"])
return None
def _resolve_gravity_inclination(config: SimulationConfig) -> float:
"""Return inclination_angle_deg from the active force config, or 0.0."""
_ensure_single_gravity_force_source(config)
for force_name in ("gravity_force", "gravity_masked_force"):
force_dict = getattr(config, force_name, None)
if force_dict and isinstance(force_dict, dict):
return float(force_dict.get("inclination_angle_deg", 0.0))
return 0.0
def _derive_multiphase_parameters(config: SimulationConfig) -> tuple[float, float] | None:
"""Return (delta_rho_phases, gamma) when multiphase parameters are available and valid."""
has_params = all(x is not None for x in (config.kappa, config.interface_width, config.rho_l, config.rho_v))
if not has_params or config.interface_width == 0:
return None
drho = float(config.rho_l) - float(config.rho_v)
gamma = (2.0 / 3.0) * (float(config.kappa) / float(config.interface_width)) * (drho**2)
return drho, gamma
def _resolve_length_for_dimensionless_numbers(config: SimulationConfig) -> tuple[float, str]:
"""Resolve shared length scale and annotation for Oh/Bo rows."""
cl_length = _get_setup_contact_line_length(config)
if cl_length is not None:
if config.init_type == "init_from_file":
return cl_length, f"L={cl_length:.4g} (init_from_file)"
return cl_length, f"L={cl_length:.4g} (contact line)"
length = float(config.grid_shape[0])
return length, f"L={length} (grid_x)"
def _format_ohnesorge_number_row(config: SimulationConfig, gamma: float, length: float, length_label: str) -> str:
"""Build Ohnesorge-number row from lattice kinematic viscosity."""
nu = _nu(float(config.tau))
oh = nu * (float(config.rho_l) / (gamma * length)) ** 0.5
return _row("Oh (Ohnesorge number):", f"{oh:.6g} [ν√(ρ_l/(γL)), {length_label}]")
def _format_bond_number_row(
delta_rho_phases: float,
gamma: float,
g_val: float,
length: float,
length_label: str,
angle_deg: float = 0.0,
) -> list[str]:
"""Build Bond-number row(s) from shared length scale."""
bo = (delta_rho_phases * (length**2) * g_val) / gamma
angle_rad = math.radians(angle_deg)
bo_normal = bo * math.cos(angle_rad)
bo_tangential = bo * math.sin(angle_rad)
return [
_row("Bo (Bond number):", f"{bo:.6g} [(ΔρgL²)/γ, {length_label}]"),
_row("Bo_perp (Bond normal):", f"{bo_normal:.6g} [(Δρ*g*cos({angle_deg:.4g}deg)*L^2)/gamma, {length_label}]"),
_row(
"Bo_parallel (Bond tangential):",
f"{bo_tangential:.6g} [(Δρ*g*sin({angle_deg:.4g}deg)*L^2)/gamma, {length_label}]",
),
]
def _format_archimedes_number_row(
drho: float, g_val: float, length: float, length_label: str, nu: float, rho_l: float
) -> str:
"""Build Archimedes-number row: Ar = gL^3Δρ / (ν^2ρ_l)."""
ar = (g_val * (length**3) * drho) / ((nu**2) * rho_l)
return _row("Ar (Archimedes number):", f"{ar:.6g} [gL³Δρ/(ν²ρ_l), {length_label}]")
def _add_multiphase_section(lines: list[str], config: SimulationConfig) -> None:
if "multiphase" not in config.sim_type:
return
lines.append(_section("Multiphase"))
lines.append(_row("EOS:", config.eos or "-"))
lines.append(_row("kappa:", config.kappa))
lines.append(_row("rho_liquid:", config.rho_l))
lines.append(_row("rho_vapour:", config.rho_v))
lines.append(_row("Interface width:", config.interface_width))
if config.g is not None:
lines.append(_row("g (gravity):", config.g))
derived = _derive_multiphase_parameters(config)
if derived is None:
return
drho, gamma = derived
lines.append(_row("gamma (surface tension):", f"{gamma:.6g} [2/3(κ/W)(Δρ)²]"))
length, length_label = _resolve_length_for_dimensionless_numbers(config)
lines.append(_format_ohnesorge_number_row(config, gamma, length, length_label))
g_val = _resolve_gravity_value(config)
if g_val is None:
return
angle_deg = _resolve_gravity_inclination(config)
lines.extend(_format_bond_number_row(drho, gamma, g_val, length, length_label, angle_deg))
nu = _nu(float(config.tau))
lines.append(_format_archimedes_number_row(drho, g_val, length, length_label, nu, float(config.rho_l)))
def _add_key_value_section(lines: list[str], title: str, values: dict | None) -> None:
if not values:
return
lines.append(_section(title))
for k, v in values.items():
lines.append(_row(f"{k}:", v))
def _add_forces_section(lines: list[str], config: SimulationConfig) -> None:
force_fields = [
f for f in ("gravity_force", "electric_force", "gravity_masked_force") if getattr(config, f, None) is not None
]
if not force_fields:
return
lines.append(_section("Forces"))
for fname in force_fields:
lines.append(f" {fname}:")
for k, v in getattr(config, fname).items():
lines.append(_row(f"{k}:", v, indent=4))
[docs]
def build_overview(config: SimulationConfig) -> str:
"""Return the full physical parameter overview as a string."""
lines: list[str] = []
sep = "=" * 72
lines += [
sep,
"PHYSICAL PARAMETER OVERVIEW",
f"Generated : {datetime.now(timezone.utc).strftime('%Y-%m-%d %H:%M:%S UTC')}",
sep,
]
_add_simulation_section(lines, config)
_add_grid_section(lines, config)
_add_collision_section(lines, config)
_add_multiphase_section(lines, config)
_add_key_value_section(lines, "Boundary Conditions", config.bc_config)
_add_key_value_section(lines, "Wetting", config.wetting_config)
_add_key_value_section(lines, "Hysteresis", config.hysteresis_config)
_add_key_value_section(lines, "Chemical Step", config.chemical_step_config)
_add_forces_section(lines, config)
lines.append("\n" + sep)
return "\n".join(lines) + "\n"
[docs]
def write_physical_parameters(config: SimulationConfig, path: str | Path) -> None:
"""Write ``physical_parameters.txt`` to *path*.
Args:
config: Validated :class:`~tud_lbm.config.simulation_config.SimulationConfig`.
path: Destination file path (typically ``<run_dir>/physical_parameters.txt``).
"""
dest = Path(path)
dest.parent.mkdir(parents=True, exist_ok=True)
dest.write_text(build_overview(config), encoding="utf-8")