XmlRpcDispatch.cpp
Go to the documentation of this file.
1 
4 #include "xmlrpcpp/XmlRpcUtil.h"
5 
6 #include "ros/time.h"
7 
8 #include <math.h>
9 #include <errno.h>
10 #include <sys/timeb.h>
11 
12 #if defined(_WINDOWS)
13 # include <winsock2.h>
14 static inline int poll( struct pollfd *pfd, int nfds, int timeout)
15 {
16  return WSAPoll(pfd, nfds, timeout);
17 }
18 
19 # define USE_FTIME
20 # if defined(_MSC_VER)
21 # define timeb _timeb
22 # define ftime _ftime_s
23 # endif
24 #else
25 # include <sys/poll.h>
26 # include <sys/time.h>
27 #endif // _WINDOWS
28 
29 
30 using namespace XmlRpc;
31 
32 
34 {
35  _endTime = -1.0;
36  _doClear = false;
37  _inWork = false;
38 }
39 
40 
42 {
43 }
44 
45 // Monitor this source for the specified events and call its event handler
46 // when the event occurs
47 void
48 XmlRpcDispatch::addSource(XmlRpcSource* source, unsigned mask)
49 {
50  _sources.push_back(MonitoredSource(source, mask));
51 }
52 
53 // Stop monitoring this source. Does not close the source.
54 void
56 {
57  for (SourceList::iterator it=_sources.begin(); it!=_sources.end(); ++it)
58  if (it->getSource() == source)
59  {
60  _sources.erase(it);
61  break;
62  }
63 }
64 
65 
66 // Modify the types of events to watch for on this source
67 void
68 XmlRpcDispatch::setSourceEvents(XmlRpcSource* source, unsigned eventMask)
69 {
70  for (SourceList::iterator it=_sources.begin(); it!=_sources.end(); ++it)
71  if (it->getSource() == source)
72  {
73  it->getMask() = eventMask;
74  break;
75  }
76 }
77 
78 
79 
80 // Watch current set of sources and process events
81 void
82 XmlRpcDispatch::work(double timeout)
83 {
84  // Loosely based on `man select` > Correspondence between select() and poll() notifications
85  // and cloudius-systems/osv#35, cloudius-systems/osv@b53d39a using poll to emulate select
86  const unsigned POLLIN_REQ = POLLIN; // Request read
87  const unsigned POLLIN_CHK = (POLLIN | POLLHUP | POLLERR); // Readable or connection lost
88  const unsigned POLLOUT_REQ = POLLOUT; // Request write
89  const unsigned POLLOUT_CHK = (POLLOUT | POLLERR); // Writable or connection lost
90 #if !defined(_WINDOWS)
91  const unsigned POLLEX_REQ = POLLPRI; // Out-of-band data received
92  const unsigned POLLEX_CHK = (POLLPRI | POLLNVAL); // Out-of-band data or invalid fd
93 #else
94  const unsigned POLLEX_REQ = POLLRDBAND; // Out-of-band data received
95  const unsigned POLLEX_CHK = (POLLRDBAND | POLLNVAL); // Out-of-band data or invalid fd
96 #endif
97 
98  // Compute end time
99  _endTime = (timeout < 0.0) ? -1.0 : (getTime() + timeout);
100  _doClear = false;
101  _inWork = true;
102  int timeout_ms = static_cast<int>(floor(timeout * 1000.));
103 
104  // Only work while there is something to monitor
105  while (_sources.size() > 0) {
106 
107  // Construct the sets of descriptors we are interested in
108  const unsigned source_cnt = _sources.size();
109  std::vector<pollfd> fds(source_cnt);
110  std::vector<XmlRpcSource *> sources(source_cnt);
111 
112  SourceList::iterator it;
113  std::size_t i = 0;
114  for (it=_sources.begin(); it!=_sources.end(); ++it, ++i) {
115  sources[i] = it->getSource();
116  fds[i].fd = sources[i]->getfd();
117  fds[i].revents = 0; // some platforms may not clear this in poll()
118  fds[i].events = 0;
119  if (it->getMask() & ReadableEvent) fds[i].events |= POLLIN_REQ;
120  if (it->getMask() & WritableEvent) fds[i].events |= POLLOUT_REQ;
121  if (it->getMask() & Exception) fds[i].events |= POLLEX_REQ;
122  }
123 
124  // Check for events
125  int nEvents = poll(&fds[0], source_cnt, (timeout_ms < 0) ? -1 : timeout_ms);
126 
127  if (nEvents < 0)
128  {
129 #if defined(_WINDOWS)
130  XmlRpcUtil::error("Error in XmlRpcDispatch::work: error in poll (%d).", WSAGetLastError());
131 #else
132  if(errno != EINTR)
133  XmlRpcUtil::error("Error in XmlRpcDispatch::work: error in poll (%d).", nEvents);
134 #endif
135  _inWork = false;
136  return;
137  }
138 
139  // Process events
140  for (i=0; i < source_cnt; ++i)
141  {
142  XmlRpcSource* src = sources[i];
143  pollfd & pfd = fds[i];
144  unsigned newMask = (unsigned) -1;
145  // Only handle requested events to avoid being prematurely removed from dispatch
146  bool readable = (pfd.events & POLLIN_REQ) == POLLIN_REQ;
147  bool writable = (pfd.events & POLLOUT_REQ) == POLLOUT_REQ;
148  bool oob = (pfd.events & POLLEX_REQ) == POLLEX_REQ;
149  if (readable && (pfd.revents & POLLIN_CHK))
150  newMask &= src->handleEvent(ReadableEvent);
151  if (writable && (pfd.revents & POLLOUT_CHK))
152  newMask &= src->handleEvent(WritableEvent);
153  if (oob && (pfd.revents & POLLEX_CHK))
154  newMask &= src->handleEvent(Exception);
155 
156  // Find the source iterator. It may have moved as a result of the way
157  // that sources are removed and added in the call stack starting
158  // from the handleEvent() calls above.
159  SourceList::iterator thisIt;
160  for (thisIt = _sources.begin(); thisIt != _sources.end(); thisIt++)
161  {
162  if(thisIt->getSource() == src)
163  break;
164  }
165  if(thisIt == _sources.end())
166  {
167  XmlRpcUtil::error("Error in XmlRpcDispatch::work: couldn't find source iterator");
168  continue;
169  }
170 
171  if ( ! newMask) {
172  _sources.erase(thisIt); // Stop monitoring this one
173  if ( ! src->getKeepOpen())
174  src->close();
175  } else if (newMask != (unsigned) -1) {
176  thisIt->getMask() = newMask;
177  }
178  }
179 
180  // Check whether to clear all sources
181  if (_doClear)
182  {
183  SourceList closeList = _sources;
184  _sources.clear();
185  for (SourceList::iterator it=closeList.begin(); it!=closeList.end(); ++it) {
186  XmlRpcSource *src = it->getSource();
187  src->close();
188  }
189 
190  _doClear = false;
191  }
192 
193  // Check whether end time has passed
194  if (0 <= _endTime && getTime() > _endTime)
195  break;
196  }
197 
198  _inWork = false;
199 }
200 
201 
202 // Exit from work routine. Presumably this will be called from
203 // one of the source event handlers.
204 void
206 {
207  _endTime = 0.0; // Return from work asap
208 }
209 
210 // Clear all sources from the monitored sources list
211 void
213 {
214  if (_inWork)
215  _doClear = true; // Finish reporting current events before clearing
216  else
217  {
218  SourceList closeList = _sources;
219  _sources.clear();
220  for (SourceList::iterator it=closeList.begin(); it!=closeList.end(); ++it)
221  it->getSource()->close();
222  }
223 }
224 
225 
226 double
228 {
229 #ifdef USE_FTIME
230  struct timeb tbuff;
231 
232  ftime(&tbuff);
233  return ((double) tbuff.time + ((double)tbuff.millitm / 1000.0) +
234  ((double) tbuff.timezone * 60));
235 #else
236  uint32_t sec, nsec;
237 
238  ros::ros_steadytime(sec, nsec);
239  return ((double)sec + (double)nsec / 1e9);
240 #endif /* USE_FTIME */
241 }
242 
243 
connected/data can be written without blocking
ROSTIME_DECL void ros_steadytime(uint32_t &sec, uint32_t &nsec)
void removeSource(XmlRpcSource *source)
static void error(const char *fmt,...)
Dump error messages somewhere.
Definition: XmlRpcUtil.cpp:95
void clear()
Clear all sources from the monitored sources list. Sources are closed.
virtual void close()
Close the owned fd. If deleteOnClose was specified at construction, the object is deleted...
void work(double msTime)
XmlRpcDispatch()
Constructor.
An RPC source represents a file descriptor to monitor.
Definition: XmlRpcSource.h:16
void setSourceEvents(XmlRpcSource *source, unsigned eventMask)
Modify the types of events to watch for on this source.
void addSource(XmlRpcSource *source, unsigned eventMask)
bool getKeepOpen() const
Return whether the file descriptor should be kept open if it is no longer monitored.
Definition: XmlRpcSource.h:32
out-of-band data has arrived
std::list< MonitoredSource > SourceList
void exit()
Exit from work routine.
virtual unsigned handleEvent(unsigned eventType)=0
Return true to continue monitoring this source.


xmlrpcpp
Author(s): Chris Morley, Konstantin Pilipchuk, Morgan Quigley, Austin Hendrix
autogenerated on Thu Nov 28 2019 03:17:04