poll_set.cpp
Go to the documentation of this file.
00001 /*
00002  * Software License Agreement (BSD License)
00003  *
00004  *  Copyright (c) 2008, Willow Garage, Inc.
00005  *  All rights reserved.
00006  *
00007  *  Redistribution and use in source and binary forms, with or without
00008  *  modification, are permitted provided that the following conditions
00009  *  are met:
00010  *
00011  *   * Redistributions of source code must retain the above copyright
00012  *     notice, this list of conditions and the following disclaimer.
00013  *   * Redistributions in binary form must reproduce the above
00014  *     copyright notice, this list of conditions and the following
00015  *     disclaimer in the documentation and/or other materials provided
00016  *     with the distribution.
00017  *   * Neither the name of Willow Garage, Inc. nor the names of its
00018  *     contributors may be used to endorse or promote products derived
00019  *     from this software without specific prior written permission.
00020  *
00021  *  THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
00022  *  "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
00023  *  LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
00024  *  FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
00025  *  COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
00026  *  INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
00027  *  BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
00028  *  LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
00029  *  CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
00030  *  LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
00031  *  ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
00032  *  POSSIBILITY OF SUCH DAMAGE.
00033  */
00034 
00035 #include "ros/poll_set.h"
00036 #include "ros/file_log.h"
00037 
00038 #include "ros/transport/transport.h"
00039 
00040 #include <ros/assert.h>
00041 
00042 #include <boost/bind.hpp>
00043 
00044 #include <fcntl.h>
00045 
00046 namespace ros
00047 {
00048 
00049 PollSet::PollSet()
00050 : sockets_changed_(false)
00051 {
00052         if ( create_signal_pair(signal_pipe_) != 0 ) {
00053         ROS_FATAL("create_signal_pair() failed");
00054     ROS_BREAK();
00055   }
00056   addSocket(signal_pipe_[0], boost::bind(&PollSet::onLocalPipeEvents, this, _1));
00057   addEvents(signal_pipe_[0], POLLIN);
00058 }
00059 
00060 PollSet::~PollSet()
00061 {
00062   close_signal_pair(signal_pipe_);
00063 }
00064 
00065 bool PollSet::addSocket(int fd, const SocketUpdateFunc& update_func, const TransportPtr& transport)
00066 {
00067   SocketInfo info;
00068   info.fd_ = fd;
00069   info.events_ = 0;
00070   info.transport_ = transport;
00071   info.func_ = update_func;
00072 
00073   {
00074     boost::mutex::scoped_lock lock(socket_info_mutex_);
00075 
00076     bool b = socket_info_.insert(std::make_pair(fd, info)).second;
00077     if (!b)
00078     {
00079       ROSCPP_LOG_DEBUG("PollSet: Tried to add duplicate fd [%d]", fd);
00080       return false;
00081     }
00082 
00083     sockets_changed_ = true;
00084   }
00085 
00086   signal();
00087 
00088   return true;
00089 }
00090 
00091 bool PollSet::delSocket(int fd)
00092 {
00093   if(fd < 0)
00094   {
00095     return false;
00096   }
00097 
00098   boost::mutex::scoped_lock lock(socket_info_mutex_);
00099   M_SocketInfo::iterator it = socket_info_.find(fd);
00100   if (it != socket_info_.end())
00101   {
00102     socket_info_.erase(it);
00103 
00104     {
00105       boost::mutex::scoped_lock lock(just_deleted_mutex_);
00106       just_deleted_.push_back(fd);
00107     }
00108 
00109     sockets_changed_ = true;
00110     signal();
00111 
00112     return true;
00113   }
00114 
00115   ROSCPP_LOG_DEBUG("PollSet: Tried to delete fd [%d] which is not being tracked", fd);
00116 
00117   return false;
00118 }
00119 
00120 
00121 bool PollSet::addEvents(int sock, int events)
00122 {
00123   boost::mutex::scoped_lock lock(socket_info_mutex_);
00124 
00125   M_SocketInfo::iterator it = socket_info_.find(sock);
00126 
00127   if (it == socket_info_.end())
00128   {
00129     ROSCPP_LOG_DEBUG("PollSet: Tried to add events [%d] to fd [%d] which does not exist in this pollset", events, sock);
00130     return false;
00131   }
00132 
00133   it->second.events_ |= events;
00134 
00135   signal();
00136 
00137   return true;
00138 }
00139 
00140 bool PollSet::delEvents(int sock, int events)
00141 {
00142   boost::mutex::scoped_lock lock(socket_info_mutex_);
00143 
00144   M_SocketInfo::iterator it = socket_info_.find(sock);
00145   if (it != socket_info_.end())
00146   {
00147     it->second.events_ &= ~events;
00148   }
00149   else
00150   {
00151     ROSCPP_LOG_DEBUG("PollSet: Tried to delete events [%d] to fd [%d] which does not exist in this pollset", events, sock);
00152     return false;
00153   }
00154 
00155   signal();
00156 
00157   return true;
00158 }
00159 
00160 void PollSet::signal()
00161 {
00162   boost::mutex::scoped_try_lock lock(signal_mutex_);
00163 
00164   if (lock.owns_lock())
00165   {
00166     char b = 0;
00167     if (write_signal(signal_pipe_[1], &b, 1) < 0)
00168     {
00169       // do nothing... this prevents warnings on gcc 4.3
00170     }
00171   }
00172 }
00173 
00174 
00175 void PollSet::update(int poll_timeout)
00176 {
00177   createNativePollset();
00178 
00179   // Poll across the sockets we're servicing
00180   int ret;
00181   size_t ufds_count = ufds_.size();
00182   if((ret = poll_sockets(&ufds_.front(), ufds_count, poll_timeout)) < 0)
00183   {
00184           ROS_ERROR_STREAM("poll failed with error " << last_socket_error_string());
00185     }
00186   else if (ret > 0)  // ret = 0 implies the poll timed out, nothing to do
00187   {
00188     // We have one or more sockets to service
00189     for(size_t i=0; i<ufds_count; i++)
00190     {
00191       if (ufds_[i].revents == 0)
00192       {
00193         continue;
00194       }
00195 
00196       SocketUpdateFunc func;
00197       TransportPtr transport;
00198       int events = 0;
00199       {
00200         boost::mutex::scoped_lock lock(socket_info_mutex_);
00201         M_SocketInfo::iterator it = socket_info_.find(ufds_[i].fd);
00202         // the socket has been entirely deleted
00203         if (it == socket_info_.end())
00204         {
00205           continue;
00206         }
00207 
00208         const SocketInfo& info = it->second;
00209 
00210         // Store off the function and transport in case the socket is deleted from another thread
00211         func = info.func_;
00212         transport = info.transport_;
00213         events = info.events_;
00214       }
00215 
00216       // If these are registered events for this socket, OR the events are ERR/HUP/NVAL,
00217       // call through to the registered function
00218       int revents = ufds_[i].revents;
00219       if (func
00220           && ((events & revents)
00221               || (revents & POLLERR)
00222               || (revents & POLLHUP)
00223               || (revents & POLLNVAL)))
00224       {
00225         bool skip = false;
00226         if (revents & (POLLNVAL|POLLERR|POLLHUP))
00227         {
00228           // If a socket was just closed and then the file descriptor immediately reused, we can
00229           // get in here with what we think is a valid socket (since it was just re-added to our set)
00230           // but which is actually referring to the previous fd with the same #.  If this is the case,
00231           // we ignore the first instance of one of these errors.  If it's a real error we'll
00232           // hit it again next time through.
00233           boost::mutex::scoped_lock lock(just_deleted_mutex_);
00234           if (std::find(just_deleted_.begin(), just_deleted_.end(), ufds_[i].fd) != just_deleted_.end())
00235           {
00236             skip = true;
00237           }
00238         }
00239 
00240         if (!skip)
00241         {
00242           func(revents & (events|POLLERR|POLLHUP|POLLNVAL));
00243         }
00244       }
00245 
00246       ufds_[i].revents = 0;
00247     }
00248 
00249     boost::mutex::scoped_lock lock(just_deleted_mutex_);
00250     just_deleted_.clear();
00251   }
00252 }
00253 
00254 void PollSet::createNativePollset()
00255 {
00256   boost::mutex::scoped_lock lock(socket_info_mutex_);
00257 
00258   if (!sockets_changed_)
00259   {
00260     return;
00261   }
00262 
00263   // Build the list of structures to pass to poll for the sockets we're servicing
00264   ufds_.resize(socket_info_.size());
00265   M_SocketInfo::iterator sock_it = socket_info_.begin();
00266   M_SocketInfo::iterator sock_end = socket_info_.end();
00267   for (int i = 0; sock_it != sock_end; ++sock_it, ++i)
00268   {
00269     const SocketInfo& info = sock_it->second;
00270     socket_pollfd& pfd = ufds_[i];
00271     pfd.fd = info.fd_;
00272     pfd.events = info.events_;
00273     pfd.revents = 0;
00274   }
00275 }
00276 
00277 void PollSet::onLocalPipeEvents(int events)
00278 {
00279   if(events & POLLIN)
00280   {
00281     char b;
00282     while(read_signal(signal_pipe_[0], &b, 1) > 0)
00283     {
00284       //do nothing keep draining
00285     };
00286   }
00287 
00288 }
00289 
00290 }


roscpp
Author(s): Morgan Quigley, Josh Faust, Brian Gerkey, Troy Straszheim
autogenerated on Mon Oct 6 2014 11:46:44