shm_publisher.hpp
Go to the documentation of this file.
00001 #ifndef __SHM_PUBLISHER_HPP__
00002 #define __SHM_PUBLISHER_HPP__
00003 
00004 #include <boost/interprocess/managed_shared_memory.hpp>
00005 #include "ros/ros.h"
00006 #include "std_msgs/UInt64.h"
00007 #include "shm_object.hpp"
00008 
00009 namespace shm_transport
00010 {
00011 
00012 class Topic;
00013 
00014 class Publisher
00015 {
00016 public:
00017   Publisher() {
00018   }
00019 
00020   ~Publisher() {
00021   }
00022 
00023   Publisher(const Publisher & p) {
00024     *this = p;
00025   }
00026 
00027   Publisher & operator = (const Publisher & p) {
00028     pub_ = p.pub_;
00029     pobj_ = p.pobj_;
00030     return *this;
00031   }
00032 
00033   template < class M >
00034   void publish(const M & msg) const {
00035     if (!pobj_)
00036       return;
00037 
00038 #define RETRY 2
00039     // allocation shm message
00040     uint32_t serlen = ros::serialization::serializationLength(msg);
00041     ShmMessage * ptr = NULL;
00042     // bad_alloc exception may occur if some ros messages are lost
00043     int attempt = 0;
00044     for (; attempt < RETRY && ptr == NULL; attempt++) {
00045       try {
00046         ptr = (ShmMessage *)pobj_->pshm_->allocate(sizeof(ShmMessage) + serlen);
00047       } catch (boost::interprocess::bad_alloc e) {
00048         pobj_->plck_->lock();
00049         // ROS_INFO("bad_alloc happened, releasing the oldest and trying again...");
00050         ShmMessage * first_msg =
00051           (ShmMessage *)pobj_->pshm_->get_address_from_handle(pobj_->pmsg_->getFirstHandle());
00052         if (first_msg->ref != 0) {
00053           pobj_->plck_->unlock();
00054           ROS_WARN("the oldest is in use, abandon this message <%p>...", &msg);
00055           break;
00056         }
00057         // free the oldest message, and try again
00058         pobj_->pmsg_->releaseFirst(pobj_->pshm_);
00059         pobj_->plck_->unlock();
00060       }
00061     }
00062     if (ptr) {
00063       // construct shm message
00064       pobj_->plck_->lock();
00065       ptr->construct(pobj_);
00066       pobj_->plck_->unlock();
00067       // serialize data
00068       ptr->len = serlen;
00069       ros::serialization::OStream out(ptr->data, serlen);
00070       ros::serialization::serialize(out, msg);
00071       // publish the real message (handle of ShmStruct)
00072       std_msgs::UInt64 actual_msg;
00073       actual_msg.data = pobj_->pshm_->get_handle_from_address(ptr);
00074       pub_.publish(actual_msg);
00075     } else if (attempt >= RETRY) {
00076       ROS_WARN("bad_alloc happened %d times, abandon this message <%p>...", attempt, &msg);
00077     } else {
00078 
00079     }
00080 #undef RETRY
00081   }
00082 
00083   void shutdown() {
00084     pub_.shutdown();
00085   }
00086 
00087   std::string getTopic() const {
00088     return pub_.getTopic();
00089   }
00090 
00091   uint32_t getNumSubscribers() const {
00092     return pub_.getNumSubscribers();
00093   }
00094 
00095 private:
00096   Publisher(const ros::Publisher & pub, const std::string & topic, uint32_t mem_size)
00097       : pub_(pub) {
00098     // change '/' in topic to '_'
00099     std::string t = topic;
00100     for (int i = 0; i < t.length(); i++)
00101       if (t[i] == '/')
00102         t[i] = '_';
00103     mng_shm * pshm = new mng_shm(boost::interprocess::open_or_create, t.c_str(), mem_size);
00104     pobj_ = ShmObjectPtr(new ShmObject(pshm, t));
00105   }
00106 
00107   ros::Publisher pub_;
00108   ShmObjectPtr   pobj_;
00109 
00110 friend class Topic;
00111 };
00112 
00113 } // namespace shm_transport
00114 
00115 #endif // __SHM_PUBLISHER_HPP__
00116 


shm_transport
Author(s): Jrdevil-Wang , Wende Tan
autogenerated on Thu Jun 6 2019 18:47:43