Go to the documentation of this file.00001 
00002 
00003 
00004 
00005 
00006 
00007 
00008 
00009 
00010 
00011 
00012 
00013 
00014 
00015 
00016 
00017 
00018 
00019 
00020 
00021 
00022 
00023 
00024 
00025 
00026 
00027 
00028 
00029 
00030 
00031 
00032 
00033 
00034 
00035 
00036 
00037 
00038 #include <vector>
00039 #include <deque>
00040 
00041 
00042 #include <boost/thread/mutex.hpp>
00043 #include <boost/bind.hpp>
00044 #include "ros/console.h"
00045 #include "interval_intersection/interval_intersection.hpp"
00046 
00047 using namespace std;
00048 using namespace interval_intersection;
00049 
00050 IntervalIntersector::IntervalIntersector(boost::function<void (const calibration_msgs::Interval&)> output_callback):
00051   max_queue_size(200),
00052   output_callback_(output_callback)
00053 {
00054 }
00055 
00056 IntervalIntersector::~IntervalIntersector()
00057 {
00058   
00059   
00060 }
00061 
00062 boost::function<void (const calibration_msgs::IntervalConstPtr&)> IntervalIntersector::getNewInputStream() {
00063   
00064   
00065   boost::mutex::scoped_lock processing_lock(processing_mutex);
00066   size_t n = queues.size();  
00067   for (size_t i=0; i<n; i++) {
00068     mutexes[i]->lock();
00069   }
00070   queues.resize(n+1);
00071   queue_stats.resize(n+1);
00072   queue_stats[n].reset(new queue_stat());
00073   mutexes.resize(n+1);
00074   mutexes[n].reset(new boost::mutex());
00075   for (size_t i=0; i<n; i++) {
00076     mutexes[i]->unlock();
00077   }
00078   return boost::bind(&IntervalIntersector::inputCallback, this, _1, n);
00079 }
00080 
00087 void IntervalIntersector::inputCallback(const calibration_msgs::IntervalConstPtr& interval_ptr, size_t i) {
00088   ROS_DEBUG("Got message on stream [%zu]", i);
00089   boost::mutex::scoped_lock lock(*mutexes[i]);
00090   if (queues[i].size() < max_queue_size) {
00091     queues[i].push_back(interval_ptr);
00092     queue_stats[i]->count++;
00093     if( interval_ptr->start == interval_ptr->end ) {
00094       queue_stats[i]->nil_count++;
00095     }
00096   }
00097   lock.unlock();
00098 
00099   process_queues();
00100 }
00101 
00102 void IntervalIntersector::process_queues() {
00103   
00104   while (1) {
00105     
00106     ros::Time start = ros::TIME_MIN;
00107     ros::Time end = ros::TIME_MAX;
00108     int queue_to_pop = -1;
00109     boost::mutex::scoped_lock processing_lock(processing_mutex);
00110     for (size_t i=0; i<queues.size(); i++) {
00111       
00112       
00113       
00114       
00115       
00116       
00117       
00118       
00119       
00120       
00121       
00122       
00123       boost::mutex::scoped_lock lock(*mutexes[i]);
00124       if (queues[i].empty()) {
00125         
00126         
00127         return;
00128       }
00129       if (queues[i][0]->start > start) {
00130         start = queues[i][0]->start;
00131       }
00132       if (queues[i][0]->end < end) {
00133         end = queues[i][0]->end;
00134         queue_to_pop = i;
00135       }
00136       
00137     }
00138     if (queue_to_pop < 0) {
00139       ROS_ERROR("IntervalIntersection logic error");
00140       exit(-1);
00141     }
00142     
00143     
00144     if (start < end) {
00145       
00146       calibration_msgs::Interval interval;
00147       interval.start = start;
00148       interval.end = end;
00149       output_callback_(interval);
00150     }
00151     else
00152     {
00153       ROS_DEBUG("Publishing null interval");
00154       calibration_msgs::Interval interval;
00155       interval.start = start;
00156       interval.end = start;
00157       output_callback_(interval);
00158     }
00159     
00160     boost::mutex::scoped_lock lock(*mutexes[queue_to_pop]);
00161     queues[queue_to_pop].pop_front();
00162     
00163     
00164   }
00165 }
00166 
00167 calibration_msgs::IntervalStatus IntervalIntersector::get_status() {
00168   calibration_msgs::IntervalStatus status;
00169   
00170   
00171 
00172   boost::mutex::scoped_lock processing_lock(processing_mutex);
00173   size_t n = queue_stats.size();
00174   status.names.resize(n);
00175   status.yeild_rates.resize(n);
00176   for (size_t i=0; i<n; i++) {
00177     boost::mutex::scoped_lock lock(*mutexes[i]);
00178     int count = queue_stats[i]->count;
00179     if( 0 == count ) {
00180        status.yeild_rates[i] = 0.0;
00181     } else {
00182        double good = count - queue_stats[i]->nil_count;
00183        status.yeild_rates[i] = good / count;
00184     }
00185   }
00186   return status;
00187 }
00188