00001 #ifndef SHARED_MEMORY_BLOCK_H
00002 #define SHARED_MEMORY_BLOCK_H
00003
00004 #include <cassert>
00005
00006 #include <boost/interprocess/managed_shared_memory.hpp>
00007 #include <boost/interprocess/sync/interprocess_mutex.hpp>
00008 #include <boost/interprocess/sync/interprocess_condition.hpp>
00009 #include <boost/interprocess/sync/scoped_lock.hpp>
00010 #include <ros/ros.h>
00011 #include "sharedmem_transport/SharedMemoryBlockDescriptor.h"
00012 #include "sharedmem_transport/SharedMemBlock.h"
00013
00014 namespace sharedmem_transport {
00015
00016 #define ROSSharedMemoryDefaultBlock "ROS::SharedMemoryBlock"
00017 #define ROSSharedMemoryNumBlock 100
00018
00019
00020
00021 struct shm_handle {
00022 uint32_t handle;
00023 uint8_t *ptr;
00024 uint32_t resize_count;
00025 shm_handle() : handle(-1), ptr(NULL), resize_count(0) {}
00026 shm_handle(uint32_t h, uint32_t rcount, uint8_t *p) : handle(h), ptr(p), resize_count(rcount) {}
00027 bool is_valid() const {return ptr != NULL;}
00028 };
00029
00030 class SharedMemoryBlock {
00031 protected:
00032
00033 boost::interprocess::interprocess_mutex mutex;
00034 boost::interprocess::interprocess_condition cond;
00035 int32_t num_clients;
00036 SharedMemoryBlockDescriptor descriptors[ROSSharedMemoryNumBlock];
00037
00038
00039 public:
00040
00041 SharedMemoryBlock() : num_clients(0) {}
00042
00043 shm_handle findHandle(boost::interprocess::managed_shared_memory & segment, const char * name) ;
00044
00045
00046 shm_handle allocateBlock(boost::interprocess::managed_shared_memory & segment,
00047 const char * name, uint32_t size) ;
00048
00049 void resetBlock(boost::interprocess::managed_shared_memory & segment, shm_handle & shm) ;
00050 void resetAllBlocks(boost::interprocess::managed_shared_memory & segment) ;
00051
00052 void reallocateBlock(boost::interprocess::managed_shared_memory & segment,
00053 shm_handle & shm, uint32_t size) ;
00054
00055 template <class Base>
00056 bool wait_data(boost::interprocess::managed_shared_memory & segment,
00057 shm_handle & src, Base & msg) {
00058 {
00059 ROS_DEBUG("Locking %d",src.handle);
00060 boost::interprocess::scoped_lock<boost::interprocess::interprocess_mutex>
00061 lock(descriptors[src.handle].mutex);
00062 if (!descriptors[src.handle].wait_data_and_register_client(lock)) {
00063 return false;
00064 }
00065 if (!ros::ok()) {
00066 lock.unlock();
00067 descriptors[src.handle].unregister_client();
00068 return false;
00069 }
00070 register_global_client();
00071 ROS_DEBUG("Unlocking %d",src.handle);
00072 }
00073 deserialize<Base>(segment,src,msg);
00074 unregister_global_client();
00075 ROS_DEBUG("Unregistering %d",src.handle);
00076 descriptors[src.handle].unregister_client();
00077 if (!ros::ok()) return false;
00078 return true;
00079 }
00080
00081 void serialize(boost::interprocess::managed_shared_memory & segment,
00082 shm_handle & dest, const ros::Message & msg) ;
00083
00084 std::vector<SharedMemBlock> getBlockList() const ;
00085 protected:
00086
00087 template <class Base>
00088 void deserialize(boost::interprocess::managed_shared_memory & segment,
00089 shm_handle & src, Base & msg) {
00090 assert(src.handle < ROSSharedMemoryNumBlock);
00091 if (src.resize_count != descriptors[src.handle].resize_count_) {
00092 std::pair<uint8_t *, std::size_t> ret = segment.find<uint8_t>(descriptors[src.handle].name_);
00093 src.resize_count = descriptors[src.handle].resize_count_;
00094 src.ptr = ret.first;
00095 }
00096 ROS_DEBUG("Deserialising from %p, %d bytes",src.ptr,descriptors[src.handle].size_);
00097 ros::serialization::IStream in(src.ptr,descriptors[src.handle].size_);
00098 ros::serialization::deserialize(in, msg);
00099 }
00100
00101
00102 shm_handle connectBlock(boost::interprocess::managed_shared_memory & segment, uint32_t handle) ;
00103
00104 void check_global_clients(boost::interprocess::scoped_lock<boost::interprocess::interprocess_mutex> & lock) {
00105 if (num_clients) {
00106 ROS_DEBUG("Lock_global wait");
00107 cond.wait(lock);
00108 }
00109 ROS_DEBUG("Lock_global done");
00110 }
00111
00112
00113 void register_global_client() {
00114 ROS_DEBUG("register_global_client:: Locking global");
00115 boost::interprocess::scoped_lock<boost::interprocess::interprocess_mutex> lock(mutex);
00116 num_clients ++;
00117 ROS_DEBUG("Registered global client");
00118 }
00119
00120 void unregister_global_client() {
00121 ROS_DEBUG("unregister_global_client:: Locking global");
00122 boost::interprocess::scoped_lock<boost::interprocess::interprocess_mutex> lock(mutex);
00123 num_clients --;
00124 assert(num_clients >= 0);
00125 if (num_clients == 0) {
00126 ROS_DEBUG("Global lock is free");
00127 cond.notify_all();
00128 }
00129 ROS_DEBUG("Unregistered global client");
00130 }
00131
00132
00133 };
00134 }
00135
00136 #endif // SHARED_MEMORY_BLOCK_H