Go to the documentation of this file.00001 #ifndef __SHM_PUBLISHER_HPP__
00002 #define __SHM_PUBLISHER_HPP__
00003
00004 #include <boost/interprocess/managed_shared_memory.hpp>
00005 #include "ros/ros.h"
00006 #include "std_msgs/UInt64.h"
00007 #include "shm_object.hpp"
00008
00009 namespace shm_transport
00010 {
00011
00012 class Topic;
00013
00014 class Publisher
00015 {
00016 public:
00017 Publisher() {
00018 }
00019
00020 ~Publisher() {
00021 }
00022
00023 Publisher(const Publisher & p) {
00024 *this = p;
00025 }
00026
00027 Publisher & operator = (const Publisher & p) {
00028 pub_ = p.pub_;
00029 pobj_ = p.pobj_;
00030 return *this;
00031 }
00032
00033 template < class M >
00034 void publish(const M & msg) const {
00035 if (!pobj_)
00036 return;
00037
00038 #define RETRY 2
00039
00040 uint32_t serlen = ros::serialization::serializationLength(msg);
00041 ShmMessage * ptr = NULL;
00042
00043 int attempt = 0;
00044 for (; attempt < RETRY && ptr == NULL; attempt++) {
00045 try {
00046 ptr = (ShmMessage *)pobj_->pshm_->allocate(sizeof(ShmMessage) + serlen);
00047 } catch (boost::interprocess::bad_alloc e) {
00048 pobj_->plck_->lock();
00049
00050 ShmMessage * first_msg =
00051 (ShmMessage *)pobj_->pshm_->get_address_from_handle(pobj_->pmsg_->getFirstHandle());
00052 if (first_msg->ref != 0) {
00053 pobj_->plck_->unlock();
00054 ROS_WARN("the oldest is in use, abandon this message <%p>...", &msg);
00055 break;
00056 }
00057
00058 pobj_->pmsg_->releaseFirst(pobj_->pshm_);
00059 pobj_->plck_->unlock();
00060 }
00061 }
00062 if (ptr) {
00063
00064 pobj_->plck_->lock();
00065 ptr->construct(pobj_);
00066 pobj_->plck_->unlock();
00067
00068 ptr->len = serlen;
00069 ros::serialization::OStream out(ptr->data, serlen);
00070 ros::serialization::serialize(out, msg);
00071
00072 std_msgs::UInt64 actual_msg;
00073 actual_msg.data = pobj_->pshm_->get_handle_from_address(ptr);
00074 pub_.publish(actual_msg);
00075 } else if (attempt >= RETRY) {
00076 ROS_WARN("bad_alloc happened %d times, abandon this message <%p>...", attempt, &msg);
00077 } else {
00078
00079 }
00080 #undef RETRY
00081 }
00082
00083 void shutdown() {
00084 pub_.shutdown();
00085 }
00086
00087 std::string getTopic() const {
00088 return pub_.getTopic();
00089 }
00090
00091 uint32_t getNumSubscribers() const {
00092 return pub_.getNumSubscribers();
00093 }
00094
00095 private:
00096 Publisher(const ros::Publisher & pub, const std::string & topic, uint32_t mem_size)
00097 : pub_(pub) {
00098
00099 std::string t = topic;
00100 for (int i = 0; i < t.length(); i++)
00101 if (t[i] == '/')
00102 t[i] = '_';
00103 mng_shm * pshm = new mng_shm(boost::interprocess::open_or_create, t.c_str(), mem_size);
00104 pobj_ = ShmObjectPtr(new ShmObject(pshm, t));
00105 }
00106
00107 ros::Publisher pub_;
00108 ShmObjectPtr pobj_;
00109
00110 friend class Topic;
00111 };
00112
00113 }
00114
00115 #endif // __SHM_PUBLISHER_HPP__
00116