import logging
import sys
import numpy as np
import importlib
from dataclasses import dataclass
import psutil
import inspect
import time
logger = logging.getLogger("porespy")
__all__ = [
'sanitize_filename',
'get_tqdm',
'show_docstring',
'Results',
'tic',
'toc',
]
def _format_time(timespan, precision=3):
"""Formats the timespan in a human readable form"""
if timespan >= 60.0:
# we have more than a minute, format that in a human readable form
# Idea from http://snipplr.com/view/5713/
parts = [("d", 60*60*24), ("h", 60*60), ("min", 60), ("s", 1)]
time = []
leftover = timespan
for suffix, length in parts:
value = int(leftover / length)
if value > 0:
leftover = leftover % length
time.append(u'%s%s' % (str(value), suffix))
if leftover < 1:
break
return " ".join(time)
# Unfortunately the unicode 'micro' symbol can cause problems in
# certain terminals.
# See bug: https://bugs.launchpad.net/ipython/+bug/348466
# Try to prevent crashes by being more secure than it needs to
# E.g. eclipse is able to print a µ, but has no sys.stdout.encoding set.
units = [u"s", u"ms", u'us', "ns"] # the save value
if hasattr(sys.stdout, 'encoding') and sys.stdout.encoding:
try:
u'\xb5'.encode(sys.stdout.encoding)
units = [u"s", u"ms", u'\xb5s', "ns"]
except UnicodeEncodeError:
pass
scaling = [1, 1e3, 1e6, 1e9]
if timespan > 0.0:
order = min(-int(np.floor(np.log10(timespan)) // 3), 3)
else:
order = 3
return u"%.*g %s" % (precision, timespan * scaling[order], units[order])
def tic():
r"""
Homemade version of matlab tic and toc function, tic starts or resets
the clock, toc reports the time since the last call of tic.
See Also
--------
toc
"""
global _startTime_for_tictoc
_startTime_for_tictoc = time.time()
def toc(quiet=False):
r"""
Homemade version of matlab tic and toc function, tic starts or resets
the clock, toc reports the time since the last call of tic.
Parameters
----------
quiet : bool, default is False
If False then a message is output to the console. If
True the message is not displayed and the elapsed time is returned.
See Also
--------
tic
"""
if "_startTime_for_tictoc" not in globals():
raise Exception("Start time not set, call tic first")
t = time.time() - _startTime_for_tictoc
if quiet is False:
print(f"Elapsed time: {_format_time(t)}")
return t
def _is_ipython_notebook(): # pragma: no cover
try:
shell = get_ipython().__class__.__name__
if shell == 'ZMQInteractiveShell':
return True # Jupyter notebook or qtconsole
if shell == 'TerminalInteractiveShell':
return False # Terminal running IPython
return False # Other type (?)
except NameError:
return False # Probably standard Python interpreter
@dataclass
class Settings: # pragma: no cover
r"""
A dataclass for use at the module level to store settings. This class
is defined as a Singleton so now matter how or where it gets
instantiated the same object is returned, containing all existing
settings.
Parameters
----------
notebook : boolean
Is automatically determined upon initialization of PoreSpy, and is
``True`` if running within a Jupyter notebook and ``False``
otherwise. This is used by the ``porespy.tools.get_tqdm`` function
to determine whether a standard or a notebook version of the
progress bar should be used.
tqdm : dict
This dictionary is passed directly to the the ``tqdm`` function
throughout PoreSpy (``for i in tqdm(range(N), **settings.tqdm)``).
To see a list of available options visit the tqdm website.
Probably the most important is ``'disable'`` which when set to
``True`` will silence the progress bars. It's also possible to
adjust the formatting such as ``'colour'`` and ``'ncols'``, which
controls width.
loglevel : str, or int
Determines what messages to get printed in console. Options are:
"TRACE" (5), "DEBUG" (10), "INFO" (20), "SUCCESS" (25), "WARNING" (30),
"ERROR" (40), "CRITICAL" (50)
"""
__instance__ = None
# Might need to add 'file': sys.stdout to tqdm dict
tqdm = {'disable': False,
'colour': None,
'ncols': None,
'leave': False,
'file': sys.stdout}
_loglevel = 40 if _is_ipython_notebook() else 30
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self._notebook = None
self._ncores = psutil.cpu_count(logical=False)
@property
def loglevel(self):
return self._loglevel
@loglevel.setter
def loglevel(self, value):
if isinstance(value, str):
options = {
"TRACE" : 5,
"DEBUG" : 10,
"INFO" : 20,
"SUCESS" : 25,
"WARNING" : 30,
"ERROR" : 40,
"CRITICAL" : 50
}
value = options[value]
self._loglevel = value
logger.setLevel(value)
def __new__(cls):
if Settings.__instance__ is None:
Settings.__instance__ = super().__new__(cls)
return Settings.__instance__
def __repr__(self):
indent = 0
for item in self.__dir__():
if not item.startswith('_'):
indent = max(indent, len(item) + 1)
s = ''
for item in self.__dir__():
if not item.startswith('_'):
s += ''.join((item, ':', ' '*(indent-len(item))))
attr = getattr(self, item)
temp = ''.join((attr.__repr__(), '\n'))
if isinstance(attr, dict):
temp = temp.replace(',', '\n' + ' '*(indent + 1))
s += temp
return s
def _get_ncores(self):
if self._ncores is None:
self._ncores = psutil.cpu_count(logical=False)
return self._ncores
def _set_ncores(self, val):
cpu_count = psutil.cpu_count(logical=False)
if val is None:
val = cpu_count
elif val > cpu_count:
logger.error('Value is more than the available number of cores')
val = cpu_count
self._ncores = val
ncores = property(fget=_get_ncores, fset=_set_ncores)
def _get_notebook(self):
if self._notebook is None:
self._notebook = _is_ipython_notebook()
return self._notebook
def _set_notebook(self, val):
logger.error('This value is determined automatically at runtime')
notebook = property(fget=_get_notebook, fset=_set_notebook)
[docs]def get_tqdm(): # pragma: no cover
r"""
Fetches a version of ``tqdm`` function that depends on the environment.
Either text-based for the IPython console or gui-based for Jupyter
notebooks.
Returns
-------
tqdm : function handle
The function to use when wrapping an iterator (i.e. tqdm(range(n)))
"""
if Settings().notebook is True:
tqdm = importlib.import_module('tqdm.notebook')
else:
tqdm = importlib.import_module('tqdm')
return tqdm.tqdm
[docs]def show_docstring(func): # pragma: no cover
r"""
Fetches the docstring for a function and returns it in markdown format.
Useful for printing in a Jupyter notebook.
Parameters
----------
func : object
Function handle to function whose docstring is desired
Returns
-------
md : str
A string with the markdown syntax included, suitable for printing
in a Jupyter notebook using the ``IPython.display.Markdown``
function.
"""
title = f'---\n ## Documentation for ``{func.__name__}``\n ---\n'
try:
from npdoc_to_md import render_md_from_obj_docstring
txt = render_md_from_obj_docstring(obj=func, obj_namespace=func.__name__)
except ModuleNotFoundError:
txt = func.__doc__
return title + txt + '\n---'
[docs]def sanitize_filename(filename, ext, exclude_ext=False):
r"""
Returns a sanitized string in the form of name.extension
Parameters
----------
filename : str
Unsanitized filename, could be 'test.vtk' or just 'test'
ext : str
Extension of the file, could be 'vtk'
exclude_ext : bool
If True, the returned string doesn't have the extension
Returns
-------
sanitized : str
Sanitized filename in form of name.extension
"""
ext.strip(".")
if filename.endswith(f".{ext}"):
name = ".".join(filename.split(".")[:-1])
else:
name = filename
filename_formatted = f"{name}" if exclude_ext else f"{name}.{ext}"
return filename_formatted
[docs]class Results:
r"""
A minimal class for use when returning multiple values from a function
This class supports dict-like assignment and retrieval
(``obj['im'] = im``), namedtuple-like attribute look-ups (``obj.im``),
and generic class-like object assignment (``obj.im = im``)
"""
def __init__(self, **kwargs):
self._func = inspect.getouterframes(inspect.currentframe())[1].function
self._time = time.asctime()
def __iter__(self):
for k, v in self.__dict__.items():
if not k.startswith('_'):
yield v
def __getitem__(self, key):
return getattr(self, key)
def __setitem__(self, key, value):
self.__dict__[key] = value
def __str__(self):
header = "―" * 78
lines = [
header,
f"Results of {self._func} generated at {self._time}",
header,
]
for item in list(self.__dict__.keys()):
if item.startswith('_'):
continue
if (isinstance(self[item], np.ndarray)):
s = np.shape(self[item])
lines.append("{0:<25s} Array of size {1}".format(item, s))
elif hasattr(self[item], 'keys'):
N = len(self[item].keys())
lines.append("{0:<25s} Dictionary with {1} items".format(item, N))
else:
lines.append("{0:<25s} {1}".format(item, self[item]))
lines.append(header)
return "\n".join(lines)