poll_set.cpp
Go to the documentation of this file.
1 /*
2  * Software License Agreement (BSD License)
3  *
4  * Copyright (c) 2008, Willow Garage, Inc.
5  * All rights reserved.
6  *
7  * Redistribution and use in source and binary forms, with or without
8  * modification, are permitted provided that the following conditions
9  * are met:
10  *
11  * * Redistributions of source code must retain the above copyright
12  * notice, this list of conditions and the following disclaimer.
13  * * Redistributions in binary form must reproduce the above
14  * copyright notice, this list of conditions and the following
15  * disclaimer in the documentation and/or other materials provided
16  * with the distribution.
17  * * Neither the name of Willow Garage, Inc. nor the names of its
18  * contributors may be used to endorse or promote products derived
19  * from this software without specific prior written permission.
20  *
21  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
24  * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
25  * COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
26  * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
27  * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
28  * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
29  * CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
30  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
31  * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
32  * POSSIBILITY OF SUCH DAMAGE.
33  */
34 
35 #include "ros/poll_set.h"
36 #include "ros/file_log.h"
37 
39 
40 #include <ros/assert.h>
41 
42 #include <boost/bind.hpp>
43 
44 #include <fcntl.h>
45 
46 namespace ros
47 {
48 
50 : sockets_changed_(false)
51 {
52  if ( create_signal_pair(signal_pipe_) != 0 ) {
53  ROS_FATAL("create_signal_pair() failed");
54  ROS_BREAK();
55  }
56  addSocket(signal_pipe_[0], boost::bind(&PollSet::onLocalPipeEvents, this, _1));
57  addEvents(signal_pipe_[0], POLLIN);
58 }
59 
61 {
63 }
64 
65 bool PollSet::addSocket(int fd, const SocketUpdateFunc& update_func, const TransportPtr& transport)
66 {
67  SocketInfo info;
68  info.fd_ = fd;
69  info.events_ = 0;
70  info.transport_ = transport;
71  info.func_ = update_func;
72 
73  {
74  boost::mutex::scoped_lock lock(socket_info_mutex_);
75 
76  bool b = socket_info_.insert(std::make_pair(fd, info)).second;
77  if (!b)
78  {
79  ROSCPP_LOG_DEBUG("PollSet: Tried to add duplicate fd [%d]", fd);
80  return false;
81  }
82 
83  sockets_changed_ = true;
84  }
85 
86  signal();
87 
88  return true;
89 }
90 
91 bool PollSet::delSocket(int fd)
92 {
93  if(fd < 0)
94  {
95  return false;
96  }
97 
98  boost::mutex::scoped_lock lock(socket_info_mutex_);
99  M_SocketInfo::iterator it = socket_info_.find(fd);
100  if (it != socket_info_.end())
101  {
102  socket_info_.erase(it);
103 
104  {
105  boost::mutex::scoped_lock lock(just_deleted_mutex_);
106  just_deleted_.push_back(fd);
107  }
108 
109  sockets_changed_ = true;
110  signal();
111 
112  return true;
113  }
114 
115  ROSCPP_LOG_DEBUG("PollSet: Tried to delete fd [%d] which is not being tracked", fd);
116 
117  return false;
118 }
119 
120 
121 bool PollSet::addEvents(int sock, int events)
122 {
123  boost::mutex::scoped_lock lock(socket_info_mutex_);
124 
125  M_SocketInfo::iterator it = socket_info_.find(sock);
126 
127  if (it == socket_info_.end())
128  {
129  ROSCPP_LOG_DEBUG("PollSet: Tried to add events [%d] to fd [%d] which does not exist in this pollset", events, sock);
130  return false;
131  }
132 
133  it->second.events_ |= events;
134 
135  signal();
136 
137  return true;
138 }
139 
140 bool PollSet::delEvents(int sock, int events)
141 {
142  boost::mutex::scoped_lock lock(socket_info_mutex_);
143 
144  M_SocketInfo::iterator it = socket_info_.find(sock);
145  if (it != socket_info_.end())
146  {
147  it->second.events_ &= ~events;
148  }
149  else
150  {
151  ROSCPP_LOG_DEBUG("PollSet: Tried to delete events [%d] to fd [%d] which does not exist in this pollset", events, sock);
152  return false;
153  }
154 
155  signal();
156 
157  return true;
158 }
159 
161 {
162  boost::mutex::scoped_try_lock lock(signal_mutex_);
163 
164  if (lock.owns_lock())
165  {
166  char b = 0;
167  if (write_signal(signal_pipe_[1], &b, 1) < 0)
168  {
169  // do nothing... this prevents warnings on gcc 4.3
170  }
171  }
172 }
173 
174 
175 void PollSet::update(int poll_timeout)
176 {
178 
179  // Poll across the sockets we're servicing
180  int ret;
181  size_t ufds_count = ufds_.size();
182  if((ret = poll_sockets(&ufds_.front(), ufds_count, poll_timeout)) < 0)
183  {
184  ROS_ERROR_STREAM("poll failed with error " << last_socket_error_string());
185  }
186  else if (ret > 0) // ret = 0 implies the poll timed out, nothing to do
187  {
188  // We have one or more sockets to service
189  for(size_t i=0; i<ufds_count; i++)
190  {
191  if (ufds_[i].revents == 0)
192  {
193  continue;
194  }
195 
196  SocketUpdateFunc func;
197  TransportPtr transport;
198  int events = 0;
199  {
200  boost::mutex::scoped_lock lock(socket_info_mutex_);
201  M_SocketInfo::iterator it = socket_info_.find(ufds_[i].fd);
202  // the socket has been entirely deleted
203  if (it == socket_info_.end())
204  {
205  continue;
206  }
207 
208  const SocketInfo& info = it->second;
209 
210  // Store off the function and transport in case the socket is deleted from another thread
211  func = info.func_;
212  transport = info.transport_;
213  events = info.events_;
214  }
215 
216  // If these are registered events for this socket, OR the events are ERR/HUP/NVAL,
217  // call through to the registered function
218  int revents = ufds_[i].revents;
219  if (func
220  && ((events & revents)
221  || (revents & POLLERR)
222  || (revents & POLLHUP)
223  || (revents & POLLNVAL)))
224  {
225  bool skip = false;
226  if (revents & (POLLNVAL|POLLERR|POLLHUP))
227  {
228  // If a socket was just closed and then the file descriptor immediately reused, we can
229  // get in here with what we think is a valid socket (since it was just re-added to our set)
230  // but which is actually referring to the previous fd with the same #. If this is the case,
231  // we ignore the first instance of one of these errors. If it's a real error we'll
232  // hit it again next time through.
233  boost::mutex::scoped_lock lock(just_deleted_mutex_);
234  if (std::find(just_deleted_.begin(), just_deleted_.end(), ufds_[i].fd) != just_deleted_.end())
235  {
236  skip = true;
237  }
238  }
239 
240  if (!skip)
241  {
242  func(revents & (events|POLLERR|POLLHUP|POLLNVAL));
243  }
244  }
245 
246  ufds_[i].revents = 0;
247  }
248 
249  boost::mutex::scoped_lock lock(just_deleted_mutex_);
250  just_deleted_.clear();
251  }
252 }
253 
255 {
256  boost::mutex::scoped_lock lock(socket_info_mutex_);
257 
258  if (!sockets_changed_)
259  {
260  return;
261  }
262 
263  // Build the list of structures to pass to poll for the sockets we're servicing
264  ufds_.resize(socket_info_.size());
265  M_SocketInfo::iterator sock_it = socket_info_.begin();
266  M_SocketInfo::iterator sock_end = socket_info_.end();
267  for (int i = 0; sock_it != sock_end; ++sock_it, ++i)
268  {
269  const SocketInfo& info = sock_it->second;
270  socket_pollfd& pfd = ufds_[i];
271  pfd.fd = info.fd_;
272  pfd.events = info.events_;
273  pfd.revents = 0;
274  }
275 }
276 
278 {
279  if(events & POLLIN)
280  {
281  char b;
282  while(read_signal(signal_pipe_[0], &b, 1) > 0)
283  {
284  //do nothing keep draining
285  };
286  }
287 
288 }
289 
290 }
boost::mutex signal_mutex_
Definition: poll_set.h:148
#define ROS_FATAL(...)
bool addEvents(int sock, int events)
Add events to be polled on a socket.
Definition: poll_set.cpp:121
void onLocalPipeEvents(int events)
Called when events have been triggered on our signal pipe.
Definition: poll_set.cpp:277
TransportPtr transport_
Definition: poll_set.h:132
V_int just_deleted_
Definition: poll_set.h:144
M_SocketInfo socket_info_
Definition: poll_set.h:138
signal_fd_t signal_pipe_[2]
Definition: poll_set.h:149
ROSCPP_DECL const char * last_socket_error_string()
Definition: io.cpp:62
ssize_t write_signal(const signal_fd_t &signal, const void *buffer, const size_t &nbyte)
Definition: io.h:183
#define ROSCPP_LOG_DEBUG(...)
Definition: file_log.h:35
bool sockets_changed_
Definition: poll_set.h:140
void update(int poll_timeout)
Process all socket events.
Definition: poll_set.cpp:175
void signal()
Signal our poll() call to finish if it&#39;s blocked waiting (see the poll_timeout option for update())...
Definition: poll_set.cpp:160
ROSCPP_DECL int poll_sockets(socket_pollfd *fds, nfds_t nfds, int timeout)
A cross platform polling function for sockets.
Definition: io.cpp:104
boost::function< void(int)> SocketUpdateFunc
Definition: poll_set.h:63
bool delEvents(int sock, int events)
Delete events to be polled on a socket.
Definition: poll_set.cpp:140
boost::mutex just_deleted_mutex_
Definition: poll_set.h:142
bool delSocket(int sock)
Delete a socket.
Definition: poll_set.cpp:91
ssize_t read_signal(const signal_fd_t &signal, void *buffer, const size_t &nbyte)
Definition: io.h:200
struct pollfd socket_pollfd
Definition: io.h:138
void close_signal_pair(signal_fd_t signal_pair[2])
Definition: io.h:162
ROSCPP_DECL int create_signal_pair(signal_fd_t signal_pair[2])
Definition: io.cpp:282
boost::mutex socket_info_mutex_
Definition: poll_set.h:139
void createNativePollset()
Creates the native pollset for our sockets, if any have changed.
Definition: poll_set.cpp:254
SocketUpdateFunc func_
Definition: poll_set.h:133
#define ROS_ERROR_STREAM(args)
#define ROS_BREAK()
bool addSocket(int sock, const SocketUpdateFunc &update_func, const TransportPtr &transport=TransportPtr())
Add a socket.
Definition: poll_set.cpp:65
std::vector< socket_pollfd > ufds_
Definition: poll_set.h:146


roscpp
Author(s): Morgan Quigley, Josh Faust, Brian Gerkey, Troy Straszheim
autogenerated on Wed Dec 20 2017 03:58:41