tud_lbm.operators.protocols
===========================

.. py:module:: tud_lbm.operators.protocols

.. autoapi-nested-parse::

   Protocol (structural) types for LBM operators.

   These protocols define the contract that each operator category must fulfil.
   They enable loose coupling: code depending on `CollisionOperator` can work
   with any function or class implementing that protocol, without importing the
   specific implementation.

   Design principle: operator protocols are intentionally minimal. They capture
   the bare essentials (signatures, docstrings) without dictating implementation
   details like decorators or registry membership.

   Usage::

       from tud_lbm.operators.protocols import CollisionOperator
       from registry import get_operators

       collision_ops = get_operators("collision_models")
       bgk_fn = collision_ops["bgk"].target
       # Static type-checkers (and isinstance(), if the protocol is
       # runtime-checkable) will accept bgk_fn as a CollisionOperator

       def my_collision_logic(collision_op: CollisionOperator):
           ...


Classes
-------

.. autoapisummary::

   tud_lbm.operators.protocols.CollisionOperator
   tud_lbm.operators.protocols.StreamingOperator
   tud_lbm.operators.protocols.EquilibriumOperator
   tud_lbm.operators.protocols.MacroscopicOperator
   tud_lbm.operators.protocols.BoundaryOperator
   tud_lbm.operators.protocols.InitialiserOperator
   tud_lbm.operators.protocols.StepOperator
   tud_lbm.operators.protocols.HysteresisOperator
   tud_lbm.operators.protocols.ForceOperator
   tud_lbm.operators.protocols.InitialPopulationOperator
   tud_lbm.operators.protocols.MultiphaseStepOperator
   tud_lbm.operators.protocols.ExtraState
   tud_lbm.operators.protocols.ExtraStatePlugin
   tud_lbm.operators.protocols.DifferentialOperator
   tud_lbm.operators.protocols.SimulationRepository
   tud_lbm.operators.protocols.ConfigReader
   tud_lbm.operators.protocols.PlotOperator


Module Contents
---------------

.. py:class:: CollisionOperator

   Bases: :py:obj:`Protocol`

   Collision operator: transforms ``(f, feq, tau) → f_col``.

   The collision step relaxes the non-conserved moments toward their
   equilibrium values on the time scale ``tau``.

   Signature::

       def collide(f, feq, tau, source=None, ...) -> f_col


   .. py:method:: __call__(f: jax.numpy.ndarray, feq: jax.numpy.ndarray, tau: float, source: jax.numpy.ndarray | None = None, **kwargs: Any) -> jax.numpy.ndarray

      Compute post-collision distribution.

      :param f: Populations, shape ``(nx, ny, nz, q, 1)``.
      :param feq: Equilibrium distribution, shape ``(nx, ny, nz, q, 1)``.
      :param tau: Relaxation time (> 0.5).
      :param source: Optional forcing source term, shape ``(nx, ny, nz, q, 1)``.
      :param \*\*kwargs: Operator-specific parameters.

      :returns: Post-collision populations, same shape as *f*.


.. py:class:: StreamingOperator

   Bases: :py:obj:`Protocol`

   Streaming operator: propagates populations along velocity directions.

   The streaming step shifts each population component ``f_i`` along the
   direction of its lattice velocity ``c_i``, wrapping periodically at the
   domain edges. Boundary conditions are applied afterward.

   Signature::

       def stream(f, lattice) -> f_streamed


   .. py:method:: __call__(f: jax.numpy.ndarray, lattice: tud_lbm.lattice.lattice.Lattice) -> jax.numpy.ndarray

      Propagate populations across the domain.

      :param f: Populations, shape ``(nx, ny, nz, q, 1)``.
      :param lattice: :class:`~tud_lbm.lattice.lattice.Lattice` with velocity vectors ``c``.

      :returns: Post-streaming populations, same shape as *f*.


.. py:class:: EquilibriumOperator

   Bases: :py:obj:`Protocol`

   Equilibrium operator: computes ``(rho, u, lattice) → feq``.

   The equilibrium distribution is the rest state toward which the collision
   operator relaxes the system. It encodes the hydrodynamic moment structure.

   Signature::

       def compute_equilibrium(rho, u, lattice) -> feq


   .. py:method:: __call__(rho: jax.numpy.ndarray, u: jax.numpy.ndarray, lattice: tud_lbm.lattice.lattice.Lattice) -> jax.numpy.ndarray

      Compute the equilibrium distribution.

      :param rho: Density field, shape ``(nx, ny, nz, 1, 1)``.
      :param u: Velocity field, shape ``(nx, ny, nz, 1, d)`` where d ∈ {2, 3}.
      :param lattice: :class:`~tud_lbm.lattice.lattice.Lattice` with weights ``w`` and
                      velocity vectors ``c``.

      :returns: Equilibrium distribution ``feq``, shape ``(nx, ny, nz, q, 1)``.


.. py:class:: MacroscopicOperator

   Bases: :py:obj:`Protocol`

   Macroscopic operator: computes ``(f, lattice) → (rho, u, ...)``.

   Macroscopic fields are the moments of the population distribution,
   computed via summation over velocity directions.

   Signature::

       def compute_macroscopic(f, lattice, force=None) -> (rho, u) or (rho, u, force)


   .. py:method:: __call__(f: jax.numpy.ndarray, lattice: tud_lbm.lattice.lattice.Lattice, force: jax.numpy.ndarray | None = None, **kwargs: Any) -> tuple[jax.numpy.ndarray, jax.numpy.ndarray] | tuple[jax.numpy.ndarray, jax.numpy.ndarray, jax.numpy.ndarray]

      Compute density and velocity fields.

      :param f: Populations, shape ``(nx, ny, nz, q, 1)``.
      :param lattice: :class:`~tud_lbm.lattice.lattice.Lattice`.
      :param force: Optional external force field, shape ``(nx, ny, nz, 1, d)``.
                    When provided, velocity is corrected by ``u ← u + force / (2ρ)``.
      :param \*\*kwargs: Additional keyword arguments.

      :returns: Without *force*: ``(rho, u)`` where

                - ``rho``: shape ``(nx, ny, nz, 1, 1)``
                - ``u``: shape ``(nx, ny, nz, 1, d)``

                With *force*: ``(rho, u_eq, force)`` where *u_eq* includes the
                force correction.


.. py:class:: BoundaryOperator

   Bases: :py:obj:`Protocol`

   Boundary-condition operator: applies edge BC rules to populations.

   Boundary conditions enforce Dirichlet/Neumann constraints or flux
   periodicity at domain edges. They are applied post-streaming.

   Signature::

       def apply_bc(f_stream, f_col, bc_masks) -> f_bc


   .. py:method:: __call__(f_stream: jax.numpy.ndarray, f_col: jax.numpy.ndarray, bc_masks: Any) -> jax.numpy.ndarray

      Apply boundary conditions to post-streaming populations.

      :param f_stream: Post-streaming populations.
      :param f_col: Post-collision populations (for symmetry BC).
      :param bc_masks: Pre-computed edge masks from
                       :class:`~setup.simulation_setup.BCMasks`.

      :returns: Populations with boundary conditions applied.


.. py:class:: InitialiserOperator

   Bases: :py:obj:`Protocol`

   Initialiser operator: creates the initial distribution ``f``.

   Initialisation strategies include:

   - "standard": rest equilibrium ``f_eq(ρ_0, u_0)`` where ρ_0 = 1, u_0 = 0
   - "init_from_file": load from an NPZ file
   - Multiphase variants: tanh density profile

   Signature::

       def init_fn(grid_shape, lattice, **kwargs) -> f


   .. py:method:: __call__(grid_shape: tuple[int, int, int], lattice: tud_lbm.lattice.lattice.Lattice, **kwargs: Any) -> jax.numpy.ndarray

      Initialise the distribution function.

      :param grid_shape: Grid dimensions ``(nx, ny, nz)``.
      :param lattice: :class:`~tud_lbm.lattice.lattice.Lattice`.
      :param \*\*kwargs: Initialiser-specific keyword arguments
                         (e.g., ``density``, ``rho_l``, ``rho_v``,
                         ``interface_width``, ``npz_path``).

      :returns: Initial population distribution, shape ``(nx, ny, nz, q, 1)``.


.. py:class:: StepOperator

   Bases: :py:obj:`Protocol`

   Step operator: executes one full LBM time step.

   The step operator orchestrates the complete LBM algorithm: collision,
   streaming, boundary conditions, and any physics-specific updates
   (e.g., hysteresis in multiphase wetting).

   Signature::

       def step(setup, state) -> state_next


   .. py:method:: __call__(setup: Any, state: tud_lbm.pipeline.state.State) -> tud_lbm.pipeline.state.State

      Execute one LBM time step.

      :param setup: :class:`~setup.simulation_setup.SimulationSetup` containing
                    all pre-built operators and parameters.
      :param state: Current :class:`~tud_lbm.pipeline.state.State`.

      :returns: Updated :class:`~tud_lbm.pipeline.state.State` after one time step.


.. py:class:: HysteresisOperator

   Bases: :py:obj:`Protocol`

   Hysteresis operator: updates wetting state via contact angle hysteresis.

   The hysteresis operator adjusts the contact angle dynamically according to
   the velocity direction (advancing or receding). It is only called when both
   wetting and hysteresis configurations are present.

   Signature::

       def update_wetting_state(wetting, rho, setup, f_t, **kwargs) -> wetting_next


   .. py:method:: __call__(wetting: Any, rho: jax.numpy.ndarray, setup: Any, f_t: jax.numpy.ndarray, **kwargs: Any) -> tud_lbm.pipeline.state.WettingState

      Update wetting state with hysteresis.

      :param wetting: Current :class:`~tud_lbm.pipeline.state.WettingState`.
      :param rho: Density field, shape ``(nx, ny, nz, 1, 1)``.
      :param setup: :class:`~setup.simulation_setup.SimulationSetup`.
      :param f_t: Pre-step populations, shape ``(nx, ny, nz, q, 1)``.
      :param \*\*kwargs: Operator-specific parameters (e.g., ``force_ext``).

      :returns: Updated :class:`~tud_lbm.pipeline.state.WettingState`.


.. py:class:: ForceOperator

   Bases: :py:obj:`Protocol`

   Unified protocol for force operator modules.

   Every force module exposes a setup-time ``build`` method and a step-time
   ``compute`` method.


   .. py:method:: build(params: Any, grid_shape: tuple[int, Ellipsis]) -> Any

      Construct precomputed data for the force module.


   .. py:method:: compute(state: Any, precomputed: Any, *, diff_ops: Any = None) -> jax.numpy.ndarray

      Compute the force contribution for the current state.


.. py:class:: InitialPopulationOperator

   Bases: :py:obj:`Protocol`

   Bound initialiser: builds the initial ``f`` for a fixed setup.

   This is the setup-bound closure stored on ``SimulationSetup.initial_f_fn``.
   Unlike ``InitialiserOperator``, the grid shape and lattice are already
   captured; callers only supply optional overrides.

   Signature::

       def initial_f_fn(init_kwargs=None) -> f


   .. py:method:: __call__(init_kwargs: dict | None = None) -> jax.numpy.ndarray

      Build the initial population distribution.

      :param init_kwargs: Optional keyword overrides (e.g. ``density``,
                          ``rho_l``, ``npz_path``).

      :returns: Initial populations, shape ``(nx, ny, nz, q, 1)``.


.. py:class:: MultiphaseStepOperator

   Bases: :py:obj:`Protocol`

   Bound multiphase trial step: advances ``f_t`` by one step.

   This is the setup-bound closure stored on ``SimulationSetup.multiphase_step``.
   The setup is already captured; callers pass the current populations and
   optional physics fields.

   Signature::

       def multiphase_step(f_t, *, force_ext=None, wetting=None, ...) -> f_out


   .. py:method:: __call__(f_t: jax.numpy.ndarray, *, force_ext: jax.numpy.ndarray | None = None, wetting: Any = None, gradient_density: Any = None, laplacian_density: Any = None) -> jax.numpy.ndarray

      Run one multiphase trial step.

      :param f_t: Pre-step populations, shape ``(nx, ny, nz, q, 1)``.
      :param force_ext: Optional external force, shape ``(nx, ny, nz, 1, d)``.
      :param wetting: Optional :class:`~tud_lbm.pipeline.state.WettingState`.
      :param gradient_density: Optional pre-built density gradient operator.
      :param laplacian_density: Optional pre-built density Laplacian operator.

      :returns: Post-BC populations, shape ``(nx, ny, nz, q, 1)``.


.. py:class:: ExtraState

   Bases: :py:obj:`Protocol`

   Marker protocol for JAX-pytree-compatible extra state containers.

   Implementations are intentionally unconstrained to support both
   parameter-style containers (e.g. wetting scalars) and distribution-style
   containers (e.g. electric potential populations).


.. py:class:: ExtraStatePlugin

   Bases: :py:obj:`Protocol`

   Plugin contract for initialising and updating extra ``State`` fields.


   .. py:attribute:: name
      :type: str


   .. py:method:: is_active(config: Any) -> bool

      Return whether this plugin should be enabled for the given config.


   .. py:method:: init_state(setup: Any) -> dict[str, Any]

      Create initial extra fields merged into :class:`tud_lbm.pipeline.state.State`.


   .. py:method:: update_state(setup: Any, prev_state: Any, new_state: Any, **context: Any) -> Any

      Apply per-step extra-state updates and return the updated state.


.. py:class:: DifferentialOperator

   Bases: :py:obj:`Protocol`

   Differential operator: computes spatial derivatives.

   Gradients and Laplacians on lattice grids, used for multiphase
   chemical potential and interfacial stress.

   Signature::

       def compute_derivative(field) -> derivative_field


   .. py:method:: __call__(field: jax.numpy.ndarray) -> jax.numpy.ndarray

      Compute a spatial derivative.

      :param field: Scalar or vector field, shape ``(nx, ny, 1, 1)`` or
                    ``(nx, ny, 1, 2)``.

      :returns: Derivative field, matching or broadened shape.


.. py:class:: SimulationRepository

   Bases: :py:obj:`Protocol`

   Persistence port: writes simulation state and metadata to disk.

   Abstracts the storage mechanism (HDF5, NumPy .npz, Parquet, etc.).
   Typical operations:

   - Save trajectory snapshots at specified intervals
   - Write metadata (config, simulation parameters)
   - Recover state for restart

   Signature::

       class MyRepository(SimulationRepository):
           def save_snapshot(self, state, time_step, field_names=None):
               # write to disk
               ...

           def load_snapshot(self, time_step):
               # read from disk and return State object
               ...


   .. py:method:: save_snapshot(state: tud_lbm.pipeline.state.State, time_step: int, field_names: tuple[str, Ellipsis] | None = None) -> None

      Persist a simulation state snapshot.

      :param state: Current :class:`~tud_lbm.pipeline.state.State`.
      :param time_step: Current iteration number (for naming/indexing).
      :param field_names: Which fields to save (e.g., ``("rho", "u")``).
                          ``None`` means save all fields.


   .. py:method:: load_snapshot(time_step: int) -> tud_lbm.pipeline.state.State

      Load a previously saved snapshot.

      :param time_step: Iteration number of the snapshot to retrieve.

      :returns: Reconstructed :class:`~tud_lbm.pipeline.state.State`.


.. py:class:: ConfigReader

   Bases: :py:obj:`Protocol`

   Parsing port: reads and validates configuration from external format.

   Abstracts the input format (TOML, JSON, YAML, dict, etc.).
   Typical operations:

   - Parse a config file
   - Validate against schema
   - Return a :class:`~tud_lbm.config.simulation_config.SimulationConfig`

   Signature::

       class TomlConfigReader(ConfigReader):
           def load(self, path):
               # read TOML file and return SimulationConfig
               ...


   .. py:method:: load(source: str) -> Any

      Read and parse a configuration.

      :param source: Configuration source (filepath, dict, URL, etc.).

      :returns: A validated :class:`~tud_lbm.config.simulation_config.SimulationConfig`.


.. py:class:: PlotOperator(config: tud_lbm.config.simulation_config.SimulationConfig, data_dir: str | pathlib.Path | None = None)

   Bases: :py:obj:`Protocol`

   Structural contract for Matplotlib plot operators.

   Plot operators render simulation snapshots onto Matplotlib axes, so that
   different fields and use cases can each provide their own rendering.

   Signature::

       class MyPlotter(PlotOperator):
           def __call__(self, ax, data, timestep):
               # render to axes
               ...


   .. py:attribute:: name
      :type: str


   .. py:attribute:: config
      :type: tud_lbm.config.simulation_config.SimulationConfig


   .. py:method:: is_available(data: dict[str, numpy.ndarray]) -> bool

      Return whether this operator can render the provided snapshot.

      :param data: Snapshot dictionary with field names as keys
                   (e.g., ``"rho"``, ``"u"``).

      :returns: ``True`` if all required fields are present; ``False`` otherwise.


   .. py:method:: __call__(ax: matplotlib.axes.Axes, data: dict[str, numpy.ndarray], timestep: int) -> None

      Render one panel onto the provided axes.

      :param ax: Matplotlib axes object to draw onto.
      :param data: Snapshot dictionary with computed fields.
      :param timestep: Current iteration number (for annotations).
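
The structural typing that the protocols in this module rely on can be illustrated with a minimal, standard-library-only sketch. It is not the package's implementation. ``CollisionLike`` and ``bgk_collide`` are hypothetical names introduced here, plain Python lists stand in for the JAX arrays, and the relaxation formula is the textbook BGK form ``f - (f - feq) / tau``, assumed for illustration. The point is that a plain function satisfies a callable protocol without inheriting from it or importing any implementation.

```python
from typing import Any, List, Optional, Protocol, runtime_checkable


@runtime_checkable
class CollisionLike(Protocol):
    """Hypothetical stand-in mirroring the CollisionOperator call signature."""

    def __call__(
        self,
        f: List[float],
        feq: List[float],
        tau: float,
        source: Optional[List[float]] = None,
        **kwargs: Any,
    ) -> List[float]: ...


def bgk_collide(f, feq, tau, source=None, **kwargs):
    """Textbook BGK relaxation on flat lists (assumed form, for illustration)."""
    # Relax each population toward its equilibrium value.
    f_col = [fi - (fi - fe) / tau for fi, fe in zip(f, feq)]
    # Add the optional source term element-wise.
    if source is not None:
        f_col = [fc + s for fc, s in zip(f_col, source)]
    return f_col


def run_collision(op: CollisionLike, f, feq, tau):
    """Depends only on the protocol, never on a concrete operator."""
    return op(f, feq, tau)


# The function matches the protocol structurally; no subclassing is involved.
result = run_collision(bgk_collide, [1.0, 2.0], [0.5, 1.0], 2.0)
```

Note that ``isinstance()`` works here only because the sketch decorates the protocol with ``@runtime_checkable``, and that such a runtime check only verifies that ``__call__`` exists, not that its parameters match. Signature conformance is checked by static type-checkers.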