$search
00001 /* 00002 * Copyright (C) 2009, Willow Garage, Inc. 00003 * 00004 * Redistribution and use in source and binary forms, with or without 00005 * modification, are permitted provided that the following conditions are met: 00006 * * Redistributions of source code must retain the above copyright notice, 00007 * this list of conditions and the following disclaimer. 00008 * * Redistributions in binary form must reproduce the above copyright 00009 * notice, this list of conditions and the following disclaimer in the 00010 * documentation and/or other materials provided with the distribution. 00011 * * Neither the names of Willow Garage, Inc. nor the names of its 00012 * contributors may be used to endorse or promote products derived from 00013 * this software without specific prior written permission. 00014 * 00015 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" 00016 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 00017 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 00018 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE 00019 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR 00020 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF 00021 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 00022 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 00023 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) 00024 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 00025 * POSSIBILITY OF SUCH DAMAGE. 00026 */ 00027 00028 00029 #include "ros/subscription_queue.h" 00030 #include "ros/message_deserializer.h" 00031 #include "ros/subscription_callback_helper.h" 00032 00033 namespace ros 00034 { 00035 00036 SubscriptionQueue::SubscriptionQueue(const std::string& topic, int32_t queue_size, bool allow_concurrent_callbacks) 00037 : topic_(topic) 00038 , size_(queue_size) 00039 , full_(false) 00040 , queue_size_(0) 00041 , allow_concurrent_callbacks_(allow_concurrent_callbacks) 00042 {} 00043 00044 SubscriptionQueue::~SubscriptionQueue() 00045 { 00046 00047 } 00048 00049 void SubscriptionQueue::push(const SubscriptionCallbackHelperPtr& helper, const MessageDeserializerPtr& deserializer, 00050 bool has_tracked_object, const VoidConstWPtr& tracked_object, bool nonconst_need_copy, 00051 ros::Time receipt_time, bool* was_full) 00052 { 00053 boost::mutex::scoped_lock lock(queue_mutex_); 00054 00055 if (was_full) 00056 { 00057 *was_full = false; 00058 } 00059 00060 if(fullNoLock()) 00061 { 00062 queue_.pop_front(); 00063 --queue_size_; 00064 00065 if (!full_) 00066 { 00067 ROS_DEBUG("Incoming queue full for topic \"%s\". Discarding oldest message (current queue size [%d])", topic_.c_str(), (int)queue_.size()); 00068 } 00069 00070 full_ = true; 00071 00072 if (was_full) 00073 { 00074 *was_full = true; 00075 } 00076 } 00077 else 00078 { 00079 full_ = false; 00080 } 00081 00082 Item i; 00083 i.helper = helper; 00084 i.deserializer = deserializer; 00085 i.has_tracked_object = has_tracked_object; 00086 i.tracked_object = tracked_object; 00087 i.nonconst_need_copy = nonconst_need_copy; 00088 i.receipt_time = receipt_time; 00089 queue_.push_back(i); 00090 ++queue_size_; 00091 } 00092 00093 void SubscriptionQueue::clear() 00094 { 00095 boost::recursive_mutex::scoped_lock cb_lock(callback_mutex_); 00096 boost::mutex::scoped_lock queue_lock(queue_mutex_); 00097 00098 queue_.clear(); 00099 queue_size_ = 0; 00100 } 00101 00102 CallbackInterface::CallResult SubscriptionQueue::call() 00103 { 00104 // The callback may result in our own destruction. Therefore, we may need to keep a reference to ourselves 00105 // that outlasts the scoped_try_lock 00106 boost::shared_ptr<SubscriptionQueue> self; 00107 boost::recursive_mutex::scoped_try_lock lock(callback_mutex_, boost::defer_lock); 00108 00109 if (!allow_concurrent_callbacks_) 00110 { 00111 lock.try_lock(); 00112 if (!lock.owns_lock()) 00113 { 00114 return CallbackInterface::TryAgain; 00115 } 00116 } 00117 00118 VoidConstPtr tracker; 00119 Item i; 00120 00121 { 00122 boost::mutex::scoped_lock lock(queue_mutex_); 00123 00124 if (queue_.empty()) 00125 { 00126 return CallbackInterface::Invalid; 00127 } 00128 00129 i = queue_.front(); 00130 00131 if (queue_.empty()) 00132 { 00133 return CallbackInterface::Invalid; 00134 } 00135 00136 if (i.has_tracked_object) 00137 { 00138 tracker = i.tracked_object.lock(); 00139 00140 if (!tracker) 00141 { 00142 return CallbackInterface::Invalid; 00143 } 00144 } 00145 00146 queue_.pop_front(); 00147 --queue_size_; 00148 } 00149 00150 VoidConstPtr msg = i.deserializer->deserialize(); 00151 00152 // msg can be null here if deserialization failed 00153 if (msg) 00154 { 00155 try 00156 { 00157 self = shared_from_this(); 00158 } 00159 catch (boost::bad_weak_ptr&) // For the tests, where we don't create a shared_ptr 00160 {} 00161 00162 SubscriptionCallbackHelperCallParams params; 00163 params.event = MessageEvent<void const>(msg, i.deserializer->getConnectionHeader(), i.receipt_time, i.nonconst_need_copy, MessageEvent<void const>::CreateFunction()); 00164 i.helper->call(params); 00165 } 00166 00167 return CallbackInterface::Success; 00168 } 00169 00170 bool SubscriptionQueue::ready() 00171 { 00172 return true; 00173 } 00174 00175 bool SubscriptionQueue::full() 00176 { 00177 boost::mutex::scoped_lock lock(queue_mutex_); 00178 return fullNoLock(); 00179 } 00180 00181 bool SubscriptionQueue::fullNoLock() 00182 { 00183 return (size_ > 0) && (queue_size_ >= (uint32_t)size_); 00184 } 00185 00186 } 00187