import subprocess
from fractions import Fraction
from functools import reduce
from pathlib import Path
from shutil import which
from tempfile import TemporaryDirectory
from typing import Collection
import numpy as np
from wmpy.core import Polynomial, Polytope
LATTE_INSTALLED = which("integrate") is not None
[docs]
class LattEIntegrator:
"""This class is a wrapper for the LattE integrator.
It computes the exact integral of a polynomial over a convex polytope.
"""
ALG_TRIANGULATE = "--triangulate"
ALG_CONE_DECOMPOSE = "--cone-decompose"
DEF_ALGORITHM = ALG_CONE_DECOMPOSE
ALGORITHMS = [ALG_TRIANGULATE, ALG_CONE_DECOMPOSE]
_POLYNOMIAL_FILENAME = "polynomial.txt"
_POLYTOPE_FILENAME = "polytope.hrep"
_OUTPUT_FILENAME = "output.txt"
def __init__(self, algorithm: str = DEF_ALGORITHM):
"""Default constructor.
Args:
algorithm: either "--triangulate" or "--cone-decompose"
Raises:
ValueError if algorithm is not supported."
"""
if not LATTE_INSTALLED:
raise RuntimeError(
"Can't execute LattE's 'integrate' command. Use 'wmpy install --latte' to install it."
)
if algorithm not in LattEIntegrator.ALGORITHMS:
raise ValueError(f"Algorithm should be one of {self.ALGORITHMS}.")
self.algorithm = algorithm
[docs]
def integrate(self, polytope: Polytope, polynomial: Polynomial) -> float:
"""Computes a convex integral.
Args:
polytope: convex integration bounds (a Polytope)
polynomial: integrand (a Polynomial)
Returns:
The result of the integration as a non-negative scalar value.
"""
if polynomial.is_zero:
return 0.0
with TemporaryDirectory(dir=".") as tmpdir:
tmpdir_path = Path(tmpdir).resolve()
polytope_path = tmpdir_path / self._POLYTOPE_FILENAME
polynomial_path = tmpdir_path / self._POLYNOMIAL_FILENAME
output_path = tmpdir_path / self._OUTPUT_FILENAME
LattEIntegrator._write_polytope_file(polytope, polytope_path)
LattEIntegrator._write_polynomial_file(polynomial, polynomial_path)
cmd = [
"integrate",
"--valuation=integrate",
self.algorithm,
f"--monomials={polynomial_path}",
str(polytope_path),
]
with output_path.open("w") as f:
process_output = subprocess.run(
cmd, stdout=f, stderr=f, cwd=tmpdir_path
)
if process_output.returncode != 0:
with output_path.open("r") as f_err:
error_output = f_err.read()
raise RuntimeError(
f"LattE returned non-zero value: {process_output.returncode}\n"
f"Error output:\n{error_output}"
)
# Unfortunately LattE returns an exit status != 0 if the polytope is empty.
# In the general case this may happen, raising an exception
# is not a good idea.
# TODO: handle this properly.
result = LattEIntegrator._read_output_file(output_path)
if result is None:
raise RuntimeError("Unhandled error while executing LattE integrale.")
return result
[docs]
def integrate_batch(
self, convex_integrals: Collection[tuple[Polytope, Polynomial]]
) -> np.ndarray:
"""Computes a batch of integrals.
Args:
convex_integrals: a collection of bounds/integrand pairs
Returns:
The result of the batch of integrations as a numpy array.
"""
volumes = []
for polytope, polynomial in convex_integrals:
volumes.append(self.integrate(polytope, polynomial))
return np.array(volumes)
@staticmethod
def _write_polynomial_file(polynomial: Polynomial, path: Path) -> None:
mono_str = []
for exponents, coefficient in polynomial.monomials.items():
exp_str = "[" + ",".join(str(e) for e in exponents) + "]"
mono_str.append(f"[{coefficient}, {exp_str}]")
with path.open("w") as f:
f.write("[" + ",".join(mono_str) + "]")
@staticmethod
def _write_polytope_file(polytope: Polytope, path: Path) -> None:
"""Writes the polytope in a file with LattE's format.
This requires converting all the coefficients to integers.
"""
# this part converts the coefficient to integers (a LattE requirement)
A, B, _ = polytope.to_numpy()
n_vars = A.shape[1]
n_in = B.shape[0]
bA = np.concatenate((B.reshape(-1, 1), A), axis=1)
f_den = np.vectorize(lambda x: Fraction(x).denominator)
f_lcmm = lambda vec: reduce(np.lcm, vec)
mult = np.apply_along_axis(f_lcmm, 1, f_den(bA))
bA_int = (bA * mult[:, None]).astype(int)
bAm_int = np.concatenate((bA_int[:, 0].reshape(-1, 1), -bA_int[:, 1:]), axis=1)
content = f"{n_in} {n_vars + 1}\n"
content += "\n".join([" ".join(map(str, row)) for row in bAm_int])
with path.open("w") as f:
f.write(content)
@staticmethod
def _read_output_file(path: Path) -> float:
with path.open("r") as f:
error = None
lines = f.readlines()
txt_block = "\n".join(lines)
if "The number of lattice points is 1." in txt_block:
return 0.0
elif "Empty polytope or unbounded polytope!" in txt_block:
error = "Empty or unbounded polytope"
elif "Given polyhedron is unbounded!" in txt_block:
error = "Unbounded polytope"
if error is not None:
raise RuntimeError(error + txt_block)
for line in lines:
# Result in the "Answer" line may be written in fraction form
if "Decimal" in line:
# print("Res: {}".format(line))
return float(line.partition(": ")[-1].strip())
error = "LattE reached an unexpected state (memory limit?)"
raise RuntimeError(error + txt_block)