test_HybridFactorGraph.py
Go to the documentation of this file.
1 """
2 GTSAM Copyright 2010-2019, Georgia Tech Research Corporation,
3 Atlanta, Georgia 30332-0415
4 All Rights Reserved
5 
6 See LICENSE for the license information
7 
8 Unit tests for Hybrid Factor Graphs.
9 Author: Fan Jiang, Varun Agrawal, Frank Dellaert
10 """
11 # pylint: disable=invalid-name, no-name-in-module, no-member
12 
13 import unittest
14 
15 import numpy as np
16 
17 import gtsam
18 from gtsam import (DiscreteConditional, GaussianConditional, HybridBayesNet,
19  HybridGaussianConditional, HybridGaussianFactor,
20  HybridGaussianFactorGraph, HybridValues, JacobianFactor,
21  TableDistribution, noiseModel)
22 from gtsam.symbol_shorthand import C, M, X, Z
23 from gtsam.utils.test_case import GtsamTestCase
24 
25 DEBUG_MARGINALS = False
26 
27 
29  """Unit tests for HybridGaussianFactorGraph."""
30 
31  def test_create(self):
32  """Test construction of hybrid factor graph."""
33  model = noiseModel.Unit.Create(3)
34 
35  jf1 = JacobianFactor(X(0), np.eye(3), np.zeros((3, 1)), model)
36  jf2 = JacobianFactor(X(0), np.eye(3), np.ones((3, 1)), model)
37 
38  gmf = HybridGaussianFactor((C(0), 2), [(jf1, 0), (jf2, 0)])
39 
41  hfg.push_back(jf1)
42  hfg.push_back(jf2)
43  hfg.push_back(gmf)
44 
45  hbn = hfg.eliminateSequential()
46 
47  self.assertEqual(hbn.size(), 2)
48 
49  hybridCond = hbn.at(0).inner()
50  self.assertIsInstance(hybridCond, HybridGaussianConditional)
51  self.assertEqual(len(hybridCond.keys()), 2)
52 
53  discrete_conditional = hbn.at(hbn.size() - 1).inner()
54  self.assertIsInstance(discrete_conditional, TableDistribution)
55 
56  def test_optimize(self):
57  """Test construction of hybrid factor graph."""
58  model = noiseModel.Unit.Create(3)
59 
60  jf1 = JacobianFactor(X(0), np.eye(3), np.zeros((3, 1)), model)
61  jf2 = JacobianFactor(X(0), np.eye(3), np.ones((3, 1)), model)
62 
63  gmf = HybridGaussianFactor((C(0), 2), [(jf1, 0), (jf2, 0)])
64 
66  hfg.push_back(jf1)
67  hfg.push_back(jf2)
68  hfg.push_back(gmf)
69 
70  dtf = gtsam.DecisionTreeFactor([(C(0), 2)], "0 1")
71  hfg.push_back(dtf)
72 
73  hbn = hfg.eliminateSequential()
74 
75  hv = hbn.optimize()
76  self.assertEqual(hv.atDiscrete(C(0)), 1)
77 
78  @staticmethod
79  def tiny(num_measurements: int = 1,
80  prior_mean: float = 5.0,
81  prior_sigma: float = 0.5) -> HybridBayesNet:
82  """
83  Create a tiny two variable hybrid model which represents
84  the generative probability P(Z, x0, mode) = P(Z|x0, mode)P(x0)P(mode).
85  num_measurements: number of measurements in Z = {z0, z1...}
86  """
87  # Create hybrid Bayes net.
88  bayesNet = HybridBayesNet()
89 
90  # Create mode key: 0 is low-noise, 1 is high-noise.
91  mode = (M(0), 2)
92 
93  # Create hybrid Gaussian conditional Z(0) = X(0) + noise for each measurement.
94  I_1x1 = np.eye(1)
95  for i in range(num_measurements):
96  conditional0 = GaussianConditional.FromMeanAndStddev(Z(i),
97  I_1x1,
98  X(0), [0],
99  sigma=0.5)
100  conditional1 = GaussianConditional.FromMeanAndStddev(Z(i),
101  I_1x1,
102  X(0), [0],
103  sigma=3)
104  bayesNet.push_back(
105  HybridGaussianConditional(mode, [conditional0, conditional1]))
106 
107  # Create prior on X(0).
108  prior_on_x0 = GaussianConditional.FromMeanAndStddev(
109  X(0), [prior_mean], prior_sigma)
110  bayesNet.push_back(prior_on_x0)
111 
112  # Add prior on mode.
113  bayesNet.push_back(DiscreteConditional(mode, "4/6"))
114 
115  return bayesNet
116 
117  def test_evaluate(self):
118  """Test evaluate with two different prior noise models."""
119  # TODO(dellaert): really a HBN test
120  # Create a tiny Bayes net P(x0) P(m0) P(z0|x0)
121  bayesNet1 = self.tiny(prior_sigma=0.5, num_measurements=1)
122  bayesNet2 = self.tiny(prior_sigma=5.0, num_measurements=1)
123  # bn1: # 1/sqrt(2*pi*0.5^2)
124  # bn2: # 1/sqrt(2*pi*5.0^2)
125  expected_ratio = np.sqrt(2 * np.pi * 5.0**2) / np.sqrt(
126  2 * np.pi * 0.5**2)
127  mean0 = HybridValues()
128  mean0.insert(X(0), [5.0])
129  mean0.insert(Z(0), [5.0])
130  mean0.insert(M(0), 0)
131  self.assertAlmostEqual(bayesNet1.evaluate(mean0) /
132  bayesNet2.evaluate(mean0),
133  expected_ratio,
134  delta=1e-9)
135  mean1 = HybridValues()
136  mean1.insert(X(0), [5.0])
137  mean1.insert(Z(0), [5.0])
138  mean1.insert(M(0), 1)
139  self.assertAlmostEqual(bayesNet1.evaluate(mean1) /
140  bayesNet2.evaluate(mean1),
141  expected_ratio,
142  delta=1e-9)
143 
144  @staticmethod
145  def measurements(sample: HybridValues, indices) -> gtsam.VectorValues:
146  """Create measurements from a sample, grabbing Z(i) for indices."""
147  measurements = gtsam.VectorValues()
148  for i in indices:
149  measurements.insert(Z(i), sample.at(Z(i)))
150  return measurements
151 
152  @classmethod
154  target,
155  proposal_density: HybridBayesNet,
156  N=10000):
157  """Do importance sampling to estimate discrete marginal P(mode)."""
158  # Allocate space for marginals on mode.
159  marginals = np.zeros((2, ))
160 
161  # Do importance sampling.
162  for s in range(N):
163  proposed = proposal_density.sample() # sample from proposal
164  target_proposed = target(proposed) # evaluate target
165  weight = target_proposed / proposal_density.evaluate(proposed)
166  marginals[proposed.atDiscrete(M(0))] += weight
167 
168  # print marginals:
169  marginals /= marginals.sum()
170  return marginals
171 
172  def test_tiny(self):
173  """Test a tiny two variable hybrid model."""
174  # Create P(x0)P(mode)P(z0|x0,mode)
175  prior_sigma = 0.5
176  bayesNet = self.tiny(prior_sigma=prior_sigma)
177 
178  # Deterministic values exactly at the mean, for both x and Z:
179  values = HybridValues()
180  values.insert(X(0), [5.0])
181  values.insert(M(0), 0) # low-noise, standard deviation 0.5
182  measurements = gtsam.VectorValues()
183  measurements.insert(Z(0), [5.0])
184  values.insert(measurements)
185 
186  def unnormalized_posterior(x):
187  """Posterior is proportional to joint, centered at 5.0 as well."""
188  x.insert(measurements)
189  return bayesNet.evaluate(x)
190 
191  # Create proposal density on (x0, mode), making sure it has same mean:
192  posterior_information = 1 / (prior_sigma**2) + 1 / (0.5**2)
193  posterior_sigma = posterior_information**(-0.5)
194  proposal_density = self.tiny(num_measurements=0,
195  prior_mean=5.0,
196  prior_sigma=posterior_sigma)
197 
198  # Estimate marginals using importance sampling.
199  marginals = self.estimate_marginals(target=unnormalized_posterior,
200  proposal_density=proposal_density)
201  if DEBUG_MARGINALS:
202  print(f"True mode: {values.atDiscrete(M(0))}")
203  print(f"P(mode=0; Z) = {marginals[0]}")
204  print(f"P(mode=1; Z) = {marginals[1]}")
205 
206  # Check that the estimate is close to the true value.
207  self.assertAlmostEqual(marginals[0], 0.74, delta=0.01)
208  self.assertAlmostEqual(marginals[1], 0.26, delta=0.01)
209 
210  # Convert to factor graph with given measurements.
211  fg = bayesNet.toFactorGraph(measurements)
212  self.assertEqual(fg.size(), 3)
213 
214  # Check ratio between unnormalized posterior and factor graph is the same for all modes:
215  for mode in [1, 0]:
216  values.insert_or_assign(M(0), mode)
217  self.assertAlmostEqual(
218  bayesNet.evaluate(values) / np.exp(-fg.error(values)),
219  0.6366197723675815)
220  self.assertAlmostEqual(bayesNet.error(values), fg.error(values))
221 
222  # Test elimination.
223  posterior = fg.eliminateSequential()
224 
225  def true_posterior(x):
226  """Posterior from elimination."""
227  x.insert(measurements)
228  return posterior.evaluate(x)
229 
230  # Estimate marginals using importance sampling.
231  marginals = self.estimate_marginals(target=true_posterior,
232  proposal_density=proposal_density)
233  if DEBUG_MARGINALS:
234  print(f"True mode: {values.atDiscrete(M(0))}")
235  print(f"P(mode=0; z0) = {marginals[0]}")
236  print(f"P(mode=1; z0) = {marginals[1]}")
237 
238  # Check that the estimate is close to the true value.
239  self.assertAlmostEqual(marginals[0], 0.74, delta=0.01)
240  self.assertAlmostEqual(marginals[1], 0.26, delta=0.01)
241 
242  @staticmethod
243  def calculate_ratio(bayesNet: HybridBayesNet,
244  fg: HybridGaussianFactorGraph, sample: HybridValues):
245  """Calculate ratio between Bayes net and factor graph."""
246  return bayesNet.evaluate(sample) / fg.probPrime(sample) if \
247  fg.probPrime(sample) > 0 else 0
248 
249  def test_ratio(self):
250  """
251  Given a tiny two variable hybrid model, with 2 measurements, test the
252  ratio of the bayes net model representing P(z,x,n)=P(z|x, n)P(x)P(n)
253  and the factor graph P(x, n | z)=P(x | n, z)P(n|z),
254  both of which represent the same posterior.
255  """
256  # Create generative model P(z, x, n)=P(z|x, n)P(x)P(n)
257  prior_sigma = 0.5
258  bayesNet = self.tiny(prior_sigma=prior_sigma, num_measurements=2)
259 
260  # Deterministic values exactly at the mean, for both x and Z:
261  values = HybridValues()
262  values.insert(X(0), [5.0])
263  values.insert(M(0), 0) # high-noise, standard deviation 3
264  measurements = gtsam.VectorValues()
265  measurements.insert(Z(0), [4.0])
266  measurements.insert(Z(1), [6.0])
267  values.insert(measurements)
268 
269  def unnormalized_posterior(x):
270  """Posterior is proportional to joint, centered at 5.0 as well."""
271  x.insert(measurements)
272  return bayesNet.evaluate(x)
273 
274  # Create proposal density on (x0, mode), making sure it has same mean:
275  posterior_information = 1 / (prior_sigma**2) + 2.0 / (3.0**2)
276  posterior_sigma = posterior_information**(-0.5)
277  proposal_density = self.tiny(num_measurements=0,
278  prior_mean=5.0,
279  prior_sigma=posterior_sigma)
280 
281  # Estimate marginals using importance sampling.
282  marginals = self.estimate_marginals(target=unnormalized_posterior,
283  proposal_density=proposal_density)
284  if DEBUG_MARGINALS:
285  print(f"True mode: {values.atDiscrete(M(0))}")
286  print(f"P(mode=0; Z) = {marginals[0]}")
287  print(f"P(mode=1; Z) = {marginals[1]}")
288 
289  # Check that the estimate is close to the true value.
290  self.assertAlmostEqual(marginals[0], 0.23, delta=0.01)
291  self.assertAlmostEqual(marginals[1], 0.77, delta=0.01)
292 
293  # Convert to factor graph using measurements.
294  fg = bayesNet.toFactorGraph(measurements)
295  self.assertEqual(fg.size(), 4)
296 
297  # Calculate ratio between Bayes net probability and the factor graph:
298  expected_ratio = self.calculate_ratio(bayesNet, fg, values)
299  # print(f"expected_ratio: {expected_ratio}\n")
300 
301  # Check with a number of other samples.
302  for i in range(10):
303  samples = bayesNet.sample()
304  samples.update(measurements)
305  ratio = self.calculate_ratio(bayesNet, fg, samples)
306  # print(f"Ratio: {ratio}\n")
307  if (ratio > 0):
308  self.assertAlmostEqual(ratio, expected_ratio)
309 
310  # Test elimination.
311  posterior = fg.eliminateSequential()
312 
313  # Calculate ratio between Bayes net probability and the factor graph:
314  expected_ratio = self.calculate_ratio(posterior, fg, values)
315  # print(f"expected_ratio: {expected_ratio}\n")
316 
317  # Check with a number of other samples.
318  for i in range(10):
319  samples = posterior.sample()
320  samples.insert(measurements)
321  ratio = self.calculate_ratio(posterior, fg, samples)
322  # print(f"Ratio: {ratio}\n")
323  if (ratio > 0):
324  self.assertAlmostEqual(ratio, expected_ratio)
325 
326 
327 if __name__ == "__main__":
328  unittest.main()
Eigen::internal::print
EIGEN_STRONG_INLINE Packet4f print(const Packet4f &a)
Definition: NEON/PacketMath.h:3115
gtsam::DecisionTreeFactor
Definition: DecisionTreeFactor.h:45
test_HybridFactorGraph.TestHybridGaussianFactorGraph.test_optimize
def test_optimize(self)
Definition: test_HybridFactorGraph.py:56
gtsam::HybridValues
Definition: HybridValues.h:37
gtsam::HybridBayesNet
Definition: HybridBayesNet.h:37
gtsam::JacobianFactor
Definition: JacobianFactor.h:91
gtsam::symbol_shorthand
Definition: inference/Symbol.h:147
X
#define X
Definition: icosphere.cpp:20
test_HybridFactorGraph.TestHybridGaussianFactorGraph.test_tiny
def test_tiny(self)
Definition: test_HybridFactorGraph.py:172
gtsam::range
Double_ range(const Point2_ &p, const Point2_ &q)
Definition: slam/expressions.h:30
gtsam::VectorValues
Definition: VectorValues.h:74
test_HybridFactorGraph.TestHybridGaussianFactorGraph.test_evaluate
def test_evaluate(self)
Definition: test_HybridFactorGraph.py:117
test_HybridFactorGraph.TestHybridGaussianFactorGraph.estimate_marginals
def estimate_marginals(cls, target, HybridBayesNet proposal_density, N=10000)
Definition: test_HybridFactorGraph.py:153
test_HybridFactorGraph.TestHybridGaussianFactorGraph
Definition: test_HybridFactorGraph.py:28
gtsam::HybridGaussianConditional
A conditional of gaussian conditionals indexed by discrete variables, as part of a Bayes Network....
Definition: HybridGaussianConditional.h:55
gtsam::HybridGaussianFactorGraph
Definition: HybridGaussianFactorGraph.h:106
gtsam::utils.test_case
Definition: test_case.py:1
test_HybridFactorGraph.TestHybridGaussianFactorGraph.test_create
def test_create(self)
Definition: test_HybridFactorGraph.py:31
test_HybridFactorGraph.TestHybridGaussianFactorGraph.measurements
gtsam.VectorValues measurements(HybridValues sample, indices)
Definition: test_HybridFactorGraph.py:145
test_HybridFactorGraph.TestHybridGaussianFactorGraph.tiny
HybridBayesNet tiny(int num_measurements=1, float prior_mean=5.0, float prior_sigma=0.5)
Definition: test_HybridFactorGraph.py:79
gtsam::DiscreteConditional
Definition: DiscreteConditional.h:37
C
Matrix< Scalar, Dynamic, Dynamic > C
Definition: bench_gemm.cpp:50
gtsam::HybridGaussianFactor
Implementation of a discrete-conditioned hybrid factor. Implements a joint discrete-continuous factor...
Definition: HybridGaussianFactor.h:60
gtsam::utils.test_case.GtsamTestCase
Definition: test_case.py:16
len
size_t len(handle h)
Get the length of a Python object.
Definition: pytypes.h:2446
test_HybridFactorGraph.TestHybridGaussianFactorGraph.test_ratio
def test_ratio(self)
Definition: test_HybridFactorGraph.py:249
Z
#define Z
Definition: icosphere.cpp:21
test_HybridFactorGraph.TestHybridGaussianFactorGraph.calculate_ratio
def calculate_ratio(HybridBayesNet bayesNet, HybridGaussianFactorGraph fg, HybridValues sample)
Definition: test_HybridFactorGraph.py:243
M
Matrix< RealScalar, Dynamic, Dynamic > M
Definition: bench_gemm.cpp:51


gtsam
Author(s):
autogenerated on Fri Jan 10 2025 04:06:51