sharedmem_subscriber.h
Go to the documentation of this file.
00001 #ifndef SHAREDMEM_MESSAGE_TRANSPORT_SUBSCRIBER_H
00002 #define SHAREDMEM_MESSAGE_TRANSPORT_SUBSCRIBER_H
00003 
00004 #include <boost/bind.hpp>
00005 #include <boost/thread.hpp>
00006 #include <boost/interprocess/managed_shared_memory.hpp>
00007 
00008 #include <message_transport/simple_subscriber_plugin.h>
00009 #include <sharedmem_transport/SharedMemoryBlock.h>
00010 #include <sharedmem_transport/SharedMemHeader.h>
00011 
00012 namespace sharedmem_transport {
00013 
00014         template <class Base>
00015         class SharedmemSubscriber : public message_transport::SimpleSubscriberPlugin<Base,sharedmem_transport::SharedMemHeader>
00016         {
00017                 public:
00018                         SharedmemSubscriber() {
00019                                 segment_ = NULL;
00020                 blockmgr_ = NULL;
00021                 rec_thread_ = NULL;
00022                         }
00023 
00024                         virtual ~SharedmemSubscriber() {
00025                 ROS_DEBUG("Shutting down SharedmemSubscriber");
00026                 if (rec_thread_) {
00027                     // We probably need to do something to clean up the
00028                     // cancelled thread here
00029                     rec_thread_->interrupt();
00030                     rec_thread_->join();
00031                     delete rec_thread_;
00032                 }
00033                 rec_thread_ = NULL;
00034                                 delete segment_;
00035                         }
00036 
00037                         virtual std::string getTransportName() const
00038                         {
00039                                 return "sharedmem";
00040                         }
00041 
00042                 protected:
00043             void receiveThread() {
00044                 ROS_DEBUG("Receive thread running");
00045                 while (ros::ok()) {
00046                     ROS_DEBUG("Waiting for data");
00047                     boost::shared_ptr<Base> message_ptr(new Base);
00048                     if (blockmgr_->wait_data(*segment_, shm_handle_, *message_ptr)
00049                             && user_cb_ && ros::ok()) {
00050                         (*user_cb_)(message_ptr);
00051                     }
00052                 }
00053                 ROS_DEBUG("Unregistering client");
00054             }
00055 
00056                         virtual void internalCallback(const sharedmem_transport::SharedMemHeaderConstPtr& message,
00057                                         const typename message_transport::SimpleSubscriberPlugin<Base,sharedmem_transport::SharedMemHeader>::Callback& user_cb)
00058                         {
00059                                 user_cb_ = &user_cb;
00060                 ROS_DEBUG("received latched message");
00061                 if (!segment_) {
00062                     try {
00063                     segment_ = new boost::interprocess::managed_shared_memory(boost::interprocess::open_only,ROSSharedMemoryDefaultBlock);
00064                     ROS_DEBUG("Connected to segment");
00065                     } catch (boost::interprocess::bad_alloc e) {
00066                         segment_ = NULL;
00067                         ROS_ERROR("Failed to connect to shared memory segment");
00068                         return;
00069                     }
00070                     blockmgr_ = (segment_->find<SharedMemoryBlock>("Manager")).first;
00071                     if (!blockmgr_) {
00072                         delete segment_;
00073                         segment_ = NULL;
00074                         ROS_ERROR("Cannot find Manager block in shared memory segment");
00075                         return;
00076                     }
00077                     ROS_DEBUG("Got block mgr %p",blockmgr_);
00078                     shm_handle_ = blockmgr_->findHandle(*segment_,this->getTopic().c_str());
00079                     if (shm_handle_.is_valid()) {
00080                         ROS_DEBUG("Got shm handle %p",shm_handle_.ptr);
00081                         rec_thread_ = new  boost::thread(&SharedmemSubscriber::receiveThread,this);
00082                     } else {
00083                         delete segment_;
00084                         segment_ = NULL;
00085                         ROS_ERROR("Cannot find memory block for %s", this->getTopic().c_str());
00086                     }
00087                 }
00088                         }
00089 
00090 
00091                         const typename message_transport::SimpleSubscriberPlugin<Base,sharedmem_transport::SharedMemHeader>::Callback* user_cb_;
00092             boost::thread *rec_thread_;
00093                         boost::interprocess::managed_shared_memory *segment_ ;
00094             SharedMemoryBlock *blockmgr_;
00095                         shm_handle shm_handle_;
00096         };
00097 
00098 } //namespace transport
00099 
00100 #endif // SHAREDMEM_MESSAGE_TRANSPORT_SUBSCRIBER_H


sharedmem_transport
Author(s): Cedric Pradalier
autogenerated on Sat Dec 28 2013 16:57:13