Go to the documentation of this file.00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036
00037
00038 #include <vector>
00039 #include <deque>
00040
00041
00042 #include <boost/thread/mutex.hpp>
00043 #include <boost/bind.hpp>
00044 #include "ros/console.h"
00045 #include "interval_intersection/interval_intersection.hpp"
00046
00047 using namespace std;
00048 using namespace interval_intersection;
00049
00050 IntervalIntersector::IntervalIntersector(boost::function<void (const calibration_msgs::Interval&)> output_callback):
00051 max_queue_size(200),
00052 output_callback_(output_callback)
00053 {
00054 }
00055
00056 IntervalIntersector::~IntervalIntersector()
00057 {
00058
00059
00060 }
00061
00062 boost::function<void (const calibration_msgs::IntervalConstPtr&)> IntervalIntersector::getNewInputStream() {
00063
00064
00065 boost::mutex::scoped_lock processing_lock(processing_mutex);
00066 size_t n = queues.size();
00067 for (size_t i=0; i<n; i++) {
00068 mutexes[i]->lock();
00069 }
00070 queues.resize(n+1);
00071 queue_stats.resize(n+1);
00072 queue_stats[n].reset(new queue_stat());
00073 mutexes.resize(n+1);
00074 mutexes[n].reset(new boost::mutex());
00075 for (size_t i=0; i<n; i++) {
00076 mutexes[i]->unlock();
00077 }
00078 return boost::bind(&IntervalIntersector::inputCallback, this, _1, n);
00079 }
00080
00087 void IntervalIntersector::inputCallback(const calibration_msgs::IntervalConstPtr& interval_ptr, size_t i) {
00088 ROS_DEBUG("Got message on stream [%zu]", i);
00089 boost::mutex::scoped_lock lock(*mutexes[i]);
00090 if (queues[i].size() < max_queue_size) {
00091 queues[i].push_back(interval_ptr);
00092 queue_stats[i]->count++;
00093 if( interval_ptr->start == interval_ptr->end ) {
00094 queue_stats[i]->nil_count++;
00095 }
00096 }
00097 lock.unlock();
00098
00099 process_queues();
00100 }
00101
00102 void IntervalIntersector::process_queues() {
00103
00104 while (1) {
00105
00106 ros::Time start = ros::TIME_MIN;
00107 ros::Time end = ros::TIME_MAX;
00108 int queue_to_pop = -1;
00109 boost::mutex::scoped_lock processing_lock(processing_mutex);
00110 for (size_t i=0; i<queues.size(); i++) {
00111
00112
00113
00114
00115
00116
00117
00118
00119
00120
00121
00122
00123 boost::mutex::scoped_lock lock(*mutexes[i]);
00124 if (queues[i].empty()) {
00125
00126
00127 return;
00128 }
00129 if (queues[i][0]->start > start) {
00130 start = queues[i][0]->start;
00131 }
00132 if (queues[i][0]->end < end) {
00133 end = queues[i][0]->end;
00134 queue_to_pop = i;
00135 }
00136
00137 }
00138 if (queue_to_pop < 0) {
00139 ROS_ERROR("IntervalIntersection logic error");
00140 exit(-1);
00141 }
00142
00143
00144 if (start < end) {
00145
00146 calibration_msgs::Interval interval;
00147 interval.start = start;
00148 interval.end = end;
00149 output_callback_(interval);
00150 }
00151 else
00152 {
00153 ROS_DEBUG("Publishing null interval");
00154 calibration_msgs::Interval interval;
00155 interval.start = start;
00156 interval.end = start;
00157 output_callback_(interval);
00158 }
00159
00160 boost::mutex::scoped_lock lock(*mutexes[queue_to_pop]);
00161 queues[queue_to_pop].pop_front();
00162
00163
00164 }
00165 }
00166
00167 calibration_msgs::IntervalStatus IntervalIntersector::get_status() {
00168 calibration_msgs::IntervalStatus status;
00169
00170
00171
00172 boost::mutex::scoped_lock processing_lock(processing_mutex);
00173 size_t n = queue_stats.size();
00174 status.names.resize(n);
00175 status.yeild_rates.resize(n);
00176 for (size_t i=0; i<n; i++) {
00177 boost::mutex::scoped_lock lock(*mutexes[i]);
00178 int count = queue_stats[i]->count;
00179 if( 0 == count ) {
00180 status.yeild_rates[i] = 0.0;
00181 } else {
00182 double good = count - queue_stats[i]->nil_count;
00183 status.yeild_rates[i] = good / count;
00184 }
00185 }
00186 return status;
00187 }
00188