Source code for porespy.simulations._ibop

import numpy as np
import numpy.typing as npt
from typing import Literal
from skimage.morphology import ball, disk, cube, square
from porespy import settings
from porespy.metrics import pc_curve
from porespy.tools import (
    _insert_disks_at_points,
    _insert_disks_at_points_parallel,
    get_tqdm,
    Results,
)
from porespy.filters import (
    trim_disconnected_blobs,
    find_trapped_regions,
    pc_to_satn,
    pc_to_seq,
)
from porespy.generators import (
    borders,
)
try:
    from pyedt import edt
except ModuleNotFoundError:
    from edt import edt


__all__ = [
    'drainage',
    'ibop',
]


tqdm = get_tqdm()


def ibop(
    im: npt.NDArray,
    pc: npt.NDArray = None,
    dt: npt.NDArray = None,
    inlets: npt.NDArray = None,
    outlets: npt.NDArray = None,
    residual: npt.NDArray = None,
    bins: int = None,
    return_sizes: bool = False,
    conn: Literal['min', 'max'] = 'min',
):
    r"""
    Simulate drainage using image-based sphere insertion, optionally including
    gravity

    Parameters
    ----------
    im : ndarray
        The image of the porous media with ``True`` values indicating the
        void space.
    pc : ndarray, optional
        Precomputed capillary pressure transform which is used to determine
        the invadability of each voxel. If not provided then twice the inverse of
        the distance transform of `im` is used.
    dt : ndarray (optional)
        The distance transform of ``im``.  If not provided it will be
        calculated, so supplying it saves time.
    inlets : ndarray, optional
        A boolean image the same shape as ``im``, with ``True`` values
        indicating the inlet locations. If not specified then access limitations
        are not applied so the result is essentially a local thickness filter.
    outlets : ndarray, optional
        A boolean image with ``True`` values indicating the outlet locations.
        If this is provided then trapped voxels of wetting phase are found and
        all the output images are adjusted accordingly. Note that trapping can
        be assessed during postprocessing as well.
    residual : ndarray, optional
        A boolean array indicating the locations of any residual invading
        phase. This is added to the intermediate image prior to trimming
        disconnected clusters, so will create connections to some clusters
        that would otherwise be removed. The residual phase is indicated
        in the final image by ``-np.inf`` values, since these are invaded at
        all applied capillary pressures.
    bins : int or array_like (default = None)
        The range of pressures to apply. If an integer is given then the given
        number of bins will be created between the lowest and highest values in
        ``pc``. If a list is given, each value in the list is used in ascending
        order. If `None` is given (default) then all the possible values in `pc`
        are used (or `dt` if `pc` is not given).
    return_sizes : bool, default = `False`
        If `True` then an array containing the size of the sphere which first
        overlapped each pixel is returned. This array is not computed by default
        as computing it increases computation time.
    conn : str
        Controls the shape of the structuring element used to find neighboring
        voxels when looking connectivity of invading blobs.  Options are:

        ========= ==================================================================
        Option    Description
        ========= ==================================================================
        'min'     This corresponds to a cross with 4 neighbors in 2D and 6 neighbors
                  in 3D.
        'max'     This corresponds to a square or cube with 8 neighbors in 2D and
                  26 neighbors in 3D.
        ========= ==================================================================

    Returns
    -------
    results : Results object
        A dataclass-like object with the following attributes:

        ========== ============================================================
        Attribute  Description
        ========== ============================================================
        im_seq     An ndarray with each voxel indicating the step number at
                   which it was first invaded by non-wetting phase
        im_satn    A numpy array with each voxel value indicating the global
                   saturation value at the point it was invaded
        im_size    If `return_sizes` was set to `True`, then a numpy array with
                   each voxel containing the radius of the sphere, in voxels, that
                   first overlapped it.
        im_pc      A numpy array with each voxel value indicating the
                   capillary pressure at which it was invaded.
        im_trapped A numpy array with ``True`` values indicating trapped voxels
        pc         1D array of capillary pressure values that were applied
        swnp       1D array of non-wetting phase saturations for each applied
                   value of capillary pressure (``pc``).
        ========== ============================================================

    See Also
    --------
    drainage

    Notes
    -----
    This algorithm only provides sensible results for gravity stabilized
    configurations, meaning the more dense fluid is on the bottom. Be sure that
    ``inlets`` are specified accordingly.

    Examples
    --------
    `Click here
    <https://porespy.org/examples/simulations/reference/drainage.html>`_
    to view online example.

    """
    im = np.array(im, dtype=bool)

    if dt is None:
        dt = edt(im)

    if inlets is None:
        inlets = borders(shape=im.shape, mode='faces') * im

    if outlets is not None:
        outlets = outlets*im
        if np.sum(inlets * outlets):
            raise Exception('Specified inlets and outlets overlap')

    if pc is None:
        pc = 2.0/dt
    pc[~im] = 0  # Remove any infs or nans from pc computation

    if isinstance(bins, int):  # Use values in pc for invasion steps
        vmax = pc[pc < np.inf].max()
        vmin = pc[im][pc[im] > -np.inf].min()
        Ps = np.linspace(vmin, vmax*1.1, bins)
    elif bins is None:
        Ps = np.unique(pc[im])
    else:
        Ps = np.unique(bins)  # To ensure they are in ascending order

    # Initialize empty arrays to accumulate results of each loop
    pc_inv = np.zeros_like(im, dtype=float)
    pc_size = np.zeros_like(im, dtype=float)
    seeds = np.zeros_like(im, dtype=bool)

    # Begin IBOP algorithm
    if conn == 'min':
        strel = ball(1) if im.ndim == 3 else disk(1)
    elif conn == 'max':
        strel = cube(3) if im.ndim == 3 else square(3)
    else:
        raise Exception(f"Unrecognized value for conn ({conn})")
    for p in tqdm(Ps, **settings.tqdm):
        # Find all locations in image invadable at current pressure
        invadable = (pc <= p)*im
        # Trim locations not connected to the inlets
        if inlets is not None:
            invadable = trim_disconnected_blobs(
                im=invadable,
                inlets=inlets,
                strel=strel,
            )
        # Isolate only newly found locations to speed up inserting
        temp = invadable*(~seeds)
        # Find (i, j, k) coordinates of new locations
        coords = np.where(temp)
        # Add new locations to list of invaded locations
        seeds += invadable
        # Extract the local size of sphere to insert at each new location
        radii = dt[coords]
        # Insert spheres of given radii at new locations
        pc_inv = _insert_disks_at_points_parallel(
            im=pc_inv,
            coords=np.vstack(coords),
            radii=radii.astype(int),
            v=p,
            smooth=True,
            overwrite=False,
        )
        if return_sizes and (np.size(radii) > 0):
            pc_size = _insert_disks_at_points_parallel(
                im=pc_size,
                coords=np.vstack(coords),
                radii=radii.astype(int),
                v=np.amin(radii),
                smooth=True,
                overwrite=False,
            )
        # Deal with impact of residual, if present
        if residual is not None:
            # Find residual connected to current invasion front
            inv_temp = (pc_inv > 0)
            if np.any(inv_temp):
                # Find invadable pixels connected to surviving residual
                temp = trim_disconnected_blobs(residual, inv_temp, strel=strel)*~inv_temp
                if np.any(temp):
                    # Trim invadable pixels not connected to residual
                    new_seeds = trim_disconnected_blobs(invadable, temp, strel=strel)
                    # Find (i, j, k) coordinates of new locations
                    coords = np.where(new_seeds)
                    # Add new locations to list of invaded locations
                    seeds += new_seeds
                    # Extract the local size of sphere to insert at each new location
                    radii = dt[coords].astype(int)
                    # Insert spheres of given radii at new locations
                    pc_inv = _insert_disks_at_points_parallel(
                        im=pc_inv,
                        coords=np.vstack(coords),
                        radii=radii.astype(int),
                        v=p,
                        smooth=True,
                        overwrite=False,
                    )
                    if return_sizes and (np.size(radii) > 0):
                        pc_size = _insert_disks_at_points_parallel(
                            im=pc_size,
                            coords=np.vstack(coords),
                            radii=radii.astype(int),
                            v=np.amin(radii),
                            smooth=True,
                            overwrite=False,
                        )

    # Set uninvaded voxels to inf
    pc_inv[(pc_inv == 0)*im] = np.inf

    # Add residual if given
    if residual is not None:
        pc_inv[residual] = -np.inf

    # Analyze trapping and adjust computed images accordingly
    trapped = None
    if outlets is not None:
        seq = pc_to_seq(pc_inv, im=im, mode='drainage')
        trapped = find_trapped_regions(
            im=im,
            seq=seq,
            outlets=outlets,
            method='cluster',
        )
        trapped[seq == -1] = True
        pc_inv[trapped] = np.inf
        if residual is not None:  # Re-add residual to inv
            pc_inv[residual] = -np.inf

    # Initialize results object
    results = Results()
    results.im_satn = pc_to_satn(pc=pc_inv, im=im, mode='drainage')
    results.im_seq = pc_to_seq(pc=pc_inv, im=im, mode='drainage')
    results.im_pc = pc_inv
    if trapped is not None:
        results.im_seq[trapped] = -1
        results.im_satn[trapped] = -1
        results.im_pc[trapped] = np.inf
    if return_sizes:
        pc_size[pc_inv == np.inf] = np.inf
        pc_size[pc_inv == -np.inf] = -np.inf
        results.im_size = pc_size
    results.pc, results.snwp = pc_curve(im=im, pc=results.im_pc)
    return results


[docs] def drainage( im: npt.NDArray, pc: npt.NDArray = None, dt: npt.NDArray = None, inlets: npt.NDArray = None, outlets: npt.NDArray = None, residual: npt.NDArray = None, bins: int = 25, return_sizes: bool = False, ): results = ibop( im=im, pc=pc, dt=dt, inlets=inlets, outlets=outlets, residual=residual, bins=bins, return_sizes=return_sizes, ) return results
drainage.__doc__ = ibop.__doc__ if __name__ == "__main__": import numpy as np import porespy as ps import matplotlib.pyplot as plt from copy import copy from edt import edt # %% Run this cell to regenerate the variables in drainage bg = 'white' plots = True im = ps.generators.blobs( shape=[500, 500], porosity=0.7, blobiness=1.5, ) im = ps.filters.fill_blind_pores(im, surface=True) inlets = np.zeros_like(im) inlets[0, :] = True outlets = np.zeros_like(im) outlets[-1, :] = True lt = ps.filters.local_thickness(im) dt = edt(im) residual = lt > 25 bins = 25 pc = ps.filters.capillary_transform( im=im, dt=dt, sigma=0.072, theta=180, rho_nwp=1000, rho_wp=0, g=0, voxel_size=1e-4, ) # %% Run different drainage simulations drn1 = ps.simulations.drainage( im=im, pc=pc, inlets=inlets, ) drn2 = ps.simulations.drainage( im=im, pc=pc, inlets=inlets, outlets=outlets, ) drn3 = ps.simulations.drainage( im=im, pc=pc, inlets=inlets, residual=residual, ) drn4 = ps.simulations.drainage( im=im, pc=pc, inlets=inlets, outlets=outlets, residual=residual, ) drn5 = ps.simulations.drainage( im=im, pc=pc, ) # %% Visualize the invasion configurations for each scenario if plots: cmap = copy(plt.cm.plasma) cmap.set_under(color='black') cmap.set_over(color='grey') # cmap.set_bad(color='white') vmax = pc.max()*2 fig, ax = plt.subplots(2, 3, facecolor=bg) tmp = np.copy(drn1.im_pc) tmp[~im] = -1 tmp[tmp == np.inf] = vmax*2 tmp[tmp == np.inf] = -1 ax[0][0].imshow(tmp, cmap=cmap, vmin=0, vmax=vmax) ax[0][0].set_title("No trapping, no residual") tmp = np.copy(drn2.im_pc) tmp[~im] = -1 tmp[tmp == np.inf] = vmax*2 tmp[tmp == np.inf] = -1 ax[0][1].imshow(tmp, cmap=cmap, vmin=0, vmax=vmax) ax[0][1].set_title("With trapping, no residual") tmp = np.copy(drn3.im_pc) tmp[~im] = -1 tmp[tmp == np.inf] = vmax*2 tmp[tmp == np.inf] = -1 ax[1][0].imshow(tmp, cmap=cmap, vmin=0, vmax=vmax) ax[1][0].set_title("No trapping, with residual") tmp = np.copy(drn4.im_pc) tmp[~im] = -1 tmp[tmp == np.inf] = vmax*2 tmp[tmp == np.inf] = -1 ax[1][1].imshow(tmp, cmap=cmap, vmin=0, vmax=vmax) ax[1][1].set_title("With trapping, with residual") tmp = np.copy(drn5.im_pc) tmp[~im] = -1 tmp[tmp == np.inf] = vmax*2 tmp[tmp == np.inf] = -1 ax[0][2].imshow(tmp, cmap=cmap, vmin=0, vmax=vmax) ax[0][2].set_title("No access limitations") ax[1][2].step(drn1.pc, drn1.snwp, 'b-o', where='post', label="No trapping, no residual") ax[1][2].step(drn2.pc, drn2.snwp, 'r--o', where='post', label="With trapping, no residual") ax[1][2].step(drn3.pc, drn3.snwp, 'g--o', where='post', label="No trapping, with residual") ax[1][2].step(drn4.pc, drn4.snwp, 'm--o', where='post', label="With trapping, with residual") ax[1][2].legend()