Go to the documentation of this file.00001 #ifndef H_CANOPEN_MASTER
00002 #define H_CANOPEN_MASTER
00003
00004 #include <canopen_master/canopen.h>
00005
00006 #include <boost/interprocess/sync/interprocess_condition.hpp>
00007 #include <boost/interprocess/sync/interprocess_mutex.hpp>
00008 #include <boost/interprocess/sync/scoped_lock.hpp>
00009 #include <boost/interprocess/managed_shared_memory.hpp>
00010
00011 namespace canopen{
00012
00013 class IPCSyncWaiter{
00014 typedef boost::interprocess::interprocess_mutex interprocess_mutex;
00015 typedef boost::interprocess::interprocess_condition interprocess_condition;
00016 typedef boost::interprocess:: scoped_lock<interprocess_mutex> scoped_mutex_lock;
00017
00018 interprocess_mutex master_mutex;
00019
00020 interprocess_mutex sync_mutex;
00021
00022 interprocess_mutex started_mutex;
00023 interprocess_condition started_cond;
00024 size_t sync_started;
00025
00026 interprocess_mutex number_mutex;
00027 interprocess_condition number_cond;
00028 size_t number;
00029
00030 template <typename AT> bool add(const AT &abs_time){
00031 scoped_mutex_lock sync_lock(sync_mutex, abs_time);
00032 if(!sync_lock) return false;
00033
00034
00035
00036 ++number;
00037 return true;
00038 }
00039 template <typename AT> bool wait_started(const AT &abs_time){
00040 scoped_mutex_lock lock_started(started_mutex, abs_time);
00041 if(!lock_started) return false;
00042
00043 while(sync_started == 0)
00044 if(!started_cond.timed_wait(lock_started, abs_time))
00045 return false;
00046
00047 --sync_started;
00048 return true;
00049 }
00050 template <typename AT> bool start_sync(const AT &abs_time){
00051 {
00052 scoped_mutex_lock lock_started(started_mutex, abs_time);
00053 if(!lock_started) return false;
00054 sync_started = number;
00055 }
00056 started_cond.notify_all();
00057 return true;
00058 }
00059 template <typename AT> bool done_one(const AT &abs_time){
00060 scoped_mutex_lock cond_lock(number_mutex, abs_time);
00061 if(!cond_lock || number == 0) return false;
00062 if(--number == 0){
00063 cond_lock.unlock();
00064 number_cond.notify_all();
00065 }
00066 return true;
00067 }
00068 template <typename AT> bool wait_done(const AT &abs_time){
00069 scoped_mutex_lock number_lock(number_mutex, abs_time);
00070 if(!number_lock) return false;
00071 while(number != 0 && number_cond.timed_wait(number_lock, abs_time)) {}
00072 bool done = number == 0;
00073 number = 0;
00074 return done;
00075 }
00076 public:
00077 template <typename DT> bool wait(const DT &d){
00078 boost::posix_time::ptime abs_time = boost::get_system_time() + d;
00079 return add(abs_time) && wait_started(abs_time);
00080 }
00081 template <typename DT> bool done(const DT &d){
00082 boost::posix_time::ptime abs_time = boost::get_system_time() + d;
00083 return done_one(abs_time);
00084 }
00085 template <typename AT> bool sync(const AT &abs_time){
00086 scoped_mutex_lock lock_sync(sync_mutex, abs_time);
00087 return lock_sync && start_sync(abs_time) && wait_done(abs_time);
00088 }
00089 scoped_mutex_lock get_lock(){
00090 return scoped_mutex_lock(master_mutex);
00091 }
00092 IPCSyncWaiter() : sync_started(0), number(0) {}
00093 };
00094
00095
00096 class IPCSyncMaster {
00097 public:
00098 class SyncObject{
00099 size_t sync_listeners;
00100 uint8_t last_sync;
00101 boost::interprocess::interprocess_mutex mutex;
00102 public:
00103 void enableSync(){
00104 boost::interprocess::scoped_lock<boost::interprocess::interprocess_mutex> lock(mutex);
00105 if(sync_listeners == 0) last_sync = 0;
00106 ++sync_listeners;
00107 }
00108 void disableSync(){
00109 boost::interprocess::scoped_lock<boost::interprocess::interprocess_mutex> lock(mutex);
00110 if(sync_listeners) --sync_listeners;
00111 }
00112 bool nextSync( uint8_t & sync){
00113 boost::interprocess::scoped_lock<boost::interprocess::interprocess_mutex> lock(mutex);
00114 if(sync_listeners == 0) return false;
00115 if(properties.overflow_){
00116 if(++last_sync > 240) last_sync = 1;
00117 }
00118 sync = last_sync;
00119 return true;
00120 }
00121 const SyncProperties properties;
00122
00123 IPCSyncWaiter waiter;
00124
00125 SyncObject(const SyncProperties &p) : sync_listeners(0), last_sync(0), properties(p) {}
00126 };
00127 IPCSyncMaster(boost::shared_ptr<can::CommInterface> interface)
00128 : interface_(interface), sync_obj_(0)
00129 {
00130 }
00131 void start(LayerStatus &status){
00132 if(thread_){
00133 status.warn("Sync thread already running");
00134 return;
00135 }
00136 sync_obj_ = getSyncObject(status);
00137 if(sync_obj_){
00138 thread_.reset(new boost::thread(&IPCSyncMaster::run, this));
00139 }else status.error("Sync object not found");
00140
00141 }
00142 void stop(LayerStatus &status){
00143 if(!thread_){
00144 status.error("!thread");
00145 return;
00146 }
00147 thread_->interrupt();
00148 thread_->join();
00149 thread_.reset();
00150 sync_obj_ = 0;
00151 }
00152
00153 bool enableSync(){
00154 if(sync_obj_) sync_obj_->enableSync();
00155 return sync_obj_ != 0;
00156 }
00157 bool disableSync(){
00158 if(sync_obj_) sync_obj_->disableSync();
00159 return sync_obj_ != 0;
00160 }
00161 void wait(LayerStatus &status){
00162 if(sync_obj_){
00163 bool ok = sync_obj_->waiter.wait(boost::posix_time::milliseconds(sync_obj_->properties.period_ms_));
00164 if(!ok) status.warn("wait failed");
00165 }else status.error("!sync_obj");
00166 }
00167 void notify(LayerStatus &status){
00168 if(sync_obj_){
00169 bool ok = sync_obj_->waiter.done(boost::posix_time::milliseconds(sync_obj_->properties.period_ms_));
00170 if(!ok) status.warn("notify failed");
00171 }else status.error("!sync_obj");
00172 }
00173
00174 private:
00175 virtual SyncObject * getSyncObject(LayerStatus &status) = 0;
00176
00177 boost::shared_ptr<boost::thread> thread_;
00178 boost::shared_ptr<can::CommInterface> interface_;
00179
00180 void run();
00181 SyncObject * sync_obj_;
00182
00183 };
00184
00185
00186 class IPCSyncLayer: public SyncLayer {
00187 boost::shared_ptr<can::CommInterface> interface_;
00188
00189 boost::shared_ptr<IPCSyncMaster> sync_master_;
00190
00191
00192 boost::mutex mutex_;
00193 boost::unordered_set<void const *> nodes_;
00194 virtual void handleRead(LayerStatus &status, const LayerState ¤t_state) {
00195 if(current_state > Init){
00196 boost::mutex::scoped_lock lock(mutex_);
00197 sync_master_->wait(status);
00198 }
00199 }
00200 virtual void handleWrite(LayerStatus &status, const LayerState ¤t_state) {
00201 if(current_state > Init){
00202 sync_master_->notify(status);
00203 }
00204 }
00205
00206 virtual void handleInit(LayerStatus &status);
00207 virtual void handleShutdown(LayerStatus &status) {
00208 boost::mutex::scoped_lock lock(mutex_);
00209
00210 if(!nodes_.empty()){
00211 sync_master_->disableSync();
00212 }
00213 nodes_.clear();
00214 sync_master_->stop(status);
00215 }
00216
00217 virtual void handleHalt(LayerStatus &status) { }
00218 virtual void handleDiag(LayerReport &report) { }
00219 virtual void handleRecover(LayerStatus &status) { }
00220
00221 public:
00222 IPCSyncLayer(const SyncProperties &p, boost::shared_ptr<can::CommInterface> interface, boost::shared_ptr<IPCSyncMaster> sync_master)
00223 : SyncLayer(p), interface_(interface), sync_master_(sync_master)
00224 {
00225 }
00226
00227 virtual void addNode(void * const ptr) {
00228 boost::mutex::scoped_lock lock(mutex_);
00229 bool was_empty = nodes_.empty();
00230 nodes_.insert(ptr);
00231 if(!nodes_.empty() && was_empty){
00232 sync_master_->enableSync();
00233 }
00234 }
00235 virtual void removeNode(void * const ptr) {
00236 boost::mutex::scoped_lock lock(mutex_);
00237 bool was_empty = nodes_.empty();
00238 nodes_.erase(ptr);
00239
00240 if(nodes_.empty() && !was_empty){
00241 sync_master_->disableSync();
00242 }
00243 }
00244 };
00245
00246 class LocalIPCSyncMaster : public IPCSyncMaster{
00247 SyncObject sync_obj_;
00248 virtual SyncObject * getSyncObject(LayerStatus &status) { return &sync_obj_; }
00249 public:
00250 bool matches(const SyncProperties &p) const { return p == sync_obj_.properties; }
00251 LocalIPCSyncMaster(const SyncProperties &properties, boost::shared_ptr<can::CommInterface> interface)
00252 : IPCSyncMaster(interface), sync_obj_(properties) { }
00253 };
00254
00255 class LocalMaster: public Master{
00256 boost::mutex mutex_;
00257 boost::unordered_map<can::Header, boost::shared_ptr<LocalIPCSyncMaster> > syncmasters_;
00258 boost::shared_ptr<can::CommInterface> interface_;
00259 public:
00260 virtual boost::shared_ptr<SyncLayer> getSync(const SyncProperties &properties);
00261 LocalMaster(boost::shared_ptr<can::CommInterface> interface) : interface_(interface) {}
00262
00263 class Allocator : public Master::Allocator{
00264 public:
00265 virtual boost::shared_ptr<Master> allocate(const std::string &name, boost::shared_ptr<can::CommInterface> interface);
00266 };
00267 };
00268
00269 class SharedIPCSyncMaster : public IPCSyncMaster{
00270 boost::interprocess::managed_shared_memory &managed_shm_;
00271 const SyncProperties properties_;
00272 virtual SyncObject * getSyncObject(LayerStatus &status);
00273 public:
00274 bool matches(const SyncProperties &p) const { return p == properties_; }
00275 SharedIPCSyncMaster(boost::interprocess::managed_shared_memory &managed_shm, const SyncProperties &properties, boost::shared_ptr<can::CommInterface> interface)
00276 : IPCSyncMaster(interface), managed_shm_(managed_shm), properties_(properties) { }
00277 };
00278
00279 class SharedMaster: public Master{
00280
00281 const std::string name_;
00282 boost::interprocess::managed_shared_memory managed_shm_;
00283 boost::mutex mutex_;
00284 boost::unordered_map<can::Header, boost::shared_ptr<SharedIPCSyncMaster> > syncmasters_;
00285 boost::shared_ptr<can::CommInterface> interface_;
00286 public:
00287 SharedMaster(const std::string &name, boost::shared_ptr<can::CommInterface> interface, const boost::interprocess::permissions & perm = boost::interprocess::permissions())
00288 : name_("canopen_master_shm_"+name),
00289 managed_shm_(boost::interprocess::open_or_create, name_.c_str(), 4096, 0, perm),
00290 interface_(interface) {}
00291 virtual boost::shared_ptr<SyncLayer> getSync(const SyncProperties &properties);
00292
00293 class Allocator : public Master::Allocator{
00294 public:
00295 virtual boost::shared_ptr<Master> allocate(const std::string &name, boost::shared_ptr<can::CommInterface> interface);
00296 };
00297 };
00298
00299 class UnrestrictedMaster: public SharedMaster {
00300 class unrestricted : public boost::interprocess::permissions {
00301 public:
00302 unrestricted() { set_unrestricted(); }
00303 };
00304 public:
00305 UnrestrictedMaster(const std::string &name, boost::shared_ptr<can::CommInterface> interface)
00306 : SharedMaster(name, interface, unrestricted()) {}
00307
00308 class Allocator : public Master::Allocator{
00309 public:
00310 virtual boost::shared_ptr<Master> allocate(const std::string &name, boost::shared_ptr<can::CommInterface> interface);
00311 };
00312 };
00313
00314 }
00315 #endif // !H_CANOPEN_MASTER