amachine.am_hmm
1from __future__ import annotations 2 3from abc import abstractmethod 4from typing import override 5import gc 6import copy 7from collections import deque 8from collections import defaultdict 9import json 10from fractions import Fraction 11from pathlib import Path 12import warnings 13import time 14from dataclasses import dataclass, asdict, field 15 16import matplotlib.pyplot as plt 17 18import networkx as nx 19 20from automata.fa.dfa import DFA 21 22import sympy 23import numpy as np 24 25from .am_machine import Machine 26from .am_symbol import Symbol 27 28from .am_msp import MSP, compute_msp, compute_msp_exact 29from .am_solve import solve_for_pi, solve_for_pi_fractional 30from . import am_vis 31from . import am_fast 32 33from .am_causal_state import CausalState 34from .am_transition import Transition 35 36from .am_fast.distance import jensenshannondivergence_gpu as af_jensenshannondivergence 37from .am_fast.distance import jensenshannondivergence_cpu as af_jensenshannondivergence_cpu 38 39class HMM(Machine) : 40 41 """Hidden Markov model implementing epsilon machines, mixed state presentations, 42 complexity measures, and data generation. 43 44 Args: 45 states (list[CausalState] | None): A list of causal states. 46 transitions (list[Transition] | None): A list of transitions between states. 47 start_state (int): Index of the start state. 48 alphabet (list[str]): List of symbols making up the alphabet. 49 name (str): Name of the model. 50 description (str): Description of the model. 51 """ 52 53 def __init__( 54 self, 55 states : list[CausalState] | None = None, 56 transitions : list[Transition] | None = None, 57 start_state : int = 0, 58 alphabet : list[str] | None = None, 59 name : str = "", 60 description : str = "" ) : 61 62 self.alphabet : list[str] | None = alphabet or [] 63 self.states : list[CausalState] | None= states or [] 64 self.transitions : list[Transition] | None= transitions or [] 65 66 self.set_alphabet( self.alphabet ) 67 self.set_states( self.states ) 68 self.set_transitions( self.transitions ) 69 70 self.start_state : int = start_state 71 72 self.name : str = name 73 self.description : str = description 74 75 # To be depreciated 76 self.isoclass : str = None 77 78 # --- derived -------- 79 80 self.complexity : dict[str, any] = {} 81 82 self.symbol_idx_map : dict[str,int] = {} 83 self.state_idx_map : dict[str,int] = {} 84 85 self.pi_fractional = None 86 self.pi : np.ndarray | None = None 87 self.T : np.ndarray | None = None 88 self.msp : MSP | None = None 89 self.reverse_am : HMM | None = None 90 self.is_q_weighted : bool = False 91 self.is_minimal : bool = False 92 93 # --- const ---------- 94 95 self.EPS : float = 1e-12 96 97 #-------------------------------------------------------# 98 # Overrides / Getters # 99 #-------------------------------------------------------# 100 101 @override 102 def get_states(self) -> list[CausalState] : 103 return self.states 104 105 @override 106 def get_transitions(self) -> list[Transition] : 107 return self.transitions 108 109 @override 110 def get_alphabet(self) -> list[Symbol]: 111 return [ Symbol(a) for a in self.alphabet ] 112 113 #-------------------------------------------------------# 114 # Serialization # 115 #-------------------------------------------------------# 116 117 118 def to_dict(self) -> dict[str,any]: 119 120 """Create a dict representing the HMM configuration. 121 122 Returns: 123 124 dict[str,any]: Dictionary containing, name, description, states, transitions, alphabet, and isoclass. 125 """ 126 127 return { 128 "name" : self.name, 129 "description" : self.description, 130 "states" : [ asdict(state) for state in self.states ], 131 "transitions" : [ asdict(transition) for transition in self.transitions ], 132 "alphabet" : self.alphabet, 133 "isoclass" : self.isoclass 134 } 135 136 def from_dict( self, config : dict[str,any] ) : 137 138 """Configure the HMM configuration from a dictionary. 139 140 Args: 141 config (dict[str,any]): The HMM configuration. 142 """ 143 144 self.name = config.name 145 self.description = config.description 146 147 self.set_states( states=[ 148 CausalState( 149 name=state[ "name" ], 150 classes=set( state[ "classes" ] ) 151 ) 152 for state in config.states 153 ] ) 154 155 self.set_transitions( transitions=[ 156 Transition( 157 origin_state_idx=tr[ "origin_state_idx" ], 158 target_state_idx=tr[ "target_state_idx" ], 159 prob=tr[ "prob" ], 160 symbol_idx=tr[ "symbol_idx" ] 161 ) 162 for tr in config.transitions 163 ] ) 164 165 self.set_alphabet( alphabet=config.alphabet ) 166 167 def save_config( 168 self, 169 path : Path, 170 with_complexity : bool = False, 171 with_non_trivial_complexity : bool = False ) : 172 173 config = self.to_dict() 174 175 if with_complexity : 176 177 complexity = self.get_complexities( 178 with_non_trivial=with_non_trivial_complexity 179 ) 180 181 config[ "complexity" ] = complexity 182 183 config[ "structural_properties" ] = { 184 "unifilar" : self.is_unifilar(), 185 "row_stochastic" : self.is_row_stochastic(), 186 "strongly_connected" : self.is_strongly_connected(), 187 "aperiodic" : self.is_aperiodic(), 188 "minimal" : self._is_minimal_as_dfa( topological_only=False ), 189 "topologically_minimal" : self._is_minimal_as_dfa( topological_only=True ), 190 "is_epsilon_machine" : self.is_epsilon_machine() 191 } 192 193 with open( path / "am_config.json", "w", encoding="utf-8" ) as f : 194 json.dump( config, f, ensure_ascii=False, indent=2, default=list ) 195 196 def from_file( self, path : Path ) : 197 with open( Path / "am_config.json", "r" ) as f: 198 config = json.load(f) 199 self.from_dict() 200 201 #-------------------------------------------------------# 202 # Setters and State Management # 203 #-------------------------------------------------------# 204 205 def clear_cache(self) : 206 207 """ 208 Reset all derived properties so they will be recomputed when requested later. 209 """ 210 211 self.complexity = {} 212 self.T = None 213 self.T_x = None 214 self.pi = None 215 self.pi_fractional = None 216 self.msp = None 217 self.reverse_am = None 218 self.is_q_weighted = False 219 self.is_minimal = False 220 221 def set_states( self, states : list[CausalState] ) : 222 self.clear_cache() 223 self.states = states.copy() 224 self.state_idx_map = {} 225 for idx, state in enumerate( self.states ) : 226 self.state_idx_map[ state.name ] = idx 227 228 def set_alphabet( self, alphabet : list[str] ) : 229 230 self.clear_cache() 231 232 old_alphabet = self.alphabet.copy() 233 234 aSet = set() 235 aSet.update( alphabet ) 236 237 self.alphabet = sorted(list(aSet)) 238 239 self.symbol_idx_map = {} 240 for idx, symbol in enumerate( self.alphabet ) : 241 self.symbol_idx_map[ symbol ] = idx 242 243 for i, tr in enumerate( self.transitions ) : 244 symbol = old_alphabet[ tr.symbol_idx ] 245 self.transitions[ i ] = Transition( 246 origin_state_idx=tr.origin_state_idx, 247 target_state_idx=tr.target_state_idx, 248 prob=tr.prob, 249 symbol_idx=self.symbol_idx_map[ symbol ] 250 ) 251 252 def set_transitions( self, transitions : list[Transition] ) : 253 self.clear_cache() 254 self.transitions = transitions.copy() 255 256 def extend_states( self, states : list[CausalState] ) : 257 self.set_states( self.states + states ) 258 259 def extend_alphabet( self, alphabet : list[str] ) : 260 self.set_alphabet( self.alphabet + alphabet ) 261 262 def extend_transitions( self, transitions : list[Transition] ) : 263 self.set_transitions( self.transitions + transitions ) 264 265 def get_complexity_measure_if_exists(self, measure ) : 266 m = self.complexity.get( measure, None ) 267 return m 268 269 def set_complexity_measure(self, measure, value ) : 270 self.complexity[ measure ] = value 271 272 #-------------------------------------------------------# 273 # Get and Compute # 274 #-------------------------------------------------------# 275 276 def get_complexities( 277 self, 278 with_non_trivial=False ) : 279 280 trivial = [ 281 self.C_mu, 282 self.h_mu, 283 self.H_1, 284 self.rho_mu 285 ] 286 287 non_trivial = [ 288 self.E, 289 self.T_inf, 290 self.S, 291 self.chi 292 ] 293 294 complexities = { m.__name__ : m() for m in trivial } 295 296 if with_non_trivial : 297 298 complexities |= { m.__name__ : m() for m in non_trivial } 299 300 for key in [ 'H_L', 'h_mu_L', 'H_sync' ] : 301 if key in self.complexity : 302 complexities[ key ] = self.complexity[ key ] 303 304 return complexities 305 306 #-------------------------------------------------------# 307 308 def get_metadata(self) : 309 return { 310 "name" : self.name, 311 'complexity' : self.complexity, 312 "description" : self.description 313 } 314 315 def get_transition_matrix(self) : 316 317 if self.T is not None : 318 return self.T 319 320 n_states = len( self.states ) 321 T = np.zeros((n_states, n_states)) 322 323 for tr in self.transitions : 324 T[ tr.origin_state_idx, tr.target_state_idx ] = tr.prob 325 326 self.T = T 327 328 return self.T 329 330 #-------------------------------------------------------# 331 332 def get_T_X(self) : 333 334 if self.T_x is not None : 335 return self.T_x 336 337 n_states = len( self.states ) 338 n_symbols = len( self.alphabet ) 339 340 T_x = [ np.zeros((n_states, n_states)) for _ in range( n_symbols ) ] 341 342 for tr in self.transitions : 343 T_x[ tr.symbol_idx ][tr.origin_state_idx, tr.target_state_idx] = tr.prob 344 345 self.T_x = T_x 346 return self.T_x 347 348 #-------------------------------------------------------# 349 350 def get_msp_qw( 351 self, 352 exact_state_cap: int = 1000, 353 verbose: bool = True, 354 ): 355 if self.msp is not None: 356 return self.msp 357 358 try : 359 360 print( "\nTrying to Compute Mixed State Presentation using Exact Fractions\n" ) 361 362 self.msp = compute_msp_exact( 363 T_x=self.get_Tx_fractional(), 364 pi=self.get_fractional_stationary_distribution(), 365 n_states=len(self.states), 366 alphabet=self.alphabet, 367 exact_state_cap=1000, 368 bool = True 369 ) 370 371 return self.msp 372 373 except RuntimeError as e : 374 warnings.warn( f"Exact msp failed: {e} Falling back to msp approximation." ) 375 376 return self.get_msp() 377 378 def get_msp( 379 self, 380 exact_state_cap: int = 175_000, 381 jsd_eps: float = 1e-7, 382 k_ann: int = 50, 383 verbose = True, 384 ) : 385 386 if self.msp is not None: 387 return self.msp 388 389 T_x = self.get_T_X() 390 pi = self.get_stationary_distribution() 391 392 T_stacked = np.stack(T_x) 393 n_symbols = len(self.alphabet) 394 n_input_states = T_stacked.shape[1] 395 396 print( "\nComputing Mixed State Presentation..." ) 397 398 self.msp = compute_msp( 399 T_x=T_x, 400 pi=pi, 401 n_states=len(self.states), 402 alphabet=self.alphabet, 403 exact_state_cap=exact_state_cap, 404 verbose=verbose 405 ) 406 407 return self.msp 408 409 def get_reverse_am(self) : 410 411 if self.reverse_am is not None: 412 return self.reverse_am 413 414 pi = self.get_stationary_distribution() 415 self.reverse_am = copy.deepcopy(self) 416 417 new_transitions = [] 418 for tr in self.transitions: 419 i = tr.target_state_idx 420 j = tr.origin_state_idx 421 422 p_reversed = (pi[j] * tr.prob) / pi[i] 423 424 new_transitions.append( 425 Transition( 426 origin_state_idx=i, 427 target_state_idx=j, 428 prob=p_reversed, 429 symbol_idx=tr.symbol_idx 430 ) 431 ) 432 433 self.reverse_am.set_transitions(new_transitions) 434 435 if self.reverse_am.is_epsilon_machine(): 436 return self.reverse_am 437 438 rmsp = self.reverse_am.get_msp_qw( exact_state_cap=len(self.states)*4 ) 439 440 self.reverse_am.set_states( rmsp.states ) 441 self.reverse_am.set_transitions( rmsp.transitions ) 442 self.reverse_am.msp = rmsp 443 self.reverse_am.start_state = 0 444 445 self.reverse_am.collapse_to_largest_strongly_connected_subgraph() 446 self.reverse_am.minimize() 447 448 return self.reverse_am 449 450 #-------------------------------------------------------# 451 452 def get_Tx_fractional(self) -> list[ list[ list[ Fraction ] ] ] : 453 454 self.to_q_weighted() 455 456 n_states = len( self.states ) 457 n_symbols = len( self.alphabet ) 458 459 T_x = [] 460 461 for x in range( n_symbols ) : 462 T_x.append( [] ) 463 for i in range( n_states ) : 464 T_x[ x ].append( [ 0 for _ in range( n_states ) ] ) 465 466 for tr in self.transitions : 467 T_x[ tr.symbol_idx ][ tr.origin_state_idx ][ tr.target_state_idx ] = tr.pq 468 469 return T_x 470 471 def get_T_sympy( self ) : 472 473 self.to_q_weighted() 474 475 n = len( self.states ) 476 T = sympy.zeros( n, n ) 477 478 for tr in self.transitions : 479 T[ tr.origin_state_idx, tr.target_state_idx ] = tr.pq 480 481 return T 482 483 def get_fractional_stationary_distribution(self) : 484 485 T = self.get_T_sympy() 486 487 if self.pi_fractional is not None : 488 return self.pi_fractional 489 490 G = self.as_digraph() 491 492 if not nx.is_strongly_connected(G): 493 raise ValueError( "Single stationary distribution requires strongly connected HMM." ) 494 495 self.pi_fractional = solve_for_pi_fractional( T ) 496 497 return self.pi_fractional 498 499 def get_stationary_distribution(self): 500 501 if self.pi is not None : 502 return self.pi 503 504 G = self.as_digraph() 505 506 if not nx.is_strongly_connected(G): 507 raise ValueError( "Single stationary distribution requires strongly connected HMM." ) 508 509 T = self.get_transition_matrix() 510 return solve_for_pi( T ) 511 512 #-------------------------------------------------------# 513 # Complexity Measures # 514 #-------------------------------------------------------# 515 516 def C_mu( self ) : 517 518 """The *statistical complexity* (aka *forecasting complexity*) : 519 520 .. math:: 521 522 C_{\\mu} = - \\sum_{\\sigma \\in \\mathcal{S}} \\Pr(\\sigma) \\log_2 \\Pr(\\sigma), 523 524 where :math:`\\mathcal{S}` is the set of states [^crutchfield_exact_2016], p.2. 525 526 .. note:: 527 528 **Interpretations** 529 530 * The amount of historical information a process stores. 531 * The amount of structure in a process. 532 533 Returns: 534 535 float: :math:`C_{\\mu}`. 536 537 [^crutchfield_exact_2016]: Crutchfield et al., *Exact Complexity: The Spectral 538 Decomposition of Intrinsic Computation*, 2016. 539 <https://arxiv.org/abs/1309.3792> 540 """ 541 542 m = self.get_complexity_measure_if_exists( "C_mu" ) 543 544 if m is not None : 545 return m 546 547 pi = self.get_stationary_distribution() 548 549 h = 0 550 for i, pr in enumerate( pi ) : 551 552 if pr < self.EPS : 553 continue 554 555 h += -pr * np.log2( pr ) 556 557 self.set_complexity_measure( "C_mu", h ) 558 559 return h 560 561 #-------------------------------------------------------# 562 563 def h_mu( self ) : 564 565 """The *entropy rate* : 566 567 .. math:: 568 569 h_{\\mu}(\\boldsymbol{\\mathcal{S}}) = - \\sum_{\\sigma \\in \\mathcal{S}} \\Pr(\\sigma) \\sum_{x \\in \\mathcal{A}} \\Pr(x|\\sigma) \\log_2 \\Pr(x|\\sigma), 570 571 where :math:`\\mathcal{A}` is the alphabet and :math:`\\mathcal{S}` is the set of states [^crutchfield_exact_2016], p.2. 572 573 .. note:: 574 575 **Interpretations** 576 577 * The lower bound on achievable loss in bits. 578 * The irreducable randomness in the process. 579 * The intrinsic Randomness in the process. 580 581 Returns: 582 583 float: :math:`h_{\\mu}`. 584 585 [^crutchfield_exact_2016]: Crutchfield et al., *Exact Complexity: The Spectral 586 Decomposition of Intrinsic Computation*, 2016. 587 <https://arxiv.org/abs/1309.3792> 588 """ 589 590 m = self.get_complexity_measure_if_exists( "h_mu" ) 591 592 if m is not None : 593 return m 594 595 T = self.get_transition_matrix() 596 pi = self.get_stationary_distribution() 597 598 n_states = pi.size 599 600 h = 0 601 for i, pr in enumerate( pi ) : 602 603 if pr < self.EPS : 604 continue 605 606 row_entropy = 0 607 for j in range( len( pi ) ) : 608 609 if T[ i, j ] < self.EPS : 610 continue 611 612 row_entropy -= T[ i, j ] * np.log2( T[ i, j ] ) 613 614 h += pr * row_entropy 615 616 self.set_complexity_measure( "h_mu", h ) 617 618 return h 619 620 #-------------------------------------------------------# 621 622 def H_1(self) -> float : 623 624 """The *single symbol uncertainty*: 625 626 .. math:: 627 628 H(1)=-\\sum_{x\\in\\mathcal{A}} \\Pr(x) \\log_2{\\Pr(x)}, 629 630 where :math:`\\mathcal{A}` is the alphabet [^James_2018], p.2. 631 632 .. note:: 633 634 **Interpretations** 635 636 * How uncertain you are on average about a single measurement with no context. 637 638 Returns: 639 640 float: :math:`H(1)`. 641 642 [^James_2018]: James et al., Anatomy of a Bit: Information in a Time Series Observation, 2018. 643 <https://arxiv.org/abs/1105.2988> 644 """ 645 646 m = self.get_complexity_measure_if_exists("H_1") 647 if m is not None: 648 return m 649 650 pi = self.get_stationary_distribution() 651 T_X = self.get_T_X() # dict: symbol -> matrix 652 653 h = 0.0 654 for T_x in T_X: 655 # Pr(x) = sum_i pi[i] * sum_j T^(x)[i,j] 656 p_sym = 0.0 657 for i, pr in enumerate(pi): 658 if pr < self.EPS: 659 continue 660 p_sym += pr * T_x[i, :].sum() 661 662 if p_sym < self.EPS: 663 continue 664 h -= p_sym * np.log2(p_sym) 665 666 self.set_complexity_measure("H_1", h) 667 return h 668 669 #-------------------------------------------------------# 670 671 def rho_mu(self) -> float : 672 673 """The *anticipated information* [^James_2018], p.3.: 674 675 .. math:: 676 677 \\rho_{\\mu}= H(1) - h_{\\mu} 678 679 Returns: 680 681 float: :math:`\\rho_{\\mu}` 682 683 [^James_2018]: James et al., Anatomy of a Bit: Information in a Time Series Observation, 2018. 684 <https://arxiv.org/abs/1105.2988> 685 """ 686 687 m = self.get_complexity_measure_if_exists("rho_mu") 688 689 if m is not None: 690 return m 691 692 rho = self.H_1() - self.h_mu() 693 694 self.set_complexity_measure("rho_mu", rho) 695 696 return rho 697 698 #-------------------------------------------------------# 699 700 def block_convergence( self ) : 701 702 """ 703 Run [block entropy convergence](am_fast.html#block_entropy_convergence). Estimates [$\\mathbf{E}$](am_hmm.html#HMM.E), [$\\mathbf{S}$](am_hmm.html#HMM.S), [$\\mathbf{T}$](am_hmm.html#HMM.T_inf), and block measures[^crutchfield_exact_2016]: 704 705 - $\\mathbf{E}(L) = H(L) - L \\cdot h_{\\mu}$, 706 707 - $\\mathbf{T}(L) = \\sum_{l=1}^{L} l \\left[ h_{\\mu}(l) - h_{\\mu} \\right]$, 708 709 - $\\mathbf{S}(L) = \\sum_{l=0}^{L} \\mathcal{H}(l)$, 710 711 - $H(L) = H[X_{0:L}]$, 712 713 - $h_{\\mu}(L) = H(L) - H(L-1)$, and 714 715 - $\\mathcal{H}(L) = -\\sum_{w \\in \\mathcal{A}^L} Pr(w) \\sum_{\\sigma \\in \\mathcal{S}} Pr(\\sigma|w) \\log_2 Pr(\\sigma|w)$. 716 717 You can plot these curves, and the block entropy curves using, [`amachine.HMM.draw_block_measure_curves`](am_hmm.html#HMM.draw_block_measure_curves), and [`amachine.HMM.draw_block_entropy_curve`](am_hmm.html#HMM.draw_block_entropy_curve). 718 719 <img src="../resources/curves.png" alt="block measures plots" style="width: 100%; margin-left: 0%;"> 720 721 Returns: 722 723 ComplexityMeasures: An object containing the estimated measures with the following attributes: 724 725 - E (float): The excess entropy. 726 - T_inf (float): The transient information ($\\mathbf{T}$). 727 - S (float): The synchronization information. 728 - E_L (numpy.ndarray): The block excess entropy ($\\mathbf{E}(L)$). 729 - T_L (numpy.ndarray): The block transient information ($\\mathbf{T}(L)$). 730 - S_L (numpy.ndarray): The block synchronization information ($\\mathbf{S}(L)$). 731 - H_L (numpy.ndarray): The block entropy ($H(L)$). 732 - h_mu_L (numpy.ndarray): The entropy rate estimates ($h_{\\mu}(L)$). 733 - H_sync (numpy.ndarray): The state-block synchronization ($\\mathcal{H}(L)$). 734 - converged (bool): True if the algorithm converged. 735 736 [^crutchfield_exact_2016]: Crutchfield et al., *Exact Complexity: The Spectral 737 Decomposition of Intrinsic Computation*, 2016. 738 <https://arxiv.org/abs/1309.3792> 739 """ 740 741 trs = [ [] for _ in range( len( self.states ) ) ] 742 for tr in self.transitions : 743 trs[ tr.origin_state_idx ].append( ( 744 tr.symbol_idx, 745 float( tr.prob ), 746 tr.target_state_idx ) ) 747 748 pi = self.get_stationary_distribution() 749 750 state_dist = [ float( pi[ i ] ) for i in range( len( self.states ) ) ] 751 branches = [(1.0, list(state_dist))] 752 753 print( "\nComputing Block Entropy\n" ) 754 755 C = am_fast.block_entropy_convergence( 756 h_mu = self.h_mu(), 757 n_states = len( self.states ), 758 n_symbols = len( self.alphabet ), 759 convergence_tol = 1e-6, 760 precision = 10, 761 eps = 1e-25, 762 branches = branches, 763 trans = trs, 764 max_branches = 30_000_000 765 ) 766 767 print( "Done\n" ) 768 769 self.set_complexity_measure( f"E", C.E ) 770 self.set_complexity_measure( f"S", C.S ) 771 self.set_complexity_measure( f"T_inf", C.T ) 772 self.set_complexity_measure( f"E_L", C.E_L.tolist() ) 773 self.set_complexity_measure( f"T_L", C.T_L.tolist() ) 774 self.set_complexity_measure( f"S_L", C.S_L.tolist() ) 775 self.set_complexity_measure( f"H_L", C.H_L.tolist() ) 776 self.set_complexity_measure( f"h_mu_L", C.h_mu_L.tolist() ) 777 self.set_complexity_measure( f"H_sync", C.H_sync.tolist() ) 778 779 return C 780 781 #-------------------------------------------------------# 782 783 def E( self ) -> float : 784 785 """The *excess entropy* [^crutchfield_exact_2016], p.4: 786 787 .. math:: 788 789 \\mathbf{E} \\equiv \\sum_{L=1}^{\\infty} I[X_{-\\infty:0}; X_{0:\\infty}] 790 791 Computed via :meth:`get_msp` and :meth:`amachine.am_msp.MSP.get_E_S_T`, or :meth:`amachine.am_fast.block_entropy_convergence` 792 793 .. note:: 794 795 **Interpretations** 796 797 * The information from the past that reduces uncertainty in the future [^crutchfield_exact_2016]. 798 * How much information an observer must extract to synchronize to the process. 799 * Measures how long the process appears more complex than it asymptotically is. 800 * Vanishes for immediately synchronizable processes. 801 802 Returns: 803 804 float: :math:`\\mathbf{E}` 805 806 [^crutchfield_exact_2016]: Crutchfield et al., *Exact Complexity: The Spectral 807 Decomposition of Intrinsic Computation*, 2016. 808 <https://arxiv.org/abs/1309.3792> 809 """ 810 811 m = self.get_complexity_measure_if_exists( "E" ) 812 813 if m is not None : 814 return m 815 816 try : 817 msp = self.get_msp() 818 E, S, T = msp.get_E_S_T() 819 self.set_complexity_measure( "E", E ) 820 self.set_complexity_measure( "S", S ) 821 self.set_complexity_measure( "T_inf", T ) 822 823 except Exception as e : 824 825 print( f"MSP failed {e}" ) 826 827 C = self.block_convergence() 828 E = C.E 829 self.set_complexity_measure( "E", E ) 830 831 return E 832 833 #-------------------------------------------------------# 834 835 def S( self ) -> float : 836 837 """The *synchronization* information: 838 839 .. math:: 840 841 \\mathbf{S} \\equiv \\sum_{L=1}^{\\infty} \\mathcal{H}(L), 842 843 where :math:`\\mathcal{H}(L)` is the average state uncertainty having seen all length-L words [^crutchfield_exact_2016], p.4. 844 845 .. note:: 846 847 **Interpretations** 848 849 * The total amount of state information that an observer must extract to become synchronized [^crutchfield_exact_2016]. 850 851 Computed via :meth:`get_msp` and :meth:`amachine.am_msp.MSP.get_E_S_T`, or :meth:`amachine.am_fast.block_entropy_convergence` 852 853 Returns: 854 855 float: :math:`\\mathbf{S}` 856 857 [^crutchfield_exact_2016]: Crutchfield et al., *Exact Complexity: The Spectral 858 Decomposition of Intrinsic Computation*, 2016. 859 <https://arxiv.org/abs/1309.3792> 860 """ 861 862 m = self.get_complexity_measure_if_exists( "S" ) 863 864 if m is not None : 865 return m 866 867 try : 868 msp = self.get_msp() 869 E, S, T = msp.get_E_S_T() 870 self.set_complexity_measure( "E", E ) 871 self.set_complexity_measure( "S", S ) 872 self.set_complexity_measure( "T_inf", T ) 873 874 except Exception as e : 875 print( f"{e} \nFalling back to iterative estimation.") 876 exit() 877 878 C = self.block_convergence() 879 S = C.S 880 self.set_complexity_measure( "S", S ) 881 882 return S 883 884 #-------------------------------------------------------# 885 886 def T_inf( self ) -> float : 887 888 """The *transient information*[^crutchfield_exact_2016], p.4: 889 890 .. math:: 891 892 \\mathbf{T} \\equiv \\sum_{L=1}^{\\infty} L \\left[ h_{\\mu}(L) - h_{\\mu} \\right] 893 894 Computed via :meth:`get_msp` and :meth:`amachine.am_msp.MSP.get_E_S_T`, or :meth:`amachine.am_fast.block_entropy_convergence` 895 896 .. note:: 897 898 **Interpretations** 899 900 * The amount of information one must extract from observations so that the block entropy converges to its linear asymptote[^crutchfield_exact_2016]. 901 902 Returns: 903 904 float: :math:`\\mathbf{T}` 905 906 [^crutchfield_exact_2016]: Crutchfield et al., *Exact Complexity: The Spectral 907 Decomposition of Intrinsic Computation*, 2016. 908 <https://arxiv.org/abs/1309.3792> 909 """ 910 911 m = self.get_complexity_measure_if_exists( "T_inf" ) 912 913 if m is not None : 914 return m 915 916 try : 917 msp = self.get_msp() 918 E, S, T = msp.get_E_S_T() 919 self.set_complexity_measure( "E", E ) 920 self.set_complexity_measure( "S", S ) 921 self.set_complexity_measure( "T_inf", T ) 922 923 except Exception as e : 924 print( f"{e} \nFalling back to iterative estimation.") 925 C = self.block_convergence() 926 T_inf = C.T 927 self.set_complexity_measure( "T_inf", T_inf ) 928 929 return T_inf 930 931 #-------------------------------------------------------# 932 933 def chi( self ) -> float : 934 935 """The foward crypticity[^crutchfield_crypticity_2009][^Mahoney_crypticity_2021], p.2: 936 937 .. math:: 938 939 \\chi = C_{\\mu} - \\mathbf{E} 940 941 :math:`C_{\\mu}` is trivially computed from the stationary distribution in :meth:`C_mu` and :math:`\\mathbf{E}` in :meth:`E`. 942 943 .. note:: 944 945 **Interpretations** 946 947 * Difference between internal stored information and apparent information to an observer. 948 * How muching information is hiding in the system. 949 950 Returns: 951 952 float: :math:`\\chi` 953 954 [^crutchfield_crypticity_2009]: Crutchfield et al., Time’s barbed arrow: Irreversibility, crypticity, and stored information, 2009. 955 <https://arxiv.org/abs/0902.1209> 956 957 [^Mahoney_crypticity_2021]: Mahoney et al., Information Accessibility and Cryptic Processes, 2021. 958 <https://arxiv.org/abs/0905.4787> 959 """ 960 961 m = self.get_complexity_measure_if_exists( "chi" ) 962 963 if m is not None : 964 return m 965 966 chi = self.C_mu() - self.E() 967 968 if chi < 0 : 969 970 # if chi is 0, accumulated floating point error can result in small negative values 971 if chi < -1e-5: 972 warnings.warn(f"Crypticity is negative ({chi:.6e}).") 973 974 chi = np.clamp( chi, 0 ) 975 976 self.set_complexity_measure( "chi", chi ) 977 978 return chi 979 980 #-------------------------------------------------------# 981 # Properties # 982 #-------------------------------------------------------# 983 984 def is_row_stochastic(self) : 985 986 """ 987 Check that all states have outgoing transition probabilities that sum to 1. 988 """ 989 990 sums = np.zeros( len( self.states ) ) 991 for tr in self.transitions : 992 sums[ tr.origin_state_idx ] += tr.prob 993 return np.allclose( sums, 1.0 ) 994 995 #-------------------------------------------------------# 996 997 def is_unifilar(self) : 998 999 """ 1000 Check that no state emits the same symbol on transitions to different states. 1001 """ 1002 1003 symbol_trs = np.full( ( len( self.states ), len( self.alphabet) ), -1 ) 1004 1005 for tr in self.transitions : 1006 1007 if symbol_trs[ tr.origin_state_idx, tr.symbol_idx ] == -1 : 1008 symbol_trs[ tr.origin_state_idx, tr.symbol_idx ] = tr.target_state_idx 1009 1010 elif symbol_trs[ tr.origin_state_idx, tr.symbol_idx ] != tr.target_state_idx : 1011 return False 1012 1013 return True 1014 1015 #-------------------------------------------------------# 1016 1017 def is_strongly_connected(self) : 1018 1019 """ 1020 Check if every state is reachable from every other state. Relies on [nx.is_strongly_connected](https://networkx.org/documentation/latest/reference/algorithms/generated/networkx.algorithms.components.is_strongly_connected.html). 1021 """ 1022 1023 return nx.is_strongly_connected( self.as_digraph() ) 1024 1025 #-------------------------------------------------------# 1026 1027 def is_aperiodic(self) : 1028 1029 """ 1030 Checks if machine is periodic. Relies on [nx.is_aperiodic](https://networkx.org/documentation/latest/reference/algorithms/generated/networkx.algorithms.dag.is_aperiodic.html), "A strongly connected directed graph is aperiodic if there is no integer k > 1 that divides the length of every cycle in the graph." 1031 """ 1032 1033 return nx.is_aperiodic( self.as_digraph() ) 1034 1035 #-------------------------------------------------------# 1036 1037 def _is_minimal_as_dfa( self, topological_only : bool, verbose=True ) : 1038 1039 with_probs = not topological_only 1040 1041 # Construct the DFA 1042 dfa = self.as_dfa( with_probs=with_probs ) 1043 1044 # Minimize the DFA 1045 #dfa = dfa.minify(retain_names=True) 1046 dfa = am_fast.minify_cpp( dfa, retain_names=True ) 1047 1048 # check we have minimal number of states 1049 if len( dfa.states ) != len( self.states ) : 1050 if verbose : 1051 print( f"Not minimal reduces from {len( self.states )} to {len( dfa.states )} states" ) 1052 return False 1053 1054 return True 1055 1056 def is_topological_epsilon_machine( self, verbose=True ) : 1057 1058 """ 1059 Checks if the HMM is a topological $\\epsilon$-machine [^1]. 1060 1061 [^1]: Johnson et al, Enumerating Finitary Processes, 2024. 1062 <https://arxiv.org/abs/1011.0036> 1063 """ 1064 1065 if not ( self.is_unifilar() and self.is_strongly_connected() ) : 1066 if verbose : 1067 print( f"Either non unifilar or not strongly connected" ) 1068 return False 1069 else : 1070 return self._is_minimal_as_dfa( topological_only=True, verbose=verbose ) 1071 1072 def is_epsilon_machine( self, verbose=True ) : 1073 1074 if not ( self.is_unifilar() and self.is_strongly_connected() ) : 1075 if verbose : 1076 print( f"Either non unifilar or not strongly connected" ) 1077 return False 1078 else : 1079 return self._is_minimal_as_dfa( topological_only=False, verbose=verbose ) 1080 1081 #-------------------------------------------------------# 1082 # Properties # 1083 #-------------------------------------------------------# 1084 1085 def minimize(self, retain_names: bool = True, verbose=False): 1086 1087 """ 1088 Minimizes the HMM, resulting in an :math:`\\epsilon-`machine if the HMM 1089 is unifilar and strongly connected. Converts the HMM to a DFA with symbols 1090 labeled jointly with symbols and probabilities, and uses Myhill-Nerode 1091 equivalence for minimization. Relies on `automata_lib` and uses 1092 `automata.fa.dfa.DFA.minify` with `allow_partial=True`, and all states 1093 final. 1094 1095 Args: 1096 retain_names (bool): If `True`, the merged states will be named by their union, e.g. `{s_0, s_1}`, and other states will retain their origion names. Otherwise, they will be relabled `{ '0', '1', ..., 'n-1' }`. 1097 1098 Returns: 1099 1100 automata.fa.dfa.DFA : the resulting DFA. 1101 """ 1102 1103 if self.is_minimal : 1104 return 1105 1106 start = time.perf_counter() 1107 1108 if not self.is_unifilar(): 1109 raise ValueError( 1110 "DFA minimization is not valid for non-unifilar HMMs" 1111 ) 1112 1113 was_strongly_connected = self.is_strongly_connected() 1114 1115 was_row_stochastic = self.is_row_stochastic() 1116 n_states_before = len(self.states) 1117 1118 dfa = self.as_dfa(with_probs=True) 1119 1120 #min_dfa = self.as_dfa(with_probs=True).minify(retain_names=True) 1121 min_dfa = am_fast.minify_cpp( dfa, retain_names=True ) 1122 1123 # Build lookup from original state index -> CausalState object 1124 orig_state = {i: s for i, s in enumerate(self.states)} 1125 eq_list = list(min_dfa.states) 1126 1127 start_eq = min_dfa.initial_state 1128 1129 # Separate the start state, then sort the rest by the 1130 # smallest original state index inside each equivalence class. 1131 other_eqs = [eq for eq in eq_list if eq != start_eq] 1132 other_eqs.sort(key=lambda eq: min(eq)) 1133 1134 # Recombine so start eq comes first, followed by the sorted remaining classes 1135 eq_list = [start_eq] + other_eqs 1136 # ---------------------------------------------------------- 1137 1138 # Recompute eq_to_idx with the new ordering 1139 eq_to_idx = {eq: i for i, eq in enumerate(eq_list)} 1140 1141 # new_start is now guaranteed to be 0 1142 new_start = 0 1143 1144 # Map each original state index -> its equivalence class 1145 # Guard: minify() silently drops unreachable states 1146 orig_to_eq = {s: eq for eq in min_dfa.states for s in eq} 1147 1148 # Build lookup from original state index -> its transitions 1149 orig_trs = defaultdict(list) 1150 for t in self.transitions: 1151 orig_trs[t.origin_state_idx].append(t) 1152 1153 new_trs = [] 1154 for eq in min_dfa.states: 1155 rep = next(iter(eq)) 1156 origin_idx = eq_to_idx[eq] 1157 for t in orig_trs[rep]: 1158 target_eq = orig_to_eq[t.target_state_idx] 1159 target_idx = eq_to_idx[target_eq] 1160 new_trs.append(Transition( 1161 origin_state_idx = origin_idx, 1162 target_state_idx = target_idx, 1163 prob = t.prob, 1164 symbol_idx = t.symbol_idx, 1165 )) 1166 1167 members_list = [[orig_state[i] for i in sorted(eq)] for eq in eq_list] # sorted for determinism 1168 1169 # Compute new names 1170 if retain_names: 1171 new_names = [ 1172 "{" + ",".join(str(m.name) for m in members) + "}" if len(members) > 1 1173 else members[0].name 1174 for members in members_list 1175 ] 1176 else: 1177 new_names = [str(j) for j in range(len(eq_list))] 1178 1179 old_name_to_new_name = { 1180 m.name: new_names[j] 1181 for j, members in enumerate(members_list) 1182 for m in members 1183 } 1184 1185 # Build the new states, preserving classes and isomorphs regardless of naming 1186 new_states = [] 1187 for j, (eq, members, name) in enumerate(zip(eq_list, members_list, new_names)): 1188 classes = set().union(*(m.classes for m in members)) 1189 isomorphs = { 1190 old_name_to_new_name.get(iso, iso) 1191 for m in members 1192 for iso in m.isomorphs 1193 if old_name_to_new_name.get(iso, iso) != name 1194 } 1195 new_states.append(CausalState( 1196 name = name, 1197 classes = classes, 1198 isomorphs = isomorphs, 1199 )) 1200 1201 self.set_states(new_states) 1202 self.set_transitions(new_trs) 1203 self.start_state = new_start 1204 1205 if n_states_before == len(new_states) and verbose : 1206 print( f"{n_states_before} state HMM was already minimal.\n" ) 1207 elif verbose : 1208 print( f"Minimized from {n_states_before} to {len(new_states)}\n" ) 1209 1210 if not ( was_strongly_connected == self.is_strongly_connected() ) : 1211 raise RuntimeError( 1212 f"Minimization broke strongly connected" 1213 ) 1214 1215 if not ( was_row_stochastic == self.is_row_stochastic() ) : 1216 raise RuntimeError( 1217 f"Minimization broke row stochasticity" 1218 ) 1219 1220 self.is_minimal = True 1221 1222 #-------------------------------------------------------# 1223 # Modifiers # 1224 #-------------------------------------------------------# 1225 1226 1227 def collapse_to_largest_strongly_connected_subgraph( self, rename_states=True ) : 1228 1229 # get equivalent networkx graph 1230 G = self.as_digraph() 1231 1232 # if already strongly connected, nothing to do 1233 if not nx.is_strongly_connected( G ) : 1234 1235 start = time.perf_counter() 1236 subgraph_nodes = list( nx.strongly_connected_components( G ) ) 1237 1238 # decompose into strongly connected components and sort by length 1239 # subgraph_nodes = list(nx.strongly_connected_components( G )) 1240 subgraph_nodes.sort(key=len) 1241 component_state_set = subgraph_nodes[-1] 1242 1243 # Take the largest strongly connected component (as list of state names) 1244 component_states = sorted( list( component_state_set ) ) 1245 1246 # make temporary copies of the old transitions and states 1247 old_transitions = [ 1248 Transition( 1249 origin_state_idx=tr.origin_state_idx, 1250 target_state_idx=tr.target_state_idx, 1251 prob=tr.prob, 1252 symbol_idx=tr.symbol_idx 1253 ) 1254 1255 for tr in self.transitions 1256 ] 1257 1258 old_states = [ 1259 CausalState( 1260 name=s.name, 1261 classes=s.classes, 1262 isomorphs=s.isomorphs 1263 ) 1264 for s in self.states 1265 ] 1266 1267 self.set_states( 1268 states=[ 1269 state 1270 for i, state in enumerate( old_states ) if i in component_state_set 1271 ] 1272 ) 1273 1274 # we will build new transition list based on those belonging to the component 1275 self.set_transitions( transitions= [] ) 1276 1277 # for tracking which new transitions leave each state 1278 transitions_from_state = { state : set() for state in component_states } 1279 new_transitions = [] 1280 1281 for tr in old_transitions : 1282 1283 origin_state_name = old_states[ tr.origin_state_idx ].name 1284 target_state_name = old_states[ tr.target_state_idx ].name 1285 1286 # skip transitions that connect separate strongly connected components 1287 if not ( tr.origin_state_idx in component_state_set and tr.target_state_idx in component_state_set ) : 1288 continue 1289 1290 # track transitions (by index in new transitions list) that leave this state 1291 transitions_from_state[ tr.origin_state_idx ].add( len( new_transitions ) ) 1292 1293 my_origin_state_idx = self.state_idx_map[ origin_state_name ] 1294 my_target_state_idx = self.state_idx_map[ target_state_name ] 1295 1296 new_transitions.append( 1297 Transition( 1298 origin_state_idx=my_origin_state_idx, 1299 target_state_idx=my_target_state_idx, 1300 prob=tr.prob, 1301 symbol_idx=tr.symbol_idx 1302 ) ) 1303 1304 self.set_transitions( transitions=new_transitions ) 1305 1306 # if we removed an outgoing transition from a state, we need to distribute its probability 1307 # among the remaining outgoing transitions from the state 1308 for state in component_states : 1309 1310 # get the set of transitions leaving this state 1311 state_trs = transitions_from_state[ state ] 1312 1313 # sum the probabilities of the outgoing transitions from the state 1314 p_sum = np.sum( [ self.transitions[ i ].prob for i in state_trs ] ) 1315 1316 # how much probability is missing 1317 diff = 1.0 - p_sum 1318 1319 # if significant difference 1320 if abs( diff ) > self.EPS : 1321 1322 # calculate how much of the difference each transition gets 1323 adjustment = diff / len( state_trs ) 1324 1325 # update the transitions 1326 for i in state_trs : 1327 1328 # transition with origional probability 1329 tr = self.transitions[ i ] 1330 1331 # adjusted probability 1332 self.transitions[ i ] = Transition( 1333 origin_state_idx=tr.origin_state_idx, 1334 target_state_idx=tr.target_state_idx, 1335 prob=tr.prob + adjustment, 1336 symbol_idx=tr.symbol_idx ) 1337 1338 if rename_states : 1339 self.set_states( [ 1340 CausalState( name=f"{i}" ) 1341 for i, s in enumerate( self.states ) 1342 ] ) 1343 1344 def to_q_weighted( self, denominator_limit=1000 ) : 1345 1346 """ 1347 Approximates the existing transition probabilities with exact fractions, stores 1348 the fractional probabilities as Fraction in Transition.pq, and sets the floating 1349 point probabilty to `float(pq)`. If `denominator_limit` is too small for a sane 1350 conversion, the function recurses with `denominator_limit=denominator_limit*10`. 1351 1352 Args: 1353 denominator_limit (int): The initial input to :meth:`Fraction.limit_denominator` in 1354 the conversion. 1355 """ 1356 1357 if self.is_q_weighted : 1358 return 1359 1360 if not self.is_row_stochastic() : 1361 raise ValueError( "Cannot convert to q-weighted because not row stochastic" ) 1362 1363 t_from = [[] for _ in range(len(self.states))] 1364 1365 for i, tr in enumerate( self.transitions ) : 1366 t_from[ tr.origin_state_idx ].append( i ) 1367 1368 new_transitions = [] 1369 for t_list in t_from : 1370 1371 if not t_list : 1372 continue 1373 1374 p_q_sum = Fraction(0,1) 1375 p_qs = [] 1376 1377 for t_idx in t_list : 1378 1379 p_q = Fraction( self.transitions[ t_idx ].prob ).limit_denominator( denominator_limit ) 1380 p_q_sum += p_q 1381 p_qs.append( p_q ) 1382 1383 if p_q_sum != Fraction(1,1) : 1384 1385 max_pq_i = np.argmax( p_qs ) 1386 max_oq = p_qs[ max_pq_i ] 1387 1388 diff = p_q_sum - Fraction(1,1) 1389 1390 # If recurse with higher resolution 1391 if diff > max_oq : 1392 return self.to_q_weighted( denominator_limit*10 ) 1393 else : 1394 p_qs[ max_pq_i ] -= diff 1395 1396 for i, t_idx in enumerate( t_list ) : 1397 new_transitions.append( 1398 Transition( 1399 origin_state_idx=self.transitions[ t_idx ].origin_state_idx, 1400 target_state_idx=self.transitions[ t_idx ].target_state_idx, 1401 prob=float(p_qs[ i ]), 1402 symbol_idx=self.transitions[ t_idx ].symbol_idx, 1403 pq=p_qs[ i ] 1404 ) 1405 ) 1406 1407 self.set_transitions( new_transitions ) 1408 self.is_q_weighted = True 1409 1410 #-------------------------------------------------------# 1411 # Data Generation # 1412 #-------------------------------------------------------# 1413 1414 def isomorphic_shift( 1415 self, 1416 input_symbol_indices: np.ndarray, 1417 input_state_indices: np.ndarray, 1418 shift : int = 1 1419 ) -> dict[str, np.ndarray]: 1420 1421 """ 1422 Generates a new sequence of of symbols that are permuted with the symbols emitted by 1423 isomorphic states, if they exists. 1424 1425 :math:`\\sigma_o = \\mathcal{S}\\left[\\texttt{input\\_state\\_indices}[i]\\right]`<br> 1426 :math:`\\sigma_t = \\mathcal{S}\\left[\\texttt{input\\_state\\_indices}[i+1]\\right]` 1427 1428 :math:`\\mathcal{I}(\\sigma_o) = \\\\{\\sigma^0_o,\\, \\sigma^1_o,\\, \\dots,\\, \\sigma^{n-1}_o \\\\}`<br> 1429 :math:`\\mathcal{I}(\\sigma_t) = \\\\{\\sigma^0_t,\\, \\sigma^1_t,\\, \\dots,\\, \\sigma^{n-1}_t \\\\}` 1430 1431 :math:`k = \\bigl(\\mathcal{I}(\\sigma_o).\\texttt{index}(\\sigma_o) + \\texttt{shift}\\bigr) \\bmod n` 1432 1433 :math:`\\texttt{output\\_symbol\\_indices}[i] := T(\\sigma_o^k,\\, \\sigma_t^k).\\text{symbol\\_index}`<br> 1434 :math:`\\texttt{output\\_state\\_indices}[i] := \\mathcal{S}.\\texttt{index}(\\sigma_o^k)`<br> 1435 :math:`\\texttt{output\\_state\\_indices}[i+1] := \\mathcal{S}.\\texttt{index}(\\sigma_t^k)` 1436 1437 Where :math:`\\mathcal{I}(\\sigma)`` is the ordered set of states isomorphic to :math:`\\sigma` including :math:`\\sigma` itself. 1438 1439 Args: 1440 input_symbol_indices (np.ndarray): The sequence of generated symbols. 1441 input_state_indices (np.ndarray): The sequence of states that generated symbols with the final state at the end. 1442 shift : int: How much to shift the symbols across the isomorphic states. 1443 """ 1444 1445 if not any( state.isomorphs for state in self.states ): 1446 raise ValueError("HMM has no states with isomorphs") 1447 1448 inputs = np.asarray(input_symbol_indices) 1449 states = np.asarray(input_state_indices) 1450 1451 n_states = len(self.states) 1452 1453 tr_sym_table = np.full((n_states, n_states), -1, dtype=np.int32) 1454 1455 for tr in self.transitions: 1456 tr_sym_table[ tr.origin_state_idx, tr.target_state_idx ] = tr.symbol_idx 1457 1458 # Build isomorph remapping: identity by default, overridden where isomorphs exist 1459 iso_table = np.arange(n_states, dtype=np.int32) 1460 1461 for i, state in enumerate(self.states): 1462 if state.isomorphs: 1463 isomorph = sorted(state.isomorphs)[0] 1464 iso_table[ i ] = self.state_idx_map[isomorph] 1465 1466 for i, state in enumerate(self.states): 1467 if state.isomorphs: 1468 1469 # extend the isomorph list to include the identity 1470 isormorphs_with_identity = sorted( [ i ] + [ self.state_idx_map[iso] for iso in state.isomorphs ] ) 1471 1472 # find the identity index 1473 pos = isormorphs_with_identity.index( i ) 1474 1475 # cyclical shift 1476 iso_table[ i ] = isormorphs_with_identity[ ( pos + shift ) % len( isormorphs_with_identity ) ] 1477 1478 origins = states[:-1] 1479 targets = states[1:] 1480 1481 out_origins = iso_table[origins] 1482 out_targets = iso_table[targets] 1483 1484 inv_sym = tr_sym_table[ out_origins, out_targets ] 1485 inv_sts = np.empty(states.size, dtype=states.dtype) 1486 1487 inv_sts[:-1] = out_origins 1488 inv_sts[-1] = out_targets[-1] 1489 1490 return { 1491 "symbol_index": inv_sym.astype(inputs.dtype), 1492 "state_index": inv_sts, 1493 } 1494 1495 def generate_data( 1496 self, 1497 file_prefix: str, 1498 n_gen: int, 1499 include_states: bool, 1500 isomorphic_shifts : set[int]=None, 1501 random_seed : int=42 ) -> dict[any] : 1502 1503 trs = [ [] for _ in range( len( self.states ) ) ] 1504 for tr in self.transitions : 1505 trs[ tr.origin_state_idx ].append( ( 1506 tr.symbol_idx, 1507 float( tr.prob ), 1508 tr.target_state_idx ) ) 1509 1510 data = am_fast.generate_data( 1511 n_gen=n_gen, 1512 start_state=self.start_state, 1513 transitions=trs, 1514 alphabet=sorted(list(self.alphabet)), 1515 include_states=include_states, 1516 random_seed=random_seed 1517 ) 1518 1519 if isomorphic_shifts is not None : 1520 1521 if not include_states : 1522 raise ValueError( "Isomorphic inversion requires include_states=True" ) 1523 1524 data[ "isomorphic_shifts" ] = {} 1525 1526 for shift in isomorphic_shifts : 1527 1528 try : 1529 1530 shifted = self.isomorphic_shift( 1531 input_symbol_indices=data[ "symbol_index" ], 1532 input_state_indices=data[ "state_index" ], 1533 shift=shift 1534 ) 1535 1536 data[ "isomorphic_shifts" ][ shift ] = { 1537 "symbol_index" : shifted[ "symbol_index" ], 1538 "state_index" : shifted[ "state_index" ] 1539 } 1540 1541 except Exception as e : 1542 print( f"Exception {e}" ) 1543 1544 am_fast.save_data( 1545 data=data, 1546 file_prefix=file_prefix, 1547 alphabet=sorted(list(self.alphabet)), 1548 n_states=len( self.states ), 1549 start_state=self.start_state, 1550 random_seed=random_seed, 1551 machine_metadata=self.get_metadata() ) 1552 1553 return data 1554 1555 #-------------------------------------------------------# 1556 # Basic Visualization # 1557 #-------------------------------------------------------# 1558 1559 def draw_block_entropy_curve(self) : 1560 1561 h_mu = self.h_mu() 1562 E = self.E() 1563 1564 if not "H_L" in self.complexity : 1565 C = self.block_convergence() 1566 1567 H_L = self.complexity[ "H_L" ] 1568 L = np.asarray( [ i+1 for i in range( len( H_L ) ) ], dtype='float64') 1569 y = E + L*h_mu 1570 1571 plt.style.use('seaborn-v0_8-darkgrid') 1572 fig, ax = plt.subplots(figsize=(10, 6), dpi=120) 1573 colors = plt.cm.tab10.colors 1574 1575 ax.plot( L, H_L, color='#1E88E5', linewidth=2.0, linestyle='-', label=r'$H(L)$') 1576 ax.plot( L, y, color='#D81B60', linewidth=1.5, linestyle='--', label=r'$\mathbf{E} + h_{\mu}L$') 1577 1578 ax.fill_between( 1579 L, H_L, y, 1580 color='gray', 1581 alpha=0.5, 1582 label=r'Transient information $\mathbf{T}$' 1583 ) 1584 1585 ax.set_title('Block Entropy Curve', fontsize=16, fontweight='bold', pad=15) 1586 ax.set_xlabel('L', fontsize=12, labelpad=10) 1587 ax.set_ylabel('Bits', fontsize=12, labelpad=10) 1588 1589 legend = ax.legend( 1590 loc='upper left', 1591 title_fontsize='11', 1592 fontsize='10', 1593 frameon=True, 1594 facecolor='white', 1595 shadow=True 1596 ) 1597 1598 legend.get_title().set_fontweight('bold') 1599 plt.tight_layout() 1600 plt.show() 1601 1602 def draw_block_measure_curves(self) : 1603 1604 h_mu = self.h_mu() 1605 E = self.E() 1606 1607 if not "H_L" in self.complexity : 1608 C = self.block_convergence() 1609 1610 H_L = self.complexity[ "H_L" ] 1611 E_L = self.complexity[ "E_L" ] 1612 T_L = self.complexity[ "T_L" ] 1613 S_L = self.complexity[ "S_L" ] 1614 H_sync = self.complexity[ "H_sync" ][1:] 1615 h_mu_L = self.complexity[ "h_mu_L" ] 1616 1617 L = np.asarray( [ i+1 for i in range( len( H_L ) ) ], dtype='float64') 1618 y = E + L*h_mu 1619 1620 plt.style.use('seaborn-v0_8-darkgrid') 1621 fig, ax = plt.subplots(figsize=(10, 6), dpi=120) 1622 colors = plt.cm.tab10.colors 1623 1624 ax.plot( L, H_L, color=colors[0], linewidth=2.0, linestyle='-', label=r'$H(L)$') 1625 ax.plot( L, E_L, color=colors[1], linewidth=2.0, linestyle='-', label=r'$\mathbf{E}(L)$') 1626 ax.plot( L, T_L, color=colors[2], linewidth=2.0, linestyle='-', label=r'$\mathbf{T}(L)$') 1627 ax.plot( L, S_L, color=colors[3], linewidth=2.0, linestyle='-', label=r'$\mathbf{S}(L)$') 1628 ax.plot( L, H_sync, color=colors[4], linewidth=2.0, linestyle='-', label=r'$\mathcal{H}(L)$') 1629 ax.plot( L, h_mu_L, color=colors[5], linewidth=2.0, linestyle='-', label=r'$h_{\mu}(L)$') 1630 1631 ax.set_title('Block Measures', fontsize=16, fontweight='bold', pad=15) 1632 ax.set_xlabel('L', fontsize=12, labelpad=10) 1633 ax.set_ylabel('Bits', fontsize=12, labelpad=10) 1634 1635 legend = ax.legend( 1636 loc='upper left', 1637 title_fontsize='11', 1638 fontsize='10', 1639 frameon=True, 1640 facecolor='white', 1641 shadow=True 1642 ) 1643 1644 legend.get_title().set_fontweight('bold') 1645 plt.tight_layout() 1646 plt.show() 1647 1648 def draw_graph( 1649 self, 1650 engine : str = 'dot', 1651 output_dir : Path = Path('.'), 1652 show : bool = True, 1653 symbols_only : bool = False 1654 ) -> None : 1655 1656 """ 1657 Draws the machine using [pygraphiviz](https://pygraphviz.github.io/documentation/stable/) and saves it. 1658 1659 Returns: 1660 1661 networkx.DiGraph : the resulting graph. 1662 """ 1663 1664 G = self.as_digraph() 1665 1666 subgraphs = None if nx.is_strongly_connected( G ) else list( nx.strongly_connected_components( G ) ) 1667 1668 am_vis.draw_graph( 1669 self, 1670 output_dir=output_dir, 1671 title="am_graph", 1672 view=show, 1673 subgraphs=subgraphs, 1674 engine=engine, 1675 symbols_only=symbols_only ) 1676 1677 #-------------------------------------------------------# 1678 # Alternate Forms # 1679 #-------------------------------------------------------# 1680 1681 1682 def as_digraph( self ) -> nx.DiGraph : 1683 1684 """ 1685 Builds a [networkx.DiGraph](https://networkx.org/documentation/stable/reference/classes/digraph.html) constructed from the machine's transitions with no edge symbols or weights. 1686 1687 Returns: 1688 1689 networkx.DiGraph : the resulting graph. 1690 """ 1691 1692 G = nx.DiGraph() 1693 G.add_nodes_from( [ i for i, s in enumerate( self.states ) ] ) 1694 1695 for tr in self.transitions : 1696 G.add_edge( tr.origin_state_idx, tr.target_state_idx ) 1697 1698 return G 1699 1700 def as_dfa( self, with_probs : bool ) : 1701 1702 """ 1703 Builds an [automata.fa.dfa.DFA](https://caleb531.github.io/automata/api/fa/class-dfa/) constructed from the machine's transitions. 1704 1705 Args: 1706 with_probs (bool): If true the DFA transitions are labeled based on 1707 the symbol of the machines transition concatenated with its 1708 probability, othwise, the only the symbols. 1709 1710 Returns: 1711 1712 automata.fa.dfa.DFA : the resulting DFA. 1713 """ 1714 1715 precision=8 1716 1717 def edge_label( symb, prob ) : 1718 return f"({symb},{round(prob, precision)})" 1719 1720 # Build states, symbols, and transitions 1721 dfa_states = { i for i, _ in enumerate( self.states ) } 1722 1723 if not with_probs : 1724 dfa_symbols = set( { t.symbol_idx for t in self.transitions } ) 1725 else : 1726 dfa_symbols = set( { edge_label( t.symbol_idx, t.prob ) for t in self.transitions } ) 1727 1728 dfa_transitions = defaultdict(dict) 1729 1730 if not with_probs : 1731 for t in self.transitions : 1732 dfa_transitions[ t.origin_state_idx ][ t.symbol_idx ] = t.target_state_idx 1733 else : 1734 for t in self.transitions : 1735 dfa_transitions[ t.origin_state_idx ][ edge_label( t.symbol_idx, t.prob ) ] = t.target_state_idx 1736 1737 # Construct the DFA 1738 return DFA( 1739 states=dfa_states, 1740 input_symbols=dfa_symbols, 1741 transitions=dfa_transitions, 1742 initial_state=self.start_state, 1743 allow_partial=True, 1744 final_states={ 1745 s for s in dfa_states 1746 } 1747 )
40class HMM(Machine) : 41 42 """Hidden Markov model implementing epsilon machines, mixed state presentations, 43 complexity measures, and data generation. 44 45 Args: 46 states (list[CausalState] | None): A list of causal states. 47 transitions (list[Transition] | None): A list of transitions between states. 48 start_state (int): Index of the start state. 49 alphabet (list[str]): List of symbols making up the alphabet. 50 name (str): Name of the model. 51 description (str): Description of the model. 52 """ 53 54 def __init__( 55 self, 56 states : list[CausalState] | None = None, 57 transitions : list[Transition] | None = None, 58 start_state : int = 0, 59 alphabet : list[str] | None = None, 60 name : str = "", 61 description : str = "" ) : 62 63 self.alphabet : list[str] | None = alphabet or [] 64 self.states : list[CausalState] | None= states or [] 65 self.transitions : list[Transition] | None= transitions or [] 66 67 self.set_alphabet( self.alphabet ) 68 self.set_states( self.states ) 69 self.set_transitions( self.transitions ) 70 71 self.start_state : int = start_state 72 73 self.name : str = name 74 self.description : str = description 75 76 # To be depreciated 77 self.isoclass : str = None 78 79 # --- derived -------- 80 81 self.complexity : dict[str, any] = {} 82 83 self.symbol_idx_map : dict[str,int] = {} 84 self.state_idx_map : dict[str,int] = {} 85 86 self.pi_fractional = None 87 self.pi : np.ndarray | None = None 88 self.T : np.ndarray | None = None 89 self.msp : MSP | None = None 90 self.reverse_am : HMM | None = None 91 self.is_q_weighted : bool = False 92 self.is_minimal : bool = False 93 94 # --- const ---------- 95 96 self.EPS : float = 1e-12 97 98 #-------------------------------------------------------# 99 # Overrides / Getters # 100 #-------------------------------------------------------# 101 102 @override 103 def get_states(self) -> list[CausalState] : 104 return self.states 105 106 @override 107 def get_transitions(self) -> list[Transition] : 108 return self.transitions 109 110 @override 111 def get_alphabet(self) -> list[Symbol]: 112 return [ Symbol(a) for a in self.alphabet ] 113 114 #-------------------------------------------------------# 115 # Serialization # 116 #-------------------------------------------------------# 117 118 119 def to_dict(self) -> dict[str,any]: 120 121 """Create a dict representing the HMM configuration. 122 123 Returns: 124 125 dict[str,any]: Dictionary containing, name, description, states, transitions, alphabet, and isoclass. 126 """ 127 128 return { 129 "name" : self.name, 130 "description" : self.description, 131 "states" : [ asdict(state) for state in self.states ], 132 "transitions" : [ asdict(transition) for transition in self.transitions ], 133 "alphabet" : self.alphabet, 134 "isoclass" : self.isoclass 135 } 136 137 def from_dict( self, config : dict[str,any] ) : 138 139 """Configure the HMM configuration from a dictionary. 140 141 Args: 142 config (dict[str,any]): The HMM configuration. 143 """ 144 145 self.name = config.name 146 self.description = config.description 147 148 self.set_states( states=[ 149 CausalState( 150 name=state[ "name" ], 151 classes=set( state[ "classes" ] ) 152 ) 153 for state in config.states 154 ] ) 155 156 self.set_transitions( transitions=[ 157 Transition( 158 origin_state_idx=tr[ "origin_state_idx" ], 159 target_state_idx=tr[ "target_state_idx" ], 160 prob=tr[ "prob" ], 161 symbol_idx=tr[ "symbol_idx" ] 162 ) 163 for tr in config.transitions 164 ] ) 165 166 self.set_alphabet( alphabet=config.alphabet ) 167 168 def save_config( 169 self, 170 path : Path, 171 with_complexity : bool = False, 172 with_non_trivial_complexity : bool = False ) : 173 174 config = self.to_dict() 175 176 if with_complexity : 177 178 complexity = self.get_complexities( 179 with_non_trivial=with_non_trivial_complexity 180 ) 181 182 config[ "complexity" ] = complexity 183 184 config[ "structural_properties" ] = { 185 "unifilar" : self.is_unifilar(), 186 "row_stochastic" : self.is_row_stochastic(), 187 "strongly_connected" : self.is_strongly_connected(), 188 "aperiodic" : self.is_aperiodic(), 189 "minimal" : self._is_minimal_as_dfa( topological_only=False ), 190 "topologically_minimal" : self._is_minimal_as_dfa( topological_only=True ), 191 "is_epsilon_machine" : self.is_epsilon_machine() 192 } 193 194 with open( path / "am_config.json", "w", encoding="utf-8" ) as f : 195 json.dump( config, f, ensure_ascii=False, indent=2, default=list ) 196 197 def from_file( self, path : Path ) : 198 with open( Path / "am_config.json", "r" ) as f: 199 config = json.load(f) 200 self.from_dict() 201 202 #-------------------------------------------------------# 203 # Setters and State Management # 204 #-------------------------------------------------------# 205 206 def clear_cache(self) : 207 208 """ 209 Reset all derived properties so they will be recomputed when requested later. 210 """ 211 212 self.complexity = {} 213 self.T = None 214 self.T_x = None 215 self.pi = None 216 self.pi_fractional = None 217 self.msp = None 218 self.reverse_am = None 219 self.is_q_weighted = False 220 self.is_minimal = False 221 222 def set_states( self, states : list[CausalState] ) : 223 self.clear_cache() 224 self.states = states.copy() 225 self.state_idx_map = {} 226 for idx, state in enumerate( self.states ) : 227 self.state_idx_map[ state.name ] = idx 228 229 def set_alphabet( self, alphabet : list[str] ) : 230 231 self.clear_cache() 232 233 old_alphabet = self.alphabet.copy() 234 235 aSet = set() 236 aSet.update( alphabet ) 237 238 self.alphabet = sorted(list(aSet)) 239 240 self.symbol_idx_map = {} 241 for idx, symbol in enumerate( self.alphabet ) : 242 self.symbol_idx_map[ symbol ] = idx 243 244 for i, tr in enumerate( self.transitions ) : 245 symbol = old_alphabet[ tr.symbol_idx ] 246 self.transitions[ i ] = Transition( 247 origin_state_idx=tr.origin_state_idx, 248 target_state_idx=tr.target_state_idx, 249 prob=tr.prob, 250 symbol_idx=self.symbol_idx_map[ symbol ] 251 ) 252 253 def set_transitions( self, transitions : list[Transition] ) : 254 self.clear_cache() 255 self.transitions = transitions.copy() 256 257 def extend_states( self, states : list[CausalState] ) : 258 self.set_states( self.states + states ) 259 260 def extend_alphabet( self, alphabet : list[str] ) : 261 self.set_alphabet( self.alphabet + alphabet ) 262 263 def extend_transitions( self, transitions : list[Transition] ) : 264 self.set_transitions( self.transitions + transitions ) 265 266 def get_complexity_measure_if_exists(self, measure ) : 267 m = self.complexity.get( measure, None ) 268 return m 269 270 def set_complexity_measure(self, measure, value ) : 271 self.complexity[ measure ] = value 272 273 #-------------------------------------------------------# 274 # Get and Compute # 275 #-------------------------------------------------------# 276 277 def get_complexities( 278 self, 279 with_non_trivial=False ) : 280 281 trivial = [ 282 self.C_mu, 283 self.h_mu, 284 self.H_1, 285 self.rho_mu 286 ] 287 288 non_trivial = [ 289 self.E, 290 self.T_inf, 291 self.S, 292 self.chi 293 ] 294 295 complexities = { m.__name__ : m() for m in trivial } 296 297 if with_non_trivial : 298 299 complexities |= { m.__name__ : m() for m in non_trivial } 300 301 for key in [ 'H_L', 'h_mu_L', 'H_sync' ] : 302 if key in self.complexity : 303 complexities[ key ] = self.complexity[ key ] 304 305 return complexities 306 307 #-------------------------------------------------------# 308 309 def get_metadata(self) : 310 return { 311 "name" : self.name, 312 'complexity' : self.complexity, 313 "description" : self.description 314 } 315 316 def get_transition_matrix(self) : 317 318 if self.T is not None : 319 return self.T 320 321 n_states = len( self.states ) 322 T = np.zeros((n_states, n_states)) 323 324 for tr in self.transitions : 325 T[ tr.origin_state_idx, tr.target_state_idx ] = tr.prob 326 327 self.T = T 328 329 return self.T 330 331 #-------------------------------------------------------# 332 333 def get_T_X(self) : 334 335 if self.T_x is not None : 336 return self.T_x 337 338 n_states = len( self.states ) 339 n_symbols = len( self.alphabet ) 340 341 T_x = [ np.zeros((n_states, n_states)) for _ in range( n_symbols ) ] 342 343 for tr in self.transitions : 344 T_x[ tr.symbol_idx ][tr.origin_state_idx, tr.target_state_idx] = tr.prob 345 346 self.T_x = T_x 347 return self.T_x 348 349 #-------------------------------------------------------# 350 351 def get_msp_qw( 352 self, 353 exact_state_cap: int = 1000, 354 verbose: bool = True, 355 ): 356 if self.msp is not None: 357 return self.msp 358 359 try : 360 361 print( "\nTrying to Compute Mixed State Presentation using Exact Fractions\n" ) 362 363 self.msp = compute_msp_exact( 364 T_x=self.get_Tx_fractional(), 365 pi=self.get_fractional_stationary_distribution(), 366 n_states=len(self.states), 367 alphabet=self.alphabet, 368 exact_state_cap=1000, 369 bool = True 370 ) 371 372 return self.msp 373 374 except RuntimeError as e : 375 warnings.warn( f"Exact msp failed: {e} Falling back to msp approximation." ) 376 377 return self.get_msp() 378 379 def get_msp( 380 self, 381 exact_state_cap: int = 175_000, 382 jsd_eps: float = 1e-7, 383 k_ann: int = 50, 384 verbose = True, 385 ) : 386 387 if self.msp is not None: 388 return self.msp 389 390 T_x = self.get_T_X() 391 pi = self.get_stationary_distribution() 392 393 T_stacked = np.stack(T_x) 394 n_symbols = len(self.alphabet) 395 n_input_states = T_stacked.shape[1] 396 397 print( "\nComputing Mixed State Presentation..." ) 398 399 self.msp = compute_msp( 400 T_x=T_x, 401 pi=pi, 402 n_states=len(self.states), 403 alphabet=self.alphabet, 404 exact_state_cap=exact_state_cap, 405 verbose=verbose 406 ) 407 408 return self.msp 409 410 def get_reverse_am(self) : 411 412 if self.reverse_am is not None: 413 return self.reverse_am 414 415 pi = self.get_stationary_distribution() 416 self.reverse_am = copy.deepcopy(self) 417 418 new_transitions = [] 419 for tr in self.transitions: 420 i = tr.target_state_idx 421 j = tr.origin_state_idx 422 423 p_reversed = (pi[j] * tr.prob) / pi[i] 424 425 new_transitions.append( 426 Transition( 427 origin_state_idx=i, 428 target_state_idx=j, 429 prob=p_reversed, 430 symbol_idx=tr.symbol_idx 431 ) 432 ) 433 434 self.reverse_am.set_transitions(new_transitions) 435 436 if self.reverse_am.is_epsilon_machine(): 437 return self.reverse_am 438 439 rmsp = self.reverse_am.get_msp_qw( exact_state_cap=len(self.states)*4 ) 440 441 self.reverse_am.set_states( rmsp.states ) 442 self.reverse_am.set_transitions( rmsp.transitions ) 443 self.reverse_am.msp = rmsp 444 self.reverse_am.start_state = 0 445 446 self.reverse_am.collapse_to_largest_strongly_connected_subgraph() 447 self.reverse_am.minimize() 448 449 return self.reverse_am 450 451 #-------------------------------------------------------# 452 453 def get_Tx_fractional(self) -> list[ list[ list[ Fraction ] ] ] : 454 455 self.to_q_weighted() 456 457 n_states = len( self.states ) 458 n_symbols = len( self.alphabet ) 459 460 T_x = [] 461 462 for x in range( n_symbols ) : 463 T_x.append( [] ) 464 for i in range( n_states ) : 465 T_x[ x ].append( [ 0 for _ in range( n_states ) ] ) 466 467 for tr in self.transitions : 468 T_x[ tr.symbol_idx ][ tr.origin_state_idx ][ tr.target_state_idx ] = tr.pq 469 470 return T_x 471 472 def get_T_sympy( self ) : 473 474 self.to_q_weighted() 475 476 n = len( self.states ) 477 T = sympy.zeros( n, n ) 478 479 for tr in self.transitions : 480 T[ tr.origin_state_idx, tr.target_state_idx ] = tr.pq 481 482 return T 483 484 def get_fractional_stationary_distribution(self) : 485 486 T = self.get_T_sympy() 487 488 if self.pi_fractional is not None : 489 return self.pi_fractional 490 491 G = self.as_digraph() 492 493 if not nx.is_strongly_connected(G): 494 raise ValueError( "Single stationary distribution requires strongly connected HMM." ) 495 496 self.pi_fractional = solve_for_pi_fractional( T ) 497 498 return self.pi_fractional 499 500 def get_stationary_distribution(self): 501 502 if self.pi is not None : 503 return self.pi 504 505 G = self.as_digraph() 506 507 if not nx.is_strongly_connected(G): 508 raise ValueError( "Single stationary distribution requires strongly connected HMM." ) 509 510 T = self.get_transition_matrix() 511 return solve_for_pi( T ) 512 513 #-------------------------------------------------------# 514 # Complexity Measures # 515 #-------------------------------------------------------# 516 517 def C_mu( self ) : 518 519 """The *statistical complexity* (aka *forecasting complexity*) : 520 521 .. math:: 522 523 C_{\\mu} = - \\sum_{\\sigma \\in \\mathcal{S}} \\Pr(\\sigma) \\log_2 \\Pr(\\sigma), 524 525 where :math:`\\mathcal{S}` is the set of states [^crutchfield_exact_2016], p.2. 526 527 .. note:: 528 529 **Interpretations** 530 531 * The amount of historical information a process stores. 532 * The amount of structure in a process. 533 534 Returns: 535 536 float: :math:`C_{\\mu}`. 537 538 [^crutchfield_exact_2016]: Crutchfield et al., *Exact Complexity: The Spectral 539 Decomposition of Intrinsic Computation*, 2016. 540 <https://arxiv.org/abs/1309.3792> 541 """ 542 543 m = self.get_complexity_measure_if_exists( "C_mu" ) 544 545 if m is not None : 546 return m 547 548 pi = self.get_stationary_distribution() 549 550 h = 0 551 for i, pr in enumerate( pi ) : 552 553 if pr < self.EPS : 554 continue 555 556 h += -pr * np.log2( pr ) 557 558 self.set_complexity_measure( "C_mu", h ) 559 560 return h 561 562 #-------------------------------------------------------# 563 564 def h_mu( self ) : 565 566 """The *entropy rate* : 567 568 .. math:: 569 570 h_{\\mu}(\\boldsymbol{\\mathcal{S}}) = - \\sum_{\\sigma \\in \\mathcal{S}} \\Pr(\\sigma) \\sum_{x \\in \\mathcal{A}} \\Pr(x|\\sigma) \\log_2 \\Pr(x|\\sigma), 571 572 where :math:`\\mathcal{A}` is the alphabet and :math:`\\mathcal{S}` is the set of states [^crutchfield_exact_2016], p.2. 573 574 .. note:: 575 576 **Interpretations** 577 578 * The lower bound on achievable loss in bits. 579 * The irreducable randomness in the process. 580 * The intrinsic Randomness in the process. 581 582 Returns: 583 584 float: :math:`h_{\\mu}`. 585 586 [^crutchfield_exact_2016]: Crutchfield et al., *Exact Complexity: The Spectral 587 Decomposition of Intrinsic Computation*, 2016. 588 <https://arxiv.org/abs/1309.3792> 589 """ 590 591 m = self.get_complexity_measure_if_exists( "h_mu" ) 592 593 if m is not None : 594 return m 595 596 T = self.get_transition_matrix() 597 pi = self.get_stationary_distribution() 598 599 n_states = pi.size 600 601 h = 0 602 for i, pr in enumerate( pi ) : 603 604 if pr < self.EPS : 605 continue 606 607 row_entropy = 0 608 for j in range( len( pi ) ) : 609 610 if T[ i, j ] < self.EPS : 611 continue 612 613 row_entropy -= T[ i, j ] * np.log2( T[ i, j ] ) 614 615 h += pr * row_entropy 616 617 self.set_complexity_measure( "h_mu", h ) 618 619 return h 620 621 #-------------------------------------------------------# 622 623 def H_1(self) -> float : 624 625 """The *single symbol uncertainty*: 626 627 .. math:: 628 629 H(1)=-\\sum_{x\\in\\mathcal{A}} \\Pr(x) \\log_2{\\Pr(x)}, 630 631 where :math:`\\mathcal{A}` is the alphabet [^James_2018], p.2. 632 633 .. note:: 634 635 **Interpretations** 636 637 * How uncertain you are on average about a single measurement with no context. 638 639 Returns: 640 641 float: :math:`H(1)`. 642 643 [^James_2018]: James et al., Anatomy of a Bit: Information in a Time Series Observation, 2018. 644 <https://arxiv.org/abs/1105.2988> 645 """ 646 647 m = self.get_complexity_measure_if_exists("H_1") 648 if m is not None: 649 return m 650 651 pi = self.get_stationary_distribution() 652 T_X = self.get_T_X() # dict: symbol -> matrix 653 654 h = 0.0 655 for T_x in T_X: 656 # Pr(x) = sum_i pi[i] * sum_j T^(x)[i,j] 657 p_sym = 0.0 658 for i, pr in enumerate(pi): 659 if pr < self.EPS: 660 continue 661 p_sym += pr * T_x[i, :].sum() 662 663 if p_sym < self.EPS: 664 continue 665 h -= p_sym * np.log2(p_sym) 666 667 self.set_complexity_measure("H_1", h) 668 return h 669 670 #-------------------------------------------------------# 671 672 def rho_mu(self) -> float : 673 674 """The *anticipated information* [^James_2018], p.3.: 675 676 .. math:: 677 678 \\rho_{\\mu}= H(1) - h_{\\mu} 679 680 Returns: 681 682 float: :math:`\\rho_{\\mu}` 683 684 [^James_2018]: James et al., Anatomy of a Bit: Information in a Time Series Observation, 2018. 685 <https://arxiv.org/abs/1105.2988> 686 """ 687 688 m = self.get_complexity_measure_if_exists("rho_mu") 689 690 if m is not None: 691 return m 692 693 rho = self.H_1() - self.h_mu() 694 695 self.set_complexity_measure("rho_mu", rho) 696 697 return rho 698 699 #-------------------------------------------------------# 700 701 def block_convergence( self ) : 702 703 """ 704 Run [block entropy convergence](am_fast.html#block_entropy_convergence). Estimates [$\\mathbf{E}$](am_hmm.html#HMM.E), [$\\mathbf{S}$](am_hmm.html#HMM.S), [$\\mathbf{T}$](am_hmm.html#HMM.T_inf), and block measures[^crutchfield_exact_2016]: 705 706 - $\\mathbf{E}(L) = H(L) - L \\cdot h_{\\mu}$, 707 708 - $\\mathbf{T}(L) = \\sum_{l=1}^{L} l \\left[ h_{\\mu}(l) - h_{\\mu} \\right]$, 709 710 - $\\mathbf{S}(L) = \\sum_{l=0}^{L} \\mathcal{H}(l)$, 711 712 - $H(L) = H[X_{0:L}]$, 713 714 - $h_{\\mu}(L) = H(L) - H(L-1)$, and 715 716 - $\\mathcal{H}(L) = -\\sum_{w \\in \\mathcal{A}^L} Pr(w) \\sum_{\\sigma \\in \\mathcal{S}} Pr(\\sigma|w) \\log_2 Pr(\\sigma|w)$. 717 718 You can plot these curves, and the block entropy curves using, [`amachine.HMM.draw_block_measure_curves`](am_hmm.html#HMM.draw_block_measure_curves), and [`amachine.HMM.draw_block_entropy_curve`](am_hmm.html#HMM.draw_block_entropy_curve). 719 720 <img src="../resources/curves.png" alt="block measures plots" style="width: 100%; margin-left: 0%;"> 721 722 Returns: 723 724 ComplexityMeasures: An object containing the estimated measures with the following attributes: 725 726 - E (float): The excess entropy. 727 - T_inf (float): The transient information ($\\mathbf{T}$). 728 - S (float): The synchronization information. 729 - E_L (numpy.ndarray): The block excess entropy ($\\mathbf{E}(L)$). 730 - T_L (numpy.ndarray): The block transient information ($\\mathbf{T}(L)$). 731 - S_L (numpy.ndarray): The block synchronization information ($\\mathbf{S}(L)$). 732 - H_L (numpy.ndarray): The block entropy ($H(L)$). 733 - h_mu_L (numpy.ndarray): The entropy rate estimates ($h_{\\mu}(L)$). 734 - H_sync (numpy.ndarray): The state-block synchronization ($\\mathcal{H}(L)$). 735 - converged (bool): True if the algorithm converged. 736 737 [^crutchfield_exact_2016]: Crutchfield et al., *Exact Complexity: The Spectral 738 Decomposition of Intrinsic Computation*, 2016. 739 <https://arxiv.org/abs/1309.3792> 740 """ 741 742 trs = [ [] for _ in range( len( self.states ) ) ] 743 for tr in self.transitions : 744 trs[ tr.origin_state_idx ].append( ( 745 tr.symbol_idx, 746 float( tr.prob ), 747 tr.target_state_idx ) ) 748 749 pi = self.get_stationary_distribution() 750 751 state_dist = [ float( pi[ i ] ) for i in range( len( self.states ) ) ] 752 branches = [(1.0, list(state_dist))] 753 754 print( "\nComputing Block Entropy\n" ) 755 756 C = am_fast.block_entropy_convergence( 757 h_mu = self.h_mu(), 758 n_states = len( self.states ), 759 n_symbols = len( self.alphabet ), 760 convergence_tol = 1e-6, 761 precision = 10, 762 eps = 1e-25, 763 branches = branches, 764 trans = trs, 765 max_branches = 30_000_000 766 ) 767 768 print( "Done\n" ) 769 770 self.set_complexity_measure( f"E", C.E ) 771 self.set_complexity_measure( f"S", C.S ) 772 self.set_complexity_measure( f"T_inf", C.T ) 773 self.set_complexity_measure( f"E_L", C.E_L.tolist() ) 774 self.set_complexity_measure( f"T_L", C.T_L.tolist() ) 775 self.set_complexity_measure( f"S_L", C.S_L.tolist() ) 776 self.set_complexity_measure( f"H_L", C.H_L.tolist() ) 777 self.set_complexity_measure( f"h_mu_L", C.h_mu_L.tolist() ) 778 self.set_complexity_measure( f"H_sync", C.H_sync.tolist() ) 779 780 return C 781 782 #-------------------------------------------------------# 783 784 def E( self ) -> float : 785 786 """The *excess entropy* [^crutchfield_exact_2016], p.4: 787 788 .. math:: 789 790 \\mathbf{E} \\equiv \\sum_{L=1}^{\\infty} I[X_{-\\infty:0}; X_{0:\\infty}] 791 792 Computed via :meth:`get_msp` and :meth:`amachine.am_msp.MSP.get_E_S_T`, or :meth:`amachine.am_fast.block_entropy_convergence` 793 794 .. note:: 795 796 **Interpretations** 797 798 * The information from the past that reduces uncertainty in the future [^crutchfield_exact_2016]. 799 * How much information an observer must extract to synchronize to the process. 800 * Measures how long the process appears more complex than it asymptotically is. 801 * Vanishes for immediately synchronizable processes. 802 803 Returns: 804 805 float: :math:`\\mathbf{E}` 806 807 [^crutchfield_exact_2016]: Crutchfield et al., *Exact Complexity: The Spectral 808 Decomposition of Intrinsic Computation*, 2016. 809 <https://arxiv.org/abs/1309.3792> 810 """ 811 812 m = self.get_complexity_measure_if_exists( "E" ) 813 814 if m is not None : 815 return m 816 817 try : 818 msp = self.get_msp() 819 E, S, T = msp.get_E_S_T() 820 self.set_complexity_measure( "E", E ) 821 self.set_complexity_measure( "S", S ) 822 self.set_complexity_measure( "T_inf", T ) 823 824 except Exception as e : 825 826 print( f"MSP failed {e}" ) 827 828 C = self.block_convergence() 829 E = C.E 830 self.set_complexity_measure( "E", E ) 831 832 return E 833 834 #-------------------------------------------------------# 835 836 def S( self ) -> float : 837 838 """The *synchronization* information: 839 840 .. math:: 841 842 \\mathbf{S} \\equiv \\sum_{L=1}^{\\infty} \\mathcal{H}(L), 843 844 where :math:`\\mathcal{H}(L)` is the average state uncertainty having seen all length-L words [^crutchfield_exact_2016], p.4. 845 846 .. note:: 847 848 **Interpretations** 849 850 * The total amount of state information that an observer must extract to become synchronized [^crutchfield_exact_2016]. 851 852 Computed via :meth:`get_msp` and :meth:`amachine.am_msp.MSP.get_E_S_T`, or :meth:`amachine.am_fast.block_entropy_convergence` 853 854 Returns: 855 856 float: :math:`\\mathbf{S}` 857 858 [^crutchfield_exact_2016]: Crutchfield et al., *Exact Complexity: The Spectral 859 Decomposition of Intrinsic Computation*, 2016. 860 <https://arxiv.org/abs/1309.3792> 861 """ 862 863 m = self.get_complexity_measure_if_exists( "S" ) 864 865 if m is not None : 866 return m 867 868 try : 869 msp = self.get_msp() 870 E, S, T = msp.get_E_S_T() 871 self.set_complexity_measure( "E", E ) 872 self.set_complexity_measure( "S", S ) 873 self.set_complexity_measure( "T_inf", T ) 874 875 except Exception as e : 876 print( f"{e} \nFalling back to iterative estimation.") 877 exit() 878 879 C = self.block_convergence() 880 S = C.S 881 self.set_complexity_measure( "S", S ) 882 883 return S 884 885 #-------------------------------------------------------# 886 887 def T_inf( self ) -> float : 888 889 """The *transient information*[^crutchfield_exact_2016], p.4: 890 891 .. math:: 892 893 \\mathbf{T} \\equiv \\sum_{L=1}^{\\infty} L \\left[ h_{\\mu}(L) - h_{\\mu} \\right] 894 895 Computed via :meth:`get_msp` and :meth:`amachine.am_msp.MSP.get_E_S_T`, or :meth:`amachine.am_fast.block_entropy_convergence` 896 897 .. note:: 898 899 **Interpretations** 900 901 * The amount of information one must extract from observations so that the block entropy converges to its linear asymptote[^crutchfield_exact_2016]. 902 903 Returns: 904 905 float: :math:`\\mathbf{T}` 906 907 [^crutchfield_exact_2016]: Crutchfield et al., *Exact Complexity: The Spectral 908 Decomposition of Intrinsic Computation*, 2016. 909 <https://arxiv.org/abs/1309.3792> 910 """ 911 912 m = self.get_complexity_measure_if_exists( "T_inf" ) 913 914 if m is not None : 915 return m 916 917 try : 918 msp = self.get_msp() 919 E, S, T = msp.get_E_S_T() 920 self.set_complexity_measure( "E", E ) 921 self.set_complexity_measure( "S", S ) 922 self.set_complexity_measure( "T_inf", T ) 923 924 except Exception as e : 925 print( f"{e} \nFalling back to iterative estimation.") 926 C = self.block_convergence() 927 T_inf = C.T 928 self.set_complexity_measure( "T_inf", T_inf ) 929 930 return T_inf 931 932 #-------------------------------------------------------# 933 934 def chi( self ) -> float : 935 936 """The foward crypticity[^crutchfield_crypticity_2009][^Mahoney_crypticity_2021], p.2: 937 938 .. math:: 939 940 \\chi = C_{\\mu} - \\mathbf{E} 941 942 :math:`C_{\\mu}` is trivially computed from the stationary distribution in :meth:`C_mu` and :math:`\\mathbf{E}` in :meth:`E`. 943 944 .. note:: 945 946 **Interpretations** 947 948 * Difference between internal stored information and apparent information to an observer. 949 * How muching information is hiding in the system. 950 951 Returns: 952 953 float: :math:`\\chi` 954 955 [^crutchfield_crypticity_2009]: Crutchfield et al., Time’s barbed arrow: Irreversibility, crypticity, and stored information, 2009. 956 <https://arxiv.org/abs/0902.1209> 957 958 [^Mahoney_crypticity_2021]: Mahoney et al., Information Accessibility and Cryptic Processes, 2021. 959 <https://arxiv.org/abs/0905.4787> 960 """ 961 962 m = self.get_complexity_measure_if_exists( "chi" ) 963 964 if m is not None : 965 return m 966 967 chi = self.C_mu() - self.E() 968 969 if chi < 0 : 970 971 # if chi is 0, accumulated floating point error can result in small negative values 972 if chi < -1e-5: 973 warnings.warn(f"Crypticity is negative ({chi:.6e}).") 974 975 chi = np.clamp( chi, 0 ) 976 977 self.set_complexity_measure( "chi", chi ) 978 979 return chi 980 981 #-------------------------------------------------------# 982 # Properties # 983 #-------------------------------------------------------# 984 985 def is_row_stochastic(self) : 986 987 """ 988 Check that all states have outgoing transition probabilities that sum to 1. 989 """ 990 991 sums = np.zeros( len( self.states ) ) 992 for tr in self.transitions : 993 sums[ tr.origin_state_idx ] += tr.prob 994 return np.allclose( sums, 1.0 ) 995 996 #-------------------------------------------------------# 997 998 def is_unifilar(self) : 999 1000 """ 1001 Check that no state emits the same symbol on transitions to different states. 1002 """ 1003 1004 symbol_trs = np.full( ( len( self.states ), len( self.alphabet) ), -1 ) 1005 1006 for tr in self.transitions : 1007 1008 if symbol_trs[ tr.origin_state_idx, tr.symbol_idx ] == -1 : 1009 symbol_trs[ tr.origin_state_idx, tr.symbol_idx ] = tr.target_state_idx 1010 1011 elif symbol_trs[ tr.origin_state_idx, tr.symbol_idx ] != tr.target_state_idx : 1012 return False 1013 1014 return True 1015 1016 #-------------------------------------------------------# 1017 1018 def is_strongly_connected(self) : 1019 1020 """ 1021 Check if every state is reachable from every other state. Relies on [nx.is_strongly_connected](https://networkx.org/documentation/latest/reference/algorithms/generated/networkx.algorithms.components.is_strongly_connected.html). 1022 """ 1023 1024 return nx.is_strongly_connected( self.as_digraph() ) 1025 1026 #-------------------------------------------------------# 1027 1028 def is_aperiodic(self) : 1029 1030 """ 1031 Checks if machine is periodic. Relies on [nx.is_aperiodic](https://networkx.org/documentation/latest/reference/algorithms/generated/networkx.algorithms.dag.is_aperiodic.html), "A strongly connected directed graph is aperiodic if there is no integer k > 1 that divides the length of every cycle in the graph." 1032 """ 1033 1034 return nx.is_aperiodic( self.as_digraph() ) 1035 1036 #-------------------------------------------------------# 1037 1038 def _is_minimal_as_dfa( self, topological_only : bool, verbose=True ) : 1039 1040 with_probs = not topological_only 1041 1042 # Construct the DFA 1043 dfa = self.as_dfa( with_probs=with_probs ) 1044 1045 # Minimize the DFA 1046 #dfa = dfa.minify(retain_names=True) 1047 dfa = am_fast.minify_cpp( dfa, retain_names=True ) 1048 1049 # check we have minimal number of states 1050 if len( dfa.states ) != len( self.states ) : 1051 if verbose : 1052 print( f"Not minimal reduces from {len( self.states )} to {len( dfa.states )} states" ) 1053 return False 1054 1055 return True 1056 1057 def is_topological_epsilon_machine( self, verbose=True ) : 1058 1059 """ 1060 Checks if the HMM is a topological $\\epsilon$-machine [^1]. 1061 1062 [^1]: Johnson et al, Enumerating Finitary Processes, 2024. 1063 <https://arxiv.org/abs/1011.0036> 1064 """ 1065 1066 if not ( self.is_unifilar() and self.is_strongly_connected() ) : 1067 if verbose : 1068 print( f"Either non unifilar or not strongly connected" ) 1069 return False 1070 else : 1071 return self._is_minimal_as_dfa( topological_only=True, verbose=verbose ) 1072 1073 def is_epsilon_machine( self, verbose=True ) : 1074 1075 if not ( self.is_unifilar() and self.is_strongly_connected() ) : 1076 if verbose : 1077 print( f"Either non unifilar or not strongly connected" ) 1078 return False 1079 else : 1080 return self._is_minimal_as_dfa( topological_only=False, verbose=verbose ) 1081 1082 #-------------------------------------------------------# 1083 # Properties # 1084 #-------------------------------------------------------# 1085 1086 def minimize(self, retain_names: bool = True, verbose=False): 1087 1088 """ 1089 Minimizes the HMM, resulting in an :math:`\\epsilon-`machine if the HMM 1090 is unifilar and strongly connected. Converts the HMM to a DFA with symbols 1091 labeled jointly with symbols and probabilities, and uses Myhill-Nerode 1092 equivalence for minimization. Relies on `automata_lib` and uses 1093 `automata.fa.dfa.DFA.minify` with `allow_partial=True`, and all states 1094 final. 1095 1096 Args: 1097 retain_names (bool): If `True`, the merged states will be named by their union, e.g. `{s_0, s_1}`, and other states will retain their origion names. Otherwise, they will be relabled `{ '0', '1', ..., 'n-1' }`. 1098 1099 Returns: 1100 1101 automata.fa.dfa.DFA : the resulting DFA. 1102 """ 1103 1104 if self.is_minimal : 1105 return 1106 1107 start = time.perf_counter() 1108 1109 if not self.is_unifilar(): 1110 raise ValueError( 1111 "DFA minimization is not valid for non-unifilar HMMs" 1112 ) 1113 1114 was_strongly_connected = self.is_strongly_connected() 1115 1116 was_row_stochastic = self.is_row_stochastic() 1117 n_states_before = len(self.states) 1118 1119 dfa = self.as_dfa(with_probs=True) 1120 1121 #min_dfa = self.as_dfa(with_probs=True).minify(retain_names=True) 1122 min_dfa = am_fast.minify_cpp( dfa, retain_names=True ) 1123 1124 # Build lookup from original state index -> CausalState object 1125 orig_state = {i: s for i, s in enumerate(self.states)} 1126 eq_list = list(min_dfa.states) 1127 1128 start_eq = min_dfa.initial_state 1129 1130 # Separate the start state, then sort the rest by the 1131 # smallest original state index inside each equivalence class. 1132 other_eqs = [eq for eq in eq_list if eq != start_eq] 1133 other_eqs.sort(key=lambda eq: min(eq)) 1134 1135 # Recombine so start eq comes first, followed by the sorted remaining classes 1136 eq_list = [start_eq] + other_eqs 1137 # ---------------------------------------------------------- 1138 1139 # Recompute eq_to_idx with the new ordering 1140 eq_to_idx = {eq: i for i, eq in enumerate(eq_list)} 1141 1142 # new_start is now guaranteed to be 0 1143 new_start = 0 1144 1145 # Map each original state index -> its equivalence class 1146 # Guard: minify() silently drops unreachable states 1147 orig_to_eq = {s: eq for eq in min_dfa.states for s in eq} 1148 1149 # Build lookup from original state index -> its transitions 1150 orig_trs = defaultdict(list) 1151 for t in self.transitions: 1152 orig_trs[t.origin_state_idx].append(t) 1153 1154 new_trs = [] 1155 for eq in min_dfa.states: 1156 rep = next(iter(eq)) 1157 origin_idx = eq_to_idx[eq] 1158 for t in orig_trs[rep]: 1159 target_eq = orig_to_eq[t.target_state_idx] 1160 target_idx = eq_to_idx[target_eq] 1161 new_trs.append(Transition( 1162 origin_state_idx = origin_idx, 1163 target_state_idx = target_idx, 1164 prob = t.prob, 1165 symbol_idx = t.symbol_idx, 1166 )) 1167 1168 members_list = [[orig_state[i] for i in sorted(eq)] for eq in eq_list] # sorted for determinism 1169 1170 # Compute new names 1171 if retain_names: 1172 new_names = [ 1173 "{" + ",".join(str(m.name) for m in members) + "}" if len(members) > 1 1174 else members[0].name 1175 for members in members_list 1176 ] 1177 else: 1178 new_names = [str(j) for j in range(len(eq_list))] 1179 1180 old_name_to_new_name = { 1181 m.name: new_names[j] 1182 for j, members in enumerate(members_list) 1183 for m in members 1184 } 1185 1186 # Build the new states, preserving classes and isomorphs regardless of naming 1187 new_states = [] 1188 for j, (eq, members, name) in enumerate(zip(eq_list, members_list, new_names)): 1189 classes = set().union(*(m.classes for m in members)) 1190 isomorphs = { 1191 old_name_to_new_name.get(iso, iso) 1192 for m in members 1193 for iso in m.isomorphs 1194 if old_name_to_new_name.get(iso, iso) != name 1195 } 1196 new_states.append(CausalState( 1197 name = name, 1198 classes = classes, 1199 isomorphs = isomorphs, 1200 )) 1201 1202 self.set_states(new_states) 1203 self.set_transitions(new_trs) 1204 self.start_state = new_start 1205 1206 if n_states_before == len(new_states) and verbose : 1207 print( f"{n_states_before} state HMM was already minimal.\n" ) 1208 elif verbose : 1209 print( f"Minimized from {n_states_before} to {len(new_states)}\n" ) 1210 1211 if not ( was_strongly_connected == self.is_strongly_connected() ) : 1212 raise RuntimeError( 1213 f"Minimization broke strongly connected" 1214 ) 1215 1216 if not ( was_row_stochastic == self.is_row_stochastic() ) : 1217 raise RuntimeError( 1218 f"Minimization broke row stochasticity" 1219 ) 1220 1221 self.is_minimal = True 1222 1223 #-------------------------------------------------------# 1224 # Modifiers # 1225 #-------------------------------------------------------# 1226 1227 1228 def collapse_to_largest_strongly_connected_subgraph( self, rename_states=True ) : 1229 1230 # get equivalent networkx graph 1231 G = self.as_digraph() 1232 1233 # if already strongly connected, nothing to do 1234 if not nx.is_strongly_connected( G ) : 1235 1236 start = time.perf_counter() 1237 subgraph_nodes = list( nx.strongly_connected_components( G ) ) 1238 1239 # decompose into strongly connected components and sort by length 1240 # subgraph_nodes = list(nx.strongly_connected_components( G )) 1241 subgraph_nodes.sort(key=len) 1242 component_state_set = subgraph_nodes[-1] 1243 1244 # Take the largest strongly connected component (as list of state names) 1245 component_states = sorted( list( component_state_set ) ) 1246 1247 # make temporary copies of the old transitions and states 1248 old_transitions = [ 1249 Transition( 1250 origin_state_idx=tr.origin_state_idx, 1251 target_state_idx=tr.target_state_idx, 1252 prob=tr.prob, 1253 symbol_idx=tr.symbol_idx 1254 ) 1255 1256 for tr in self.transitions 1257 ] 1258 1259 old_states = [ 1260 CausalState( 1261 name=s.name, 1262 classes=s.classes, 1263 isomorphs=s.isomorphs 1264 ) 1265 for s in self.states 1266 ] 1267 1268 self.set_states( 1269 states=[ 1270 state 1271 for i, state in enumerate( old_states ) if i in component_state_set 1272 ] 1273 ) 1274 1275 # we will build new transition list based on those belonging to the component 1276 self.set_transitions( transitions= [] ) 1277 1278 # for tracking which new transitions leave each state 1279 transitions_from_state = { state : set() for state in component_states } 1280 new_transitions = [] 1281 1282 for tr in old_transitions : 1283 1284 origin_state_name = old_states[ tr.origin_state_idx ].name 1285 target_state_name = old_states[ tr.target_state_idx ].name 1286 1287 # skip transitions that connect separate strongly connected components 1288 if not ( tr.origin_state_idx in component_state_set and tr.target_state_idx in component_state_set ) : 1289 continue 1290 1291 # track transitions (by index in new transitions list) that leave this state 1292 transitions_from_state[ tr.origin_state_idx ].add( len( new_transitions ) ) 1293 1294 my_origin_state_idx = self.state_idx_map[ origin_state_name ] 1295 my_target_state_idx = self.state_idx_map[ target_state_name ] 1296 1297 new_transitions.append( 1298 Transition( 1299 origin_state_idx=my_origin_state_idx, 1300 target_state_idx=my_target_state_idx, 1301 prob=tr.prob, 1302 symbol_idx=tr.symbol_idx 1303 ) ) 1304 1305 self.set_transitions( transitions=new_transitions ) 1306 1307 # if we removed an outgoing transition from a state, we need to distribute its probability 1308 # among the remaining outgoing transitions from the state 1309 for state in component_states : 1310 1311 # get the set of transitions leaving this state 1312 state_trs = transitions_from_state[ state ] 1313 1314 # sum the probabilities of the outgoing transitions from the state 1315 p_sum = np.sum( [ self.transitions[ i ].prob for i in state_trs ] ) 1316 1317 # how much probability is missing 1318 diff = 1.0 - p_sum 1319 1320 # if significant difference 1321 if abs( diff ) > self.EPS : 1322 1323 # calculate how much of the difference each transition gets 1324 adjustment = diff / len( state_trs ) 1325 1326 # update the transitions 1327 for i in state_trs : 1328 1329 # transition with origional probability 1330 tr = self.transitions[ i ] 1331 1332 # adjusted probability 1333 self.transitions[ i ] = Transition( 1334 origin_state_idx=tr.origin_state_idx, 1335 target_state_idx=tr.target_state_idx, 1336 prob=tr.prob + adjustment, 1337 symbol_idx=tr.symbol_idx ) 1338 1339 if rename_states : 1340 self.set_states( [ 1341 CausalState( name=f"{i}" ) 1342 for i, s in enumerate( self.states ) 1343 ] ) 1344 1345 def to_q_weighted( self, denominator_limit=1000 ) : 1346 1347 """ 1348 Approximates the existing transition probabilities with exact fractions, stores 1349 the fractional probabilities as Fraction in Transition.pq, and sets the floating 1350 point probabilty to `float(pq)`. If `denominator_limit` is too small for a sane 1351 conversion, the function recurses with `denominator_limit=denominator_limit*10`. 1352 1353 Args: 1354 denominator_limit (int): The initial input to :meth:`Fraction.limit_denominator` in 1355 the conversion. 1356 """ 1357 1358 if self.is_q_weighted : 1359 return 1360 1361 if not self.is_row_stochastic() : 1362 raise ValueError( "Cannot convert to q-weighted because not row stochastic" ) 1363 1364 t_from = [[] for _ in range(len(self.states))] 1365 1366 for i, tr in enumerate( self.transitions ) : 1367 t_from[ tr.origin_state_idx ].append( i ) 1368 1369 new_transitions = [] 1370 for t_list in t_from : 1371 1372 if not t_list : 1373 continue 1374 1375 p_q_sum = Fraction(0,1) 1376 p_qs = [] 1377 1378 for t_idx in t_list : 1379 1380 p_q = Fraction( self.transitions[ t_idx ].prob ).limit_denominator( denominator_limit ) 1381 p_q_sum += p_q 1382 p_qs.append( p_q ) 1383 1384 if p_q_sum != Fraction(1,1) : 1385 1386 max_pq_i = np.argmax( p_qs ) 1387 max_oq = p_qs[ max_pq_i ] 1388 1389 diff = p_q_sum - Fraction(1,1) 1390 1391 # If recurse with higher resolution 1392 if diff > max_oq : 1393 return self.to_q_weighted( denominator_limit*10 ) 1394 else : 1395 p_qs[ max_pq_i ] -= diff 1396 1397 for i, t_idx in enumerate( t_list ) : 1398 new_transitions.append( 1399 Transition( 1400 origin_state_idx=self.transitions[ t_idx ].origin_state_idx, 1401 target_state_idx=self.transitions[ t_idx ].target_state_idx, 1402 prob=float(p_qs[ i ]), 1403 symbol_idx=self.transitions[ t_idx ].symbol_idx, 1404 pq=p_qs[ i ] 1405 ) 1406 ) 1407 1408 self.set_transitions( new_transitions ) 1409 self.is_q_weighted = True 1410 1411 #-------------------------------------------------------# 1412 # Data Generation # 1413 #-------------------------------------------------------# 1414 1415 def isomorphic_shift( 1416 self, 1417 input_symbol_indices: np.ndarray, 1418 input_state_indices: np.ndarray, 1419 shift : int = 1 1420 ) -> dict[str, np.ndarray]: 1421 1422 """ 1423 Generates a new sequence of of symbols that are permuted with the symbols emitted by 1424 isomorphic states, if they exists. 1425 1426 :math:`\\sigma_o = \\mathcal{S}\\left[\\texttt{input\\_state\\_indices}[i]\\right]`<br> 1427 :math:`\\sigma_t = \\mathcal{S}\\left[\\texttt{input\\_state\\_indices}[i+1]\\right]` 1428 1429 :math:`\\mathcal{I}(\\sigma_o) = \\\\{\\sigma^0_o,\\, \\sigma^1_o,\\, \\dots,\\, \\sigma^{n-1}_o \\\\}`<br> 1430 :math:`\\mathcal{I}(\\sigma_t) = \\\\{\\sigma^0_t,\\, \\sigma^1_t,\\, \\dots,\\, \\sigma^{n-1}_t \\\\}` 1431 1432 :math:`k = \\bigl(\\mathcal{I}(\\sigma_o).\\texttt{index}(\\sigma_o) + \\texttt{shift}\\bigr) \\bmod n` 1433 1434 :math:`\\texttt{output\\_symbol\\_indices}[i] := T(\\sigma_o^k,\\, \\sigma_t^k).\\text{symbol\\_index}`<br> 1435 :math:`\\texttt{output\\_state\\_indices}[i] := \\mathcal{S}.\\texttt{index}(\\sigma_o^k)`<br> 1436 :math:`\\texttt{output\\_state\\_indices}[i+1] := \\mathcal{S}.\\texttt{index}(\\sigma_t^k)` 1437 1438 Where :math:`\\mathcal{I}(\\sigma)`` is the ordered set of states isomorphic to :math:`\\sigma` including :math:`\\sigma` itself. 1439 1440 Args: 1441 input_symbol_indices (np.ndarray): The sequence of generated symbols. 1442 input_state_indices (np.ndarray): The sequence of states that generated symbols with the final state at the end. 1443 shift : int: How much to shift the symbols across the isomorphic states. 1444 """ 1445 1446 if not any( state.isomorphs for state in self.states ): 1447 raise ValueError("HMM has no states with isomorphs") 1448 1449 inputs = np.asarray(input_symbol_indices) 1450 states = np.asarray(input_state_indices) 1451 1452 n_states = len(self.states) 1453 1454 tr_sym_table = np.full((n_states, n_states), -1, dtype=np.int32) 1455 1456 for tr in self.transitions: 1457 tr_sym_table[ tr.origin_state_idx, tr.target_state_idx ] = tr.symbol_idx 1458 1459 # Build isomorph remapping: identity by default, overridden where isomorphs exist 1460 iso_table = np.arange(n_states, dtype=np.int32) 1461 1462 for i, state in enumerate(self.states): 1463 if state.isomorphs: 1464 isomorph = sorted(state.isomorphs)[0] 1465 iso_table[ i ] = self.state_idx_map[isomorph] 1466 1467 for i, state in enumerate(self.states): 1468 if state.isomorphs: 1469 1470 # extend the isomorph list to include the identity 1471 isormorphs_with_identity = sorted( [ i ] + [ self.state_idx_map[iso] for iso in state.isomorphs ] ) 1472 1473 # find the identity index 1474 pos = isormorphs_with_identity.index( i ) 1475 1476 # cyclical shift 1477 iso_table[ i ] = isormorphs_with_identity[ ( pos + shift ) % len( isormorphs_with_identity ) ] 1478 1479 origins = states[:-1] 1480 targets = states[1:] 1481 1482 out_origins = iso_table[origins] 1483 out_targets = iso_table[targets] 1484 1485 inv_sym = tr_sym_table[ out_origins, out_targets ] 1486 inv_sts = np.empty(states.size, dtype=states.dtype) 1487 1488 inv_sts[:-1] = out_origins 1489 inv_sts[-1] = out_targets[-1] 1490 1491 return { 1492 "symbol_index": inv_sym.astype(inputs.dtype), 1493 "state_index": inv_sts, 1494 } 1495 1496 def generate_data( 1497 self, 1498 file_prefix: str, 1499 n_gen: int, 1500 include_states: bool, 1501 isomorphic_shifts : set[int]=None, 1502 random_seed : int=42 ) -> dict[any] : 1503 1504 trs = [ [] for _ in range( len( self.states ) ) ] 1505 for tr in self.transitions : 1506 trs[ tr.origin_state_idx ].append( ( 1507 tr.symbol_idx, 1508 float( tr.prob ), 1509 tr.target_state_idx ) ) 1510 1511 data = am_fast.generate_data( 1512 n_gen=n_gen, 1513 start_state=self.start_state, 1514 transitions=trs, 1515 alphabet=sorted(list(self.alphabet)), 1516 include_states=include_states, 1517 random_seed=random_seed 1518 ) 1519 1520 if isomorphic_shifts is not None : 1521 1522 if not include_states : 1523 raise ValueError( "Isomorphic inversion requires include_states=True" ) 1524 1525 data[ "isomorphic_shifts" ] = {} 1526 1527 for shift in isomorphic_shifts : 1528 1529 try : 1530 1531 shifted = self.isomorphic_shift( 1532 input_symbol_indices=data[ "symbol_index" ], 1533 input_state_indices=data[ "state_index" ], 1534 shift=shift 1535 ) 1536 1537 data[ "isomorphic_shifts" ][ shift ] = { 1538 "symbol_index" : shifted[ "symbol_index" ], 1539 "state_index" : shifted[ "state_index" ] 1540 } 1541 1542 except Exception as e : 1543 print( f"Exception {e}" ) 1544 1545 am_fast.save_data( 1546 data=data, 1547 file_prefix=file_prefix, 1548 alphabet=sorted(list(self.alphabet)), 1549 n_states=len( self.states ), 1550 start_state=self.start_state, 1551 random_seed=random_seed, 1552 machine_metadata=self.get_metadata() ) 1553 1554 return data 1555 1556 #-------------------------------------------------------# 1557 # Basic Visualization # 1558 #-------------------------------------------------------# 1559 1560 def draw_block_entropy_curve(self) : 1561 1562 h_mu = self.h_mu() 1563 E = self.E() 1564 1565 if not "H_L" in self.complexity : 1566 C = self.block_convergence() 1567 1568 H_L = self.complexity[ "H_L" ] 1569 L = np.asarray( [ i+1 for i in range( len( H_L ) ) ], dtype='float64') 1570 y = E + L*h_mu 1571 1572 plt.style.use('seaborn-v0_8-darkgrid') 1573 fig, ax = plt.subplots(figsize=(10, 6), dpi=120) 1574 colors = plt.cm.tab10.colors 1575 1576 ax.plot( L, H_L, color='#1E88E5', linewidth=2.0, linestyle='-', label=r'$H(L)$') 1577 ax.plot( L, y, color='#D81B60', linewidth=1.5, linestyle='--', label=r'$\mathbf{E} + h_{\mu}L$') 1578 1579 ax.fill_between( 1580 L, H_L, y, 1581 color='gray', 1582 alpha=0.5, 1583 label=r'Transient information $\mathbf{T}$' 1584 ) 1585 1586 ax.set_title('Block Entropy Curve', fontsize=16, fontweight='bold', pad=15) 1587 ax.set_xlabel('L', fontsize=12, labelpad=10) 1588 ax.set_ylabel('Bits', fontsize=12, labelpad=10) 1589 1590 legend = ax.legend( 1591 loc='upper left', 1592 title_fontsize='11', 1593 fontsize='10', 1594 frameon=True, 1595 facecolor='white', 1596 shadow=True 1597 ) 1598 1599 legend.get_title().set_fontweight('bold') 1600 plt.tight_layout() 1601 plt.show() 1602 1603 def draw_block_measure_curves(self) : 1604 1605 h_mu = self.h_mu() 1606 E = self.E() 1607 1608 if not "H_L" in self.complexity : 1609 C = self.block_convergence() 1610 1611 H_L = self.complexity[ "H_L" ] 1612 E_L = self.complexity[ "E_L" ] 1613 T_L = self.complexity[ "T_L" ] 1614 S_L = self.complexity[ "S_L" ] 1615 H_sync = self.complexity[ "H_sync" ][1:] 1616 h_mu_L = self.complexity[ "h_mu_L" ] 1617 1618 L = np.asarray( [ i+1 for i in range( len( H_L ) ) ], dtype='float64') 1619 y = E + L*h_mu 1620 1621 plt.style.use('seaborn-v0_8-darkgrid') 1622 fig, ax = plt.subplots(figsize=(10, 6), dpi=120) 1623 colors = plt.cm.tab10.colors 1624 1625 ax.plot( L, H_L, color=colors[0], linewidth=2.0, linestyle='-', label=r'$H(L)$') 1626 ax.plot( L, E_L, color=colors[1], linewidth=2.0, linestyle='-', label=r'$\mathbf{E}(L)$') 1627 ax.plot( L, T_L, color=colors[2], linewidth=2.0, linestyle='-', label=r'$\mathbf{T}(L)$') 1628 ax.plot( L, S_L, color=colors[3], linewidth=2.0, linestyle='-', label=r'$\mathbf{S}(L)$') 1629 ax.plot( L, H_sync, color=colors[4], linewidth=2.0, linestyle='-', label=r'$\mathcal{H}(L)$') 1630 ax.plot( L, h_mu_L, color=colors[5], linewidth=2.0, linestyle='-', label=r'$h_{\mu}(L)$') 1631 1632 ax.set_title('Block Measures', fontsize=16, fontweight='bold', pad=15) 1633 ax.set_xlabel('L', fontsize=12, labelpad=10) 1634 ax.set_ylabel('Bits', fontsize=12, labelpad=10) 1635 1636 legend = ax.legend( 1637 loc='upper left', 1638 title_fontsize='11', 1639 fontsize='10', 1640 frameon=True, 1641 facecolor='white', 1642 shadow=True 1643 ) 1644 1645 legend.get_title().set_fontweight('bold') 1646 plt.tight_layout() 1647 plt.show() 1648 1649 def draw_graph( 1650 self, 1651 engine : str = 'dot', 1652 output_dir : Path = Path('.'), 1653 show : bool = True, 1654 symbols_only : bool = False 1655 ) -> None : 1656 1657 """ 1658 Draws the machine using [pygraphiviz](https://pygraphviz.github.io/documentation/stable/) and saves it. 1659 1660 Returns: 1661 1662 networkx.DiGraph : the resulting graph. 1663 """ 1664 1665 G = self.as_digraph() 1666 1667 subgraphs = None if nx.is_strongly_connected( G ) else list( nx.strongly_connected_components( G ) ) 1668 1669 am_vis.draw_graph( 1670 self, 1671 output_dir=output_dir, 1672 title="am_graph", 1673 view=show, 1674 subgraphs=subgraphs, 1675 engine=engine, 1676 symbols_only=symbols_only ) 1677 1678 #-------------------------------------------------------# 1679 # Alternate Forms # 1680 #-------------------------------------------------------# 1681 1682 1683 def as_digraph( self ) -> nx.DiGraph : 1684 1685 """ 1686 Builds a [networkx.DiGraph](https://networkx.org/documentation/stable/reference/classes/digraph.html) constructed from the machine's transitions with no edge symbols or weights. 1687 1688 Returns: 1689 1690 networkx.DiGraph : the resulting graph. 1691 """ 1692 1693 G = nx.DiGraph() 1694 G.add_nodes_from( [ i for i, s in enumerate( self.states ) ] ) 1695 1696 for tr in self.transitions : 1697 G.add_edge( tr.origin_state_idx, tr.target_state_idx ) 1698 1699 return G 1700 1701 def as_dfa( self, with_probs : bool ) : 1702 1703 """ 1704 Builds an [automata.fa.dfa.DFA](https://caleb531.github.io/automata/api/fa/class-dfa/) constructed from the machine's transitions. 1705 1706 Args: 1707 with_probs (bool): If true the DFA transitions are labeled based on 1708 the symbol of the machines transition concatenated with its 1709 probability, othwise, the only the symbols. 1710 1711 Returns: 1712 1713 automata.fa.dfa.DFA : the resulting DFA. 1714 """ 1715 1716 precision=8 1717 1718 def edge_label( symb, prob ) : 1719 return f"({symb},{round(prob, precision)})" 1720 1721 # Build states, symbols, and transitions 1722 dfa_states = { i for i, _ in enumerate( self.states ) } 1723 1724 if not with_probs : 1725 dfa_symbols = set( { t.symbol_idx for t in self.transitions } ) 1726 else : 1727 dfa_symbols = set( { edge_label( t.symbol_idx, t.prob ) for t in self.transitions } ) 1728 1729 dfa_transitions = defaultdict(dict) 1730 1731 if not with_probs : 1732 for t in self.transitions : 1733 dfa_transitions[ t.origin_state_idx ][ t.symbol_idx ] = t.target_state_idx 1734 else : 1735 for t in self.transitions : 1736 dfa_transitions[ t.origin_state_idx ][ edge_label( t.symbol_idx, t.prob ) ] = t.target_state_idx 1737 1738 # Construct the DFA 1739 return DFA( 1740 states=dfa_states, 1741 input_symbols=dfa_symbols, 1742 transitions=dfa_transitions, 1743 initial_state=self.start_state, 1744 allow_partial=True, 1745 final_states={ 1746 s for s in dfa_states 1747 } 1748 )
Hidden Markov model implementing epsilon machines, mixed state presentations, complexity measures, and data generation.
Arguments:
- states (list[CausalState] | None): A list of causal states.
- transitions (list[Transition] | None): A list of transitions between states.
- start_state (int): Index of the start state.
- alphabet (list[str]): List of symbols making up the alphabet.
- name (str): Name of the model.
- description (str): Description of the model.
54 def __init__( 55 self, 56 states : list[CausalState] | None = None, 57 transitions : list[Transition] | None = None, 58 start_state : int = 0, 59 alphabet : list[str] | None = None, 60 name : str = "", 61 description : str = "" ) : 62 63 self.alphabet : list[str] | None = alphabet or [] 64 self.states : list[CausalState] | None= states or [] 65 self.transitions : list[Transition] | None= transitions or [] 66 67 self.set_alphabet( self.alphabet ) 68 self.set_states( self.states ) 69 self.set_transitions( self.transitions ) 70 71 self.start_state : int = start_state 72 73 self.name : str = name 74 self.description : str = description 75 76 # To be depreciated 77 self.isoclass : str = None 78 79 # --- derived -------- 80 81 self.complexity : dict[str, any] = {} 82 83 self.symbol_idx_map : dict[str,int] = {} 84 self.state_idx_map : dict[str,int] = {} 85 86 self.pi_fractional = None 87 self.pi : np.ndarray | None = None 88 self.T : np.ndarray | None = None 89 self.msp : MSP | None = None 90 self.reverse_am : HMM | None = None 91 self.is_q_weighted : bool = False 92 self.is_minimal : bool = False 93 94 # --- const ---------- 95 96 self.EPS : float = 1e-12
119 def to_dict(self) -> dict[str,any]: 120 121 """Create a dict representing the HMM configuration. 122 123 Returns: 124 125 dict[str,any]: Dictionary containing, name, description, states, transitions, alphabet, and isoclass. 126 """ 127 128 return { 129 "name" : self.name, 130 "description" : self.description, 131 "states" : [ asdict(state) for state in self.states ], 132 "transitions" : [ asdict(transition) for transition in self.transitions ], 133 "alphabet" : self.alphabet, 134 "isoclass" : self.isoclass 135 }
Create a dict representing the HMM configuration.
Returns:
dict[str,any]: Dictionary containing, name, description, states, transitions, alphabet, and isoclass.
137 def from_dict( self, config : dict[str,any] ) : 138 139 """Configure the HMM configuration from a dictionary. 140 141 Args: 142 config (dict[str,any]): The HMM configuration. 143 """ 144 145 self.name = config.name 146 self.description = config.description 147 148 self.set_states( states=[ 149 CausalState( 150 name=state[ "name" ], 151 classes=set( state[ "classes" ] ) 152 ) 153 for state in config.states 154 ] ) 155 156 self.set_transitions( transitions=[ 157 Transition( 158 origin_state_idx=tr[ "origin_state_idx" ], 159 target_state_idx=tr[ "target_state_idx" ], 160 prob=tr[ "prob" ], 161 symbol_idx=tr[ "symbol_idx" ] 162 ) 163 for tr in config.transitions 164 ] ) 165 166 self.set_alphabet( alphabet=config.alphabet )
Configure the HMM configuration from a dictionary.
Arguments:
- config (dict[str,any]): The HMM configuration.
168 def save_config( 169 self, 170 path : Path, 171 with_complexity : bool = False, 172 with_non_trivial_complexity : bool = False ) : 173 174 config = self.to_dict() 175 176 if with_complexity : 177 178 complexity = self.get_complexities( 179 with_non_trivial=with_non_trivial_complexity 180 ) 181 182 config[ "complexity" ] = complexity 183 184 config[ "structural_properties" ] = { 185 "unifilar" : self.is_unifilar(), 186 "row_stochastic" : self.is_row_stochastic(), 187 "strongly_connected" : self.is_strongly_connected(), 188 "aperiodic" : self.is_aperiodic(), 189 "minimal" : self._is_minimal_as_dfa( topological_only=False ), 190 "topologically_minimal" : self._is_minimal_as_dfa( topological_only=True ), 191 "is_epsilon_machine" : self.is_epsilon_machine() 192 } 193 194 with open( path / "am_config.json", "w", encoding="utf-8" ) as f : 195 json.dump( config, f, ensure_ascii=False, indent=2, default=list )
206 def clear_cache(self) : 207 208 """ 209 Reset all derived properties so they will be recomputed when requested later. 210 """ 211 212 self.complexity = {} 213 self.T = None 214 self.T_x = None 215 self.pi = None 216 self.pi_fractional = None 217 self.msp = None 218 self.reverse_am = None 219 self.is_q_weighted = False 220 self.is_minimal = False
Reset all derived properties so they will be recomputed when requested later.
229 def set_alphabet( self, alphabet : list[str] ) : 230 231 self.clear_cache() 232 233 old_alphabet = self.alphabet.copy() 234 235 aSet = set() 236 aSet.update( alphabet ) 237 238 self.alphabet = sorted(list(aSet)) 239 240 self.symbol_idx_map = {} 241 for idx, symbol in enumerate( self.alphabet ) : 242 self.symbol_idx_map[ symbol ] = idx 243 244 for i, tr in enumerate( self.transitions ) : 245 symbol = old_alphabet[ tr.symbol_idx ] 246 self.transitions[ i ] = Transition( 247 origin_state_idx=tr.origin_state_idx, 248 target_state_idx=tr.target_state_idx, 249 prob=tr.prob, 250 symbol_idx=self.symbol_idx_map[ symbol ] 251 )
277 def get_complexities( 278 self, 279 with_non_trivial=False ) : 280 281 trivial = [ 282 self.C_mu, 283 self.h_mu, 284 self.H_1, 285 self.rho_mu 286 ] 287 288 non_trivial = [ 289 self.E, 290 self.T_inf, 291 self.S, 292 self.chi 293 ] 294 295 complexities = { m.__name__ : m() for m in trivial } 296 297 if with_non_trivial : 298 299 complexities |= { m.__name__ : m() for m in non_trivial } 300 301 for key in [ 'H_L', 'h_mu_L', 'H_sync' ] : 302 if key in self.complexity : 303 complexities[ key ] = self.complexity[ key ] 304 305 return complexities
316 def get_transition_matrix(self) : 317 318 if self.T is not None : 319 return self.T 320 321 n_states = len( self.states ) 322 T = np.zeros((n_states, n_states)) 323 324 for tr in self.transitions : 325 T[ tr.origin_state_idx, tr.target_state_idx ] = tr.prob 326 327 self.T = T 328 329 return self.T
333 def get_T_X(self) : 334 335 if self.T_x is not None : 336 return self.T_x 337 338 n_states = len( self.states ) 339 n_symbols = len( self.alphabet ) 340 341 T_x = [ np.zeros((n_states, n_states)) for _ in range( n_symbols ) ] 342 343 for tr in self.transitions : 344 T_x[ tr.symbol_idx ][tr.origin_state_idx, tr.target_state_idx] = tr.prob 345 346 self.T_x = T_x 347 return self.T_x
351 def get_msp_qw( 352 self, 353 exact_state_cap: int = 1000, 354 verbose: bool = True, 355 ): 356 if self.msp is not None: 357 return self.msp 358 359 try : 360 361 print( "\nTrying to Compute Mixed State Presentation using Exact Fractions\n" ) 362 363 self.msp = compute_msp_exact( 364 T_x=self.get_Tx_fractional(), 365 pi=self.get_fractional_stationary_distribution(), 366 n_states=len(self.states), 367 alphabet=self.alphabet, 368 exact_state_cap=1000, 369 bool = True 370 ) 371 372 return self.msp 373 374 except RuntimeError as e : 375 warnings.warn( f"Exact msp failed: {e} Falling back to msp approximation." ) 376 377 return self.get_msp()
379 def get_msp( 380 self, 381 exact_state_cap: int = 175_000, 382 jsd_eps: float = 1e-7, 383 k_ann: int = 50, 384 verbose = True, 385 ) : 386 387 if self.msp is not None: 388 return self.msp 389 390 T_x = self.get_T_X() 391 pi = self.get_stationary_distribution() 392 393 T_stacked = np.stack(T_x) 394 n_symbols = len(self.alphabet) 395 n_input_states = T_stacked.shape[1] 396 397 print( "\nComputing Mixed State Presentation..." ) 398 399 self.msp = compute_msp( 400 T_x=T_x, 401 pi=pi, 402 n_states=len(self.states), 403 alphabet=self.alphabet, 404 exact_state_cap=exact_state_cap, 405 verbose=verbose 406 ) 407 408 return self.msp
410 def get_reverse_am(self) : 411 412 if self.reverse_am is not None: 413 return self.reverse_am 414 415 pi = self.get_stationary_distribution() 416 self.reverse_am = copy.deepcopy(self) 417 418 new_transitions = [] 419 for tr in self.transitions: 420 i = tr.target_state_idx 421 j = tr.origin_state_idx 422 423 p_reversed = (pi[j] * tr.prob) / pi[i] 424 425 new_transitions.append( 426 Transition( 427 origin_state_idx=i, 428 target_state_idx=j, 429 prob=p_reversed, 430 symbol_idx=tr.symbol_idx 431 ) 432 ) 433 434 self.reverse_am.set_transitions(new_transitions) 435 436 if self.reverse_am.is_epsilon_machine(): 437 return self.reverse_am 438 439 rmsp = self.reverse_am.get_msp_qw( exact_state_cap=len(self.states)*4 ) 440 441 self.reverse_am.set_states( rmsp.states ) 442 self.reverse_am.set_transitions( rmsp.transitions ) 443 self.reverse_am.msp = rmsp 444 self.reverse_am.start_state = 0 445 446 self.reverse_am.collapse_to_largest_strongly_connected_subgraph() 447 self.reverse_am.minimize() 448 449 return self.reverse_am
453 def get_Tx_fractional(self) -> list[ list[ list[ Fraction ] ] ] : 454 455 self.to_q_weighted() 456 457 n_states = len( self.states ) 458 n_symbols = len( self.alphabet ) 459 460 T_x = [] 461 462 for x in range( n_symbols ) : 463 T_x.append( [] ) 464 for i in range( n_states ) : 465 T_x[ x ].append( [ 0 for _ in range( n_states ) ] ) 466 467 for tr in self.transitions : 468 T_x[ tr.symbol_idx ][ tr.origin_state_idx ][ tr.target_state_idx ] = tr.pq 469 470 return T_x
484 def get_fractional_stationary_distribution(self) : 485 486 T = self.get_T_sympy() 487 488 if self.pi_fractional is not None : 489 return self.pi_fractional 490 491 G = self.as_digraph() 492 493 if not nx.is_strongly_connected(G): 494 raise ValueError( "Single stationary distribution requires strongly connected HMM." ) 495 496 self.pi_fractional = solve_for_pi_fractional( T ) 497 498 return self.pi_fractional
500 def get_stationary_distribution(self): 501 502 if self.pi is not None : 503 return self.pi 504 505 G = self.as_digraph() 506 507 if not nx.is_strongly_connected(G): 508 raise ValueError( "Single stationary distribution requires strongly connected HMM." ) 509 510 T = self.get_transition_matrix() 511 return solve_for_pi( T )
517 def C_mu( self ) : 518 519 """The *statistical complexity* (aka *forecasting complexity*) : 520 521 .. math:: 522 523 C_{\\mu} = - \\sum_{\\sigma \\in \\mathcal{S}} \\Pr(\\sigma) \\log_2 \\Pr(\\sigma), 524 525 where :math:`\\mathcal{S}` is the set of states [^crutchfield_exact_2016], p.2. 526 527 .. note:: 528 529 **Interpretations** 530 531 * The amount of historical information a process stores. 532 * The amount of structure in a process. 533 534 Returns: 535 536 float: :math:`C_{\\mu}`. 537 538 [^crutchfield_exact_2016]: Crutchfield et al., *Exact Complexity: The Spectral 539 Decomposition of Intrinsic Computation*, 2016. 540 <https://arxiv.org/abs/1309.3792> 541 """ 542 543 m = self.get_complexity_measure_if_exists( "C_mu" ) 544 545 if m is not None : 546 return m 547 548 pi = self.get_stationary_distribution() 549 550 h = 0 551 for i, pr in enumerate( pi ) : 552 553 if pr < self.EPS : 554 continue 555 556 h += -pr * np.log2( pr ) 557 558 self.set_complexity_measure( "C_mu", h ) 559 560 return h
The statistical complexity (aka forecasting complexity) :
$$C_{\mu} = - \sum_{\sigma \in \mathcal{S}} \Pr(\sigma) \log_2 \Pr(\sigma),$$
where \( \mathcal{S} \) is the set of states 1, p.2.
Interpretations
- The amount of historical information a process stores.
- The amount of structure in a process.
Returns:
float: \( C_{\mu} \).
-
Crutchfield et al., Exact Complexity: The Spectral Decomposition of Intrinsic Computation, 2016. https://arxiv.org/abs/1309.3792 ↩
564 def h_mu( self ) : 565 566 """The *entropy rate* : 567 568 .. math:: 569 570 h_{\\mu}(\\boldsymbol{\\mathcal{S}}) = - \\sum_{\\sigma \\in \\mathcal{S}} \\Pr(\\sigma) \\sum_{x \\in \\mathcal{A}} \\Pr(x|\\sigma) \\log_2 \\Pr(x|\\sigma), 571 572 where :math:`\\mathcal{A}` is the alphabet and :math:`\\mathcal{S}` is the set of states [^crutchfield_exact_2016], p.2. 573 574 .. note:: 575 576 **Interpretations** 577 578 * The lower bound on achievable loss in bits. 579 * The irreducable randomness in the process. 580 * The intrinsic Randomness in the process. 581 582 Returns: 583 584 float: :math:`h_{\\mu}`. 585 586 [^crutchfield_exact_2016]: Crutchfield et al., *Exact Complexity: The Spectral 587 Decomposition of Intrinsic Computation*, 2016. 588 <https://arxiv.org/abs/1309.3792> 589 """ 590 591 m = self.get_complexity_measure_if_exists( "h_mu" ) 592 593 if m is not None : 594 return m 595 596 T = self.get_transition_matrix() 597 pi = self.get_stationary_distribution() 598 599 n_states = pi.size 600 601 h = 0 602 for i, pr in enumerate( pi ) : 603 604 if pr < self.EPS : 605 continue 606 607 row_entropy = 0 608 for j in range( len( pi ) ) : 609 610 if T[ i, j ] < self.EPS : 611 continue 612 613 row_entropy -= T[ i, j ] * np.log2( T[ i, j ] ) 614 615 h += pr * row_entropy 616 617 self.set_complexity_measure( "h_mu", h ) 618 619 return h
The entropy rate :
$$h_{\mu}(\boldsymbol{\mathcal{S}}) = - \sum_{\sigma \in \mathcal{S}} \Pr(\sigma) \sum_{x \in \mathcal{A}} \Pr(x|\sigma) \log_2 \Pr(x|\sigma),$$
where \( \mathcal{A} \) is the alphabet and \( \mathcal{S} \) is the set of states 1, p.2.
Interpretations
- The lower bound on achievable loss in bits.
- The irreducable randomness in the process.
- The intrinsic Randomness in the process.
Returns:
float: \( h_{\mu} \).
-
Crutchfield et al., Exact Complexity: The Spectral Decomposition of Intrinsic Computation, 2016. https://arxiv.org/abs/1309.3792 ↩
623 def H_1(self) -> float : 624 625 """The *single symbol uncertainty*: 626 627 .. math:: 628 629 H(1)=-\\sum_{x\\in\\mathcal{A}} \\Pr(x) \\log_2{\\Pr(x)}, 630 631 where :math:`\\mathcal{A}` is the alphabet [^James_2018], p.2. 632 633 .. note:: 634 635 **Interpretations** 636 637 * How uncertain you are on average about a single measurement with no context. 638 639 Returns: 640 641 float: :math:`H(1)`. 642 643 [^James_2018]: James et al., Anatomy of a Bit: Information in a Time Series Observation, 2018. 644 <https://arxiv.org/abs/1105.2988> 645 """ 646 647 m = self.get_complexity_measure_if_exists("H_1") 648 if m is not None: 649 return m 650 651 pi = self.get_stationary_distribution() 652 T_X = self.get_T_X() # dict: symbol -> matrix 653 654 h = 0.0 655 for T_x in T_X: 656 # Pr(x) = sum_i pi[i] * sum_j T^(x)[i,j] 657 p_sym = 0.0 658 for i, pr in enumerate(pi): 659 if pr < self.EPS: 660 continue 661 p_sym += pr * T_x[i, :].sum() 662 663 if p_sym < self.EPS: 664 continue 665 h -= p_sym * np.log2(p_sym) 666 667 self.set_complexity_measure("H_1", h) 668 return h
The single symbol uncertainty:
$$H(1)=-\sum_{x\in\mathcal{A}} \Pr(x) \log_2{\Pr(x)},$$
where \( \mathcal{A} \) is the alphabet 1, p.2.
Interpretations
- How uncertain you are on average about a single measurement with no context.
Returns:
float: \( H(1) \).
-
James et al., Anatomy of a Bit: Information in a Time Series Observation, 2018. https://arxiv.org/abs/1105.2988 ↩
672 def rho_mu(self) -> float : 673 674 """The *anticipated information* [^James_2018], p.3.: 675 676 .. math:: 677 678 \\rho_{\\mu}= H(1) - h_{\\mu} 679 680 Returns: 681 682 float: :math:`\\rho_{\\mu}` 683 684 [^James_2018]: James et al., Anatomy of a Bit: Information in a Time Series Observation, 2018. 685 <https://arxiv.org/abs/1105.2988> 686 """ 687 688 m = self.get_complexity_measure_if_exists("rho_mu") 689 690 if m is not None: 691 return m 692 693 rho = self.H_1() - self.h_mu() 694 695 self.set_complexity_measure("rho_mu", rho) 696 697 return rho
The anticipated information 1, p.3.:
$$\rho_{\mu}= H(1) - h_{\mu}$$
Returns:
float: \( \rho_{\mu} \)
-
James et al., Anatomy of a Bit: Information in a Time Series Observation, 2018. https://arxiv.org/abs/1105.2988 ↩
701 def block_convergence( self ) : 702 703 """ 704 Run [block entropy convergence](am_fast.html#block_entropy_convergence). Estimates [$\\mathbf{E}$](am_hmm.html#HMM.E), [$\\mathbf{S}$](am_hmm.html#HMM.S), [$\\mathbf{T}$](am_hmm.html#HMM.T_inf), and block measures[^crutchfield_exact_2016]: 705 706 - $\\mathbf{E}(L) = H(L) - L \\cdot h_{\\mu}$, 707 708 - $\\mathbf{T}(L) = \\sum_{l=1}^{L} l \\left[ h_{\\mu}(l) - h_{\\mu} \\right]$, 709 710 - $\\mathbf{S}(L) = \\sum_{l=0}^{L} \\mathcal{H}(l)$, 711 712 - $H(L) = H[X_{0:L}]$, 713 714 - $h_{\\mu}(L) = H(L) - H(L-1)$, and 715 716 - $\\mathcal{H}(L) = -\\sum_{w \\in \\mathcal{A}^L} Pr(w) \\sum_{\\sigma \\in \\mathcal{S}} Pr(\\sigma|w) \\log_2 Pr(\\sigma|w)$. 717 718 You can plot these curves, and the block entropy curves using, [`amachine.HMM.draw_block_measure_curves`](am_hmm.html#HMM.draw_block_measure_curves), and [`amachine.HMM.draw_block_entropy_curve`](am_hmm.html#HMM.draw_block_entropy_curve). 719 720 <img src="../resources/curves.png" alt="block measures plots" style="width: 100%; margin-left: 0%;"> 721 722 Returns: 723 724 ComplexityMeasures: An object containing the estimated measures with the following attributes: 725 726 - E (float): The excess entropy. 727 - T_inf (float): The transient information ($\\mathbf{T}$). 728 - S (float): The synchronization information. 729 - E_L (numpy.ndarray): The block excess entropy ($\\mathbf{E}(L)$). 730 - T_L (numpy.ndarray): The block transient information ($\\mathbf{T}(L)$). 731 - S_L (numpy.ndarray): The block synchronization information ($\\mathbf{S}(L)$). 732 - H_L (numpy.ndarray): The block entropy ($H(L)$). 733 - h_mu_L (numpy.ndarray): The entropy rate estimates ($h_{\\mu}(L)$). 734 - H_sync (numpy.ndarray): The state-block synchronization ($\\mathcal{H}(L)$). 735 - converged (bool): True if the algorithm converged. 736 737 [^crutchfield_exact_2016]: Crutchfield et al., *Exact Complexity: The Spectral 738 Decomposition of Intrinsic Computation*, 2016. 739 <https://arxiv.org/abs/1309.3792> 740 """ 741 742 trs = [ [] for _ in range( len( self.states ) ) ] 743 for tr in self.transitions : 744 trs[ tr.origin_state_idx ].append( ( 745 tr.symbol_idx, 746 float( tr.prob ), 747 tr.target_state_idx ) ) 748 749 pi = self.get_stationary_distribution() 750 751 state_dist = [ float( pi[ i ] ) for i in range( len( self.states ) ) ] 752 branches = [(1.0, list(state_dist))] 753 754 print( "\nComputing Block Entropy\n" ) 755 756 C = am_fast.block_entropy_convergence( 757 h_mu = self.h_mu(), 758 n_states = len( self.states ), 759 n_symbols = len( self.alphabet ), 760 convergence_tol = 1e-6, 761 precision = 10, 762 eps = 1e-25, 763 branches = branches, 764 trans = trs, 765 max_branches = 30_000_000 766 ) 767 768 print( "Done\n" ) 769 770 self.set_complexity_measure( f"E", C.E ) 771 self.set_complexity_measure( f"S", C.S ) 772 self.set_complexity_measure( f"T_inf", C.T ) 773 self.set_complexity_measure( f"E_L", C.E_L.tolist() ) 774 self.set_complexity_measure( f"T_L", C.T_L.tolist() ) 775 self.set_complexity_measure( f"S_L", C.S_L.tolist() ) 776 self.set_complexity_measure( f"H_L", C.H_L.tolist() ) 777 self.set_complexity_measure( f"h_mu_L", C.h_mu_L.tolist() ) 778 self.set_complexity_measure( f"H_sync", C.H_sync.tolist() ) 779 780 return C
Run block entropy convergence. Estimates $\mathbf{E}$, $\mathbf{S}$, $\mathbf{T}$, and block measures1:
$\mathbf{E}(L) = H(L) - L \cdot h_{\mu}$,
$\mathbf{T}(L) = \sum_{l=1}^{L} l \left[ h_{\mu}(l) - h_{\mu} \right]$,
$\mathbf{S}(L) = \sum_{l=0}^{L} \mathcal{H}(l)$,
$H(L) = H[X_{0:L}]$,
$h_{\mu}(L) = H(L) - H(L-1)$, and
$\mathcal{H}(L) = -\sum_{w \in \mathcal{A}^L} Pr(w) \sum_{\sigma \in \mathcal{S}} Pr(\sigma|w) \log_2 Pr(\sigma|w)$.
You can plot these curves, and the block entropy curves using, amachine.HMM.draw_block_measure_curves, and amachine.HMM.draw_block_entropy_curve.

Returns:
ComplexityMeasures: An object containing the estimated measures with the following attributes:
- E (float): The excess entropy.
- T_inf (float): The transient information ($\mathbf{T}$).
- S (float): The synchronization information.
- E_L (numpy.ndarray): The block excess entropy ($\mathbf{E}(L)$).
- T_L (numpy.ndarray): The block transient information ($\mathbf{T}(L)$).
- S_L (numpy.ndarray): The block synchronization information ($\mathbf{S}(L)$).
- H_L (numpy.ndarray): The block entropy ($H(L)$).
- h_mu_L (numpy.ndarray): The entropy rate estimates ($h_{\mu}(L)$).
- H_sync (numpy.ndarray): The state-block synchronization ($\mathcal{H}(L)$).
- converged (bool): True if the algorithm converged.
-
Crutchfield et al., Exact Complexity: The Spectral Decomposition of Intrinsic Computation, 2016. https://arxiv.org/abs/1309.3792 ↩
784 def E( self ) -> float : 785 786 """The *excess entropy* [^crutchfield_exact_2016], p.4: 787 788 .. math:: 789 790 \\mathbf{E} \\equiv \\sum_{L=1}^{\\infty} I[X_{-\\infty:0}; X_{0:\\infty}] 791 792 Computed via :meth:`get_msp` and :meth:`amachine.am_msp.MSP.get_E_S_T`, or :meth:`amachine.am_fast.block_entropy_convergence` 793 794 .. note:: 795 796 **Interpretations** 797 798 * The information from the past that reduces uncertainty in the future [^crutchfield_exact_2016]. 799 * How much information an observer must extract to synchronize to the process. 800 * Measures how long the process appears more complex than it asymptotically is. 801 * Vanishes for immediately synchronizable processes. 802 803 Returns: 804 805 float: :math:`\\mathbf{E}` 806 807 [^crutchfield_exact_2016]: Crutchfield et al., *Exact Complexity: The Spectral 808 Decomposition of Intrinsic Computation*, 2016. 809 <https://arxiv.org/abs/1309.3792> 810 """ 811 812 m = self.get_complexity_measure_if_exists( "E" ) 813 814 if m is not None : 815 return m 816 817 try : 818 msp = self.get_msp() 819 E, S, T = msp.get_E_S_T() 820 self.set_complexity_measure( "E", E ) 821 self.set_complexity_measure( "S", S ) 822 self.set_complexity_measure( "T_inf", T ) 823 824 except Exception as e : 825 826 print( f"MSP failed {e}" ) 827 828 C = self.block_convergence() 829 E = C.E 830 self.set_complexity_measure( "E", E ) 831 832 return E
The excess entropy 1, p.4:
$$\mathbf{E} \equiv \sum_{L=1}^{\infty} I[X_{-\infty:0}; X_{0:\infty}]$$
Computed via get_msp() and amachine.am_msp.MSP.get_E_S_T(), or amachine.am_fast.block_entropy_convergence()
Interpretations
- The information from the past that reduces uncertainty in the future 1.
- How much information an observer must extract to synchronize to the process.
- Measures how long the process appears more complex than it asymptotically is.
- Vanishes for immediately synchronizable processes.
Returns:
float: \( \mathbf{E} \)
-
Crutchfield et al., Exact Complexity: The Spectral Decomposition of Intrinsic Computation, 2016. https://arxiv.org/abs/1309.3792 ↩
836 def S( self ) -> float : 837 838 """The *synchronization* information: 839 840 .. math:: 841 842 \\mathbf{S} \\equiv \\sum_{L=1}^{\\infty} \\mathcal{H}(L), 843 844 where :math:`\\mathcal{H}(L)` is the average state uncertainty having seen all length-L words [^crutchfield_exact_2016], p.4. 845 846 .. note:: 847 848 **Interpretations** 849 850 * The total amount of state information that an observer must extract to become synchronized [^crutchfield_exact_2016]. 851 852 Computed via :meth:`get_msp` and :meth:`amachine.am_msp.MSP.get_E_S_T`, or :meth:`amachine.am_fast.block_entropy_convergence` 853 854 Returns: 855 856 float: :math:`\\mathbf{S}` 857 858 [^crutchfield_exact_2016]: Crutchfield et al., *Exact Complexity: The Spectral 859 Decomposition of Intrinsic Computation*, 2016. 860 <https://arxiv.org/abs/1309.3792> 861 """ 862 863 m = self.get_complexity_measure_if_exists( "S" ) 864 865 if m is not None : 866 return m 867 868 try : 869 msp = self.get_msp() 870 E, S, T = msp.get_E_S_T() 871 self.set_complexity_measure( "E", E ) 872 self.set_complexity_measure( "S", S ) 873 self.set_complexity_measure( "T_inf", T ) 874 875 except Exception as e : 876 print( f"{e} \nFalling back to iterative estimation.") 877 exit() 878 879 C = self.block_convergence() 880 S = C.S 881 self.set_complexity_measure( "S", S ) 882 883 return S
The synchronization information:
$$\mathbf{S} \equiv \sum_{L=1}^{\infty} \mathcal{H}(L),$$
where \( \mathcal{H}(L) \) is the average state uncertainty having seen all length-L words 1, p.4.
Interpretations
- The total amount of state information that an observer must extract to become synchronized 1.
Computed via get_msp() and amachine.am_msp.MSP.get_E_S_T(), or amachine.am_fast.block_entropy_convergence()
Returns:
float: \( \mathbf{S} \)
-
Crutchfield et al., Exact Complexity: The Spectral Decomposition of Intrinsic Computation, 2016. https://arxiv.org/abs/1309.3792 ↩
887 def T_inf( self ) -> float : 888 889 """The *transient information*[^crutchfield_exact_2016], p.4: 890 891 .. math:: 892 893 \\mathbf{T} \\equiv \\sum_{L=1}^{\\infty} L \\left[ h_{\\mu}(L) - h_{\\mu} \\right] 894 895 Computed via :meth:`get_msp` and :meth:`amachine.am_msp.MSP.get_E_S_T`, or :meth:`amachine.am_fast.block_entropy_convergence` 896 897 .. note:: 898 899 **Interpretations** 900 901 * The amount of information one must extract from observations so that the block entropy converges to its linear asymptote[^crutchfield_exact_2016]. 902 903 Returns: 904 905 float: :math:`\\mathbf{T}` 906 907 [^crutchfield_exact_2016]: Crutchfield et al., *Exact Complexity: The Spectral 908 Decomposition of Intrinsic Computation*, 2016. 909 <https://arxiv.org/abs/1309.3792> 910 """ 911 912 m = self.get_complexity_measure_if_exists( "T_inf" ) 913 914 if m is not None : 915 return m 916 917 try : 918 msp = self.get_msp() 919 E, S, T = msp.get_E_S_T() 920 self.set_complexity_measure( "E", E ) 921 self.set_complexity_measure( "S", S ) 922 self.set_complexity_measure( "T_inf", T ) 923 924 except Exception as e : 925 print( f"{e} \nFalling back to iterative estimation.") 926 C = self.block_convergence() 927 T_inf = C.T 928 self.set_complexity_measure( "T_inf", T_inf ) 929 930 return T_inf
The transient information1, p.4:
$$\mathbf{T} \equiv \sum_{L=1}^{\infty} L \left[ h_{\mu}(L) - h_{\mu} \right]$$
Computed via get_msp() and amachine.am_msp.MSP.get_E_S_T(), or amachine.am_fast.block_entropy_convergence()
Interpretations
- The amount of information one must extract from observations so that the block entropy converges to its linear asymptote1.
Returns:
float: \( \mathbf{T} \)
-
Crutchfield et al., Exact Complexity: The Spectral Decomposition of Intrinsic Computation, 2016. https://arxiv.org/abs/1309.3792 ↩
934 def chi( self ) -> float : 935 936 """The foward crypticity[^crutchfield_crypticity_2009][^Mahoney_crypticity_2021], p.2: 937 938 .. math:: 939 940 \\chi = C_{\\mu} - \\mathbf{E} 941 942 :math:`C_{\\mu}` is trivially computed from the stationary distribution in :meth:`C_mu` and :math:`\\mathbf{E}` in :meth:`E`. 943 944 .. note:: 945 946 **Interpretations** 947 948 * Difference between internal stored information and apparent information to an observer. 949 * How muching information is hiding in the system. 950 951 Returns: 952 953 float: :math:`\\chi` 954 955 [^crutchfield_crypticity_2009]: Crutchfield et al., Time’s barbed arrow: Irreversibility, crypticity, and stored information, 2009. 956 <https://arxiv.org/abs/0902.1209> 957 958 [^Mahoney_crypticity_2021]: Mahoney et al., Information Accessibility and Cryptic Processes, 2021. 959 <https://arxiv.org/abs/0905.4787> 960 """ 961 962 m = self.get_complexity_measure_if_exists( "chi" ) 963 964 if m is not None : 965 return m 966 967 chi = self.C_mu() - self.E() 968 969 if chi < 0 : 970 971 # if chi is 0, accumulated floating point error can result in small negative values 972 if chi < -1e-5: 973 warnings.warn(f"Crypticity is negative ({chi:.6e}).") 974 975 chi = np.clamp( chi, 0 ) 976 977 self.set_complexity_measure( "chi", chi ) 978 979 return chi
$$\chi = C_{\mu} - \mathbf{E}$$
\( C_{\mu} \) is trivially computed from the stationary distribution in C_mu() and \( \mathbf{E} \) in E().
Interpretations
- Difference between internal stored information and apparent information to an observer.
- How muching information is hiding in the system.
Returns:
float: \( \chi \)
-
Crutchfield et al., Time’s barbed arrow: Irreversibility, crypticity, and stored information, 2009. https://arxiv.org/abs/0902.1209 ↩
-
Mahoney et al., Information Accessibility and Cryptic Processes, 2021. https://arxiv.org/abs/0905.4787 ↩
985 def is_row_stochastic(self) : 986 987 """ 988 Check that all states have outgoing transition probabilities that sum to 1. 989 """ 990 991 sums = np.zeros( len( self.states ) ) 992 for tr in self.transitions : 993 sums[ tr.origin_state_idx ] += tr.prob 994 return np.allclose( sums, 1.0 )
Check that all states have outgoing transition probabilities that sum to 1.
998 def is_unifilar(self) : 999 1000 """ 1001 Check that no state emits the same symbol on transitions to different states. 1002 """ 1003 1004 symbol_trs = np.full( ( len( self.states ), len( self.alphabet) ), -1 ) 1005 1006 for tr in self.transitions : 1007 1008 if symbol_trs[ tr.origin_state_idx, tr.symbol_idx ] == -1 : 1009 symbol_trs[ tr.origin_state_idx, tr.symbol_idx ] = tr.target_state_idx 1010 1011 elif symbol_trs[ tr.origin_state_idx, tr.symbol_idx ] != tr.target_state_idx : 1012 return False 1013 1014 return True
Check that no state emits the same symbol on transitions to different states.
1018 def is_strongly_connected(self) : 1019 1020 """ 1021 Check if every state is reachable from every other state. Relies on [nx.is_strongly_connected](https://networkx.org/documentation/latest/reference/algorithms/generated/networkx.algorithms.components.is_strongly_connected.html). 1022 """ 1023 1024 return nx.is_strongly_connected( self.as_digraph() )
Check if every state is reachable from every other state. Relies on nx.is_strongly_connected.
1028 def is_aperiodic(self) : 1029 1030 """ 1031 Checks if machine is periodic. Relies on [nx.is_aperiodic](https://networkx.org/documentation/latest/reference/algorithms/generated/networkx.algorithms.dag.is_aperiodic.html), "A strongly connected directed graph is aperiodic if there is no integer k > 1 that divides the length of every cycle in the graph." 1032 """ 1033 1034 return nx.is_aperiodic( self.as_digraph() )
Checks if machine is periodic. Relies on nx.is_aperiodic, "A strongly connected directed graph is aperiodic if there is no integer k > 1 that divides the length of every cycle in the graph."
1057 def is_topological_epsilon_machine( self, verbose=True ) : 1058 1059 """ 1060 Checks if the HMM is a topological $\\epsilon$-machine [^1]. 1061 1062 [^1]: Johnson et al, Enumerating Finitary Processes, 2024. 1063 <https://arxiv.org/abs/1011.0036> 1064 """ 1065 1066 if not ( self.is_unifilar() and self.is_strongly_connected() ) : 1067 if verbose : 1068 print( f"Either non unifilar or not strongly connected" ) 1069 return False 1070 else : 1071 return self._is_minimal_as_dfa( topological_only=True, verbose=verbose )
Checks if the HMM is a topological $\epsilon$-machine 1.
-
Johnson et al, Enumerating Finitary Processes, 2024. https://arxiv.org/abs/1011.0036 ↩
1073 def is_epsilon_machine( self, verbose=True ) : 1074 1075 if not ( self.is_unifilar() and self.is_strongly_connected() ) : 1076 if verbose : 1077 print( f"Either non unifilar or not strongly connected" ) 1078 return False 1079 else : 1080 return self._is_minimal_as_dfa( topological_only=False, verbose=verbose )
1086 def minimize(self, retain_names: bool = True, verbose=False): 1087 1088 """ 1089 Minimizes the HMM, resulting in an :math:`\\epsilon-`machine if the HMM 1090 is unifilar and strongly connected. Converts the HMM to a DFA with symbols 1091 labeled jointly with symbols and probabilities, and uses Myhill-Nerode 1092 equivalence for minimization. Relies on `automata_lib` and uses 1093 `automata.fa.dfa.DFA.minify` with `allow_partial=True`, and all states 1094 final. 1095 1096 Args: 1097 retain_names (bool): If `True`, the merged states will be named by their union, e.g. `{s_0, s_1}`, and other states will retain their origion names. Otherwise, they will be relabled `{ '0', '1', ..., 'n-1' }`. 1098 1099 Returns: 1100 1101 automata.fa.dfa.DFA : the resulting DFA. 1102 """ 1103 1104 if self.is_minimal : 1105 return 1106 1107 start = time.perf_counter() 1108 1109 if not self.is_unifilar(): 1110 raise ValueError( 1111 "DFA minimization is not valid for non-unifilar HMMs" 1112 ) 1113 1114 was_strongly_connected = self.is_strongly_connected() 1115 1116 was_row_stochastic = self.is_row_stochastic() 1117 n_states_before = len(self.states) 1118 1119 dfa = self.as_dfa(with_probs=True) 1120 1121 #min_dfa = self.as_dfa(with_probs=True).minify(retain_names=True) 1122 min_dfa = am_fast.minify_cpp( dfa, retain_names=True ) 1123 1124 # Build lookup from original state index -> CausalState object 1125 orig_state = {i: s for i, s in enumerate(self.states)} 1126 eq_list = list(min_dfa.states) 1127 1128 start_eq = min_dfa.initial_state 1129 1130 # Separate the start state, then sort the rest by the 1131 # smallest original state index inside each equivalence class. 1132 other_eqs = [eq for eq in eq_list if eq != start_eq] 1133 other_eqs.sort(key=lambda eq: min(eq)) 1134 1135 # Recombine so start eq comes first, followed by the sorted remaining classes 1136 eq_list = [start_eq] + other_eqs 1137 # ---------------------------------------------------------- 1138 1139 # Recompute eq_to_idx with the new ordering 1140 eq_to_idx = {eq: i for i, eq in enumerate(eq_list)} 1141 1142 # new_start is now guaranteed to be 0 1143 new_start = 0 1144 1145 # Map each original state index -> its equivalence class 1146 # Guard: minify() silently drops unreachable states 1147 orig_to_eq = {s: eq for eq in min_dfa.states for s in eq} 1148 1149 # Build lookup from original state index -> its transitions 1150 orig_trs = defaultdict(list) 1151 for t in self.transitions: 1152 orig_trs[t.origin_state_idx].append(t) 1153 1154 new_trs = [] 1155 for eq in min_dfa.states: 1156 rep = next(iter(eq)) 1157 origin_idx = eq_to_idx[eq] 1158 for t in orig_trs[rep]: 1159 target_eq = orig_to_eq[t.target_state_idx] 1160 target_idx = eq_to_idx[target_eq] 1161 new_trs.append(Transition( 1162 origin_state_idx = origin_idx, 1163 target_state_idx = target_idx, 1164 prob = t.prob, 1165 symbol_idx = t.symbol_idx, 1166 )) 1167 1168 members_list = [[orig_state[i] for i in sorted(eq)] for eq in eq_list] # sorted for determinism 1169 1170 # Compute new names 1171 if retain_names: 1172 new_names = [ 1173 "{" + ",".join(str(m.name) for m in members) + "}" if len(members) > 1 1174 else members[0].name 1175 for members in members_list 1176 ] 1177 else: 1178 new_names = [str(j) for j in range(len(eq_list))] 1179 1180 old_name_to_new_name = { 1181 m.name: new_names[j] 1182 for j, members in enumerate(members_list) 1183 for m in members 1184 } 1185 1186 # Build the new states, preserving classes and isomorphs regardless of naming 1187 new_states = [] 1188 for j, (eq, members, name) in enumerate(zip(eq_list, members_list, new_names)): 1189 classes = set().union(*(m.classes for m in members)) 1190 isomorphs = { 1191 old_name_to_new_name.get(iso, iso) 1192 for m in members 1193 for iso in m.isomorphs 1194 if old_name_to_new_name.get(iso, iso) != name 1195 } 1196 new_states.append(CausalState( 1197 name = name, 1198 classes = classes, 1199 isomorphs = isomorphs, 1200 )) 1201 1202 self.set_states(new_states) 1203 self.set_transitions(new_trs) 1204 self.start_state = new_start 1205 1206 if n_states_before == len(new_states) and verbose : 1207 print( f"{n_states_before} state HMM was already minimal.\n" ) 1208 elif verbose : 1209 print( f"Minimized from {n_states_before} to {len(new_states)}\n" ) 1210 1211 if not ( was_strongly_connected == self.is_strongly_connected() ) : 1212 raise RuntimeError( 1213 f"Minimization broke strongly connected" 1214 ) 1215 1216 if not ( was_row_stochastic == self.is_row_stochastic() ) : 1217 raise RuntimeError( 1218 f"Minimization broke row stochasticity" 1219 ) 1220 1221 self.is_minimal = True
Minimizes the HMM, resulting in an \( \epsilon- \)machine if the HMM
is unifilar and strongly connected. Converts the HMM to a DFA with symbols
labeled jointly with symbols and probabilities, and uses Myhill-Nerode
equivalence for minimization. Relies on automata_lib and uses
automata.fa.dfa.DFA.minify with allow_partial=True, and all states
final.
Arguments:
- retain_names (bool): If
True, the merged states will be named by their union, e.g.{s_0, s_1}, and other states will retain their origion names. Otherwise, they will be relabled{ '0', '1', ..., 'n-1' }.
Returns:
automata.fa.dfa.DFA : the resulting DFA.
1228 def collapse_to_largest_strongly_connected_subgraph( self, rename_states=True ) : 1229 1230 # get equivalent networkx graph 1231 G = self.as_digraph() 1232 1233 # if already strongly connected, nothing to do 1234 if not nx.is_strongly_connected( G ) : 1235 1236 start = time.perf_counter() 1237 subgraph_nodes = list( nx.strongly_connected_components( G ) ) 1238 1239 # decompose into strongly connected components and sort by length 1240 # subgraph_nodes = list(nx.strongly_connected_components( G )) 1241 subgraph_nodes.sort(key=len) 1242 component_state_set = subgraph_nodes[-1] 1243 1244 # Take the largest strongly connected component (as list of state names) 1245 component_states = sorted( list( component_state_set ) ) 1246 1247 # make temporary copies of the old transitions and states 1248 old_transitions = [ 1249 Transition( 1250 origin_state_idx=tr.origin_state_idx, 1251 target_state_idx=tr.target_state_idx, 1252 prob=tr.prob, 1253 symbol_idx=tr.symbol_idx 1254 ) 1255 1256 for tr in self.transitions 1257 ] 1258 1259 old_states = [ 1260 CausalState( 1261 name=s.name, 1262 classes=s.classes, 1263 isomorphs=s.isomorphs 1264 ) 1265 for s in self.states 1266 ] 1267 1268 self.set_states( 1269 states=[ 1270 state 1271 for i, state in enumerate( old_states ) if i in component_state_set 1272 ] 1273 ) 1274 1275 # we will build new transition list based on those belonging to the component 1276 self.set_transitions( transitions= [] ) 1277 1278 # for tracking which new transitions leave each state 1279 transitions_from_state = { state : set() for state in component_states } 1280 new_transitions = [] 1281 1282 for tr in old_transitions : 1283 1284 origin_state_name = old_states[ tr.origin_state_idx ].name 1285 target_state_name = old_states[ tr.target_state_idx ].name 1286 1287 # skip transitions that connect separate strongly connected components 1288 if not ( tr.origin_state_idx in component_state_set and tr.target_state_idx in component_state_set ) : 1289 continue 1290 1291 # track transitions (by index in new transitions list) that leave this state 1292 transitions_from_state[ tr.origin_state_idx ].add( len( new_transitions ) ) 1293 1294 my_origin_state_idx = self.state_idx_map[ origin_state_name ] 1295 my_target_state_idx = self.state_idx_map[ target_state_name ] 1296 1297 new_transitions.append( 1298 Transition( 1299 origin_state_idx=my_origin_state_idx, 1300 target_state_idx=my_target_state_idx, 1301 prob=tr.prob, 1302 symbol_idx=tr.symbol_idx 1303 ) ) 1304 1305 self.set_transitions( transitions=new_transitions ) 1306 1307 # if we removed an outgoing transition from a state, we need to distribute its probability 1308 # among the remaining outgoing transitions from the state 1309 for state in component_states : 1310 1311 # get the set of transitions leaving this state 1312 state_trs = transitions_from_state[ state ] 1313 1314 # sum the probabilities of the outgoing transitions from the state 1315 p_sum = np.sum( [ self.transitions[ i ].prob for i in state_trs ] ) 1316 1317 # how much probability is missing 1318 diff = 1.0 - p_sum 1319 1320 # if significant difference 1321 if abs( diff ) > self.EPS : 1322 1323 # calculate how much of the difference each transition gets 1324 adjustment = diff / len( state_trs ) 1325 1326 # update the transitions 1327 for i in state_trs : 1328 1329 # transition with origional probability 1330 tr = self.transitions[ i ] 1331 1332 # adjusted probability 1333 self.transitions[ i ] = Transition( 1334 origin_state_idx=tr.origin_state_idx, 1335 target_state_idx=tr.target_state_idx, 1336 prob=tr.prob + adjustment, 1337 symbol_idx=tr.symbol_idx ) 1338 1339 if rename_states : 1340 self.set_states( [ 1341 CausalState( name=f"{i}" ) 1342 for i, s in enumerate( self.states ) 1343 ] )
1345 def to_q_weighted( self, denominator_limit=1000 ) : 1346 1347 """ 1348 Approximates the existing transition probabilities with exact fractions, stores 1349 the fractional probabilities as Fraction in Transition.pq, and sets the floating 1350 point probabilty to `float(pq)`. If `denominator_limit` is too small for a sane 1351 conversion, the function recurses with `denominator_limit=denominator_limit*10`. 1352 1353 Args: 1354 denominator_limit (int): The initial input to :meth:`Fraction.limit_denominator` in 1355 the conversion. 1356 """ 1357 1358 if self.is_q_weighted : 1359 return 1360 1361 if not self.is_row_stochastic() : 1362 raise ValueError( "Cannot convert to q-weighted because not row stochastic" ) 1363 1364 t_from = [[] for _ in range(len(self.states))] 1365 1366 for i, tr in enumerate( self.transitions ) : 1367 t_from[ tr.origin_state_idx ].append( i ) 1368 1369 new_transitions = [] 1370 for t_list in t_from : 1371 1372 if not t_list : 1373 continue 1374 1375 p_q_sum = Fraction(0,1) 1376 p_qs = [] 1377 1378 for t_idx in t_list : 1379 1380 p_q = Fraction( self.transitions[ t_idx ].prob ).limit_denominator( denominator_limit ) 1381 p_q_sum += p_q 1382 p_qs.append( p_q ) 1383 1384 if p_q_sum != Fraction(1,1) : 1385 1386 max_pq_i = np.argmax( p_qs ) 1387 max_oq = p_qs[ max_pq_i ] 1388 1389 diff = p_q_sum - Fraction(1,1) 1390 1391 # If recurse with higher resolution 1392 if diff > max_oq : 1393 return self.to_q_weighted( denominator_limit*10 ) 1394 else : 1395 p_qs[ max_pq_i ] -= diff 1396 1397 for i, t_idx in enumerate( t_list ) : 1398 new_transitions.append( 1399 Transition( 1400 origin_state_idx=self.transitions[ t_idx ].origin_state_idx, 1401 target_state_idx=self.transitions[ t_idx ].target_state_idx, 1402 prob=float(p_qs[ i ]), 1403 symbol_idx=self.transitions[ t_idx ].symbol_idx, 1404 pq=p_qs[ i ] 1405 ) 1406 ) 1407 1408 self.set_transitions( new_transitions ) 1409 self.is_q_weighted = True
Approximates the existing transition probabilities with exact fractions, stores
the fractional probabilities as Fraction in Transition.pq, and sets the floating
point probabilty to float(pq). If denominator_limit is too small for a sane
conversion, the function recurses with denominator_limit=denominator_limit*10.
Arguments:
- denominator_limit (int): The initial input to
Fraction.limit_denominator()in - the conversion.
1415 def isomorphic_shift( 1416 self, 1417 input_symbol_indices: np.ndarray, 1418 input_state_indices: np.ndarray, 1419 shift : int = 1 1420 ) -> dict[str, np.ndarray]: 1421 1422 """ 1423 Generates a new sequence of of symbols that are permuted with the symbols emitted by 1424 isomorphic states, if they exists. 1425 1426 :math:`\\sigma_o = \\mathcal{S}\\left[\\texttt{input\\_state\\_indices}[i]\\right]`<br> 1427 :math:`\\sigma_t = \\mathcal{S}\\left[\\texttt{input\\_state\\_indices}[i+1]\\right]` 1428 1429 :math:`\\mathcal{I}(\\sigma_o) = \\\\{\\sigma^0_o,\\, \\sigma^1_o,\\, \\dots,\\, \\sigma^{n-1}_o \\\\}`<br> 1430 :math:`\\mathcal{I}(\\sigma_t) = \\\\{\\sigma^0_t,\\, \\sigma^1_t,\\, \\dots,\\, \\sigma^{n-1}_t \\\\}` 1431 1432 :math:`k = \\bigl(\\mathcal{I}(\\sigma_o).\\texttt{index}(\\sigma_o) + \\texttt{shift}\\bigr) \\bmod n` 1433 1434 :math:`\\texttt{output\\_symbol\\_indices}[i] := T(\\sigma_o^k,\\, \\sigma_t^k).\\text{symbol\\_index}`<br> 1435 :math:`\\texttt{output\\_state\\_indices}[i] := \\mathcal{S}.\\texttt{index}(\\sigma_o^k)`<br> 1436 :math:`\\texttt{output\\_state\\_indices}[i+1] := \\mathcal{S}.\\texttt{index}(\\sigma_t^k)` 1437 1438 Where :math:`\\mathcal{I}(\\sigma)`` is the ordered set of states isomorphic to :math:`\\sigma` including :math:`\\sigma` itself. 1439 1440 Args: 1441 input_symbol_indices (np.ndarray): The sequence of generated symbols. 1442 input_state_indices (np.ndarray): The sequence of states that generated symbols with the final state at the end. 1443 shift : int: How much to shift the symbols across the isomorphic states. 1444 """ 1445 1446 if not any( state.isomorphs for state in self.states ): 1447 raise ValueError("HMM has no states with isomorphs") 1448 1449 inputs = np.asarray(input_symbol_indices) 1450 states = np.asarray(input_state_indices) 1451 1452 n_states = len(self.states) 1453 1454 tr_sym_table = np.full((n_states, n_states), -1, dtype=np.int32) 1455 1456 for tr in self.transitions: 1457 tr_sym_table[ tr.origin_state_idx, tr.target_state_idx ] = tr.symbol_idx 1458 1459 # Build isomorph remapping: identity by default, overridden where isomorphs exist 1460 iso_table = np.arange(n_states, dtype=np.int32) 1461 1462 for i, state in enumerate(self.states): 1463 if state.isomorphs: 1464 isomorph = sorted(state.isomorphs)[0] 1465 iso_table[ i ] = self.state_idx_map[isomorph] 1466 1467 for i, state in enumerate(self.states): 1468 if state.isomorphs: 1469 1470 # extend the isomorph list to include the identity 1471 isormorphs_with_identity = sorted( [ i ] + [ self.state_idx_map[iso] for iso in state.isomorphs ] ) 1472 1473 # find the identity index 1474 pos = isormorphs_with_identity.index( i ) 1475 1476 # cyclical shift 1477 iso_table[ i ] = isormorphs_with_identity[ ( pos + shift ) % len( isormorphs_with_identity ) ] 1478 1479 origins = states[:-1] 1480 targets = states[1:] 1481 1482 out_origins = iso_table[origins] 1483 out_targets = iso_table[targets] 1484 1485 inv_sym = tr_sym_table[ out_origins, out_targets ] 1486 inv_sts = np.empty(states.size, dtype=states.dtype) 1487 1488 inv_sts[:-1] = out_origins 1489 inv_sts[-1] = out_targets[-1] 1490 1491 return { 1492 "symbol_index": inv_sym.astype(inputs.dtype), 1493 "state_index": inv_sts, 1494 }
Generates a new sequence of of symbols that are permuted with the symbols emitted by isomorphic states, if they exists.
\( \sigma_o = \mathcal{S}\left[\texttt{input_state_indices}[i]\right] \)
\( \sigma_t = \mathcal{S}\left[\texttt{input_state_indices}[i+1]\right] \)
\( \mathcal{I}(\sigma_o) = \{\sigma^0_o,\, \sigma^1_o,\, \dots,\, \sigma^{n-1}_o \} \)
\( \mathcal{I}(\sigma_t) = \{\sigma^0_t,\, \sigma^1_t,\, \dots,\, \sigma^{n-1}_t \} \)
\( k = \bigl(\mathcal{I}(\sigma_o).\texttt{index}(\sigma_o) + \texttt{shift}\bigr) \bmod n \)
\( \texttt{output_symbol_indices}[i] := T(\sigma_o^k,\, \sigma_t^k).\text{symbol_index} \)
\( \texttt{output_state_indices}[i] := \mathcal{S}.\texttt{index}(\sigma_o^k) \)
\( \texttt{output_state_indices}[i+1] := \mathcal{S}.\texttt{index}(\sigma_t^k) \)
Where \( \mathcal{I}(\sigma) \)` is the ordered set of states isomorphic to \( \sigma \) including \( \sigma \) itself.
Arguments:
- input_symbol_indices (np.ndarray): The sequence of generated symbols.
- input_state_indices (np.ndarray): The sequence of states that generated symbols with the final state at the end.
- shift : int: How much to shift the symbols across the isomorphic states.
1496 def generate_data( 1497 self, 1498 file_prefix: str, 1499 n_gen: int, 1500 include_states: bool, 1501 isomorphic_shifts : set[int]=None, 1502 random_seed : int=42 ) -> dict[any] : 1503 1504 trs = [ [] for _ in range( len( self.states ) ) ] 1505 for tr in self.transitions : 1506 trs[ tr.origin_state_idx ].append( ( 1507 tr.symbol_idx, 1508 float( tr.prob ), 1509 tr.target_state_idx ) ) 1510 1511 data = am_fast.generate_data( 1512 n_gen=n_gen, 1513 start_state=self.start_state, 1514 transitions=trs, 1515 alphabet=sorted(list(self.alphabet)), 1516 include_states=include_states, 1517 random_seed=random_seed 1518 ) 1519 1520 if isomorphic_shifts is not None : 1521 1522 if not include_states : 1523 raise ValueError( "Isomorphic inversion requires include_states=True" ) 1524 1525 data[ "isomorphic_shifts" ] = {} 1526 1527 for shift in isomorphic_shifts : 1528 1529 try : 1530 1531 shifted = self.isomorphic_shift( 1532 input_symbol_indices=data[ "symbol_index" ], 1533 input_state_indices=data[ "state_index" ], 1534 shift=shift 1535 ) 1536 1537 data[ "isomorphic_shifts" ][ shift ] = { 1538 "symbol_index" : shifted[ "symbol_index" ], 1539 "state_index" : shifted[ "state_index" ] 1540 } 1541 1542 except Exception as e : 1543 print( f"Exception {e}" ) 1544 1545 am_fast.save_data( 1546 data=data, 1547 file_prefix=file_prefix, 1548 alphabet=sorted(list(self.alphabet)), 1549 n_states=len( self.states ), 1550 start_state=self.start_state, 1551 random_seed=random_seed, 1552 machine_metadata=self.get_metadata() ) 1553 1554 return data
1560 def draw_block_entropy_curve(self) : 1561 1562 h_mu = self.h_mu() 1563 E = self.E() 1564 1565 if not "H_L" in self.complexity : 1566 C = self.block_convergence() 1567 1568 H_L = self.complexity[ "H_L" ] 1569 L = np.asarray( [ i+1 for i in range( len( H_L ) ) ], dtype='float64') 1570 y = E + L*h_mu 1571 1572 plt.style.use('seaborn-v0_8-darkgrid') 1573 fig, ax = plt.subplots(figsize=(10, 6), dpi=120) 1574 colors = plt.cm.tab10.colors 1575 1576 ax.plot( L, H_L, color='#1E88E5', linewidth=2.0, linestyle='-', label=r'$H(L)$') 1577 ax.plot( L, y, color='#D81B60', linewidth=1.5, linestyle='--', label=r'$\mathbf{E} + h_{\mu}L$') 1578 1579 ax.fill_between( 1580 L, H_L, y, 1581 color='gray', 1582 alpha=0.5, 1583 label=r'Transient information $\mathbf{T}$' 1584 ) 1585 1586 ax.set_title('Block Entropy Curve', fontsize=16, fontweight='bold', pad=15) 1587 ax.set_xlabel('L', fontsize=12, labelpad=10) 1588 ax.set_ylabel('Bits', fontsize=12, labelpad=10) 1589 1590 legend = ax.legend( 1591 loc='upper left', 1592 title_fontsize='11', 1593 fontsize='10', 1594 frameon=True, 1595 facecolor='white', 1596 shadow=True 1597 ) 1598 1599 legend.get_title().set_fontweight('bold') 1600 plt.tight_layout() 1601 plt.show()
1603 def draw_block_measure_curves(self) : 1604 1605 h_mu = self.h_mu() 1606 E = self.E() 1607 1608 if not "H_L" in self.complexity : 1609 C = self.block_convergence() 1610 1611 H_L = self.complexity[ "H_L" ] 1612 E_L = self.complexity[ "E_L" ] 1613 T_L = self.complexity[ "T_L" ] 1614 S_L = self.complexity[ "S_L" ] 1615 H_sync = self.complexity[ "H_sync" ][1:] 1616 h_mu_L = self.complexity[ "h_mu_L" ] 1617 1618 L = np.asarray( [ i+1 for i in range( len( H_L ) ) ], dtype='float64') 1619 y = E + L*h_mu 1620 1621 plt.style.use('seaborn-v0_8-darkgrid') 1622 fig, ax = plt.subplots(figsize=(10, 6), dpi=120) 1623 colors = plt.cm.tab10.colors 1624 1625 ax.plot( L, H_L, color=colors[0], linewidth=2.0, linestyle='-', label=r'$H(L)$') 1626 ax.plot( L, E_L, color=colors[1], linewidth=2.0, linestyle='-', label=r'$\mathbf{E}(L)$') 1627 ax.plot( L, T_L, color=colors[2], linewidth=2.0, linestyle='-', label=r'$\mathbf{T}(L)$') 1628 ax.plot( L, S_L, color=colors[3], linewidth=2.0, linestyle='-', label=r'$\mathbf{S}(L)$') 1629 ax.plot( L, H_sync, color=colors[4], linewidth=2.0, linestyle='-', label=r'$\mathcal{H}(L)$') 1630 ax.plot( L, h_mu_L, color=colors[5], linewidth=2.0, linestyle='-', label=r'$h_{\mu}(L)$') 1631 1632 ax.set_title('Block Measures', fontsize=16, fontweight='bold', pad=15) 1633 ax.set_xlabel('L', fontsize=12, labelpad=10) 1634 ax.set_ylabel('Bits', fontsize=12, labelpad=10) 1635 1636 legend = ax.legend( 1637 loc='upper left', 1638 title_fontsize='11', 1639 fontsize='10', 1640 frameon=True, 1641 facecolor='white', 1642 shadow=True 1643 ) 1644 1645 legend.get_title().set_fontweight('bold') 1646 plt.tight_layout() 1647 plt.show()
1649 def draw_graph( 1650 self, 1651 engine : str = 'dot', 1652 output_dir : Path = Path('.'), 1653 show : bool = True, 1654 symbols_only : bool = False 1655 ) -> None : 1656 1657 """ 1658 Draws the machine using [pygraphiviz](https://pygraphviz.github.io/documentation/stable/) and saves it. 1659 1660 Returns: 1661 1662 networkx.DiGraph : the resulting graph. 1663 """ 1664 1665 G = self.as_digraph() 1666 1667 subgraphs = None if nx.is_strongly_connected( G ) else list( nx.strongly_connected_components( G ) ) 1668 1669 am_vis.draw_graph( 1670 self, 1671 output_dir=output_dir, 1672 title="am_graph", 1673 view=show, 1674 subgraphs=subgraphs, 1675 engine=engine, 1676 symbols_only=symbols_only )
1683 def as_digraph( self ) -> nx.DiGraph : 1684 1685 """ 1686 Builds a [networkx.DiGraph](https://networkx.org/documentation/stable/reference/classes/digraph.html) constructed from the machine's transitions with no edge symbols or weights. 1687 1688 Returns: 1689 1690 networkx.DiGraph : the resulting graph. 1691 """ 1692 1693 G = nx.DiGraph() 1694 G.add_nodes_from( [ i for i, s in enumerate( self.states ) ] ) 1695 1696 for tr in self.transitions : 1697 G.add_edge( tr.origin_state_idx, tr.target_state_idx ) 1698 1699 return G
Builds a networkx.DiGraph constructed from the machine's transitions with no edge symbols or weights.
Returns:
networkx.DiGraph : the resulting graph.
1701 def as_dfa( self, with_probs : bool ) : 1702 1703 """ 1704 Builds an [automata.fa.dfa.DFA](https://caleb531.github.io/automata/api/fa/class-dfa/) constructed from the machine's transitions. 1705 1706 Args: 1707 with_probs (bool): If true the DFA transitions are labeled based on 1708 the symbol of the machines transition concatenated with its 1709 probability, othwise, the only the symbols. 1710 1711 Returns: 1712 1713 automata.fa.dfa.DFA : the resulting DFA. 1714 """ 1715 1716 precision=8 1717 1718 def edge_label( symb, prob ) : 1719 return f"({symb},{round(prob, precision)})" 1720 1721 # Build states, symbols, and transitions 1722 dfa_states = { i for i, _ in enumerate( self.states ) } 1723 1724 if not with_probs : 1725 dfa_symbols = set( { t.symbol_idx for t in self.transitions } ) 1726 else : 1727 dfa_symbols = set( { edge_label( t.symbol_idx, t.prob ) for t in self.transitions } ) 1728 1729 dfa_transitions = defaultdict(dict) 1730 1731 if not with_probs : 1732 for t in self.transitions : 1733 dfa_transitions[ t.origin_state_idx ][ t.symbol_idx ] = t.target_state_idx 1734 else : 1735 for t in self.transitions : 1736 dfa_transitions[ t.origin_state_idx ][ edge_label( t.symbol_idx, t.prob ) ] = t.target_state_idx 1737 1738 # Construct the DFA 1739 return DFA( 1740 states=dfa_states, 1741 input_symbols=dfa_symbols, 1742 transitions=dfa_transitions, 1743 initial_state=self.start_state, 1744 allow_partial=True, 1745 final_states={ 1746 s for s in dfa_states 1747 } 1748 )
Builds an automata.fa.dfa.DFA constructed from the machine's transitions.
Arguments:
- with_probs (bool): If true the DFA transitions are labeled based on the symbol of the machines transition concatenated with its probability, othwise, the only the symbols.
Returns:
automata.fa.dfa.DFA : the resulting DFA.