import inspect
import logging
import numpy as np
import numpy.typing as npt
import heapq as hq
from numba import njit
from porespy import settings
from porespy.tools import (
get_tqdm,
get_border,
make_contiguous,
_insert_disk_at_points,
Results,
get_edt,
)
from typing import Literal
from porespy.filters import (
find_trapped_regions,
seq_to_satn,
)
logger = logging.getLogger(__name__)
tqdm = get_tqdm()
edt = get_edt()
__all__ = [
'qbip',
'ibip',
'injection',
]
[docs]
def qbip(
im: npt.NDArray,
pc: npt.NDArray = None,
dt: npt.NDArray = None,
inlets: npt.NDArray = None,
outlets: npt.NDArray = None,
maxiter: int = None,
return_sizes: bool = False,
return_pressures: bool = False,
conn: Literal['min', 'max'] = 'min',
min_size: int = 0,
):
r"""
Simulates non-wetting injection using a priority queue, optionally including
the effect of gravity
"""
im = np.atleast_3d(im == 1)
if maxiter is None: # Compute numpy of pixels in image
maxiter = (im == 1).sum()
if inlets is None:
inlets = np.zeros_like(im)
inlets[0, ...] = True
inlets = np.atleast_3d(inlets)
if dt is None:
dt = edt(im)
dt = np.atleast_3d(dt)
if pc is None:
pc = 2.0/dt
pc = np.atleast_3d(pc)
# Initialize arrays and do some preprocessing
inv_seq = np.zeros_like(im, dtype=int)
inv_pc = np.zeros_like(im, dtype=float)
if return_pressures is False:
inv_pc *= -np.inf # This is a flag to the numba-jit function to ignore it
inv_size = np.zeros_like(im, dtype=float)
if return_sizes is False:
inv_size *= -np.inf # This is a flag to the numba-jit function to ignore it
# Call numba'd inner loop
sequence, pressure, size, step = _qbip_inner_loop(
im=im,
inlets=inlets,
dt=dt,
pc=pc,
seq=inv_seq,
pressure=inv_pc,
size=inv_size,
maxiter=maxiter,
conn=conn,
)
logger.info(f"Exiting after {step} steps")
# Reduce back to 2D if necessary
sequence = sequence.squeeze()
pressure = pressure.squeeze()
size = size.squeeze()
pc = pc.squeeze()
im = im.squeeze()
# Convert invasion image so that uninvaded voxels are set to -1 and solid to 0
sequence[sequence == 0] = -1
sequence[~im] = 0
sequence = make_contiguous(im=sequence, mode='symmetric')
# Deal with invasion pressures and sizes similarly
if return_pressures:
pressure[sequence < 0] = np.inf
pressure[~im] = 0
if return_sizes:
size[sequence < 0] = np.inf
size[~im] = 0
# Deal with trapping if outlets were specified
if outlets is not None:
logger.info('Computing trapping and adjusting outputs')
sequence = find_trapped_regions(
im=im,
seq=sequence,
outlets=outlets,
return_mask=False,
conn=conn,
min_size=min_size,
method='queue',
)
trapped = (sequence == -1).squeeze()
pressure = pressure.astype(float).squeeze()
pressure[trapped] = np.inf
size = size.astype(float)
size[trapped] = np.inf
# Create results object for collected returned values
results = Results()
results.im_seq = sequence
results.im_snwp = seq_to_satn(sequence, im=im) # convert sequence to saturation
if return_pressures:
results.im_pc = pressure
if return_sizes:
results.im_size = size
return results
@njit
def _qbip_inner_loop(
im,
inlets,
dt,
pc,
seq,
pressure,
size,
maxiter,
conn,
): # pragma: no cover
# Initialize the heap
inds = np.where(inlets*im)
bd = []
for row, (i, j, k) in enumerate(zip(inds[0], inds[1], inds[2])):
bd.append([pc[i, j, k], dt[i, j, k], i, j, k])
hq.heapify(bd)
# Note which sites have been added to heap already
processed = inlets*im + ~im # Add solid phase to be safe
step = 1 # Total step number
for _ in range(1, maxiter):
if len(bd) == 0:
break
pts = [hq.heappop(bd)] # Put next site into pts list
while len(bd) and (bd[0][0] == pts[0][0]): # Pop any items with equal Pc
pts.append(hq.heappop(bd))
for pt in pts:
# Insert discs of invading fluid into images
seq = _insert_disk_at_point(
im=seq,
i=pt[2], j=pt[3], k=pt[4],
r=int(pt[1]), v=step, overwrite=False)
# Putting -inf in images is a numba compatible flag for 'skip'
if pressure[0, 0, 0] > -np.inf:
pressure = _insert_disk_at_point(
im=pressure,
i=pt[2], j=pt[3], k=pt[4],
r=int(pt[1]), v=pt[0], overwrite=False)
if size[0, 0, 0] > -np.inf:
size = _insert_disk_at_point(
im=size, i=pt[2], j=pt[3], k=pt[4],
r=int(pt[1]), v=pt[1], overwrite=False)
# Add neighboring points to heap and processed array
neighbors = _find_valid_neighbors(
i=pt[2], j=pt[3], k=pt[4], im=processed, conn=conn)
for n in neighbors:
hq.heappush(bd, [pc[n], dt[n], n[0], n[1], n[2]])
processed[n[0], n[1], n[2]] = True
step += 1
return seq, pressure, size, step
@njit
def _find_valid_neighbors(
i,
j,
im,
k=0,
conn='min',
valid=False
): # pragma: no cover
if im.ndim == 2:
xlim, ylim = im.shape
if conn == 'min':
mask = [[0, 1, 0], [1, 1, 1], [0, 1, 0]]
elif conn == 'max':
mask = [[1, 1, 1], [1, 1, 1], [1, 1, 1]]
neighbors = []
for a, x in enumerate(range(i-1, i+2)):
if (x >= 0) and (x < xlim):
for b, y in enumerate(range(j-1, j+2)):
if (y >= 0) and (y < ylim):
if mask[a][b] == 1:
if im[x, y] == valid:
neighbors.append((x, y))
else:
xlim, ylim, zlim = im.shape
if conn == 'min':
mask = [[[0, 0, 0], [0, 1, 0], [0, 0, 0]],
[[0, 1, 0], [1, 1, 1], [0, 1, 0]],
[[0, 0, 0], [0, 1, 0], [0, 0, 0]]]
elif conn == 'max':
mask = [[[1, 1, 1], [1, 1, 1], [1, 1, 1]],
[[1, 1, 1], [1, 1, 1], [1, 1, 1]],
[[1, 1, 1], [1, 1, 1], [1, 1, 1]]]
neighbors = []
for a, x in enumerate(range(i-1, i+2)):
if (x >= 0) and (x < xlim):
for b, y in enumerate(range(j-1, j+2)):
if (y >= 0) and (y < ylim):
for c, z in enumerate(range(k-1, k+2)):
if (z >= 0) and (z < zlim):
if mask[a][b][c] == 1:
if im[x, y, z] == valid:
neighbors.append((x, y, z))
return neighbors
@njit
def _insert_disk_at_point(im, i, j, r, v, k=0, overwrite=False): # pragma: no cover
r"""
Insert spheres (or disks) of specified radii into an ND-image at given locations.
This function uses numba to accelerate the process, and does not overwrite
any existing values (i.e. only writes to locations containing zeros).
Parameters
----------
im : ND-array
The image into which the spheres/disks should be inserted. This is an
'in-place' operation.
i, j, k: int
The center point of each sphere/disk. If the image is 2D then `k` can be
omitted.
r : array_like
The radius of the sphere/disk to insert
v : scalar
The value to insert
overwrite : boolean, optional
If ``True`` then the inserted spheres overwrite existing values. The
default is ``False``.
smooth : boolean
If `True` (default) then the small bumps on the outer perimeter of each
face is not present.
"""
if im.ndim == 2:
xlim, ylim = im.shape
for a, x in enumerate(range(i-r, i+r+1)):
if (x >= 0) and (x < xlim):
for b, y in enumerate(range(j-r, j+r+1)):
if (y >= 0) and (y < ylim):
R = ((a - r)**2 + (b - r)**2)**0.5
if R < r:
if overwrite or (im[x, y] == 0):
im[x, y] = v
else:
xlim, ylim, zlim = im.shape
for a, x in enumerate(range(i-r, i+r+1)):
if (x >= 0) and (x < xlim):
for b, y in enumerate(range(j-r, j+r+1)):
if (y >= 0) and (y < ylim):
if zlim > 1: # For a truly 3D image
for c, z in enumerate(range(k-1, k+r+1)):
if (z >= 0) and (z < zlim):
R = ((a - r)**2 + (b - r)**2 + (c - r)**2)**0.5
if R < r:
if overwrite or (im[x, y, z] == 0):
im[x, y, z] = v
else: # For 3D image with singleton 3rd dimension
R = ((a - r)**2 + (b - r)**2)**0.5
if R < r:
if overwrite or (im[x, y, 0] == 0):
im[x, y, 0] = v
return im
@njit
def _where(arr):
inds = np.where(arr)
result = np.vstack(inds)
return result
[docs]
def ibip(
im: npt.NDArray,
inlets: npt.NDArray = None,
outlets: npt.NDArray = None,
dt: npt.NDArray = None,
maxiter: int = 10000,
return_sizes: bool = True,
conn: str = 'min',
min_size: int = 0,
):
r"""
Simulates non-wetting fluid injection on an image using the IBIP algorithm [1]_
Parameters
----------
im : ND-array
Boolean array with ``True`` values indicating void voxels
inlets : ND-array
Boolean array with ``True`` values indicating where the invading fluid
is injected from. If ``None``, all faces will be used.
dt : ND-array (optional)
The distance transform of ``im``. If not provided it will be
calculated, so supplying it saves time.
maxiter : scalar
The number of steps to apply before stopping. The default is to run
for 10,000 steps which is almost certain to reach completion if the
image is smaller than about 250-cubed.
return_sizes : bool
If `True` then an array containing the size of the sphere which first
overlapped each voxel is returned. This array is not computed by default
as it increases computation time.
Returns
-------
results : dataclass-like
A dataclass-like object with the following arrays as attributes:
============= ===============================================================
Attribute Description
============= ===============================================================
im_seq A numpy array with each voxel value containing the step at
which it was invaded. Uninvaded voxels are set to -1.
im_snwp A numpy array with each voxel value indicating the saturation
present in the domain it was invaded. Solids are given 0, and
uninvaded regions are given -1.
im_size If `return_sizes` was set to `True`, then a numpy array with
each voxel containing the radius of the sphere, in voxels,
that first overlapped it.
============= ===============================================================
See Also
--------
porosimetry
drainage
qbip
ibop
References
----------
.. [1] Gostick JT, Misaghian N, Yang J, Boek ES. Simulating volume-controlled
invasion of a non-wetting fluid in volumetric images using basic image
processing tools. Computers & Geosciences. 158(1), 104978 (2022). `Link.
<https://doi.org/10.1016/j.cageo.2021.104978>`_
Notes
-----
This function is slower and is less capable than `qbip`, which returns identical
results, so it is recommended to use that instead.
"""
# Process the boundary image
if inlets is None:
inlets = np.zeros_like(im)
inlets[0, ...] = True
inlets = inlets*im
if maxiter is None:
maxiter = im.sum()
bd = np.copy(inlets > 0)
if dt is None: # Find dt if not given
dt = edt(im)
# Initialize inv image with -1 in the solid, and 0's in the void
seq = -1*(~im)
sizes = -1.0*(~im)
scratch = np.copy(bd)
desc = inspect.currentframe().f_code.co_name # Get current func name
for step in tqdm(range(1, maxiter), desc=desc, **settings.tqdm):
# Find insertion points
edge = scratch*(dt > 0)
if ~edge.any():
break
# Find the maximum value of the dt underlaying the new edge
r_max = (dt*edge).max()
# Find all values of the dt with that size
dt_thresh = dt >= r_max
# Extract the actual coordinates of the insertion sites
pt = _where(edge*dt_thresh)
seq = _insert_disk_at_points(
im=seq,
coords=pt,
r=int(r_max),
v=step,
smooth=True,
)
if return_sizes:
sizes = _insert_disk_at_points(
im=sizes,
coords=pt,
r=int(r_max),
v=r_max,
smooth=True,
)
dt, bd = _update_dt_and_bd(dt, bd, pt)
scratch = _insert_disk_at_points(
im=scratch,
coords=pt,
r=1,
v=1,
smooth=False,
)
# Convert inv image so that uninvaded voxels are set to -1 and solid to 0
temp = seq == 0 # Uninvaded voxels are set to -1 after _ibip
seq[~im] = 0
seq[temp] = -1
seq = make_contiguous(im=seq, mode='symmetric')
# Deal with invasion sizes similarly
temp = sizes == 0
sizes[~im] = 0
sizes[temp] = -1
# Deal with trapping if outlets were specified
if outlets is not None:
logger.info('Computing trapping and adjusting outputs')
sequence = find_trapped_regions(
im=im,
seq=seq,
outlets=outlets,
return_mask=False,
conn=conn,
min_size=min_size,
method='queue',
)
trapped = (sequence == -1).squeeze()
# pressure = pressure.astype(float).squeeze()
# pressure[trapped] = np.inf
seq[trapped] = -1
sizes[trapped] = -1
results = Results()
results.im_seq = np.copy(seq)
results.im_snwp = seq_to_satn(seq=seq, im=im)
if return_sizes:
results.im_size = np.copy(sizes)
return results
@njit()
def _update_dt_and_bd(dt, bd, pt):
if dt.ndim == 2:
for i in range(pt.shape[1]):
bd[pt[0, i], pt[1, i]] = True
dt[pt[0, i], pt[1, i]] = 0
else:
for i in range(pt.shape[1]):
bd[pt[0, i], pt[1, i], pt[2, i]] = True
dt[pt[0, i], pt[1, i], pt[2, i]] = 0
return dt, bd
[docs]
def injection(
im,
pc=None,
dt=None,
inlets=None,
outlets=None,
maxiter=None,
return_sizes=False,
return_pressures=True,
conn='min',
min_size=0,
method='qbip',
):
r"""
Performs injection of non-wetting fluid including the effect of gravity and
trapping of wetting phase.
Parameters
----------
im : ndarray
A boolean image of the porous media with ``True`` values indicating
the void space
pc : ndarray, optional
Precomputed capillary pressure transform which is used to determine
the invadability of each voxel. If not provided then the `2/dt` is used,
which is equivalent to a surface tension and voxel size of unity, and a
contact angle of 180 degrees.
dt : ndarray (optional)
The distance transform of ``im``. If not provided it will be
calculated, so supplying it saves time.
inlets : ndarray, optional
A boolean image with ``True`` values indicating the inlet locations.
If not provided then the beginning of the x-axis is assumed.
outlets : ndarray, optional
A boolean image with ``True`` values indicating the outlet locations.
If this is provided then trapped voxels of wetting phase are found and
all the output images are adjusted accordingly. Note that trapping can
be assessed during postprocessing as well.
return_sizes : bool, default = `False`
If `True` then an array containing the size of the sphere which first
overlapped each pixel is returned. This array is not computed by default
as computing it increases computation time.
return_pressures : bool, default = `True`
If `True` then an array containing the capillary pressure at which
each pixels was first invaded is returned.
maxiter : int
The maximum number of iteration to perform. The default is equal to the
number of void pixels in `im`.
min_size : int
Any clusters of trapped voxels smaller than this size will be set to not
trapped. This argument is only used if `outlets` is given. This is useful
to prevent small voxels along edges of the void space from being set to
trapped. These can appear to be trapped due to the jagged nature of the
digital image. The default is 0, meaning this adjustment is not applied,
but a value of 3 or 4 is recommended to activate this adjustment.
conn : str
Controls the shape of the structuring element used to find neighboring
voxels. Options are:
========= ==================================================================
Option Description
========= ==================================================================
'min' This corresponds to a cross with 4 neighbors in 2D and 6 neighbors
in 3D.
'max' This corresponds to a square or cube with 8 neighbors in 2D and
26 neighbors in 3D.
========= ==================================================================
method : str
Controls the method used perform the simulation. Options are:
========= ==================================================================
Option Description
========= ==================================================================
'qbip' Uses 'queue-based invasion percolation' [1]_. This is the default.
It is much faster.
'ibip' Uses 'image-based invasion percolation' [2]_. This is only
provided for completeness since it is the original algorithm.
========= ==================================================================
Returns
-------
results : Results object
A dataclass-like object with the following attributes:
========== =================================================================
Attribute Description
========== =================================================================
im_seq A numpy array with each voxel value containing the step at
which it was invaded. Uninvaded voxels are set to -1.
im_snwp A numpy array with each voxel value indicating the saturation
present in the domain it was invaded. Solids are given 0, and
uninvaded regions are given -1.
im_pc If `return_pressures` was set to `True`, then a numpy array with
each voxel value indicating the capillary pressure at which it
was invaded. Uninvaded voxels have value of ``np.inf``.
im_size If `return_sizes` was set to `True`, then a numpy array with
each voxel containing the radius of the sphere, in voxels, that
first overlapped it.
========== =================================================================
References
----------
.. [1] Gostick JT, Misaghian N, A Irannezhad, B Zhao. *A computationally
efficient queue-based algorithm for simulating volume-controlled drainage
under the influence of gravity on volumetric images*. `Advances in Water
Resources <https://doi.org/10.1016/j.advwatres.2024.104799>`_. 193(11),
104799 (2024)
.. [2] Gostick JT, Misaghian N, Yang J, Boek ES. *Simulating volume-controlled
invasion of a non-wetting fluid in volumetric images using basic image
processing tools*. `Computers and the Geosciences
<https://doi.org/10.1016/j.cageo.2021.104978>`_. 158(1), 104978 (2022)
Examples
--------
`Click here
<https://porespy.org/examples/filters/reference/injection.html>`_
to view an online example.
"""
if method == 'qbip':
results = qbip(
im=im,
pc=pc,
dt=dt,
inlets=inlets,
outlets=outlets,
maxiter=maxiter,
return_sizes=return_sizes,
return_pressures=return_pressures,
conn=conn,
min_size=min_size,
)
elif method == 'ibip':
results = ibip(
im=im,
dt=dt,
inlets=inlets,
outlets=outlets,
maxiter=maxiter,
return_sizes=return_sizes,
conn=conn,
min_size=min_size,
)
return results
if __name__ == "__main__":
import porespy as ps
import matplotlib.pyplot as plt
from copy import copy
# %%
im = ~ps.generators.random_spheres([400, 200], r=20, seed=0, clearance=10)
inlets = np.zeros_like(im)
inlets[0, :] = True
inlets = inlets*im
pc = ps.filters.capillary_transform(im)
ip = injection(im, pc=pc, inlets=inlets, return_sizes=True)
outlets = np.zeros_like(im)
outlets[-1, :] = True
outlets = outlets*im
ps.tools.tic()
trapped_new = ps.filters.find_trapped_regions(
im=im,
seq=ip.im_seq,
outlets=outlets,
return_mask=False,
method='queue',
min_size=5,
)
ps.tools.toc()
ps.tools.tic()
trapped = ps.filters.find_trapped_regions(
im=im,
seq=ip.im_seq,
outlets=outlets,
return_mask=False,
method='cluster',
min_size=5,
)
ps.tools.toc()
# %%
cm = copy(plt.cm.turbo)
cm.set_under('grey')
fig, ax = plt.subplots(1, 3)
ax[0].imshow(
ip.im_seq/im,
origin='lower',
interpolation='none',
vmin=0.0001,
cmap=cm,
)
ax[1].imshow(
trapped/im,
origin='lower',
interpolation='none',
vmin=0.0001,
cmap=cm,
)
ax[2].imshow(
trapped_new/im,
origin='lower',
interpolation='none',
vmin=0.0001,
cmap=cm,
)
ip2 = ps.simulations.injection(
im=im,
inlets=inlets,
outlets=outlets,
method='ibip',
)