00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029 #include "ros/subscription_queue.h"
00030 #include "ros/message_deserializer.h"
00031 #include "ros/subscription_callback_helper.h"
00032
00033 namespace ros
00034 {
00035
00036 SubscriptionQueue::SubscriptionQueue(const std::string& topic, int32_t queue_size, bool allow_concurrent_callbacks)
00037 : topic_(topic)
00038 , size_(queue_size)
00039 , full_(false)
00040 , queue_size_(0)
00041 , allow_concurrent_callbacks_(allow_concurrent_callbacks)
00042 {}
00043
00044 SubscriptionQueue::~SubscriptionQueue()
00045 {
00046
00047 }
00048
00049 void SubscriptionQueue::push(const SubscriptionCallbackHelperPtr& helper, const MessageDeserializerPtr& deserializer,
00050 bool has_tracked_object, const VoidConstWPtr& tracked_object, bool nonconst_need_copy,
00051 ros::Time receipt_time, bool* was_full)
00052 {
00053 boost::mutex::scoped_lock lock(queue_mutex_);
00054
00055 if (was_full)
00056 {
00057 *was_full = false;
00058 }
00059
00060 if(fullNoLock())
00061 {
00062 queue_.pop_front();
00063 --queue_size_;
00064
00065 if (!full_)
00066 {
00067 ROS_DEBUG("Incoming queue full for topic \"%s\". Discarding oldest message (current queue size [%d])", topic_.c_str(), (int)queue_.size());
00068 }
00069
00070 full_ = true;
00071
00072 if (was_full)
00073 {
00074 *was_full = true;
00075 }
00076 }
00077 else
00078 {
00079 full_ = false;
00080 }
00081
00082 Item i;
00083 i.helper = helper;
00084 i.deserializer = deserializer;
00085 i.has_tracked_object = has_tracked_object;
00086 i.tracked_object = tracked_object;
00087 i.nonconst_need_copy = nonconst_need_copy;
00088 i.receipt_time = receipt_time;
00089 queue_.push_back(i);
00090 ++queue_size_;
00091 }
00092
00093 void SubscriptionQueue::clear()
00094 {
00095 boost::recursive_mutex::scoped_lock cb_lock(callback_mutex_);
00096 boost::mutex::scoped_lock queue_lock(queue_mutex_);
00097
00098 queue_.clear();
00099 queue_size_ = 0;
00100 }
00101
00102 CallbackInterface::CallResult SubscriptionQueue::call()
00103 {
00104
00105
00106 boost::shared_ptr<SubscriptionQueue> self;
00107 boost::recursive_mutex::scoped_try_lock lock(callback_mutex_, boost::defer_lock);
00108
00109 if (!allow_concurrent_callbacks_)
00110 {
00111 lock.try_lock();
00112 if (!lock.owns_lock())
00113 {
00114 return CallbackInterface::TryAgain;
00115 }
00116 }
00117
00118 VoidConstPtr tracker;
00119 Item i;
00120
00121 {
00122 boost::mutex::scoped_lock lock(queue_mutex_);
00123
00124 if (queue_.empty())
00125 {
00126 return CallbackInterface::Invalid;
00127 }
00128
00129 i = queue_.front();
00130
00131 if (queue_.empty())
00132 {
00133 return CallbackInterface::Invalid;
00134 }
00135
00136 if (i.has_tracked_object)
00137 {
00138 tracker = i.tracked_object.lock();
00139
00140 if (!tracker)
00141 {
00142 return CallbackInterface::Invalid;
00143 }
00144 }
00145
00146 queue_.pop_front();
00147 --queue_size_;
00148 }
00149
00150 VoidConstPtr msg = i.deserializer->deserialize();
00151
00152
00153 if (msg)
00154 {
00155 try
00156 {
00157 self = shared_from_this();
00158 }
00159 catch (boost::bad_weak_ptr&)
00160 {}
00161
00162 SubscriptionCallbackHelperCallParams params;
00163 params.event = MessageEvent<void const>(msg, i.deserializer->getConnectionHeader(), i.receipt_time, i.nonconst_need_copy, MessageEvent<void const>::CreateFunction());
00164 i.helper->call(params);
00165 }
00166
00167 return CallbackInterface::Success;
00168 }
00169
00170 bool SubscriptionQueue::ready()
00171 {
00172 return true;
00173 }
00174
00175 bool SubscriptionQueue::full()
00176 {
00177 boost::mutex::scoped_lock lock(queue_mutex_);
00178 return fullNoLock();
00179 }
00180
00181 bool SubscriptionQueue::fullNoLock()
00182 {
00183 return (size_ > 0) && (queue_size_ >= (uint32_t)size_);
00184 }
00185
00186 }
00187