Source code for porespy.simulations._drainage

import numpy as np
import numpy.typing as npt
from typing import Literal
from skimage.morphology import ball, disk, cube, square
from porespy import settings
from porespy.metrics import pc_map_to_pc_curve
from porespy.tools import (
    _insert_disks_at_points,
    _insert_disks_at_points_parallel,
    get_tqdm,
    Results,
)
from porespy.filters import (
    trim_disconnected_blobs,
    find_trapped_regions,
    pc_to_satn,
    pc_to_seq,
)
from porespy.generators import (
    borders,
)
try:
    from pyedt import edt
except ModuleNotFoundError:
    from edt import edt


__all__ = [
    'drainage',
    'ibop',
]


tqdm = get_tqdm()


def ibop(
    im: npt.NDArray,
    pc: npt.NDArray = None,
    dt: npt.NDArray = None,
    inlets: npt.NDArray = None,
    outlets: npt.NDArray = None,
    residual: npt.NDArray = None,
    steps: int = None,
    return_sizes: bool = False,
    conn: Literal['min', 'max'] = 'min',
    min_size: int = 0,
):

    im = np.array(im, dtype=bool)

    if dt is None:
        dt = edt(im)

    if inlets is None:
        inlets = borders(shape=im.shape, mode='faces') * im

    if outlets is not None:
        outlets = outlets*im
        if np.sum(inlets * outlets):
            raise Exception('Specified inlets and outlets overlap')

    if pc is None:
        pc = 2.0/dt
    pc[~im] = 0  # Remove any infs or nans from pc computation

    if isinstance(steps, int):  # Use values in pc for invasion steps
        vmax = pc[pc < np.inf].max()
        vmin = pc[im][pc[im] > -np.inf].min()
        Ps = np.linspace(vmin, vmax*1.1, steps)
    elif steps is None:
        Ps = np.unique(pc[im])
    else:
        Ps = np.unique(steps)  # To ensure they are in ascending order

    # Initialize empty arrays to accumulate results of each loop
    nwp_mask = np.zeros_like(im, dtype=bool)
    im_seq = np.zeros_like(im, dtype=int)
    im_pc = np.zeros_like(im, dtype=float)
    im_size = np.zeros_like(im, dtype=float)
    seeds = np.zeros_like(im, dtype=bool)

    # Begin IBOP algorithm
    if conn == 'min':
        strel = ball(1) if im.ndim == 3 else disk(1)
    elif conn == 'max':
        strel = cube(3) if im.ndim == 3 else square(3)
    else:
        raise Exception(f"Unrecognized value for conn ({conn})")
    step = 1
    for p in tqdm(Ps, **settings.tqdm):
        # Find all locations in image invadable at current pressure
        invadable = (pc <= p)*im
        # Trim locations not connected to the inlets
        if inlets is not None:
            invadable = trim_disconnected_blobs(
                im=invadable,
                inlets=inlets,
                strel=strel,
            )
        # Isolate only newly found locations to speed up inserting
        temp = invadable*(~seeds)
        # Find (i, j, k) coordinates of new locations
        coords = np.where(temp)
        # Add new locations to list of invaded locations
        seeds += invadable
        # Extract the local size of sphere to insert at each new location
        radii = dt[coords]
        # Insert spheres of given radii at new locations
        nwp_mask = _insert_disks_at_points_parallel(
            im=nwp_mask,
            coords=np.vstack(coords),
            radii=radii.astype(int),
            v=True,
            smooth=True,
            overwrite=False,
        )
        # Values to images using mask
        mask = nwp_mask * (im_seq == 0) * im
        if np.any(mask):
            im_seq[mask] = step
            im_pc[mask] = p
            step += 1
            if return_sizes and (np.size(radii) > 0):
                im_size[mask] = np.amin(radii)

        # Deal with impact of residual, if present
        if residual is not None:
            # Find residual connected to current invasion front
            inv_temp = (im_pc > 0)
            if np.any(inv_temp):
                # Find invadable pixels connected to surviving residual
                temp = trim_disconnected_blobs(
                    residual, inv_temp, strel=strel)*~inv_temp
                if np.any(temp):
                    # Trim invadable pixels not connected to residual
                    new_seeds = trim_disconnected_blobs(invadable, temp, strel=strel)
                    # Find (i, j, k) coordinates of new locations
                    coords = np.where(new_seeds)
                    # Add new locations to list of invaded locations
                    seeds += new_seeds
                    # Extract the local size of sphere to insert at each new location
                    radii = dt[coords].astype(int)
                    # Insert spheres of given radii at new locations
                    nwp_mask = _insert_disks_at_points_parallel(
                        im=nwp_mask,
                        coords=np.vstack(coords),
                        radii=radii.astype(int),
                        v=True,
                        smooth=True,
                        overwrite=False,
                    )

                    mask = nwp_mask * (im_seq == 0) * im
                    if np.any(mask):
                        im_seq[mask] = step
                        im_pc[mask] = p
                        step += 1
                        if return_sizes and (np.size(radii) > 0):
                            im_size[mask] = np.amin(radii)

    # Set uninvaded voxels to inf
    im_pc[(im_pc == 0)*im] = np.inf

    # Add residual if given
    if residual is not None:
        im_pc[residual] = -np.inf
        im_seq[residual] = 0

    # Analyze trapping and adjust computed images accordingly
    trapped = None
    if outlets is not None:
        trapped = find_trapped_regions(
            im=im,
            seq=im_seq,
            outlets=outlets,
            method='cluster',
            min_size=min_size,
        )
        trapped[im_seq == -1] = True
        im_pc[trapped] = np.inf  # Trapped defender only displaced as Pc -> inf
        if residual is not None:  # Re-add residual to inv
            im_pc[residual] = -np.inf  # Residual defender is always present

    # Initialize results object
    results = Results()
    results.im_snwp = pc_to_satn(pc=im_pc, im=im, mode='drainage')
    results.im_seq = im_seq
    # results.im_seq = pc_to_seq(pc=pc_inv, im=im, mode='drainage')
    results.im_pc = im_pc
    results.im_trapped = trapped
    if trapped is not None:
        results.im_seq[trapped] = -1
        results.im_snwp[trapped] = -1
        results.im_pc[trapped] = np.inf
    if return_sizes:
        im_size[im_pc == np.inf] = np.inf
        im_size[im_pc == -np.inf] = -np.inf
        results.im_size = im_size
    results.pc, results.snwp = pc_map_to_pc_curve(
        im=im,
        pc=results.im_pc,
        seq=results.im_seq,
        mode='drainage',
    )
    return results


[docs] def drainage( im: npt.NDArray, pc: npt.NDArray = None, dt: npt.NDArray = None, inlets: npt.NDArray = None, outlets: npt.NDArray = None, residual: npt.NDArray = None, steps: int = 25, return_sizes: bool = False, min_size: int = 0, ): r""" Simulate drainage using image-based sphere insertion, optionally including gravity Parameters ---------- im : ndarray The image of the porous media with ``True`` values indicating the void space. pc : ndarray, optional Precomputed capillary pressure transform which is used to determine the invadability of each voxel. If not provided then it is calculated as `2/dt`. dt : ndarray (optional) The distance transform of ``im``. If not provided it will be calculated, so supplying it saves time. inlets : ndarray, optional A boolean image the same shape as ``im``, with ``True`` values indicating the inlet locations. If not specified then access limitations are not applied so the result is essentially a local thickness filter. outlets : ndarray, optional A boolean image with ``True`` values indicating the outlet locations. If this is provided then trapped voxels of wetting phase are found and all the output images are adjusted accordingly. Note that trapping can be assessed during postprocessing as well. residual : ndarray, optional A boolean array indicating the locations of any residual invading phase. This is added to the intermediate image prior to trimming disconnected clusters, so will create connections to some clusters that would otherwise be removed. The residual phase is indicated in the final image by ``-np.inf`` values, since these are invaded at all applied capillary pressures. steps : int or array_like (default = 25) The range of pressures to apply. If an integer is given then the given number of steps will be created between the lowest and highest values in ``pc``. If a list is given, each value in the list is used in ascending order. If `None` is given then all the possible values in `pc` are used. return_sizes : bool, default = `False` If `True` then an array containing the size of the sphere which first overlapped each pixel is returned. This array is not computed by default as computing it increases computation time. conn : str Controls the shape of the structuring element used to find neighboring voxels when looking connectivity of invading blobs. Options are: ========= ================================================================== Option Description ========= ================================================================== 'min' This corresponds to a cross with 4 neighbors in 2D and 6 neighbors in 3D. 'max' This corresponds to a square or cube with 8 neighbors in 2D and 26 neighbors in 3D. ========= ================================================================== min_size : int Any clusters of trapped voxels smaller than this size will be set to not trapped. This argument is only used if `outlets` is given. This is useful to prevent small voxels along edges of the void space from being set to trapped. These can appear to be trapped due to the jagged nature of the digital image. The default is 0, meaning this adjustment is not applied, but a value of 3 or 4 is recommended to activate this adjustment. Returns ------- results : Results object A dataclass-like object with the following attributes: ========== ============================================================ Attribute Description ========== ============================================================ im_seq An ndarray with each voxel indicating the step number at which it was first invaded by non-wetting phase im_satn A numpy array with each voxel value indicating the global value of the non-wetting phase saturation at the point it was invaded im_size If `return_sizes` was set to `True`, then a numpy array with each voxel containing the radius of the sphere, in voxels, that first overlapped it. im_pc A numpy array with each voxel value indicating the capillary pressure at which it was invaded. im_trapped A numpy array with ``True`` values indicating trapped voxels if `outlets` was provided, otherwise will be `None`. pc 1D array of capillary pressure values that were applied swnp 1D array of non-wetting phase saturations for each applied value of capillary pressure (``pc``). ========== ============================================================ See Also -------- drainage Notes ----- This algorithm only provides sensible results for gravity stabilized configurations, meaning the more dense fluid is on the bottom. Be sure that ``inlets`` are specified accordingly. References ---------- .. [1] Chadwick EA, Hammen LH, Schulz VP, Bazylak A, Ioannidis MA, Gostick JT. Incorporating the effect of gravity into image-based drainage simulations on volumetric images of porous media. `Water Resources Research. <https://doi.org/10.1029/2021WR031509>`_. 58(3), e2021WR031509 (2022) Examples -------- `Click here <https://porespy.org/examples/simulations/reference/drainage.html>`_ to view online example. """ results = ibop( im=im, pc=pc, dt=dt, inlets=inlets, outlets=outlets, residual=residual, steps=steps, return_sizes=return_sizes, min_size=min_size, ) return results
if __name__ == "__main__": import numpy as np import porespy as ps import matplotlib.pyplot as plt from copy import copy ps.visualization.set_mpl_style() cm = copy(plt.cm.turbo) cm.set_under('grey') cm.set_over('k') # %% Run this cell to regenerate the variables in drainage bg = 'white' plots = True im = ps.generators.blobs( shape=[500, 500], porosity=0.7, blobiness=1.5, seed=16, ) im = ps.filters.fill_blind_pores(im, surface=True) inlets = np.zeros_like(im) inlets[0, :] = True outlets = np.zeros_like(im) outlets[-1, :] = True lt = ps.filters.local_thickness(im) dt = edt(im) residual = lt > 25 steps = 25 pc = ps.filters.capillary_transform( im=im, dt=dt, sigma=0.072, theta=180, rho_nwp=1000, rho_wp=0, g=0, voxel_size=1e-4, ) # %% Run different drainage simulations drn1 = ps.simulations.drainage( im=im, pc=pc, inlets=inlets, steps=100, ) drn2 = ps.simulations.drainage( im=im, pc=pc, inlets=inlets, outlets=outlets, steps=100, ) drn3 = ps.simulations.drainage( im=im, pc=pc, inlets=inlets, residual=residual, steps=100, ) drn4 = ps.simulations.drainage( im=im, pc=pc, inlets=inlets, outlets=outlets, residual=residual, steps=100, ) drn5 = ps.simulations.drainage( im=im, pc=pc, steps=100, ) # %% Visualize the invasion configurations for each scenario if plots: fig, ax = plt.subplot_mosaic( [['(a)', '(b)', '(e)', '(e)'], ['(c)', '(d)', '(e)', '(e)']], figsize=[12, 8], ) # drn1.im_pc[~im] = -1 ax['(a)'].imshow(drn1.im_seq/im, origin='lower', cmap=cm, vmin=0) vmax = drn2.im_seq.max() ax['(b)'].imshow(drn2.im_seq/im, origin='lower', cmap=cm, vmin=0, vmax=vmax) ax['(c)'].imshow(drn3.im_seq/im, origin='lower', cmap=cm, vmin=0) ax['(d)'].imshow(drn4.im_seq/im, origin='lower', cmap=cm, vmin=0) pc, s = ps.metrics.pc_map_to_pc_curve(pc=drn1.im_pc, seq=drn1.im_seq, im=im, mode='drainage') ax['(e)'].plot(pc, s, 'b->', label='drainage') pc, s = ps.metrics.pc_map_to_pc_curve(pc=drn2.im_pc, seq=drn2.im_seq, im=im, mode='drainage') ax['(e)'].plot(pc, s, 'r-<', label='drainage w trapping') pc, s = ps.metrics.pc_map_to_pc_curve(pc=drn3.im_pc, seq=drn3.im_seq, im=im, mode='drainage') ax['(e)'].plot(pc, s, 'g-^', label='drainage w residual') pc, s = ps.metrics.pc_map_to_pc_curve(pc=drn4.im_pc, seq=drn4.im_seq, im=im, mode='drainage') ax['(e)'].plot(pc, s, 'm-*', label='drainage w residual & trapping') ax['(e)'].legend()