00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036 #include <gtest/gtest.h>
00037 #include "ros/subscription_queue.h"
00038 #include "ros/message_deserializer.h"
00039 #include "ros/callback_queue_interface.h"
00040 #include "ros/subscription_callback_helper.h"
00041 #include "ros/init.h"
00042
00043 #include <boost/shared_array.hpp>
00044 #include <boost/bind.hpp>
00045 #include <boost/thread.hpp>
00046
00047 using namespace ros;
00048
00049 class FakeMessage
00050 {
00051 public:
00052 virtual const std::string __getDataType() const { return ""; }
00053 virtual const std::string __getMD5Sum() const { return ""; }
00054 virtual const std::string __getMessageDefinition() const { return ""; }
00055 virtual uint32_t serializationLength() const { return 0; }
00056 virtual uint8_t *serialize(uint8_t *write_ptr, uint32_t seq) const { (void)seq; return write_ptr; }
00057 virtual uint8_t *deserialize(uint8_t *read_ptr) { return read_ptr; }
00058 };
00059
00060 class FakeSubHelper : public SubscriptionCallbackHelper
00061 {
00062 public:
00063 FakeSubHelper()
00064 : calls_(0)
00065 {}
00066
00067 virtual VoidConstPtr deserialize(const SubscriptionCallbackHelperDeserializeParams&)
00068 {
00069 return boost::make_shared<FakeMessage>();
00070 }
00071
00072 virtual std::string getMD5Sum() { return ""; }
00073 virtual std::string getDataType() { return ""; }
00074
00075 virtual void call(SubscriptionCallbackHelperCallParams& params)
00076 {
00077 (void)params;
00078 {
00079 boost::mutex::scoped_lock lock(mutex_);
00080 ++calls_;
00081 }
00082
00083 if (cb_)
00084 {
00085 cb_();
00086 }
00087 }
00088
00089 virtual const std::type_info& getTypeInfo() { return typeid(FakeMessage); }
00090 virtual bool isConst() { return true; }
00091 virtual bool hasHeader() { return false; }
00092
00093 boost::mutex mutex_;
00094 int32_t calls_;
00095
00096 boost::function<void(void)> cb_;
00097 };
00098 typedef boost::shared_ptr<FakeSubHelper> FakeSubHelperPtr;
00099
00100 TEST(SubscriptionQueue, queueSize)
00101 {
00102 SubscriptionQueue queue("blah", 1, false);
00103
00104 FakeSubHelperPtr helper(boost::make_shared<FakeSubHelper>());
00105 MessageDeserializerPtr des(boost::make_shared<MessageDeserializer>(helper, SerializedMessage(), boost::shared_ptr<M_string>()));
00106
00107 ASSERT_FALSE(queue.full());
00108
00109 queue.push(helper, des, false, VoidConstWPtr(), true);
00110
00111 ASSERT_TRUE(queue.full());
00112
00113 ASSERT_EQ(queue.call(), CallbackInterface::Success);
00114
00115 ASSERT_FALSE(queue.full());
00116
00117 queue.push(helper, des, false, VoidConstWPtr(), true);
00118
00119 ASSERT_TRUE(queue.full());
00120
00121 ASSERT_TRUE(queue.ready());
00122
00123 queue.push(helper, des, false, VoidConstWPtr(), true);
00124
00125 ASSERT_TRUE(queue.full());
00126
00127 ASSERT_EQ(queue.call(), CallbackInterface::Success);
00128 ASSERT_EQ(queue.call(), CallbackInterface::Invalid);
00129
00130 ASSERT_EQ(helper->calls_, 2);
00131 }
00132
00133 TEST(SubscriptionQueue, infiniteQueue)
00134 {
00135 SubscriptionQueue queue("blah", 0, false);
00136
00137 FakeSubHelperPtr helper(boost::make_shared<FakeSubHelper>());
00138 MessageDeserializerPtr des(boost::make_shared<MessageDeserializer>(helper, SerializedMessage(), boost::shared_ptr<M_string>()));
00139
00140 ASSERT_FALSE(queue.full());
00141
00142 queue.push(helper, des, false, VoidConstWPtr(), true);
00143 ASSERT_EQ(queue.call(), CallbackInterface::Success);
00144
00145 ASSERT_FALSE(queue.full());
00146
00147 for (int i = 0; i < 10000; ++i)
00148 {
00149 queue.push(helper, des, false, VoidConstWPtr(), true);
00150 }
00151
00152 ASSERT_FALSE(queue.full());
00153
00154 for (int i = 0; i < 10000; ++i)
00155 {
00156 ASSERT_EQ(queue.call(), CallbackInterface::Success);
00157 }
00158
00159 ASSERT_EQ(queue.call(), CallbackInterface::Invalid);
00160
00161 ASSERT_EQ(helper->calls_, 10001);
00162 }
00163
00164 TEST(SubscriptionQueue, clearCall)
00165 {
00166 SubscriptionQueue queue("blah", 1, false);
00167
00168 FakeSubHelperPtr helper(boost::make_shared<FakeSubHelper>());
00169 MessageDeserializerPtr des(boost::make_shared<MessageDeserializer>(helper, SerializedMessage(), boost::shared_ptr<M_string>()));
00170
00171 queue.push(helper, des, false, VoidConstWPtr(), true);
00172 queue.clear();
00173 ASSERT_EQ(queue.call(), CallbackInterface::Invalid);
00174 }
00175
00176 TEST(SubscriptionQueue, clearThenAddAndCall)
00177 {
00178 SubscriptionQueue queue("blah", 1, false);
00179
00180 FakeSubHelperPtr helper(boost::make_shared<FakeSubHelper>());
00181 MessageDeserializerPtr des(boost::make_shared<MessageDeserializer>(helper, SerializedMessage(), boost::shared_ptr<M_string>()));
00182
00183 queue.push(helper, des, false, VoidConstWPtr(), true);
00184 queue.clear();
00185 queue.push(helper, des, false, VoidConstWPtr(), true);
00186 ASSERT_EQ(queue.call(), CallbackInterface::Success);
00187 }
00188
00189 void clearInCallbackCallback(SubscriptionQueue& queue)
00190 {
00191 queue.clear();
00192 }
00193
00194 TEST(SubscriptionQueue, clearInCallback)
00195 {
00196 SubscriptionQueue queue("blah", 1, false);
00197
00198 FakeSubHelperPtr helper(boost::make_shared<FakeSubHelper>());
00199 MessageDeserializerPtr des(boost::make_shared<MessageDeserializer>(helper, SerializedMessage(), boost::shared_ptr<M_string>()));
00200
00201 helper->cb_ = boost::bind(clearInCallbackCallback, boost::ref(queue));
00202 queue.push(helper, des, false, VoidConstWPtr(), true);
00203 queue.call();
00204 }
00205
00206 void clearWhileThreadIsBlockingCallback(bool* done, boost::barrier* barrier)
00207 {
00208 barrier->wait();
00209 ros::WallDuration(.1).sleep();
00210 *done = true;
00211 }
00212
00213 void callThread(SubscriptionQueue& queue)
00214 {
00215 queue.call();
00216 }
00217
00218 TEST(SubscriptionQueue, clearWhileThreadIsBlocking)
00219 {
00220 SubscriptionQueue queue("blah", 1, false);
00221
00222 FakeSubHelperPtr helper(boost::make_shared<FakeSubHelper>());
00223 MessageDeserializerPtr des(boost::make_shared<MessageDeserializer>(helper, SerializedMessage(), boost::shared_ptr<M_string>()));
00224
00225 bool done = false;
00226 boost::barrier barrier(2);
00227 helper->cb_ = boost::bind(clearWhileThreadIsBlockingCallback, &done, &barrier);
00228 queue.push(helper, des, false, VoidConstWPtr(), true);
00229 boost::thread t(callThread, boost::ref(queue));
00230 barrier.wait();
00231
00232 queue.clear();
00233
00234 ASSERT_TRUE(done);
00235 }
00236
00237 void waitForBarrier(boost::barrier* bar)
00238 {
00239 bar->wait();
00240 }
00241
00242 TEST(SubscriptionQueue, concurrentCallbacks)
00243 {
00244 SubscriptionQueue queue("blah", 0, true);
00245 FakeSubHelperPtr helper(boost::make_shared<FakeSubHelper>());
00246 MessageDeserializerPtr des(boost::make_shared<MessageDeserializer>(helper, SerializedMessage(), boost::shared_ptr<M_string>()));
00247
00248 boost::barrier bar(2);
00249 helper->cb_ = boost::bind(waitForBarrier, &bar);
00250 queue.push(helper, des, false, VoidConstWPtr(), true);
00251 queue.push(helper, des, false, VoidConstWPtr(), true);
00252 boost::thread t1(callThread, boost::ref(queue));
00253 boost::thread t2(callThread, boost::ref(queue));
00254 t1.join();
00255 t2.join();
00256
00257 ASSERT_EQ(helper->calls_, 2);
00258 }
00259
00260 void waitForASecond()
00261 {
00262 ros::WallDuration(1.0).sleep();
00263 }
00264
00265 TEST(SubscriptionQueue, nonConcurrentOrdering)
00266 {
00267 SubscriptionQueue queue("blah", 0, false);
00268 FakeSubHelperPtr helper(boost::make_shared<FakeSubHelper>());
00269 MessageDeserializerPtr des(boost::make_shared<MessageDeserializer>(helper, SerializedMessage(), boost::shared_ptr<M_string>()));
00270
00271 helper->cb_ = waitForASecond;
00272 queue.push(helper, des, false, VoidConstWPtr(), true);
00273 queue.push(helper, des, false, VoidConstWPtr(), true);
00274 boost::thread t1(callThread, boost::ref(queue));
00275 boost::thread t2(callThread, boost::ref(queue));
00276 t1.join();
00277 t2.join();
00278
00279 ASSERT_EQ(helper->calls_, 1);
00280 queue.call();
00281 ASSERT_EQ(helper->calls_, 2);
00282 }
00283
00284 int main(int argc, char** argv)
00285 {
00286 testing::InitGoogleTest(&argc, argv);
00287 ros::init(argc, argv, "blah");
00288 return RUN_ALL_TESTS();
00289 }
00290
00291