subscription_queue.cpp
Go to the documentation of this file.
1 /*
2  * Copyright (C) 2009, Willow Garage, Inc.
3  *
4  * Redistribution and use in source and binary forms, with or without
5  * modification, are permitted provided that the following conditions are met:
6  * * Redistributions of source code must retain the above copyright notice,
7  * this list of conditions and the following disclaimer.
8  * * Redistributions in binary form must reproduce the above copyright
9  * notice, this list of conditions and the following disclaimer in the
10  * documentation and/or other materials provided with the distribution.
11  * * Neither the names of Willow Garage, Inc. nor the names of its
12  * contributors may be used to endorse or promote products derived from
13  * this software without specific prior written permission.
14  *
15  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
16  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
17  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
18  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
19  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
20  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
21  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
22  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
23  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
24  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
25  * POSSIBILITY OF SUCH DAMAGE.
26  */
27 
28 
29 #include "ros/subscription_queue.h"
32 
33 namespace ros
34 {
35 
36 SubscriptionQueue::SubscriptionQueue(const std::string& topic, int32_t queue_size, bool allow_concurrent_callbacks)
37 : topic_(topic)
38 , size_(queue_size)
39 , full_(false)
40 , queue_size_(0)
41 , allow_concurrent_callbacks_(allow_concurrent_callbacks)
42 {}
43 
45 {
46 
47 }
48 
50  bool has_tracked_object, const VoidConstWPtr& tracked_object, bool nonconst_need_copy,
51  ros::Time receipt_time, bool* was_full)
52 {
53  boost::mutex::scoped_lock lock(queue_mutex_);
54 
55  if (was_full)
56  {
57  *was_full = false;
58  }
59 
60  if(fullNoLock())
61  {
62  queue_.pop_front();
63  --queue_size_;
64 
65  if (!full_)
66  {
67  ROS_DEBUG("Incoming queue was full for topic \"%s\". Discarded oldest message (current queue size [%d])", topic_.c_str(), (int)queue_.size());
68  }
69 
70  full_ = true;
71 
72  if (was_full)
73  {
74  *was_full = true;
75  }
76  }
77  else
78  {
79  full_ = false;
80  }
81 
82  Item i;
83  i.helper = helper;
84  i.deserializer = deserializer;
85  i.has_tracked_object = has_tracked_object;
86  i.tracked_object = tracked_object;
87  i.nonconst_need_copy = nonconst_need_copy;
88  i.receipt_time = receipt_time;
89  queue_.push_back(i);
90  ++queue_size_;
91 }
92 
94 {
95  boost::recursive_mutex::scoped_lock cb_lock(callback_mutex_);
96  boost::mutex::scoped_lock queue_lock(queue_mutex_);
97 
98  queue_.clear();
99  queue_size_ = 0;
100 }
101 
103 {
104  // The callback may result in our own destruction. Therefore, we may need to keep a reference to ourselves
105  // that outlasts the scoped_try_lock
107  boost::recursive_mutex::scoped_try_lock lock(callback_mutex_, boost::defer_lock);
108 
110  {
111  lock.try_lock();
112  if (!lock.owns_lock())
113  {
115  }
116  }
117 
118  VoidConstPtr tracker;
119  Item i;
120 
121  {
122  boost::mutex::scoped_lock lock(queue_mutex_);
123 
124  if (queue_.empty())
125  {
127  }
128 
129  i = queue_.front();
130 
131  if (queue_.empty())
132  {
134  }
135 
136  if (i.has_tracked_object)
137  {
138  tracker = i.tracked_object.lock();
139 
140  if (!tracker)
141  {
143  }
144  }
145 
146  queue_.pop_front();
147  --queue_size_;
148  }
149 
150  VoidConstPtr msg = i.deserializer->deserialize();
151 
152  // msg can be null here if deserialization failed
153  if (msg)
154  {
155  try
156  {
157  self = shared_from_this();
158  }
159  catch (boost::bad_weak_ptr&) // For the tests, where we don't create a shared_ptr
160  {}
161 
164  i.helper->call(params);
165  }
166 
168 }
169 
171 {
173  {
174  return true;
175  }
176  boost::recursive_mutex::scoped_try_lock lock(callback_mutex_, boost::try_to_lock);
177  return lock.owns_lock();
178 }
179 
181 {
182  boost::mutex::scoped_lock lock(queue_mutex_);
183  return fullNoLock();
184 }
185 
187 {
188  return (size_ > 0) && (queue_size_ >= (uint32_t)size_);
189 }
190 
191 }
192 
CallResult
Possible results for the call() method.
MessageDeserializerPtr deserializer
SubscriptionCallbackHelperPtr helper
virtual CallbackInterface::CallResult call()
Call this callback.
void push(const SubscriptionCallbackHelperPtr &helper, const MessageDeserializerPtr &deserializer, bool has_tracked_object, const VoidConstWPtr &tracked_object, bool nonconst_need_copy, ros::Time receipt_time=ros::Time(), bool *was_full=0)
boost::recursive_mutex callback_mutex_
boost::weak_ptr< void const > VoidConstWPtr
Definition: forwards.h:53
SubscriptionQueue(const std::string &topic, int32_t queue_size, bool allow_concurrent_callbacks)
boost::function< MessagePtr()> CreateFunction
virtual bool ready()
Provides the opportunity for specifying that a callback is not ready to be called before call() actua...
Call not ready, try again later.
#define ROS_DEBUG(...)


roscpp
Author(s): Morgan Quigley, Josh Faust, Brian Gerkey, Troy Straszheim, Dirk Thomas
autogenerated on Mon Feb 28 2022 23:33:27