00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036
00037
00038
00039 #include "FileDescriptorActivity.hpp"
00040 #include "../ExecutionEngine.hpp"
00041 #include "../base/TaskCore.hpp"
00042 #include "../Logger.hpp"
00043
00044
00045 #include <algorithm>
#include <climits>
00046
00047 #ifdef WIN32
00048 #include <io.h>
00049 #include <fcntl.h>
00050 #define pipe(X) _pipe((X), 1024, _O_BINARY)
00051 #define close _close
00052 #define write _write
00053 #undef max
00054
00055 #else
00056 #include <sys/select.h>
00057 #include <unistd.h>
00058 #include <fcntl.h>
00059 #include <errno.h>
00060
00061 #endif
00062
00063 #include <boost/cstdint.hpp>
00064
00065 using namespace RTT;
00066 using namespace extras;
00067 using namespace base;
// Out-of-class definition of the static command byte. In this file it is the
// single byte written to the write end of the interrupt pipe by trigger(),
// breakLoop() and triggerUpdateSets() to wake up the select() call in loop().
00068 const char FileDescriptorActivity::CMD_ANY_COMMAND;
00069
00078 FileDescriptorActivity::FileDescriptorActivity(int priority, RunnableInterface* _r, const std::string& name )
00079 : Activity(priority, 0.0, _r, name)
00080 , m_running(false)
00081 , m_timeout_us(0)
00082 , m_period(0)
00083 , m_has_error(false)
00084 , m_has_timeout(false)
00085 , m_break_loop(false)
00086 , m_trigger(false)
00087 , m_update_sets(false)
00088 {
00089 FD_ZERO(&m_fd_set);
00090 FD_ZERO(&m_fd_work);
00091 m_interrupt_pipe[0] = m_interrupt_pipe[1] = -1;
00092 }
00093
00103 FileDescriptorActivity::FileDescriptorActivity(int scheduler, int priority, RunnableInterface* _r, const std::string& name )
00104 : Activity(scheduler, priority, 0.0, _r, name)
00105 , m_running(false)
00106 , m_timeout_us(0)
00107 , m_period(0)
00108 , m_has_error(false)
00109 , m_has_timeout(false)
00110 , m_break_loop(false)
00111 , m_trigger(false)
00112 , m_update_sets(false)
00113 {
00114 FD_ZERO(&m_fd_set);
00115 FD_ZERO(&m_fd_work);
00116 m_interrupt_pipe[0] = m_interrupt_pipe[1] = -1;
00117 }
00118
00119 FileDescriptorActivity::FileDescriptorActivity(int scheduler, int priority, Seconds period, RunnableInterface* _r, const std::string& name )
00120 : Activity(scheduler, priority, 0.0, _r, name)
00121 , m_running(false)
00122 , m_timeout_us(0)
00123 , m_period(period >= 0.0 ? period : 0.0)
00124 , m_has_error(false)
00125 , m_has_timeout(false)
00126 , m_break_loop(false)
00127 , m_trigger(false)
00128 , m_update_sets(false)
00129 {
00130 FD_ZERO(&m_fd_set);
00131 FD_ZERO(&m_fd_work);
00132 m_interrupt_pipe[0] = m_interrupt_pipe[1] = -1;
00133 }
00134
00135 FileDescriptorActivity::FileDescriptorActivity(int scheduler, int priority, Seconds period, unsigned cpu_affinity, RunnableInterface* _r, const std::string& name )
00136 : Activity(scheduler, priority, 0.0, cpu_affinity, _r, name)
00137 , m_running(false)
00138 , m_timeout_us(0)
00139 , m_period(period >= 0.0 ? period : 0.0)
00140 , m_has_error(false)
00141 , m_has_timeout(false)
00142 , m_break_loop(false)
00143 , m_trigger(false)
00144 , m_update_sets(false)
00145 {
00146 FD_ZERO(&m_fd_set);
00147 FD_ZERO(&m_fd_work);
00148 m_interrupt_pipe[0] = m_interrupt_pipe[1] = -1;
00149 }
00150
00151 FileDescriptorActivity::~FileDescriptorActivity()
00152 {
00153 stop();
00154 }
00155
00156 Seconds FileDescriptorActivity::getPeriod() const
00157 { return m_period; }
00158
00159 bool FileDescriptorActivity::setPeriod(Seconds p)
00160 {
00161 if (p < 0)
00162 return false;
00163 m_period = p;
00164 return true;
00165 }
00166
00167 bool FileDescriptorActivity::isRunning() const
00168 { return Activity::isRunning() && m_running; }
00169 int FileDescriptorActivity::getTimeout() const
00170 { return m_timeout_us / 1000; }
00171 int FileDescriptorActivity::getTimeout_us() const
00172 { return m_timeout_us; }
00173 void FileDescriptorActivity::setTimeout(int timeout)
00174 {
00175 setTimeout_us(timeout * 1000);
00176 }
00177 void FileDescriptorActivity::setTimeout_us(int timeout_us)
00178 {
00179 if (0 <= timeout_us)
00180 {
00181 m_timeout_us = timeout_us;
00182 }
00183 else
00184 {
00185 log(Error) << "Ignoring invalid timeout (" << timeout_us << ")" << endlog();
00186 }
00187 }
00188 void FileDescriptorActivity::watch(int fd)
00189 { RTT::os::MutexLock lock(m_lock);
00190 if (fd < 0)
00191 {
00192 log(Error) << "negative file descriptor given to FileDescriptorActivity::watch" << endlog();
00193 return;
00194 }
00195
00196 m_watched_fds.insert(fd);
00197 FD_SET(fd, &m_fd_set);
00198 triggerUpdateSets();
00199 }
00200 void FileDescriptorActivity::unwatch(int fd)
00201 { RTT::os::MutexLock lock(m_lock);
00202 m_watched_fds.erase(fd);
00203 FD_CLR(fd, &m_fd_set);
00204 triggerUpdateSets();
00205 }
00206 void FileDescriptorActivity::clearAllWatches()
00207 { RTT::os::MutexLock lock(m_lock);
00208 m_watched_fds.clear();
00209 FD_ZERO(&m_fd_set);
00210 triggerUpdateSets();
00211 }
00212 void FileDescriptorActivity::triggerUpdateSets()
00213 {
00214 { RTT::os::MutexLock lock(m_command_mutex);
00215 m_update_sets = true;
00216 }
00217 int unused; (void)unused;
00218 unused = write(m_interrupt_pipe[1], &CMD_ANY_COMMAND, 1);
00219 }
00220 bool FileDescriptorActivity::isUpdated(int fd) const
00221 { return FD_ISSET(fd, &m_fd_work); }
00222 bool FileDescriptorActivity::hasError() const
00223 { return m_has_error; }
00224 bool FileDescriptorActivity::hasTimeout() const
00225 { return m_has_timeout; }
00226 bool FileDescriptorActivity::isWatched(int fd) const
00227 { RTT::os::MutexLock lock(m_lock);
00228 return FD_ISSET(fd, &m_fd_set); }
00229
00230 bool FileDescriptorActivity::start()
00231 {
00232 if ( isActive() )
00233 return false;
00234
00235 if (pipe(m_interrupt_pipe) == -1)
00236 {
00237 log(Error) << "FileDescriptorActivity: cannot create control pipe" << endlog();
00238 return false;
00239 }
00240
00241 #ifndef WIN32
00242
00243 int flags = 0;
00244 if ((flags = fcntl(m_interrupt_pipe[0], F_GETFL, 0)) == -1 ||
00245 fcntl(m_interrupt_pipe[0], F_SETFL, flags | O_NONBLOCK) == -1 ||
00246 (flags = fcntl(m_interrupt_pipe[1], F_GETFL, 0)) == -1 ||
00247 fcntl(m_interrupt_pipe[1], F_SETFL, flags | O_NONBLOCK) == -1)
00248 {
00249 close(m_interrupt_pipe[0]);
00250 close(m_interrupt_pipe[1]);
00251 m_interrupt_pipe[0] = m_interrupt_pipe[1] = -1;
00252 log(Error) << "FileDescriptorActivity: could not set the control pipe to non-blocking mode" << endlog();
00253 return false;
00254 }
00255 #endif
00256
00257
00258 m_break_loop = false;
00259 m_trigger = false;
00260 m_update_sets = false;
00261
00262 if (!Activity::start())
00263 {
00264 close(m_interrupt_pipe[0]);
00265 close(m_interrupt_pipe[1]);
00266 m_interrupt_pipe[0] = m_interrupt_pipe[1] = -1;
00267 log(Error) << "FileDescriptorActivity: Activity::start() failed" << endlog();
00268 return false;
00269 }
00270 return true;
00271 }
00272
00273 bool FileDescriptorActivity::trigger()
00274 {
00275 if (isActive() ) {
00276 { RTT::os::MutexLock lock(m_command_mutex);
00277 m_trigger = true;
00278 }
00279 int unused; (void)unused;
00280 unused = write(m_interrupt_pipe[1], &CMD_ANY_COMMAND, 1);
00281 return true;
00282 } else
00283 return false;
00284 }
00285
/**
 * Scope guard for a file descriptor variable.
 *
 * Holds a reference to an int owned by someone else. On destruction the
 * descriptor is closed unless it is already -1, and the referenced variable
 * is then set to -1, so a second guard on the same variable does nothing.
 *
 * The constructor is explicit and the guard is non-copyable, so that a guard
 * can only be created deliberately from a named descriptor variable.
 */
struct fd_watch {
    int& fd;

    explicit fd_watch(int& fd) : fd(fd) {}

    ~fd_watch()
    {
        if (fd != -1)
            close(fd);
        fd = -1;
    }

private:
    // Declared but never defined: copying a guard is not supported.
    fd_watch(const fd_watch&);
    fd_watch& operator=(const fd_watch&);
};
00296
00297 void FileDescriptorActivity::loop()
00298 {
00299 int pipe = m_interrupt_pipe[0];
00300 fd_watch watch_pipe_0(m_interrupt_pipe[0]);
00301 fd_watch watch_pipe_1(m_interrupt_pipe[1]);
00302
00303 while(true)
00304 {
00305 int max_fd;
00306 { RTT::os::MutexLock lock(m_lock);
00307 if (m_watched_fds.empty())
00308 max_fd = pipe;
00309 else
00310 max_fd = std::max(pipe, *m_watched_fds.rbegin());
00311
00312 m_fd_work = m_fd_set;
00313 }
00314 FD_SET(pipe, &m_fd_work);
00315
00316 int ret;
00317 m_running = false;
00318 if (m_timeout_us == 0)
00319 {
00320 ret = select(max_fd + 1, &m_fd_work, NULL, NULL, NULL);
00321 }
00322 else
00323 {
00324 static const int USECS_PER_SEC = 1000000;
00325 timeval timeout = { m_timeout_us / USECS_PER_SEC,
00326 m_timeout_us % USECS_PER_SEC};
00327 ret = select(max_fd + 1, &m_fd_work, NULL, NULL, &timeout);
00328 }
00329
00330 m_has_error = false;
00331 m_has_timeout = false;
00332 if (ret == -1)
00333 {
00334 log(Error) << "FileDescriptorActivity: error in select(), errno = " << errno << endlog();
00335 m_has_error = true;
00336 }
00337 else if (ret == 0)
00338 {
00339 log(Error) << "FileDescriptorActivity: timeout in select()" << endlog();
00340 m_has_timeout = true;
00341 }
00342
00343
00344 if (ret > 0 && FD_ISSET(pipe, &m_fd_work))
00345 {
00346
00347
00348 fd_set watch_pipe;
00349 timeval timeout;
00350 char dummy;
00351 do
00352 {
00353 int unused; (void)unused;
00354 unused = read(pipe, &dummy, 1);
00355
00356
00357 FD_ZERO(&watch_pipe);
00358 FD_SET(pipe, &watch_pipe);
00359 timeout.tv_sec = 0;
00360 timeout.tv_usec = 0;
00361 }
00362 while(select(pipe + 1, &watch_pipe, NULL, NULL, &timeout) > 0);
00363 }
00364
00365
00366 bool do_trigger = true;
00367 { RTT::os::MutexLock lock(m_command_mutex);
00368
00369 if (m_trigger) {
00370 do_trigger = true;
00371 m_trigger = false;
00372 }
00373 if (m_update_sets) {
00374 m_update_sets = false;
00375 do_trigger = false;
00376 }
00377 if (m_break_loop) {
00378 m_break_loop = false;
00379 break;
00380 }
00381 }
00382
00383 if (do_trigger)
00384 {
00385 try
00386 {
00387 m_running = true;
00388 step();
00389 m_running = false;
00390 }
00391 catch(...)
00392 {
00393 m_running = false;
00394 throw;
00395 }
00396 }
00397 }
00398 }
00399
00400 bool FileDescriptorActivity::breakLoop()
00401 {
00402 { RTT::os::MutexLock lock(m_command_mutex);
00403 m_break_loop = true;
00404 }
00405 int unused; (void)unused;
00406 unused = write(m_interrupt_pipe[1], &CMD_ANY_COMMAND, 1);
00407 return true;
00408 }
00409
00410 void FileDescriptorActivity::step()
00411 {
00412 m_running = true;
00413 if (runner != 0)
00414 runner->step();
00415 m_running = false;
00416 }
00417
00418 bool FileDescriptorActivity::stop()
00419 {
00420
00421
00422
00423
00424
00425
00426 if ( Activity::stop() == true )
00427 {
00428 fd_watch watch_pipe_0(m_interrupt_pipe[0]);
00429 fd_watch watch_pipe_1(m_interrupt_pipe[1]);
00430 return true;
00431 }
00432 return false;
00433 }
00434