tud_lbm.operators.protocols

Protocol (structural) types for LBM operators.

These protocols define the contract that each operator category must fulfil. They enable loose coupling: code depending on CollisionOperator can work with any function/class implementing that protocol, without importing the specific implementation.

Design principle: Operator protocols are intentionally minimal — they capture the bare essentials (signatures, docstrings) without dictating implementation details like decorators or registry membership.

Usage:

from operators.protocols import CollisionOperator
from registry import get_operators

collision_ops = get_operators("collision_models")
bgk_fn = collision_ops["bgk"].target

# Static type-checkers and isinstance() will accept bgk_fn
# as a CollisionOperator
def my_collision_logic(collision_op: CollisionOperator):
    ...

Classes

CollisionOperator

Collision operator — transforms (f, feq, tau) f_col.

StreamingOperator

Streaming operator — propagates populations along velocity directions.

EquilibriumOperator

Equilibrium operator — computes (rho, u, lattice) feq.

MacroscopicOperator

Macroscopic operator — computes (f, lattice) (rho, u, ...).

BoundaryOperator

Boundary-condition operator — applies edge BC rules to populations.

InitialiserOperator

Initialiser operator — creates the initial distribution f.

StepOperator

Step operator — executes one full LBM time step.

HysteresisOperator

Hysteresis operator — updates wetting state via contact angle hysteresis.

ForceOperator

Unified protocol for force operator modules.

InitialPopulationOperator

Bound initialiser — builds the initial f for a fixed setup.

MultiphaseStepOperator

Bound multiphase trial-step — advances f_t by one step.

ExtraState

Marker protocol for JAX-pytree-compatible extra state containers.

ExtraStatePlugin

Plugin contract for initialising and updating extra State fields.

DifferentialOperator

Differential operator — computes spatial derivatives.

SimulationRepository

Persistence port: writes simulation state and metadata to disk.

ConfigReader

Parsing port: reads and validates configuration from external format.

PlotOperator

Structural contract for Matplotlib plot operators.

Module Contents

class tud_lbm.operators.protocols.CollisionOperator[source]

Bases: Protocol

Collision operator — transforms (f, feq, tau) f_col.

The collision step replaces non-conserved moments with their equilibrium values, relaxed toward equilibrium with time scale tau.

Signature:

def collide(f, feq, tau, source=None, ...) -> f_col
__call__(f: jax.numpy.ndarray, feq: jax.numpy.ndarray, tau: float, source: jax.numpy.ndarray | None = None, **kwargs: Any) jax.numpy.ndarray[source]

Compute post-collision distribution.

Parameters:
  • f – Populations, shape (nx, ny, nz, q, 1).

  • feq – Equilibrium distribution, shape (nx, ny, nz, q, 1).

  • tau – Relaxation time (> 0.5).

  • source – Optional forcing source term, shape (nx, ny, nz, q, 1).

  • **kwargs – Operator-specific parameters.

Returns:

Post-collision populations, same shape as f.

class tud_lbm.operators.protocols.StreamingOperator[source]

Bases: Protocol

Streaming operator — propagates populations along velocity directions.

The streaming step shifts each population component f_i along the direction of its lattice velocity c_i, using periodic boundary conditions across the domain (boundary conditions are applied afterward).

Signature:

def stream(f, lattice) -> f_streamed
__call__(f: jax.numpy.ndarray, lattice: tud_lbm.lattice.lattice.Lattice) jax.numpy.ndarray[source]

Propagate populations across the domain.

Parameters:
  • f – Populations, shape (nx, ny, nz, q, 1).

  • latticeLattice with velocity vectors c.

Returns:

Post-streaming populations, same shape as f.

class tud_lbm.operators.protocols.EquilibriumOperator[source]

Bases: Protocol

Equilibrium operator — computes (rho, u, lattice) feq.

The equilibrium distribution is the rest state toward which the collision operator relaxes the system. It encodes the hydrodynamic moment structure.

Signature:

def compute_equilibrium(rho, u, lattice) -> feq
__call__(rho: jax.numpy.ndarray, u: jax.numpy.ndarray, lattice: tud_lbm.lattice.lattice.Lattice) jax.numpy.ndarray[source]

Compute the equilibrium distribution.

Parameters:
  • rho – Density field, shape (nx, ny, nz, 1, 1).

  • u – Velocity field, shape (nx, ny, nz, 1, d) where d ∈ {2, 3}.

  • latticeLattice with weights w and velocity vectors c.

Returns:

Equilibrium distribution feq, shape (nx, ny, nz, q, 1).

class tud_lbm.operators.protocols.MacroscopicOperator[source]

Bases: Protocol

Macroscopic operator — computes (f, lattice) (rho, u, ...).

Macroscopic fields are the moments of the population distribution, computed via summation over velocity directions.

Signature:

def compute_macroscopic(f, lattice, force=None) -> (rho, u) or (rho, u, force)
__call__(f: jax.numpy.ndarray, lattice: tud_lbm.lattice.lattice.Lattice, force: jax.numpy.ndarray | None = None, **kwargs: Any) tuple[jax.numpy.ndarray, jax.numpy.ndarray] | tuple[jax.numpy.ndarray, jax.numpy.ndarray, jax.numpy.ndarray][source]

Compute density and velocity fields.

Parameters:
  • f – Populations, shape (nx, ny, nz, q, 1).

  • latticeLattice.

  • force – Optional external force field.

  • **kwargs – Additional keyword arguments.

  • force – Optional external force field, shape (nx, ny, nz, 1, d). When provided, velocity is corrected by u u + force / (2ρ).

Returns:

(rho, u) where
  • rho: shape (nx, ny, nz, 1, 1)

  • u: shape (nx, ny, nz, 1, d)

With force: (rho, u_eq, force) where u_eq includes the force correction.

Return type:

Without force

class tud_lbm.operators.protocols.BoundaryOperator[source]

Bases: Protocol

Boundary-condition operator — applies edge BC rules to populations.

Boundary conditions enforce Dirichlet/Neumann constraints or flux periodicity at domain edges. They are applied post-streaming.

Signature:

def apply_bc(f_stream, f_col, bc_masks) -> f_bc
__call__(f_stream: jax.numpy.ndarray, f_col: jax.numpy.ndarray, bc_masks: Any) jax.numpy.ndarray[source]

Apply boundary conditions to post-streaming populations.

Parameters:
  • f_stream – Post-streaming populations.

  • f_col – Post-collision populations (for symmetry BC).

  • bc_masks – Pre-computed edge masks from BCMasks.

Returns:

Populations with boundary conditions applied.

class tud_lbm.operators.protocols.InitialiserOperator[source]

Bases: Protocol

Initialiser operator — creates the initial distribution f.

Initialisation strategies include: - “standard”: rest equilibrium f_eq(ρ_0, u_0) where ρ_0 = 1, u_0 = 0 - “init_from_file”: load from an NPZ file - Multiphase variants: tanh density profile

Signature:

def init_fn(grid_shape, lattice, **kwargs) -> f
__call__(grid_shape: tuple[int, int, int], lattice: tud_lbm.lattice.lattice.Lattice, **kwargs: Any) jax.numpy.ndarray[source]

Initialise the distribution function.

Parameters:
  • grid_shape – Grid dimensions (nx, ny, nz).

  • latticeLattice.

  • **kwargs – Initialiser-specific keyword arguments (e.g., density, rho_l, rho_v, interface_width, npz_path).

Returns:

Initial population distribution, shape (nx, ny, nz, q, 1).

class tud_lbm.operators.protocols.StepOperator[source]

Bases: Protocol

Step operator — executes one full LBM time step.

The step operator orchestrates the complete LBM algorithm: collision, streaming, boundary conditions, and any physics-specific updates (e.g., hysteresis in multiphase wetting).

Signature:

def step(setup, state) -> state_next
__call__(setup: Any, state: tud_lbm.pipeline.state.State) tud_lbm.pipeline.state.State[source]

Execute one LBM time step.

Parameters:
  • setupSimulationSetup containing all pre-built operators and parameters.

  • state – Current State.

Returns:

Updated State after one time step.

class tud_lbm.operators.protocols.HysteresisOperator[source]

Bases: Protocol

Hysteresis operator — updates wetting state via contact angle hysteresis.

The hysteresis operator applies dynamic contact angle adjustment based on velocity direction (advancing/receding). It is only called when both wetting and hysteresis configurations are present.

Signature:

def update_wetting_state(wetting, rho, setup, f_t, **kwargs) -> wetting_next
__call__(wetting: Any, rho: jax.numpy.ndarray, setup: Any, f_t: jax.numpy.ndarray, **kwargs: Any) tud_lbm.pipeline.state.WettingState[source]

Update wetting state with hysteresis.

Parameters:
  • wetting – Current WettingState.

  • rho – Density field, shape (nx, ny, nz, 1, 1).

  • setupSimulationSetup.

  • f_t – Pre-step populations, shape (nx, ny, nz, q, 1).

  • **kwargs – Operator-specific parameters (e.g., force_ext).

Returns:

Updated WettingState.

class tud_lbm.operators.protocols.ForceOperator[source]

Bases: Protocol

Unified protocol for force operator modules.

Every force module exposes setup-time build and step-time compute methods.

build(params: Any, grid_shape: tuple[int, Ellipsis]) Any[source]

Construct precomputed data for the force module.

compute(state: Any, precomputed: Any, *, diff_ops: Any = None) jax.numpy.ndarray[source]

Compute the force contribution for the current state.

class tud_lbm.operators.protocols.InitialPopulationOperator[source]

Bases: Protocol

Bound initialiser — builds the initial f for a fixed setup.

This is the setup-bound closure stored on SimulationSetup.initial_f_fn. Unlike InitialiserOperator, the grid shape and lattice are already captured; callers only supply optional overrides.

Signature:

def initial_f_fn(init_kwargs=None) -> f
__call__(init_kwargs: dict | None = None) jax.numpy.ndarray[source]

Build the initial population distribution.

Parameters:

init_kwargs – Optional keyword overrides (e.g. density, rho_l, npz_path).

Returns:

Initial populations, shape (nx, ny, nz, q, 1).

class tud_lbm.operators.protocols.MultiphaseStepOperator[source]

Bases: Protocol

Bound multiphase trial-step — advances f_t by one step.

This is the setup-bound closure stored on SimulationSetup.multiphase_step. The setup is already captured; callers pass the current populations and optional physics fields.

Signature:

def multiphase_step(f_t, *, force_ext=None, wetting=None, ...) -> f_out
__call__(f_t: jax.numpy.ndarray, *, force_ext: jax.numpy.ndarray | None = None, wetting: Any = None, gradient_density: Any = None, laplacian_density: Any = None) jax.numpy.ndarray[source]

Run one multiphase trial step.

Parameters:
  • f_t – Pre-step populations, shape (nx, ny, nz, q, 1).

  • force_ext – Optional external force, shape (nx, ny, nz, 1, d).

  • wetting – Optional WettingState.

  • gradient_density – Optional pre-built density gradient operator.

  • laplacian_density – Optional pre-built density Laplacian operator.

Returns:

Post-BC populations, shape (nx, ny, nz, q, 1).

class tud_lbm.operators.protocols.ExtraState[source]

Bases: Protocol

Marker protocol for JAX-pytree-compatible extra state containers.

Implementations are intentionally unconstrained to support both parameter-style containers (e.g. wetting scalars) and distribution-style containers (e.g. electric potential populations).

class tud_lbm.operators.protocols.ExtraStatePlugin[source]

Bases: Protocol

Plugin contract for initialising and updating extra State fields.

name: str[source]
is_active(config: Any) bool[source]

Return whether this plugin should be enabled for the given config.

init_state(setup: Any) dict[str, Any][source]

Create initial extra fields merged into state.state.State.

update_state(setup: Any, prev_state: Any, new_state: Any, **context: Any) Any[source]

Apply per-step extra-state updates and return the updated state.

class tud_lbm.operators.protocols.DifferentialOperator[source]

Bases: Protocol

Differential operator — computes spatial derivatives.

Gradients and Laplacians on lattice grids, used for multiphase chemical potential and interfacial stress.

Signature:

def compute_derivative(field) → derivative_field
__call__(field: jax.numpy.ndarray) jax.numpy.ndarray[source]

Compute a spatial derivative.

Parameters:

field – Scalar or vector field, shape (nx, ny, 1, 1) or (nx, ny, 1, 2).

Returns:

Derivative field, matching or broadened shape.

class tud_lbm.operators.protocols.SimulationRepository[source]

Bases: Protocol

Persistence port: writes simulation state and metadata to disk.

Abstracts the storage mechanism (HDF5, NumPy .npz, Parquet, etc.).

Typical operations: - Save trajectory snapshots at specified intervals - Write metadata (config, simulation parameters) - Recover state for restart

Signature:

class MyRepository(SimulationRepository):
    def save_snapshot(self, state, time_step, field_names):
        # write to disk
    def load_snapshot(self, time_step):
        # read from disk and return State object
save_snapshot(state: tud_lbm.pipeline.state.State, time_step: int, field_names: tuple[str, Ellipsis] | None = None) None[source]

Persist a simulation state snapshot.

Parameters:
  • state – Current State.

  • time_step – Current iteration number (for naming/indexing).

  • field_names – Which fields to save (e.g., ("rho", "u")). None means save all fields.

load_snapshot(time_step: int) tud_lbm.pipeline.state.State[source]

Load a previously saved snapshot.

Parameters:

time_step – Iteration number of the snapshot to retrieve.

Returns:

Reconstructed State.

class tud_lbm.operators.protocols.ConfigReader[source]

Bases: Protocol

Parsing port: reads and validates configuration from external format.

Abstracts the input format (TOML, JSON, YAML, dict, etc.).

Typical operations: - Parse a config file - Validate against schema - Return a SimulationConfig

Signature:

class TomlConfigReader(ConfigReader):
    def load(self, path):
        # read TOML file and return SimulationConfig
load(source: str) Any[source]

Read and parse a configuration.

Parameters:

source – Configuration source (filepath, dict, URL, etc.).

Returns:

A validated SimulationConfig.

class tud_lbm.operators.protocols.PlotOperator(config: tud_lbm.config.simulation_config.SimulationConfig, data_dir: str | pathlib.Path | None = None)[source]

Bases: Protocol

Structural contract for Matplotlib plot operators.

Plot operators render simulation snapshots onto matplotlib axes, enabling flexible visualization strategies for different fields and use cases.

Signature:

class MyPlotter(PlotOperatorProtocol):
    def __call__(self, ax, data, timestep):
        # render to axes
name: str[source]
config: tud_lbm.config.simulation_config.SimulationConfig[source]
is_available(data: dict[str, numpy.ndarray]) bool[source]

Return whether this operator can render the provided snapshot.

Parameters:

data – Snapshot dictionary with field names as keys (e.g., "rho", "u").

Returns:

True if all required fields are present; False otherwise.

__call__(ax: matplotlib.axes.Axes, data: dict[str, numpy.ndarray], timestep: int) None[source]

Render one panel onto the provided axes.

Parameters:
  • ax – Matplotlib axes object to draw onto.

  • data – Snapshot dictionary with computed fields.

  • timestep – Current iteration number (for annotations).