$search
00001 #ifndef SHAREDMEM_MESSAGE_TRANSPORT_SUBSCRIBER_H 00002 #define SHAREDMEM_MESSAGE_TRANSPORT_SUBSCRIBER_H 00003 00004 #include <boost/bind.hpp> 00005 #include <boost/thread.hpp> 00006 #include <boost/interprocess/managed_shared_memory.hpp> 00007 00008 #include <message_transport/simple_subscriber_plugin.h> 00009 #include <sharedmem_transport/SharedMemoryBlock.h> 00010 #include <sharedmem_transport/SharedMemHeader.h> 00011 00012 namespace sharedmem_transport { 00013 00014 template <class Base> 00015 class SharedmemSubscriber : public message_transport::SimpleSubscriberPlugin<Base,sharedmem_transport::SharedMemHeader> 00016 { 00017 public: 00018 SharedmemSubscriber() { 00019 segment_ = NULL; 00020 blockmgr_ = NULL; 00021 rec_thread_ = NULL; 00022 } 00023 00024 virtual ~SharedmemSubscriber() { 00025 ROS_DEBUG("Shutting down SharedmemSubscriber"); 00026 if (rec_thread_) { 00027 // We probably need to do something to clean up the 00028 // cancelled thread here 00029 rec_thread_->interrupt(); 00030 rec_thread_->join(); 00031 delete rec_thread_; 00032 } 00033 rec_thread_ = NULL; 00034 delete segment_; 00035 } 00036 00037 virtual std::string getTransportName() const 00038 { 00039 return "sharedmem"; 00040 } 00041 00042 protected: 00043 void receiveThread() { 00044 ROS_DEBUG("Receive thread running"); 00045 while (ros::ok()) { 00046 ROS_DEBUG("Waiting for data"); 00047 boost::shared_ptr<Base> message_ptr(new Base); 00048 if (blockmgr_->wait_data(*segment_, shm_handle_, *message_ptr) 00049 && user_cb_ && ros::ok()) { 00050 (*user_cb_)(message_ptr); 00051 } 00052 } 00053 ROS_DEBUG("Unregistering client"); 00054 } 00055 00056 virtual void internalCallback(const sharedmem_transport::SharedMemHeaderConstPtr& message, 00057 const typename message_transport::SimpleSubscriberPlugin<Base,sharedmem_transport::SharedMemHeader>::Callback& user_cb) 00058 { 00059 user_cb_ = &user_cb; 00060 ROS_DEBUG("received latched message"); 00061 if (!segment_) { 00062 try { 00063 segment_ = new boost::interprocess::managed_shared_memory(boost::interprocess::open_only,ROSSharedMemoryDefaultBlock); 00064 ROS_DEBUG("Connected to segment"); 00065 } catch (boost::interprocess::bad_alloc e) { 00066 segment_ = NULL; 00067 ROS_ERROR("Failed to connect to shared memory segment"); 00068 return; 00069 } 00070 blockmgr_ = (segment_->find<SharedMemoryBlock>("Manager")).first; 00071 if (!blockmgr_) { 00072 delete segment_; 00073 segment_ = NULL; 00074 ROS_ERROR("Cannot find Manager block in shared memory segment"); 00075 return; 00076 } 00077 ROS_DEBUG("Got block mgr %p",blockmgr_); 00078 shm_handle_ = blockmgr_->findHandle(*segment_,this->getTopic().c_str()); 00079 if (shm_handle_.is_valid()) { 00080 ROS_DEBUG("Got shm handle %p",shm_handle_.ptr); 00081 rec_thread_ = new boost::thread(&SharedmemSubscriber::receiveThread,this); 00082 } else { 00083 delete segment_; 00084 segment_ = NULL; 00085 ROS_ERROR("Cannot find memory block for %s", this->getTopic().c_str()); 00086 } 00087 } 00088 } 00089 00090 00091 const typename message_transport::SimpleSubscriberPlugin<Base,sharedmem_transport::SharedMemHeader>::Callback* user_cb_; 00092 boost::thread *rec_thread_; 00093 boost::interprocess::managed_shared_memory *segment_ ; 00094 SharedMemoryBlock *blockmgr_; 00095 shm_handle shm_handle_; 00096 }; 00097 00098 } //namespace transport 00099 00100 #endif // SHAREDMEM_MESSAGE_TRANSPORT_SUBSCRIBER_H