Go to the documentation of this file.00001 #ifndef SHAREDMEM_MESSAGE_TRANSPORT_SUBSCRIBER_H
00002 #define SHAREDMEM_MESSAGE_TRANSPORT_SUBSCRIBER_H
00003
00004 #include <boost/bind.hpp>
00005 #include <boost/thread.hpp>
00006 #include <boost/interprocess/managed_shared_memory.hpp>
00007
00008 #include <message_transport/simple_subscriber_plugin.h>
00009 #include <sharedmem_transport/SharedMemoryBlock.h>
00010 #include <sharedmem_transport/SharedMemHeader.h>
00011
00012 namespace sharedmem_transport {
00013
00014 template <class Base>
00015 class SharedmemSubscriber : public message_transport::SimpleSubscriberPlugin<Base,sharedmem_transport::SharedMemHeader>
00016 {
00017 public:
00018 SharedmemSubscriber() {
00019 segment_ = NULL;
00020 blockmgr_ = NULL;
00021 rec_thread_ = NULL;
00022 }
00023
00024 virtual ~SharedmemSubscriber() {
00025 ROS_DEBUG("Shutting down SharedmemSubscriber");
00026 if (rec_thread_) {
00027
00028
00029 rec_thread_->interrupt();
00030 rec_thread_->join();
00031 delete rec_thread_;
00032 }
00033 rec_thread_ = NULL;
00034 delete segment_;
00035 }
00036
00037 virtual std::string getTransportName() const
00038 {
00039 return "sharedmem";
00040 }
00041
00042 protected:
00043 void receiveThread() {
00044 ROS_DEBUG("Receive thread running");
00045 while (ros::ok()) {
00046 ROS_DEBUG("Waiting for data");
00047 boost::shared_ptr<Base> message_ptr(new Base);
00048 if (blockmgr_->wait_data(*segment_, shm_handle_, *message_ptr)
00049 && user_cb_ && ros::ok()) {
00050 (*user_cb_)(message_ptr);
00051 }
00052 }
00053 ROS_DEBUG("Unregistering client");
00054 }
00055
00056 virtual void internalCallback(const sharedmem_transport::SharedMemHeaderConstPtr& message,
00057 const typename message_transport::SimpleSubscriberPlugin<Base,sharedmem_transport::SharedMemHeader>::Callback& user_cb)
00058 {
00059 user_cb_ = &user_cb;
00060 ROS_DEBUG("received latched message");
00061 if (!segment_) {
00062 try {
00063 segment_ = new boost::interprocess::managed_shared_memory(boost::interprocess::open_only,ROSSharedMemoryDefaultBlock);
00064 ROS_DEBUG("Connected to segment");
00065 } catch (boost::interprocess::bad_alloc e) {
00066 segment_ = NULL;
00067 ROS_ERROR("Failed to connect to shared memory segment");
00068 return;
00069 }
00070 blockmgr_ = (segment_->find<SharedMemoryBlock>("Manager")).first;
00071 if (!blockmgr_) {
00072 delete segment_;
00073 segment_ = NULL;
00074 ROS_ERROR("Cannot find Manager block in shared memory segment");
00075 return;
00076 }
00077 ROS_DEBUG("Got block mgr %p",blockmgr_);
00078 shm_handle_ = blockmgr_->findHandle(*segment_,this->getTopic().c_str());
00079 if (shm_handle_.is_valid()) {
00080 ROS_DEBUG("Got shm handle %p",shm_handle_.ptr);
00081 rec_thread_ = new boost::thread(&SharedmemSubscriber::receiveThread,this);
00082 } else {
00083 delete segment_;
00084 segment_ = NULL;
00085 ROS_ERROR("Cannot find memory block for %s", this->getTopic().c_str());
00086 }
00087 }
00088 }
00089
00090
00091 const typename message_transport::SimpleSubscriberPlugin<Base,sharedmem_transport::SharedMemHeader>::Callback* user_cb_;
00092 boost::thread *rec_thread_;
00093 boost::interprocess::managed_shared_memory *segment_ ;
00094 SharedMemoryBlock *blockmgr_;
00095 shm_handle shm_handle_;
00096 };
00097
00098 }
00099
00100 #endif // SHAREDMEM_MESSAGE_TRANSPORT_SUBSCRIBER_H