test_HybridFactorGraph.py
Go to the documentation of this file.
1 """
2 GTSAM Copyright 2010-2019, Georgia Tech Research Corporation,
3 Atlanta, Georgia 30332-0415
4 All Rights Reserved
5 
6 See LICENSE for the license information
7 
8 Unit tests for Hybrid Factor Graphs.
9 Author: Fan Jiang, Varun Agrawal, Frank Dellaert
10 """
11 # pylint: disable=invalid-name, no-name-in-module, no-member
12 
13 import unittest
14 
15 import numpy as np
16 from gtsam.symbol_shorthand import C, M, X, Z
17 from gtsam.utils.test_case import GtsamTestCase
18 
19 import gtsam
20 from gtsam import (DiscreteConditional, DiscreteKeys, GaussianConditional,
21  GaussianMixture, GaussianMixtureFactor, HybridBayesNet,
22  HybridGaussianFactorGraph, HybridValues, JacobianFactor,
23  Ordering, noiseModel)
24 
25 DEBUG_MARGINALS = False
26 
27 
29  """Unit tests for HybridGaussianFactorGraph."""
30  def test_create(self):
31  """Test construction of hybrid factor graph."""
32  model = noiseModel.Unit.Create(3)
33  dk = DiscreteKeys()
34  dk.push_back((C(0), 2))
35 
36  jf1 = JacobianFactor(X(0), np.eye(3), np.zeros((3, 1)), model)
37  jf2 = JacobianFactor(X(0), np.eye(3), np.ones((3, 1)), model)
38 
39  gmf = GaussianMixtureFactor([X(0)], dk, [jf1, jf2])
40 
41  hfg = HybridGaussianFactorGraph()
42  hfg.push_back(jf1)
43  hfg.push_back(jf2)
44  hfg.push_back(gmf)
45 
46  hbn = hfg.eliminateSequential()
47 
48  self.assertEqual(hbn.size(), 2)
49 
50  mixture = hbn.at(0).inner()
51  self.assertIsInstance(mixture, GaussianMixture)
52  self.assertEqual(len(mixture.keys()), 2)
53 
54  discrete_conditional = hbn.at(hbn.size() - 1).inner()
55  self.assertIsInstance(discrete_conditional, DiscreteConditional)
56 
57  def test_optimize(self):
58  """Test construction of hybrid factor graph."""
59  model = noiseModel.Unit.Create(3)
60  dk = DiscreteKeys()
61  dk.push_back((C(0), 2))
62 
63  jf1 = JacobianFactor(X(0), np.eye(3), np.zeros((3, 1)), model)
64  jf2 = JacobianFactor(X(0), np.eye(3), np.ones((3, 1)), model)
65 
66  gmf = GaussianMixtureFactor([X(0)], dk, [jf1, jf2])
67 
68  hfg = HybridGaussianFactorGraph()
69  hfg.push_back(jf1)
70  hfg.push_back(jf2)
71  hfg.push_back(gmf)
72 
73  dtf = gtsam.DecisionTreeFactor([(C(0), 2)], "0 1")
74  hfg.push_back(dtf)
75 
76  hbn = hfg.eliminateSequential()
77 
78  hv = hbn.optimize()
79  self.assertEqual(hv.atDiscrete(C(0)), 1)
80 
81  @staticmethod
82  def tiny(num_measurements: int = 1,
83  prior_mean: float = 5.0,
84  prior_sigma: float = 0.5) -> HybridBayesNet:
85  """
86  Create a tiny two variable hybrid model which represents
87  the generative probability P(Z, x0, mode) = P(Z|x0, mode)P(x0)P(mode).
88  num_measurements: number of measurements in Z = {z0, z1...}
89  """
90  # Create hybrid Bayes net.
91  bayesNet = HybridBayesNet()
92 
93  # Create mode key: 0 is low-noise, 1 is high-noise.
94  mode = (M(0), 2)
95 
96  # Create Gaussian mixture Z(0) = X(0) + noise for each measurement.
97  I_1x1 = np.eye(1)
98  keys = DiscreteKeys()
99  keys.push_back(mode)
100  for i in range(num_measurements):
101  conditional0 = GaussianConditional.FromMeanAndStddev(Z(i),
102  I_1x1,
103  X(0), [0],
104  sigma=0.5)
105  conditional1 = GaussianConditional.FromMeanAndStddev(Z(i),
106  I_1x1,
107  X(0), [0],
108  sigma=3)
109  bayesNet.push_back(GaussianMixture([Z(i)], [X(0)], keys,
110  [conditional0, conditional1]))
111 
112  # Create prior on X(0).
113  prior_on_x0 = GaussianConditional.FromMeanAndStddev(
114  X(0), [prior_mean], prior_sigma)
115  bayesNet.push_back(prior_on_x0)
116 
117  # Add prior on mode.
118  bayesNet.push_back(DiscreteConditional(mode, "4/6"))
119 
120  return bayesNet
121 
122  def test_evaluate(self):
123  """Test evaluate with two different prior noise models."""
124  # TODO(dellaert): really a HBN test
125  # Create a tiny Bayes net P(x0) P(m0) P(z0|x0)
126  bayesNet1 = self.tiny(prior_sigma=0.5, num_measurements=1)
127  bayesNet2 = self.tiny(prior_sigma=5.0, num_measurements=1)
128  # bn1: # 1/sqrt(2*pi*0.5^2)
129  # bn2: # 1/sqrt(2*pi*5.0^2)
130  expected_ratio = np.sqrt(2 * np.pi * 5.0**2) / np.sqrt(
131  2 * np.pi * 0.5**2)
132  mean0 = HybridValues()
133  mean0.insert(X(0), [5.0])
134  mean0.insert(Z(0), [5.0])
135  mean0.insert(M(0), 0)
136  self.assertAlmostEqual(bayesNet1.evaluate(mean0) /
137  bayesNet2.evaluate(mean0),
138  expected_ratio,
139  delta=1e-9)
140  mean1 = HybridValues()
141  mean1.insert(X(0), [5.0])
142  mean1.insert(Z(0), [5.0])
143  mean1.insert(M(0), 1)
144  self.assertAlmostEqual(bayesNet1.evaluate(mean1) /
145  bayesNet2.evaluate(mean1),
146  expected_ratio,
147  delta=1e-9)
148 
149  @staticmethod
150  def measurements(sample: HybridValues, indices) -> gtsam.VectorValues:
151  """Create measurements from a sample, grabbing Z(i) for indices."""
152  measurements = gtsam.VectorValues()
153  for i in indices:
154  measurements.insert(Z(i), sample.at(Z(i)))
155  return measurements
156 
157  @classmethod
158  def estimate_marginals(cls,
159  target,
160  proposal_density: HybridBayesNet,
161  N=10000):
162  """Do importance sampling to estimate discrete marginal P(mode)."""
163  # Allocate space for marginals on mode.
164  marginals = np.zeros((2, ))
165 
166  # Do importance sampling.
167  for s in range(N):
168  proposed = proposal_density.sample() # sample from proposal
169  target_proposed = target(proposed) # evaluate target
170  weight = target_proposed / proposal_density.evaluate(proposed)
171  marginals[proposed.atDiscrete(M(0))] += weight
172 
173  # print marginals:
174  marginals /= marginals.sum()
175  return marginals
176 
177  def test_tiny(self):
178  """Test a tiny two variable hybrid model."""
179  # Create P(x0)P(mode)P(z0|x0,mode)
180  prior_sigma = 0.5
181  bayesNet = self.tiny(prior_sigma=prior_sigma)
182 
183  # Deterministic values exactly at the mean, for both x and Z:
184  values = HybridValues()
185  values.insert(X(0), [5.0])
186  values.insert(M(0), 0) # low-noise, standard deviation 0.5
187  measurements = gtsam.VectorValues()
188  measurements.insert(Z(0), [5.0])
189  values.insert(measurements)
190 
191  def unnormalized_posterior(x):
192  """Posterior is proportional to joint, centered at 5.0 as well."""
193  x.insert(measurements)
194  return bayesNet.evaluate(x)
195 
196  # Create proposal density on (x0, mode), making sure it has same mean:
197  posterior_information = 1 / (prior_sigma**2) + 1 / (0.5**2)
198  posterior_sigma = posterior_information**(-0.5)
199  proposal_density = self.tiny(num_measurements=0,
200  prior_mean=5.0,
201  prior_sigma=posterior_sigma)
202 
203  # Estimate marginals using importance sampling.
204  marginals = self.estimate_marginals(target=unnormalized_posterior,
205  proposal_density=proposal_density)
206  if DEBUG_MARGINALS:
207  print(f"True mode: {values.atDiscrete(M(0))}")
208  print(f"P(mode=0; Z) = {marginals[0]}")
209  print(f"P(mode=1; Z) = {marginals[1]}")
210 
211  # Check that the estimate is close to the true value.
212  self.assertAlmostEqual(marginals[0], 0.74, delta=0.01)
213  self.assertAlmostEqual(marginals[1], 0.26, delta=0.01)
214 
215  # Convert to factor graph with given measurements.
216  fg = bayesNet.toFactorGraph(measurements)
217  self.assertEqual(fg.size(), 3)
218 
219  # Check ratio between unnormalized posterior and factor graph is the same for all modes:
220  for mode in [1, 0]:
221  values.insert_or_assign(M(0), mode)
222  self.assertAlmostEqual(bayesNet.evaluate(values) /
223  np.exp(-fg.error(values)),
224  0.6366197723675815)
225  self.assertAlmostEqual(bayesNet.error(values), fg.error(values))
226 
227  # Test elimination.
228  posterior = fg.eliminateSequential()
229 
230  def true_posterior(x):
231  """Posterior from elimination."""
232  x.insert(measurements)
233  return posterior.evaluate(x)
234 
235  # Estimate marginals using importance sampling.
236  marginals = self.estimate_marginals(target=true_posterior,
237  proposal_density=proposal_density)
238  if DEBUG_MARGINALS:
239  print(f"True mode: {values.atDiscrete(M(0))}")
240  print(f"P(mode=0; z0) = {marginals[0]}")
241  print(f"P(mode=1; z0) = {marginals[1]}")
242 
243  # Check that the estimate is close to the true value.
244  self.assertAlmostEqual(marginals[0], 0.74, delta=0.01)
245  self.assertAlmostEqual(marginals[1], 0.26, delta=0.01)
246 
247  @staticmethod
248  def calculate_ratio(bayesNet: HybridBayesNet,
249  fg: HybridGaussianFactorGraph, sample: HybridValues):
250  """Calculate ratio between Bayes net and factor graph."""
251  return bayesNet.evaluate(sample) / fg.probPrime(sample) if \
252  fg.probPrime(sample) > 0 else 0
253 
254  def test_ratio(self):
255  """
256  Given a tiny two variable hybrid model, with 2 measurements, test the
257  ratio of the bayes net model representing P(z,x,n)=P(z|x, n)P(x)P(n)
258  and the factor graph P(x, n | z)=P(x | n, z)P(n|z),
259  both of which represent the same posterior.
260  """
261  # Create generative model P(z, x, n)=P(z|x, n)P(x)P(n)
262  prior_sigma = 0.5
263  bayesNet = self.tiny(prior_sigma=prior_sigma, num_measurements=2)
264 
265  # Deterministic values exactly at the mean, for both x and Z:
266  values = HybridValues()
267  values.insert(X(0), [5.0])
268  values.insert(M(0), 0) # high-noise, standard deviation 3
269  measurements = gtsam.VectorValues()
270  measurements.insert(Z(0), [4.0])
271  measurements.insert(Z(1), [6.0])
272  values.insert(measurements)
273 
274  def unnormalized_posterior(x):
275  """Posterior is proportional to joint, centered at 5.0 as well."""
276  x.insert(measurements)
277  return bayesNet.evaluate(x)
278 
279  # Create proposal density on (x0, mode), making sure it has same mean:
280  posterior_information = 1 / (prior_sigma**2) + 2.0 / (3.0**2)
281  posterior_sigma = posterior_information**(-0.5)
282  proposal_density = self.tiny(num_measurements=0,
283  prior_mean=5.0,
284  prior_sigma=posterior_sigma)
285 
286  # Estimate marginals using importance sampling.
287  marginals = self.estimate_marginals(target=unnormalized_posterior,
288  proposal_density=proposal_density)
289  if DEBUG_MARGINALS:
290  print(f"True mode: {values.atDiscrete(M(0))}")
291  print(f"P(mode=0; Z) = {marginals[0]}")
292  print(f"P(mode=1; Z) = {marginals[1]}")
293 
294  # Check that the estimate is close to the true value.
295  self.assertAlmostEqual(marginals[0], 0.23, delta=0.01)
296  self.assertAlmostEqual(marginals[1], 0.77, delta=0.01)
297 
298  # Convert to factor graph using measurements.
299  fg = bayesNet.toFactorGraph(measurements)
300  self.assertEqual(fg.size(), 4)
301 
302  # Calculate ratio between Bayes net probability and the factor graph:
303  expected_ratio = self.calculate_ratio(bayesNet, fg, values)
304  # print(f"expected_ratio: {expected_ratio}\n")
305 
306  # Check with a number of other samples.
307  for i in range(10):
308  samples = bayesNet.sample()
309  samples.update(measurements)
310  ratio = self.calculate_ratio(bayesNet, fg, samples)
311  # print(f"Ratio: {ratio}\n")
312  if (ratio > 0):
313  self.assertAlmostEqual(ratio, expected_ratio)
314 
315  # Test elimination.
316  posterior = fg.eliminateSequential()
317 
318  # Calculate ratio between Bayes net probability and the factor graph:
319  expected_ratio = self.calculate_ratio(posterior, fg, values)
320  # print(f"expected_ratio: {expected_ratio}\n")
321 
322  # Check with a number of other samples.
323  for i in range(10):
324  samples = posterior.sample()
325  samples.insert(measurements)
326  ratio = self.calculate_ratio(posterior, fg, samples)
327  # print(f"Ratio: {ratio}\n")
328  if (ratio > 0):
329  self.assertAlmostEqual(ratio, expected_ratio)
330 
331 
332 if __name__ == "__main__":
333  unittest.main()
Matrix< RealScalar, Dynamic, Dynamic > M
Definition: bench_gemm.cpp:51
EIGEN_STRONG_INLINE Packet4f print(const Packet4f &a)
#define Z
Definition: icosphere.cpp:21
Matrix< Scalar, Dynamic, Dynamic > C
Definition: bench_gemm.cpp:50
Double_ range(const Point2_ &p, const Point2_ &q)
#define X
Definition: icosphere.cpp:20
size_t len(handle h)
Get the length of a Python object.
Definition: pytypes.h:2244


gtsam
Author(s):
autogenerated on Tue Jul 4 2023 02:37:45