Go to the documentation of this file.00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029 #include "ros/subscription_queue.h"
00030 #include "ros/message_deserializer.h"
00031 #include "ros/subscription_callback_helper.h"
00032
00033 namespace ros
00034 {
00035
00036 SubscriptionQueue::SubscriptionQueue(const std::string& topic, int32_t queue_size, bool allow_concurrent_callbacks)
00037 : topic_(topic)
00038 , size_(queue_size)
00039 , full_(false)
00040 , queue_size_(0)
00041 , allow_concurrent_callbacks_(allow_concurrent_callbacks)
00042 {}
00043
00044 SubscriptionQueue::~SubscriptionQueue()
00045 {
00046
00047 }
00048
00049 void SubscriptionQueue::push(const SubscriptionCallbackHelperPtr& helper, const MessageDeserializerPtr& deserializer,
00050 bool has_tracked_object, const VoidConstWPtr& tracked_object, bool nonconst_need_copy,
00051 ros::Time receipt_time, bool* was_full)
00052 {
00053 boost::mutex::scoped_lock lock(queue_mutex_);
00054
00055 if (was_full)
00056 {
00057 *was_full = false;
00058 }
00059
00060 if(fullNoLock())
00061 {
00062 queue_.pop_front();
00063 --queue_size_;
00064
00065 if (!full_)
00066 {
00067 ROS_DEBUG("Incoming queue full for topic \"%s\". Discarding oldest message (current queue size [%d])", topic_.c_str(), (int)queue_.size());
00068 }
00069
00070 full_ = true;
00071
00072 if (was_full)
00073 {
00074 *was_full = true;
00075 }
00076 }
00077 else
00078 {
00079 full_ = false;
00080 }
00081
00082 Item i;
00083 i.helper = helper;
00084 i.deserializer = deserializer;
00085 i.has_tracked_object = has_tracked_object;
00086 i.tracked_object = tracked_object;
00087 i.nonconst_need_copy = nonconst_need_copy;
00088 i.receipt_time = receipt_time;
00089 queue_.push_back(i);
00090 ++queue_size_;
00091 }
00092
00093 void SubscriptionQueue::clear()
00094 {
00095 boost::recursive_mutex::scoped_lock cb_lock(callback_mutex_);
00096 boost::mutex::scoped_lock queue_lock(queue_mutex_);
00097
00098 queue_.clear();
00099 queue_size_ = 0;
00100 }
00101
00102 CallbackInterface::CallResult SubscriptionQueue::call()
00103 {
00104
00105
00106 boost::shared_ptr<SubscriptionQueue> self;
00107 boost::recursive_mutex::scoped_try_lock lock(callback_mutex_, boost::defer_lock);
00108
00109 if (!allow_concurrent_callbacks_)
00110 {
00111 lock.try_lock();
00112 if (!lock.owns_lock())
00113 {
00114 return CallbackInterface::TryAgain;
00115 }
00116 }
00117
00118 VoidConstPtr tracker;
00119 Item i;
00120
00121 {
00122 boost::mutex::scoped_lock lock(queue_mutex_);
00123
00124 if (queue_.empty())
00125 {
00126 return CallbackInterface::Invalid;
00127 }
00128
00129 i = queue_.front();
00130
00131 if (queue_.empty())
00132 {
00133 return CallbackInterface::Invalid;
00134 }
00135
00136 if (i.has_tracked_object)
00137 {
00138 tracker = i.tracked_object.lock();
00139
00140 if (!tracker)
00141 {
00142 return CallbackInterface::Invalid;
00143 }
00144 }
00145
00146 queue_.pop_front();
00147 --queue_size_;
00148 }
00149
00150 VoidConstPtr msg = i.deserializer->deserialize();
00151
00152
00153 if (msg)
00154 {
00155 try
00156 {
00157 self = shared_from_this();
00158 }
00159 catch (boost::bad_weak_ptr&)
00160 {}
00161
00162 SubscriptionCallbackHelperCallParams params;
00163 params.event = MessageEvent<void const>(msg, i.deserializer->getConnectionHeader(), i.receipt_time, i.nonconst_need_copy, MessageEvent<void const>::CreateFunction());
00164 i.helper->call(params);
00165 }
00166
00167 return CallbackInterface::Success;
00168 }
00169
00170 bool SubscriptionQueue::ready()
00171 {
00172 return true;
00173 }
00174
00175 bool SubscriptionQueue::full()
00176 {
00177 boost::mutex::scoped_lock lock(queue_mutex_);
00178 return fullNoLock();
00179 }
00180
00181 bool SubscriptionQueue::fullNoLock()
00182 {
00183 return (size_ > 0) && (queue_size_ >= (uint32_t)size_);
00184 }
00185
00186 }
00187
roscpp
Author(s): Morgan Quigley, Josh Faust, Brian Gerkey, Troy Straszheim
autogenerated on Fri Aug 28 2015 12:33:11