Source code for josiann.parallel.moves.discrete

"""Discrete move functions designed to work with josiann's parallel mode."""

from __future__ import annotations

from collections.abc import Sequence
from typing import Any

import numpy as np
import numpy.typing as npt

import josiann.typing as jot
from josiann.moves.base import State
from josiann.moves.discrete import DiscreteMove
from josiann.parallel.moves.base import ParallelMove


[docs] class ParallelSetStep(ParallelMove, DiscreteMove): """ Step within a fixed set of possible values for x. For each dimension, the position immediately before or after x will be chosen at random when stepping. """ # region magic methods
[docs] def __init__( self, *, position_set: Sequence[Sequence[float] | npt.NDArray[np.floating[Any]]], bounds: npt.NDArray[jot.DType] | None = None, repr_attributes: tuple[str, ...] = (), **kwargs: Any, ): """ Instantiate a Move. Args: position_set: sets of only possible values for x in each dimension. bounds: optional sequence of (min, max) bounds for values to propose in each dimension. repr_attributes: tuple of attribute names to include in the move's representation. """ super().__init__( position_set=position_set, bounds=bounds, repr_attributes=repr_attributes, **kwargs, ) self._reversed_position_set = [v[::-1] for v in self._position_set] self._target_dim = 0
# endregion # region methods def _get_proposal(self, x: npt.NDArray[jot.DT_ARR], state: State) -> npt.NDArray[jot.DT_ARR]: """ Generate a new proposed vector x. Args: x: current vector x of shape (ndim,). state: current state of the SA algorithm. Returns: New proposed vector x of shape (ndim,). """ new_x = x.copy() # loop on vectors in x for index in range(len(x)): # for each, draw at random if the position increases ... if np.random.rand() > 0.5: mask = self._position_set[self._target_dim] > x[index, self._target_dim] if np.any(mask): new_x[index, self._target_dim] = self._position_set[self._target_dim][np.argmax(mask)] # boundary hit : cannot go higher than the highest value --> go down instead else: new_x[index, self._target_dim] = self._position_set[self._target_dim][-2] # ... or decreases else: mask = self._reversed_position_set[self._target_dim] < x[index, self._target_dim] if np.any(mask): new_x[index, self._target_dim] = self._reversed_position_set[self._target_dim][np.argmax(mask)] # boundary hit : cannot go lower than the lowest value --> go up instead else: new_x[index, self._target_dim] = self._position_set[self._target_dim][1] # next time, update the position for the next dimension of the vector self._target_dim += 1 if self._target_dim >= x.shape[1]: self._target_dim = 0 return new_x
# endregion