interval_intersection.cpp
Go to the documentation of this file.
00001 /**********************************************************************
00002  *
00003  * Software License Agreement (BSD License)
00004  *
00005  *  Copyright (c) 2009, Willow Garage, Inc.
00006  *  All rights reserved.
00007  *
00008  *  Redistribution and use in source and binary forms, with or without
00009  *  modification, are permitted provided that the following conditions
00010  *  are met:
00011  *
00012  *   * Redistributions of source code must retain the above copyright
00013  *     notice, this list of conditions and the following disclaimer.
00014  *   * Redistributions in binary form must reproduce the above
00015  *     copyright notice, this list of conditions and the following
00016  *     disclaimer in the documentation and/or other materials provided
00017  *     with the distribution.
00018  *   * Neither the name of the Willow Garage nor the names of its
00019  *     contributors may be used to endorse or promote products derived
00020  *     from this software without specific prior written permission.
00021  *
00022  *  THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
00023  *  "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
00024  *  LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
00025  *  FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
00026  *  COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
00027  *  INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
00028  *  BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
00029  *  LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
00030  *  CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
00031  *  LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
00032  *  ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
00033  *  POSSIBILITY OF SUCH DAMAGE.
00034  *********************************************************************/
00035 
00036 //#include <stdio.h>
00037 //#include <iostream>
00038 #include <vector>
00039 #include <deque>
00040 //#include <fstream>
00041 
00042 #include <boost/thread/mutex.hpp>
00043 #include <boost/bind.hpp>
00044 #include "ros/console.h"
00045 #include "interval_intersection/interval_intersection.hpp"
00046 
00047 using namespace std;
00048 using namespace interval_intersection;
00049 
00050 IntervalIntersector::IntervalIntersector(boost::function<void (const calibration_msgs::Interval&)> output_callback):
00051   max_queue_size(200),
00052   output_callback_(output_callback)
00053 {
00054 }
00055 
00056 IntervalIntersector::~IntervalIntersector()
00057 {
00058   // If callbacks continue during or after destruction, bad things can happen,
00059   // so ensure proper shutdown of the callback streams before destruction.
00060 }
00061 
00062 boost::function<void (const calibration_msgs::IntervalConstPtr&)> IntervalIntersector::getNewInputStream() {
00063   // Changes the configuration of the set of queues,
00064   // so we lock all mutexes.
00065   boost::mutex::scoped_lock processing_lock(processing_mutex);
00066   size_t n = queues.size();  // Protected by above lock
00067   for (size_t i=0; i<n; i++) {
00068     mutexes[i]->lock();
00069   }
00070   queues.resize(n+1);
00071   queue_stats.resize(n+1);
00072   queue_stats[n].reset(new queue_stat());
00073   mutexes.resize(n+1);
00074   mutexes[n].reset(new boost::mutex());
00075   for (size_t i=0; i<n; i++) {
00076     mutexes[i]->unlock();
00077   }
00078   return boost::bind(&IntervalIntersector::inputCallback, this, _1, n);
00079 }
00080 
00087 void IntervalIntersector::inputCallback(const calibration_msgs::IntervalConstPtr& interval_ptr, size_t i) {
00088   ROS_DEBUG("Got message on stream [%zu]", i);
00089   boost::mutex::scoped_lock lock(*mutexes[i]);
00090   if (queues[i].size() < max_queue_size) {
00091     queues[i].push_back(interval_ptr);
00092     queue_stats[i]->count++;
00093     if( interval_ptr->start == interval_ptr->end ) {
00094       queue_stats[i]->nil_count++;
00095     }
00096   }
00097   lock.unlock();
00098 
00099   process_queues();
00100 }
00101 
00102 void IntervalIntersector::process_queues() {
00103   //cout << "processing" << endl;
00104   while (1) {
00105     // 1) Determine which interval to publish next
00106     ros::Time start = ros::TIME_MIN;
00107     ros::Time end = ros::TIME_MAX;
00108     int queue_to_pop = -1;
00109     boost::mutex::scoped_lock processing_lock(processing_mutex);
00110     for (size_t i=0; i<queues.size(); i++) {
00111       // It looks like we could get away without locking the mutexes to
00112       // determine the min and max since even though the queues may change
00113       // because new elements are added, the first element if it exists
00114       // does not change because we hold the processing mutex.
00115       // However there is a possibility that the location of queue[i][0]
00116       // changes after its address has been computed but before its value
00117       // has been read. In other words the statement
00118       // "start = queues[i][0]->start" is not atomic.
00119       // In fact even queues[i].empty() is only guaranteed to make sense
00120       // in between other method calls, not during.
00121       // Whether or not deque is in fact thread-safe here is implementation
00122       // dependent.
00123       boost::mutex::scoped_lock lock(*mutexes[i]);
00124       if (queues[i].empty()) {
00125         // We can't determine the next interval: nothing to do
00126         //cout << "nothing to do" << endl;
00127         return;
00128       }
00129       if (queues[i][0]->start > start) {
00130         start = queues[i][0]->start;
00131       }
00132       if (queues[i][0]->end < end) {
00133         end = queues[i][0]->end;
00134         queue_to_pop = i;
00135       }
00136       // mutexes[i] released
00137     }
00138     if (queue_to_pop < 0) {
00139       ROS_ERROR("IntervalIntersection logic error");
00140       exit(-1);
00141     }
00142     // 2) Publish the interval
00143     //cout << "about to publish" << endl;
00144     if (start < end) {
00145       // Output the interval
00146       calibration_msgs::Interval interval;
00147       interval.start = start;
00148       interval.end = end;
00149       output_callback_(interval);
00150     }
00151     else
00152     {
00153       ROS_DEBUG("Publishing null interval");
00154       calibration_msgs::Interval interval;
00155       interval.start = start;
00156       interval.end = start;
00157       output_callback_(interval);
00158     }
00159     // 3) Pop the input interval with earliest end time
00160     boost::mutex::scoped_lock lock(*mutexes[queue_to_pop]);
00161     queues[queue_to_pop].pop_front();
00162     // mutexes[queue_to_pop] unlocks
00163     // processing_mutex unlocks
00164   }
00165 }
00166 
00167 calibration_msgs::IntervalStatus IntervalIntersector::get_status() {
00168   calibration_msgs::IntervalStatus status;
00169   // we only provide the raw status; the upper levels have to fill in the
00170   // header stamp and the names
00171 
00172   boost::mutex::scoped_lock processing_lock(processing_mutex);
00173   size_t n = queue_stats.size();
00174   status.names.resize(n);
00175   status.yeild_rates.resize(n);
00176   for (size_t i=0; i<n; i++) {
00177     boost::mutex::scoped_lock lock(*mutexes[i]);
00178     int count = queue_stats[i]->count;
00179     if( 0 == count ) {
00180        status.yeild_rates[i] = 0.0;
00181     } else {
00182        double good = count - queue_stats[i]->nil_count;
00183        status.yeild_rates[i] = good / count;
00184     }
00185   }
00186   return status;
00187 }
00188 


interval_intersection
Author(s): Romain Thibaux
autogenerated on Sun Oct 5 2014 22:43:47