Source code for porespy.networks._size_factors

import math
import inspect
import numpy as np
import scipy.ndimage as spim
from scipy.ndimage import zoom as zm
from skimage.morphology import ball
from porespy import settings
from porespy.tools import extend_slice, get_tqdm


tqdm = get_tqdm()


__all__ = [
    "diffusive_size_factor_AI",
    "diffusive_size_factor_DNS",
    "create_model",
    "find_conns",
]


[docs] def diffusive_size_factor_AI(regions, throat_conns, model, g_train, voxel_size=1): ''' Parameters ---------- regions : ndarray A segmented 3D image of pore regions/a pair of two regions. throat_conns : array An Nt by 2 array containing the throat connections. The indices orders in throat_conns start from 0 to be consistent with network extraction method. model : tensorflow model The trained model to be used for prediction. g_train : array The training data distribution. This will be used for denormalizing the prediction. voxel_size : scalar, optional Voxel size of the image. The default is 1. Returns ------- diff_size_factor : array An array of length conns containing diffusive size factor of the conduits in the segmented image (regions). Examples -------- `Click here <https://porespy.org/examples/networks/reference/diffusive_size_factor_AI.html>`_ to view online example. """ ''' import tensorflow as tf if g_train is None: raise ValueError("Training ground truth data must be given" + "to be used for normalizing the test data") its = -1 pairs = np.empty((len(throat_conns), 64, 64, 64), dtype='int32') diff_size_factor = [] zm_ratios = [] desc = inspect.currentframe().f_code.co_name # Get current func name for i in tqdm(np.arange(len(throat_conns)), desc=desc, **settings.tqdm): cn = throat_conns[i] # crop two pore regions and label them as 1,2 bb = _calc_bound_box_bi(regions, cn[0], cn[1]) roi_crop = np.copy(regions[bb[0]:bb[3], bb[1]:bb[4], bb[2]:bb[5]]) # label two pore regions as 1,2 roi_masked = _create_labeled_pair(cn, roi_crop) # resize to AI input size resize_result = _resize_to_AI_input(roi_masked) roi_resized = resize_result['resized_im'] zm_ratio = resize_result['zm_ratio'] zm_ratios.append(zm_ratio) its = its+1 pairs[its, :, :, :] = roi_resized.copy() test_data = _convert_to_tf_image_datatype(pairs, n_image=len(throat_conns)) if len(throat_conns) < 16: batch_size = 1 else: batch_size = 16 test_steps = math.ceil(len(throat_conns) / batch_size) predictions = model.predict(test_data, steps=test_steps) predictions = np.squeeze(predictions) denorm_size_factor = _denorm_predict(predictions, g_train) g = denorm_size_factor * voxel_size * (1/np.array(zm_ratios)) diff_size_factor = g if len(throat_conns) > 1: tf.keras.backend.clear_session() return diff_size_factor
def diffusive_size_factor_DNS(regions, throat_conns, voxel_size=1): """ Calculates the diffusive size factor of pore to pore regions in a segmented image of porous material using finite difference method. Parameters ---------- regions : ndarray A segmented 3D image of pore regions/a pair of two regions. throat_conns : array An Nt by 2 array containing the throat connections. The indices orders in throat_conns start from 0 to be consistent with network extraction method. voxel_size : scalar, optional Voxel size of the image. The default is 1. Returns ------- diff_size_factor : array An array of length conns containing diffusive size factor of the conduits in the segmented image (regions). """ DNS_size_factor = [] desc = inspect.currentframe().f_code.co_name # Get current func name settings.tqdm['disable'] = False for i in tqdm(np.arange(len(throat_conns)), desc=desc, **settings.tqdm): cn = throat_conns[i] # crop two pore regions and label them as 1,2 bb = _calc_bound_box_bi(regions, cn[0], cn[1]) roi_crop = np.copy(regions[bb[0]:bb[3], bb[1]:bb[4], bb[2]:bb[5]]) # label two pore regions as 1,2 roi_masked = _create_labeled_pair(cn, roi_crop) DNS_size_factor.append(_calc_g_val(roi_masked)) diff_size_factor = np.array(DNS_size_factor) * voxel_size return diff_size_factor def _calc_g_val(im): ''' Calculates the diffusive size factor of conduit image (ROI) using finite difference method. The finite difference nodes are created using OpenPNM's CubicTemplate method. Parameters ---------- im : ndarray 3D image of a pair of pore to pore regions (conduit) Returns ------- g : scalar Diffusive size factor of the conduit. ''' import openpnm as op c1 = 20 c2 = 10 im = np.copy(im) results = _find_conns_roi_info(im) centroids = results['p_coords'] p_dia_local = results['p_dia_local'] # create a mask where solid phase==True to trim the nodes # in the finite difference nodes that are located in solid/not ROI mask1 = np.array(np.where(im[:] == 1, im[:], 0), dtype=bool) mask2 = np.array(np.where(im[:] == 2, im[:], 0), dtype=bool) mask1 = np.reshape(mask1, mask1.size) mask2 = np.reshape(mask2, mask2.size) mask = ~(mask1+mask2) n = im.shape meds = op.network.Cubic(shape=[n[0], n[1], n[2]], spacing=1) meds['pore.region1'] = mask1.copy() meds['pore.region2'] = mask2.copy() # trim nodes that are located in solid phase/not ROI op.topotools.trim(meds, pores=mask) meds.add_model(propname='pore.cluster_number', model=op.models.network.cluster_number) meds.add_model(propname='pore.cluster_size', model=op.models.network.cluster_size) cluster_size = np.max(meds['pore.cluster_size']) trim_pores = meds['pore.cluster_size'] < cluster_size op.topotools.trim(network=meds, pores=trim_pores) phss = op.phase.Phase(network=meds) # A diffusive conductance of 1 ensures a finite difference approach # where each node is located at the corner of each voxel phss['throat.diffusive_conductance'] = 1 algs = op.algorithms.FickianDiffusion(network=meds, phase=phss) # find centroid of each pore region in the finite difference nodes pr1 = closest_node(centroids[0], meds['pore.coords']) pr2 = closest_node(centroids[1], meds['pore.coords']) algs.set_value_BC(pores=pr1, values=c1) algs.set_value_BC(pores=pr2, values=c2) algs.run() # calculate average concentrations within inscribed spheres r1 = p_dia_local[0]/2 r2 = p_dia_local[1]/2 if np.round(r1) == 0: # This prevent error in find_nearby_pores for narrow regions # If the region is narrow, use the entire region for average concentration c1_avr = algs['pore.concentration'][meds['pore.region1']].mean() else: # use the inscribed sphere within the region for average concentration pores1 = meds.find_nearby_pores(pr1, r=np.round(r1), flatten=True, include_input=True) pores1 = np.append(pores1, pr1) pores1 = np.unique(pores1) c1_avr = algs['pore.concentration'][pores1].mean() if np.round(r2) == 0: c2_avr = algs['pore.concentration'][meds['pore.region2']].mean() else: pores2 = meds.find_nearby_pores(pr2, r=np.round(r2), flatten=True, include_input=True) pores2 = np.append(pores2, pr2) pores2 = np.unique(pores2) c2_avr = algs['pore.concentration'][pores2].mean() g = abs(algs.rate(pores=pr1)[0]/(c1_avr-c2_avr)) return g def closest_node(extracted_nodes, fd_nodes): """ Finds the indice of a node in the finite difference nodes that locates closest to the centroid point of a pore region. Parameters ---------- extracted_nodes : array An array of the coordinate of the centroid point of a pores region. fd_nodes : array An array of the coordinate of all nodes in the finite difference nodes. Returns ------- scalar The indice of the nearest finite difference node to the centroid of a pore region. """ fd_nodes = np.asarray(fd_nodes) dist = np.sum((fd_nodes - extracted_nodes)**2, axis=1) return int(np.argmin(dist)) def _find_conns_roi_info(im): ''' Finds the connections list, coordinates of pores centroids and their inscribed sphere's diameter. These values are necessary to be known for applying the finite difference method. Parameters ---------- im : ndarray A segmented image of a porous medium. Returns ------- A dictionary of info: t_conns : array An Nt by 2 array containing the throats' connections in the segmented image. p_coords : array An Np by 3 array containing the pores centroids coordinates in the segmented image. p_dia_local : array An Np size array containing the pores inscribed diameter in the segmented image. ''' struc_elem = ball slices = spim.find_objects(im) Ps = np.arange(1, np.amax(im)+1) p_dia_local = np.zeros((len(Ps), ), dtype=float) p_coords = np.zeros((len(Ps), im.ndim), dtype=float) t_conns = [] desc = inspect.currentframe().f_code.co_name # Get current func name settings.tqdm['disable'] = True for i in tqdm(Ps, desc=desc, **settings.tqdm): pore = i - 1 if slices[pore] is None: continue s = extend_slice(slices[pore], im.shape) sub_im = im[s] pore_im = sub_im == i # additional info to find centroids and inscribed_diam padded_mask = np.pad(pore_im, pad_width=1, mode='constant') pore_dt = spim.distance_transform_edt(padded_mask) s_offset = np.array([i.start for i in s]) p_coords[pore, :] = spim.center_of_mass(pore_im) + s_offset p_dia_local[pore] = (2*np.amax(pore_dt)) - np.sqrt(3) im_w_throats = spim.binary_dilation(input=pore_im, structure=struc_elem(1)) im_w_throats = im_w_throats*sub_im Pn = np.unique(im_w_throats)[1:] - 1 for j in Pn: if j > pore: t_conns.append([pore, j]) results = { 't_conns': t_conns, 'p_coords': p_coords, 'p_dia_local': p_dia_local } return results def _calc_bound_box_bi(regions, pore_1, pore_2): ''' Parameters ---------- regions : ndarray 3D image of the segmented regions from which the bounding box of a local pore to pore region will be extracted. pore_1 : scalar Label of pore region1 in a pair of pore to pore connection (throat) pore_2 : scalar Label of pore region1 in a pair of pore to pore connection (throat) Returns ------- b_box : array An array containing the coordinates of the bounding box that covers region pore_1 and region pore_2. The orders of coordinates are: x_min, y_min, z_min, x_max, y_max, z_max. ''' slice_x, slice_y, slice_z = spim.find_objects(regions == pore_1+1)[0] slice_x2, slice_y2, slice_z2 = spim.find_objects(regions == pore_2+1)[0] min_box = [min(slice_x.start, slice_x2.start), min(slice_y.start, slice_y2.start), min(slice_z.start, slice_z2.start)] max_box = [max(slice_x.stop, slice_x2.stop), max(slice_y.stop, slice_y2.stop), max(slice_z.stop, slice_z2.stop)] b_box = np.hstack([min_box, max_box]) return b_box def _resize_to_AI_input(im): ''' Parameters ---------- im : ndarray 3D image of a pair of pore to pore regions (conduit) cropped from segmented image of a porous medium. Returns ------- result : dict Contains Resized image of the cropped regions and its zoom ratio. If original image(im) is of shape (n,n,n) resizing includes one step of zoom to a (64,64,64) image. Otherwise, the original image will be first zero padded to its maximum dimension (nmax,nmax,nmax) before zoom step. ''' if len(np.unique(im.shape)) != 1: x, y, z = np.shape(im) max_size = max([x, y, z]) x_diff, y_diff, z_diff = max_size-np.array([x, y, z]) XB = int(np.round(x_diff/2)) # padding before axis x XA = int(x_diff-XB) # padding after axis x YB = int(np.round(y_diff/2)) YA = int(y_diff-YB) ZB = int(np.round(z_diff/2)) ZA = int(z_diff-ZB) im = np.pad(im, ((XB, XA), (YB, YA), (ZB, ZA)), 'constant', constant_values=0) zm_ratio = 64/im.shape[0] resized_im = zm(im, zoom=[zm_ratio, zm_ratio, zm_ratio], order=0) result = {'zm_ratio': zm_ratio, 'resized_im': resized_im} return result def find_conns(im): ''' Parameters ---------- im : ndarray A segmented image of a porous medium. Returns ------- t_conns : array An Nt by 2 addat containing the throats' connections in the segmented image. ''' struc_elem = ball slices = spim.find_objects(im) Ps = np.arange(1, np.amax(im)+1) t_conns = [] desc = inspect.currentframe().f_code.co_name # Get current func name settings.tqdm['disable'] = True for i in tqdm(Ps, desc=desc, **settings.tqdm): pore = i - 1 if slices[pore] is None: continue s = extend_slice(slices[pore], im.shape) sub_im = im[s] pore_im = sub_im == i im_w_throats = spim.binary_dilation(input=pore_im, structure=struc_elem(1)) im_w_throats = im_w_throats*sub_im Pn = np.unique(im_w_throats)[1:] - 1 for j in Pn: if j > pore: t_conns.append([pore, j]) return t_conns def _create_labeled_pair(cn, im): ''' Parameters ---------- cn : array An array containing two elements [region1,region2] that are labels of pore regions in a pair (im). im : ndarray 3D image of a pair of neighbouring pores. Returns ------- roi_masked : ndarray labeled image of two pore regions (im) where solid, region1, and region2 are labeled as 0, 1, and 2, respectively. ''' # create labeled image for AI : labels (0,1,2) for (solid,pore1,pore2) roi_crop = np.copy(im) mask1 = np.array(np.where(roi_crop == cn[0]+1, roi_crop, 0), dtype=bool) mask2 = np.array(np.where(roi_crop == cn[1]+1, roi_crop, 0), dtype=bool) roi_masked = 1*mask1 + 2*mask2 return roi_masked def _convert_to_tf_image_datatype(pair, n_image): ''' Parameters ---------- pair : ndarray Labeled pore to pore regions. n_image : scalar Number of pairs of conduit regions. Returns ------- test_data : tensorflow data Tensorflow data type test data. The test data may include 1 or more pairs of conduit images. ''' import tensorflow as tf # create a tensor of size (n_image, pair_shape) data_ims = np.zeros(shape=(n_image, 64, 64, 64, 1)) data = np.expand_dims(pair, axis=-1) if n_image == 1: data_ims[0, :, :, :] = data BS = 1 else: data_ims = data if n_image < 16: BS = 1 else: BS = 16 # create test_data as a tensorflow dataset type AUTOTUNE = tf.data.experimental.AUTOTUNE test_data = tf.data.Dataset.from_tensor_slices(data_ims) test_data = test_data.cache() test_data = test_data.batch(BS) # batch size = 1 test_data = test_data.prefetch(AUTOTUNE) return test_data def _denorm_predict(prediction, g_train): ''' Parameters ---------- prediction : array Normalized predicted diffusive size factors for conduit images. g_train : array Diffusive size factors used in training AI. Returns ------- denorm : array Denormalized predicted diffusive size factor for conduit images. ''' from sklearn import preprocessing scaler = preprocessing.MinMaxScaler(feature_range=(0, 1)) _ = scaler.fit_transform(g_train.reshape(-1, 1)) denorm = scaler.inverse_transform(X=prediction.reshape(-1, 1)) denorm = np.squeeze(denorm) return denorm def _id_block(x, filters, kernel_size): from tensorflow.keras import layers as ly from tensorflow.keras.initializers import glorot_uniform f1, f2, f3 = filters k = kernel_size x_orig = x x = ly.Conv3D(filters=f1, kernel_size=(1, 1, 1), strides=(1, 1, 1), padding='valid', kernel_initializer=glorot_uniform(seed=0))(x) x = ly.BatchNormalization(axis=-1)(x) x = ly.Activation('relu')(x) x = ly.Conv3D(filters=f2, kernel_size=(k, k, k), strides=(1, 1, 1), padding='same', kernel_initializer=glorot_uniform(seed=0))(x) x = ly.BatchNormalization(axis=-1)(x) x = ly.Activation('relu')(x) x = ly.Conv3D(filters=f3, kernel_size=(1, 1, 1), strides=(1, 1, 1), padding='valid', kernel_initializer=glorot_uniform(seed=0))(x) x = ly.BatchNormalization(axis=-1)(x) x = ly.Add()([x, x_orig]) x = ly.Activation('relu')(x) return x def _conv_block(x, filters, kernel_size, stride): from tensorflow.keras import layers as ly from tensorflow.keras.initializers import glorot_uniform f1, f2, f3 = filters k = kernel_size s = stride x_orig = x x = ly.Conv3D(filters=f1, kernel_size=(1, 1, 1), strides=(s, s, s), padding='valid', kernel_initializer=glorot_uniform(seed=0))(x) x = ly.BatchNormalization(axis=-1)(x) x = ly.Activation('relu')(x) x = ly.Conv3D(filters=f2, kernel_size=(k, k, k), strides=(1, 1, 1), padding='same', kernel_initializer=glorot_uniform(seed=0))(x) x = ly.BatchNormalization(axis=-1)(x) x = ly.Activation('relu')(x) x = ly.Conv3D(filters=f3, kernel_size=(1, 1, 1), strides=(1, 1, 1), padding='valid', kernel_initializer=glorot_uniform(seed=0))(x) x = ly.BatchNormalization(axis=-1)(x) x_shortcut = ly.Conv3D(filters=f3, kernel_size=(1, 1, 1), strides=(s, s, s), padding='valid', kernel_initializer=glorot_uniform(seed=0))(x_orig) x_shortcut = ly.BatchNormalization(axis=-1)(x_shortcut) x = ly.Add()([x, x_shortcut]) x = ly.Activation('relu')(x) return x def _resnet3d(input_shape=(64, 64, 64, 1)): from tensorflow.keras import layers as ly from tensorflow.keras.initializers import glorot_uniform from tensorflow.keras.models import Model x_in = ly.Input(shape=input_shape) x = ly.ZeroPadding3D(padding=(3, 3, 3))(x_in) # stage 1 x = ly.Conv3D(filters=64, kernel_size=(7, 7, 7), strides=(2, 2, 2), kernel_initializer=glorot_uniform(seed=0))(x) x = ly.BatchNormalization(axis=-1)(x) x = ly.Activation('relu')(x) x = ly.MaxPooling3D(pool_size=(3, 3, 3), strides=(2, 2, 2))(x) # stage 2 x = _conv_block(x, filters=(64, 64, 256), kernel_size=3, stride=1) x = _id_block(x, filters=(64, 64, 256), kernel_size=3) x = _id_block(x, filters=(64, 64, 256), kernel_size=3) # stage 3 x = _conv_block(x, filters=(128, 128, 512), kernel_size=3, stride=2) x = _id_block(x, filters=(128, 128, 512), kernel_size=3) x = _id_block(x, filters=(128, 128, 512), kernel_size=3) x = _id_block(x, filters=(128, 128, 512), kernel_size=3) # stage 4 x = _conv_block(x, filters=(256, 256, 1024), kernel_size=3, stride=2) x = _id_block(x, filters=(256, 256, 1024), kernel_size=3) x = _id_block(x, filters=(256, 256, 1024), kernel_size=3) x = _id_block(x, filters=(256, 256, 1024), kernel_size=3) x = _id_block(x, filters=(256, 256, 1024), kernel_size=3) x = _id_block(x, filters=(256, 256, 1024), kernel_size=3) # stage 5 x = _conv_block(x, filters=(512, 512, 2048), kernel_size=3, stride=2) x = _id_block(x, filters=(512, 512, 2048), kernel_size=3) x = _id_block(x, filters=(512, 512, 2048), kernel_size=3) # average pooling x = ly.AveragePooling3D(pool_size=(2, 2, 2))(x) # output layer x = ly.Flatten()(x) x = ly.Dense(units=512, kernel_initializer=glorot_uniform(seed=0))(x) x = ly.Activation(activation='relu')(x) x = ly.Dropout(rate=0.5)(x) x = ly.Dense(units=1, kernel_initializer=glorot_uniform(seed=0))(x) x = ly.Activation(activation='linear')(x) # create model model = Model(inputs=x_in, outputs=x, name='resnet3d') return model def create_model(): ''' Returns ------- model : tensorflow model ResNet50 model built using convolutional and identity blocks. ''' from tensorflow.keras.optimizers import Adam model = _resnet3d() model.compile(loss='mse', optimizer=Adam(lr=1e-4), metrics=['mse']) return model