SharedMemoryBlock.h
Go to the documentation of this file.
00001 #ifndef SHARED_MEMORY_BLOCK_H
00002 #define SHARED_MEMORY_BLOCK_H
00003 
00004 #include <cassert>
00005 
00006 #include <boost/interprocess/managed_shared_memory.hpp>
00007 #include <boost/interprocess/sync/interprocess_mutex.hpp>
00008 #include <boost/interprocess/sync/interprocess_condition.hpp>
00009 #include <boost/interprocess/sync/scoped_lock.hpp>
00010 #include <ros/ros.h>
00011 #include "sharedmem_transport/SharedMemoryBlockDescriptor.h"
00012 #include "sharedmem_transport/SharedMemBlock.h"
00013 
00014 namespace sharedmem_transport {
00015 
00016 #define ROSSharedMemoryDefaultBlock "ROS::SharedMemoryBlock"
00017 #define ROSSharedMemoryNumBlock 100
00018 
00019 
00020 
00021     struct shm_handle {
00022         uint32_t handle;
00023         uint8_t *ptr;
00024         uint32_t resize_count;
00025         shm_handle() : handle(-1), ptr(NULL), resize_count(0) {}
00026         shm_handle(uint32_t h, uint32_t rcount, uint8_t *p) : handle(h), ptr(p), resize_count(rcount) {}
00027         bool is_valid() const {return ptr != NULL;}
00028     };
00029 
00030     class SharedMemoryBlock {
00031         protected:
00032             //Mutex to protect access to the queue
00033             boost::interprocess::interprocess_mutex      mutex;
00034             boost::interprocess::interprocess_condition  cond;
00035             int32_t num_clients;
00036             SharedMemoryBlockDescriptor descriptors[ROSSharedMemoryNumBlock];
00037 
00038 
00039         public:
00040 
00041             SharedMemoryBlock() : num_clients(0) {}
00042 
00043             shm_handle findHandle(boost::interprocess::managed_shared_memory & segment, const char * name) ;
00044 
00045 
00046             shm_handle allocateBlock(boost::interprocess::managed_shared_memory & segment, 
00047                     const char * name, uint32_t size) ;
00048 
00049             void resetBlock(boost::interprocess::managed_shared_memory & segment, shm_handle & shm) ;
00050             void resetAllBlocks(boost::interprocess::managed_shared_memory & segment) ;
00051 
00052             void reallocateBlock(boost::interprocess::managed_shared_memory & segment, 
00053                     shm_handle & shm, uint32_t size) ;
00054 
00055             template <class Base>
00056                 bool wait_data(boost::interprocess::managed_shared_memory & segment,
00057                         shm_handle & src, Base & msg) {
00058                     {
00059                         ROS_DEBUG("Locking %d",src.handle);
00060                         boost::interprocess::scoped_lock<boost::interprocess::interprocess_mutex> 
00061                             lock(descriptors[src.handle].mutex); 
00062                         if (!descriptors[src.handle].wait_data_and_register_client(lock))  {
00063                             return false;
00064                         }
00065                         if (!ros::ok()) {
00066                             lock.unlock();
00067                             descriptors[src.handle].unregister_client();
00068                             return false;
00069                         }
00070                         register_global_client();
00071                         ROS_DEBUG("Unlocking %d",src.handle);
00072                     }
00073                     deserialize<Base>(segment,src,msg);
00074                     unregister_global_client();
00075                     ROS_DEBUG("Unregistering %d",src.handle);
00076                     descriptors[src.handle].unregister_client();
00077                     if (!ros::ok()) return false;
00078                     return true;
00079                 }
00080 
00081             template <class M>
00082             void serialize(boost::interprocess::managed_shared_memory & segment,
00083                     shm_handle & dest, const M & msg) {
00084                 boost::interprocess::scoped_lock<boost::interprocess::interprocess_mutex> lock(descriptors[dest.handle].mutex);
00085                 ROS_DEBUG("serialize: locked %d, checking clients",dest.handle);
00086                 descriptors[dest.handle].check_clients(lock);
00087                 ROS_DEBUG("serialize: locked %d, clients checked",dest.handle);
00088                 register_global_client();
00089                 ROS_DEBUG("serialize: global clients checked");
00090 
00091                 assert(dest.handle < ROSSharedMemoryNumBlock);
00092                 if (dest.resize_count != descriptors[dest.handle].resize_count_) {
00093                     std::pair<uint8_t *, std::size_t> ret = segment.find<uint8_t>(descriptors[dest.handle].name_);
00094                     dest.resize_count = descriptors[dest.handle].resize_count_;
00095                     dest.ptr = ret.first;
00096                 }
00097                 ROS_DEBUG("Serialising to %p, %d bytes",dest.ptr,descriptors[dest.handle].size_);
00098                 ros::serialization::OStream out(dest.ptr,descriptors[dest.handle].size_);
00099                 ros::serialization::serialize(out, msg);
00100                 unregister_global_client();
00101                 ROS_DEBUG("serialize: global clients released");
00102                 descriptors[dest.handle].signal_data();
00103                 ROS_DEBUG("serialize: unlocking %d",dest.handle);
00104             }
00105 
00106             std::vector<SharedMemBlock> getBlockList() const ;
00107         protected:
00108 
00109             template <class Base>
00110                 void deserialize(boost::interprocess::managed_shared_memory & segment,
00111                         shm_handle & src, Base & msg) {
00112                     assert(src.handle < ROSSharedMemoryNumBlock);
00113                     if (src.resize_count != descriptors[src.handle].resize_count_) {
00114                         std::pair<uint8_t *, std::size_t> ret = segment.find<uint8_t>(descriptors[src.handle].name_);
00115                         src.resize_count = descriptors[src.handle].resize_count_;
00116                         src.ptr = ret.first;
00117                     }
00118                     ROS_DEBUG("Deserialising from %p, %d bytes",src.ptr,descriptors[src.handle].size_);
00119                     ros::serialization::IStream in(src.ptr,descriptors[src.handle].size_);
00120                     ros::serialization::deserialize(in, msg);
00121                 }
00122 
00123             
00124             shm_handle connectBlock(boost::interprocess::managed_shared_memory & segment, uint32_t handle) ;
00125 
00126             void check_global_clients(boost::interprocess::scoped_lock<boost::interprocess::interprocess_mutex> & lock) {
00127                 if (num_clients) {
00128                     ROS_DEBUG("Lock_global wait");
00129                     cond.wait(lock);
00130                 }
00131                 ROS_DEBUG("Lock_global done");
00132             }
00133 
00134 
00135             void register_global_client() {
00136                 ROS_DEBUG("register_global_client:: Locking global");
00137                 boost::interprocess::scoped_lock<boost::interprocess::interprocess_mutex> lock(mutex); 
00138                 num_clients ++;
00139                 ROS_DEBUG("Registered global client");
00140             }
00141 
00142             void unregister_global_client() {
00143                 ROS_DEBUG("unregister_global_client:: Locking global");
00144                 boost::interprocess::scoped_lock<boost::interprocess::interprocess_mutex> lock(mutex); 
00145                 num_clients --;
00146                 assert(num_clients >= 0);
00147                 if (num_clients == 0) {
00148                     ROS_DEBUG("Global lock is free");
00149                     cond.notify_all();
00150                 }
00151                 ROS_DEBUG("Unregistered global client");
00152             }
00153 
00154 
00155     };
00156 } //namespace image_transport
00157 
00158 #endif // SHARED_MEMORY_BLOCK_H


sharedmem_transport
Author(s): Cedric Pradalier
autogenerated on Sat Dec 28 2013 16:57:13