XmlRpcDispatch.cpp
Go to the documentation of this file.
1 
4 #include "xmlrpcpp/XmlRpcUtil.h"
5 
6 #include "ros/time.h"
7 
#include <errno.h>
#include <limits.h>
#include <math.h>
#include <sys/poll.h>
#include <sys/timeb.h>

#include <vector>
12 
13 #if defined(_WINDOWS)
14 # include <winsock2.h>
15 static inline int poll( struct pollfd *pfd, int nfds, int timeout)
16 {
17  return WSAPoll(pfd, nfds, timeout);
18 }
19 
20 # define USE_FTIME
21 # if defined(_MSC_VER)
22 # define timeb _timeb
23 # define ftime _ftime_s
24 # endif
25 #else
26 # include <sys/time.h>
27 #endif // _WINDOWS
28 
29 
30 using namespace XmlRpc;
31 
32 
34 {
35  _endTime = -1.0;
36  _doClear = false;
37  _inWork = false;
38 }
39 
40 
42 {
43 }
44 
45 // Monitor this source for the specified events and call its event handler
46 // when the event occurs
47 void
48 XmlRpcDispatch::addSource(XmlRpcSource* source, unsigned mask)
49 {
50  _sources.push_back(MonitoredSource(source, mask));
51 }
52 
53 // Stop monitoring this source. Does not close the source.
54 void
56 {
57  for (SourceList::iterator it=_sources.begin(); it!=_sources.end(); ++it)
58  if (it->getSource() == source)
59  {
60  _sources.erase(it);
61  break;
62  }
63 }
64 
65 
66 // Modify the types of events to watch for on this source
67 void
68 XmlRpcDispatch::setSourceEvents(XmlRpcSource* source, unsigned eventMask)
69 {
70  for (SourceList::iterator it=_sources.begin(); it!=_sources.end(); ++it)
71  if (it->getSource() == source)
72  {
73  it->getMask() = eventMask;
74  break;
75  }
76 }
77 
78 
79 
80 // Watch current set of sources and process events
81 void
82 XmlRpcDispatch::work(double timeout)
83 {
84  // Loosely based on `man select` > Correspondence between select() and poll() notifications
85  // and cloudius-systems/osv#35, cloudius-systems/osv@b53d39a using poll to emulate select
86  const unsigned POLLIN_REQ = POLLIN; // Request read
87  const unsigned POLLIN_CHK = (POLLIN | POLLHUP | POLLERR); // Readable or connection lost
88  const unsigned POLLOUT_REQ = POLLOUT; // Request write
89  const unsigned POLLOUT_CHK = (POLLOUT | POLLERR); // Writable or connection lost
90  const unsigned POLLEX_REQ = POLLPRI; // Out-of-band data received
91  const unsigned POLLEX_CHK = (POLLPRI | POLLNVAL); // Out-of-band data or invalid fd
92 
93  // Compute end time
94  _endTime = (timeout < 0.0) ? -1.0 : (getTime() + timeout);
95  _doClear = false;
96  _inWork = true;
97  int timeout_ms = static_cast<int>(floor(timeout * 1000.));
98 
99  // Only work while there is something to monitor
100  while (_sources.size() > 0) {
101 
102  // Construct the sets of descriptors we are interested in
103  const unsigned source_cnt = _sources.size();
104  pollfd fds[source_cnt];
105  XmlRpcSource * sources[source_cnt];
106 
107  SourceList::iterator it;
108  std::size_t i = 0;
109  for (it=_sources.begin(); it!=_sources.end(); ++it, ++i) {
110  sources[i] = it->getSource();
111  fds[i].fd = sources[i]->getfd();
112  fds[i].revents = 0; // some platforms may not clear this in poll()
113  fds[i].events = 0;
114  if (it->getMask() & ReadableEvent) fds[i].events |= POLLIN_REQ;
115  if (it->getMask() & WritableEvent) fds[i].events |= POLLOUT_REQ;
116  if (it->getMask() & Exception) fds[i].events |= POLLEX_REQ;
117  }
118 
119  // Check for events
120  int nEvents = poll(fds, source_cnt, (timeout_ms < 0) ? -1 : timeout_ms);
121 
122  if (nEvents < 0)
123  {
124  if(errno != EINTR)
125  XmlRpcUtil::error("Error in XmlRpcDispatch::work: error in poll (%d).", nEvents);
126  _inWork = false;
127  return;
128  }
129 
130  // Process events
131  for (i=0; i < source_cnt; ++i)
132  {
133  XmlRpcSource* src = sources[i];
134  pollfd & pfd = fds[i];
135  unsigned newMask = (unsigned) -1;
136  // Only handle requested events to avoid being prematurely removed from dispatch
137  bool readable = (pfd.events & POLLIN_REQ) == POLLIN_REQ;
138  bool writable = (pfd.events & POLLOUT_REQ) == POLLOUT_REQ;
139  bool oob = (pfd.events & POLLEX_REQ) == POLLEX_REQ;
140  if (readable && (pfd.revents & POLLIN_CHK))
141  newMask &= src->handleEvent(ReadableEvent);
142  if (writable && (pfd.revents & POLLOUT_CHK))
143  newMask &= src->handleEvent(WritableEvent);
144  if (oob && (pfd.revents & POLLEX_CHK))
145  newMask &= src->handleEvent(Exception);
146 
147  // Find the source iterator. It may have moved as a result of the way
148  // that sources are removed and added in the call stack starting
149  // from the handleEvent() calls above.
150  SourceList::iterator thisIt;
151  for (thisIt = _sources.begin(); thisIt != _sources.end(); thisIt++)
152  {
153  if(thisIt->getSource() == src)
154  break;
155  }
156  if(thisIt == _sources.end())
157  {
158  XmlRpcUtil::error("Error in XmlRpcDispatch::work: couldn't find source iterator");
159  continue;
160  }
161 
162  if ( ! newMask) {
163  _sources.erase(thisIt); // Stop monitoring this one
164  if ( ! src->getKeepOpen())
165  src->close();
166  } else if (newMask != (unsigned) -1) {
167  thisIt->getMask() = newMask;
168  }
169  }
170 
171  // Check whether to clear all sources
172  if (_doClear)
173  {
174  SourceList closeList = _sources;
175  _sources.clear();
176  for (SourceList::iterator it=closeList.begin(); it!=closeList.end(); ++it) {
177  XmlRpcSource *src = it->getSource();
178  src->close();
179  }
180 
181  _doClear = false;
182  }
183 
184  // Check whether end time has passed
185  if (0 <= _endTime && getTime() > _endTime)
186  break;
187  }
188 
189  _inWork = false;
190 }
191 
192 
193 // Exit from work routine. Presumably this will be called from
194 // one of the source event handlers.
195 void
197 {
198  _endTime = 0.0; // Return from work asap
199 }
200 
201 // Clear all sources from the monitored sources list
202 void
204 {
205  if (_inWork)
206  _doClear = true; // Finish reporting current events before clearing
207  else
208  {
209  SourceList closeList = _sources;
210  _sources.clear();
211  for (SourceList::iterator it=closeList.begin(); it!=closeList.end(); ++it)
212  it->getSource()->close();
213  }
214 }
215 
216 
217 double
219 {
220 #ifdef USE_FTIME
221  struct timeb tbuff;
222 
223  ftime(&tbuff);
224  return ((double) tbuff.time + ((double)tbuff.millitm / 1000.0) +
225  ((double) tbuff.timezone * 60));
226 #else
227  uint32_t sec, nsec;
228 
229  ros::ros_steadytime(sec, nsec);
230  return ((double)sec + (double)nsec / 1e9);
231 #endif /* USE_FTIME */
232 }
233 
234 
bool getKeepOpen() const
Return whether the file descriptor should be kept open if it is no longer monitored.
Definition: XmlRpcSource.h:32
connected/data can be written without blocking
ROSTIME_DECL void ros_steadytime(uint32_t &sec, uint32_t &nsec)
void removeSource(XmlRpcSource *source)
static void error(const char *fmt,...)
Dump error messages somewhere.
Definition: XmlRpcUtil.cpp:95
void clear()
Clear all sources from the monitored sources list. Sources are closed.
int getfd() const
Return the file descriptor being monitored.
Definition: XmlRpcSource.h:27
virtual void close()
Close the owned fd. If deleteOnClose was specified at construction, the object is deleted...
void work(double msTime)
XmlRpcDispatch()
Constructor.
An RPC source represents a file descriptor to monitor.
Definition: XmlRpcSource.h:16
void setSourceEvents(XmlRpcSource *source, unsigned eventMask)
Modify the types of events to watch for on this source.
void addSource(XmlRpcSource *source, unsigned eventMask)
out-of-band data has arrived
std::list< MonitoredSource > SourceList
void exit()
Exit from work routine.
virtual unsigned handleEvent(unsigned eventType)=0
Return true to continue monitoring this source.


xmlrpcpp
Author(s): Chris Morley, Konstantin Pilipchuk, Morgan Quigley, Austin Hendrix
autogenerated on Sun Feb 3 2019 03:29:51