Go to the documentation of this file.00001 #ifndef SHARED_MEMORY_BLOCK_H
00002 #define SHARED_MEMORY_BLOCK_H
00003
00004 #include <cassert>
00005
00006 #include <boost/interprocess/managed_shared_memory.hpp>
00007 #include <boost/interprocess/sync/interprocess_mutex.hpp>
00008 #include <boost/interprocess/sync/interprocess_condition.hpp>
00009 #include <boost/interprocess/sync/scoped_lock.hpp>
00010 #include <ros/ros.h>
00011 #include "sharedmem_transport/SharedMemoryBlockDescriptor.h"
00012 #include "sharedmem_transport/SharedMemBlock.h"
00013
00014 namespace sharedmem_transport {
00015
00016 #define ROSSharedMemoryDefaultBlock "ROS::SharedMemoryBlock"
00017 #define ROSSharedMemoryNumBlock 100
00018
00019
00020
00021 struct shm_handle {
00022 uint32_t handle;
00023 uint8_t *ptr;
00024 uint32_t resize_count;
00025 shm_handle() : handle(-1), ptr(NULL), resize_count(0) {}
00026 shm_handle(uint32_t h, uint32_t rcount, uint8_t *p) : handle(h), ptr(p), resize_count(rcount) {}
00027 bool is_valid() const {return ptr != NULL;}
00028 };
00029
00030 class SharedMemoryBlock {
00031 protected:
00032
00033 boost::interprocess::interprocess_mutex mutex;
00034 boost::interprocess::interprocess_condition cond;
00035 int32_t num_clients;
00036 SharedMemoryBlockDescriptor descriptors[ROSSharedMemoryNumBlock];
00037
00038
00039 public:
00040
00041 SharedMemoryBlock() : num_clients(0) {}
00042
00043 shm_handle findHandle(boost::interprocess::managed_shared_memory & segment, const char * name) ;
00044
00045
00046 shm_handle allocateBlock(boost::interprocess::managed_shared_memory & segment,
00047 const char * name, uint32_t size) ;
00048
00049 void resetBlock(boost::interprocess::managed_shared_memory & segment, shm_handle & shm) ;
00050 void resetAllBlocks(boost::interprocess::managed_shared_memory & segment) ;
00051
00052 void reallocateBlock(boost::interprocess::managed_shared_memory & segment,
00053 shm_handle & shm, uint32_t size) ;
00054
00055 template <class Base>
00056 bool wait_data(boost::interprocess::managed_shared_memory & segment,
00057 shm_handle & src, Base & msg) {
00058 {
00059 ROS_DEBUG("Locking %d",src.handle);
00060 boost::interprocess::scoped_lock<boost::interprocess::interprocess_mutex>
00061 lock(descriptors[src.handle].mutex);
00062 if (!descriptors[src.handle].wait_data_and_register_client(lock)) {
00063 return false;
00064 }
00065 if (!ros::ok()) {
00066 lock.unlock();
00067 descriptors[src.handle].unregister_client();
00068 return false;
00069 }
00070 register_global_client();
00071 ROS_DEBUG("Unlocking %d",src.handle);
00072 }
00073 deserialize<Base>(segment,src,msg);
00074 unregister_global_client();
00075 ROS_DEBUG("Unregistering %d",src.handle);
00076 descriptors[src.handle].unregister_client();
00077 if (!ros::ok()) return false;
00078 return true;
00079 }
00080
00081 template <class M>
00082 void serialize(boost::interprocess::managed_shared_memory & segment,
00083 shm_handle & dest, const M & msg) {
00084 boost::interprocess::scoped_lock<boost::interprocess::interprocess_mutex> lock(descriptors[dest.handle].mutex);
00085 ROS_DEBUG("serialize: locked %d, checking clients",dest.handle);
00086 descriptors[dest.handle].check_clients(lock);
00087 ROS_DEBUG("serialize: locked %d, clients checked",dest.handle);
00088 register_global_client();
00089 ROS_DEBUG("serialize: global clients checked");
00090
00091 assert(dest.handle < ROSSharedMemoryNumBlock);
00092 if (dest.resize_count != descriptors[dest.handle].resize_count_) {
00093 std::pair<uint8_t *, std::size_t> ret = segment.find<uint8_t>(descriptors[dest.handle].name_);
00094 dest.resize_count = descriptors[dest.handle].resize_count_;
00095 dest.ptr = ret.first;
00096 }
00097 ROS_DEBUG("Serialising to %p, %d bytes",dest.ptr,descriptors[dest.handle].size_);
00098 ros::serialization::OStream out(dest.ptr,descriptors[dest.handle].size_);
00099 ros::serialization::serialize(out, msg);
00100 unregister_global_client();
00101 ROS_DEBUG("serialize: global clients released");
00102 descriptors[dest.handle].signal_data();
00103 ROS_DEBUG("serialize: unlocking %d",dest.handle);
00104 }
00105
00106 std::vector<SharedMemBlock> getBlockList() const ;
00107 protected:
00108
00109 template <class Base>
00110 void deserialize(boost::interprocess::managed_shared_memory & segment,
00111 shm_handle & src, Base & msg) {
00112 assert(src.handle < ROSSharedMemoryNumBlock);
00113 if (src.resize_count != descriptors[src.handle].resize_count_) {
00114 std::pair<uint8_t *, std::size_t> ret = segment.find<uint8_t>(descriptors[src.handle].name_);
00115 src.resize_count = descriptors[src.handle].resize_count_;
00116 src.ptr = ret.first;
00117 }
00118 ROS_DEBUG("Deserialising from %p, %d bytes",src.ptr,descriptors[src.handle].size_);
00119 ros::serialization::IStream in(src.ptr,descriptors[src.handle].size_);
00120 ros::serialization::deserialize(in, msg);
00121 }
00122
00123
00124 shm_handle connectBlock(boost::interprocess::managed_shared_memory & segment, uint32_t handle) ;
00125
00126 void check_global_clients(boost::interprocess::scoped_lock<boost::interprocess::interprocess_mutex> & lock) {
00127 if (num_clients) {
00128 ROS_DEBUG("Lock_global wait");
00129 cond.wait(lock);
00130 }
00131 ROS_DEBUG("Lock_global done");
00132 }
00133
00134
00135 void register_global_client() {
00136 ROS_DEBUG("register_global_client:: Locking global");
00137 boost::interprocess::scoped_lock<boost::interprocess::interprocess_mutex> lock(mutex);
00138 num_clients ++;
00139 ROS_DEBUG("Registered global client");
00140 }
00141
00142 void unregister_global_client() {
00143 ROS_DEBUG("unregister_global_client:: Locking global");
00144 boost::interprocess::scoped_lock<boost::interprocess::interprocess_mutex> lock(mutex);
00145 num_clients --;
00146 assert(num_clients >= 0);
00147 if (num_clients == 0) {
00148 ROS_DEBUG("Global lock is free");
00149 cond.notify_all();
00150 }
00151 ROS_DEBUG("Unregistered global client");
00152 }
00153
00154
00155 };
00156 }
00157
00158 #endif // SHARED_MEMORY_BLOCK_H