# -*- coding: UTF-8 -*-
"""Block Decomposition Method
`BDM` class provides a top-level interface for configuring an instance
of a block decomposition method as well as running actual computations
approximating algorithmic complexity of given datasets.
Configuration step is necessary for specifying dimensionality of allowed
datasets, reference CTM data as well as
boundary conditions for block decomposition etc. This is why BDM
is implemented in an object-oriented fashion, so an instance can be first
configured properly and then it exposes a public method :py:meth:`BDM.bdm`
for computing approximated complexity via BDM.
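A minimal usage sketch (the value matches the doctests below; it assumes
the bundled 2D binary CTM dataset and a top-level ``BDM`` import):
>>> import numpy as np
>>> from pybdm import BDM
>>> bdm = BDM(ndim=2)
>>> bdm.bdm(np.ones((12, 12), dtype=int))  # doctest: +FLOAT_CMP
25.176631293734488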
"""
# pylint: disable=protected-access
import warnings
from math import factorial, log2
from collections import Counter, defaultdict
from functools import reduce
from itertools import cycle, repeat, chain
import numpy as np
from . import options
from .utils import get_ctm_dataset
from .partitions import PartitionIgnore, PartitionCorrelated
from .encoding import string_from_array, normalize_key
from .exceptions import BDMRuntimeWarning
from .exceptions import CTMDatasetNotFoundError, BDMConfigurationError
class BDM:
"""Block decomposition method.
Attributes
----------
ndim : int
Number of dimensions of target dataset objects. Positive integer.
nsymbols : int
Number of symbols in the alphabet.
partition : Partition class
Partition algorithm class object.
The class is called with the `shape` attribute determined
automatically if not passed and other attributes passed via ``**kwds``.
ctmname : str
Name of the CTM dataset. If ``None`` then a CTM dataset is selected
automatically based on `ndim` and `nsymbols`.
warn_if_missing_ctm : bool
Should ``BDMRuntimeWarning`` be raised when a CTM value is missing.
Some CTM values may be missing for larger alphabets, as it is
computationally infeasible to explore the entire space of parts.
Missing CTM values are imputed with mean CTM complexities
over all parts of a given shape.
This can also be disabled globally with the global option
of the same name, i.e. ``pybdm.options.set(warn_if_missing_ctm=False)``.
raise_if_zero : bool
Should an error be raised if the BDM value is zero.
A zero value indicates that the dataset may have incorrect dimensions.
This can also be disabled globally with the global option
of the same name, i.e. ``pybdm.options.set(raise_if_zero=False)``.
Notes
-----
Block decomposition method in **PyBDM** is implemented in an object-oriented
fashion. This design choice was dictated by the fact that BDM cannot be
applied willy-nilly to any dataset, but has to be configured for a particular
type of data (e.g. binary matrices). Hence, it is more convenient to first
configure and instantiate a particular instance of BDM and then apply
it freely to data instead of passing a lot of arguments at every call.
BDM also has a natural structure corresponding to the so-called
*split-apply-combine* strategy in data analysis.
First, a large dataset is decomposed into smaller blocks for which
precomputed CTM values can be efficiently looked up.
Then CTM values for the blocks are aggregated in a theory-informed way
into a global approximation of the complexity of the full dataset. Thus,
the BDM computation naturally decomposes into four stages:
#. **Partition (decomposition) stage.** First a dataset is decomposed
into blocks. This is done by the :py:meth:`decompose` method.
The method itself is dependent on the `partition` attribute which points
to a :py:mod:`pybdm.partitions` object, which implements and configures
a particular variant of the decomposition algorithm.
Detailed description of the available algorithms can be found in
:doc:`theory`.
#. **Lookup stage.** At this stage CTM values for blocks are looked up.
This is when the CTM reference dataset is used.
It is implemented in the :py:meth:`lookup` method.
#. **Count stage.** Unique dataset blocks are counted and arranged in
an efficient data structure together with their CTM values.
#. **Aggregate stage.** Final BDM value is computed based on block
counter data structure.
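The stages can also be run separately; a short sketch for the 1D binary
case (intermediate values taken from the method doctests below):
>>> import numpy as np
>>> bdm = BDM(ndim=1)
>>> blocks = bdm.decompose(np.ones((24, ), dtype=int))
>>> ctms = bdm.lookup(blocks)
>>> counter = bdm.count(ctms)
>>> bdm.compute_bdm(counter)  # doctest: +FLOAT_CMP
26.610413747641715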
See also
--------
pybdm.ctmdata : available CTM datasets
pybdm.partitions : available partition and boundary condition classes
"""
_ndim_to_ctm = {
# 1D datasets
(1, 2): 'CTM-B2-D12',
(1, 4): 'CTM-B4-D12',
(1, 5): 'CTM-B5-D12',
(1, 6): 'CTM-B6-D12',
(1, 9): 'CTM-B9-D12',
# 2D datasets
(2, 2): 'CTM-B2-D4x4',
}
def __init__(self, ndim, nsymbols=2, shape=None, partition=PartitionIgnore,
ctmname=None, warn_if_missing_ctm=True, raise_if_zero=True, **kwds):
"""Initialization method.
Parameters
----------
shape : tuple
Part shape to be passed to the partition algorithm.
**kwds :
Other keyword arguments passed to a partition algorithm class.
Raises
------
AttributeError
If 'shift' is not ``0`` or ``1``.
AttributeError
If parts' `shape` is not equal in each dimension.
CTMDatasetNotFoundError
If there is no CTM dataset for a combination of `ndim` and `nsymbols`
or a given `ctmname`.
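Examples
--------
For instance, with the default binary 1D reference dataset:
>>> bdm = BDM(ndim=1, nsymbols=2)
>>> bdm.ctmname
'CTM-B2-D12'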
"""
self.ndim = ndim
try:
self.ctmname = ctmname if ctmname else self._ndim_to_ctm[(ndim, nsymbols)]
except KeyError:
msg = "no CTM dataset for 'ndim={}' and 'nsymbols={}'".format(
ndim, nsymbols
)
raise CTMDatasetNotFoundError(msg)
try:
nsymbols, _shape = self.ctmname.split('-')[-2:]
except ValueError:
msg = "incorrect 'ctmname'; it should be in format " + \
"'name-b<nsymbols>-d<shape>'"
raise BDMConfigurationError(msg)
self.nsymbols = int(nsymbols[1:])
if shape is None:
shape = tuple(int(x) for x in _shape[1:].split('x'))
if any(x != shape[0] for x in shape):
raise BDMConfigurationError("'shape' has to be equal in each dimension")
ctm, ctm_missing = get_ctm_dataset(self.ctmname)
self._ctm = ctm
self._ctm_missing = ctm_missing
self.warn_if_missing_ctm = warn_if_missing_ctm
self.raise_if_zero = raise_if_zero
self.partition = partition(shape=shape, **kwds)
def __repr__(self):
partition = str(self.partition)[1:-1]
cn = self.__class__.__name__
return "<{}(ndim={}, nsymbols={}) with {}>".format(
cn, self.ndim, self.nsymbols, partition
)
def decompose(self, X):
"""Decompose a dataset into blocks.
Parameters
----------
X : array_like
Dataset of arbitrary dimensionality represented as a *Numpy* array.
Yields
------
array_like
Dataset blocks.
Raises
------
AttributeError
If blocks' `shape` and dataset's shape have different numbers of axes.
**Acknowledgments**
Special thanks go to Paweł Weroński for the help with the design of
the non-recursive *partition* algorithm.
Examples
--------
>>> import numpy as np
>>> bdm = BDM(ndim=2, shape=(2, 2))
>>> [ x for x in bdm.decompose(np.ones((4, 3), dtype=int)) ]
[array([[1, 1],
[1, 1]]), array([[1, 1],
[1, 1]])]
"""
yield from self.partition.decompose(X)
def lookup(self, blocks, lookup_ctm=True):
"""Lookup CTM values for blocks in a reference dataset.
Parameters
----------
blocks : sequence
Ordered sequence of dataset parts.
lookup_ctm : bool
Should CTM values be looked up.
Yields
------
tuple
2-tuple with string representation of a dataset part and its CTM value.
Raises
------
KeyError
If key of an object can not be found in the reference CTM lookup table.
Warns
-----
BDMRuntimeWarning
If ``warn_if_missing_ctm=True`` and there is no precomputed CTM
value for a part during the lookup stage.
This can always be disabled with the global option of the same name.
Examples
--------
>>> import numpy as np
>>> bdm = BDM(ndim=1)
>>> data = np.ones((12, ), dtype=int)
>>> parts = bdm.decompose(data)
>>> [ x for x in bdm.lookup(parts) ] # doctest: +FLOAT_CMP
[('111111111111', 25.610413747641715)]
"""
for block in blocks:
sh = block.shape
key = string_from_array(block)
if not lookup_ctm:
yield key, None
continue
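# Keys are normalized before lookup: the reference CTM datasets store
# one representative per class of blocks equivalent under permutations
# of symbols (assumed behavior of ``normalize_key``).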
key_n = normalize_key(key)
try:
cmx = self._ctm[sh][key_n]
except KeyError:
cmx = self._ctm_missing[sh]
if self.warn_if_missing_ctm and options.get('warn_if_missing_ctm'):
msg = "CTM dataset does not contain object '{}' of shape {}".format(
key, sh
)
warnings.warn(msg, BDMRuntimeWarning, stacklevel=2)
yield key, cmx
def count(self, ctms):
"""Count unique blocks.
Parameters
----------
ctms : sequence of 2-tuples
Ordered 1D sequence of string keys and CTM values.
Returns
-------
Counter
Set of unique blocks with their CTM values and numbers of occurrences.
Examples
--------
>>> import numpy as np
>>> bdm = BDM(ndim=1)
>>> data = np.ones((24, ), dtype=int)
>>> parts = bdm.decompose(data)
>>> ctms = bdm.lookup(parts)
>>> bdm.count(ctms) # doctest: +FLOAT_CMP
Counter({('111111111111', 25.610413747641715): 2})
"""
counter = Counter(ctms)
return counter
def decompose_and_count(self, X, lookup_ctm=True):
"""Decompose and count blocks.
Parameters
----------
X : array_like
Dataset representation as a :py:class:`numpy.ndarray`.
Number of axes must agree with the `ndim` attribute.
lookup_ctm : bool
Should CTM values be looked up.
Returns
-------
collections.Counter
Lookup table mapping 2-tuples with string keys and CTM values
to numbers of occurrences.
Notes
-----
This is equivalent to calling :py:meth:`decompose`,
:py:meth:`lookup` and :py:meth:`count`.
Examples
--------
>>> import numpy as np
>>> bdm = BDM(ndim=1)
>>> bdm.decompose_and_count(np.ones((12, ), dtype=int)) # doctest: +FLOAT_CMP
Counter({('111111111111', 25.610413747641715): 1})
"""
blocks = self.decompose(X)
blocks = self.lookup(blocks, lookup_ctm=lookup_ctm)
counter = self.count(blocks)
return counter
def compute_bdm(self, *counters):
"""Approximate Kolmogorov complexity based on the BDM formula.
Parameters
----------
*counters :
Counter objects grouping object keys and occurrences.
Returns
-------
float
Approximate algorithmic complexity.
Notes
-----
Detailed description can be found in :doc:`theory`.
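In short, for unique blocks :math:`b_i` occurring :math:`n_i` times the
BDM value is :math:`\sum_i \mathrm{CTM}(b_i) + \log_2 n_i`, which is
exactly what the loop below computes.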
Examples
--------
>>> from collections import Counter
>>> bdm = BDM(ndim=1)
>>> c1 = Counter([('111111111111', 1.95207842085224e-08)])
>>> c2 = Counter([('111111111111', 1.95207842085224e-08)])
>>> bdm.compute_bdm(c1, c2) # doctest: +FLOAT_CMP
1.000000019520784
"""
counter = reduce(lambda x, y: x+y, counters)
bdm = 0
for key, n in counter.items():
_, ctm = key
bdm += ctm + log2(n)
return bdm
def bdm(self, X, normalized=False, check_data=True):
"""Approximate complexity of a dataset with BDM.
Parameters
----------
X : array_like
Dataset representation as a :py:class:`numpy.ndarray`.
Number of axes must agree with the `ndim` attribute.
normalized : bool
Should BDM be normalized to be in the [0, 1] range.
check_data : bool
Should data format be checked.
May be disabled to gain some speed when calling multiple times.
Returns
-------
float
Approximate algorithmic complexity.
Raises
------
TypeError
If `X` is not an integer array and `check_data=True`.
ValueError
If `X` has more than `nsymbols` unique values
and `check_data=True`.
ValueError
If `X` has symbols outside of the ``0`` to `nsymbols-1` range
and `check_data=True`.
ValueError
If computed BDM value is 0 and `raise_if_zero` is ``True``.
Notes
-----
Detailed description can be found in :doc:`theory`.
Examples
--------
>>> import numpy as np
>>> bdm = BDM(ndim=2)
>>> bdm.bdm(np.ones((12, 12), dtype=int)) # doctest: +FLOAT_CMP
25.176631293734488
"""
if check_data:
self._check_data(X)
if normalized and isinstance(self.partition, PartitionCorrelated):
raise NotImplementedError(
"normalized BDM not implemented for '{}' partition".format(
PartitionCorrelated.name
))
counter = self.decompose_and_count(X)
cmx = self.compute_bdm(counter)
if self.raise_if_zero and options.get('raise_if_zero') and cmx == 0:
raise ValueError("Computed BDM is 0, dataset may have incorrect dimensions")
if normalized:
min_cmx = self._get_min_bdm(X)
max_cmx = self._get_max_bdm(X)
cmx = (cmx - min_cmx) / (max_cmx - min_cmx)
return cmx
def nbdm(self, X, **kwds):
"""Alias for normalized BDM
Other arguments are passed as keywords.
See also
--------
bdm : BDM method
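Examples
--------
A quick sanity check (the equivalent call is shown for illustration):
>>> import numpy as np
>>> bdm = BDM(ndim=1)
>>> X = np.ones((24, ), dtype=int)
>>> bdm.nbdm(X) == bdm.bdm(X, normalized=True)
True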
"""
return self.bdm(X, normalized=True, **kwds)
def compute_ent(self, *counters):
"""Compute block entropy from a counter obejct.
Parameters
----------
*counters :
Counter objects grouping object keys and occurrences.
Returns
-------
float
Block entropy in base 2.
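Notes
-----
This is the Shannon entropy :math:`H = -\sum_i p_i \log_2 p_i` of the
empirical distribution of blocks, where :math:`p_i` is the relative
frequency of the *i*-th unique block.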
Examples
--------
>>> from collections import Counter
>>> bdm = BDM(ndim=1)
>>> c1 = Counter([('111111111111', 1.95207842085224e-08)])
>>> c2 = Counter([('000000000000', 1.95207842085224e-08)])
>>> bdm.compute_ent(c1, c2) # doctest: +FLOAT_CMP
1.0
"""
counter = reduce(lambda x, y: x+y, counters)
ncounts = sum(counter.values())
ent = 0
for n in counter.values():
p = n/ncounts
ent -= p*np.log2(p)
return ent
def ent(self, X, normalized=False, check_data=True):
"""Block entropy of a dataset.
Parameters
----------
X : array_like
Dataset representation as a :py:class:`numpy.ndarray`.
Number of axes must agree with the `ndim` attribute.
normalized : bool
Should entropy be normalized to be in the [0, 1] range.
check_data : bool
Should data format be checked.
May be disabled to gain some speed when calling multiple times.
Returns
-------
float
Block entropy in base 2.
Raises
------
TypeError
If `X` is not an integer array and `check_data=True`.
ValueError
If `X` has more than `nsymbols` unique values
and `check_data=True`.
ValueError
If `X` has symbols outside of the ``0`` to `nsymbols-1` range
and `check_data=True`.
Examples
--------
>>> import numpy as np
>>> bdm = BDM(ndim=2)
>>> bdm.ent(np.ones((12, 12), dtype=int)) # doctest: +FLOAT_CMP
0.0
"""
if check_data:
self._check_data(X)
if normalized and isinstance(self.partition, PartitionCorrelated):
raise NotImplementedError(
"normalized entropy not implemented for '{}' partition".format(
PartitionCorrelated.name
))
counter = self.decompose_and_count(X, lookup_ctm=False)
ent = self.compute_ent(counter)
if normalized:
min_ent = self._get_min_ent(X)
max_ent = self._get_max_ent(X)
ent = (ent - min_ent) / (max_ent - min_ent)
return ent
def nent(self, X, **kwds):
"""Alias for normalized block entropy.
Other arguments are passed as keywords.
See also
--------
ent : block entropy method
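Examples
--------
A quick sanity check (the equivalent call is shown for illustration):
>>> import numpy as np
>>> bdm = BDM(ndim=1)
>>> X = np.ones((24, ), dtype=int)
>>> bdm.nent(X) == bdm.ent(X, normalized=True)
True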
"""
return self.ent(X, normalized=True, **kwds)
def _check_data(self, X):
"""Check if data is correctly formatted.
Symbols have to be mapped to integers from ``0`` to `self.nsymbols-1`.
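Examples
--------
A float array fails the dtype check (shown for illustration only):
>>> import numpy as np
>>> bdm = BDM(ndim=1)
>>> bdm._check_data(np.ones((12, ), dtype=int))  # valid data, no error
>>> bdm._check_data(np.ones((12, )))
Traceback (most recent call last):
    ...
TypeError: 'X' has to be an integer array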
"""
if not issubclass(X.dtype.type, np.integer):
raise TypeError("'X' has to be an integer array")
symbols = np.unique(X)
if symbols.size > self.nsymbols:
raise ValueError("'X' has more than {} unique symbols".format(
self.nsymbols
))
valid_symbols = np.arange(self.nsymbols)
bad_symbols = symbols[~np.isin(symbols, valid_symbols)]
if bad_symbols.size > 0:
raise ValueError("'X' contains symbols outside of [0, {}]: {}".format(
str(self.nsymbols-1),
", ".join(str(s) for s in bad_symbols)
))
def _cycle_parts(self, shape):
"""Cycle over all possible dataset parts sorted by complexity."""
def rep(part):
key, cmx = part
n = len(set(key))
k = factorial(self.nsymbols) / factorial(self.nsymbols - n)
return repeat((key, cmx), int(k))
parts = chain.from_iterable(map(rep, self._ctm[shape].items()))
return cycle(enumerate(parts))
def _get_counter_dct(self, X):
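# Emulate a maximally diverse dataset of the same shape: for every block
# slot take the next part from the complexity-sorted cycle, counting
# occurrences per shape. Used for computing normalization bounds.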
cycle_dct = {}
counter_dct = defaultdict(Counter)
for shape, n in self._iter_shapes(X):
if shape not in cycle_dct:
cycle_dct[shape] = self._cycle_parts(shape)
for _ in range(n):
idx, kv = next(cycle_dct[shape])
_, cmx = kv
counter_dct[shape].update(((idx, cmx),))
return counter_dct
def _iter_shapes(self, X):
yield from Counter(self.partition._iter_shapes(X)).items()
def _get_max_bdm(self, X):
counter_dct = self._get_counter_dct(X)
bdm = 0
for dct in counter_dct.values():
for c, n in dct.items():
_, cmx = c
bdm += cmx + log2(n)
return bdm
def _get_min_bdm(self, X):
bdm = 0
for shape, n in self._iter_shapes(X):
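# CTM entries are assumed to be sorted from most to least complex,
# so the last value is the lowest complexity attainable for this shape.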
cmx = next(reversed(self._ctm[shape].values()))
bdm += cmx + log2(n)
return bdm
def _get_max_ent(self, X):
counter_dct = self._get_counter_dct(X)
parts_count = Counter()
for dct in counter_dct.values():
parts_count.update(idx for idx, _ in dct)
return self.compute_ent(parts_count)
def _get_min_ent(self, X):
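# Minimal entropy corresponds to all blocks of a given shape being
# identical, so only the distribution of block shapes contributes.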
shapes = Counter(shape for shape, _ in self._iter_shapes(X))
ncounts = sum(shapes.values())
ent = 0
for n in shapes.values():
p = n/ncounts
ent -= p*log2(p)
return ent