poll_set.cpp
Go to the documentation of this file.
1 /*
2  * Software License Agreement (BSD License)
3  *
4  * Copyright (c) 2008, Willow Garage, Inc.
5  * All rights reserved.
6  *
7  * Redistribution and use in source and binary forms, with or without
8  * modification, are permitted provided that the following conditions
9  * are met:
10  *
11  * * Redistributions of source code must retain the above copyright
12  * notice, this list of conditions and the following disclaimer.
13  * * Redistributions in binary form must reproduce the above
14  * copyright notice, this list of conditions and the following
15  * disclaimer in the documentation and/or other materials provided
16  * with the distribution.
17  * * Neither the name of Willow Garage, Inc. nor the names of its
18  * contributors may be used to endorse or promote products derived
19  * from this software without specific prior written permission.
20  *
21  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
24  * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
25  * COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
26  * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
27  * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
28  * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
29  * CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
30  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
31  * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
32  * POSSIBILITY OF SUCH DAMAGE.
33  */
34 
35 #include "ros/poll_set.h"
36 #include "ros/file_log.h"
37 
39 
40 #include <ros/assert.h>
41 
42 #include <boost/bind.hpp>
43 
44 #include <fcntl.h>
45 
46 namespace ros
47 {
48 
50  : sockets_changed_(false), epfd_(create_socket_watcher())
51 {
52  if ( create_signal_pair(signal_pipe_) != 0 ) {
53  ROS_FATAL("create_signal_pair() failed");
54  ROS_BREAK();
55  }
56  addSocket(signal_pipe_[0], boost::bind(&PollSet::onLocalPipeEvents, this, _1));
57  addEvents(signal_pipe_[0], POLLIN);
58 }
59 
61 {
64 }
65 
66 bool PollSet::addSocket(int fd, const SocketUpdateFunc& update_func, const TransportPtr& transport)
67 {
68  SocketInfo info;
69  info.fd_ = fd;
70  info.events_ = 0;
71  info.transport_ = transport;
72  info.func_ = update_func;
73 
74  {
75  boost::mutex::scoped_lock lock(socket_info_mutex_);
76 
77  bool b = socket_info_.insert(std::make_pair(fd, info)).second;
78  if (!b)
79  {
80  ROSCPP_LOG_DEBUG("PollSet: Tried to add duplicate fd [%d]", fd);
81  return false;
82  }
83 
85 
86  sockets_changed_ = true;
87  }
88 
89  signal();
90 
91  return true;
92 }
93 
94 bool PollSet::delSocket(int fd)
95 {
96  if(fd < 0)
97  {
98  return false;
99  }
100 
101  boost::mutex::scoped_lock lock(socket_info_mutex_);
102  M_SocketInfo::iterator it = socket_info_.find(fd);
103  if (it != socket_info_.end())
104  {
105  socket_info_.erase(it);
106 
107  {
108  boost::mutex::scoped_lock lock(just_deleted_mutex_);
109  just_deleted_.push_back(fd);
110  }
111 
113 
114  sockets_changed_ = true;
115  signal();
116 
117  return true;
118  }
119 
120  ROSCPP_LOG_DEBUG("PollSet: Tried to delete fd [%d] which is not being tracked", fd);
121 
122  return false;
123 }
124 
125 
126 bool PollSet::addEvents(int sock, int events)
127 {
128  boost::mutex::scoped_lock lock(socket_info_mutex_);
129 
130  M_SocketInfo::iterator it = socket_info_.find(sock);
131 
132  if (it == socket_info_.end())
133  {
134  ROSCPP_LOG_DEBUG("PollSet: Tried to add events [%d] to fd [%d] which does not exist in this pollset", events, sock);
135  return false;
136  }
137 
138  it->second.events_ |= events;
139 
140  set_events_on_socket(epfd_, sock, it->second.events_);
141 
142  signal();
143 
144  return true;
145 }
146 
147 bool PollSet::delEvents(int sock, int events)
148 {
149  boost::mutex::scoped_lock lock(socket_info_mutex_);
150 
151  M_SocketInfo::iterator it = socket_info_.find(sock);
152  if (it != socket_info_.end())
153  {
154  it->second.events_ &= ~events;
155  }
156  else
157  {
158  ROSCPP_LOG_DEBUG("PollSet: Tried to delete events [%d] to fd [%d] which does not exist in this pollset", events, sock);
159  return false;
160  }
161 
162  set_events_on_socket(epfd_, sock, it->second.events_);
163 
164  signal();
165 
166  return true;
167 }
168 
170 {
171  boost::mutex::scoped_try_lock lock(signal_mutex_);
172 
173  if (lock.owns_lock())
174  {
175  char b = 0;
176  if (write_signal(signal_pipe_[1], &b, 1) < 0)
177  {
178  // do nothing... this prevents warnings on gcc 4.3
179  }
180  }
181 }
182 
183 
184 void PollSet::update(int poll_timeout)
185 {
187 
188  // Poll across the sockets we're servicing
189  boost::shared_ptr<std::vector<socket_pollfd> > ofds = poll_sockets(epfd_, &ufds_.front(), ufds_.size(), poll_timeout);
190  if (!ofds)
191  {
192  ROS_ERROR_STREAM("poll failed with error " << last_socket_error_string());
193  }
194  else
195  {
196  for (std::vector<socket_pollfd>::iterator it = ofds->begin() ; it != ofds->end(); ++it)
197  {
198  int fd = it->fd;
199  int revents = it->revents;
200  SocketUpdateFunc func;
201  TransportPtr transport;
202  int events = 0;
203 
204  if (revents == 0)
205  {
206  continue;
207  }
208  {
209  boost::mutex::scoped_lock lock(socket_info_mutex_);
210  M_SocketInfo::iterator it = socket_info_.find(fd);
211  // the socket has been entirely deleted
212  if (it == socket_info_.end())
213  {
214  continue;
215  }
216 
217  const SocketInfo& info = it->second;
218 
219  // Store off the function and transport in case the socket is deleted from another thread
220  func = info.func_;
221  transport = info.transport_;
222  events = info.events_;
223  }
224 
225  // If these are registered events for this socket, OR the events are ERR/HUP/NVAL,
226  // call through to the registered function
227  if (func
228  && ((events & revents)
229  || (revents & POLLERR)
230  || (revents & POLLHUP)
231  || (revents & POLLNVAL)))
232  {
233  bool skip = false;
234  if (revents & (POLLNVAL|POLLERR|POLLHUP))
235  {
236  // If a socket was just closed and then the file descriptor immediately reused, we can
237  // get in here with what we think is a valid socket (since it was just re-added to our set)
238  // but which is actually referring to the previous fd with the same #. If this is the case,
239  // we ignore the first instance of one of these errors. If it's a real error we'll
240  // hit it again next time through.
241  boost::mutex::scoped_lock lock(just_deleted_mutex_);
242  if (std::find(just_deleted_.begin(), just_deleted_.end(), fd) != just_deleted_.end())
243  {
244  skip = true;
245  }
246  }
247 
248  if (!skip)
249  {
250  func(revents & (events|POLLERR|POLLHUP|POLLNVAL));
251  }
252  }
253  }
254  }
255 
256  boost::mutex::scoped_lock lock(just_deleted_mutex_);
257  just_deleted_.clear();
258 
259 }
260 
262 {
263  boost::mutex::scoped_lock lock(socket_info_mutex_);
264 
265  if (!sockets_changed_)
266  {
267  return;
268  }
269 
270  // Build the list of structures to pass to poll for the sockets we're servicing
271  ufds_.resize(socket_info_.size());
272  M_SocketInfo::iterator sock_it = socket_info_.begin();
273  M_SocketInfo::iterator sock_end = socket_info_.end();
274  for (int i = 0; sock_it != sock_end; ++sock_it, ++i)
275  {
276  const SocketInfo& info = sock_it->second;
277  socket_pollfd& pfd = ufds_[i];
278  pfd.fd = info.fd_;
279  pfd.events = info.events_;
280  pfd.revents = 0;
281  }
282  sockets_changed_ = false;
283 }
284 
286 {
287  if(events & POLLIN)
288  {
289  char b;
290  while(read_signal(signal_pipe_[0], &b, 1) > 0)
291  {
292  //do nothing keep draining
293  };
294  }
295 
296 }
297 
298 }
ROSCPP_DECL void del_socket_from_watcher(int epfd, int fd)
Definition: io.cpp:135
ROSCPP_DECL int create_socket_watcher()
Definition: io.cpp:102
boost::mutex signal_mutex_
Definition: poll_set.h:148
#define ROS_FATAL(...)
bool addEvents(int sock, int events)
Add events to be polled on a socket.
Definition: poll_set.cpp:126
void onLocalPipeEvents(int events)
Called when events have been triggered on our signal pipe.
Definition: poll_set.cpp:285
TransportPtr transport_
Definition: poll_set.h:132
V_int just_deleted_
Definition: poll_set.h:144
M_SocketInfo socket_info_
Definition: poll_set.h:138
ROSCPP_DECL void close_socket_watcher(int fd)
Definition: io.cpp:114
signal_fd_t signal_pipe_[2]
Definition: poll_set.h:149
ROSCPP_DECL const char * last_socket_error_string()
Definition: io.cpp:74
ssize_t write_signal(const signal_fd_t &signal, const void *buffer, const size_t &nbyte)
Definition: io.h:191
#define ROSCPP_LOG_DEBUG(...)
Definition: file_log.h:35
bool sockets_changed_
Definition: poll_set.h:140
void update(int poll_timeout)
Process all socket events.
Definition: poll_set.cpp:184
void signal()
Signal our poll() call to finish if it&#39;s blocked waiting (see the poll_timeout option for update())...
Definition: poll_set.cpp:169
ROSCPP_DECL pollfd_vector_ptr poll_sockets(int epfd, socket_pollfd *fds, nfds_t nfds, int timeout)
A cross platform polling function for sockets.
Definition: io.cpp:180
boost::function< void(int)> SocketUpdateFunc
Definition: poll_set.h:63
bool delEvents(int sock, int events)
Delete events to be polled on a socket.
Definition: poll_set.cpp:147
boost::mutex just_deleted_mutex_
Definition: poll_set.h:142
bool delSocket(int sock)
Delete a socket.
Definition: poll_set.cpp:94
ssize_t read_signal(const signal_fd_t &signal, void *buffer, const size_t &nbyte)
Definition: io.h:208
struct pollfd socket_pollfd
Definition: io.h:138
ROSCPP_DECL void set_events_on_socket(int epfd, int fd, int events)
Definition: io.cpp:147
void close_signal_pair(signal_fd_t signal_pair[2])
Definition: io.h:170
ROSCPP_DECL int create_signal_pair(signal_fd_t signal_pair[2])
Definition: io.cpp:412
boost::mutex socket_info_mutex_
Definition: poll_set.h:139
void createNativePollset()
Creates the native pollset for our sockets, if any have changed.
Definition: poll_set.cpp:261
SocketUpdateFunc func_
Definition: poll_set.h:133
#define ROS_ERROR_STREAM(args)
#define ROS_BREAK()
bool addSocket(int sock, const SocketUpdateFunc &update_func, const TransportPtr &transport=TransportPtr())
Add a socket.
Definition: poll_set.cpp:66
ROSCPP_DECL void add_socket_to_watcher(int epfd, int fd)
Definition: io.cpp:119
std::vector< socket_pollfd > ufds_
Definition: poll_set.h:146


roscpp
Author(s): Morgan Quigley, Josh Faust, Brian Gerkey, Troy Straszheim
autogenerated on Wed Mar 21 2018 07:13:27