poll_set.cpp
Go to the documentation of this file.
1 /*
2  * Software License Agreement (BSD License)
3  *
4  * Copyright (c) 2008, Willow Garage, Inc.
5  * All rights reserved.
6  *
7  * Redistribution and use in source and binary forms, with or without
8  * modification, are permitted provided that the following conditions
9  * are met:
10  *
11  * * Redistributions of source code must retain the above copyright
12  * notice, this list of conditions and the following disclaimer.
13  * * Redistributions in binary form must reproduce the above
14  * copyright notice, this list of conditions and the following
15  * disclaimer in the documentation and/or other materials provided
16  * with the distribution.
17  * * Neither the name of Willow Garage, Inc. nor the names of its
18  * contributors may be used to endorse or promote products derived
19  * from this software without specific prior written permission.
20  *
21  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
24  * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
25  * COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
26  * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
27  * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
28  * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
29  * CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
30  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
31  * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
32  * POSSIBILITY OF SUCH DAMAGE.
33  */
34 
35 #include "ros/poll_set.h"
36 #include "ros/file_log.h"
37 
39 
40 #include <ros/assert.h>
41 
42 #include <boost/bind.hpp>
43 
44 #include <fcntl.h>
45 
46 namespace ros
47 {
48 
50  : sockets_changed_(false), epfd_(create_socket_watcher())
51 {
52  if ( create_signal_pair(signal_pipe_) != 0 ) {
53  ROS_FATAL("create_signal_pair() failed");
54  ROS_BREAK();
55  }
56  addSocket(signal_pipe_[0], boost::bind(&PollSet::onLocalPipeEvents, this, _1));
57  addEvents(signal_pipe_[0], POLLIN);
58 }
59 
61 {
64 }
65 
66 bool PollSet::addSocket(int fd, const SocketUpdateFunc& update_func, const TransportPtr& transport)
67 {
68  SocketInfo info;
69  info.fd_ = fd;
70  info.events_ = 0;
71  info.transport_ = transport;
72  info.func_ = update_func;
73 
74  {
75  boost::mutex::scoped_lock lock(socket_info_mutex_);
76 
77  bool b = socket_info_.insert(std::make_pair(fd, info)).second;
78  if (!b)
79  {
80  ROSCPP_LOG_DEBUG("PollSet: Tried to add duplicate fd [%d]", fd);
81  return false;
82  }
83 
85 
86  sockets_changed_ = true;
87  }
88 
89  signal();
90 
91  return true;
92 }
93 
94 bool PollSet::delSocket(int fd)
95 {
96  if(fd < 0)
97  {
98  return false;
99  }
100 
101  boost::mutex::scoped_lock lock(socket_info_mutex_);
102  M_SocketInfo::iterator it = socket_info_.find(fd);
103  if (it != socket_info_.end())
104  {
105  socket_info_.erase(it);
106 
107  {
108  boost::mutex::scoped_lock lock(just_deleted_mutex_);
109  just_deleted_.push_back(fd);
110  }
111 
113 
114  sockets_changed_ = true;
115  signal();
116 
117  return true;
118  }
119 
120  ROSCPP_LOG_DEBUG("PollSet: Tried to delete fd [%d] which is not being tracked", fd);
121 
122  return false;
123 }
124 
125 
126 bool PollSet::addEvents(int sock, int events)
127 {
128  boost::mutex::scoped_lock lock(socket_info_mutex_);
129 
130  M_SocketInfo::iterator it = socket_info_.find(sock);
131 
132  if (it == socket_info_.end())
133  {
134  ROSCPP_LOG_DEBUG("PollSet: Tried to add events [%d] to fd [%d] which does not exist in this pollset", events, sock);
135  return false;
136  }
137 
138  it->second.events_ |= events;
139 
140  set_events_on_socket(epfd_, sock, it->second.events_);
141 
142  sockets_changed_ = true;
143  signal();
144 
145  return true;
146 }
147 
148 bool PollSet::delEvents(int sock, int events)
149 {
150  boost::mutex::scoped_lock lock(socket_info_mutex_);
151 
152  M_SocketInfo::iterator it = socket_info_.find(sock);
153  if (it != socket_info_.end())
154  {
155  it->second.events_ &= ~events;
156  }
157  else
158  {
159  ROSCPP_LOG_DEBUG("PollSet: Tried to delete events [%d] to fd [%d] which does not exist in this pollset", events, sock);
160  return false;
161  }
162 
163  set_events_on_socket(epfd_, sock, it->second.events_);
164 
165  sockets_changed_ = true;
166  signal();
167 
168  return true;
169 }
170 
172 {
173  boost::mutex::scoped_try_lock lock(signal_mutex_);
174 
175  if (lock.owns_lock())
176  {
177  char b = 0;
178  if (write_signal(signal_pipe_[1], &b, 1) < 0)
179  {
180  // do nothing... this prevents warnings on gcc 4.3
181  }
182  }
183 }
184 
185 
186 void PollSet::update(int poll_timeout)
187 {
189 
190  // Poll across the sockets we're servicing
191  boost::shared_ptr<std::vector<socket_pollfd> > ofds = poll_sockets(epfd_, &ufds_.front(), ufds_.size(), poll_timeout);
192  if (!ofds)
193  {
194  ROS_ERROR_STREAM("poll failed with error " << last_socket_error_string());
195  }
196  else
197  {
198  for (std::vector<socket_pollfd>::iterator it = ofds->begin() ; it != ofds->end(); ++it)
199  {
200  int fd = it->fd;
201  int revents = it->revents;
202  SocketUpdateFunc func;
203  TransportPtr transport;
204  int events = 0;
205 
206  if (revents == 0)
207  {
208  continue;
209  }
210  {
211  boost::mutex::scoped_lock lock(socket_info_mutex_);
212  M_SocketInfo::iterator it = socket_info_.find(fd);
213  // the socket has been entirely deleted
214  if (it == socket_info_.end())
215  {
216  continue;
217  }
218 
219  const SocketInfo& info = it->second;
220 
221  // Store off the function and transport in case the socket is deleted from another thread
222  func = info.func_;
223  transport = info.transport_;
224  events = info.events_;
225  }
226 
227  // If these are registered events for this socket, OR the events are ERR/HUP/NVAL,
228  // call through to the registered function
229  if (func
230  && ((events & revents)
231  || (revents & POLLERR)
232  || (revents & POLLHUP)
233  || (revents & POLLNVAL)))
234  {
235  bool skip = false;
236  if (revents & (POLLNVAL|POLLERR|POLLHUP))
237  {
238  // If a socket was just closed and then the file descriptor immediately reused, we can
239  // get in here with what we think is a valid socket (since it was just re-added to our set)
240  // but which is actually referring to the previous fd with the same #. If this is the case,
241  // we ignore the first instance of one of these errors. If it's a real error we'll
242  // hit it again next time through.
243  boost::mutex::scoped_lock lock(just_deleted_mutex_);
244  if (std::find(just_deleted_.begin(), just_deleted_.end(), fd) != just_deleted_.end())
245  {
246  skip = true;
247  }
248  }
249 
250  if (!skip)
251  {
252  func(revents & (events|POLLERR|POLLHUP|POLLNVAL));
253  }
254  }
255  }
256  }
257 
258  boost::mutex::scoped_lock lock(just_deleted_mutex_);
259  just_deleted_.clear();
260 
261 }
262 
264 {
265  boost::mutex::scoped_lock lock(socket_info_mutex_);
266 
267  if (!sockets_changed_)
268  {
269  return;
270  }
271 
272  // Build the list of structures to pass to poll for the sockets we're servicing
273  ufds_.resize(socket_info_.size());
274  M_SocketInfo::iterator sock_it = socket_info_.begin();
275  M_SocketInfo::iterator sock_end = socket_info_.end();
276  for (int i = 0; sock_it != sock_end; ++sock_it, ++i)
277  {
278  const SocketInfo& info = sock_it->second;
279  socket_pollfd& pfd = ufds_[i];
280  pfd.fd = info.fd_;
281  pfd.events = info.events_;
282  pfd.revents = 0;
283  }
284  sockets_changed_ = false;
285 }
286 
288 {
289  if(events & POLLIN)
290  {
291  char b;
292  while(read_signal(signal_pipe_[0], &b, 1) > 0)
293  {
294  //do nothing keep draining
295  };
296  }
297 
298 }
299 
300 }
ROSCPP_DECL void del_socket_from_watcher(int epfd, int fd)
Definition: io.cpp:137
ROSCPP_DECL int create_socket_watcher()
Definition: io.cpp:102
boost::mutex signal_mutex_
Definition: poll_set.h:148
#define ROS_FATAL(...)
bool addEvents(int sock, int events)
Add events to be polled on a socket.
Definition: poll_set.cpp:126
void onLocalPipeEvents(int events)
Called when events have been triggered on our signal pipe.
Definition: poll_set.cpp:287
TransportPtr transport_
Definition: poll_set.h:132
V_int just_deleted_
Definition: poll_set.h:144
M_SocketInfo socket_info_
Definition: poll_set.h:138
ROSCPP_DECL void close_socket_watcher(int fd)
Definition: io.cpp:114
signal_fd_t signal_pipe_[2]
Definition: poll_set.h:149
ROSCPP_DECL const char * last_socket_error_string()
Definition: io.cpp:74
ssize_t write_signal(const signal_fd_t &signal, const void *buffer, const size_t &nbyte)
Definition: io.h:191
#define ROSCPP_LOG_DEBUG(...)
Definition: file_log.h:35
bool sockets_changed_
Definition: poll_set.h:140
void update(int poll_timeout)
Process all socket events.
Definition: poll_set.cpp:186
void signal()
Signal our poll() call to finish if it&#39;s blocked waiting (see the poll_timeout option for update())...
Definition: poll_set.cpp:171
ROSCPP_DECL pollfd_vector_ptr poll_sockets(int epfd, socket_pollfd *fds, nfds_t nfds, int timeout)
A cross platform polling function for sockets.
Definition: io.cpp:184
boost::function< void(int)> SocketUpdateFunc
Definition: poll_set.h:63
bool delEvents(int sock, int events)
Delete events to be polled on a socket.
Definition: poll_set.cpp:148
boost::mutex just_deleted_mutex_
Definition: poll_set.h:142
bool delSocket(int sock)
Delete a socket.
Definition: poll_set.cpp:94
ssize_t read_signal(const signal_fd_t &signal, void *buffer, const size_t &nbyte)
Definition: io.h:208
struct pollfd socket_pollfd
Definition: io.h:138
ROSCPP_DECL void set_events_on_socket(int epfd, int fd, int events)
Definition: io.cpp:149
void close_signal_pair(signal_fd_t signal_pair[2])
Definition: io.h:170
ROSCPP_DECL int create_signal_pair(signal_fd_t signal_pair[2])
Definition: io.cpp:416
boost::mutex socket_info_mutex_
Definition: poll_set.h:139
void createNativePollset()
Creates the native pollset for our sockets, if any have changed.
Definition: poll_set.cpp:263
SocketUpdateFunc func_
Definition: poll_set.h:133
#define ROS_ERROR_STREAM(args)
#define ROS_BREAK()
bool addSocket(int sock, const SocketUpdateFunc &update_func, const TransportPtr &transport=TransportPtr())
Add a socket.
Definition: poll_set.cpp:66
ROSCPP_DECL void add_socket_to_watcher(int epfd, int fd)
Definition: io.cpp:119
std::vector< socket_pollfd > ufds_
Definition: poll_set.h:146


roscpp
Author(s): Morgan Quigley, Josh Faust, Brian Gerkey, Troy Straszheim
autogenerated on Sun Feb 3 2019 03:29:54