poll_set.cpp
Go to the documentation of this file.
1 /*
2  * Software License Agreement (BSD License)
3  *
4  * Copyright (c) 2008, Willow Garage, Inc.
5  * All rights reserved.
6  *
7  * Redistribution and use in source and binary forms, with or without
8  * modification, are permitted provided that the following conditions
9  * are met:
10  *
11  * * Redistributions of source code must retain the above copyright
12  * notice, this list of conditions and the following disclaimer.
13  * * Redistributions in binary form must reproduce the above
14  * copyright notice, this list of conditions and the following
15  * disclaimer in the documentation and/or other materials provided
16  * with the distribution.
17  * * Neither the name of Willow Garage, Inc. nor the names of its
18  * contributors may be used to endorse or promote products derived
19  * from this software without specific prior written permission.
20  *
21  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
24  * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
25  * COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
26  * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
27  * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
28  * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
29  * CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
30  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
31  * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
32  * POSSIBILITY OF SUCH DAMAGE.
33  */
34 
35 #include "ros/poll_set.h"
36 #include "ros/file_log.h"
37 
39 
40 #include <ros/assert.h>
41 
42 #include <boost/bind.hpp>
43 
44 #include <fcntl.h>
45 
46 namespace ros
47 {
48 
50  : sockets_changed_(false), epfd_(create_socket_watcher())
51 {
52  if ( create_signal_pair(signal_pipe_) != 0 ) {
53  ROS_FATAL("create_signal_pair() failed");
54  ROS_BREAK();
55  }
56  addSocket(signal_pipe_[0], boost::bind(&PollSet::onLocalPipeEvents, this, _1));
57  addEvents(signal_pipe_[0], POLLIN);
58 }
59 
61 {
64 }
65 
66 bool PollSet::addSocket(int fd, const SocketUpdateFunc& update_func, const TransportPtr& transport)
67 {
68  SocketInfo info;
69  info.fd_ = fd;
70  info.events_ = 0;
71  info.transport_ = transport;
72  info.func_ = update_func;
73 
74  {
75  boost::mutex::scoped_lock lock(socket_info_mutex_);
76 
77  bool b = socket_info_.insert(std::make_pair(fd, info)).second;
78  if (!b)
79  {
80  ROSCPP_LOG_DEBUG("PollSet: Tried to add duplicate fd [%d]", fd);
81  return false;
82  }
83 
85 
86  sockets_changed_ = true;
87  }
88 
89  signal();
90 
91  return true;
92 }
93 
94 bool PollSet::delSocket(int fd)
95 {
96  if(fd < 0)
97  {
98  return false;
99  }
100 
101  boost::mutex::scoped_lock lock(socket_info_mutex_);
102  M_SocketInfo::iterator it = socket_info_.find(fd);
103  if (it != socket_info_.end())
104  {
105  socket_info_.erase(it);
106 
107  {
108  boost::mutex::scoped_lock lock(just_deleted_mutex_);
109  just_deleted_.push_back(fd);
110  }
111 
113 
114  sockets_changed_ = true;
115  signal();
116 
117  return true;
118  }
119 
120  ROSCPP_LOG_DEBUG("PollSet: Tried to delete fd [%d] which is not being tracked", fd);
121 
122  return false;
123 }
124 
125 
126 bool PollSet::addEvents(int sock, int events)
127 {
128  boost::mutex::scoped_lock lock(socket_info_mutex_);
129 
130  M_SocketInfo::iterator it = socket_info_.find(sock);
131 
132  if (it == socket_info_.end())
133  {
134  ROSCPP_LOG_DEBUG("PollSet: Tried to add events [%d] to fd [%d] which does not exist in this pollset", events, sock);
135  return false;
136  }
137 
138  it->second.events_ |= events;
139 
140  set_events_on_socket(epfd_, sock, it->second.events_);
141 
142  sockets_changed_ = true;
143  signal();
144 
145  return true;
146 }
147 
148 bool PollSet::delEvents(int sock, int events)
149 {
150  boost::mutex::scoped_lock lock(socket_info_mutex_);
151 
152  M_SocketInfo::iterator it = socket_info_.find(sock);
153  if (it != socket_info_.end())
154  {
155  it->second.events_ &= ~events;
156  }
157  else
158  {
159  ROSCPP_LOG_DEBUG("PollSet: Tried to delete events [%d] to fd [%d] which does not exist in this pollset", events, sock);
160  return false;
161  }
162 
163  set_events_on_socket(epfd_, sock, it->second.events_);
164 
165  sockets_changed_ = true;
166  signal();
167 
168  return true;
169 }
170 
172 {
173  boost::mutex::scoped_try_lock lock(signal_mutex_);
174 
175  if (lock.owns_lock())
176  {
177  char b = 0;
178  if (write_signal(signal_pipe_[1], &b, 1) < 0)
179  {
180  // do nothing... this prevents warnings on gcc 4.3
181  }
182  }
183 }
184 
185 
186 void PollSet::update(int poll_timeout)
187 {
189 
190  // Poll across the sockets we're servicing
191  boost::shared_ptr<std::vector<socket_pollfd> > ofds = poll_sockets(epfd_, &ufds_.front(), ufds_.size(), poll_timeout);
192  if (!ofds)
193  {
194  if (last_socket_error() != EINTR)
195  {
196  ROS_ERROR_STREAM("poll failed with error " << last_socket_error_string());
197  }
198  }
199  else
200  {
201  for (std::vector<socket_pollfd>::iterator it = ofds->begin() ; it != ofds->end(); ++it)
202  {
203  int fd = it->fd;
204  int revents = it->revents;
205  SocketUpdateFunc func;
206  TransportPtr transport;
207  int events = 0;
208 
209  if (revents == 0)
210  {
211  continue;
212  }
213  {
214  boost::mutex::scoped_lock lock(socket_info_mutex_);
215  M_SocketInfo::iterator it = socket_info_.find(fd);
216  // the socket has been entirely deleted
217  if (it == socket_info_.end())
218  {
219  continue;
220  }
221 
222  const SocketInfo& info = it->second;
223 
224  // Store off the function and transport in case the socket is deleted from another thread
225  func = info.func_;
226  transport = info.transport_;
227  events = info.events_;
228  }
229 
230  // If these are registered events for this socket, OR the events are ERR/HUP/NVAL,
231  // call through to the registered function
232  if (func
233  && ((events & revents)
234  || (revents & POLLERR)
235  || (revents & POLLHUP)
236  || (revents & POLLNVAL)))
237  {
238  bool skip = false;
239  if (revents & (POLLNVAL|POLLERR|POLLHUP))
240  {
241  // If a socket was just closed and then the file descriptor immediately reused, we can
242  // get in here with what we think is a valid socket (since it was just re-added to our set)
243  // but which is actually referring to the previous fd with the same #. If this is the case,
244  // we ignore the first instance of one of these errors. If it's a real error we'll
245  // hit it again next time through.
246  boost::mutex::scoped_lock lock(just_deleted_mutex_);
247  if (std::find(just_deleted_.begin(), just_deleted_.end(), fd) != just_deleted_.end())
248  {
249  skip = true;
250  }
251  }
252 
253  if (!skip)
254  {
255  func(revents & (events|POLLERR|POLLHUP|POLLNVAL));
256  }
257  }
258  }
259  }
260 
261  boost::mutex::scoped_lock lock(just_deleted_mutex_);
262  just_deleted_.clear();
263 
264 }
265 
267 {
268  boost::mutex::scoped_lock lock(socket_info_mutex_);
269 
270  if (!sockets_changed_)
271  {
272  return;
273  }
274 
275  // Build the list of structures to pass to poll for the sockets we're servicing
276  ufds_.resize(socket_info_.size());
277  M_SocketInfo::iterator sock_it = socket_info_.begin();
278  M_SocketInfo::iterator sock_end = socket_info_.end();
279  for (int i = 0; sock_it != sock_end; ++sock_it, ++i)
280  {
281  const SocketInfo& info = sock_it->second;
282  socket_pollfd& pfd = ufds_[i];
283  pfd.fd = info.fd_;
284  pfd.events = info.events_;
285  pfd.revents = 0;
286  }
287  sockets_changed_ = false;
288 }
289 
291 {
292  if(events & POLLIN)
293  {
294  char b;
295  while(read_signal(signal_pipe_[0], &b, 1) > 0)
296  {
297  //do nothing keep draining
298  };
299  }
300 
301 }
302 
303 }
ROSCPP_DECL void del_socket_from_watcher(int epfd, int fd)
Definition: io.cpp:137
ROSCPP_DECL int create_socket_watcher()
Definition: io.cpp:102
boost::mutex signal_mutex_
Definition: poll_set.h:148
#define ROS_FATAL(...)
bool addEvents(int sock, int events)
Add events to be polled on a socket.
Definition: poll_set.cpp:126
void onLocalPipeEvents(int events)
Called when events have been triggered on our signal pipe.
Definition: poll_set.cpp:290
TransportPtr transport_
Definition: poll_set.h:132
V_int just_deleted_
Definition: poll_set.h:144
M_SocketInfo socket_info_
Definition: poll_set.h:138
ROSCPP_DECL void close_socket_watcher(int fd)
Definition: io.cpp:114
signal_fd_t signal_pipe_[2]
Definition: poll_set.h:149
ROSCPP_DECL const char * last_socket_error_string()
Definition: io.cpp:74
ssize_t write_signal(const signal_fd_t &signal, const void *buffer, const size_t &nbyte)
Definition: io.h:191
#define ROSCPP_LOG_DEBUG(...)
Definition: file_log.h:35
bool sockets_changed_
Definition: poll_set.h:140
void update(int poll_timeout)
Process all socket events.
Definition: poll_set.cpp:186
void signal()
Signal our poll() call to finish if it&#39;s blocked waiting (see the poll_timeout option for update())...
Definition: poll_set.cpp:171
ROSCPP_DECL pollfd_vector_ptr poll_sockets(int epfd, socket_pollfd *fds, nfds_t nfds, int timeout)
A cross platform polling function for sockets.
Definition: io.cpp:184
boost::function< void(int)> SocketUpdateFunc
Definition: poll_set.h:63
bool delEvents(int sock, int events)
Delete events to be polled on a socket.
Definition: poll_set.cpp:148
boost::mutex just_deleted_mutex_
Definition: poll_set.h:142
bool delSocket(int sock)
Delete a socket.
Definition: poll_set.cpp:94
ssize_t read_signal(const signal_fd_t &signal, void *buffer, const size_t &nbyte)
Definition: io.h:208
struct pollfd socket_pollfd
Definition: io.h:138
ROSCPP_DECL void set_events_on_socket(int epfd, int fd, int events)
Definition: io.cpp:149
void close_signal_pair(signal_fd_t signal_pair[2])
Definition: io.h:170
ROSCPP_DECL int create_signal_pair(signal_fd_t signal_pair[2])
Definition: io.cpp:416
boost::mutex socket_info_mutex_
Definition: poll_set.h:139
void createNativePollset()
Creates the native pollset for our sockets, if any have changed.
Definition: poll_set.cpp:266
SocketUpdateFunc func_
Definition: poll_set.h:133
ROSCPP_DECL int last_socket_error()
Definition: io.cpp:67
#define ROS_ERROR_STREAM(args)
#define ROS_BREAK()
bool addSocket(int sock, const SocketUpdateFunc &update_func, const TransportPtr &transport=TransportPtr())
Add a socket.
Definition: poll_set.cpp:66
ROSCPP_DECL void add_socket_to_watcher(int epfd, int fd)
Definition: io.cpp:119
std::vector< socket_pollfd > ufds_
Definition: poll_set.h:146


roscpp
Author(s): Morgan Quigley, Josh Faust, Brian Gerkey, Troy Straszheim, Dirk Thomas
autogenerated on Mon Feb 28 2022 23:33:27