tud_lbm.operators.protocols
Protocol (structural) types for LBM operators.
These protocols define the contract that each operator category must fulfil. They enable loose coupling: code depending on CollisionOperator can work with any function/class implementing that protocol, without importing the specific implementation.
Design principle: Operator protocols are intentionally minimal — they capture the bare essentials (signatures, docstrings) without dictating implementation details like decorators or registry membership.
Usage:
from operators.protocols import CollisionOperator
from registry import get_operators
collision_ops = get_operators("collision_models")
bgk_fn = collision_ops["bgk"].target
# Static type-checkers and isinstance() will accept bgk_fn
# as a CollisionOperator
def my_collision_logic(collision_op: CollisionOperator):
    ...
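The structural-typing claim above can be illustrated with a minimal sketch. `CollisionLike` is a hypothetical stand-in for `CollisionOperator`, whose real definition is not shown here, and decorating it with `@runtime_checkable` is an assumption: without that decorator `isinstance()` raises `TypeError`. The runtime check only confirms that a `__call__` attribute exists; it does not verify the signature.

```python
from typing import Any, Protocol, runtime_checkable


@runtime_checkable
class CollisionLike(Protocol):
    """Hypothetical stand-in for CollisionOperator (illustration only)."""

    def __call__(self, f: Any, feq: Any, tau: float,
                 source: Any = None, **kwargs: Any) -> Any: ...


def bgk_like(f, feq, tau, source=None, **kwargs):
    # Plain function: it never imports or subclasses the protocol.
    return f - (f - feq) / tau


def run_collision(op: CollisionLike, f, feq, tau):
    # Depends only on the protocol, not on a concrete implementation.
    return op(f, feq, tau)


result = run_collision(bgk_like, 2.0, 1.0, 1.0)
accepted = isinstance(bgk_like, CollisionLike)   # any callable passes
rejected = isinstance(42, CollisionLike)         # an int has no __call__
```

A static type-checker compares the full signature, so it is the stricter of the two checks.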
Classes
CollisionOperator | Collision operator: transforms (f, feq, tau) → f_col.
StreamingOperator | Streaming operator: propagates populations along velocity directions.
EquilibriumOperator | Equilibrium operator: computes (rho, u, lattice) → feq.
MacroscopicOperator | Macroscopic operator: computes (f, lattice) → (rho, u, ...).
BoundaryOperator | Boundary-condition operator: applies edge BC rules to populations.
InitialiserOperator | Initialiser operator: creates the initial distribution f.
StepOperator | Step operator: executes one full LBM time step.
HysteresisOperator | Hysteresis operator: updates wetting state via contact angle hysteresis.
ForceOperator | Unified protocol for force operator modules.
InitialPopulationOperator | Bound initialiser: builds the initial f for a fixed setup.
MultiphaseStepOperator | Bound multiphase trial-step: advances f_t by one step.
ExtraState | Marker protocol for JAX-pytree-compatible extra state containers.
ExtraStatePlugin | Plugin contract for initialising and updating extra State fields.
DifferentialOperator | Differential operator: computes spatial derivatives.
SimulationRepository | Persistence port: writes simulation state and metadata to disk.
ConfigReader | Parsing port: reads and validates configuration from an external format.
PlotOperator | Structural contract for Matplotlib plot operators.
Module Contents
- class tud_lbm.operators.protocols.CollisionOperator
Bases: Protocol
Collision operator: transforms (f, feq, tau) → f_col.
The collision step relaxes the non-conserved moments toward their equilibrium values with time scale tau.
Signature:
def collide(f, feq, tau, source=None, ...) -> f_col
- __call__(f: jax.numpy.ndarray, feq: jax.numpy.ndarray, tau: float, source: jax.numpy.ndarray | None = None, **kwargs: Any) -> jax.numpy.ndarray
Compute post-collision distribution.
- Parameters:
f – Populations, shape (nx, ny, nz, q, 1).
feq – Equilibrium distribution, shape (nx, ny, nz, q, 1).
tau – Relaxation time (> 0.5).
source – Optional forcing source term, shape (nx, ny, nz, q, 1).
**kwargs – Operator-specific parameters.
- Returns:
Post-collision populations, same shape as f.
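As an illustration of the contract above, the following sketch implements single-relaxation-time (BGK) collision with the documented signature. It is not the library's `bgk` operator. NumPy stands in for `jax.numpy` (the arithmetic is identical), and adding `source` directly after relaxation is an assumption about how forcing enters.

```python
import numpy as np


def bgk_collide(f, feq, tau, source=None, **kwargs):
    """BGK relaxation: f_col = f - (f - feq) / tau, plus source if given."""
    f_col = f - (f - feq) / tau
    if source is not None:
        # Assumption: the source term is simply added after relaxation.
        f_col = f_col + source
    return f_col


shape = (4, 4, 1, 9, 1)                        # (nx, ny, nz, q, 1)
f = np.full(shape, 2.0)
feq = np.ones(shape)
f_full = bgk_collide(f, feq, tau=1.0)          # tau = 1 lands exactly on feq
f_half = bgk_collide(f, feq, tau=2.0)          # tau = 2 moves halfway to feq
```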
- class tud_lbm.operators.protocols.StreamingOperator
Bases: Protocol
Streaming operator: propagates populations along velocity directions.
The streaming step shifts each population component f_i along the direction of its lattice velocity c_i, wrapping periodically across the domain; other boundary conditions are applied afterward.
Signature:
def stream(f, lattice) -> f_streamed
- __call__(f: jax.numpy.ndarray, lattice: tud_lbm.lattice.lattice.Lattice) -> jax.numpy.ndarray
Propagate populations across the domain.
- Parameters:
f – Populations, shape (nx, ny, nz, q, 1).
lattice – Lattice with velocity vectors c.
- Returns:
Post-streaming populations, same shape as f.
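Periodic streaming of this kind can be sketched with `roll`. The sketch below takes the velocity array directly rather than a `Lattice` object, and the `(q, 3)` integer layout of `c` is an assumption. NumPy stands in for `jax.numpy`; `roll` and `stack` exist in both.

```python
import numpy as np


def stream_periodic(f, c):
    """Shift each population f_i by its lattice velocity c_i, wrapping at edges.

    f: shape (nx, ny, nz, q, 1); c: integer velocities, shape (q, 3) (assumed).
    """
    shifted = [
        np.roll(f[..., i, 0], shift=tuple(int(s) for s in c[i]), axis=(0, 1, 2))
        for i in range(c.shape[0])
    ]
    # Stack back to (nx, ny, nz, q) and restore the trailing singleton axis.
    return np.stack(shifted, axis=-1)[..., None]


c = np.array([[0, 0, 0], [1, 0, 0], [-1, 0, 0]])   # rest, +x, -x
f = np.zeros((4, 3, 1, 3, 1))
f[1, 1, 0, 1, 0] = 1.0    # +x population: moves from x=1 to x=2
f[0, 0, 0, 2, 0] = 1.0    # -x population: wraps from x=0 to x=3
f_streamed = stream_periodic(f, c)
```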
- class tud_lbm.operators.protocols.EquilibriumOperator
Bases: Protocol
Equilibrium operator: computes (rho, u, lattice) → feq.
The equilibrium distribution is the local state toward which the collision operator relaxes the system. It encodes the hydrodynamic moment structure.
Signature:
def compute_equilibrium(rho, u, lattice) -> feq
- __call__(rho: jax.numpy.ndarray, u: jax.numpy.ndarray, lattice: tud_lbm.lattice.lattice.Lattice) -> jax.numpy.ndarray
Compute the equilibrium distribution.
- Parameters:
rho – Density field, shape (nx, ny, nz, 1, 1).
u – Velocity field, shape (nx, ny, nz, 1, d) where d ∈ {2, 3}.
lattice – Lattice with weights w and velocity vectors c.
- Returns:
Equilibrium distribution feq, shape (nx, ny, nz, q, 1).
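For orientation, the sketch below evaluates the widely used second-order polynomial equilibrium on a D2Q9 lattice with the shapes documented above. Whether the library's operators use this exact form is an assumption. The weights and velocities are passed as plain arrays instead of a `Lattice`, and NumPy stands in for `jax.numpy`.

```python
import numpy as np

# Standard D2Q9 velocities and weights (speed of sound squared = 1/3).
C = np.array([[0, 0], [1, 0], [0, 1], [-1, 0], [0, -1],
              [1, 1], [-1, 1], [-1, -1], [1, -1]])
W = np.array([4 / 9] + [1 / 9] * 4 + [1 / 36] * 4)


def equilibrium(rho, u, c, w):
    """feq_i = w_i * rho * (1 + 3 c_i.u + 4.5 (c_i.u)^2 - 1.5 u.u)."""
    cu = np.einsum("...d,qd->...q", u[..., 0, :], c)           # (nx, ny, nz, q)
    uu = np.sum(u[..., 0, :] ** 2, axis=-1, keepdims=True)     # (nx, ny, nz, 1)
    feq = w * rho[..., 0] * (1.0 + 3.0 * cu + 4.5 * cu**2 - 1.5 * uu)
    return feq[..., None]                                      # (nx, ny, nz, q, 1)


rho = np.full((3, 3, 1, 1, 1), 1.2)
u = np.zeros((3, 3, 1, 1, 2))
u[..., 0] = 0.05                                               # uniform flow in +x
feq = equilibrium(rho, u, C, W)
momentum = np.einsum("...q,qd->...d", feq[..., 0], C)          # should equal rho * u
```

The zeroth and first moments of this equilibrium reproduce `rho` and `rho * u`, which is what "encodes the hydrodynamic moment structure" refers to.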
- class tud_lbm.operators.protocols.MacroscopicOperator
Bases: Protocol
Macroscopic operator: computes (f, lattice) → (rho, u, ...).
Macroscopic fields are the moments of the population distribution, computed via summation over velocity directions.
Signature:
def compute_macroscopic(f, lattice, force=None) -> (rho, u) or (rho, u, force)
- __call__(f: jax.numpy.ndarray, lattice: tud_lbm.lattice.lattice.Lattice, force: jax.numpy.ndarray | None = None, **kwargs: Any) -> tuple[jax.numpy.ndarray, jax.numpy.ndarray] | tuple[jax.numpy.ndarray, jax.numpy.ndarray, jax.numpy.ndarray]
Compute density and velocity fields.
- Parameters:
f – Populations, shape (nx, ny, nz, q, 1).
lattice – Lattice.
force – Optional external force field, shape (nx, ny, nz, 1, d). When provided, velocity is corrected by u ← u + force / (2ρ).
**kwargs – Additional keyword arguments.
- Returns:
Without force: (rho, u) where rho has shape (nx, ny, nz, 1, 1) and u has shape (nx, ny, nz, 1, d).
With force: (rho, u_eq, force) where u_eq includes the force correction.
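The moment sums and the force correction can be sketched as follows. The velocity array `c` is passed directly instead of a `Lattice` (an assumption for brevity), the three-velocity set is a toy example, and NumPy stands in for `jax.numpy`.

```python
import numpy as np


def macroscopic(f, c, force=None):
    """Return (rho, u), or (rho, u_eq, force) when a force field is given."""
    rho = f.sum(axis=3, keepdims=True)                       # (nx, ny, nz, 1, 1)
    momentum = np.einsum("...q,qd->...d", f[..., 0], c)      # (nx, ny, nz, d)
    u = momentum[..., None, :] / rho                         # (nx, ny, nz, 1, d)
    if force is None:
        return rho, u
    # Half-force velocity correction: u <- u + force / (2 rho).
    return rho, u + force / (2.0 * rho), force


c = np.array([[0, 0], [1, 0], [-1, 0]])      # toy velocity set: rest, +x, -x
f = np.zeros((2, 2, 1, 3, 1))
f[..., 0, 0], f[..., 1, 0], f[..., 2, 0] = 0.5, 0.3, 0.2
rho, u = macroscopic(f, c)

force = np.zeros((2, 2, 1, 1, 2))
force[..., 0] = 0.1                          # uniform force in +x
rho_f, u_eq, force_out = macroscopic(f, c, force)
```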
- class tud_lbm.operators.protocols.BoundaryOperator
Bases: Protocol
Boundary-condition operator: applies edge BC rules to populations.
Boundary conditions enforce Dirichlet/Neumann constraints or flux periodicity at domain edges. They are applied post-streaming.
Signature:
def apply_bc(f_stream, f_col, bc_masks) -> f_bc
- __call__(f_stream: jax.numpy.ndarray, f_col: jax.numpy.ndarray, bc_masks: Any) -> jax.numpy.ndarray
Apply boundary conditions to post-streaming populations.
- Parameters:
f_stream – Post-streaming populations.
f_col – Post-collision populations (for symmetry BC).
bc_masks – Pre-computed edge masks from BCMasks.
- Returns:
Populations with boundary conditions applied.
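To show how a rule can combine `f_stream` and `f_col`, here is a sketch of halfway bounce-back, a common wall rule that is not necessarily one the library ships. The structure of `BCMasks` is not documented here, so `wall_mask` and `opposite` are hypothetical stand-ins for whatever it provides. NumPy stands in for `jax.numpy`.

```python
import numpy as np


def bounce_back(f_stream, f_col, wall_mask, opposite):
    """Replace masked populations with the post-collision opposite direction.

    wall_mask: boolean, shape (nx, ny, nz, q, 1); True where population i
               would have streamed in from a wall (hypothetical layout).
    opposite:  integer array; opposite[i] is the index of direction -c_i.
    """
    reflected = f_col[..., opposite, :]          # reorder the q axis
    return np.where(wall_mask, reflected, f_stream)


opposite = np.array([0, 2, 1])                   # rest, +x, -x
f_col = np.arange(12, dtype=float).reshape(4, 1, 1, 3, 1)
f_stream = np.zeros_like(f_col)
wall_mask = np.zeros(f_col.shape, dtype=bool)
wall_mask[0, :, :, 1, :] = True                  # wall left of x=0: +x is unknown
f_bc = bounce_back(f_stream, f_col, wall_mask, opposite)
```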
- class tud_lbm.operators.protocols.InitialiserOperator
Bases: Protocol
Initialiser operator: creates the initial distribution f.
Initialisation strategies include:
- “standard”: rest equilibrium f_eq(ρ_0, u_0) where ρ_0 = 1, u_0 = 0
- “init_from_file”: load from an NPZ file
- Multiphase variants: tanh density profile
Signature:
def init_fn(grid_shape, lattice, **kwargs) -> f
- __call__(grid_shape: tuple[int, int, int], lattice: tud_lbm.lattice.lattice.Lattice, **kwargs: Any) -> jax.numpy.ndarray
Initialise the distribution function.
- Parameters:
grid_shape – Grid dimensions (nx, ny, nz).
lattice – Lattice.
**kwargs – Initialiser-specific keyword arguments (e.g., density, rho_l, rho_v, interface_width, npz_path).
- Returns:
Initial population distribution, shape (nx, ny, nz, q, 1).
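Two of the strategies listed above can be sketched briefly. For the standard polynomial equilibrium, zero velocity reduces f_eq to w_i ρ_0, which is what `init_standard` builds. The tanh profile's orientation, the factor 2 in its argument, and the `x0` parameter are assumptions; the weights are passed as an array rather than read from a `Lattice`, and NumPy stands in for `jax.numpy`.

```python
import numpy as np

W = np.array([4 / 9] + [1 / 9] * 4 + [1 / 36] * 4)    # D2Q9 weights


def init_standard(grid_shape, w, density=1.0):
    """Rest equilibrium: with u_0 = 0, f_i = w_i * rho_0 at every node."""
    nx, ny, nz = grid_shape
    f = np.broadcast_to(w * density, (nx, ny, nz, w.shape[0]))
    return np.array(f)[..., None]                      # (nx, ny, nz, q, 1)


def tanh_density(nx, rho_l, rho_v, interface_width, x0):
    """1D profile: rho_l for x << x0, rho_v for x >> x0 (assumed orientation)."""
    x = np.arange(nx)
    return (0.5 * (rho_l + rho_v)
            - 0.5 * (rho_l - rho_v) * np.tanh(2.0 * (x - x0) / interface_width))


f0 = init_standard((8, 4, 1), W)
rho_profile = tanh_density(64, rho_l=1.0, rho_v=0.1, interface_width=4.0, x0=32)
```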
- class tud_lbm.operators.protocols.StepOperator
Bases: Protocol
Step operator: executes one full LBM time step.
The step operator orchestrates the complete LBM algorithm: collision, streaming, boundary conditions, and any physics-specific updates (e.g., hysteresis in multiphase wetting).
Signature:
def step(setup, state) -> state_next
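The orchestration described above can be sketched as a plain composition of the other operators. `ToySetup` and `ToyState` are hypothetical containers, since the fields of the real `SimulationSetup` and `State` are not shown here, and physics-specific updates such as hysteresis are omitted.

```python
from dataclasses import dataclass, replace
from typing import Any, Callable


@dataclass(frozen=True)
class ToySetup:
    """Hypothetical container for the operators a step needs."""
    macroscopic: Callable[..., Any]
    equilibrium: Callable[..., Any]
    collide: Callable[..., Any]
    stream: Callable[..., Any]
    apply_bc: Callable[..., Any]
    lattice: Any
    tau: float


@dataclass(frozen=True)
class ToyState:
    """Hypothetical state: populations plus an iteration counter."""
    f: Any
    it: int = 0


def step(setup: ToySetup, state: ToyState) -> ToyState:
    rho, u = setup.macroscopic(state.f, setup.lattice)
    feq = setup.equilibrium(rho, u, setup.lattice)
    f_col = setup.collide(state.f, feq, setup.tau)
    f_stream = setup.stream(f_col, setup.lattice)
    f_bc = setup.apply_bc(f_stream, f_col, None)     # no masks in this toy
    return replace(state, f=f_bc, it=state.it + 1)


calls = []


def record(name, result):
    """Build a stand-in operator that logs its name and returns `result`."""
    def op(*args, **kwargs):
        calls.append(name)
        return result
    return op


setup = ToySetup(
    macroscopic=record("macroscopic", (1.0, 0.0)),
    equilibrium=record("equilibrium", 1.0),
    collide=record("collide", 1.0),
    stream=record("stream", 1.0),
    apply_bc=record("bc", 1.0),
    lattice=None,
    tau=1.0,
)
state_next = step(setup, ToyState(f=1.0))
```

Returning a new state instead of mutating the old one keeps the step a pure function, which is the style JAX transformations expect.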
- class tud_lbm.operators.protocols.HysteresisOperator
Bases: Protocol
Hysteresis operator: updates wetting state via contact angle hysteresis.
The hysteresis operator applies dynamic contact angle adjustment based on velocity direction (advancing/receding). It is only called when both wetting and hysteresis configurations are present.
Signature:
def update_wetting_state(wetting, rho, setup, f_t, **kwargs) -> wetting_next
- __call__(wetting: Any, rho: jax.numpy.ndarray, setup: Any, f_t: jax.numpy.ndarray, **kwargs: Any) -> tud_lbm.pipeline.state.WettingState
Update wetting state with hysteresis.
- Parameters:
wetting – Current WettingState.
rho – Density field, shape (nx, ny, nz, 1, 1).
setup – SimulationSetup.
f_t – Pre-step populations, shape (nx, ny, nz, q, 1).
**kwargs – Operator-specific parameters (e.g., force_ext).
- Returns:
Updated WettingState.
- class tud_lbm.operators.protocols.ForceOperator
Bases: Protocol
Unified protocol for force operator modules.
Every force module exposes setup-time build and step-time compute methods.
- class tud_lbm.operators.protocols.InitialPopulationOperator
Bases: Protocol
Bound initialiser: builds the initial f for a fixed setup.
This is the setup-bound closure stored on SimulationSetup.initial_f_fn. Unlike InitialiserOperator, the grid shape and lattice are already captured; callers only supply optional overrides.
Signature:
def initial_f_fn(init_kwargs=None) -> f
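The difference between the unbound and the bound initialiser comes down to a closure. The sketch below shows one way such a closure could be built; `bind_initialiser` and `toy_init` are hypothetical names, and how the library merges defaults with overrides is an assumption.

```python
from typing import Any, Callable, Mapping, Optional


def bind_initialiser(init_fn: Callable[..., Any], grid_shape, lattice, **defaults):
    """Capture grid_shape and lattice; the closure takes only overrides."""

    def initial_f_fn(init_kwargs: Optional[Mapping[str, Any]] = None):
        # Overrides win over the defaults captured at setup time.
        merged = {**defaults, **(init_kwargs or {})}
        return init_fn(grid_shape, lattice, **merged)

    return initial_f_fn


def toy_init(grid_shape, lattice, density=1.0):
    # Stand-in initialiser that just echoes its inputs.
    return (grid_shape, lattice, density)


initial_f_fn = bind_initialiser(toy_init, (8, 8, 1), "D2Q9", density=1.0)
default_f = initial_f_fn()
override_f = initial_f_fn({"density": 0.5})
```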
- class tud_lbm.operators.protocols.MultiphaseStepOperator
Bases: Protocol
Bound multiphase trial-step: advances f_t by one step.
This is the setup-bound closure stored on SimulationSetup.multiphase_step. The setup is already captured; callers pass the current populations and optional physics fields.
Signature:
def multiphase_step(f_t, *, force_ext=None, wetting=None, ...) -> f_out
- __call__(f_t: jax.numpy.ndarray, *, force_ext: jax.numpy.ndarray | None = None, wetting: Any = None, gradient_density: Any = None, laplacian_density: Any = None) -> jax.numpy.ndarray
Run one multiphase trial step.
- Parameters:
f_t – Pre-step populations, shape (nx, ny, nz, q, 1).
force_ext – Optional external force, shape (nx, ny, nz, 1, d).
wetting – Optional WettingState.
gradient_density – Optional pre-built density gradient operator.
laplacian_density – Optional pre-built density Laplacian operator.
- Returns:
Post-BC populations, shape (nx, ny, nz, q, 1).
- class tud_lbm.operators.protocols.ExtraState
Bases: Protocol
Marker protocol for JAX-pytree-compatible extra state containers.
Implementations are intentionally unconstrained to support both parameter-style containers (e.g. wetting scalars) and distribution-style containers (e.g. electric potential populations).
- class tud_lbm.operators.protocols.ExtraStatePlugin
Bases: Protocol
Plugin contract for initialising and updating extra State fields.
- is_active(config: Any) -> bool
Return whether this plugin should be enabled for the given config.
- class tud_lbm.operators.protocols.DifferentialOperator
Bases: Protocol
Differential operator: computes spatial derivatives.
Gradients and Laplacians on lattice grids, used for multiphase chemical potential and interfacial stress.
Signature:
def compute_derivative(field) -> derivative_field
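A minimal sketch of two such operators on a periodic grid with unit spacing follows. It uses plain central differences, an assumption about the discretisation; the library may well use lattice-weighted isotropic stencils instead. NumPy stands in for `jax.numpy`, which also provides `roll`.

```python
import numpy as np


def gradient_x(field):
    """Central difference along x on a periodic grid with unit spacing."""
    return 0.5 * (np.roll(field, -1, axis=0) - np.roll(field, 1, axis=0))


def laplacian(field):
    """Second-order nearest-neighbour stencil summed over the spatial axes."""
    out = np.zeros_like(field)
    for axis in (0, 1, 2):
        out = out + (np.roll(field, -1, axis=axis)
                     + np.roll(field, 1, axis=axis)
                     - 2.0 * field)
    return out


field = np.zeros((5, 5, 1, 1, 1))        # scalar field, shape (nx, ny, nz, 1, 1)
field[2, 2, 0, 0, 0] = 1.0               # single spike in the middle
grad = gradient_x(field)
lap = laplacian(field)
```

On an axis of length 1 (here z) both neighbours coincide with the node itself, so that axis contributes nothing to the Laplacian.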
- class tud_lbm.operators.protocols.SimulationRepository
Bases: Protocol
Persistence port: writes simulation state and metadata to disk.
Abstracts the storage mechanism (HDF5, NumPy .npz, Parquet, etc.).
Typical operations:
- Save trajectory snapshots at specified intervals
- Write metadata (config, simulation parameters)
- Recover state for restart
Signature:
class MyRepository(SimulationRepository):
    def save_snapshot(self, state, time_step, field_names):
        # write to disk
        ...
    def load_snapshot(self, time_step):
        # read from disk and return State object
        ...
- save_snapshot(state: tud_lbm.pipeline.state.State, time_step: int, field_names: tuple[str, ...] | None = None) -> None
Persist a simulation state snapshot.
- Parameters:
state – Current State.
time_step – Current iteration number (for naming/indexing).
field_names – Which fields to save (e.g., ("rho", "u")). None means save all fields.
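As one possible shape for such a repository, the sketch below stores each snapshot as a NumPy `.npz` file. The class name, the file naming scheme, and treating `state` as a plain mapping from field name to array are all assumptions; the real `State` type is not described here.

```python
import os
import tempfile

import numpy as np


class NpzRepository:
    """Sketch of a repository that writes one .npz file per snapshot."""

    def __init__(self, directory):
        self.directory = directory
        os.makedirs(directory, exist_ok=True)

    def _path(self, time_step):
        return os.path.join(self.directory, f"snapshot_{time_step:08d}.npz")

    def save_snapshot(self, state, time_step, field_names=None):
        # Assumption: `state` is a mapping of field name -> array.
        names = tuple(state) if field_names is None else field_names
        arrays = {name: np.asarray(state[name]) for name in names}
        np.savez(self._path(time_step), **arrays)

    def load_snapshot(self, time_step):
        with np.load(self._path(time_step)) as data:
            return {name: data[name] for name in data.files}


with tempfile.TemporaryDirectory() as tmp:
    repo = NpzRepository(tmp)
    state = {"rho": np.ones((2, 2, 1, 1, 1)), "u": np.zeros((2, 2, 1, 1, 2))}
    repo.save_snapshot(state, time_step=10, field_names=("rho",))
    restored = repo.load_snapshot(10)
```

Because the protocol is structural, `NpzRepository` does not need to inherit from `SimulationRepository` to be accepted where one is expected.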
- class tud_lbm.operators.protocols.ConfigReader
Bases: Protocol
Parsing port: reads and validates configuration from an external format.
Abstracts the input format (TOML, JSON, YAML, dict, etc.).
Typical operations:
- Parse a config file
- Validate against schema
- Return a SimulationConfig
Signature:
class TomlConfigReader(ConfigReader):
    def load(self, path):
        # read TOML file and return SimulationConfig
        ...
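The parse, validate, and return sequence can be sketched with JSON, which needs only the standard library. `ToyConfig` is a hypothetical, much-reduced stand-in for `SimulationConfig`, and the two fields and their checks are illustrative assumptions rather than the library's schema.

```python
import json
import os
import tempfile
from dataclasses import dataclass


@dataclass(frozen=True)
class ToyConfig:
    """Hypothetical, much-reduced stand-in for SimulationConfig."""
    grid_shape: tuple
    tau: float


class JsonConfigReader:
    """Parse a JSON file, validate it, and return a ToyConfig."""

    def load(self, path):
        with open(path, encoding="utf-8") as handle:
            raw = json.load(handle)
        grid_shape = tuple(int(n) for n in raw["grid_shape"])
        tau = float(raw["tau"])
        if len(grid_shape) != 3:
            raise ValueError("grid_shape must be (nx, ny, nz)")
        if tau <= 0.5:
            raise ValueError("tau must be greater than 0.5")
        return ToyConfig(grid_shape=grid_shape, tau=tau)


with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, "config.json")
    with open(path, "w", encoding="utf-8") as handle:
        json.dump({"grid_shape": [64, 64, 1], "tau": 0.8}, handle)
    config = JsonConfigReader().load(path)
```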
- class tud_lbm.operators.protocols.PlotOperator(config: tud_lbm.config.simulation_config.SimulationConfig, data_dir: str | pathlib.Path | None = None)
Bases: Protocol
Structural contract for Matplotlib plot operators.
Plot operators render simulation snapshots onto matplotlib axes, enabling flexible visualization strategies for different fields and use cases.
Signature:
class MyPlotter(PlotOperator):
    def __call__(self, ax, data, timestep):
        # render to axes
        ...