XmlRpcDispatch.cpp
Go to the documentation of this file.
1 
4 #include "xmlrpcpp/XmlRpcUtil.h"
5 
6 #include "ros/time.h"
7 
8 #include <math.h>
9 #include <errno.h>
10 #include <sys/timeb.h>
11 
12 #if defined(_WINDOWS)
13 # include <winsock2.h>
14 static int poll( struct pollfd *pfd, int nfds, int timeout)
15 {
16  // workaround: "Windows 8 Bugs 309411 – WSAPoll does not report failed connections"
17  // https://curl.haxx.se/mail/lib-2012-10/0038.html
18  // the following logic is to use select() to check all writable socket connnection status.
19  // if all the sockets to be checked are not connected, it reports SOCKET_ERROR to
20  // error out, instead of going to WSAPoll() which causes infinitely wait situation.
21  FD_SET writable;
22  FD_SET error;
23  FD_ZERO(&writable);
24  FD_ZERO(&error);
25  for (int i = 0; i < nfds; ++i)
26  {
27  if (pfd[i].events & POLLOUT)
28  {
29  FD_SET(pfd[i].fd, &writable);
30  FD_SET(pfd[i].fd, &error);
31  }
32  }
33 
34  int connectionError = 0;
35  if (writable.fd_count > 0)
36  {
37  int result = select(0, nullptr, &writable, &error, nullptr);
38  if (SOCKET_ERROR == result)
39  {
40  return SOCKET_ERROR;
41  }
42 
43  if (0 != result)
44  {
45  for (int i = 0; i < nfds; ++i)
46  {
47  if ((pfd[i].events & POLLOUT) &&
48  (FD_ISSET(pfd[i].fd, &error)))
49  {
50  connectionError++;
51  }
52  }
53  }
54  }
55 
56  if (connectionError == nfds)
57  {
58  // error out if all sockets are failed to connect.
59  return SOCKET_ERROR;
60  }
61  else
62  {
63  return WSAPoll(pfd, nfds, timeout);
64  }
65 }
66 
67 # define USE_FTIME
68 # if defined(_MSC_VER)
69 # define timeb _timeb
70 # define ftime _ftime_s
71 # endif
72 #else
73 # include <sys/poll.h>
74 # include <sys/time.h>
75 #endif // _WINDOWS
76 
77 
78 using namespace XmlRpc;
79 
80 
82 {
83  _endTime = -1.0;
84  _doClear = false;
85  _inWork = false;
86 }
87 
88 
90 {
91 }
92 
93 // Monitor this source for the specified events and call its event handler
94 // when the event occurs
95 void
96 XmlRpcDispatch::addSource(XmlRpcSource* source, unsigned mask)
97 {
98  _sources.push_back(MonitoredSource(source, mask));
99 }
100 
101 // Stop monitoring this source. Does not close the source.
102 void
104 {
105  for (SourceList::iterator it=_sources.begin(); it!=_sources.end(); ++it)
106  if (it->getSource() == source)
107  {
108  _sources.erase(it);
109  break;
110  }
111 }
112 
113 
114 // Modify the types of events to watch for on this source
115 void
116 XmlRpcDispatch::setSourceEvents(XmlRpcSource* source, unsigned eventMask)
117 {
118  for (SourceList::iterator it=_sources.begin(); it!=_sources.end(); ++it)
119  if (it->getSource() == source)
120  {
121  it->getMask() = eventMask;
122  break;
123  }
124 }
125 
126 
127 
128 // Watch current set of sources and process events
129 void
130 XmlRpcDispatch::work(double timeout)
131 {
132  // Loosely based on `man select` > Correspondence between select() and poll() notifications
133  // and cloudius-systems/osv#35, cloudius-systems/osv@b53d39a using poll to emulate select
134  const unsigned POLLIN_REQ = POLLIN; // Request read
135  const unsigned POLLIN_CHK = (POLLIN | POLLHUP | POLLERR); // Readable or connection lost
136  const unsigned POLLOUT_REQ = POLLOUT; // Request write
137  const unsigned POLLOUT_CHK = (POLLOUT | POLLERR); // Writable or connection lost
138 #if !defined(_WINDOWS)
139  const unsigned POLLEX_REQ = POLLPRI; // Out-of-band data received
140  const unsigned POLLEX_CHK = (POLLPRI | POLLNVAL); // Out-of-band data or invalid fd
141 #else
142  const unsigned POLLEX_REQ = POLLRDBAND; // Out-of-band data received
143  const unsigned POLLEX_CHK = (POLLRDBAND | POLLNVAL); // Out-of-band data or invalid fd
144 #endif
145 
146  // Compute end time
147  _endTime = (timeout < 0.0) ? -1.0 : (getTime() + timeout);
148  _doClear = false;
149  _inWork = true;
150  int timeout_ms = static_cast<int>(floor(timeout * 1000.));
151 
152  // Only work while there is something to monitor
153  while (_sources.size() > 0) {
154 
155  // Construct the sets of descriptors we are interested in
156  const unsigned source_cnt = _sources.size();
157  std::vector<pollfd> fds(source_cnt);
158  std::vector<XmlRpcSource *> sources(source_cnt);
159 
160  SourceList::iterator it;
161  std::size_t i = 0;
162  for (it=_sources.begin(); it!=_sources.end(); ++it, ++i) {
163  sources[i] = it->getSource();
164  fds[i].fd = sources[i]->getfd();
165  fds[i].revents = 0; // some platforms may not clear this in poll()
166  fds[i].events = 0;
167  if (it->getMask() & ReadableEvent) fds[i].events |= POLLIN_REQ;
168  if (it->getMask() & WritableEvent) fds[i].events |= POLLOUT_REQ;
169  if (it->getMask() & Exception) fds[i].events |= POLLEX_REQ;
170  }
171 
172  // Check for events
173  int nEvents = poll(&fds[0], source_cnt, (timeout_ms < 0) ? -1 : timeout_ms);
174 
175  if (nEvents < 0)
176  {
177 #if defined(_WINDOWS)
178  XmlRpcUtil::error("Error in XmlRpcDispatch::work: error in poll (%d).", WSAGetLastError());
179 #else
180  if(errno != EINTR)
181  XmlRpcUtil::error("Error in XmlRpcDispatch::work: error in poll (%d).", nEvents);
182 #endif
183  _inWork = false;
184  return;
185  }
186 
187  // Process events
188  for (i=0; i < source_cnt; ++i)
189  {
190  XmlRpcSource* src = sources[i];
191  pollfd & pfd = fds[i];
192  unsigned newMask = (unsigned) -1;
193  // Only handle requested events to avoid being prematurely removed from dispatch
194  bool readable = (pfd.events & POLLIN_REQ) == POLLIN_REQ;
195  bool writable = (pfd.events & POLLOUT_REQ) == POLLOUT_REQ;
196  bool oob = (pfd.events & POLLEX_REQ) == POLLEX_REQ;
197  if (readable && (pfd.revents & POLLIN_CHK))
198  newMask &= src->handleEvent(ReadableEvent);
199  if (writable && (pfd.revents & POLLOUT_CHK))
200  newMask &= src->handleEvent(WritableEvent);
201  if (oob && (pfd.revents & POLLEX_CHK))
202  newMask &= src->handleEvent(Exception);
203 
204  // Find the source iterator. It may have moved as a result of the way
205  // that sources are removed and added in the call stack starting
206  // from the handleEvent() calls above.
207  SourceList::iterator thisIt;
208  for (thisIt = _sources.begin(); thisIt != _sources.end(); thisIt++)
209  {
210  if(thisIt->getSource() == src)
211  break;
212  }
213  if(thisIt == _sources.end())
214  {
215  XmlRpcUtil::error("Error in XmlRpcDispatch::work: couldn't find source iterator");
216  continue;
217  }
218 
219  if ( ! newMask) {
220  _sources.erase(thisIt); // Stop monitoring this one
221  if ( ! src->getKeepOpen())
222  src->close();
223  } else if (newMask != (unsigned) -1) {
224  thisIt->getMask() = newMask;
225  }
226  }
227 
228  // Check whether to clear all sources
229  if (_doClear)
230  {
231  SourceList closeList = _sources;
232  _sources.clear();
233  for (SourceList::iterator it=closeList.begin(); it!=closeList.end(); ++it) {
234  XmlRpcSource *src = it->getSource();
235  src->close();
236  }
237 
238  _doClear = false;
239  }
240 
241  // Check whether end time has passed
242  if (0 <= _endTime && getTime() > _endTime)
243  break;
244  }
245 
246  _inWork = false;
247 }
248 
249 
250 // Exit from work routine. Presumably this will be called from
251 // one of the source event handlers.
252 void
254 {
255  _endTime = 0.0; // Return from work asap
256 }
257 
258 // Clear all sources from the monitored sources list
259 void
261 {
262  if (_inWork)
263  _doClear = true; // Finish reporting current events before clearing
264  else
265  {
266  SourceList closeList = _sources;
267  _sources.clear();
268  for (SourceList::iterator it=closeList.begin(); it!=closeList.end(); ++it)
269  it->getSource()->close();
270  }
271 }
272 
273 
274 double
276 {
277 #ifdef USE_FTIME
278  struct timeb tbuff;
279 
280  ftime(&tbuff);
281  return ((double) tbuff.time + ((double)tbuff.millitm / 1000.0) +
282  ((double) tbuff.timezone * 60));
283 #else
284  uint32_t sec, nsec;
285 
286  ros::ros_steadytime(sec, nsec);
287  return ((double)sec + (double)nsec / 1e9);
288 #endif /* USE_FTIME */
289 }
290 
291 
connected/data can be written without blocking
ROSTIME_DECL void ros_steadytime(uint32_t &sec, uint32_t &nsec)
void removeSource(XmlRpcSource *source)
static void error(const char *fmt,...)
Dump error messages somewhere.
Definition: XmlRpcUtil.cpp:96
void clear()
Clear all sources from the monitored sources list. Sources are closed.
virtual void close()
Close the owned fd. If deleteOnClose was specified at construction, the object is deleted...
void work(double msTime)
XmlRpcDispatch()
Constructor.
An RPC source represents a file descriptor to monitor.
Definition: XmlRpcSource.h:16
void setSourceEvents(XmlRpcSource *source, unsigned eventMask)
Modify the types of events to watch for on this source.
void addSource(XmlRpcSource *source, unsigned eventMask)
bool getKeepOpen() const
Return whether the file descriptor should be kept open if it is no longer monitored.
Definition: XmlRpcSource.h:32
out-of-band data has arrived
std::list< MonitoredSource > SourceList
void exit()
Exit from work routine.
virtual unsigned handleEvent(unsigned eventType)=0
Return true to continue monitoring this source.


xmlrpcpp
Author(s): Chris Morley, Konstantin Pilipchuk, Morgan Quigley, Austin Hendrix, Dirk Thomas
autogenerated on Mon Feb 28 2022 23:33:22