import numpy as np
import numpy.typing as npt
from typing import Literal
from skimage.morphology import ball, disk, cube, square
from porespy import settings
from porespy.metrics import pc_map_to_pc_curve
from porespy.tools import (
_insert_disks_at_points,
_insert_disks_at_points_parallel,
get_tqdm,
Results,
)
from porespy.filters import (
trim_disconnected_blobs,
find_trapped_regions,
pc_to_satn,
pc_to_seq,
)
from porespy.generators import (
borders,
)
try:
from pyedt import edt
except ModuleNotFoundError:
from edt import edt
__all__ = [
'drainage',
'ibop',
]
tqdm = get_tqdm()
def ibop(
im: npt.NDArray,
pc: npt.NDArray = None,
dt: npt.NDArray = None,
inlets: npt.NDArray = None,
outlets: npt.NDArray = None,
residual: npt.NDArray = None,
steps: int = None,
return_sizes: bool = False,
conn: Literal['min', 'max'] = 'min',
min_size: int = 0,
):
im = np.array(im, dtype=bool)
if dt is None:
dt = edt(im)
if inlets is None:
inlets = borders(shape=im.shape, mode='faces') * im
if outlets is not None:
outlets = outlets*im
if np.sum(inlets * outlets):
raise Exception('Specified inlets and outlets overlap')
if pc is None:
pc = 2.0/dt
pc[~im] = 0 # Remove any infs or nans from pc computation
if isinstance(steps, int): # Use values in pc for invasion steps
vmax = pc[pc < np.inf].max()
vmin = pc[im][pc[im] > -np.inf].min()
Ps = np.linspace(vmin, vmax*1.1, steps)
elif steps is None:
Ps = np.unique(pc[im])
else:
Ps = np.unique(steps) # To ensure they are in ascending order
# Initialize empty arrays to accumulate results of each loop
nwp_mask = np.zeros_like(im, dtype=bool)
im_seq = np.zeros_like(im, dtype=int)
im_pc = np.zeros_like(im, dtype=float)
im_size = np.zeros_like(im, dtype=float)
seeds = np.zeros_like(im, dtype=bool)
# Begin IBOP algorithm
if conn == 'min':
strel = ball(1) if im.ndim == 3 else disk(1)
elif conn == 'max':
strel = cube(3) if im.ndim == 3 else square(3)
else:
raise Exception(f"Unrecognized value for conn ({conn})")
step = 1
for p in tqdm(Ps, **settings.tqdm):
# Find all locations in image invadable at current pressure
invadable = (pc <= p)*im
# Trim locations not connected to the inlets
if inlets is not None:
invadable = trim_disconnected_blobs(
im=invadable,
inlets=inlets,
strel=strel,
)
# Isolate only newly found locations to speed up inserting
temp = invadable*(~seeds)
# Find (i, j, k) coordinates of new locations
coords = np.where(temp)
# Add new locations to list of invaded locations
seeds += invadable
# Extract the local size of sphere to insert at each new location
radii = dt[coords]
# Insert spheres of given radii at new locations
nwp_mask = _insert_disks_at_points_parallel(
im=nwp_mask,
coords=np.vstack(coords),
radii=radii.astype(int),
v=True,
smooth=True,
overwrite=False,
)
# Values to images using mask
mask = nwp_mask * (im_seq == 0) * im
if np.any(mask):
im_seq[mask] = step
im_pc[mask] = p
step += 1
if return_sizes and (np.size(radii) > 0):
im_size[mask] = np.amin(radii)
# Deal with impact of residual, if present
if residual is not None:
# Find residual connected to current invasion front
inv_temp = (im_pc > 0)
if np.any(inv_temp):
# Find invadable pixels connected to surviving residual
temp = trim_disconnected_blobs(
residual, inv_temp, strel=strel)*~inv_temp
if np.any(temp):
# Trim invadable pixels not connected to residual
new_seeds = trim_disconnected_blobs(invadable, temp, strel=strel)
# Find (i, j, k) coordinates of new locations
coords = np.where(new_seeds)
# Add new locations to list of invaded locations
seeds += new_seeds
# Extract the local size of sphere to insert at each new location
radii = dt[coords].astype(int)
# Insert spheres of given radii at new locations
nwp_mask = _insert_disks_at_points_parallel(
im=nwp_mask,
coords=np.vstack(coords),
radii=radii.astype(int),
v=True,
smooth=True,
overwrite=False,
)
mask = nwp_mask * (im_seq == 0) * im
if np.any(mask):
im_seq[mask] = step
im_pc[mask] = p
step += 1
if return_sizes and (np.size(radii) > 0):
im_size[mask] = np.amin(radii)
# Set uninvaded voxels to inf
im_pc[(im_pc == 0)*im] = np.inf
# Add residual if given
if residual is not None:
im_pc[residual] = -np.inf
im_seq[residual] = 0
# Analyze trapping and adjust computed images accordingly
trapped = None
if outlets is not None:
trapped = find_trapped_regions(
im=im,
seq=im_seq,
outlets=outlets,
method='cluster',
min_size=min_size,
)
trapped[im_seq == -1] = True
im_pc[trapped] = np.inf # Trapped defender only displaced as Pc -> inf
if residual is not None: # Re-add residual to inv
im_pc[residual] = -np.inf # Residual defender is always present
# Initialize results object
results = Results()
results.im_snwp = pc_to_satn(pc=im_pc, im=im, mode='drainage')
results.im_seq = im_seq
# results.im_seq = pc_to_seq(pc=pc_inv, im=im, mode='drainage')
results.im_pc = im_pc
results.im_trapped = trapped
if trapped is not None:
results.im_seq[trapped] = -1
results.im_snwp[trapped] = -1
results.im_pc[trapped] = np.inf
if return_sizes:
im_size[im_pc == np.inf] = np.inf
im_size[im_pc == -np.inf] = -np.inf
results.im_size = im_size
results.pc, results.snwp = pc_map_to_pc_curve(
im=im,
pc=results.im_pc,
seq=results.im_seq,
mode='drainage',
)
return results
[docs]
def drainage(
im: npt.NDArray,
pc: npt.NDArray = None,
dt: npt.NDArray = None,
inlets: npt.NDArray = None,
outlets: npt.NDArray = None,
residual: npt.NDArray = None,
steps: int = 25,
return_sizes: bool = False,
min_size: int = 0,
):
r"""
Simulate drainage using image-based sphere insertion, optionally including
gravity
Parameters
----------
im : ndarray
The image of the porous media with ``True`` values indicating the
void space.
pc : ndarray, optional
Precomputed capillary pressure transform which is used to determine
the invadability of each voxel. If not provided then it is calculated
as `2/dt`.
dt : ndarray (optional)
The distance transform of ``im``. If not provided it will be
calculated, so supplying it saves time.
inlets : ndarray, optional
A boolean image the same shape as ``im``, with ``True`` values
indicating the inlet locations. If not specified then access limitations
are not applied so the result is essentially a local thickness filter.
outlets : ndarray, optional
A boolean image with ``True`` values indicating the outlet locations.
If this is provided then trapped voxels of wetting phase are found and
all the output images are adjusted accordingly. Note that trapping can
be assessed during postprocessing as well.
residual : ndarray, optional
A boolean array indicating the locations of any residual invading
phase. This is added to the intermediate image prior to trimming
disconnected clusters, so will create connections to some clusters
that would otherwise be removed. The residual phase is indicated
in the final image by ``-np.inf`` values, since these are invaded at
all applied capillary pressures.
steps : int or array_like (default = 25)
The range of pressures to apply. If an integer is given then the given
number of steps will be created between the lowest and highest values in
``pc``. If a list is given, each value in the list is used in ascending
order. If `None` is given then all the possible values in `pc`
are used.
return_sizes : bool, default = `False`
If `True` then an array containing the size of the sphere which first
overlapped each pixel is returned. This array is not computed by default
as computing it increases computation time.
conn : str
Controls the shape of the structuring element used to find neighboring
voxels when looking connectivity of invading blobs. Options are:
========= ==================================================================
Option Description
========= ==================================================================
'min' This corresponds to a cross with 4 neighbors in 2D and 6 neighbors
in 3D.
'max' This corresponds to a square or cube with 8 neighbors in 2D and
26 neighbors in 3D.
========= ==================================================================
min_size : int
Any clusters of trapped voxels smaller than this size will be set to not
trapped. This argument is only used if `outlets` is given. This is useful
to prevent small voxels along edges of the void space from being set to
trapped. These can appear to be trapped due to the jagged nature of the
digital image. The default is 0, meaning this adjustment is not applied,
but a value of 3 or 4 is recommended to activate this adjustment.
Returns
-------
results : Results object
A dataclass-like object with the following attributes:
========== ============================================================
Attribute Description
========== ============================================================
im_seq An ndarray with each voxel indicating the step number at
which it was first invaded by non-wetting phase
im_satn A numpy array with each voxel value indicating the global
value of the non-wetting phase saturation at the point it
was invaded
im_size If `return_sizes` was set to `True`, then a numpy array with
each voxel containing the radius of the sphere, in voxels, that
first overlapped it.
im_pc A numpy array with each voxel value indicating the
capillary pressure at which it was invaded.
im_trapped A numpy array with ``True`` values indicating trapped voxels if
`outlets` was provided, otherwise will be `None`.
pc 1D array of capillary pressure values that were applied
swnp 1D array of non-wetting phase saturations for each applied
value of capillary pressure (``pc``).
========== ============================================================
See Also
--------
drainage
Notes
-----
This algorithm only provides sensible results for gravity stabilized
configurations, meaning the more dense fluid is on the bottom. Be sure that
``inlets`` are specified accordingly.
References
----------
.. [1] Chadwick EA, Hammen LH, Schulz VP, Bazylak A, Ioannidis MA, Gostick JT.
Incorporating the effect of gravity into image-based drainage simulations on
volumetric images of porous media.
`Water Resources Research. <https://doi.org/10.1029/2021WR031509>`_.
58(3), e2021WR031509 (2022)
Examples
--------
`Click here
<https://porespy.org/examples/simulations/reference/drainage.html>`_
to view online example.
"""
results = ibop(
im=im,
pc=pc,
dt=dt,
inlets=inlets,
outlets=outlets,
residual=residual,
steps=steps,
return_sizes=return_sizes,
min_size=min_size,
)
return results
if __name__ == "__main__":
import numpy as np
import porespy as ps
import matplotlib.pyplot as plt
from copy import copy
ps.visualization.set_mpl_style()
cm = copy(plt.cm.turbo)
cm.set_under('grey')
cm.set_over('k')
# %% Run this cell to regenerate the variables in drainage
bg = 'white'
plots = True
im = ps.generators.blobs(
shape=[500, 500],
porosity=0.7,
blobiness=1.5,
seed=16,
)
im = ps.filters.fill_blind_pores(im, surface=True)
inlets = np.zeros_like(im)
inlets[0, :] = True
outlets = np.zeros_like(im)
outlets[-1, :] = True
lt = ps.filters.local_thickness(im)
dt = edt(im)
residual = lt > 25
steps = 25
pc = ps.filters.capillary_transform(
im=im,
dt=dt,
sigma=0.072,
theta=180,
rho_nwp=1000,
rho_wp=0,
g=0,
voxel_size=1e-4,
)
# %% Run different drainage simulations
drn1 = ps.simulations.drainage(
im=im,
pc=pc,
inlets=inlets,
steps=100,
)
drn2 = ps.simulations.drainage(
im=im,
pc=pc,
inlets=inlets,
outlets=outlets,
steps=100,
)
drn3 = ps.simulations.drainage(
im=im,
pc=pc,
inlets=inlets,
residual=residual,
steps=100,
)
drn4 = ps.simulations.drainage(
im=im,
pc=pc,
inlets=inlets,
outlets=outlets,
residual=residual,
steps=100,
)
drn5 = ps.simulations.drainage(
im=im,
pc=pc,
steps=100,
)
# %% Visualize the invasion configurations for each scenario
if plots:
fig, ax = plt.subplot_mosaic(
[['(a)', '(b)', '(e)', '(e)'],
['(c)', '(d)', '(e)', '(e)']],
figsize=[12, 8],
)
# drn1.im_pc[~im] = -1
ax['(a)'].imshow(drn1.im_seq/im, origin='lower', cmap=cm, vmin=0)
vmax = drn2.im_seq.max()
ax['(b)'].imshow(drn2.im_seq/im, origin='lower', cmap=cm, vmin=0, vmax=vmax)
ax['(c)'].imshow(drn3.im_seq/im, origin='lower', cmap=cm, vmin=0)
ax['(d)'].imshow(drn4.im_seq/im, origin='lower', cmap=cm, vmin=0)
pc, s = ps.metrics.pc_map_to_pc_curve(pc=drn1.im_pc, seq=drn1.im_seq, im=im, mode='drainage')
ax['(e)'].plot(pc, s, 'b->', label='drainage')
pc, s = ps.metrics.pc_map_to_pc_curve(pc=drn2.im_pc, seq=drn2.im_seq, im=im, mode='drainage')
ax['(e)'].plot(pc, s, 'r-<', label='drainage w trapping')
pc, s = ps.metrics.pc_map_to_pc_curve(pc=drn3.im_pc, seq=drn3.im_seq, im=im, mode='drainage')
ax['(e)'].plot(pc, s, 'g-^', label='drainage w residual')
pc, s = ps.metrics.pc_map_to_pc_curve(pc=drn4.im_pc, seq=drn4.im_seq, im=im, mode='drainage')
ax['(e)'].plot(pc, s, 'm-*', label='drainage w residual & trapping')
ax['(e)'].legend()