import importlib
import inspect
import logging
import sys
import time
from dataclasses import dataclass
import numpy as np
import psutil
# Module-level logger registered under the name "porespy"; the
# ``Settings.loglevel`` setter in this file adjusts its level.
logger = logging.getLogger("porespy")

# Names exported by a star-import of this module.
__all__ = [
    "sanitize_filename",
    "get_tqdm",
    "get_edt",
    "show_docstring",
    "Results",
    "tic",
    "toc",
]
def get_edt():
    r"""
    Fetch the ``edt`` callable from whichever distance-transform package
    is installed.

    The ``pyedt`` package is tried first; if it cannot be imported the
    ``edt`` package is used instead.

    Returns
    -------
    edt : function handle
        The ``edt`` attribute of the first package that imports.

    Raises
    ------
    ModuleNotFoundError
        If neither ``pyedt`` nor ``edt`` can be imported.
    """
    try:
        edt_func = importlib.import_module("pyedt").edt
    except ModuleNotFoundError:
        # Preferred package is missing, fall back to the alternative
        edt_func = importlib.import_module("edt").edt
    return edt_func
def _format_time(timespan, precision=3):
"""Formats the timespan in a human readable form"""
if timespan >= 60.0:
# we have more than a minute, format that in a human readable form
# Idea from http://snipplr.com/view/5713/
parts = [("d", 60 * 60 * 24), ("h", 60 * 60), ("min", 60), ("s", 1)]
time = []
leftover = timespan
for suffix, length in parts:
value = int(leftover / length)
if value > 0:
leftover = leftover % length
time.append("%s%s" % (str(value), suffix))
if leftover < 1:
break
return " ".join(time)
# Unfortunately the unicode 'micro' symbol can cause problems in
# certain terminals.
# See bug: https://bugs.launchpad.net/ipython/+bug/348466
# Try to prevent crashes by being more secure than it needs to
# E.g. eclipse is able to print a µ, but has no sys.stdout.encoding set.
units = ["s", "ms", "us", "ns"] # the save value
if hasattr(sys.stdout, "encoding") and sys.stdout.encoding:
try:
"\xb5".encode(sys.stdout.encoding)
units = ["s", "ms", "\xb5s", "ns"]
except UnicodeEncodeError:
pass
scaling = [1, 1e3, 1e6, 1e9]
if timespan > 0.0:
order = min(-int(np.floor(np.log10(timespan)) // 3), 3)
else:
order = 3
return "%.*g %s" % (precision, timespan * scaling[order], units[order])
def tic():
    r"""
    Start or reset the stopwatch, in the manner of matlab's ``tic``.

    The current time is stored in a module-level variable which ``toc``
    later reads to report the time elapsed since this call.

    See Also
    --------
    toc
    """
    globals()["_startTime_for_tictoc"] = time.time()
def toc(quiet=False):
    r"""
    Report the time elapsed since the last call to ``tic``, in the manner
    of matlab's ``toc``.

    Parameters
    ----------
    quiet : bool, default is False
        If ``False`` a message with the formatted elapsed time is printed
        to the console. Any other value suppresses the message.

    Returns
    -------
    t : float
        The elapsed time in seconds. It is returned whether or not the
        message is printed.

    Raises
    ------
    Exception
        If ``tic`` has not been called yet.

    See Also
    --------
    tic
    """
    module_state = globals()
    if "_startTime_for_tictoc" in module_state:
        elapsed = time.time() - module_state["_startTime_for_tictoc"]
        if quiet is False:
            print(f"Elapsed time: {_format_time(elapsed)}")
        return elapsed
    raise Exception("Start time not set, call tic first")
def _is_ipython_notebook(): # pragma: no cover
try:
shell = get_ipython().__class__.__name__
if shell == "ZMQInteractiveShell":
return True # Jupyter notebook or qtconsole
if shell == "TerminalInteractiveShell":
return False # Terminal running IPython
return False # Other type (?)
except NameError:
return False # Probably standard Python interpreter
@dataclass
class Settings:  # pragma: no cover
    r"""
    A dataclass for use at the module level to store settings. This class
    is defined as a Singleton so no matter how or where it gets
    instantiated the same object is returned, containing all existing
    settings.

    Parameters
    ----------
    notebook : boolean
        Is automatically determined the first time it is read, and is
        ``True`` if running within a Jupyter notebook and ``False``
        otherwise. This is used by the ``porespy.tools.get_tqdm`` function
        to determine whether a standard or a notebook version of the
        progress bar should be used. Attempts to assign it are ignored
        and logged as an error.
    tqdm : dict
        This dictionary is passed directly to the ``tqdm`` function
        throughout PoreSpy (``for i in tqdm(range(N), **settings.tqdm)``).
        To see a list of available options visit the tqdm website.
        Probably the most important is ``'disable'`` which when set to
        ``True`` will silence the progress bars. It's also possible to
        adjust the formatting such as ``'colour'`` and ``'ncols'``, which
        controls width.
    loglevel : str, or int
        Determines what messages to get printed in console. Options are:
        "TRACE" (5), "DEBUG" (10), "INFO" (20), "SUCCESS" (25), "WARNING" (30),
        "ERROR" (40), "CRITICAL" (50)
    ncores : int
        Number of cores to use. Defaults to the value reported by
        ``psutil.cpu_count(logical=False)``. Assigning ``None`` restores
        that default, and assigning more than the reported count logs an
        error and clamps the value to the reported count.
    """

    __instance__ = None
    # Default keyword arguments handed to every tqdm progress bar
    tqdm = {
        "disable": True,
        "colour": None,
        "ncols": None,
        "leave": False,
        "file": sys.stdout,
    }
    _loglevel = 40 if _is_ipython_notebook() else 30

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Python calls __init__ on every ``Settings()`` expression, even
        # though __new__ keeps handing back the same instance. Only set
        # the defaults the first time so that values stored on the
        # singleton (e.g. ``ncores``) are not reset by later calls.
        if "_ncores" in self.__dict__:
            return
        self._notebook = None
        self._ncores = psutil.cpu_count(logical=False)

    @property
    def loglevel(self):
        return self._loglevel

    @loglevel.setter
    def loglevel(self, value):
        if isinstance(value, str):
            options = {
                "TRACE": 5,
                "DEBUG": 10,
                "INFO": 20,
                "SUCCESS": 25,
                # Misspelled key that earlier versions accepted, kept so
                # existing callers do not break
                "SUCESS": 25,
                "WARNING": 30,
                "ERROR": 40,
                "CRITICAL": 50,
            }
            # An unknown name raises KeyError
            value = options[value]
        self._loglevel = value
        logger.setLevel(value)

    def __new__(cls):
        # Singleton: create the instance once, then always return it
        if Settings.__instance__ is None:
            Settings.__instance__ = super().__new__(cls)
        return Settings.__instance__

    def __repr__(self):
        # One "name: value" line per public attribute, values aligned in
        # a column; dict values are spread over one line per entry
        public = [name for name in self.__dir__() if not name.startswith("_")]
        indent = max((len(name) + 1 for name in public), default=0)
        chunks = []
        for name in public:
            attr = getattr(self, name)
            text = repr(attr) + "\n"
            if isinstance(attr, dict):
                text = text.replace(",", "\n" + " " * (indent + 1))
            chunks.append(name + ":" + " " * (indent - len(name)) + text)
        return "".join(chunks)

    def _get_ncores(self):
        if self._ncores is None:
            self._ncores = psutil.cpu_count(logical=False)
        return self._ncores

    def _set_ncores(self, val):
        cpu_count = psutil.cpu_count(logical=False)
        if val is None:
            val = cpu_count
        elif cpu_count is not None and val > cpu_count:
            # cpu_count is None when psutil cannot determine the number of
            # cores; the request cannot be validated then, so it is kept
            logger.error("Value is more than the available number of cores")
            val = cpu_count
        self._ncores = val

    ncores = property(fget=_get_ncores, fset=_set_ncores)

    def _get_notebook(self):
        # Detected lazily on first access and cached afterwards
        if self._notebook is None:
            self._notebook = _is_ipython_notebook()
        return self._notebook

    def _set_notebook(self, val):
        # Read-only from the user's point of view: the value is discarded
        logger.error("This value is determined automatically at runtime")

    notebook = property(fget=_get_notebook, fset=_set_notebook)
def get_tqdm():  # pragma: no cover
    r"""
    Fetches a version of ``tqdm`` function that depends on the environment.

    The ``tqdm.notebook`` module is used when ``Settings().notebook`` is
    ``True``, and the plain ``tqdm`` module otherwise.

    Returns
    -------
    tqdm : function handle
        The function to use when wrapping an iterator (i.e. tqdm(range(n)))
    """
    module_name = "tqdm.notebook" if Settings().notebook is True else "tqdm"
    return importlib.import_module(module_name).tqdm
def show_docstring(func):  # pragma: no cover
    r"""
    Fetches the docstring for a function and returns it in markdown format.
    Useful for printing in a Jupyter notebook.

    Parameters
    ----------
    func : object
        Handle to function whose docstring is desired

    Returns
    -------
    md : str
        The docstring rendered by ``npdoc_to_md``, suitable for printing
        in a Jupyter notebook using the ``IPython.display.Markdown``
        function. If a ``ModuleNotFoundError`` is raised while importing
        or calling ``npdoc_to_md``, the raw ``func.__doc__`` is returned
        instead.
    """
    # Note: The following could work too:
    # import pandoc
    # Markdown(pandoc.write(pandoc.read(func, format='rst'), format='markdown'))
    # Although the markdown conversion is not numpydoc specific so is less pretty
    try:
        from npdoc_to_md import render_obj_docstring

        # Drop the last component of the module path before appending the
        # function name
        parent_package = func.__module__.rsplit(".", 1)[0]
        return render_obj_docstring(".".join((parent_package, func.__name__)))
    except ModuleNotFoundError:
        return func.__doc__
def sanitize_filename(filename, ext, exclude_ext=False):
    r"""
    Returns a sanitized string in the form of name.extension

    Parameters
    ----------
    filename : str
        Unsanitized filename, could be 'test.vtk' or just 'test'
    ext : str
        Extension of the file, could be 'vtk' or '.vtk'. Leading and
        trailing dots are ignored.
    exclude_ext : bool
        If True, the returned string doesn't have the extension

    Returns
    -------
    sanitized : str
        Sanitized filename in form of name.extension, or just name when
        ``exclude_ext`` is True.
    """
    # str.strip returns a new string, so the result must be kept
    ext = ext.strip(".")
    suffix = f".{ext}"
    # Remove exactly the matched suffix rather than splitting on every dot,
    # so multi-part extensions such as 'tar.gz' are handled correctly
    name = filename[: -len(suffix)] if filename.endswith(suffix) else filename
    return name if exclude_ext else f"{name}{suffix}"
class Results:
    r"""
    A minimal class for use when returning multiple values from a function

    This class supports dict-like assignment and retrieval
    (``obj['im'] = im``), namedtuple-like attribute look-ups (``obj.im``),
    and generic class-like object assignment (``obj.im = im``)

    Parameters
    ----------
    **kwargs
        Optional initial values; each keyword is stored as an item on the
        object, exactly as if it had been assigned after creation.

    Notes
    -----
    Two private attributes are recorded at creation: ``_func``, the name
    of the function that created the object, and ``_time``, the creation
    time as returned by ``time.asctime()``. Names starting with an
    underscore are skipped when iterating or printing.
    """

    # Resist the urge to add method to this class...the point is to keep
    # the namespace clean!!
    def __init__(self, **kwargs):
        # Only the caller's name is needed, so read it off the parent frame
        # instead of building the whole stack with inspect.getouterframes
        frame = inspect.currentframe()
        try:
            caller = frame.f_back if frame is not None else None
            self._func = caller.f_code.co_name if caller is not None else "<unknown>"
        finally:
            # Do not keep a reference to our own frame alive in our locals
            del frame
        self._time = time.asctime()
        # Store any initial values supplied by the caller
        for key, value in kwargs.items():
            self[key] = value

    def __iter__(self):
        # Yields the stored values (not the names), skipping private ones
        for k, v in self.__dict__.items():
            if not k.startswith("_"):
                yield v

    def __getitem__(self, key):
        return getattr(self, key)

    def __setitem__(self, key, value):
        self.__dict__[key] = value

    def __str__(self):
        header = "―" * 78
        lines = [
            header,
            f"Results of {self._func} generated at {self._time}",
            header,
        ]
        for item, value in list(self.__dict__.items()):
            if item.startswith("_"):
                continue
            if isinstance(value, np.ndarray):
                # Arrays are summarised by shape rather than printed in full
                lines.append(f"{item:<25s} Array of size {np.shape(value)}")
            elif hasattr(value, "keys"):
                # Anything dict-like is summarised by its number of keys
                lines.append(f"{item:<25s} Dictionary with {len(value.keys())} items")
            else:
                lines.append(f"{item:<25s} {value}")
        lines.append(header)
        return "\n".join(lines)