XmlRpcDispatch.cpp
Go to the documentation of this file.
1 
4 #include "xmlrpcpp/XmlRpcUtil.h"
5 
6 #include "ros/time.h"
7 
8 #include <math.h>
9 #include <errno.h>
10 #include <sys/timeb.h>
11 
12 #if defined(_WINDOWS)
13 # include <winsock2.h>
14 static int poll( struct pollfd *pfd, int nfds, int timeout)
15 {
16  // workaround: "Windows 8 Bugs 309411 – WSAPoll does not report failed connections"
17  // https://curl.haxx.se/mail/lib-2012-10/0038.html
18  // the following logic is to use select() to check all writable socket connnection status.
19  // if all the sockets to be checked are not connected, it reports SOCKET_ERROR to
20  // error out, instead of going to WSAPoll() which causes infinitely wait situation.
21  FD_SET writable;
22  FD_SET error;
23  FD_ZERO(&writable);
24  FD_ZERO(&error);
25  for (int i = 0; i < nfds; ++i)
26  {
27  if (pfd[i].events & POLLOUT)
28  {
29  FD_SET(pfd[i].fd, &writable);
30  FD_SET(pfd[i].fd, &error);
31  }
32  }
33 
34  int connectionError = 0;
35  if (writable.fd_count > 0)
36  {
37  int result = select(0, nullptr, &writable, &error, nullptr);
38  if (SOCKET_ERROR == result)
39  {
40  return SOCKET_ERROR;
41  }
42 
43  if (0 != result)
44  {
45  for (int i = 0; i < nfds; ++i)
46  {
47  if ((pfd[i].events & POLLOUT) &&
48  (FD_ISSET(pfd[i].fd, &error)))
49  {
50  connectionError++;
51  }
52  }
53  }
54  }
55 
56  if (connectionError == nfds)
57  {
58  // error out if all sockets are failed to connect.
59  return SOCKET_ERROR;
60  }
61  else
62  {
63  return WSAPoll(pfd, nfds, timeout);
64  }
65 }
66 
67 # define USE_FTIME
68 # if defined(_MSC_VER)
69 # define timeb _timeb
70 # define ftime _ftime_s
71 # endif
72 #else
73 # include <sys/poll.h>
74 # include <sys/time.h>
75 #endif // _WINDOWS
76 
77 
78 using namespace XmlRpc;
79 
80 
82 {
83  _endTime = -1.0;
84  _doClear = false;
85  _inWork = false;
86 }
87 
88 
90 {
91 }
92 
93 // Monitor this source for the specified events and call its event handler
94 // when the event occurs
95 void
96 XmlRpcDispatch::addSource(XmlRpcSource* source, unsigned mask)
97 {
98  _sources.push_back(MonitoredSource(source, mask));
99 }
100 
101 // Stop monitoring this source. Does not close the source.
102 void
104 {
105  for (SourceList::iterator it=_sources.begin(); it!=_sources.end(); ++it)
106  if (it->getSource() == source)
107  {
108  _sources.erase(it);
109  break;
110  }
111 }
112 
113 
114 // Modify the types of events to watch for on this source
115 void
116 XmlRpcDispatch::setSourceEvents(XmlRpcSource* source, unsigned eventMask)
117 {
118  for (SourceList::iterator it=_sources.begin(); it!=_sources.end(); ++it)
119  if (it->getSource() == source)
120  {
121  it->getMask() = eventMask;
122  break;
123  }
124 }
125 
126 
127 
128 // Watch current set of sources and process events
129 void
130 XmlRpcDispatch::work(double timeout)
131 {
132  // Loosely based on `man select` > Correspondence between select() and poll() notifications
133  // and cloudius-systems/osv#35, cloudius-systems/osv@b53d39a using poll to emulate select
134  const unsigned POLLIN_REQ = POLLIN; // Request read
135  const unsigned POLLIN_CHK = (POLLIN | POLLHUP | POLLERR); // Readable or connection lost
136  const unsigned POLLOUT_REQ = POLLOUT; // Request write
137  const unsigned POLLOUT_CHK = (POLLOUT | POLLERR); // Writable or connection lost
138 #if !defined(_WINDOWS)
139  const unsigned POLLEX_REQ = POLLPRI; // Out-of-band data received
140  const unsigned POLLEX_CHK = (POLLPRI | POLLNVAL); // Out-of-band data or invalid fd
141 #else
142  const unsigned POLLEX_REQ = POLLRDBAND; // Out-of-band data received
143  const unsigned POLLEX_CHK = (POLLRDBAND | POLLNVAL); // Out-of-band data or invalid fd
144 #endif
145 
146  // Compute end time
147  _endTime = (timeout < 0.0) ? -1.0 : (getTime() + timeout);
148  _doClear = false;
149  _inWork = true;
150  int timeout_ms = static_cast<int>(floor(timeout * 1000.));
151 
152  // Only work while there is something to monitor
153  while (_sources.size() > 0) {
154 
155  // Construct the sets of descriptors we are interested in
156  const unsigned source_cnt = _sources.size();
157  std::vector<pollfd> fds(source_cnt);
158  std::vector<XmlRpcSource *> sources(source_cnt);
159 
160  SourceList::iterator it;
161  std::size_t i = 0;
162  for (it=_sources.begin(); it!=_sources.end(); ++it, ++i) {
163  sources[i] = it->getSource();
164  fds[i].fd = sources[i]->getfd();
165  fds[i].revents = 0; // some platforms may not clear this in poll()
166  fds[i].events = 0;
167  if (it->getMask() & ReadableEvent) fds[i].events |= POLLIN_REQ;
168  if (it->getMask() & WritableEvent) fds[i].events |= POLLOUT_REQ;
169  if (it->getMask() & Exception) fds[i].events |= POLLEX_REQ;
170  }
171 
172  // Check for events
173  int nEvents = poll(&fds[0], source_cnt, (timeout_ms < 0) ? -1 : timeout_ms);
174 
175  if (nEvents < 0)
176  {
177 #if defined(_WINDOWS)
178  XmlRpcUtil::error("Error in XmlRpcDispatch::work: error in poll (%d).", WSAGetLastError());
179 #else
180  if(errno != EINTR)
181  XmlRpcUtil::error("Error in XmlRpcDispatch::work: error in poll (%d).", nEvents);
182 #endif
183  _inWork = false;
184  return;
185  }
186 
187  // Process events
188  for (i=0; i < source_cnt; ++i)
189  {
190  XmlRpcSource* src = sources[i];
191  pollfd & pfd = fds[i];
192  unsigned newMask = (unsigned) -1;
193  // Only handle requested events to avoid being prematurely removed from dispatch
194  bool readable = (pfd.events & POLLIN_REQ) == POLLIN_REQ;
195  bool writable = (pfd.events & POLLOUT_REQ) == POLLOUT_REQ;
196  bool oob = (pfd.events & POLLEX_REQ) == POLLEX_REQ;
197  if (readable && (pfd.revents & POLLIN_CHK))
198  newMask &= src->handleEvent(ReadableEvent);
199  if (writable && (pfd.revents & POLLOUT_CHK))
200  newMask &= src->handleEvent(WritableEvent);
201  if (oob && (pfd.revents & POLLEX_CHK))
202  newMask &= src->handleEvent(Exception);
203 
204  // Find the source iterator. It may have moved as a result of the way
205  // that sources are removed and added in the call stack starting
206  // from the handleEvent() calls above.
207  SourceList::iterator thisIt;
208  for (thisIt = _sources.begin(); thisIt != _sources.end(); thisIt++)
209  {
210  if(thisIt->getSource() == src)
211  break;
212  }
213  if(thisIt == _sources.end())
214  {
215  XmlRpcUtil::error("Error in XmlRpcDispatch::work: couldn't find source iterator");
216  continue;
217  }
218 
219  if ( ! newMask) {
220  _sources.erase(thisIt); // Stop monitoring this one
221  if ( ! src->getKeepOpen())
222  src->close();
223  } else if (newMask != (unsigned) -1) {
224  thisIt->getMask() = newMask;
225  }
226  }
227 
228  // Check whether to clear all sources
229  if (_doClear)
230  {
231  SourceList closeList = _sources;
232  _sources.clear();
233  for (SourceList::iterator it=closeList.begin(); it!=closeList.end(); ++it) {
234  XmlRpcSource *src = it->getSource();
235  src->close();
236  }
237 
238  _doClear = false;
239  }
240 
241  // Check whether end time has passed
242  if (0 <= _endTime && getTime() > _endTime)
243  break;
244  }
245 
246  _inWork = false;
247 }
248 
249 
250 // Exit from work routine. Presumably this will be called from
251 // one of the source event handlers.
252 void
254 {
255  _endTime = 0.0; // Return from work asap
256 }
257 
258 // Clear all sources from the monitored sources list
259 void
261 {
262  if (_inWork)
263  _doClear = true; // Finish reporting current events before clearing
264  else
265  {
266  SourceList closeList = _sources;
267  _sources.clear();
268  for (SourceList::iterator it=closeList.begin(); it!=closeList.end(); ++it)
269  it->getSource()->close();
270  }
271 }
272 
273 
274 double
276 {
277 #ifdef USE_FTIME
278  struct timeb tbuff;
279 
280  ftime(&tbuff);
281  return ((double) tbuff.time + ((double)tbuff.millitm / 1000.0) +
282  ((double) tbuff.timezone * 60));
283 #else
284  uint32_t sec, nsec;
285 
286  ros::ros_steadytime(sec, nsec);
287  return ((double)sec + (double)nsec / 1e9);
288 #endif /* USE_FTIME */
289 }
290 
291 
ros::ros_steadytime
ROSTIME_DECL void ros_steadytime(uint32_t &sec, uint32_t &nsec)
XmlRpc::XmlRpcDispatch::WritableEvent
@ WritableEvent
connected/data can be written without blocking
Definition: XmlRpcDispatch.h:34
time.h
XmlRpc::XmlRpcDispatch::_endTime
double _endTime
Definition: XmlRpcDispatch.h:82
XmlRpc
Definition: XmlRpcClient.h:20
XmlRpc::XmlRpcDispatch::SourceList
std::list< MonitoredSource > SourceList
Definition: XmlRpcDispatch.h:75
XmlRpc::XmlRpcDispatch::exit
void exit()
Exit from work routine.
Definition: XmlRpcDispatch.cpp:253
XmlRpc::XmlRpcDispatch::ReadableEvent
@ ReadableEvent
data available to read
Definition: XmlRpcDispatch.h:33
XmlRpc::XmlRpcSource::close
virtual void close()
Close the owned fd. If deleteOnClose was specified at construction, the object is deleted.
Definition: XmlRpcSource.cpp:20
XmlRpc::XmlRpcDispatch::XmlRpcDispatch
XmlRpcDispatch()
Constructor.
Definition: XmlRpcDispatch.cpp:81
XmlRpc::XmlRpcDispatch::addSource
void addSource(XmlRpcSource *source, unsigned eventMask)
Definition: XmlRpcDispatch.cpp:96
XmlRpc::XmlRpcSource
An RPC source represents a file descriptor to monitor.
Definition: XmlRpcSource.h:16
XmlRpcSource.h
XmlRpc::XmlRpcDispatch::getTime
double getTime()
Definition: XmlRpcDispatch.cpp:275
XmlRpc::XmlRpcUtil::error
static void error(const char *fmt,...)
Dump error messages somewhere.
Definition: XmlRpcUtil.cpp:96
XmlRpc::XmlRpcDispatch::removeSource
void removeSource(XmlRpcSource *source)
Definition: XmlRpcDispatch.cpp:103
XmlRpc::XmlRpcDispatch::_sources
SourceList _sources
Definition: XmlRpcDispatch.h:78
XmlRpc::XmlRpcSource::handleEvent
virtual unsigned handleEvent(unsigned eventType)=0
Return true to continue monitoring this source.
XmlRpc::XmlRpcDispatch::work
void work(double msTime)
Definition: XmlRpcDispatch.cpp:130
XmlRpc::XmlRpcDispatch::~XmlRpcDispatch
~XmlRpcDispatch()
Definition: XmlRpcDispatch.cpp:89
XmlRpc::XmlRpcDispatch::clear
void clear()
Clear all sources from the monitored sources list. Sources are closed.
Definition: XmlRpcDispatch.cpp:260
XmlRpc::XmlRpcSource::getKeepOpen
bool getKeepOpen() const
Return whether the file descriptor should be kept open if it is no longer monitored.
Definition: XmlRpcSource.h:32
XmlRpc::XmlRpcDispatch::setSourceEvents
void setSourceEvents(XmlRpcSource *source, unsigned eventMask)
Modify the types of events to watch for on this source.
Definition: XmlRpcDispatch.cpp:116
XmlRpc::XmlRpcDispatch::_doClear
bool _doClear
Definition: XmlRpcDispatch.h:84
XmlRpc::XmlRpcDispatch::MonitoredSource
Definition: XmlRpcDispatch.h:66
XmlRpc::XmlRpcDispatch::_inWork
bool _inWork
Definition: XmlRpcDispatch.h:85
XmlRpc::XmlRpcDispatch::Exception
@ Exception
out-of-band data has arrived
Definition: XmlRpcDispatch.h:35
XmlRpcUtil.h
XmlRpcDispatch.h


xmlrpcpp
Author(s): Chris Morley, Konstantin Pilipchuk, Morgan Quigley, Austin Hendrix, Dirk Thomas, Jacob Perron
autogenerated on Thu Nov 23 2023 04:01:41