import numpy as np
[docs]
class Chain:
"""
A discrete-time Markov chain.
States are arbitrary hashable python objects.
Transitions are stored as adjacency dictionaries with attributes, rather than as a matrix.
This class stores structures only, algorithms are implemented externally
"""
def __init__(self, data=None, **attr):
"""
Entry point for Chain Class
:param data: Optional data, converted to states.
:param attr:
"""
# Chain-Level attributes
self.chain = dict(attr)
# State -> state attribute dict
self._state = {}
# State -> successor -> transition attribution dict
self._trans = {} # Trans Rights
if data is not None:
self.add_states_from(data)
[docs]
def add_state(self, s, **attr):
"""Add a state to the chain
:param s: State to add
:param attr: Any other arguments
"""
if s not in self._state:
self._state[s] = {}
self._trans[s] = {}
self._state[s].update(attr)
[docs]
def add_states_from(self, states, **attr):
"""Adds multiple states to the chain
:param states: Iterable states to add
:param attr: Any other arguments
"""
for state in states:
self.add_state(state, **attr)
[docs]
def add_transition(self, u, v, p=None, **attr):
"""Add a transition u -> v to the chain
:param u: Transition origin state
:param v: Transition target state
:param p: Probability (optional)
:param attr: Any other arguments
"""
self.add_state(u)
self.add_state(v)
self._trans[u][v] = {"p": p, **attr}
@property
def states(self):
"""
:return: List of states
"""
return self._state.keys()
[docs]
def transitions(self, u=None):
"""
Returns transitions
If u is none:
iterate over (u, v, attr)
Else:
iterate over (u, v, attr) for fixed u
:param u: State to return transitions of
"""
if u is None:
for u, nbrs in self._trans.items():
for v, attr in nbrs.items():
yield u, v, attr
else:
for v, attr in self._trans.get(u, {}).items():
yield u, v, attr
[docs]
def add_transitions_from(self, data):
"""
Adds transitions from an iterable.
Each element may be:
(u, v)
(u, v, p)
(u, v, attr)
(u, v, p, attr)
:param data: Data to convert from
"""
for e in data:
if len(e) == 2:
u, v = e
self.add_transition(u, v)
elif len(e) == 3:
u, v, third = e
if isinstance(third, dict):
self.add_transition(u, v, **third)
else:
self.add_transition(u, v, p=third)
elif len(e) == 4:
u, v, p, attr = e
self.add_transition(u, v, p=p, **attr)
else:
raise ValueError("Invalid transition")
[docs]
def successors(self, u):
"""
Returns all v where u can transition to v
:param u: Origin State
:return: List of States that u can transition to
"""
return self._trans[u].keys()
[docs]
def predecessors(self, v):
"""
Returns all u where u can transition to v
:param v: Target state to find transitions
"""
for u, nbrs in self._trans.items():
if v in nbrs:
yield u
[docs]
def has_state(self, s):
"""
Returns true if s is a state
:param s: Possible state to check for
:return: If state exists in chain
"""
return s in self._state
[docs]
def has_transition(self, u, v):
"""
returns true if u and v are transitions
:param u: Origin of transition to find
:param v: Target of transition to find
:return: If transition exists
"""
return u in self._trans and v in self._trans[u]
[docs]
def transition_mass(self, u, v):
"""
Return the transition probability from state u to state v
If there is no outgoing edge from u to v, returns 0
:param u: Origin of the weight to find
:param v: Target of the weight to find
:return:
"""
return self._trans[u].get(v, 0.0)["p"]
[docs]
def out_degree(self, u, weight=None):
"""
Returns the number of outgoing edges if weight is none.
Else returns the sum of the weight of outgoing edges
:param u: State to count weight
:param weight: "p" returns sum.
:return: Sum or count of weights of outgoing edges
"""
if weight == "p":
return sum(attr.get("p", 0) for attr in self._trans[u].values())
return len(self._trans[u])
[docs]
def in_degree(self, v, weight=None):
"""
Returns the number of entering edges if weight is none.
Else returns the sum of the weight of entering edges
:param v: Target state to count weight
:param weight: "p" returns sum.
:return: Sum or count of weight of entering edges
"""
deg = 0
for u in self._trans:
for v in self._trans[u]:
if weight == "p":
deg += self._trans[u][v].get("p", 0)
else:
deg += 1
return deg
[docs]
def is_stochastic(self, tol=1e-12):
"""
Returns True if the chain is stochastic
:param tol: Optional Tolerance
:return: True if chain is stochastic
"""
for u, nbrs in self._trans.items():
if not nbrs:
continue # allow absorbing states
total = sum(attr.get("p", 0) for attr in nbrs.values())
if abs(total - 1.0) > tol:
return False
return True
[docs]
def normalise(self):
"""
Takes the sums of the weights and normalises them to 1
"""
for u in self._trans:
total = sum(attr.get("p", 0) for attr in self._trans[u].values())
if total > 0:
for v in self._trans[u]:
self._trans[u][v]["p"] /= total
def __len__(self):
return len(self._state)
def __iter__(self):
return iter(self._state)
def __contains__(self, s):
return s in self._state
def __repr__(self):
return f"Chain with {len(self)} states and {sum(len(n) for n in self._trans.values())} transitions"
[docs]
@classmethod
def from_adjacency_matrix(cls, matrix, states=None, normalise=True, validate=True):
"""
Constructs a Markov chain from an adjacency matric
The adjacency matrix is interpreted row-wise
:param matrix: Sequence of Sequences of non-negative numbers
:param states: Optional state labels
:param normalise: Optional, rows of matrix normalised to 1
:param validate: Optional, validates the matrix for correctness
:return: Chain
"""
n = len(matrix)
if n == 0:
raise ValueError(f"Matrix {matrix} must be non-empty")
if validate:
cls._validate_matrix(matrix, states)
if states is None:
states = list(range(n))
chain = cls()
for i, row in enumerate(matrix):
total = sum(row)
for j, value in enumerate(row):
if value <= 0:
continue
prob = value / total if normalise else value
chain.add_transition(states[i], states[j], prob)
return chain
@staticmethod
def _validate_matrix(matrix, states=None):
n = len(matrix)
# Check square
for row in matrix:
if len(row) != n:
raise ValueError(f"Matrix {matrix} must be square")
# Check states same size as matrix
if states is not None and len(states) != n:
raise ValueError(f"Number of states must match matrix dimension")
# Non-negativity, non-zero
for row in matrix:
if any(x < 0 for x in row):
raise ValueError(f"Matrix entries must be non-negative")
if sum(row) == 0:
raise ValueError(f"Matrix rows must be non-zero")
@staticmethod
def _validate_transitions(transitions, tol=1e-12):
for origin, target in transitions.items():
total = sum(target.values())
if abs(total - 1.0) > tol:
raise ValueError(f"Transition {transitions[origin]} must sum to 1")
if any(p < 0 for p in target.values()):
raise ValueError(
f"Transition {transitions[origin]} must be non-negative"
)
[docs]
def to_adjacency_matrix(self, states=None, dense=True):
"""
:param states:
:param dense:
:return:
"""
if states is None:
states = list(self.states)
if dense:
n = len(states)
matrix = [[0.0 for _ in range(n)] for _ in range(n)]
state_index = {s: i for i, s in enumerate(states)}
for i, u in enumerate(states):
for v in self.successors(u):
j = state_index[v]
matrix[i][j] = self.transition_mass(u, v)
return matrix
else:
matrix = {}
for u in states:
matrix[u] = {v: self.transition_mass(u, v) for v in self.successors(u)}
return matrix
[docs]
@classmethod
def merge(cls, chain1, chain2, merge_type="add", normalise=True):
merged = cls()
# add all transitions from chain1
for u in chain1.states:
for v in chain1.successors(u):
merged.add_transition(u, v, chain1.transition_mass(u, v))
# add all transitions from chain2
for u in chain2.states:
for v in chain2.successors(u):
p2 = chain2.transition_mass(u, v)
if u not in merged._trans:
merged.add_state(u)
if merge_type == "add":
if v in merged.successors(u):
# Only sum existing probability
merged._trans[u][v]["p"] += p2
else:
merged.add_transition(u, v, p2)
else: # overwrite
merged.add_transition(u, v, p2)
if normalise:
for u in merged.states:
total = sum(merged.transition_mass(u, v) for v in merged.successors(u))
if total > 0:
for v in merged.successors(u):
merged._trans[u][v]["p"] /= total
return merged
[docs]
def stationary_distribution(self, method="auto", tol=1e-12, max_iter=10000):
n = len(self.states)
states = list(self.states)
P = np.array(self.to_adjacency_matrix(states, dense=True), dtype=float)
if method == "auto":
if n <= 20:
method = "linear"
else:
method = "power"
if method == "linear":
# Solve π P = π <=> (P.T - I) π^T = 0, sum π_i = 1
A = P.T - np.eye(n)
A = np.vstack([A, np.ones(n)])
b = np.zeros(n + 1)
b[-1] = 1
try:
pi = np.linalg.lstsq(A, b, rcond=None)[0]
pi = np.clip(pi, 0, None) # Remove negative small values
pi /= pi.sum()
except np.linalg.LinAlgError:
raise ValueError("Linear algebra soloutio nfailed, try method ='power'")
elif method == "power":
row_sums = P.sum(axis=1, keepdims=True)
row_sums[row_sums == 0] = 1
P = P / row_sums
pi = np.ones(n) / n
for _ in range(max_iter):
pi_next = pi @ P
if np.linalg.norm(pi_next - pi, 1) < tol:
pi = pi_next
break
pi = pi_next
pi /= pi.sum()
return dict(zip(states, pi))