Source code for porespy.simulations._imbibition

import inspect
import numpy as np
from porespy.filters import (
    find_trapped_regions,
    seq_to_satn,
    trim_disconnected_blobs,
    fftmorphology,
)
from porespy.metrics import pc_map_to_pc_curve
from porespy.tools import (
    Results,
    get_tqdm,
    _insert_disks_at_points_parallel,
    _insert_disk_at_points_parallel,
    _insert_disk_at_points,
    ps_round,
    make_contiguous,
)
from porespy import settings
from edt import edt
from numba import njit, prange


tqdm = get_tqdm()


__all__ = [
    'imbibition',
    # The following are reference implementations using different techniques
    'imbibition_dt',
    'imbibition_dt_fft',
    'imbibition_fft',
    'imbibition_dsi',
]


[docs] def imbibition_dsi( im, inlets=None, outlets=None, steps=None, smooth=True, ): r""" Performs a distance transform based imbibition simulation using direct sphere insertion to accomplish dilation and distance transform thresholding for erosion Parameters ---------- im : ndarray The boolean image of the void space on which to perform the simulation inlets : ndarray (optional) A boolean array with `True` values indicating the inlet locations for the invading (wetting) fluid. If not provided then access limitations will not be applied, meaning that the invading fluid can appear anywhere within the domain. outlets : ndarray (optional) A boolean array with `True` values indicating the outlet locations through which defending (non-wetting) phase would exit the domain. If not provided then trapping of the non-wetting phase is ignored. steps : scalar or array_like A list of which sphere sizes to invade. If `None` (default) then each unique integer value in the distance transform is used. If a scalar then a list of steps is generated from `steps` to 1. smooth : boolean If `True` (default) then the spheres are drawn without any single voxel protrusions on the faces. Returns ------- results : Dataclass-like object An object with the following attributes: ----------- ---------------------------------------------------------------- Attribute Description ----------- ---------------------------------------------------------------- `im_seq` The sequence map indicating the sequence or step number at which each voxels was first invaded. `im_size` The size map indicating the size of the sphere being drawn when each voxel was first invaded. ----------- ---------------------------------------------------------------- Notes ----- The sphere insert stesps will be executed in parallel if `porespy.settings.ncores > 1` """ if settings.ncores > 1: func = _insert_disk_at_points_parallel else: func = _insert_disk_at_points im = np.array(im, dtype=bool) dt = edt(im, parallel=settings.ncores) dt_int = dt.astype(int) if steps is None: bins = np.unique(np.around(dt[im], decimals=0)) elif type(steps) is int: bins = np.arange(1, steps, 1) else: bins = np.unique(steps) bins = bins[bins > 0] im_seq = -np.ones_like(im, dtype=int) im_size = np.zeros_like(im, dtype=float) nwp = np.zeros_like(im, dtype=bool) desc = inspect.currentframe().f_code.co_name # Get current func name for i, r in enumerate(tqdm(bins, desc=desc, **settings.tqdm)): if smooth: seeds = dt >= r edges = dt_int == r else: seeds = dt > r edges = (dt > r)*(dt_int <= (r + 1)) coords = np.vstack(np.where(edges)) nwp.fill(False) if coords.size > 0: nwp = func( im=nwp, coords=coords, r=int(r), v=True, smooth=smooth, ) nwp[seeds] = True wp = (~nwp)*im if inlets is not None: wp = trim_disconnected_blobs(wp, inlets=inlets) mask = wp*(im_seq == -1) im_size[mask] = r im_seq[mask] = i + 1 if outlets is not None: trapped = find_trapped_regions( im=im, seq=im_seq, outlets=outlets, return_mask=True, conn='min', method='cluster', min_size=0, ) im_seq[trapped] = -1 im_seq = make_contiguous(im_seq, mode='symmetric') im_size[trapped] = -1 results = Results() results.im_seq = im_seq*im results.im_size = im_size*im return results
[docs] def imbibition_dt_fft( im, inlets=None, outlets=None, residual=None, steps=None, smooth=True, ): r""" Performs a distance transform based imbibition simulation using distance transform thresholding for the erosion step and fft-based convolution for the dilation step. Parameters ---------- im : ndarray The boolean image of the void space on which to perform the simulation inlets : ndarray (optional) A boolean array with `True` values indicating the inlet locations for the invading (wetting) fluid. If not provided then access limitations will not be applied, meaning that the invading fluid can appear anywhere within the domain. outlets : ndarray (optional) A boolean array with `True` values indicating the outlet locations through which defending (non-wetting) phase would exit the domain. If not provided then trapping of the non-wetting phase is ignored. steps : scalar or array_like A list of which sphere sizes to invade. If `None` (default) then each unique integer value in the distance transform is used. If a scalar then a list of steps is generated from `steps` to 1. smooth : boolean If `True` (default) then the spheres are drawn without any single voxel protrusions on the faces. Returns ------- results : Dataclass-like object An object with the following attributes: ----------- ---------------------------------------------------------------- Attribute Description ----------- ---------------------------------------------------------------- `im_seq` The sequence map indicating the sequence or step number at which each voxels was first invaded. `im_size` The size map indicating the size of the sphere being drawn when each voxel was first invaded. ----------- ---------------------------------------------------------------- Notes ----- The distance transform will be executed in parallel if `porespy.settings.ncores > 1` """ im = np.array(im, dtype=bool) dt = edt(im, parallel=settings.ncores) if steps is None: bins = np.unique(np.around(dt[im], decimals=0)) elif type(steps) is int: bins = np.arange(1, steps, 1) else: bins = np.unique(steps) bins = bins[bins > 0] im_seq = -np.ones_like(im, dtype=int) im_size = np.zeros_like(im, dtype=float) desc = inspect.currentframe().f_code.co_name # Get current func name for i, r in enumerate(tqdm(bins, desc=desc, **settings.tqdm)): # Perform erosion using dt seeds = dt >= r if smooth else dt > r # Perform dilation using convolution se = ps_round(r, ndim=im.ndim, smooth=smooth) wp = im*~fftmorphology(seeds, se, mode='dilation') # Trimming disconnected wetting phase if inlets is not None: wp = trim_disconnected_blobs(wp, inlets=inlets) # TODO: Not sure this residual code works # if residual is not None: # blobs = trim_disconnected_blobs(residual, inlets=wp) # seeds2 = trim_disconnected_blobs(seeds, inlets=blobs + inlets) # wp = im*~fftmorphology(seeds2, se, mode='dilation') mask = wp*(im_seq == -1) im_size[mask] = r im_seq[mask] = i+1 # if residual is not None: # im_seq[im_seq > 0] += 1 # im_seq[residual] = 1 # im_size[residual] = np.inf # Apply trapping as a post-processing step if outlets given if outlets is not None: trapped = find_trapped_regions( im=im, seq=im_seq, outlets=outlets, return_mask=True, conn='min', method='cluster', min_size=0, ) im_seq[trapped] = -1 im_seq = make_contiguous(im_seq, mode='symmetric') im_size[trapped] = -1 results = Results() results.im_seq = im_seq*im results.im_size = im_size*im return results
[docs] def imbibition_dt( im, inlets=None, outlets=None, residual=None, steps=None, smooth=True, ): r""" Performs a distance transform based imbibition simulation using distance transform thresholding for the erosion step and a second distance transform for the dilation step. Parameters ---------- im : ndarray The boolean image of the void space on which to perform the simulation inlets : ndarray (optional) A boolean array with `True` values indicating the inlet locations for the invading (wetting) fluid. If not provided then access limitations will not be applied, meaning that the invading fluid can appear anywhere within the domain. outlets : ndarray (optional) A boolean array with `True` values indicating the outlet locations through which defending (non-wetting) phase would exit the domain. If not provided then trapping of the non-wetting phase is ignored. steps : scalar or array_like A list of which sphere sizes to invade. If `None` (default) then each unique integer value in the distance transform is used. If a scalar then a list of steps is generated from `steps` to 1. smooth : boolean If `True` (default) then the spheres are drawn without any single voxel protrusions on the faces. Returns ------- results : Results object A dataclass-like object with the following attributes: ========== ================================================================= Attribute Description ========== ================================================================= im_seq A numpy array with each voxel value indicating the sequence at which it was invaded. Values of -1 indicate that it was not invaded. im_size A numpy array with each voxel value indicating the radius of spheres being inserted when it was invaded. ========== ================================================================= Notes ----- The distance transforms will be executed in parallel if `porespy.settings.ncores > 1` """ im = np.array(im, dtype=bool) dt = edt(im, parallel=settings.ncores) if steps is None: bins = np.unique(np.around(dt[im], decimals=0)) elif type(steps) is int: bins = np.arange(1, steps, 1) else: bins = np.unique(steps) bins = bins[bins > 0] im_seq = -np.ones_like(im, dtype=int) im_size = np.zeros_like(im, dtype=float) desc = inspect.currentframe().f_code.co_name # Get current func name for i, r in enumerate(tqdm(bins, desc=desc, **settings.tqdm)): # Perform erosion using dt seeds = dt >= r if smooth else dt > r # Perform dilation using dt tmp = edt(~seeds, parallel=settings.ncores) wp = ~(tmp < r) if smooth else ~(tmp <= r) wp[~im] = 0 # Trimming disconnected wetting phase if inlets is not None: wp = trim_disconnected_blobs(wp, inlets=inlets) # TODO: Not sure this residual code works # if residual is not None: # blobs = trim_disconnected_blobs(residual, inlets=wp) # seeds2 = trim_disconnected_blobs(seeds, inlets=blobs + inlets) # wp = im*~fftmorphology(seeds2, se, mode='dilation') mask = wp*(im_seq == -1) im_size[mask] = r im_seq[mask] = i+1 # if residual is not None: # im_seq[im_seq > 0] += 1 # im_seq[residual] = 1 # im_size[residual] = np.inf # Apply trapping as a post-processing step if outlets given if outlets is not None: trapped = find_trapped_regions( im=im, seq=im_seq, outlets=outlets, return_mask=True, conn='min', method='cluster', min_size=0, ) im_seq[trapped] = -1 im_seq = make_contiguous(im_seq, mode='symmetric') im_size[trapped] = -1 results = Results() results.im_seq = im_seq*im results.im_size = im_size*im return results
[docs] def imbibition_fft( im, inlets=None, outlets=None, residual=None, steps=None, smooth=True, ): r""" Performs a distance transform based imbibition simulation using fft-based convolution for both the erosion and dilation steps Parameters ---------- im : ndarray The boolean image of the void space on which to perform the simulation inlets : ndarray (optional) A boolean array with `True` values indicating the inlet locations for the invading (wetting) fluid. If not provided then access limitations will not be applied, meaning that the invading fluid can appear anywhere within the domain. outlets : ndarray (optional) A boolean array with `True` values indicating the outlet locations through which defending (non-wetting) phase would exit the domain. If not provided then trapping of the non-wetting phase is ignored. steps : scalar or array_like A list of which sphere sizes to invade. If `None` (default) then each unique integer value in the distance transform is used. If a scalar then a list of steps is generated from `steps` to 1. smooth : boolean If `True` (default) then the spheres are drawn without any single voxel protrusions on the faces. Returns ------- results : Dataclass-like object An object with the following attributes: ----------- ---------------------------------------------------------------- Attribute Description ----------- ---------------------------------------------------------------- `im_seq` The sequence map indicating the sequence or step number at which each voxels was first invaded. `im_size` The size map indicating the size of the sphere being drawn when each voxel was first invaded. ----------- ---------------------------------------------------------------- """ im = np.array(im, dtype=bool) dt = edt(im, parallel=settings.ncores) if steps is None: bins = np.unique(np.around(dt[im], decimals=0)) elif type(steps) is int: bins = np.arange(1, steps, 1) else: bins = np.unique(steps) bins = bins[bins > 0] im_seq = -np.ones_like(im, dtype=int) im_size = np.zeros_like(im, dtype=float) desc = inspect.currentframe().f_code.co_name # Get current func name for i, r in enumerate(tqdm(bins, desc=desc, **settings.tqdm)): # Perform erosion using convolution se = ps_round(r, ndim=im.ndim, smooth=smooth) seeds = ~fftmorphology(~im, se, mode='dilation') # Perform dilation using convolution wp = im*~fftmorphology(seeds, se, mode='dilation') # Trimming disconnected wetting phase if inlets is not None: wp = trim_disconnected_blobs(wp, inlets=inlets) # TODO: Not sure this residual code works # if residual is not None: # blobs = trim_disconnected_blobs(residual, inlets=wp) # seeds2 = trim_disconnected_blobs(seeds, inlets=blobs + inlets) # wp = im*~fftmorphology(seeds2, se, mode='dilation') mask = wp*(im_seq == -1) im_size[mask] = r im_seq[mask] = i+1 # if residual is not None: # im_seq[im_seq > 0] += 1 # im_seq[residual] = 1 # im_size[residual] = np.inf # Apply trapping as a post-processing step if outlets given if outlets is not None: trapped = find_trapped_regions( im=im, seq=im_seq, outlets=outlets, return_mask=True, conn='min', method='cluster', min_size=0, ) im_seq[trapped] = -1 im_seq = make_contiguous(im_seq, mode='symmetric') im_size[trapped] = -1 results = Results() results.im_seq = im_seq*im results.im_size = im_size*im return results
[docs] def imbibition( im, pc=None, dt=None, inlets=None, outlets=None, residual=None, steps=25, min_size=0, conn='min', ): r""" Performs an imbibition simulation using image-based sphere insertion Parameters ---------- im : ndarray The image of the porous materials with void indicated by ``True`` pc : ndarray An array containing precomputed capillary pressure values in each voxel. This can include gravity effects or not. This can be generated by ``capillary_transform``. If not provided then `2/dt` is used. dt : ndarray (optional) The distance transform of ``im``. If not provided it will be calculated, so supplying it saves time. inlets : ndarray An image the same shape as ``im`` with ``True`` values indicating the wetting fluid inlet(s). If ``None`` then the wetting film is able to appear anywhere within the domain. residual : ndarray, optional A boolean mask the same shape as ``im`` with ``True`` values indicating to locations of residual wetting phase. steps : int or array_like (default = 25) The range of pressures to apply. If an integer is given then steps will be created between the lowest and highest pressures in ``pc``. If a list is given, each value in the list is used directly in order. min_size : int Any clusters of trapped voxels smaller than this size will be set to not trapped. This argument is only used if `outlets` is given. This is useful to prevent small voxels along edges of the void space from being set to trapped. These can appear to be trapped due to the jagged nature of the digital image. The default is 0, meaning this adjustment is not applied, but a value of 3 or 4 is recommended to activate this adjustment. conn : str Can be either `'min'` or `'max'` and controls the shape of the structuring element used to determine voxel connectivity. The default is `'min'` which imposes the strictest criteria, so that voxels must share a face to be considered connected. Returns ------- results : Result Object A dataclass-like object with the following attributes: ----------- ---------------------------------------------------------------- Attribute Description ----------- ---------------------------------------------------------------- im_pc An ndarray with each voxel indicating the step number at which it was first invaded by wetting phase. im_seq A numpy array with each voxel value indicating the sequence at which it was invaded by the wetting phase. Values of -1 indicate that it was not invaded, either because it was trapped, inaccessbile, or sufficient pressure was not reached. im_snwp A numpy array with each voxel value indicating the global non-wetting phase saturation at the point it was invaded. im_trapped A numpy array with ``True`` values indicating trapped voxels if `outlets` was provided, otherwise will be `None`. pc 1D array of capillary pressure values that were applied snwp 1D array of non-wetting phase saturations for each applied value of capillary pressure (``pc``). ----------- ---------------------------------------------------------------- Notes ----- The simulation proceeds as though the non-wetting phase pressure is very high and is slowly lowered. Then imbibition occurs into the smallest accessible regions at each step. Closed or inaccessible pores are assumed to be filled with wetting phase. Examples -------- """ if dt is None: dt = edt(im) if pc is None: pc = 2/dt pc = np.copy(pc) pc[~im] = 0 # Remove any infs or nans from pc computation if isinstance(steps, int): mask = np.isfinite(pc)*im Ps = np.logspace(np.log10(pc[mask].max()), np.log10(pc[mask].min()*.99), steps) else: Ps = np.unique(steps)[::-1] # To ensure they are in descending order # Initialize empty arrays to accumulate results of each loop im_pc = np.zeros_like(im, dtype=float) im_seq = np.zeros_like(im, dtype=int) im_size = np.zeros_like(im, dtype=int) for step, P in enumerate(tqdm(Ps, **settings.tqdm)): # This can be made faster if I find a way to get only seeds on edge, so # less spheres need to be drawn invadable = (pc <= P)*im nwp_mask = np.zeros_like(im, dtype=bool) if np.any(invadable): coords = np.where(invadable) radii = dt[coords].astype(int) nwp_mask = _insert_disks_at_points_parallel( im=nwp_mask, coords=np.vstack(coords), radii=radii, v=True, smooth=True, overwrite=True, ) if inlets is not None: nwp_mask = ~trim_disconnected_blobs( im=(~nwp_mask)*im, inlets=inlets, conn=conn, )*im if residual is not None: nwp_mask = nwp_mask * ~residual mask = (nwp_mask == 0) * (im_seq == 0) * im if np.any(mask): im_seq[mask] = step im_pc[mask] = P im_size[mask] = np.amin(radii) trapped = None if outlets is not None: if inlets is not None: outlets[inlets] = False # Ensure outlets do not overlap inlets trapped = find_trapped_regions( im=im, seq=im_seq, outlets=outlets, return_mask=True, method='cluster', min_size=min_size, ) im_pc[trapped] = -np.inf im_seq[trapped] = -1 if residual is not None: im_pc[residual] = np.inf im_seq[residual] = 0 satn = seq_to_satn(im=im, seq=im_seq, mode='imbibition') # Collect data in a Results object results = Results() results.im_snwp = satn results.im_seq = im_seq results.im_pc = im_pc results.im_trapped = trapped pc_curve = pc_map_to_pc_curve(pc=im_pc, im=im, seq=im_seq, mode='imbibition') results.pc = pc_curve.pc results.snwp = pc_curve.snwp return results
@njit(parallel=True) def _insert_disks_npoints_nradii_1value_parallel( im, coords, radii, v, overwrite=False, smooth=False, ): # pragma: no cover if im.ndim == 2: xlim, ylim = im.shape for row in prange(len(coords[0])): i, j = coords[0][row], coords[1][row] r = radii[row] for a, x in enumerate(range(i-r, i+r+1)): if (x >= 0) and (x < xlim): for b, y in enumerate(range(j-r, j+r+1)): if (y >= 0) and (y < ylim): R = ((a - r)**2 + (b - r)**2)**0.5 if (R <= r)*(~smooth) or (R < r)*(smooth): if overwrite or (im[x, y] == 0): im[x, y] = v else: xlim, ylim, zlim = im.shape for row in prange(len(coords[0])): i, j, k = coords[0][row], coords[1][row], coords[2][row] r = radii[row] for a, x in enumerate(range(i-r, i+r+1)): if (x >= 0) and (x < xlim): for b, y in enumerate(range(j-r, j+r+1)): if (y >= 0) and (y < ylim): for c, z in enumerate(range(k-r, k+r+1)): if (z >= 0) and (z < zlim): R = ((a - r)**2 + (b - r)**2 + (c - r)**2)**0.5 if (R <= r)*(~smooth) or (R < r)*(smooth): if overwrite or (im[x, y, z] == 0): im[x, y, z] = v return im # %% if __name__ == '__main__': import porespy as ps import matplotlib.pyplot as plt import numpy as np from edt import edt from copy import copy ps.visualization.set_mpl_style() cm = copy(plt.cm.turbo) cm.set_under('grey') cm.set_over('k') i = np.random.randint(1, 100000) # bad: 38364, good: 65270, 71698 i = 95063 print(i) im = ps.generators.blobs([500, 500], porosity=0.65, blobiness=2, seed=i) im = ps.filters.fill_closed_pores(im, surface=True) inlets = ps.generators.faces(im.shape, inlet=0) outlets = ps.generators.faces(im.shape, outlet=0) lt = ps.filters.local_thickness(im) residual = (lt < 8)*im pc = ps.filters.capillary_transform(im=im, voxel_size=1e-4) imb1 = imbibition(im=im, pc=pc, inlets=inlets) imb2 = imbibition(im=im, pc=pc, inlets=inlets, outlets=outlets) imb3 = imbibition(im=im, pc=pc, inlets=inlets, residual=residual) imb4 = imbibition(im=im, pc=pc, inlets=inlets, outlets=outlets, residual=residual) # %% fig, ax = plt.subplot_mosaic( [['(a)', '(b)', '(e)', '(e)'], ['(c)', '(d)', '(e)', '(e)']], figsize=[12, 8], ) # imb1.im_pc[~im] = -1 ax['(a)'].imshow(imb1.im_seq/im, origin='lower', cmap=cm, vmin=0) vmax = imb2.im_seq.max() ax['(b)'].imshow(imb2.im_seq/im, origin='lower', cmap=cm, vmin=0, vmax=vmax) ax['(c)'].imshow(imb3.im_seq/im, origin='lower', cmap=cm, vmin=0) ax['(d)'].imshow(imb4.im_seq/im, origin='lower', cmap=cm, vmin=0) pc, s = ps.metrics.pc_map_to_pc_curve( pc=imb1.im_pc, seq=imb1.im_seq, im=im, mode='imbibition') ax['(e)'].semilogx(pc, s, 'b->', label='imbibition') pc, s = ps.metrics.pc_map_to_pc_curve( pc=imb2.im_pc, seq=imb2.im_seq, im=im, mode='imbibition') ax['(e)'].semilogx(pc, s, 'r-<', label='imbibition w trapping') pc, s = ps.metrics.pc_map_to_pc_curve( pc=imb3.im_pc, seq=imb3.im_seq, im=im, mode='imbibition') ax['(e)'].semilogx(pc, s, 'g-^', label='imbibition w residual') pc, s = ps.metrics.pc_map_to_pc_curve( pc=imb4.im_pc, seq=imb4.im_seq, im=im, mode='imbibition') ax['(e)'].semilogx(pc, s, 'm-*', label='imbibition w residual & trapping') ax['(e)'].legend()