"""Algorithms based on ``BDM`` objects."""
from itertools import product
from random import choice
import numpy as np
class PerturbationExperiment:
    """Perturbation experiment class.

    Perturbation experiment studies change of BDM / entropy under changes
    applied to the underlying dataset. This is the main tool for detecting
    parts of a system having some causal significance as opposed
    to noise parts.

    Parts which when perturbed yield negative contribution to the overall
    complexity after change are likely to be important for the system,
    since their removal make it more noisy. On the other hand parts that yield
    positive contribution to the overall complexity after change are likely
    to be noise since they elongate the system's description length.

    Attributes
    ----------
    bdm : BDM
        BDM object. It has to be configured properly to handle
        the dataset that is to be studied.
    X : array_like (optional)
        Dataset for perturbation analysis. May be set later.
    metric : {'bdm', 'ent'}
        Which metric to use for perturbing.

    See also
    --------
    pybdm.bdm.BDM : BDM computations

    Examples
    --------
    >>> import numpy as np
    >>> from pybdm import BDM, PerturbationExperiment
    >>> X = np.random.randint(0, 2, (100, 100))
    >>> bdm = BDM(ndim=2)
    >>> pe = PerturbationExperiment(bdm, metric='bdm')
    >>> pe.set_data(X)
    >>> idx = np.argwhere(X)  # Perturb only ones (1 --> 0)
    >>> delta_bdm = pe.run(idx)
    >>> len(delta_bdm) == idx.shape[0]
    True

    More examples can be found in :doc:`usage`.
    """

    def __init__(self, bdm, X=None, metric='bdm'):
        """Initialization method.

        Parameters
        ----------
        bdm : BDM
            BDM object configured to handle the studied dataset.
        X : array_like, optional
            Dataset to perturb. May be supplied later via :meth:`set_data`.
        metric : {'bdm', 'ent'}
            Metric used when computing perturbation deltas.

        Raises
        ------
        AttributeError
            If `metric` is not one of ``'bdm'`` or ``'ent'``.
            (Kept as ``AttributeError`` for backward compatibility with
            existing callers, even though ``ValueError`` would be
            more conventional.)
        """
        self.bdm = bdm
        self.metric = metric
        # Internal state populated by `set_data`:
        # occurrence counter of (part, complexity) pairs, current metric
        # value, and the total number of decomposed parts.
        self._counter = None
        self._value = None
        self._ncounts = None
        # Resolve the delta-update method once, up front.
        if self.metric == 'bdm':
            self._method = self._update_bdm
        elif self.metric == 'ent':
            self._method = self._update_ent
        else:
            raise AttributeError("Incorrect metric, not one of: 'bdm', 'ent'")
        if X is None:
            self.X = X
        else:
            self.set_data(X)

    def __repr__(self):
        cn = self.__class__.__name__
        # Strip the angle brackets of the wrapped BDM repr so it nests cleanly.
        bdm = str(self.bdm)[1:-1]
        return "<{}(metric={}) with {}>".format(cn, self.metric, bdm)

    @property
    def size(self):
        """Data size getter."""
        return self.X.size

    @property
    def shape(self):
        """Data shape getter."""
        return self.X.shape

    @property
    def ndim(self):
        """Data number of axes getter."""
        return self.X.ndim

    def set_data(self, X):
        """Set dataset for the perturbation experiment.

        Parameters
        ----------
        X : array_like
            Dataset to perturb.

        Raises
        ------
        ValueError
            If `X` contains symbols outside of ``range(self.bdm.nsymbols)``.
        """
        if not np.isin(np.unique(X), range(self.bdm.nsymbols)).all():
            raise ValueError("'X' is malformed (too many or ill-mapped symbols)")
        self.X = X
        self._counter = self.bdm.decompose_and_count(X)
        if self.metric == 'bdm':
            self._value = self.bdm.compute_bdm(self._counter)
        elif self.metric == 'ent':
            self._value = self.bdm.compute_ent(self._counter)
        self._ncounts = sum(self._counter.values())

    def _idx_to_parts(self, idx):
        """Yield decomposed parts of ``self.X`` affected by a change at `idx`.

        For a non-overlapping partition (``shift == 0``) this is the single
        block containing `idx`; for a sliding-window partition it is every
        window overlapping `idx`.
        """
        try:
            shift = self.bdm.partition.shift
        except AttributeError:
            # Partition algorithms without a `shift` attribute behave like
            # non-overlapping ones.
            shift = 0
        shape = self.bdm.partition.shape
        if shift == 0:
            # Round each coordinate down to its block's origin, then slice
            # out exactly that block.
            r_idx = tuple((k // l)*l for k, l in zip(idx, shape))
            idx = tuple(slice(k, k+l) for k, l in zip(r_idx, shape))
        else:
            # Take the bounding region covering all windows that include `idx`.
            idx = tuple(slice(max(0, k-l+1), k+l) for k, l in zip(idx, shape))
        yield from self.bdm.decompose(self.X[idx])

    def _update_bdm(self, idx, old_value, new_value, keep_changes):
        """Compute BDM change for a single-element perturbation.

        Uses the identity ``BDM = sum over unique parts of (cmx + log2(n))``
        to update the value incrementally instead of recomputing it from
        scratch: first the contributions of the affected parts are removed,
        then the element is changed, then the new parts' contributions are
        added back.
        """
        old_bdm = self._value
        new_bdm = self._value
        # Remove contributions of the parts as they are before the change.
        for key, cmx in self.bdm.lookup(self._idx_to_parts(idx)):
            n = self._counter[(key, cmx)]
            if n > 1:
                # Multiplicity drops by one: log2(n) --> log2(n-1).
                new_bdm += np.log2((n-1) / n)
                if keep_changes:
                    self._counter[(key, cmx)] -= 1
            else:
                # Part disappears entirely: its complexity term goes away.
                new_bdm -= cmx
                if keep_changes:
                    del self._counter[(key, cmx)]
        self.X[idx] = new_value
        # Add contributions of the parts as they look after the change.
        for key, cmx in self.bdm.lookup(self._idx_to_parts(idx)):
            n = self._counter.get((key, cmx), 0)
            if n > 0:
                # Multiplicity grows by one: log2(n) --> log2(n+1).
                new_bdm += np.log2((n+1) / n)
            else:
                # Brand-new part: full complexity term enters the sum.
                new_bdm += cmx
            if keep_changes:
                self._counter.update([(key, cmx)])
        if not keep_changes:
            # Roll the dataset back so the experiment is side-effect free.
            self.X[idx] = old_value
        else:
            self._value = new_bdm
        return new_bdm - old_bdm

    def _update_ent(self, idx, old_value, new_value, keep_changes):
        """Compute entropy change for a single-element perturbation.

        Entropy is ``-sum(p * log2(p))`` over part frequencies. The total
        number of parts (``self._ncounts``) is unchanged by a perturbation,
        so only the terms of the affected parts need adjusting.
        """
        old_ent = self._value
        new_ent = self._value
        # Remove entropy terms of the parts as they are before the change.
        for key, cmx in self.bdm.lookup(self._idx_to_parts(idx)):
            n = self._counter[(key, cmx)]
            p = n / self._ncounts
            new_ent += p*np.log2(p)
            if n > 1:
                # Part remains with multiplicity n-1: re-add its new term.
                p = (n-1) / self._ncounts
                new_ent -= p*np.log2(p)
                if keep_changes:
                    self._counter[(key, cmx)] -= 1
            elif keep_changes:
                del self._counter[(key, cmx)]
        self.X[idx] = new_value
        # Add entropy terms of the parts as they look after the change.
        for key, cmx in self.bdm.lookup(self._idx_to_parts(idx)):
            n = self._counter.get((key, cmx), 0) + 1
            p = n / self._ncounts
            new_ent -= p*np.log2(p)
            if n > 1:
                # Part already existed with multiplicity n-1: drop its old term.
                p = (n-1) / self._ncounts
                new_ent += p*np.log2(p)
            if keep_changes:
                self._counter.update([(key, cmx)])
        if not keep_changes:
            # Roll the dataset back so the experiment is side-effect free.
            self.X[idx] = old_value
        else:
            self._value = new_ent
        return new_ent - old_ent

    def perturb(self, idx, value=-1, keep_changes=False):
        """Perturb element of the dataset.

        Parameters
        ----------
        idx : tuple
            Index tuple of an element.
        value : int
            Value to assign.
            If negative then the new value is randomly selected from the set
            of other possible values.
            For binary data this is just a bit flip and no random numbers
            generation is involved in the process.
        keep_changes : bool
            If ``True`` then changes in the dataset are persistent,
            so each perturbation step depends on the previous ones.

        Returns
        -------
        float :
            BDM value change.

        Examples
        --------
        >>> from pybdm import BDM
        >>> bdm = BDM(ndim=1)
        >>> X = np.ones((30, ), dtype=int)
        >>> perturbation = PerturbationExperiment(bdm, X)
        >>> perturbation.perturb((10, ), -1) # doctest: +FLOAT_CMP
        26.91763012739709
        """
        old_value = self.X[idx]
        if value < 0:
            if self.bdm.nsymbols <= 2:
                # Binary alphabet: deterministic bit flip, no RNG needed.
                value = 1 if old_value == 0 else 0
            else:
                value = choice([
                    x for x in range(self.bdm.nsymbols)
                    if x != old_value
                ])
        if old_value == value:
            # No-op perturbation: the metric cannot change.
            return 0
        return self._method(idx, old_value, value, keep_changes)

    def run(self, idx=None, values=None, keep_changes=False):
        """Run perturbation experiment.

        Parameters
        ----------
        idx : array_like or None
            *Numpy* integer array providing indexes (in rows) of elements
            to perturb. If ``None`` then all elements are perturbed.
        values : array_like or None
            Value to assign during perturbation.
            Negative values correspond to changing value to other
            randomly selected symbols from the alphabet.
            If ``None`` then all values are assigned this way.
            If set then its dimensions must agree with the dimensions
            of ``idx`` (they are horizontally stacked).
        keep_changes : bool
            If ``True`` then changes in the dataset are persistent,
            so each perturbation step depends on the previous ones.

        Returns
        -------
        array_like
            1D float array with perturbation values.

        Examples
        --------
        >>> from pybdm import BDM
        >>> bdm = BDM(ndim=1)
        >>> X = np.ones((30, ), dtype=int)
        >>> perturbation = PerturbationExperiment(bdm, X)
        >>> changes = np.array([10, 20])
        >>> perturbation.run(changes) # doctest: +FLOAT_CMP
        array([26.91763013, 27.34823681])
        """
        if idx is None:
            # Enumerate every index of the dataset in row-major order.
            indexes = [ range(k) for k in self.X.shape ]
            idx = np.array([ x for x in product(*indexes) ], dtype=int)
        if values is None:
            # -1 signals "pick a random replacement symbol" in `perturb`.
            values = np.full((idx.shape[0], ), -1, dtype=int)
        # Each stacked row is (index..., value); the last column is the value.
        return np.apply_along_axis(
            lambda r: self.perturb(tuple(r[:-1]), r[-1], keep_changes=keep_changes),
            axis=1,
            arr=np.column_stack((idx, values))
        )