"""Write a human-readable physical parameter overview to disk.
Generates ``physical_parameters.txt`` in the run directory whenever a
:class:`~tud_lbm.io.SimulationIO` is created with a config. Unlike the
saved TOML (machine-readable, round-trippable), this file is intended to
be read directly by a human and includes derived quantities such as
kinematic viscosity.
Public API::
from tud_lbm.io.physical_parameters import write_physical_parameters
write_physical_parameters(config, "/path/to/run/physical_parameters.txt")
"""
from __future__ import annotations
import math
from datetime import datetime
from datetime import timezone
from pathlib import Path
from typing import TYPE_CHECKING
import numpy as np
if TYPE_CHECKING:
from tud_lbm.config.simulation_config import SimulationConfig
_CS2 = 1.0 / 3.0 # Speed of sound squared for D2Q9/D3Q19
# Prefix remaps for analysing runs downloaded off DelftBlue: data stored under
# /scratch/user_cluster/LBM/ on the cluster lives under /Users/local_user/ locally.
_INIT_PATH_REMAPS: tuple[tuple[str, str], ...] = (
("/scratch/sbszkudlarek/LBM/26_TUD_LBM/TUD_LBM_data/", "/Users/sbszkudlarek/TUD_LBM_data/DB/"),
)
def _resolve_npz_path(path: str | None) -> str | None:
"""Return an existing path for an init NPZ, remapping known prefixes.
Falls back to the remapped location when the literal path is absent, so
downloaded DelftBlue runs read their real init file instead of defaulting
to the nominal config radius. Returns None when no existing file is found.
"""
if not path:
return None
if Path(path).exists():
return path
for old, new in _INIT_PATH_REMAPS:
if path.startswith(old):
remapped = new + path[len(old) :]
if Path(remapped).exists():
return remapped
return None
def _nu(tau: float) -> float:
"""Kinematic viscosity from relaxation time: nu = cs2 * (tau - 0.5)."""
return _CS2 * (tau - 0.5)
def _section(title: str) -> str:
return f"\n{title}\n" + "-" * len(title)
def _row(label: str, value: object, indent: int = 2) -> str:
pad = " " * indent
return f"{pad}{label:<26}{value}"
def _add_simulation_section(lines: list[str], config: SimulationConfig) -> None:
lines.append(_section("Simulation"))
if config.simulation_name:
lines.append(_row("Name:", config.simulation_name))
lines.append(_row("Type:", config.sim_type))
lines.append(_row("Lattice:", config.lattice_type))
def _add_grid_section(lines: list[str], config: SimulationConfig) -> None:
lines.append(_section("Grid"))
shape = config.grid_shape
display_shape = shape[:2] if shape[2] == 1 else shape
lines.append(_row("Shape:", " x ".join(str(n) for n in display_shape)))
lines.append(_row("Timesteps:", config.nt))
lines.append(_row("Save interval:", config.save_interval))
def _add_collision_section(lines: list[str], config: SimulationConfig) -> None:
lines.append(_section("Collision"))
tau = config.tau
lines.append(_row("Collision scheme:", config.collision_scheme))
lines.append(_row("tau:", tau))
lines.append(_row("nu (kinematic viscosity):", f"{_nu(tau):.6g} [cs2*(tau-0.5)]"))
def _get_setup_contact_line_length(config: SimulationConfig) -> float | None:
"""Calculate the distance between the two contact lines at setup."""
if config.init_type == "init_from_file":
return _get_contact_line_length_from_file(config)
init = config.initialisation
if not init or not isinstance(init, dict):
return None
try:
centres = init.get("centres", [])
radii = init.get("radii", [])
if not centres or not radii:
return None
nx = float(config.grid_shape[0])
ny = float(config.grid_shape[1])
min_dim = min(nx, ny)
fx, fy = float(centres[0][0]), float(centres[0][1])
r = float(radii[0])
_r = r * min_dim
# Compute distance to the closest bounding wall (0, nx) or (0, ny)
dist_x = min(fx * nx, (1.0 - fx) * nx)
dist_y = min(fy * ny, (1.0 - fy) * ny)
wall_dist = min(dist_x, dist_y)
val = _r**2 - wall_dist**2
if val > 0:
return 2.0 * (val**0.5)
except (IndexError, ValueError, TypeError):
pass
return None
def _contact_line_length_from_rho(rho: np.ndarray, rho_mean: float) -> float | None:
"""Return setup contact-line spacing from a rho field using wall-row transitions."""
try:
row = np.asarray(rho[:, 0, 0, 0, 0], dtype=float)
mask = (row < float(rho_mean)).astype(np.int32)
diff = np.diff(mask)
left_hits = np.nonzero(diff == -1)[0]
right_hits = np.nonzero(diff == 1)[0]
if left_hits.size == 0 or right_hits.size == 0:
return None
idx_left = int(left_hits[0])
idx_right = int(right_hits[-1] + 1)
if idx_left + 1 >= row.size or idx_right - 1 < 0 or idx_right >= row.size:
return None
denom_left = row[idx_left + 1] - row[idx_left]
denom_right = row[idx_right - 1] - row[idx_right]
if math.isclose(denom_left, 0.0) or math.isclose(denom_right, 0.0):
return None
x_left = idx_left + ((rho_mean - row[idx_left]) / denom_left)
x_right = idx_right - ((rho_mean - row[idx_right]) / denom_right)
length = float(x_right - x_left)
if length > 0.0:
return length
return None # noqa: TRY300
except (IndexError, TypeError, ValueError):
return None
def _get_contact_line_length_from_file(config: SimulationConfig) -> float | None:
"""Load rho from NPZ and estimate setup contact-line spacing for init_from_file."""
init = config.initialisation if isinstance(config.initialisation, dict) else {}
npz_path = _resolve_npz_path(config.init_dir or init.get("npz_path"))
if not npz_path:
return None
try:
with np.load(npz_path) as data:
if "rho" not in data:
return None
rho = np.asarray(data["rho"])
if config.rho_l is not None and config.rho_v is not None:
rho_mean = 0.5 * (float(config.rho_l) + float(config.rho_v))
else:
rho_mean = float(np.mean(rho))
return _contact_line_length_from_rho(rho, rho_mean)
except (KeyError, OSError, TypeError, ValueError):
return None
def _ensure_single_gravity_force_source(config: SimulationConfig) -> None:
"""Reject configs that define both gravity force variants simultaneously."""
if config.gravity_force is not None and config.gravity_masked_force is not None:
msg = "Only one gravity force can be applied: set either gravity_force or gravity_masked_force, not both."
raise ValueError(msg)
def _resolve_gravity_value(config: SimulationConfig) -> float | None:
"""Resolve gravity from config.g or known force dictionaries."""
_ensure_single_gravity_force_source(config)
if config.g is not None:
return float(config.g)
for force_name in ("gravity_force", "gravity_masked_force"):
force_dict = getattr(config, force_name, None)
if force_dict and isinstance(force_dict, dict) and "force_g" in force_dict:
return float(force_dict["force_g"])
return None
def _resolve_gravity_inclination(config: SimulationConfig) -> float:
"""Return inclination_angle_deg from the active force config, or 0.0."""
_ensure_single_gravity_force_source(config)
for force_name in ("gravity_force", "gravity_masked_force"):
force_dict = getattr(config, force_name, None)
if force_dict and isinstance(force_dict, dict):
return float(force_dict.get("inclination_angle_deg", 0.0))
return 0.0
def _derive_multiphase_parameters(config: SimulationConfig) -> tuple[float, float] | None:
"""Return (delta_rho_phases, gamma) when multiphase parameters are available and valid."""
has_params = all(x is not None for x in (config.kappa, config.interface_width, config.rho_l, config.rho_v))
if not has_params or config.interface_width == 0:
return None
drho = float(config.rho_l) - float(config.rho_v)
gamma = (2.0 / 3.0) * (float(config.kappa) / float(config.interface_width)) * (drho**2)
return drho, gamma
def _resolve_length_for_dimensionless_numbers(config: SimulationConfig) -> tuple[float, str]:
"""Resolve shared length scale and annotation for Oh/Bo rows."""
cl_length = _get_setup_contact_line_length(config)
if cl_length is not None:
if config.init_type == "init_from_file":
return cl_length, f"L={cl_length:.4g} (init_from_file)"
return cl_length, f"L={cl_length:.4g} (contact line)"
length = float(config.grid_shape[0])
return length, f"L={length} (grid_x)"
def _format_ohnesorge_number_row(config: SimulationConfig, gamma: float, length: float, length_label: str) -> str:
"""Build Ohnesorge-number row from lattice kinematic viscosity."""
nu = _nu(float(config.tau))
oh = nu / (gamma * length * (config.rho_l)) ** 0.5
return _row("Oh (Ohnesorge number):", f"{oh:.6g} [ν√(ρ_l/(γL)), {length_label}]")
def _format_bond_number_row(
delta_rho_phases: float,
gamma: float,
g_val: float,
length: float,
length_label: str,
angle_deg: float = 0.0,
) -> list[str]:
"""Build Bond-number row(s) from shared length scale."""
bo = (delta_rho_phases * (length**2) * g_val) / gamma
angle_rad = math.radians(angle_deg)
bo_normal = bo * math.cos(angle_rad)
bo_tangential = bo * math.sin(angle_rad)
return [
_row("Bo (Bond number):", f"{bo:.6g} [(ΔρgL²)/γ, {length_label}]"),
_row("Bo_perp (Bond normal):", f"{bo_normal:.6g} [(Δρ*g*cos({angle_deg:.4g}deg)*L^2)/gamma, {length_label}]"),
_row(
"Bo_parallel (Bond tangential):",
f"{bo_tangential:.6g} [(Δρ*g*sin({angle_deg:.4g}deg)*L^2)/gamma, {length_label}]",
),
]
def _format_archimedes_number_row(
drho: float, g_val: float, length: float, length_label: str, nu: float, rho_l: float
) -> str:
"""Build Archimedes-number row: Ar = gL^3Δρ / (ν^2ρ_l)."""
ar = (g_val * (length**3) * drho) / ((nu**2) * rho_l)
return _row("Ar (Archimedes number):", f"{ar:.6g} [gL³Δρ/(ν²ρ_l), {length_label}]")
def _add_multiphase_section(lines: list[str], config: SimulationConfig) -> None:
if "multiphase" not in config.sim_type:
return
lines.append(_section("Multiphase"))
lines.append(_row("EOS:", config.eos or "-"))
lines.append(_row("kappa:", config.kappa))
lines.append(_row("rho_liquid:", config.rho_l))
lines.append(_row("rho_vapour:", config.rho_v))
lines.append(_row("Interface width:", config.interface_width))
if config.g is not None:
lines.append(_row("g (gravity):", config.g))
derived = _derive_multiphase_parameters(config)
if derived is None:
return
drho, gamma = derived
lines.append(_row("gamma (surface tension):", f"{gamma:.6g} [2/3(κ/W)(Δρ)²]"))
length, length_label = _resolve_length_for_dimensionless_numbers(config)
lines.append(_format_ohnesorge_number_row(config, gamma, length, length_label))
g_val = _resolve_gravity_value(config)
if g_val is None:
return
angle_deg = _resolve_gravity_inclination(config)
lines.extend(_format_bond_number_row(drho, gamma, g_val, length, length_label, angle_deg))
nu = _nu(float(config.tau))
lines.append(_format_archimedes_number_row(drho, g_val, length, length_label, nu, float(config.rho_l)))
def _add_key_value_section(lines: list[str], title: str, values: dict | None) -> None:
if not values:
return
lines.append(_section(title))
for k, v in values.items():
lines.append(_row(f"{k}:", v))
def _add_forces_section(lines: list[str], config: SimulationConfig) -> None:
force_fields = [
f for f in ("gravity_force", "electric_force", "gravity_masked_force") if getattr(config, f, None) is not None
]
if not force_fields:
return
lines.append(_section("Forces"))
for fname in force_fields:
lines.append(f" {fname}:")
for k, v in getattr(config, fname).items():
lines.append(_row(f"{k}:", v, indent=4))
[docs]
def build_overview(config: SimulationConfig) -> str:
"""Return the full physical parameter overview as a string."""
lines: list[str] = []
sep = "=" * 72
lines += [
sep,
"PHYSICAL PARAMETER OVERVIEW",
f"Generated : {datetime.now(timezone.utc).strftime('%Y-%m-%d %H:%M:%S UTC')}",
sep,
]
_add_simulation_section(lines, config)
_add_grid_section(lines, config)
_add_collision_section(lines, config)
_add_multiphase_section(lines, config)
_add_key_value_section(lines, "Boundary Conditions", config.bc_config)
_add_key_value_section(lines, "Wetting", config.wetting_config)
_add_key_value_section(lines, "Hysteresis", config.hysteresis_config)
_add_key_value_section(lines, "Chemical Step", config.chemical_step_config)
_add_forces_section(lines, config)
lines.append("\n" + sep)
return "\n".join(lines) + "\n"
[docs]
def write_physical_parameters(config: SimulationConfig, path: str | Path) -> None:
"""Write ``physical_parameters.txt`` to *path*.
Args:
config: Validated :class:`~tud_lbm.config.simulation_config.SimulationConfig`.
path: Destination file path (typically ``<run_dir>/physical_parameters.txt``).
"""
dest = Path(path)
dest.parent.mkdir(parents=True, exist_ok=True)
dest.write_text(build_overview(config), encoding="utf-8")