Source code for neuroptimiser.utils

"""Utility functions and constants for the Neuroptimiser framework."""
__author__ = "Jorge M. Cruz-Duarte"
__email__ = "jorge.cruz-duarte@univ-lille.fr"
__all__ = [
    "from_yaml", "get_core_params", "process_parameters",
    "reset_all_processes",
    "tro2s", "trs2o",
    "get_arch_matrix",
    "get_2d_sys", "get_izhikevich_sys",
    "IZHIKEVICH_MODELS_KIND", "DYN_MODELS_KIND", "ADJ_MAT_OPTIONS"
]

import json
from copy import deepcopy

import numpy as np
import yaml

# Constants for Izhikevich models and dynamic systems
IZHIKEVICH_MODELS_KIND = ["RS", "IB", "CH", "FS", "TC", "TCn", "RZ", "LTS", "random"]

# Dynamic systems kinds for 2D systems
DYN_MODELS_KIND = ["saddle", "attractor", "repeller", "source", "sink"]

# Options for adjacency matrix generation
ADJ_MAT_OPTIONS = [
    "one-way-ring", "1dr", "ring",
    "two-way-ring", "2dr", "bidirectional-ring",
    "fully-connected", "all", "full",
    "random", "rand",
]


[docs] def reset_all_processes(*processes) -> None: """Reset all provided processes to their initial state. Arguments: *processes: Variable number of process instances to reset. Returns: None """ for proc in processes: if hasattr(proc, "reset"): proc.reset()
[docs] def tro2s(x: np.ndarray | float, lb: np.ndarray | float, ub: np.ndarray | float) -> np.ndarray | float: """Transform a value from the original scale to a normalized scale. Arguments: x: Value or array of values to transform. lb: Lower bound of the original scale. ub: Upper bound of the original scale. Returns: Normalized value or array of values in the range [-1, 1]. """ return 2 * (x - lb) / (ub - lb) - 1
[docs] def trs2o(x: np.ndarray | float, lb: np.ndarray | float, ub: np.ndarray | float) -> np.ndarray | float: """Transform a value from a normalized scale back to the original scale. Arguments: x: Normalized value or array of values in the range [-1, 1]. lb: Lower bound of the original scale. ub: Upper bound of the original scale. Returns: Value or array of values transformed back to the original scale. """ return (x + 1) / 2 * (ub - lb) + lb
[docs] def get_arch_matrix(length, topology: str = "ring", num_neighbours: int = None) -> np.ndarray: """Generate an adjacency matrix for a given topology. Arguments: length: Number of nodes in the network. topology: Type of network topology (e.g., "ring", "fully-connected", "random"). num_neighbours: Number of neighbours for random topology (if applicable). Returns: A square adjacency matrix representing the specified topology. """ base_matrix = np.eye(length, length) if length in (1, 2): return base_matrix if topology in ("one-way-ring", "1dr", "ring"): # 1d ring topology return np.roll(base_matrix, -1, 1) elif topology in ("two-way-ring", "2dr", "bidirectional-ring"): return np.roll(base_matrix, -1, 1) + np.roll(base_matrix, 1, 1) elif topology in ("fully-connected", "all", "full"): return np.ones((length, length)) - base_matrix elif topology in ("random", "rand"): if 0 < num_neighbours < length: # Randomly select the neighbours preserving the diagonal in zeros matrix = np.zeros((length, length)) for i in range(length): matrix[i, np.random.choice(np.delete(np.arange(length), i, 0), num_neighbours, replace=False)] = 1 return matrix else: raise ValueError(f"Invalid number of neighbours: {num_neighbours}") else: raise NotImplementedError("Topology not implemented yet (?)")
[docs] def get_2d_sys(kind="sink", trA_max=1.5, detA_max=3.0, eps=1e-6) -> np.ndarray: """Generate a 2D dynamic system matrix based on the specified kind. Arguments: kind: Type of dynamic system ("random", "saddle", "attractor", "repeller", "source", "sink", or "centre"). trA_max: Maximum trace value for the system matrix. detA_max: Maximum determinant value for the system matrix. eps: Small value to avoid division by zero or negative values. Returns: A 2x2 numpy array representing the system matrix. """ if kind == "random": _kind = np.random.choice(DYN_MODELS_KIND) return get_2d_sys(_kind, trA_max=trA_max, detA_max=detA_max, eps=eps) elif kind == "saddle": detA = np.random.uniform(-detA_max, eps) a = 2.0 * np.random.uniform(eps, trA_max) - 1.0 d = detA / a b = 2.0 * np.random.uniform(eps, trA_max) - 1.0 c = 0.0 else: abs_trA = np.random.uniform(eps, trA_max) trA = abs_trA if kind in ["repeller", "source"] else -abs_trA trAsq4 = (trA ** 2) / 4 if kind in ["attractor", "repeller"]: # discriminant = trA^2 - 4 (trA^2/4 - delta) = 4 delta > 0 (node) delta = np.random.uniform(eps, trAsq4 - eps) elif kind in ["source", "sink"]: # discriminant = trA^2 - 4 (trA^2/4 - delta) = 4 delta < 0 (spiral) delta = np.random.uniform(-trA_max, -eps) else: # Centre delta = 0.0 detA = trAsq4 - delta a = 2.0 * np.random.uniform(eps, trA_max) - 1.0 b = 2.0 * np.random.uniform(eps, trA_max) - 1.0 d = trA - a c = (a * d - detA) / b return np.array([[a, b], [c, d]])
[docs] def get_izhikevich_sys(kind="RS", scale=0.1) -> dict: """Get the parameters for an Izhikevich neuron model. Arguments: kind: Type of Izhikevich model (e.g., "RS", "IB", "CH", "FS", "TC", "TCn", "RZ", "LTS", or "random"). scale: Scale factor for random perturbation of parameters (default is 0.1). Returns: A dictionary containing the parameters of the Izhikevich model. """ if kind == "random": kind = np.random.choice(IZHIKEVICH_MODELS_KIND) + "r" return get_izhikevich_sys(kind) else: # Default parameters for Izhikevich model a = 0.02 b = 0.2 c = -65 d = 8.0 I = 0.1 vmin = -80. # [V] vmax = 30. umin = -20. # [V] umax = 0. Lt = 1.0 match kind: case "IB": c = -55 d = 4.0 case "CH": c = -50 d = 2.0 case "FS": a = 0.1 d = 2.0 case "TC": a = 0.02 b = 0.25 d = 0.05 I = 0.0 case "TCn": a = 0.02 b = 0.25 d = 0.05 I = -10.0 case "RZ": a = 0.1 b = 0.26 d = 2.0 case "LTS": a = 0.2 b = 0.25 d = 2.0 case _: pass # RS coeffs = { "a": a, "b": b, "c": c, "d": d, "I": I, "vmin": vmin, "vmax": vmax, "umin": umin, "umax": umax, "Lt": Lt, } if kind[-1] == "r": for key in ["a", "b", "c", "d", "I"]: value = coeffs[key] new_value = value + np.random.randn() * abs(value) * scale coeffs[key] = new_value return coeffs
[docs] def get_core_params(core_cfg, **kwargs): _num_steps = int(kwargs.get("num_steps", 500)) # Default core parameters cdp_alpha = float(core_cfg.get("alpha", 1.0)) cdp_dt = float(core_cfg.get("dt", 0.01)) cdp_max_steps = core_cfg.get("max_steps", None) if cdp_max_steps is None: cdp_max_steps = _num_steps cdp_noise_std = core_cfg.get("noise_std", (0.0, 0.3)) cdp_ref_mode = core_cfg.get("ref_mode", "pg") cdp_is_bounded = bool(core_cfg.get("is_bounded", True)) _hl_selector_params = core_cfg.get("selector", {}) threshold_cfg = core_cfg.get("threshold", {}) cdp_thr_mode = threshold_cfg.get("thr_mode", "diff_pg") cdp_thr_alpha = float(threshold_cfg.get("thr_alpha", 2.0)) cdp_thr_min = float(threshold_cfg.get("thr_min", 1e-6)) cdp_thr_max = float(threshold_cfg.get("thr_max", 2.0)) cdp_thr_k = float(threshold_cfg.get("thr_k", 0.05)) spiking_cfg = core_cfg.get("spiking", {}) cdp_spk_cond = spiking_cfg.get("spk_cond", "l2") cdp_spk_alpha = float(spiking_cfg.get("spk_alpha", 0.25)) cdp_spk_q_ord = spiking_cfg.get("spk_q_ord", 2) cdp_spk_weights = spiking_cfg.get("spk_weights", [1.0, 1.0]) hd_operator_cfg = core_cfg.get("hd_operator", {}) cdp_name = hd_operator_cfg.get("name", "linear") cdp_coeffs = hd_operator_cfg.get("coeffs", "sink") cdp_approx = hd_operator_cfg.get("approx", "rk4") hs_operator_cfg = core_cfg.get("hs_operator", {}) cdp_hs_operator = hs_operator_cfg.get("name", "differential") cdp_hs_variant = hs_operator_cfg.get("variant", "current-to-rand") selector_cfg = core_cfg.get("selector", {}) cdp_sel_mode = selector_cfg.get("sel_mode", "greedy") # Core parameters dictionary _core_params = { "alpha": cdp_alpha, "dt": cdp_dt, "max_steps": cdp_max_steps, "noise_std": cdp_noise_std, "ref_mode": cdp_ref_mode, "is_bounded": cdp_is_bounded, "name": cdp_name, "coeffs": cdp_coeffs, "approx": cdp_approx, "thr_mode": cdp_thr_mode, "thr_alpha": cdp_thr_alpha, "thr_min": cdp_thr_min, "thr_max": cdp_thr_max, "thr_k": cdp_thr_k, "spk_cond": cdp_spk_cond, "spk_alpha": cdp_spk_alpha, "spk_q_ord": cdp_spk_q_ord, "spk_weights": cdp_spk_weights, "hs_operator": cdp_hs_operator, "hs_variant": cdp_hs_variant, "sel_mode": cdp_sel_mode, } return _core_params, _hl_selector_params
[docs] def process_parameters(_experiment_config): # Extract nested config problems_cfg = _experiment_config.get("metadata", {}) optimiser_cfg = _experiment_config.get("optimiser", {}) full_core_cfg = _experiment_config.get("core_parameters", {}) default_core_cfg = full_core_cfg.get("default", {}) # Problem setup name = problems_cfg.get("name", "unnamed experiment") description = problems_cfg.get("description", "") debug_mode = bool(problems_cfg.get("debug_mode", False)) suite_name = problems_cfg.get("suite_name", "bbob") func_indices = problems_cfg.get("func_indices", None) num_dimensions = problems_cfg.get("num_dimensions", [2]) instances = problems_cfg.get("instances", [1]) budget_multiplier = int(problems_cfg.get("budget_multiplier", 1)) # Optimiser setup num_steps = int(float(optimiser_cfg.get("num_steps", 500))) num_agents = int(optimiser_cfg.get("num_agents", 30)) spiking_core = optimiser_cfg.get("spiking_core", "TwoDimSpikingCore") neighbourhood_cfg = optimiser_cfg.get("neighbourhood", {}) num_neighbours = int(neighbourhood_cfg.get("num_neighbours", 10)) neuron_topology = neighbourhood_cfg.get("neuron_topology", "2dr") unit_topology = neighbourhood_cfg.get("unit_topology", "random") # Generate the config parameters _config_params = { "num_iterations": num_steps, "num_agents": num_agents, "spiking_core": spiking_core, "num_neighbours": num_neighbours, "neuron_topology": neuron_topology, "unit_topology": unit_topology, } # Generate the suite config _experiment_params = { "name": name, "description": description, "debug_mode": debug_mode, "suite_name": suite_name, "budget_multiplier": budget_multiplier, "func_indices": func_indices, "instance": instances, "dimensions": num_dimensions, } # Identify the custom core parameters and process them custom_core_keys = [k for k in full_core_cfg.keys() if k.startswith("core_")] custom_core_weights = np.array([float(full_core_cfg[k].get("weight", -1)) for k in custom_core_keys]) num_custom_cores = len(custom_core_keys) if np.any(custom_core_weights == -1): raise ValueError("Custom core parameters must have a weight specified.") sum_core_weights = np.sum(custom_core_weights) custom_core_weights = custom_core_weights / sum_core_weights # From num_agents decide which has a config _core_params = [] if num_custom_cores > 0: for custom_core_key in np.random.choice( a=custom_core_keys, size=num_agents, p=custom_core_weights): # Get the two dicts dcp = deepcopy(default_core_cfg) # default core parameters ccp = full_core_cfg.get(custom_core_key, {}) # custom core parameters # Override the default one dcp.update(ccp) # Process the parameters unit_core_parameters, unit_sel_parameters = get_core_params(dcp, num_steps=num_steps) _core_params.append(unit_core_parameters) # print(f"{custom_core_key} with {ccp}") else: for _ in range(num_agents): dcp = deepcopy(default_core_cfg) unit_core_parameters, unit_sel_parameters = get_core_params(dcp, num_steps=num_steps) _core_params.append(unit_core_parameters) # Return the relevant dicts return _experiment_params, _config_params, _core_params
[docs] def from_yaml(yaml_path: str, disp: bool = False) -> tuple[dict, dict, dict]: """Load a YAML file and return its contents as a dictionary. Arguments: yaml_path: Path to the YAML file. disp: If True, print the loaded configuration for debugging purposes. Returns: A dictionary containing the contents of the YAML file. """ if not yaml_path.endswith(".yaml") and not yaml_path.endswith(".yml"): raise ValueError("The file must have a .yaml or .yml extension.") with open(yaml_path, 'r') as file: experiment_config = yaml.safe_load(file) experiment_params, config_params, core_params = process_parameters(experiment_config) if disp: print(f"\nLoaded experiment configuration from YAML: {yaml_path}") print("=" * 40) print("Experiment parameters:") print("-" * 40) for key, value in experiment_params.items(): print(f" {key}: {value}") print("\nConfig parameters:") print("-" * 40) for key, value in config_params.items(): print(f" {key}: {value}") print("\nDefault Core parameters:") print("-" * 40) for key, value in core_params[0].items(): print(f" {key}: {value}") print("\nFull core parameters for all units:") print("-" * 40) for i, cp in enumerate(core_params): print(f" NHU {i+1}:") for key, value in cp.items(): print(f" {key}: {value}") print("=" * 40) return experiment_params, config_params, core_params
_DEFAULT_CORE_PARAMS = dict( alpha = 1.0, dt = 0.01, max_steps = 100, noise_std = (0.0, 0.3), ref_mode = "pgn", is_bounded = True, name = "linear", coeffs = "random", approx = "rk4", thr_mode = "fixed", thr_alpha = 1.0, thr_min = 1e-6, thr_max = 1.0, thr_k = 0.05, spk_cond = "fixed", spk_alpha = 0.25, hs_operator = "fixed", hs_variant = "current-to-rand", ) _DEFAULT_CONFIG_PARAMS = dict( num_iterations=100, num_agents=30, spiking_core="TwoDimSpikingCore", num_neighbours=10, neuron_topology="2dr", unit_topology="random", ) def from_optuna_json(json_path: str, disp: bool = False) -> tuple[dict, dict, dict, list]: """Load an Optuna JSON file and return its contents as a dictionary. Arguments: json_path: Path to the JSON file. disp: If True, print the loaded configuration for debugging purposes. Returns: A tuple containing: - config_params: Dictionary of configuration parameters. - core_params: List of dictionaries for each unit's core parameters. - raw_params: Original parameters from the JSON file. - full_core_cfg: Full core configuration including defaults and custom cores. """ if not json_path.endswith(".json"): raise ValueError("The file must have a .json extension.") with open(json_path, 'r') as file: trial_data = json.load(file) params = trial_data.get("params", {}) # Remove all keys that do not start with "core*" config_params = {k: v for k, v in params.items() if not k.startswith("core")} # For each core parameter, create a dictionary _core_params = {k: v for k, v in params.items() if k.startswith("core")} num_custom_cores = len(set([k.split('_')[0] for k in _core_params.keys()])) # Normalize the weights because they might not sum to 1, optuna does not enforce that but during experiment we do internally custom_core_weights = np.array([_core_params[f"core{i+1}_weight"] for i in range(num_custom_cores)]) sum_core_weights = np.sum(custom_core_weights) proportion_dist = custom_core_weights / sum_core_weights full_core_cfg = {"default": {}} for i in range(num_custom_cores): full_core_cfg[f"core_{i+1}"] = { "weight": proportion_dist[i] } for key, value in _core_params.items(): if key.startswith(f"core{i+1}_") and key != f"core{i+1}_weight": param_key = key.replace(f"core{i+1}_", "") register = True if param_key == "hd_operator": param_key = "name" if param_key.startswith("coeffs_"): param_key = "coeffs" if param_key == "izh_small_var": full_core_cfg[f"core_{i + 1}"]["coeffs"] += "r" register = False if param_key == "coeff_all": full_core_cfg[f"core_{i + 1}"]["coeffs"] += "_all" register = False if param_key.startswith("hs_variant_"): param_key = "hs_variant" if register: full_core_cfg[f"core_{i+1}"][param_key] = value # Print the loaded configuration for debugging purposes if disp: print(f"\nLoaded experiment configuration from Optuna JSON: {json_path}") print("=" * 40) print("Config parameters:") print("-" * 40) for key, value in config_params.items(): print(f" {key}: {value}") print("\nDefault Core parameters:") print("-" * 40) for key, value in full_core_cfg["default"].items(): print(f" {key}: {value}") print("\nFull core parameters for each kind of unit:") print("-" * 40) for i in range(num_custom_cores): cp = full_core_cfg[f"core_{i+1}"] print(f" NHU {i+1}:") for key, value in cp.items(): print(f" {key}: {value}") print("=" * 40) # Ensure all core parameters have a value, if not use the default one proc_config_params = deepcopy(_DEFAULT_CONFIG_PARAMS) proc_config_params.update(config_params) # Define the kind of core parameters num_agents = proc_config_params["num_agents"] custom_params_indices = np.random.choice( a=range(num_custom_cores), size=num_agents, p=proportion_dist) # Assign the core parameters to each unit proc_core_params = [] for i in range(num_agents): dcp = deepcopy(_DEFAULT_CORE_PARAMS) idx = custom_params_indices[i] + 1 ccp = full_core_cfg.get(f"core_{idx}", {}) dcp.update(ccp) proc_core_params.append(dcp) return proc_config_params, proc_core_params, config_params, full_core_cfg