interval_intersection.cpp
Go to the documentation of this file.
1 /**********************************************************************
2  *
3  * Software License Agreement (BSD License)
4  *
5  * Copyright (c) 2009, Willow Garage, Inc.
6  * All rights reserved.
7  *
8  * Redistribution and use in source and binary forms, with or without
9  * modification, are permitted provided that the following conditions
10  * are met:
11  *
12  * * Redistributions of source code must retain the above copyright
13  * notice, this list of conditions and the following disclaimer.
14  * * Redistributions in binary form must reproduce the above
15  * copyright notice, this list of conditions and the following
16  * disclaimer in the documentation and/or other materials provided
17  * with the distribution.
18  * * Neither the name of the Willow Garage nor the names of its
19  * contributors may be used to endorse or promote products derived
20  * from this software without specific prior written permission.
21  *
22  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
23  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
24  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
25  * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
26  * COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
27  * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
28  * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
29  * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
30  * CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
31  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
32  * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
33  * POSSIBILITY OF SUCH DAMAGE.
34  *********************************************************************/
35 
36 //#include <stdio.h>
37 //#include <iostream>
38 #include <vector>
39 #include <deque>
40 //#include <fstream>
41 
42 #include <boost/thread/mutex.hpp>
43 #include <boost/bind.hpp>
44 #include "ros/console.h"
46 
47 using namespace std;
48 using namespace interval_intersection;
49 
50 IntervalIntersector::IntervalIntersector(boost::function<void (const calibration_msgs::Interval&)> output_callback):
51  max_queue_size(200),
52  output_callback_(output_callback)
53 {
54 }
55 
57 {
58  // If callbacks continue during or after destruction, bad things can happen,
59  // so ensure proper shutdown of the callback streams before destruction.
60 }
61 
62 boost::function<void (const calibration_msgs::IntervalConstPtr&)> IntervalIntersector::getNewInputStream() {
63  // Changes the configuration of the set of queues,
64  // so we lock all mutexes.
65  boost::mutex::scoped_lock processing_lock(processing_mutex);
66  size_t n = queues.size(); // Protected by above lock
67  for (size_t i=0; i<n; i++) {
68  mutexes[i]->lock();
69  }
70  queues.resize(n+1);
71  queue_stats.resize(n+1);
72  queue_stats[n].reset(new queue_stat());
73  mutexes.resize(n+1);
74  mutexes[n].reset(new boost::mutex());
75  for (size_t i=0; i<n; i++) {
76  mutexes[i]->unlock();
77  }
78  return boost::bind(&IntervalIntersector::inputCallback, this, _1, n);
79 }
80 
87 void IntervalIntersector::inputCallback(const calibration_msgs::IntervalConstPtr& interval_ptr, size_t i) {
88  ROS_DEBUG("Got message on stream [%zu]", i);
89  boost::mutex::scoped_lock lock(*mutexes[i]);
90  if (queues[i].size() < max_queue_size) {
91  queues[i].push_back(interval_ptr);
92  queue_stats[i]->count++;
93  if( interval_ptr->start == interval_ptr->end ) {
94  queue_stats[i]->nil_count++;
95  }
96  }
97  lock.unlock();
98 
100 }
101 
103  //cout << "processing" << endl;
104  while (1) {
105  // 1) Determine which interval to publish next
107  ros::Time end = ros::TIME_MAX;
108  int queue_to_pop = -1;
109  boost::mutex::scoped_lock processing_lock(processing_mutex);
110  for (size_t i=0; i<queues.size(); i++) {
111  // It looks like we could get away without locking the mutexes to
112  // determine the min and max since even though the queues may change
113  // because new elements are added, the first element if it exists
114  // does not change because we hold the processing mutex.
115  // However there is a possibility that the location of queue[i][0]
116  // changes after its address has been computed but before its value
117  // has been read. In other words the statement
118  // "start = queues[i][0]->start" is not atomic.
119  // In fact even queues[i].empty() is only guaranteed to make sense
120  // in between other method calls, not during.
121  // Whether or not deque is in fact thread-safe here is implementation
122  // dependent.
123  boost::mutex::scoped_lock lock(*mutexes[i]);
124  if (queues[i].empty()) {
125  // We can't determine the next interval: nothing to do
126  //cout << "nothing to do" << endl;
127  return;
128  }
129  if (queues[i][0]->start > start) {
130  start = queues[i][0]->start;
131  }
132  if (queues[i][0]->end < end) {
133  end = queues[i][0]->end;
134  queue_to_pop = i;
135  }
136  // mutexes[i] released
137  }
138  if (queue_to_pop < 0) {
139  ROS_ERROR("IntervalIntersection logic error");
140  exit(-1);
141  }
142  // 2) Publish the interval
143  //cout << "about to publish" << endl;
144  if (start < end) {
145  // Output the interval
146  calibration_msgs::Interval interval;
147  interval.start = start;
148  interval.end = end;
149  output_callback_(interval);
150  }
151  else
152  {
153  ROS_DEBUG("Publishing null interval");
154  calibration_msgs::Interval interval;
155  interval.start = start;
156  interval.end = start;
157  output_callback_(interval);
158  }
159  // 3) Pop the input interval with earliest end time
160  boost::mutex::scoped_lock lock(*mutexes[queue_to_pop]);
161  queues[queue_to_pop].pop_front();
162  // mutexes[queue_to_pop] unlocks
163  // processing_mutex unlocks
164  }
165 }
166 
167 calibration_msgs::IntervalStatus IntervalIntersector::get_status() {
168  calibration_msgs::IntervalStatus status;
169  // we only provide the raw status; the upper levels have to fill in the
170  // header stamp and the names
171 
172  boost::mutex::scoped_lock processing_lock(processing_mutex);
173  size_t n = queue_stats.size();
174  status.names.resize(n);
175  status.yeild_rates.resize(n);
176  for (size_t i=0; i<n; i++) {
177  boost::mutex::scoped_lock lock(*mutexes[i]);
178  int count = queue_stats[i]->count;
179  if( 0 == count ) {
180  status.yeild_rates[i] = 0.0;
181  } else {
182  double good = count - queue_stats[i]->nil_count;
183  status.yeild_rates[i] = good / count;
184  }
185  }
186  return status;
187 }
188 
vector< boost::shared_ptr< queue_stat > > queue_stats
ROSCPP_DECL void start()
const Time TIME_MIN(0, 1)
boost::function< void(const calibration_msgs::IntervalConstPtr &)> getNewInputStream()
vector< deque< calibration_msgs::IntervalConstPtr > > queues
vector< boost::shared_ptr< boost::mutex > > mutexes
void inputCallback(const calibration_msgs::IntervalConstPtr &interval_ptr, size_t i)
Interval message callback.
boost::function< void(const calibration_msgs::Interval &)> output_callback_
ROSTIME_DECL const Time TIME_MAX
#define ROS_ERROR(...)
calibration_msgs::IntervalStatus get_status()
#define ROS_DEBUG(...)


interval_intersection
Author(s): Romain Thibaux
autogenerated on Fri Apr 2 2021 02:12:58