Go to the documentation of this file.00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035 #include "ros/poll_set.h"
00036 #include "ros/file_log.h"
00037
00038 #include "ros/transport/transport.h"
00039
00040 #include <ros/assert.h>
00041
00042 #include <boost/bind.hpp>
00043
00044 #include <fcntl.h>
00045
00046 namespace ros
00047 {
00048
00049 PollSet::PollSet()
00050 : sockets_changed_(false)
00051 {
00052 if ( create_signal_pair(signal_pipe_) != 0 ) {
00053 ROS_FATAL("create_signal_pair() failed");
00054 ROS_BREAK();
00055 }
00056 addSocket(signal_pipe_[0], boost::bind(&PollSet::onLocalPipeEvents, this, _1));
00057 addEvents(signal_pipe_[0], POLLIN);
00058 }
00059
00060 PollSet::~PollSet()
00061 {
00062 close_signal_pair(signal_pipe_);
00063 }
00064
00065 bool PollSet::addSocket(int fd, const SocketUpdateFunc& update_func, const TransportPtr& transport)
00066 {
00067 SocketInfo info;
00068 info.fd_ = fd;
00069 info.events_ = 0;
00070 info.transport_ = transport;
00071 info.func_ = update_func;
00072
00073 {
00074 boost::mutex::scoped_lock lock(socket_info_mutex_);
00075
00076 bool b = socket_info_.insert(std::make_pair(fd, info)).second;
00077 if (!b)
00078 {
00079 ROSCPP_LOG_DEBUG("PollSet: Tried to add duplicate fd [%d]", fd);
00080 return false;
00081 }
00082
00083 sockets_changed_ = true;
00084 }
00085
00086 signal();
00087
00088 return true;
00089 }
00090
00091 bool PollSet::delSocket(int fd)
00092 {
00093 if(fd < 0)
00094 {
00095 return false;
00096 }
00097
00098 boost::mutex::scoped_lock lock(socket_info_mutex_);
00099 M_SocketInfo::iterator it = socket_info_.find(fd);
00100 if (it != socket_info_.end())
00101 {
00102 socket_info_.erase(it);
00103
00104 {
00105 boost::mutex::scoped_lock lock(just_deleted_mutex_);
00106 just_deleted_.push_back(fd);
00107 }
00108
00109 sockets_changed_ = true;
00110 signal();
00111
00112 return true;
00113 }
00114
00115 ROSCPP_LOG_DEBUG("PollSet: Tried to delete fd [%d] which is not being tracked", fd);
00116
00117 return false;
00118 }
00119
00120
00121 bool PollSet::addEvents(int sock, int events)
00122 {
00123 boost::mutex::scoped_lock lock(socket_info_mutex_);
00124
00125 M_SocketInfo::iterator it = socket_info_.find(sock);
00126
00127 if (it == socket_info_.end())
00128 {
00129 ROSCPP_LOG_DEBUG("PollSet: Tried to add events [%d] to fd [%d] which does not exist in this pollset", events, sock);
00130 return false;
00131 }
00132
00133 it->second.events_ |= events;
00134
00135 signal();
00136
00137 return true;
00138 }
00139
00140 bool PollSet::delEvents(int sock, int events)
00141 {
00142 boost::mutex::scoped_lock lock(socket_info_mutex_);
00143
00144 M_SocketInfo::iterator it = socket_info_.find(sock);
00145 if (it != socket_info_.end())
00146 {
00147 it->second.events_ &= ~events;
00148 }
00149 else
00150 {
00151 ROSCPP_LOG_DEBUG("PollSet: Tried to delete events [%d] to fd [%d] which does not exist in this pollset", events, sock);
00152 return false;
00153 }
00154
00155 signal();
00156
00157 return true;
00158 }
00159
00160 void PollSet::signal()
00161 {
00162 boost::mutex::scoped_try_lock lock(signal_mutex_);
00163
00164 if (lock.owns_lock())
00165 {
00166 char b = 0;
00167 if (write_signal(signal_pipe_[1], &b, 1) < 0)
00168 {
00169
00170 }
00171 }
00172 }
00173
00174
00175 void PollSet::update(int poll_timeout)
00176 {
00177 createNativePollset();
00178
00179
00180 int ret;
00181 size_t ufds_count = ufds_.size();
00182 if((ret = poll_sockets(&ufds_.front(), ufds_count, poll_timeout)) < 0)
00183 {
00184 ROS_ERROR_STREAM("poll failed with error " << last_socket_error_string());
00185 }
00186 else if (ret > 0)
00187 {
00188
00189 for(size_t i=0; i<ufds_count; i++)
00190 {
00191 if (ufds_[i].revents == 0)
00192 {
00193 continue;
00194 }
00195
00196 SocketUpdateFunc func;
00197 TransportPtr transport;
00198 int events = 0;
00199 {
00200 boost::mutex::scoped_lock lock(socket_info_mutex_);
00201 M_SocketInfo::iterator it = socket_info_.find(ufds_[i].fd);
00202
00203 if (it == socket_info_.end())
00204 {
00205 continue;
00206 }
00207
00208 const SocketInfo& info = it->second;
00209
00210
00211 func = info.func_;
00212 transport = info.transport_;
00213 events = info.events_;
00214 }
00215
00216
00217
00218 int revents = ufds_[i].revents;
00219 if (func
00220 && ((events & revents)
00221 || (revents & POLLERR)
00222 || (revents & POLLHUP)
00223 || (revents & POLLNVAL)))
00224 {
00225 bool skip = false;
00226 if (revents & (POLLNVAL|POLLERR|POLLHUP))
00227 {
00228
00229
00230
00231
00232
00233 boost::mutex::scoped_lock lock(just_deleted_mutex_);
00234 if (std::find(just_deleted_.begin(), just_deleted_.end(), ufds_[i].fd) != just_deleted_.end())
00235 {
00236 skip = true;
00237 }
00238 }
00239
00240 if (!skip)
00241 {
00242 func(revents & (events|POLLERR|POLLHUP|POLLNVAL));
00243 }
00244 }
00245
00246 ufds_[i].revents = 0;
00247 }
00248
00249 boost::mutex::scoped_lock lock(just_deleted_mutex_);
00250 just_deleted_.clear();
00251 }
00252 }
00253
00254 void PollSet::createNativePollset()
00255 {
00256 boost::mutex::scoped_lock lock(socket_info_mutex_);
00257
00258 if (!sockets_changed_)
00259 {
00260 return;
00261 }
00262
00263
00264 ufds_.resize(socket_info_.size());
00265 M_SocketInfo::iterator sock_it = socket_info_.begin();
00266 M_SocketInfo::iterator sock_end = socket_info_.end();
00267 for (int i = 0; sock_it != sock_end; ++sock_it, ++i)
00268 {
00269 const SocketInfo& info = sock_it->second;
00270 socket_pollfd& pfd = ufds_[i];
00271 pfd.fd = info.fd_;
00272 pfd.events = info.events_;
00273 pfd.revents = 0;
00274 }
00275 }
00276
00277 void PollSet::onLocalPipeEvents(int events)
00278 {
00279 if(events & POLLIN)
00280 {
00281 char b;
00282 while(read_signal(signal_pipe_[0], &b, 1) > 0)
00283 {
00284
00285 };
00286 }
00287
00288 }
00289
00290 }
roscpp
Author(s): Morgan Quigley, Josh Faust, Brian Gerkey, Troy Straszheim
autogenerated on Thu Jun 6 2019 21:10:05