$search
00001 #ifndef SHARED_MEMORY_BLOCK_H 00002 #define SHARED_MEMORY_BLOCK_H 00003 00004 #include <cassert> 00005 00006 #include <boost/interprocess/managed_shared_memory.hpp> 00007 #include <boost/interprocess/sync/interprocess_mutex.hpp> 00008 #include <boost/interprocess/sync/interprocess_condition.hpp> 00009 #include <boost/interprocess/sync/scoped_lock.hpp> 00010 #include <ros/ros.h> 00011 #include "sharedmem_transport/SharedMemoryBlockDescriptor.h" 00012 #include "sharedmem_transport/SharedMemBlock.h" 00013 00014 namespace sharedmem_transport { 00015 00016 #define ROSSharedMemoryDefaultBlock "ROS::SharedMemoryBlock" 00017 #define ROSSharedMemoryNumBlock 100 00018 00019 00020 00021 struct shm_handle { 00022 uint32_t handle; 00023 uint8_t *ptr; 00024 uint32_t resize_count; 00025 shm_handle() : handle(-1), ptr(NULL), resize_count(0) {} 00026 shm_handle(uint32_t h, uint32_t rcount, uint8_t *p) : handle(h), ptr(p), resize_count(rcount) {} 00027 bool is_valid() const {return ptr != NULL;} 00028 }; 00029 00030 class SharedMemoryBlock { 00031 protected: 00032 //Mutex to protect access to the queue 00033 boost::interprocess::interprocess_mutex mutex; 00034 boost::interprocess::interprocess_condition cond; 00035 int32_t num_clients; 00036 SharedMemoryBlockDescriptor descriptors[ROSSharedMemoryNumBlock]; 00037 00038 00039 public: 00040 00041 SharedMemoryBlock() : num_clients(0) {} 00042 00043 shm_handle findHandle(boost::interprocess::managed_shared_memory & segment, const char * name) ; 00044 00045 00046 shm_handle allocateBlock(boost::interprocess::managed_shared_memory & segment, 00047 const char * name, uint32_t size) ; 00048 00049 void resetBlock(boost::interprocess::managed_shared_memory & segment, shm_handle & shm) ; 00050 void resetAllBlocks(boost::interprocess::managed_shared_memory & segment) ; 00051 00052 void reallocateBlock(boost::interprocess::managed_shared_memory & segment, 00053 shm_handle & shm, uint32_t size) ; 00054 00055 template <class Base> 00056 bool wait_data(boost::interprocess::managed_shared_memory & segment, 00057 shm_handle & src, Base & msg) { 00058 { 00059 ROS_DEBUG("Locking %d",src.handle); 00060 boost::interprocess::scoped_lock<boost::interprocess::interprocess_mutex> 00061 lock(descriptors[src.handle].mutex); 00062 if (!descriptors[src.handle].wait_data_and_register_client(lock)) { 00063 return false; 00064 } 00065 if (!ros::ok()) { 00066 lock.unlock(); 00067 descriptors[src.handle].unregister_client(); 00068 return false; 00069 } 00070 register_global_client(); 00071 ROS_DEBUG("Unlocking %d",src.handle); 00072 } 00073 deserialize<Base>(segment,src,msg); 00074 unregister_global_client(); 00075 ROS_DEBUG("Unregistering %d",src.handle); 00076 descriptors[src.handle].unregister_client(); 00077 if (!ros::ok()) return false; 00078 return true; 00079 } 00080 00081 template <class M> 00082 void serialize(boost::interprocess::managed_shared_memory & segment, 00083 shm_handle & dest, const M & msg) { 00084 boost::interprocess::scoped_lock<boost::interprocess::interprocess_mutex> lock(descriptors[dest.handle].mutex); 00085 ROS_DEBUG("serialize: locked %d, checking clients",dest.handle); 00086 descriptors[dest.handle].check_clients(lock); 00087 ROS_DEBUG("serialize: locked %d, clients checked",dest.handle); 00088 register_global_client(); 00089 ROS_DEBUG("serialize: global clients checked"); 00090 00091 assert(dest.handle < ROSSharedMemoryNumBlock); 00092 if (dest.resize_count != descriptors[dest.handle].resize_count_) { 00093 std::pair<uint8_t *, std::size_t> ret = segment.find<uint8_t>(descriptors[dest.handle].name_); 00094 dest.resize_count = descriptors[dest.handle].resize_count_; 00095 dest.ptr = ret.first; 00096 } 00097 ROS_DEBUG("Serialising to %p, %d bytes",dest.ptr,descriptors[dest.handle].size_); 00098 ros::serialization::OStream out(dest.ptr,descriptors[dest.handle].size_); 00099 ros::serialization::serialize(out, msg); 00100 unregister_global_client(); 00101 ROS_DEBUG("serialize: global clients released"); 00102 descriptors[dest.handle].signal_data(); 00103 ROS_DEBUG("serialize: unlocking %d",dest.handle); 00104 } 00105 00106 std::vector<SharedMemBlock> getBlockList() const ; 00107 protected: 00108 00109 template <class Base> 00110 void deserialize(boost::interprocess::managed_shared_memory & segment, 00111 shm_handle & src, Base & msg) { 00112 assert(src.handle < ROSSharedMemoryNumBlock); 00113 if (src.resize_count != descriptors[src.handle].resize_count_) { 00114 std::pair<uint8_t *, std::size_t> ret = segment.find<uint8_t>(descriptors[src.handle].name_); 00115 src.resize_count = descriptors[src.handle].resize_count_; 00116 src.ptr = ret.first; 00117 } 00118 ROS_DEBUG("Deserialising from %p, %d bytes",src.ptr,descriptors[src.handle].size_); 00119 ros::serialization::IStream in(src.ptr,descriptors[src.handle].size_); 00120 ros::serialization::deserialize(in, msg); 00121 } 00122 00123 00124 shm_handle connectBlock(boost::interprocess::managed_shared_memory & segment, uint32_t handle) ; 00125 00126 void check_global_clients(boost::interprocess::scoped_lock<boost::interprocess::interprocess_mutex> & lock) { 00127 if (num_clients) { 00128 ROS_DEBUG("Lock_global wait"); 00129 cond.wait(lock); 00130 } 00131 ROS_DEBUG("Lock_global done"); 00132 } 00133 00134 00135 void register_global_client() { 00136 ROS_DEBUG("register_global_client:: Locking global"); 00137 boost::interprocess::scoped_lock<boost::interprocess::interprocess_mutex> lock(mutex); 00138 num_clients ++; 00139 ROS_DEBUG("Registered global client"); 00140 } 00141 00142 void unregister_global_client() { 00143 ROS_DEBUG("unregister_global_client:: Locking global"); 00144 boost::interprocess::scoped_lock<boost::interprocess::interprocess_mutex> lock(mutex); 00145 num_clients --; 00146 assert(num_clients >= 0); 00147 if (num_clients == 0) { 00148 ROS_DEBUG("Global lock is free"); 00149 cond.notify_all(); 00150 } 00151 ROS_DEBUG("Unregistered global client"); 00152 } 00153 00154 00155 }; 00156 } //namespace image_transport 00157 00158 #endif // SHARED_MEMORY_BLOCK_H