subscription_queue.cpp
Go to the documentation of this file.
00001 /*
00002  * Copyright (C) 2009, Willow Garage, Inc.
00003  *
00004  * Redistribution and use in source and binary forms, with or without
00005  * modification, are permitted provided that the following conditions are met:
00006  *   * Redistributions of source code must retain the above copyright notice,
00007  *     this list of conditions and the following disclaimer.
00008  *   * Redistributions in binary form must reproduce the above copyright
00009  *     notice, this list of conditions and the following disclaimer in the
00010  *     documentation and/or other materials provided with the distribution.
00011  *   * Neither the names of Willow Garage, Inc. nor the names of its
00012  *     contributors may be used to endorse or promote products derived from
00013  *     this software without specific prior written permission.
00014  *
00015  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
00016  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
00017  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
00018  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
00019  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
00020  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
00021  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
00022  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
00023  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
00024  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
00025  * POSSIBILITY OF SUCH DAMAGE.
00026  */
00027 
00028 
00029 #include "ros/subscription_queue.h"
00030 #include "ros/message_deserializer.h"
00031 #include "ros/subscription_callback_helper.h"
00032 
00033 namespace ros
00034 {
00035 
00036 SubscriptionQueue::SubscriptionQueue(const std::string& topic, int32_t queue_size, bool allow_concurrent_callbacks)
00037 : topic_(topic)
00038 , size_(queue_size)
00039 , full_(false)
00040 , queue_size_(0)
00041 , allow_concurrent_callbacks_(allow_concurrent_callbacks)
00042 {}
00043 
00044 SubscriptionQueue::~SubscriptionQueue()
00045 {
00046 
00047 }
00048 
00049 void SubscriptionQueue::push(const SubscriptionCallbackHelperPtr& helper, const MessageDeserializerPtr& deserializer,
00050                                  bool has_tracked_object, const VoidConstWPtr& tracked_object, bool nonconst_need_copy,
00051                                  ros::Time receipt_time, bool* was_full)
00052 {
00053   boost::mutex::scoped_lock lock(queue_mutex_);
00054 
00055   if (was_full)
00056   {
00057     *was_full = false;
00058   }
00059 
00060   if(fullNoLock())
00061   {
00062     queue_.pop_front();
00063     --queue_size_;
00064 
00065     if (!full_)
00066     {
00067       ROS_DEBUG("Incoming queue full for topic \"%s\".  Discarding oldest message (current queue size [%d])", topic_.c_str(), (int)queue_.size());
00068     }
00069 
00070     full_ = true;
00071 
00072     if (was_full)
00073     {
00074       *was_full = true;
00075     }
00076   }
00077   else
00078   {
00079     full_ = false;
00080   }
00081 
00082   Item i;
00083   i.helper = helper;
00084   i.deserializer = deserializer;
00085   i.has_tracked_object = has_tracked_object;
00086   i.tracked_object = tracked_object;
00087   i.nonconst_need_copy = nonconst_need_copy;
00088   i.receipt_time = receipt_time;
00089   queue_.push_back(i);
00090   ++queue_size_;
00091 }
00092 
00093 void SubscriptionQueue::clear()
00094 {
00095   boost::recursive_mutex::scoped_lock cb_lock(callback_mutex_);
00096   boost::mutex::scoped_lock queue_lock(queue_mutex_);
00097 
00098   queue_.clear();
00099   queue_size_ = 0;
00100 }
00101 
00102 CallbackInterface::CallResult SubscriptionQueue::call()
00103 {
00104   // The callback may result in our own destruction.  Therefore, we may need to keep a reference to ourselves
00105   // that outlasts the scoped_try_lock
00106   boost::shared_ptr<SubscriptionQueue> self;
00107   boost::recursive_mutex::scoped_try_lock lock(callback_mutex_, boost::defer_lock);
00108 
00109   if (!allow_concurrent_callbacks_)
00110   {
00111     lock.try_lock();
00112     if (!lock.owns_lock())
00113     {
00114       return CallbackInterface::TryAgain;
00115     }
00116   }
00117 
00118   VoidConstPtr tracker;
00119   Item i;
00120 
00121   {
00122     boost::mutex::scoped_lock lock(queue_mutex_);
00123 
00124     if (queue_.empty())
00125     {
00126       return CallbackInterface::Invalid;
00127     }
00128 
00129     i = queue_.front();
00130 
00131     if (queue_.empty())
00132     {
00133       return CallbackInterface::Invalid;
00134     }
00135 
00136     if (i.has_tracked_object)
00137     {
00138       tracker = i.tracked_object.lock();
00139 
00140       if (!tracker)
00141       {
00142         return CallbackInterface::Invalid;
00143       }
00144     }
00145 
00146     queue_.pop_front();
00147     --queue_size_;
00148   }
00149 
00150   VoidConstPtr msg = i.deserializer->deserialize();
00151 
00152   // msg can be null here if deserialization failed
00153   if (msg)
00154   {
00155     try
00156     {
00157       self = shared_from_this();
00158     }
00159     catch (boost::bad_weak_ptr&) // For the tests, where we don't create a shared_ptr
00160     {}
00161 
00162     SubscriptionCallbackHelperCallParams params;
00163     params.event = MessageEvent<void const>(msg, i.deserializer->getConnectionHeader(), i.receipt_time, i.nonconst_need_copy, MessageEvent<void const>::CreateFunction());
00164     i.helper->call(params);
00165   }
00166 
00167   return CallbackInterface::Success;
00168 }
00169 
00170 bool SubscriptionQueue::ready()
00171 {
00172   return true;
00173 }
00174 
00175 bool SubscriptionQueue::full()
00176 {
00177   boost::mutex::scoped_lock lock(queue_mutex_);
00178   return fullNoLock();
00179 }
00180 
00181 bool SubscriptionQueue::fullNoLock()
00182 {
00183   return (size_ > 0) && (queue_size_ >= (uint32_t)size_);
00184 }
00185 
00186 }
00187 


roscpp
Author(s): Morgan Quigley, Josh Faust, Brian Gerkey, Troy Straszheim
autogenerated on Tue Mar 7 2017 03:44:47