Source code for josiann.storage.trace

# Created on 03/02/2022 10:40
# Author : matteo

# ====================================================
# imports
from __future__ import annotations

import logging
from abc import ABC, abstractmethod
from collections.abc import Sequence
from dataclasses import dataclass
from pathlib import Path
from typing import Any

import numpy as np
import numpy.typing as npt

import josiann.typing as jot
from josiann.errors import ShapeError
from josiann.storage.parameters import SAParameters

logger = logging.getLogger(__name__)

PLOTTING_ENABLED = False

try:
    import plotly.graph_objects as go
    from plotly.subplots import make_subplots

except ImportError:
    logger.info("Plotly is not installed, consider installing it or running 'pip install josiann[plot]'.")

else:
    PLOTTING_ENABLED = True


# ====================================================
# code
@dataclass
class Position:
    x: npt.NDArray[np.float64]  # (nb_walkers, nb_dimensions)
    cost: npt.NDArray[np.float64]  # (nb_walkers,)
    n: npt.NDArray[np.float64]  # (nb_walkers,)


class PositionTrace:
    # region magic methods
    def __init__(
        self,
        run_parameters: SAParameters,
        nb_iterations: int,
        nb_walkers: int,
        nb_dimensions: int,
        initial_position: npt.NDArray[np.float64],
        initial_cost: npt.NDArray[np.float64],
    ):
        """
        Args:
            nb_iterations: number of expected iterations for the SA algorithm.
            nb_walkers: number of walkers in parallel.
            nb_dimensions: number of dimensions per problem.
            initial_position: initial positions before running the SA algorithm.
            initial_cost: cost of initial positions.
        """
        self.parameters = run_parameters
        self.nb_walkers = nb_walkers
        self.nb_dimensions = nb_dimensions

        # /!\ store nb_iter + 1 values (initial values + iterations)
        self.position_trace = np.zeros((nb_iterations + 1, nb_walkers, nb_dimensions), dtype=np.float64)
        self.explored_trace = np.zeros((nb_iterations + 1, nb_walkers, nb_dimensions), dtype=np.float64)
        self.best_position_trace = np.zeros((nb_iterations + 1, nb_walkers, nb_dimensions + 2), dtype=np.float64)
        self.cost_trace = np.zeros((nb_iterations + 1, nb_walkers), dtype=np.float64)
        self.cost_trace_n = np.zeros((nb_iterations + 1, nb_walkers), dtype=np.int64)
        self.explored_cost_trace = np.zeros((nb_iterations + 1, nb_walkers), dtype=np.float64)
        self._accepted = np.zeros((nb_iterations + 1, nb_walkers), dtype=bool)
        self.rescued = np.zeros((nb_iterations + 1, nb_walkers), dtype=np.float64)

        self.converged_at_iteration = -1 * np.ones(nb_walkers)

        # store initial values
        self.position_trace[0] = initial_position
        self.explored_trace[0] = initial_position
        self.cost_trace[0] = initial_cost
        self.cost_trace_n[0] = 1
        self.explored_cost_trace[0] = initial_cost
        self._accepted[0] = True

        self._set_best(-1, 1)

    # endregion

    # region attributes
    @property
    def converged(self) -> npt.NDArray[np.bool_]:
        return self.converged_at_iteration > -1

    @property
    def nb_iterations(self) -> int:
        return self.position_trace.shape[0] - 1

    # endregion

    # region methods
    def finalize(self, iteration: int) -> None:
        """
        Cleanup trace at the end of the SA algorithm.
        """
        self.position_trace = self.position_trace[: iteration + 2]
        self.explored_trace = self.explored_trace[: iteration + 2]
        self.best_position_trace = self.best_position_trace[: iteration + 2]
        self.cost_trace = self.cost_trace[: iteration + 2]
        self.cost_trace_n = self.cost_trace_n[: iteration + 2]
        self.explored_cost_trace = self.explored_cost_trace[: iteration + 2]
        self._accepted = self._accepted[: iteration + 2]
        self.rescued = self.rescued[: iteration + 2]

    def store(
        self,
        iteration: int,
        position: npt.NDArray[np.float64 | np.int64],
        costs: npt.NDArray[np.float64],
        current_n: int,
        accepted: npt.NDArray[np.bool_],
        explored: npt.NDArray[np.float64 | np.int64],
        explored_costs: npt.NDArray[np.float64],
    ) -> None:
        """
        Save the current position of the vector to optimize, the current cost, temperature and number of averaged
            function evaluations.

        Args:
            iteration: iteration index for storing the data.
            position: the current positions.
            costs: the current costs.
            current_n: the current number of averaged function evaluations.
            accepted: were the current propositions accepted ?
            explored: the array of explored propositions.
            explored_costs: costs associated to explored positions.

        Returns:
            The index at which was stored the data.
        """
        self.position_trace[iteration + 1] = position
        self.explored_trace[iteration + 1] = explored

        self.cost_trace[iteration + 1] = costs
        self.explored_cost_trace[iteration + 1] = explored_costs

        self.cost_trace_n[iteration + 1, accepted] = current_n
        self.cost_trace_n[iteration + 1, ~accepted] = self.cost_trace_n[iteration, ~accepted]

        self._accepted[iteration + 1] = accepted

        self._set_best(iteration, current_n)
        self._update_convergence(iteration)

    def update(
        self,
        iteration: int,
        position: npt.NDArray[jot.DType],
        costs: npt.NDArray[np.float64],
        last_ns: npt.NDArray[np.int64],
        rescued: npt.NDArray[np.bool_],
    ) -> None:
        """
        Updates positions and costs. Also store new information.

        Args:
            iteration: iteration index for storing the data.
            position: the current vector.
            costs: the current costs.
            last_ns: the number of averaged function evaluations.
            rescued: were the walkers rescued at this iteration ?
        """
        if np.any(rescued):
            self.position_trace[iteration + 1, rescued] = position[rescued]
            self.cost_trace[iteration + 1, rescued] = costs[rescued]

            self.rescued[iteration + 1] = np.array(rescued)

            self.cost_trace_n[iteration + 1] = last_ns

    def _set_best(self, iteration: int, current_n: int) -> None:
        """
        Get the best vector with associated cost and iteration at which it was reached.

        Args:
            iteration: iteration index at which to get the best vector.
            current_n: the current number of averaged function evaluations.

        Returns:
            The best vector, the best cost and iteration number that reached it.
        """
        _START = max(0, iteration - self.parameters.window_size + 2)
        _STOP = iteration + 2

        lookup_array = self.cost_trace[_START:_STOP].copy()

        start_lookup_index = np.repeat(_START, self.nb_walkers)

        # # correct lookup array for walkers that already converged
        # last_valid_indices = np.argmax(np.isnan(self.cost_trace), axis=0) - 1
        #
        # for walker_index in range(self.nb_walkers):
        #     if last_valid_indices[walker_index] != -1:
        #         lookup_array[:, walker_index] = self.cost_trace[last_valid_indices[walker_index], walker_index]
        #         start_lookup_index[walker_index] = last_valid_indices[walker_index]

        # normalize lookup array to account for variance drop
        lookup_array *= current_n / self.cost_trace_n[_START:_STOP]

        # ---------------------------------------------------------------------
        # where walkers have converged, copy previous best position
        self.best_position_trace[iteration + 1, self.converged, : self.nb_dimensions] = self.best_position_trace[
            iteration, self.converged, : self.nb_dimensions
        ]
        self.best_position_trace[iteration + 1, self.converged, self.nb_dimensions] = self.best_position_trace[
            iteration, self.converged, self.nb_dimensions
        ]
        self.best_position_trace[iteration + 1, self.converged, self.nb_dimensions + 1] = self.best_position_trace[
            iteration, self.converged, self.nb_dimensions + 1
        ]

        # ---------------------------------------------------------------------
        # where walkers have not converged, find best position
        best_iteration = np.nanargmin(lookup_array[:, ~self.converged], axis=0) + start_lookup_index[~self.converged]

        # find first occurence of that best iteration (/!\ might be before _START)
        best_iteration = np.argmax(
            self.cost_trace[:_STOP, ~self.converged] == self.cost_trace[best_iteration, ~self.converged],
            axis=0,
        )

        self.best_position_trace[iteration + 1, ~self.converged, : self.nb_dimensions] = self.position_trace[
            best_iteration, ~self.converged
        ]
        self.best_position_trace[iteration + 1, ~self.converged, self.nb_dimensions] = self.cost_trace[
            best_iteration, ~self.converged
        ]
        self.best_position_trace[iteration + 1, ~self.converged, self.nb_dimensions + 1] = self.cost_trace_n[
            best_iteration, ~self.converged
        ]

    def _update_convergence(self, iteration: int) -> None:
        """
        For each parallel problem, has the cost trace reached convergence within a tolerance margin ?

        Args:
            iteration: current iteration number.
        """
        if not self.parameters.base.detect_convergence or iteration < self.parameters.window_size:
            return

        position_slice = slice(iteration - self.parameters.window_size, iteration)

        mean_window = np.mean(self.cost_trace[position_slice], axis=0)
        RMSD = np.sqrt(
            np.sum((self.cost_trace[position_slice] - mean_window) ** 2, axis=0) / (self.parameters.window_size - 1)
        )

        self.converged_at_iteration[(RMSD < self.parameters.base.tol) & (self.converged_at_iteration == -1)] = iteration

    def get_best(self, iteration: int | None = None) -> Position:
        if iteration is None:
            iteration = self.nb_iterations - 1

        return Position(
            self.best_position_trace[iteration + 1, :, : self.nb_dimensions].copy(),
            self.best_position_trace[iteration + 1, :, self.nb_dimensions].copy(),
            self.best_position_trace[iteration + 1, :, self.nb_dimensions + 1].copy(),
        )

    def get_best_all(self, iteration: int | None = None) -> Position:
        best = self.get_best(iteration)
        best_index = np.argmin(best.cost)

        return Position(best.x[best_index], best.cost[best_index], best.n[best_index])

    def mean_acceptance_fraction(self, iteration: int) -> float:
        """
        Get the mean proportion of accepted proposition in the last <window_size> propositions over all walkers.

        Args:
            iteration: current iteration number.

        Returns:
            The mean proportion of accepted proposition in the last <window_size> propositions over all walkers.
        """
        if iteration < self.parameters.window_size:
            return np.nan

        _START = iteration - self.parameters.window_size + 1
        _STOP = iteration + 1

        return float(
            np.mean(
                [np.sum(self._accepted[_START:_STOP, w]) / self.parameters.window_size for w in range(self.nb_walkers)]
            )
        )

    def are_stuck(self, iteration: int) -> npt.NDArray[np.bool_]:
        """
        Detect which walkers are stuck at the same position within the last <window_size> positions.

        Args:
            iteration: current iteration number.

        Returns:
            The list of stuck walkers.
        """
        if iteration < self.parameters.window_size:
            return np.zeros(self.nb_walkers, dtype=bool)

        _START = iteration - self.parameters.window_size + 1
        _STOP = iteration + 1

        return np.array([np.sum(self._accepted[_START:_STOP, w]) == 0 for w in range(self.nb_walkers)])

    # endregion


class ParameterTrace:
    # region magic methods
    def __init__(self, nb_iterations: int):
        """
        Args:
            nb_iterations: number of expected iterations for the SA algorithm.
        """
        self.temperature_trace = np.zeros(nb_iterations, dtype=np.float32)
        self.n_trace = np.zeros(nb_iterations, dtype=np.int32)
        self.sigma_trace = np.zeros(nb_iterations, dtype=np.float32)
        self.computation_time = np.zeros(nb_iterations, dtype=np.float32)

    # endregion

    # region attributes
    @property
    def nb_iterations(self) -> int:
        return self.temperature_trace.shape[0]

    # endregion

    # region methods
    def finalize(self, iteration: int) -> None:
        """
        Cleanup trace at the end of the SA algorithm.
        """
        self.temperature_trace = self.temperature_trace[: iteration + 1]
        self.n_trace = self.n_trace[: iteration + 1]
        self.sigma_trace = self.sigma_trace[: iteration + 1]
        self.computation_time = self.computation_time[: iteration + 1]

    def store(
        self,
        iteration: int,
        temperature: float,
        n: int,
        sigma: float,
        computation_time: float,
    ) -> None:
        """
        Save the current position of the vector to optimize, the current cost, temperature and number of averaged
            function evaluations.

        Args:
            iteration: iteration index for storing the data.
            temperature: the current temperature.
            n: the current number of averaged function evaluations.
            sigma: the current estimated standard deviation.
            computation_time: the time required for computing this iteration.

        Returns:
            The index at which was stored the data.
        """
        self.temperature_trace[iteration] = float(temperature)
        self.n_trace[iteration] = int(n)
        self.sigma_trace[iteration] = float(sigma)
        self.computation_time[iteration] = float(computation_time)

    # endregion


[docs] class Trace(ABC): """ Object for storing the trace history of an SA run. """ # region magic methods
[docs] def __init__( self, nb_iterations: int, nb_walkers: int, nb_dimensions: int, run_parameters: SAParameters, initial_position: npt.NDArray[np.float64], initial_cost: npt.NDArray[np.float64], ): """ Instantiate a Trace. Args: nb_iterations: number of expected iterations for the SA algorithm. nb_walkers: number of walkers that run in parallel. nb_dimensions: number of dimensions per optimization problem. run_parameters: parameters used for running the SA algorithm. initial_position: initial positions before running the SA algorithm. initial_cost: cost of initial positions. """ # trace values that will change during SA execution self.positions = PositionTrace( run_parameters, nb_iterations, nb_walkers, nb_dimensions, initial_position, initial_cost, ) self.parameters = ParameterTrace(nb_iterations)
@abstractmethod def __repr__(self) -> str: pass # endregion # region attributes @property def nb_walkers(self) -> int: """Number of walkers that run in parallel.""" return self.positions.nb_walkers @property def nb_dimensions(self) -> int: """Number of dimensions per optimization problem.""" return self.positions.nb_dimensions @property def nb_iterations(self) -> int: """Number of iterations that the SA algorithm run for.""" return self.positions.nb_iterations # endregion # region methods
[docs] def finalize(self, iteration: int) -> None: """ When the SA algorithm terminates, finalize() is called on position and parameter traces to delete rows for iterations that where never run. Args: iteration: final iteration that was computed before termination. :meta private: """ self.positions.finalize(iteration) self.parameters.finalize(iteration)
[docs] @abstractmethod def plot_positions( self, save: Path | None = None, true_values: npt.NDArray[Any] | None = None, show: bool = True, walker_titles: Sequence[str] | None = None, dimension_titles: Sequence[str] | None = None, ) -> None: """ Plot reached positions and costs for the vector to optimize along iterations. Args: save: optional path to save the plot as a html file. true_values: an optional sequence of known true values for each dimension of the vector to optimize. show: render the plot ? walker_titles: an optional list of sub-plot titles, one title per parallel walker. dimension_titles: an optional list of sub-plot titles, one title per dimension. """ pass
[docs] def plot_parameters(self, save: Path | str | None = None, show: bool = True) -> None: """ Plot temperature, number of repeats per iteration and number of averaged function evaluations along iterations. Args: save: optional path to save the plot as a html file. show: render the plot ? """ if not PLOTTING_ENABLED: raise ImportError("Plotly is not installed.") titles = ["Temperature", "sigma", "n", "Computation time (s)"] fig = make_subplots( rows=4, cols=1, shared_xaxes=True, subplot_titles=titles, vertical_spacing=0.1, ) if self.nb_iterations: fig.add_trace( go.Scatter( x=list(range(1, self.nb_iterations + 1)), y=self.parameters.temperature_trace, name="T", hovertext=[ f"<b>Temperature</b>: {_T:.4f}<br><b>Iteration</b>: {iteration + 1}" for iteration, _T in enumerate(self.parameters.temperature_trace) ], hoverinfo="text", showlegend=False, ), row=1, col=1, ) fig.add_trace( go.Scatter( x=list(range(1, self.nb_iterations + 1)), y=self.parameters.sigma_trace, name="sigma", hovertext=[ f"<b>Sigma</b>: {_sigma:.4f}<br><b>Iteration</b>: {iteration + 1}" for iteration, _sigma in enumerate(self.parameters.sigma_trace) ], hoverinfo="text", showlegend=False, ), row=2, col=1, ) fig.add_trace( go.Scatter( x=list(range(1, self.nb_iterations + 1)), y=self.parameters.n_trace, name="n", hovertext=[ f"<b>Number evaluations</b>: {_n}<br><b>Iteration</b>: {iteration + 1}" for iteration, _n in enumerate(self.parameters.n_trace) ], hoverinfo="text", showlegend=False, ), row=3, col=1, ) fig.add_trace( go.Scatter( x=list(range(1, self.nb_iterations + 1)), y=self.parameters.computation_time, name="T", hovertext=[ f"<b>Time</b>: {_time:.4f}<br><b>Iteration</b>: {iteration + 1}" for iteration, _time in enumerate(self.parameters.computation_time) ], hoverinfo="text", showlegend=False, ), row=4, col=1, ) fig.update_layout( height=800, width=600, margin=dict(t=40, b=10, l=10, r=10), paper_bgcolor="#FFF", plot_bgcolor="#FFF", font_color="#000000", ) for i in range(4): fig.layout.annotations[i].update(x=0.025, xanchor="left") if show: fig.show() if save is not None: fig.write_html(str(save))
# endregion class OneTrace(Trace): """ Object for storing the trace history of an SA run. """ # region magic methods def __repr__(self) -> str: return ( f"Trace of {self.nb_iterations} iteration(s), {self.nb_walkers} walker(s) and " f"{self.nb_dimensions} dimension(s)." ) # endregion # region methods def plot_positions( self, save: Path | None = None, true_values: npt.NDArray[Any] | None = None, show: bool = True, walker_titles: Sequence[str] | None = None, dimension_titles: Sequence[str] | None = None, ) -> None: """ Plot reached positions and costs for the vector to optimize along iterations. Args: save: optional path to save the plot as a html file. true_values: an optional sequence of known true values for each dimension of the vector to optimize. show: render the plot ? (default True) walker_titles: an optional list of sub-plot titles, one title per parallel walker. (default None) dimension_titles: an optional list of sub-plot titles, one title per dimension. (default None) """ if true_values is not None and len(true_values) != self.nb_dimensions: raise ShapeError( f"The vector of true values should have {self.nb_dimensions} dimensions, not {len(true_values)}." ) # if subplot_titles is not None: # if len(subplot_titles) != self.nb_dimensions: # raise ShapeError(f'Expected {self.nb_dimensions} sub-plot titles, got {len(subplot_titles)}.') titles = ["Costs", "n", "Best cost evolution"] if walker_titles is not None: # titles += [f'{subplot_titles[i]}' for i in range(self.nb_dimensions)] pass else: titles += [f"Dimension {i}" for i in range(self.nb_dimensions)] fig = make_subplots( rows=self.nb_dimensions + 3, cols=1, shared_xaxes=True, subplot_titles=titles, vertical_spacing=0.05, ) for w in range(self.nb_walkers): fig.add_trace( go.Scatter( x=list(range(self.nb_iterations)), y=self.positions.cost_trace[:, w], name=f"Walker #{w}", marker=dict(color="rgba(0, 0, 200, 0.3)"), hovertext=[ f"<b>Walker</b>: {w}<br><b>Cost</b>: {cost:.4f}<br><b>Iteration</b>: {iteration}" for iteration, cost in enumerate(self.positions.cost_trace[:, w]) ], hoverinfo="text", showlegend=True, legendgroup=f"Walker #{w}", ), row=1, col=1, ) fig.add_trace( go.Scatter( x=list(range(self.nb_iterations)), y=self.positions.cost_trace_n[:, w], name=f"Walker #{w}", marker=dict(color="rgba(0, 0, 200, 0.3)"), hovertext=[ f"<b>Walker</b>: {w}<br><b>n at cost evaluation</b>: {cost_n}<br><b>Iteration</b>: {iteration}" for iteration, cost_n in enumerate(self.positions.cost_trace_n[:, w]) ], hoverinfo="text", showlegend=False, legendgroup=f"Walker #{w}", ), row=2, col=1, ) # Best cost best_costs = np.zeros(self.nb_iterations + 1) with np.errstate(divide="ignore", invalid="ignore"): for it in range(self.nb_iterations + 1): best = self.positions.get_best(it - 1) best_costs[it] = best.cost[w] * np.insert(self.parameters.n_trace, 0, 1)[it] / best.n[w] fig.add_trace( go.Scatter( x=list(range(self.nb_iterations + 1)), y=best_costs, name="Best cost evolution", marker=dict(color="rgba(252, 196, 25, 1.)"), hovertext=[ f"<b>Walker</b>: {w}<br><b>Cost</b>: {cost}<br><b>Iteration</b>: {iteration}" for iteration, cost in enumerate(best_costs) ], hoverinfo="text", showlegend=False, ), row=3, col=1, ) for d in range(self.nb_dimensions): fig.add_trace( go.Scatter( x=list(range(self.nb_iterations)), y=self.positions.position_trace[:, w, d], marker=dict(color="rgba(0, 0, 0, 0.3)"), name=f"Walker #{w}", hovertext=[ f"<b>Walker</b>: {w}<br>" f"<b>Position</b>: " f"{self.positions.position_trace[iteration, w, d]:.4f}<br>" f"<b>Cost</b>: {cost:.4f}<br>" f"<b>Iteration</b>: {iteration}" for iteration, cost in enumerate(self.positions.cost_trace[:, w]) ], hoverinfo="text", showlegend=False, legendgroup=f"Walker #{w}", ), row=d + 4, col=1, ) # add rescue points rescue_iterations = np.where(self.positions.rescued[:, w])[0] fig.add_trace( go.Scatter( x=rescue_iterations, y=self.positions.position_trace[rescue_iterations, w, d], mode="markers", marker=dict(color="rgba(0, 255, 0, 0.3)", symbol=2, size=10), name=f"Rescues for walker #{w}", hovertext=[ f"<b>Walker</b>: {w}<br>" f"<b>Position</b>: " f"{self.positions.position_trace[iteration, w, d]:.4f}<br>" f"<b>Cost</b>: {self.positions.cost_trace[iteration, w]:.4f}<br>" f"<b>Iteration</b>: {iteration}" for iteration in rescue_iterations ], hoverinfo="text", showlegend=False, legendgroup=f"Walker #{w}", ), row=d + 4, col=1, ) # add best points # fig.add_trace(go.Scatter(x=list(range(self.nb_iterations)), # y=self.positions.best_position_trace[:, d], # mode='markers', # marker=dict(color='rgba(252, 196, 25, 1.)', # symbol=0, # size=3), # name='Best cost', # hovertext=[ # f"<b>Walker</b>: " # f"{int(self.positions.best_position_trace[iteration, -2])}<br>" # f"<b>Position</b>: {position:.4f}<br>" # f"<b>Cost</b>: " # f"{self.positions.best_position_trace[iteration, -3]:.4f}<br>" # f"<b>Iteration</b>: {iteration}" # for iteration, position in enumerate(self.positions.best_position_trace[:, d]) # ], # hoverinfo="text", # showlegend=d == 0, # legendgroup='Best cost'), row=d + 4, col=1) if true_values is not None: fig.add_trace( go.Scatter( x=[0, self.nb_iterations], y=[true_values[d], true_values[d]], mode="lines", marker=dict(color="rgba(200, 0, 0, 1)"), line=dict(dash="dash"), name="True value", showlegend=False, ), row=d + 4, col=1, ) fig.add_annotation( x=self.nb_iterations - 1, y=np.max(self.positions.position_trace[:, :, d]), xref=f"x{d + 4}", yref=f"y{d + 4}", text=f"True value : {true_values[d]}", showarrow=False, borderwidth=0, borderpad=4, bgcolor="#eb9a9a", opacity=0.8, ) fig["layout"].update( height=200 * (self.nb_dimensions + 2), width=600, margin=dict(t=40, b=10, l=10, r=10), xaxis_range=[0, self.nb_iterations - 1], paper_bgcolor="#FFF", plot_bgcolor="#FFF", font_color="#000000", ) if show: fig.show() if save is not None: fig.write_html(str(save)) # endregion