XmlRpcDispatch.cpp
Go to the documentation of this file.
1 
4 #include "xmlrpcpp/XmlRpcUtil.h"
5 
6 #include "ros/time.h"
7 
8 #include <math.h>
9 #include <errno.h>
10 #include <sys/timeb.h>
11 
12 #if defined(_WINDOWS)
13 # include <winsock2.h>
14 static int poll( struct pollfd *pfd, int nfds, int timeout)
15 {
16  // workaround: "Windows 8 Bugs 309411 – WSAPoll does not report failed connections"
17  // https://curl.haxx.se/mail/lib-2012-10/0038.html
18  // the following logic is to use select() to check all writable socket connnection status.
19  // if all the sockets to be checked are not connected, it reports SOCKET_ERROR to
20  // error out, instead of going to WSAPoll() which causes infinitely wait situation.
21  FD_SET writable;
22  FD_SET error;
23  FD_ZERO(&writable);
24  FD_ZERO(&error);
25  for (int i = 0; i < nfds; ++i)
26  {
27  if (pfd[i].events & POLLOUT)
28  {
29  FD_SET(pfd[i].fd, &writable);
30  FD_SET(pfd[i].fd, &error);
31  }
32  }
33 
34  int connectionError = 0;
35  if (writable.fd_count > 0)
36  {
37  int result = select(0, nullptr, &writable, &error, nullptr);
38  if (SOCKET_ERROR == result)
39  {
40  return SOCKET_ERROR;
41  }
42 
43  if (0 != result)
44  {
45  for (int i = 0; i < nfds; ++i)
46  {
47  if ((pfd[i].events & POLLOUT) &&
48  (FD_ISSET(pfd[i].fd, &error)))
49  {
50  connectionError++;
51  }
52  }
53  }
54  }
55 
56  if (connectionError == nfds)
57  {
58  // error out if all sockets are failed to connect.
59  return SOCKET_ERROR;
60  }
61  else
62  {
63  return WSAPoll(pfd, nfds, timeout);
64  }
65 }
66 
67 # define USE_FTIME
68 # if defined(_MSC_VER)
69 # define timeb _timeb
70 # define ftime _ftime_s
71 # endif
72 #else
73 # include <sys/poll.h>
74 # include <sys/time.h>
75 #endif // _WINDOWS
76 
77 
78 using namespace XmlRpc;
79 
80 
82 {
83  _endTime = -1.0;
84  _doClear = false;
85  _inWork = false;
86 }
87 
88 
90 {
91 }
92 
93 // Monitor this source for the specified events and call its event handler
94 // when the event occurs
95 void
96 XmlRpcDispatch::addSource(XmlRpcSource* source, unsigned mask)
97 {
98  _sources.push_back(MonitoredSource(source, mask));
99 }
100 
101 // Stop monitoring this source. Does not close the source.
102 void
104 {
105  for (SourceList::iterator it=_sources.begin(); it!=_sources.end(); ++it)
106  if (it->getSource() == source)
107  {
108  _sources.erase(it);
109  break;
110  }
111 }
112 
113 
114 // Modify the types of events to watch for on this source
115 void
116 XmlRpcDispatch::setSourceEvents(XmlRpcSource* source, unsigned eventMask)
117 {
118  for (SourceList::iterator it=_sources.begin(); it!=_sources.end(); ++it)
119  if (it->getSource() == source)
120  {
121  it->getMask() = eventMask;
122  break;
123  }
124 }
125 
126 
127 
128 // Watch current set of sources and process events
129 void
130 XmlRpcDispatch::work(double timeout)
131 {
132  // Loosely based on `man select` > Correspondence between select() and poll() notifications
133  // and cloudius-systems/osv#35, cloudius-systems/osv@b53d39a using poll to emulate select
134  const unsigned POLLIN_REQ = POLLIN; // Request read
135  const unsigned POLLIN_CHK = (POLLIN | POLLHUP | POLLERR); // Readable or connection lost
136  const unsigned POLLOUT_REQ = POLLOUT; // Request write
137  const unsigned POLLOUT_CHK = (POLLOUT | POLLERR); // Writable or connection lost
138 #if !defined(_WINDOWS)
139  const unsigned POLLEX_REQ = POLLPRI; // Out-of-band data received
140  const unsigned POLLEX_CHK = (POLLPRI | POLLNVAL); // Out-of-band data or invalid fd
141 #else
142  const unsigned POLLEX_REQ = POLLRDBAND; // Out-of-band data received
143  const unsigned POLLEX_CHK = (POLLRDBAND | POLLNVAL); // Out-of-band data or invalid fd
144 #endif
145 
146  // Compute end time
147  _endTime = (timeout < 0.0) ? -1.0 : (getTime() + timeout);
148  _doClear = false;
149  _inWork = true;
150  int timeout_ms = static_cast<int>(floor(timeout * 1000.));
151 
152  // Only work while there is something to monitor
153  while (_sources.size() > 0) {
154 
155  // Construct the sets of descriptors we are interested in
156  const unsigned source_cnt = _sources.size();
157  std::vector<pollfd> fds(source_cnt);
158  std::vector<XmlRpcSource *> sources(source_cnt);
159 
160  SourceList::iterator it;
161  std::size_t i = 0;
162  for (it=_sources.begin(); it!=_sources.end(); ++it, ++i) {
163  sources[i] = it->getSource();
164  fds[i].fd = sources[i]->getfd();
165  fds[i].revents = 0; // some platforms may not clear this in poll()
166  fds[i].events = 0;
167  if (it->getMask() & ReadableEvent) fds[i].events |= POLLIN_REQ;
168  if (it->getMask() & WritableEvent) fds[i].events |= POLLOUT_REQ;
169  if (it->getMask() & Exception) fds[i].events |= POLLEX_REQ;
170  }
171 
172  // Check for events
173  int nEvents = poll(&fds[0], source_cnt, (timeout_ms < 0) ? -1 : timeout_ms);
174 
175  if (nEvents < 0)
176  {
177 #if defined(_WINDOWS)
178  XmlRpcUtil::error("Error in XmlRpcDispatch::work: error in poll (%d).", WSAGetLastError());
179  _inWork = false;
180  return;
181 #else
182  if(errno != EINTR)
183  {
184  XmlRpcUtil::error("Error in XmlRpcDispatch::work: error in poll (%d).", nEvents);
185  _inWork = false;
186  return;
187  }
188  // If we receive EINTR, the revents will be empty and no handleEvents will be called.
189  // It will loop back to the poll unless the timeout is reached.
190 #endif
191  }
192 
193  // Process events
194  for (i=0; i < source_cnt; ++i)
195  {
196  XmlRpcSource* src = sources[i];
197  pollfd & pfd = fds[i];
198  unsigned newMask = (unsigned) -1;
199  // Only handle requested events to avoid being prematurely removed from dispatch
200  bool readable = (pfd.events & POLLIN_REQ) == POLLIN_REQ;
201  bool writable = (pfd.events & POLLOUT_REQ) == POLLOUT_REQ;
202  bool oob = (pfd.events & POLLEX_REQ) == POLLEX_REQ;
203  if (readable && (pfd.revents & POLLIN_CHK))
204  newMask &= src->handleEvent(ReadableEvent);
205  if (writable && (pfd.revents & POLLOUT_CHK))
206  newMask &= src->handleEvent(WritableEvent);
207  if (oob && (pfd.revents & POLLEX_CHK))
208  newMask &= src->handleEvent(Exception);
209 
210  // Find the source iterator. It may have moved as a result of the way
211  // that sources are removed and added in the call stack starting
212  // from the handleEvent() calls above.
213  SourceList::iterator thisIt;
214  for (thisIt = _sources.begin(); thisIt != _sources.end(); thisIt++)
215  {
216  if(thisIt->getSource() == src)
217  break;
218  }
219  if(thisIt == _sources.end())
220  {
221  XmlRpcUtil::error("Error in XmlRpcDispatch::work: couldn't find source iterator");
222  continue;
223  }
224 
225  if ( ! newMask) {
226  _sources.erase(thisIt); // Stop monitoring this one
227  if ( ! src->getKeepOpen())
228  src->close();
229  } else if (newMask != (unsigned) -1) {
230  thisIt->getMask() = newMask;
231  }
232  }
233 
234  // Check whether to clear all sources
235  if (_doClear)
236  {
237  SourceList closeList = _sources;
238  _sources.clear();
239  for (SourceList::iterator it=closeList.begin(); it!=closeList.end(); ++it) {
240  XmlRpcSource *src = it->getSource();
241  src->close();
242  }
243 
244  _doClear = false;
245  }
246 
247  // Check whether end time has passed
248  if (0 <= _endTime && getTime() > _endTime)
249  break;
250  }
251 
252  _inWork = false;
253 }
254 
255 
256 // Exit from work routine. Presumably this will be called from
257 // one of the source event handlers.
258 void
260 {
261  _endTime = 0.0; // Return from work asap
262 }
263 
264 // Clear all sources from the monitored sources list
265 void
267 {
268  if (_inWork)
269  _doClear = true; // Finish reporting current events before clearing
270  else
271  {
272  SourceList closeList = _sources;
273  _sources.clear();
274  for (SourceList::iterator it=closeList.begin(); it!=closeList.end(); ++it)
275  it->getSource()->close();
276  }
277 }
278 
279 
280 double
282 {
283 #ifdef USE_FTIME
284  struct timeb tbuff;
285 
286  ftime(&tbuff);
287  return ((double) tbuff.time + ((double)tbuff.millitm / 1000.0) +
288  ((double) tbuff.timezone * 60));
289 #else
290  uint32_t sec, nsec;
291 
292  ros::ros_steadytime(sec, nsec);
293  return ((double)sec + (double)nsec / 1e9);
294 #endif /* USE_FTIME */
295 }
296 
297 
ros::ros_steadytime
ROSTIME_DECL void ros_steadytime(uint32_t &sec, uint32_t &nsec)
XmlRpc::XmlRpcDispatch::WritableEvent
@ WritableEvent
connected/data can be written without blocking
Definition: XmlRpcDispatch.h:34
time.h
XmlRpc::XmlRpcDispatch::_endTime
double _endTime
Definition: XmlRpcDispatch.h:82
XmlRpc
Definition: XmlRpcClient.h:20
XmlRpc::XmlRpcDispatch::SourceList
std::list< MonitoredSource > SourceList
Definition: XmlRpcDispatch.h:75
XmlRpc::XmlRpcDispatch::exit
void exit()
Exit from work routine.
Definition: XmlRpcDispatch.cpp:259
XmlRpc::XmlRpcDispatch::ReadableEvent
@ ReadableEvent
data available to read
Definition: XmlRpcDispatch.h:33
XmlRpc::XmlRpcSource::close
virtual void close()
Close the owned fd. If deleteOnClose was specified at construction, the object is deleted.
Definition: XmlRpcSource.cpp:20
XmlRpc::XmlRpcDispatch::XmlRpcDispatch
XmlRpcDispatch()
Constructor.
Definition: XmlRpcDispatch.cpp:81
XmlRpc::XmlRpcDispatch::addSource
void addSource(XmlRpcSource *source, unsigned eventMask)
Definition: XmlRpcDispatch.cpp:96
XmlRpc::XmlRpcSource
An RPC source represents a file descriptor to monitor.
Definition: XmlRpcSource.h:16
XmlRpcSource.h
XmlRpc::XmlRpcDispatch::getTime
double getTime()
Definition: XmlRpcDispatch.cpp:281
XmlRpc::XmlRpcUtil::error
static void error(const char *fmt,...)
Dump error messages somewhere.
Definition: XmlRpcUtil.cpp:96
XmlRpc::XmlRpcDispatch::removeSource
void removeSource(XmlRpcSource *source)
Definition: XmlRpcDispatch.cpp:103
XmlRpc::XmlRpcDispatch::_sources
SourceList _sources
Definition: XmlRpcDispatch.h:78
XmlRpc::XmlRpcSource::handleEvent
virtual unsigned handleEvent(unsigned eventType)=0
Return true to continue monitoring this source.
XmlRpc::XmlRpcDispatch::work
void work(double msTime)
Definition: XmlRpcDispatch.cpp:130
XmlRpc::XmlRpcDispatch::~XmlRpcDispatch
~XmlRpcDispatch()
Definition: XmlRpcDispatch.cpp:89
XmlRpc::XmlRpcDispatch::clear
void clear()
Clear all sources from the monitored sources list. Sources are closed.
Definition: XmlRpcDispatch.cpp:266
XmlRpc::XmlRpcSource::getKeepOpen
bool getKeepOpen() const
Return whether the file descriptor should be kept open if it is no longer monitored.
Definition: XmlRpcSource.h:32
XmlRpc::XmlRpcDispatch::setSourceEvents
void setSourceEvents(XmlRpcSource *source, unsigned eventMask)
Modify the types of events to watch for on this source.
Definition: XmlRpcDispatch.cpp:116
XmlRpc::XmlRpcDispatch::_doClear
bool _doClear
Definition: XmlRpcDispatch.h:84
XmlRpc::XmlRpcDispatch::MonitoredSource
Definition: XmlRpcDispatch.h:66
XmlRpc::XmlRpcDispatch::_inWork
bool _inWork
Definition: XmlRpcDispatch.h:85
XmlRpc::XmlRpcDispatch::Exception
@ Exception
out-of-band data has arrived
Definition: XmlRpcDispatch.h:35
XmlRpcUtil.h
XmlRpcDispatch.h


xmlrpcpp
Author(s): Chris Morley, Konstantin Pilipchuk, Morgan Quigley, Austin Hendrix, Dirk Thomas , Jacob Perron
autogenerated on Sat Sep 14 2024 02:59:32