00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036
00037
00038
00039 #include "FileDescriptorActivity.hpp"
00040 #include "../ExecutionEngine.hpp"
00041 #include "../base/TaskCore.hpp"
00042 #include "../Logger.hpp"
00043
00044
00045 #include <algorithm>
00046
00047 #ifdef WIN32
00048 #include <io.h>
00049 #include <fcntl.h>
00050 #define pipe(X) _pipe((X), 1024, _O_BINARY)
00051 #define close _close
00052 #define write _write
00053 #undef max
00054
00055 #else
00056 #include <sys/select.h>
00057 #include <unistd.h>
00058 #include <errno.h>
00059 #endif
00060
00061 #include <boost/cstdint.hpp>
00062
00063 using namespace RTT;
00064 using namespace extras;
00065 using namespace base;
// Out-of-class definitions of the command codes exchanged over the interrupt
// pipe. No initializer is given here, so the values presumably come from
// in-class initializers in the header (not visible in this file). The
// definitions are needed because the codes are passed by address to write().
const int FileDescriptorActivity::CMD_BREAK_LOOP;
const int FileDescriptorActivity::CMD_TRIGGER;
const int FileDescriptorActivity::CMD_UPDATE_SETS;
00069
00070
00079 FileDescriptorActivity::FileDescriptorActivity(int priority, RunnableInterface* _r, const std::string& name )
00080 : Activity(priority, 0.0, _r, name)
00081 , m_running(false)
00082 , m_timeout(0)
00083 {
00084 FD_ZERO(&m_fd_set);
00085 m_interrupt_pipe[0] = m_interrupt_pipe[1] = -1;
00086 }
00087
00097 FileDescriptorActivity::FileDescriptorActivity(int scheduler, int priority, RunnableInterface* _r, const std::string& name )
00098 : Activity(scheduler, priority, 0.0, _r, name)
00099 , m_running(false)
00100 , m_timeout(0)
00101 {
00102 FD_ZERO(&m_fd_set);
00103 m_interrupt_pipe[0] = m_interrupt_pipe[1] = -1;
00104 }
00105
00106 FileDescriptorActivity::~FileDescriptorActivity()
00107 {
00108 stop();
00109 }
00110
00111 bool FileDescriptorActivity::isRunning() const
00112 { return Activity::isRunning() && m_running; }
00113 int FileDescriptorActivity::getTimeout() const
00114 { return m_timeout; }
00115 void FileDescriptorActivity::setTimeout(int timeout)
00116 { m_timeout = timeout; }
00117 void FileDescriptorActivity::watch(int fd)
00118 { RTT::os::MutexLock lock(m_lock);
00119 if (fd < 0)
00120 {
00121 log(Error) << "negative file descriptor given to FileDescriptorActivity::watch" << endlog();
00122 return;
00123 }
00124
00125 m_watched_fds.insert(fd);
00126 FD_SET(fd, &m_fd_set);
00127 triggerUpdateSets();
00128 }
00129 void FileDescriptorActivity::unwatch(int fd)
00130 { RTT::os::MutexLock lock(m_lock);
00131 m_watched_fds.erase(fd);
00132 FD_CLR(fd, &m_fd_set);
00133 triggerUpdateSets();
00134 }
00135 void FileDescriptorActivity::clearAllWatches()
00136 { RTT::os::MutexLock lock(m_lock);
00137 m_watched_fds.clear();
00138 FD_ZERO(&m_fd_set);
00139 triggerUpdateSets();
00140 }
00141 void FileDescriptorActivity::triggerUpdateSets()
00142 {
00143
00144 int i = write(m_interrupt_pipe[1], &CMD_UPDATE_SETS, 1);
00145 i = i;
00146 }
00147 bool FileDescriptorActivity::isUpdated(int fd) const
00148 { return FD_ISSET(fd, &m_fd_work); }
00149 bool FileDescriptorActivity::hasError() const
00150 { return m_has_error; }
00151 bool FileDescriptorActivity::hasTimeout() const
00152 { return m_has_timeout; }
00153 bool FileDescriptorActivity::isWatched(int fd) const
00154 { RTT::os::MutexLock lock(m_lock);
00155 return FD_ISSET(fd, &m_fd_set); }
00156
00157 bool FileDescriptorActivity::start()
00158 {
00159 if (pipe(m_interrupt_pipe) == -1)
00160 {
00161 log(Error) << "FileDescriptorActivity: cannot create control pipe" << endlog();
00162 return false;
00163 }
00164
00165 if (!Activity::start())
00166 {
00167 close(m_interrupt_pipe[0]);
00168 close(m_interrupt_pipe[1]);
00169 m_interrupt_pipe[0] = m_interrupt_pipe[1] = -1;
00170 return false;
00171 }
00172 return true;
00173 }
00174
00175 bool FileDescriptorActivity::trigger()
00176 { return write(m_interrupt_pipe[1], &CMD_TRIGGER, 1) == 1; }
00177
/**
 * Scope guard for a file descriptor slot.
 *
 * Holds a reference to an int that stores a descriptor. On destruction the
 * descriptor is closed (when the slot holds one) and the slot is reset to -1,
 * so that other code can see that the descriptor is gone.
 */
struct fd_watch {
    int& fd;

    explicit fd_watch(int& fd) : fd(fd) {}

    ~fd_watch()
    {
        // Never hand -1 to close(): it is pointless on POSIX and triggers the
        // invalid-parameter handler of the Microsoft CRT's _close().
        if (fd != -1)
            close(fd);
        fd = -1;
    }

private:
    // Non-copyable: a copy would refer to the same slot and close it twice.
    fd_watch(const fd_watch&);
    fd_watch& operator=(const fd_watch&);
};
00187
00188 void FileDescriptorActivity::loop()
00189 {
00190 int pipe = m_interrupt_pipe[0];
00191 fd_watch watch_pipe_0(m_interrupt_pipe[0]);
00192 fd_watch watch_pipe_1(m_interrupt_pipe[1]);
00193
00194 while(true)
00195 {
00196 int max_fd;
00197 { RTT::os::MutexLock lock(m_lock);
00198 if (m_watched_fds.empty())
00199 max_fd = pipe;
00200 else
00201 max_fd = std::max(pipe, *m_watched_fds.rbegin());
00202
00203 m_fd_work = m_fd_set;
00204 }
00205 FD_SET(pipe, &m_fd_work);
00206
00207 int ret;
00208 m_running = false;
00209 if (m_timeout == 0)
00210 {
00211 ret = select(max_fd + 1, &m_fd_work, NULL, NULL, NULL);
00212 }
00213 else
00214 {
00215 timeval timeout = { m_timeout / 1000, (m_timeout % 1000) * 1000 };
00216 ret = select(max_fd + 1, &m_fd_work, NULL, NULL, &timeout);
00217 }
00218
00219 m_has_error = false;
00220 m_has_timeout = false;
00221 if (ret == -1)
00222 {
00223 log(Error) << "FileDescriptorActivity: error in select(), errno = " << errno << endlog();
00224 m_has_error = true;
00225 }
00226 else if (ret == 0)
00227 {
00228 log(Error) << "FileDescriptorActivity: timeout in select()" << endlog();
00229 m_has_timeout = true;
00230 }
00231
00232 bool do_break = false, do_trigger = true, do_update_sets = false;
00233 if (ret > 0 && FD_ISSET(pipe, &m_fd_work))
00234 {
00235
00236
00237
00238 fd_set watch_pipe;
00239 timeval timeout;
00240
00241 do_trigger = false;
00242 do
00243 {
00244 boost::uint8_t code;
00245 if (read(pipe, &code, 1) == 1)
00246 {
00247 if (code == CMD_BREAK_LOOP)
00248 {
00249 do_break = true;
00250 }
00251 else if (code == CMD_UPDATE_SETS)
00252 do_update_sets = true;
00253 else
00254 do_trigger = true;
00255 }
00256
00257
00258 FD_ZERO(&watch_pipe);
00259 FD_SET(pipe, &watch_pipe);
00260 timeout.tv_sec = 0;
00261 timeout.tv_usec = 0;
00262 }
00263 while(select(pipe + 1, &watch_pipe, NULL, NULL, &timeout) > 0);
00264
00265 if (do_break)
00266 break;
00267 }
00268
00269 if (do_trigger)
00270 {
00271 try
00272 {
00273 m_running = true;
00274 step();
00275 m_running = false;
00276 }
00277 catch(...)
00278 {
00279 m_running = false;
00280 throw;
00281 }
00282 }
00283 }
00284 }
00285
00286 bool FileDescriptorActivity::breakLoop()
00287 {
00288 if (write(m_interrupt_pipe[1], &CMD_BREAK_LOOP, 1) != 1)
00289 return false;
00290
00291
00292
00293
00294 return true;
00295 }
00296
00297 void FileDescriptorActivity::step()
00298 {
00299 m_running = true;
00300 if (runner != 0)
00301 runner->step();
00302 m_running = false;
00303 }
00304
00305 bool FileDescriptorActivity::stop()
00306 {
00307
00308
00309
00310
00311
00312
00313 return Activity::stop();
00314 }
00315