Source code for pycobi.utility

import re
from pathlib import Path
from typing import Any, Iterable, List, Optional, Union

import numpy as np
import pandas as pd


# auto-object related utility functions
#######################################


# ---------------------------------------------------------------------------
# Regexes for parsing auto-07p's per-point diagnostic text blocks.
#
# A typical block (one per point) looks like:
#
#     BR    PT  IT         PAR           L2-NORM
#      1     5   0       -4.56412E+00   1.82904E+00
#      ...
#      1     5         Hopf Function:   0.00000E+00
#      1     5         Eigenvalues  :   Stable:   2
#      1     5         Eigenvalue  1:  -2.12891E+00   0.00000E+00
#      1     5         Eigenvalue  2:  -5.17236E+00   0.00000E+00
#      1     5         Fold Function:   9.49088E-01
#      ...
#
# Equilibrium continuations emit "Eigenvalues" + "Eigenvalue N:"; limit-cycle
# continuations emit "Multipliers" + "Multiplier N:" with Floquet multipliers.
# ---------------------------------------------------------------------------

_STABLE_COUNT_RE = re.compile(
    r'(?:Eigenvalues|Multipliers)\s*:\s*Stable:\s*(\d+)',
)
# Capture (index, real, imag) per eigenvalue / multiplier line. The leading
# `\d+\s+-?\d+` consumes the BR/PT prefix; PT is signed in auto's output.
_EIG_LINE_RE = re.compile(
    r'^\s*\d+\s+-?\d+\s+(?:Eigenvalue|Multiplier)\s+(\d+):\s*(\S+)\s+(\S+)\s*$',
    re.MULTILINE,
)
_NO_CONVERGENCE = "NOTE:No converge"


def _normalize_icp(branch_id: Any, icp: Any) -> tuple:
    """Coerce auto-07p's ICP entry (int for a single-parameter continuation,
    iterable for multi-parameter) to a tuple."""
    icp = (icp,) if isinstance(icp, int) else tuple(icp)
    return branch_id, icp


[docs]def get_branch_info(branch: Any) -> tuple: """Extract the branch id and continuation parameter(s) from an auto-07p solution branch object. Three call shapes are supported, tried in order: 1. `bifDiag` (auto's top-level multi-branch container) — read `.BR` / `.c['ICP']` from the first sub-branch. 2. A single branch object indexed by `'BR'` with its own `.c` config. 3. An IVP-style nested structure where the BR field is buried under an `'RG'` (regular point) label on the first sub-branch. The previous implementation walked the first 10 label indices and re-raised whichever ``KeyError`` happened last; now we iterate the keys explicitly and, on exhaustion, raise a single ``KeyError`` naming every key that was tried. Returns ------- tuple ``(branch_id, icp)`` where ``icp`` is a tuple of one or more PAR indices. """ # --- Path 1: bifDiag with .BR / .c on the first sub-branch. try: return _normalize_icp(branch[0].BR, branch[0].c['ICP']) except (AttributeError, ValueError, KeyError, TypeError, IndexError): pass # --- Path 2: single branch object with 'BR' as a key. try: return _normalize_icp(branch['BR'], branch.c['ICP']) except (AttributeError, KeyError, TypeError): pass # --- Path 3: IVP-style — find a labeled point on branch[0] whose nested # ``RG`` solution carries the BR field, and read it from there. try: icp = branch[0].c['ICP'] labels = branch[0].labels.by_index except (AttributeError, KeyError, TypeError, IndexError) as e: raise AttributeError( f"get_branch_info: cannot navigate {type(branch).__name__!s}; " f"none of the supported branch shapes matched. Last error: {e}" ) from e tried = [] for sol_key, sol_entry in labels.items(): tried.append(sol_key) try: br_id = sol_entry['RG']['solution']['data']['BR'] except (KeyError, TypeError): continue return _normalize_icp(br_id, icp) raise KeyError( f"get_branch_info: no labeled point on branch[0] carries an " f"RG solution with a 'BR' field. Tried {len(tried)} label " f"indices: {tried}." )
[docs]def get_solution_keys(branch: Any) -> List[str]: """Extract the names of all solutions on a branch. Parameters ---------- branch Solution branch object as returned by `auto`. Returns ------- List[str] List with solution names. """ keys = [] for idx in range(len(branch.data)): keys += [key for key, val in branch[idx].labels.by_index.items() if val and 'solution' in tuple(val.values())[0]] return keys
[docs]def get_point_idx(diag: list, point: int) -> int: """Extract list idx of correct diagnostics string for continuation point with index `point`. Parameters ---------- diag Diagnostics as stored by `auto` on solution objects. point Index of the solution of interest on the branch. Returns ------- int Point index for `diag`. """ idx = point while idx < len(diag)-1: diag_tmp = diag[idx]['Text'] if "Location of special point" in diag_tmp and "Convergence" not in diag_tmp: idx += 1 elif "NOTE:Retrying step" in diag_tmp or "Total Time " in diag_tmp: idx += 1 else: # find index of line after first appearance of BR diag_tmp = diag_tmp.split('\n') idx_line = 1 while idx_line < len(diag_tmp)-1: if 'BR' in diag_tmp[idx_line].split(' '): break idx_line += 1 diag_tmp = diag_tmp[idx_line+1] # find point number in text line_split = [d for d in diag_tmp.split(' ') if d != ""] if abs(int(line_split[1])) < point+1: idx += 1 elif abs(int(line_split[1])) == point+1: return idx else: raise ValueError(f"Point with index {point+1} was not found on solution. Last auto output line that " f"was checked: \n {diag_tmp}") return idx
# extract system state information from auto objects ####################################################
[docs]def get_point_diagnostics(s: Any) -> str: """Return the Auto-07p string that contains diagnostic data for a particular solution. Parameters ---------- s Solution object on the branch. Returns ------- str String with solution diagnostics. """ try: diag = s.b.branch.diagnostics.data point_idx = get_point_idx(diag, point=s.b.idx) diag = diag[point_idx]["Text"] except AttributeError as e: raise e # diag = branch[0].diagnostics.data # point_idx = get_point_idx(diag, point=point) # diag = diag[point_idx]['Text'] return diag
[docs]def parse_point_diagnostics(s: Any, diag: Optional[str] = None) -> dict: """Parse the auto-07p per-point diagnostic text block into structured fields. Used as a one-shot parser by `get_solution_stability` and `get_solution_eigenvalues` so multi-metric summaries (`_create_summary` with stability + eigenvalues + lyapunov) only pay the regex cost once per point. Parameters ---------- s Solution object as returned by auto-07p. diag Optional pre-fetched diagnostic text for `s` (skips the `get_point_diagnostics(s)` lookup). Pass this when the caller already has the text, e.g. when iterating multiple points and batching the fetch. Returns ------- dict with keys: ``stable`` bool or None — True if the solution is stable. None if no Eigenvalues/Multipliers line is present AND auto's PT-sign convention can't be read from `s.b`. ``eigenvalues`` list[complex] — eigenvalues (equilibrium continuation) or Floquet multipliers (limit-cycle continuation), in the order auto-07p emits them. Empty if the point did not converge or no spectrum is recorded. ``text`` str — the raw diagnostic block (kept for callers that still need to grep for things this parser doesn't surface, e.g. Hopf/Fold/BP function values). """ if diag is None: diag = get_point_diagnostics(s) out = {'text': diag, 'stable': None, 'eigenvalues': []} if _NO_CONVERGENCE in diag: return out # --- stability --- m = _STABLE_COUNT_RE.search(diag) if m: try: ndim = s.data['NDIM'] except (AttributeError, KeyError, TypeError): ndim = None if ndim is not None: out['stable'] = int(m.group(1)) >= ndim else: # No spectrum recorded — fall back to auto's native PT-sign convention # (auto encodes stability as the sign of the point index in fort.7). try: out['stable'] = s.b['PT'] > 0 except (AttributeError, KeyError, TypeError): pass # leave as None # --- eigenvalues / Floquet multipliers --- for m in _EIG_LINE_RE.finditer(diag): real, imag = m.group(2), m.group(3) try: out['eigenvalues'].append(complex(float(real), float(imag))) except ValueError: # malformed numeric in the diagnostic line — skip this entry but # keep parsing the rest pass return out
[docs]def get_solution_stability(s: Any) -> bool: """Return stability of a solution. Thin wrapper around `parse_point_diagnostics` — when extracting multiple diagnostic fields for the same point, call `parse_point_diagnostics` once and reuse its dict to avoid re-parsing the text. """ return bool(parse_point_diagnostics(s)['stable'])
[docs]def get_solution_variables(s: Any, variables: list, extract_time: bool = False) -> list: """Extract state variable values (time series) from a steady-state (periodic) solution object. Parameters ---------- s Solution object as returned by `auto`. variables Keys of the state variables to be extracted. extract_time If true, attempt to extract a time vector as well. Returns ------- list List of variable values/time series of the provided solution. """ if hasattr(s, 'b') and extract_time: s = s.b['solution'] solutions = [s.indepvararray] else: solutions = [] solutions = [s[v] for v in variables] + solutions return solutions
[docs]def get_solution_params(s: Any, params: list) -> list: """Extract parameter values from a solution object. Parameters ---------- s Solution object as returned by `auto`. params Keys of the parameters to be extracted. Returns ------- list List of parameter values. """ if hasattr(s, 'b'): s = s.b['solution'] return [s[p] for p in params]
[docs]def get_solution_eigenvalues(s: Any, branch: int = None, point: int = None) -> list: """Return eigenvalues (or Floquet multipliers, for periodic solutions) of a point. Thin wrapper around `parse_point_diagnostics`. The `branch` and `point` arguments are kept for backward compatibility but are no longer used — `parse_point_diagnostics` reads the diagnostic block via `s.b.branch` / `s.b.idx` directly. """ return parse_point_diagnostics(s)['eigenvalues']
[docs]def get_lyapunov_exponents(eigenvals, period) -> list: """Calculate Lyapunov exponents from eigenvalues/floquet multipliers. Parameters ---------- eigenvals Eigenvalue or floquet multiplier spectrum. period Period of the periodic solution, if `eigenvals` is a Floquet multiplier spectrum of a periodic solution. Returns ------- list Local Lyapunov exponent spectrum. """ return [np.real(np.log(ev)/period) if period else np.real(ev) for ev in eigenvals]
[docs]def fractal_dimension(lyapunov_exponents: list) -> float: """Calculates the fractal or information dimension of an attractor of a dynamical system from its lyapunov epxonents, according to the Kaplan-Yorke formula (Kaplan and Yorke, 1979). Parameters ---------- lyapunov_exponents List containing the lyapunov spectrum of a solution of a dynamical system. Returns ------- float Fractal dimension of the attractor of the system. """ LEs = np.sort(lyapunov_exponents)[::-1] if np.sum(LEs) > 0: return len(LEs) k = 0 for j in range(len(LEs)-1): k = j+1 if np.sum(LEs[:k]) < 0: k -= 1 break return k + np.sum(LEs[:k]) / np.abs(LEs[k])
[docs]def write_auto_dat( df: pd.DataFrame, path: Union[str, Path], *, columns: Optional[Iterable] = None, n_points: Optional[int] = None, period: Optional[float] = None, normalize_time: bool = True, ) -> Path: r"""Write a time-series ``DataFrame`` as an auto-07p ``.dat`` file. Auto-07p reads ``.dat`` files via the ``dat=`` argument of its python ``run()`` command to initialise periodic-orbit continuations from a user-supplied time series (one column of time + one column per state variable, whitespace-separated). This helper writes such a file from a ``pandas.DataFrame`` — most commonly the output of :meth:`pyrates.CircuitTemplate.run`, but any time-indexed numeric DataFrame works. Parameters ---------- df Time-indexed DataFrame. The index is interpreted as time; columns hold the state variables. A ``MultiIndex`` on columns is fine — every column entry survives into the output file in the order encountered (or the order given by ``columns``). path Output file path. A ``.dat`` suffix is appended if missing. columns Optional subset of columns to write, in the order they should appear. Defaults to every column of ``df``. n_points If given, linearly interpolate the time series onto this many evenly-spaced points before writing. Useful when the simulation produced more samples than auto-07p needs, or to ensure a regular mesh from an adaptive solver. Defaults to writing every input row. period Length (in the same units as the index) of one period of the orbit the file represents. Only consulted when ``normalize_time=True``. Defaults to ``df.index[-1] - df.index[0]`` — i.e. assumes the DataFrame already covers exactly one period. normalize_time When True (default), shift the time column so it starts at 0 and divide by ``period`` so it ends at 1. Auto-07p's default convention is a normalised ``[0, 1]`` mesh and the orbit's actual period lives in ``PAR(11)``; turn this off only if the consuming workflow expects raw simulation time. Returns ------- pathlib.Path The (possibly suffix-extended) path the file was written to. Notes ----- Recipe for hooking the file into a periodic-orbit continuation: .. code-block:: python write_auto_dat(sim_df, 'orbit') # → orbit.dat ode = ODESystem.from_template(template, ..., auto_constants=('lc',)) sols, cont = ode.run(c='lc', ICP=['eta', 11], dat='orbit', name='lc0') The ``dat='orbit'`` argument tells auto-07p to read ``orbit.dat`` as the starting time series; ``ICP=['eta', 11]`` puts the orbit's period in PAR(11) so it varies along the continuation. """ if not isinstance(df, pd.DataFrame): raise TypeError(f"expected pandas.DataFrame, got {type(df).__name__}") if len(df) < 2: raise ValueError("DataFrame must have at least 2 rows to define a time series") cols = list(df.columns) if columns is None else list(columns) if not cols: raise ValueError("at least one column must be selected") missing = [c for c in cols if c not in df.columns] if missing: raise KeyError(f"columns not found in DataFrame: {missing}") data = df[cols].to_numpy() if not np.issubdtype(data.dtype, np.number): raise TypeError( f"selected columns must be numeric; got dtype {data.dtype}" ) data = data.astype(float, copy=False) time = np.asarray(df.index, dtype=float) # Optional interpolation onto a regular mesh of n_points samples — done # before time normalisation so the interpolation uses the original spacing. if n_points is not None: if n_points < 2: raise ValueError("n_points must be at least 2") t_uniform = np.linspace(time[0], time[-1], n_points) data = np.column_stack([ np.interp(t_uniform, time, data[:, j]) for j in range(data.shape[1]) ]) time = t_uniform if normalize_time: T = period if period is not None else float(time[-1] - time[0]) if T <= 0: raise ValueError( f"cannot normalize time: period={T} is not positive" ) time = (time - time[0]) / T out_path = Path(path) if out_path.suffix != ".dat": out_path = out_path.with_suffix(out_path.suffix + ".dat") \ if out_path.suffix else out_path.with_suffix(".dat") # Format: free-form whitespace-separated, matching auto-07p's own demos # (e.g. pen.dat). 14-digit scientific notation gives ~12 significant # figures — well within double precision and trivial for auto's # free-format Fortran reader to consume. fmt = "%23.14E" table = np.column_stack([time, data]) np.savetxt(out_path, table, fmt=fmt) return out_path