00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035 #include "ros/poll_set.h"
00036 #include "ros/file_log.h"
00037
00038 #include "ros/transport/transport.h"
00039
00040 #include <ros/assert.h>
00041
00042 #include <boost/bind.hpp>
00043
00044 #include <sys/poll.h>
00045 #include <arpa/inet.h>
00046 #include <netdb.h>
00047 #include <fcntl.h>
00048
00049 namespace ros
00050 {
00051
00052 PollSet::PollSet()
00053 : sockets_changed_(false)
00054 {
00055 signal_pipe_[0] = -1;
00056 signal_pipe_[1] = -1;
00057
00058
00059 if(pipe(signal_pipe_) != 0)
00060 {
00061 ROS_FATAL( "pipe() failed");
00062 ROS_BREAK();
00063 }
00064 if(fcntl(signal_pipe_[0], F_SETFL, O_NONBLOCK) == -1)
00065 {
00066 ROS_FATAL( "fcntl() failed");
00067 ROS_BREAK();
00068 }
00069 if(fcntl(signal_pipe_[1], F_SETFL, O_NONBLOCK) == -1)
00070 {
00071 ROS_FATAL( "fcntl() failed");
00072 ROS_BREAK();
00073 }
00074
00075 addSocket(signal_pipe_[0], boost::bind(&PollSet::onLocalPipeEvents, this, _1));
00076 addEvents(signal_pipe_[0], POLLIN);
00077 }
00078
00079 PollSet::~PollSet()
00080 {
00081 ::close(signal_pipe_[0]);
00082 ::close(signal_pipe_[1]);
00083 }
00084
00085 bool PollSet::addSocket(int fd, const SocketUpdateFunc& update_func, const TransportPtr& transport)
00086 {
00087 SocketInfo info;
00088 info.fd_ = fd;
00089 info.events_ = 0;
00090 info.transport_ = transport;
00091 info.func_ = update_func;
00092
00093 {
00094 boost::mutex::scoped_lock lock(socket_info_mutex_);
00095
00096 bool b = socket_info_.insert(std::make_pair(fd, info)).second;
00097 if (!b)
00098 {
00099 ROSCPP_LOG_DEBUG("PollSet: Tried to add duplicate fd [%d]", fd);
00100 return false;
00101 }
00102
00103 sockets_changed_ = true;
00104 }
00105
00106 signal();
00107
00108 return true;
00109 }
00110
00111 bool PollSet::delSocket(int fd)
00112 {
00113 if(fd < 0)
00114 {
00115 return false;
00116 }
00117
00118 boost::mutex::scoped_lock lock(socket_info_mutex_);
00119 M_SocketInfo::iterator it = socket_info_.find(fd);
00120 if (it != socket_info_.end())
00121 {
00122 socket_info_.erase(it);
00123
00124 {
00125 boost::mutex::scoped_lock lock(just_deleted_mutex_);
00126 just_deleted_.push_back(fd);
00127 }
00128
00129 sockets_changed_ = true;
00130 signal();
00131
00132 return true;
00133 }
00134
00135 ROSCPP_LOG_DEBUG("PollSet: Tried to delete fd [%d] which is not being tracked", fd);
00136
00137 return false;
00138 }
00139
00140
00141 bool PollSet::addEvents(int sock, int events)
00142 {
00143 boost::mutex::scoped_lock lock(socket_info_mutex_);
00144
00145 M_SocketInfo::iterator it = socket_info_.find(sock);
00146
00147 if (it == socket_info_.end())
00148 {
00149 ROSCPP_LOG_DEBUG("PollSet: Tried to add events [%d] to fd [%d] which does not exist in this pollset", events, sock);
00150 return false;
00151 }
00152
00153 it->second.events_ |= events;
00154
00155 signal();
00156
00157 return true;
00158 }
00159
00160 bool PollSet::delEvents(int sock, int events)
00161 {
00162 boost::mutex::scoped_lock lock(socket_info_mutex_);
00163
00164 M_SocketInfo::iterator it = socket_info_.find(sock);
00165 if (it != socket_info_.end())
00166 {
00167 it->second.events_ &= ~events;
00168 }
00169 else
00170 {
00171 ROSCPP_LOG_DEBUG("PollSet: Tried to delete events [%d] to fd [%d] which does not exist in this pollset", events, sock);
00172 return false;
00173 }
00174
00175 signal();
00176
00177 return true;
00178 }
00179
00180 void PollSet::signal()
00181 {
00182 boost::mutex::scoped_try_lock lock(signal_mutex_);
00183
00184 if (lock.owns_lock())
00185 {
00186 char b = 0;
00187 if (write(signal_pipe_[1], &b, 1) < 0)
00188 {
00189
00190 }
00191 }
00192 }
00193
00194
00195 void PollSet::update(int poll_timeout)
00196 {
00197 createNativePollset();
00198
00199
00200 int ret;
00201 size_t ufds_count = ufds_.size();
00202 if((ret = poll(&ufds_.front(), ufds_count, poll_timeout)) < 0)
00203 {
00204
00205
00206 if(errno != EINTR)
00207 {
00208 ROS_ERROR("poll failed with error [%s]", strerror(errno));
00209 }
00210 }
00211 else if (ret > 0)
00212 {
00213
00214 for(size_t i=0; i<ufds_count; i++)
00215 {
00216 if (ufds_[i].revents == 0)
00217 {
00218 continue;
00219 }
00220
00221 SocketUpdateFunc func;
00222 TransportPtr transport;
00223 int events = 0;
00224 {
00225 boost::mutex::scoped_lock lock(socket_info_mutex_);
00226 M_SocketInfo::iterator it = socket_info_.find(ufds_[i].fd);
00227
00228 if (it == socket_info_.end())
00229 {
00230 continue;
00231 }
00232
00233 const SocketInfo& info = it->second;
00234
00235
00236 func = info.func_;
00237 transport = info.transport_;
00238 events = info.events_;
00239 }
00240
00241
00242
00243 int revents = ufds_[i].revents;
00244 if (func
00245 && ((events & revents)
00246 || (revents & POLLERR)
00247 || (revents & POLLHUP)
00248 || (revents & POLLNVAL)))
00249 {
00250 bool skip = false;
00251 if (revents & (POLLNVAL|POLLERR|POLLHUP))
00252 {
00253
00254
00255
00256
00257
00258 boost::mutex::scoped_lock lock(just_deleted_mutex_);
00259 if (std::find(just_deleted_.begin(), just_deleted_.end(), ufds_[i].fd) != just_deleted_.end())
00260 {
00261 skip = true;
00262 }
00263 }
00264
00265 if (!skip)
00266 {
00267 func(revents & (events|POLLERR|POLLHUP|POLLNVAL));
00268 }
00269 }
00270
00271 ufds_[i].revents = 0;
00272 }
00273
00274 boost::mutex::scoped_lock lock(just_deleted_mutex_);
00275 just_deleted_.clear();
00276 }
00277 }
00278
00279 void PollSet::createNativePollset()
00280 {
00281 boost::mutex::scoped_lock lock(socket_info_mutex_);
00282
00283 if (!sockets_changed_)
00284 {
00285 return;
00286 }
00287
00288
00289 ufds_.resize(socket_info_.size());
00290 M_SocketInfo::iterator sock_it = socket_info_.begin();
00291 M_SocketInfo::iterator sock_end = socket_info_.end();
00292 for (int i = 0; sock_it != sock_end; ++sock_it, ++i)
00293 {
00294 const SocketInfo& info = sock_it->second;
00295 struct pollfd& pfd = ufds_[i];
00296 pfd.fd = info.fd_;
00297 pfd.events = info.events_;
00298 pfd.revents = 0;
00299 }
00300 }
00301
00302 void PollSet::onLocalPipeEvents(int events)
00303 {
00304 if(events & POLLIN)
00305 {
00306 char b;
00307 while(read(signal_pipe_[0], &b, 1) > 0)
00308 {
00309
00310 };
00311 }
00312
00313 }
00314
00315 }