$search
00001 /* 00002 * Software License Agreement (BSD License) 00003 * 00004 * Copyright (c) 2008, Willow Garage, Inc. 00005 * All rights reserved. 00006 * 00007 * Redistribution and use in source and binary forms, with or without 00008 * modification, are permitted provided that the following conditions 00009 * are met: 00010 * 00011 * * Redistributions of source code must retain the above copyright 00012 * notice, this list of conditions and the following disclaimer. 00013 * * Redistributions in binary form must reproduce the above 00014 * copyright notice, this list of conditions and the following 00015 * disclaimer in the documentation and/or other materials provided 00016 * with the distribution. 00017 * * Neither the name of Willow Garage, Inc. nor the names of its 00018 * contributors may be used to endorse or promote products derived 00019 * from this software without specific prior written permission. 00020 * 00021 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 00022 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 00023 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS 00024 * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE 00025 * COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, 00026 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, 00027 * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; 00028 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER 00029 * CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 00030 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN 00031 * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 00032 * POSSIBILITY OF SUCH DAMAGE. 00033 */ 00034 00035 #include "ros/poll_set.h" 00036 #include "ros/file_log.h" 00037 00038 #include "ros/transport/transport.h" 00039 00040 #include <ros/assert.h> 00041 00042 #include <boost/bind.hpp> 00043 00044 #include <fcntl.h> 00045 00046 namespace ros 00047 { 00048 00049 PollSet::PollSet() 00050 : sockets_changed_(false) 00051 { 00052 if ( create_signal_pair(signal_pipe_) != 0 ) { 00053 ROS_FATAL("create_signal_pair() failed"); 00054 ROS_BREAK(); 00055 } 00056 addSocket(signal_pipe_[0], boost::bind(&PollSet::onLocalPipeEvents, this, _1)); 00057 addEvents(signal_pipe_[0], POLLIN); 00058 } 00059 00060 PollSet::~PollSet() 00061 { 00062 close_signal_pair(signal_pipe_); 00063 } 00064 00065 bool PollSet::addSocket(int fd, const SocketUpdateFunc& update_func, const TransportPtr& transport) 00066 { 00067 SocketInfo info; 00068 info.fd_ = fd; 00069 info.events_ = 0; 00070 info.transport_ = transport; 00071 info.func_ = update_func; 00072 00073 { 00074 boost::mutex::scoped_lock lock(socket_info_mutex_); 00075 00076 bool b = socket_info_.insert(std::make_pair(fd, info)).second; 00077 if (!b) 00078 { 00079 ROSCPP_LOG_DEBUG("PollSet: Tried to add duplicate fd [%d]", fd); 00080 return false; 00081 } 00082 00083 sockets_changed_ = true; 00084 } 00085 00086 signal(); 00087 00088 return true; 00089 } 00090 00091 bool PollSet::delSocket(int fd) 00092 { 00093 if(fd < 0) 00094 { 00095 return false; 00096 } 00097 00098 boost::mutex::scoped_lock lock(socket_info_mutex_); 00099 M_SocketInfo::iterator it = socket_info_.find(fd); 00100 if (it != socket_info_.end()) 00101 { 00102 socket_info_.erase(it); 00103 00104 { 00105 boost::mutex::scoped_lock lock(just_deleted_mutex_); 00106 just_deleted_.push_back(fd); 00107 } 00108 00109 sockets_changed_ = true; 00110 signal(); 00111 00112 return true; 00113 } 00114 00115 ROSCPP_LOG_DEBUG("PollSet: Tried to delete fd [%d] which is not being tracked", fd); 00116 00117 return false; 00118 } 00119 00120 00121 bool PollSet::addEvents(int sock, int events) 00122 { 00123 boost::mutex::scoped_lock lock(socket_info_mutex_); 00124 00125 M_SocketInfo::iterator it = socket_info_.find(sock); 00126 00127 if (it == socket_info_.end()) 00128 { 00129 ROSCPP_LOG_DEBUG("PollSet: Tried to add events [%d] to fd [%d] which does not exist in this pollset", events, sock); 00130 return false; 00131 } 00132 00133 it->second.events_ |= events; 00134 00135 signal(); 00136 00137 return true; 00138 } 00139 00140 bool PollSet::delEvents(int sock, int events) 00141 { 00142 boost::mutex::scoped_lock lock(socket_info_mutex_); 00143 00144 M_SocketInfo::iterator it = socket_info_.find(sock); 00145 if (it != socket_info_.end()) 00146 { 00147 it->second.events_ &= ~events; 00148 } 00149 else 00150 { 00151 ROSCPP_LOG_DEBUG("PollSet: Tried to delete events [%d] to fd [%d] which does not exist in this pollset", events, sock); 00152 return false; 00153 } 00154 00155 signal(); 00156 00157 return true; 00158 } 00159 00160 void PollSet::signal() 00161 { 00162 boost::mutex::scoped_try_lock lock(signal_mutex_); 00163 00164 if (lock.owns_lock()) 00165 { 00166 char b = 0; 00167 if (write_signal(signal_pipe_[1], &b, 1) < 0) 00168 { 00169 // do nothing... this prevents warnings on gcc 4.3 00170 } 00171 } 00172 } 00173 00174 00175 void PollSet::update(int poll_timeout) 00176 { 00177 createNativePollset(); 00178 00179 // Poll across the sockets we're servicing 00180 int ret; 00181 size_t ufds_count = ufds_.size(); 00182 if((ret = poll_sockets(&ufds_.front(), ufds_count, poll_timeout)) < 0) 00183 { 00184 ROS_ERROR_STREAM("poll failed with error " << last_socket_error_string()); 00185 } 00186 else if (ret > 0) // ret = 0 implies the poll timed out, nothing to do 00187 { 00188 // We have one or more sockets to service 00189 for(size_t i=0; i<ufds_count; i++) 00190 { 00191 if (ufds_[i].revents == 0) 00192 { 00193 continue; 00194 } 00195 00196 SocketUpdateFunc func; 00197 TransportPtr transport; 00198 int events = 0; 00199 { 00200 boost::mutex::scoped_lock lock(socket_info_mutex_); 00201 M_SocketInfo::iterator it = socket_info_.find(ufds_[i].fd); 00202 // the socket has been entirely deleted 00203 if (it == socket_info_.end()) 00204 { 00205 continue; 00206 } 00207 00208 const SocketInfo& info = it->second; 00209 00210 // Store off the function and transport in case the socket is deleted from another thread 00211 func = info.func_; 00212 transport = info.transport_; 00213 events = info.events_; 00214 } 00215 00216 // If these are registered events for this socket, OR the events are ERR/HUP/NVAL, 00217 // call through to the registered function 00218 int revents = ufds_[i].revents; 00219 if (func 00220 && ((events & revents) 00221 || (revents & POLLERR) 00222 || (revents & POLLHUP) 00223 || (revents & POLLNVAL))) 00224 { 00225 bool skip = false; 00226 if (revents & (POLLNVAL|POLLERR|POLLHUP)) 00227 { 00228 // If a socket was just closed and then the file descriptor immediately reused, we can 00229 // get in here with what we think is a valid socket (since it was just re-added to our set) 00230 // but which is actually referring to the previous fd with the same #. If this is the case, 00231 // we ignore the first instance of one of these errors. If it's a real error we'll 00232 // hit it again next time through. 00233 boost::mutex::scoped_lock lock(just_deleted_mutex_); 00234 if (std::find(just_deleted_.begin(), just_deleted_.end(), ufds_[i].fd) != just_deleted_.end()) 00235 { 00236 skip = true; 00237 } 00238 } 00239 00240 if (!skip) 00241 { 00242 func(revents & (events|POLLERR|POLLHUP|POLLNVAL)); 00243 } 00244 } 00245 00246 ufds_[i].revents = 0; 00247 } 00248 00249 boost::mutex::scoped_lock lock(just_deleted_mutex_); 00250 just_deleted_.clear(); 00251 } 00252 } 00253 00254 void PollSet::createNativePollset() 00255 { 00256 boost::mutex::scoped_lock lock(socket_info_mutex_); 00257 00258 if (!sockets_changed_) 00259 { 00260 return; 00261 } 00262 00263 // Build the list of structures to pass to poll for the sockets we're servicing 00264 ufds_.resize(socket_info_.size()); 00265 M_SocketInfo::iterator sock_it = socket_info_.begin(); 00266 M_SocketInfo::iterator sock_end = socket_info_.end(); 00267 for (int i = 0; sock_it != sock_end; ++sock_it, ++i) 00268 { 00269 const SocketInfo& info = sock_it->second; 00270 socket_pollfd& pfd = ufds_[i]; 00271 pfd.fd = info.fd_; 00272 pfd.events = info.events_; 00273 pfd.revents = 0; 00274 } 00275 } 00276 00277 void PollSet::onLocalPipeEvents(int events) 00278 { 00279 if(events & POLLIN) 00280 { 00281 char b; 00282 while(read_signal(signal_pipe_[0], &b, 1) > 0) 00283 { 00284 //do nothing keep draining 00285 }; 00286 } 00287 00288 } 00289 00290 }