Source code for wmpy.solvers.wmsampler

from typing import Collection, Optional

import numpy as np

from pysmt.shortcuts import Bool
from pysmt.fnode import FNode

from wmpy.core import AssignmentConverter
from wmpy.enumeration import Enumerator
from wmpy.integration import Integrator, LattEIntegrator
from wmpy.sampling import RejectionSampler


[docs] class WMSampler: """The class implements a sampling algorithm for weighted SMT formulas. The algorithm is divided: 1) an offline preprocessing phase, enumerating the convex regions and computing their relative mass via WMI 2) an online sampling phase, where the information above determines the different subsamples' sizes """ DEF_INTEGRATOR = LattEIntegrator def __init__( self, enumerator: Enumerator, domain: Collection[FNode], integrator: Optional[Integrator] = None, seed: Optional[int] = None, ): """Default constructor. Solves the preprocessing step by solving WMI. Uses this information for constructing a weighted strata for subsequent sampling efforts. Args: enumerator: an instance of Enumerator (support, weight) domain: the continuous integration domain (a list of pysmt real variables) integrator: an Integrator instance (default: LatteIntegrator) seed: the seed number (optional) """ if integrator is None: integrator = self.DEF_INTEGRATOR() converter = AssignmentConverter(enumerator, domain) convex_integrals = [] n_unassigned_bools = [] for truth_assignment, nub in enumerator.enumerate(Bool(True)): convex_integrals.append(converter.convert(truth_assignment)) n_unassigned_bools.append(nub) factors = [2**nb for nb in n_unassigned_bools] unnormalized_masses = integrator.integrate_batch(convex_integrals) * factors wmi: float = np.sum(unnormalized_masses) if wmi <= 0: raise ValueError("Can't compute the normalization constant") self.N = len(domain) self.subsamplers = [] self.ratios: list[np.ndarray] = [] for i, ci in enumerate(convex_integrals): self.subsamplers.append(RejectionSampler(*ci, seed)) self.ratios.append(unnormalized_masses[i] / wmi) self.ratios = np.array(self.ratios) self.mult = np.ones(len(self.ratios))
[docs] def sample(self, n_samples: int, max_iterations: int = 1) -> np.ndarray: """Draws a sample from (support, weight) in two phases. The subsample ratios are determined based on the strata precomputed at construction. Convex subsampling is carried over using rejection sampling. The procedure tries to satisfy the subsample size requirement up to `max_iterations`, returning M <= `n_samples` points. Args: n_samples: desired sample size max_iterations: maximum number of attempts (default: 1) Returns: A numpy array with shape (M, N). """ required_size = np.max( [np.ones(len(self.subsamplers)), n_samples * self.ratios], axis=0 ).astype(int) subsamples = [np.array([]).reshape((-1, self.N)) for _ in self.subsamplers] done = np.zeros(len(self.subsamplers)).astype(bool) it = 0 while it < max_iterations: it += 1 for i in range(len(self.subsamplers)): if len(subsamples[i]) < required_size[i]: new_subsample = self.subsamplers[i].sample( int(required_size[i] * self.mult[i]) ) self.mult[i] = required_size[i] / np.max([1, len(new_subsample)]) subsamples[i] = np.concatenate( (subsamples[i], new_subsample), axis=0 )[: required_size[i]] done[i] = len(subsamples[i]) == required_size[i] if done.all(): break return np.concatenate(subsamples, axis=0)