master.h
Go to the documentation of this file.
00001 #ifndef H_CANOPEN_MASTER
00002 #define H_CANOPEN_MASTER
00003 
00004 #include <canopen_master/canopen.h>
00005 
00006 #include <boost/interprocess/sync/interprocess_condition.hpp>
00007 #include <boost/interprocess/sync/interprocess_mutex.hpp>
00008 #include <boost/interprocess/sync/scoped_lock.hpp>
00009 #include <boost/interprocess/managed_shared_memory.hpp>
00010 
00011 namespace canopen{
00012 
00013 class IPCSyncWaiter{
00014     typedef boost::interprocess::interprocess_mutex interprocess_mutex;
00015     typedef boost::interprocess::interprocess_condition interprocess_condition;
00016     typedef boost::interprocess:: scoped_lock<interprocess_mutex> scoped_mutex_lock;
00017     
00018     interprocess_mutex master_mutex;
00019 
00020     interprocess_mutex sync_mutex;
00021 
00022     interprocess_mutex started_mutex;
00023     interprocess_condition started_cond;
00024     size_t sync_started;
00025 
00026     interprocess_mutex number_mutex;
00027     interprocess_condition number_cond;
00028     size_t number;
00029 
00030     template <typename AT> bool add(const AT &abs_time){
00031         scoped_mutex_lock sync_lock(sync_mutex, abs_time);
00032         if(!sync_lock) return false;
00033         // assume add and del share a common thread
00034         // scoped_mutex_lock cond_lock(number_mutex, abs_time);
00035         // if(!cond_lock) return false;
00036         ++number;
00037         return true;
00038     }
00039     template <typename AT> bool wait_started(const AT &abs_time){
00040         scoped_mutex_lock lock_started(started_mutex, abs_time);
00041         if(!lock_started) return false;
00042 
00043         while(sync_started == 0)
00044             if(!started_cond.timed_wait(lock_started, abs_time))
00045                 return false;
00046 
00047         --sync_started;
00048         return true;
00049     }
00050     template <typename AT> bool start_sync(const AT &abs_time){
00051         {
00052             scoped_mutex_lock lock_started(started_mutex, abs_time);
00053             if(!lock_started) return false;
00054             sync_started = number; // no additional lock needed for number
00055         }
00056         started_cond.notify_all();
00057         return true;
00058     }
00059     template <typename AT> bool done_one(const AT &abs_time){
00060         scoped_mutex_lock cond_lock(number_mutex, abs_time);
00061         if(!cond_lock || number == 0) return false;
00062         if(--number == 0){
00063             cond_lock.unlock();
00064             number_cond.notify_all();
00065         }
00066         return true;
00067     }
00068     template <typename AT> bool wait_done(const AT &abs_time){
00069         scoped_mutex_lock number_lock(number_mutex, abs_time);
00070         if(!number_lock) return false;
00071         while(number != 0 && number_cond.timed_wait(number_lock, abs_time)) {}
00072         bool done = number == 0;
00073         number = 0;
00074         return done;
00075     }
00076 public:
00077     template <typename DT> bool wait(const DT &d){
00078         boost::posix_time::ptime  abs_time = boost::get_system_time() + d;
00079         return add(abs_time) && wait_started(abs_time);
00080     }
00081     template <typename DT> bool done(const DT &d){
00082         boost::posix_time::ptime  abs_time = boost::get_system_time() + d;
00083         return done_one(abs_time);
00084     }
00085     template <typename AT> bool sync(const AT &abs_time){
00086         scoped_mutex_lock lock_sync(sync_mutex, abs_time);
00087         return lock_sync && start_sync(abs_time) && wait_done(abs_time);
00088     }
00089     scoped_mutex_lock get_lock(){
00090         return scoped_mutex_lock(master_mutex); // well-behaved compilers won't copy (RVO)
00091     }
00092     IPCSyncWaiter() : sync_started(0), number(0) {}
00093 };
00094 
00095 
00096 class IPCSyncMaster {
00097 public:
00098     class SyncObject{
00099         size_t sync_listeners;
00100         uint8_t last_sync;
00101         boost::interprocess::interprocess_mutex mutex;
00102     public:
00103         void enableSync(){
00104             boost::interprocess::scoped_lock<boost::interprocess::interprocess_mutex> lock(mutex);
00105             if(sync_listeners == 0) last_sync = 0; // reset sync counter
00106             ++sync_listeners;
00107         }
00108         void disableSync(){
00109             boost::interprocess::scoped_lock<boost::interprocess::interprocess_mutex> lock(mutex);
00110             if(sync_listeners) --sync_listeners;
00111         }
00112         bool nextSync( uint8_t & sync){
00113             boost::interprocess::scoped_lock<boost::interprocess::interprocess_mutex> lock(mutex);
00114             if(sync_listeners == 0) return false;
00115             if(properties.overflow_){
00116                 if(++last_sync > 240) last_sync = 1;
00117             }
00118             sync = last_sync;
00119             return true;
00120         }
00121         const SyncProperties properties;
00122         
00123         IPCSyncWaiter waiter;
00124         
00125         SyncObject(const SyncProperties &p) : sync_listeners(0), last_sync(0), properties(p) {}
00126     };
00127     IPCSyncMaster(boost::shared_ptr<can::CommInterface> interface)
00128     : interface_(interface), sync_obj_(0)
00129     {
00130     }
00131     void start(LayerStatus &status){
00132         if(thread_){
00133             status.warn("Sync thread already running");
00134             return;
00135         }
00136         sync_obj_ = getSyncObject(status);
00137         if(sync_obj_){
00138             thread_.reset(new boost::thread(&IPCSyncMaster::run, this));
00139         }else status.error("Sync object not found");
00140         
00141     }
00142     void stop(LayerStatus &status){
00143         if(!thread_){
00144             status.error("!thread");
00145             return;
00146         }
00147         thread_->interrupt();
00148         thread_->join();
00149         thread_.reset();
00150         sync_obj_ = 0; // locked externally
00151     }
00152     
00153     bool enableSync(){
00154         if(sync_obj_) sync_obj_->enableSync();
00155         return sync_obj_ != 0;
00156     }
00157     bool disableSync(){
00158         if(sync_obj_) sync_obj_->disableSync();
00159         return sync_obj_ != 0;
00160     }
00161     void wait(LayerStatus &status){
00162         if(sync_obj_){
00163             bool ok = sync_obj_->waiter.wait(boost::posix_time::milliseconds(sync_obj_->properties.period_ms_));
00164             if(!ok) status.warn("wait failed");
00165         }else status.error("!sync_obj");
00166     }
00167     void notify(LayerStatus &status){
00168         if(sync_obj_){
00169             bool ok = sync_obj_->waiter.done(boost::posix_time::milliseconds(sync_obj_->properties.period_ms_));
00170             if(!ok) status.warn("notify failed");
00171         }else status.error("!sync_obj");
00172     }
00173 
00174 private:
00175     virtual SyncObject * getSyncObject(LayerStatus &status) = 0;
00176     
00177     boost::shared_ptr<boost::thread> thread_;
00178     boost::shared_ptr<can::CommInterface> interface_;
00179     
00180     void run();
00181     SyncObject * sync_obj_;
00182     
00183 };
00184 
00185 
00186 class IPCSyncLayer: public SyncLayer {
00187     boost::shared_ptr<can::CommInterface> interface_;
00188 
00189     boost::shared_ptr<IPCSyncMaster> sync_master_;
00190     
00191     
00192     boost::mutex mutex_;
00193     boost::unordered_set<void const *> nodes_;
00194     virtual void handleRead(LayerStatus &status, const LayerState &current_state) {
00195         if(current_state > Init){
00196             boost::mutex::scoped_lock lock(mutex_);
00197             sync_master_->wait(status);
00198         }
00199     }
00200     virtual void handleWrite(LayerStatus &status, const LayerState &current_state) {
00201         if(current_state > Init){
00202             sync_master_->notify(status);
00203         }
00204     }
00205 
00206     virtual void handleInit(LayerStatus &status);
00207     virtual void handleShutdown(LayerStatus &status) {
00208         boost::mutex::scoped_lock lock(mutex_);
00209 
00210         if(!nodes_.empty()){
00211             sync_master_->disableSync();
00212         }
00213         nodes_.clear();
00214         sync_master_->stop(status);
00215     }
00216 
00217     virtual void handleHalt(LayerStatus &status)  { /* nothing to do */ }
00218     virtual void handleDiag(LayerReport &report)  { /* TODO */ }
00219     virtual void handleRecover(LayerStatus &status)  { /* TODO */ }
00220 
00221 public:
00222     IPCSyncLayer(const SyncProperties &p, boost::shared_ptr<can::CommInterface> interface, boost::shared_ptr<IPCSyncMaster> sync_master) 
00223     : SyncLayer(p), interface_(interface), sync_master_(sync_master)
00224     {
00225     }
00226     
00227     virtual void addNode(void * const ptr) {
00228         boost::mutex::scoped_lock lock(mutex_);
00229         bool was_empty = nodes_.empty();
00230         nodes_.insert(ptr);
00231         if(!nodes_.empty() && was_empty){
00232             sync_master_->enableSync();
00233         }
00234     }
00235     virtual void removeNode(void * const ptr)  { 
00236         boost::mutex::scoped_lock lock(mutex_);
00237         bool was_empty = nodes_.empty();
00238         nodes_.erase(ptr);
00239         
00240         if(nodes_.empty() && !was_empty){
00241             sync_master_->disableSync();
00242         }
00243     }
00244 };
00245 
00246 class LocalIPCSyncMaster : public IPCSyncMaster{
00247     SyncObject sync_obj_;
00248     virtual SyncObject * getSyncObject(LayerStatus &status) { return &sync_obj_; }
00249 public:
00250     bool matches(const SyncProperties &p) const { return p == sync_obj_.properties; }
00251     LocalIPCSyncMaster(const SyncProperties &properties, boost::shared_ptr<can::CommInterface> interface)
00252     : IPCSyncMaster(interface), sync_obj_(properties)  { }
00253 };
00254 
00255 class LocalMaster: public Master{
00256     boost::mutex mutex_;
00257     boost::unordered_map<can::Header, boost::shared_ptr<LocalIPCSyncMaster> > syncmasters_;
00258     boost::shared_ptr<can::CommInterface> interface_;
00259 public:
00260     virtual boost::shared_ptr<SyncLayer> getSync(const SyncProperties &properties);
00261     LocalMaster(boost::shared_ptr<can::CommInterface> interface) : interface_(interface)  {}
00262 
00263     class Allocator : public Master::Allocator{
00264     public:
00265         virtual boost::shared_ptr<Master> allocate(const std::string &name,  boost::shared_ptr<can::CommInterface> interface);
00266     };
00267 };
00268 
00269 class SharedIPCSyncMaster : public IPCSyncMaster{
00270     boost::interprocess::managed_shared_memory &managed_shm_;
00271     const SyncProperties properties_;
00272     virtual SyncObject * getSyncObject(LayerStatus &status);
00273 public:
00274     bool matches(const SyncProperties &p) const { return p == properties_; }
00275     SharedIPCSyncMaster(boost::interprocess::managed_shared_memory &managed_shm, const SyncProperties &properties, boost::shared_ptr<can::CommInterface> interface)
00276     : IPCSyncMaster(interface), managed_shm_(managed_shm), properties_(properties)  { }
00277 };
00278 
00279 class SharedMaster: public Master{
00280     // TODO: test multi chain 
00281     const std::string name_;
00282     boost::interprocess::managed_shared_memory managed_shm_;
00283     boost::mutex mutex_;
00284     boost::unordered_map<can::Header, boost::shared_ptr<SharedIPCSyncMaster> > syncmasters_;
00285     boost::shared_ptr<can::CommInterface> interface_;
00286 public:
00287     SharedMaster(const std::string &name, boost::shared_ptr<can::CommInterface> interface, const boost::interprocess::permissions & perm = boost::interprocess::permissions())
00288     : name_("canopen_master_shm_"+name),
00289         managed_shm_(boost::interprocess::open_or_create, name_.c_str(), 4096, 0, perm),
00290         interface_(interface)  {}
00291     virtual boost::shared_ptr<SyncLayer> getSync(const SyncProperties &properties);
00292 
00293     class Allocator : public Master::Allocator{
00294     public:
00295         virtual boost::shared_ptr<Master> allocate(const std::string &name, boost::shared_ptr<can::CommInterface> interface);
00296     };
00297 };
00298 
00299 class UnrestrictedMaster: public SharedMaster {
00300     class unrestricted : public boost::interprocess::permissions {
00301     public:
00302         unrestricted() { set_unrestricted(); }
00303     };
00304 public:
00305     UnrestrictedMaster(const std::string &name, boost::shared_ptr<can::CommInterface> interface)
00306     : SharedMaster(name, interface, unrestricted()) {}
00307 
00308     class Allocator : public Master::Allocator{
00309     public:
00310         virtual boost::shared_ptr<Master> allocate(const std::string &name, boost::shared_ptr<can::CommInterface> interface);
00311     };
00312 };
00313 
00314 } // canopen
00315 #endif // !H_CANOPEN_MASTER


canopen_master
Author(s): Mathias Lüdtke
autogenerated on Thu Jun 6 2019 20:43:54