00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036 #include <gtest/gtest.h>
00037 #include "ros/subscription_queue.h"
00038 #include "ros/message_deserializer.h"
00039 #include "ros/callback_queue_interface.h"
00040 #include "ros/subscription_callback_helper.h"
00041 #include "ros/init.h"
00042
00043 #include <boost/shared_array.hpp>
00044 #include <boost/bind.hpp>
00045 #include <boost/thread.hpp>
00046
00047 using namespace ros;
00048
00049 class FakeMessage
00050 {
00051 public:
00052 virtual const std::string __getDataType() const { return ""; }
00053 virtual const std::string __getMD5Sum() const { return ""; }
00054 virtual const std::string __getMessageDefinition() const { return ""; }
00055 virtual uint32_t serializationLength() const { return 0; }
00056 virtual uint8_t *serialize(uint8_t *write_ptr, uint32_t seq) const { return write_ptr; }
00057 virtual uint8_t *deserialize(uint8_t *read_ptr) { return read_ptr; }
00058 };
00059
00060 class FakeSubHelper : public SubscriptionCallbackHelper
00061 {
00062 public:
00063 FakeSubHelper()
00064 : calls_(0)
00065 {}
00066
00067 virtual VoidConstPtr deserialize(const SubscriptionCallbackHelperDeserializeParams&)
00068 {
00069 return VoidConstPtr(new FakeMessage);
00070 }
00071
00072 virtual std::string getMD5Sum() { return ""; }
00073 virtual std::string getDataType() { return ""; }
00074
00075 virtual void call(SubscriptionCallbackHelperCallParams& params)
00076 {
00077 {
00078 boost::mutex::scoped_lock lock(mutex_);
00079 ++calls_;
00080 }
00081
00082 if (cb_)
00083 {
00084 cb_();
00085 }
00086 }
00087
00088 virtual const std::type_info& getTypeInfo() { return typeid(FakeMessage); }
00089 virtual bool isConst() { return true; }
00090
00091 boost::mutex mutex_;
00092 int32_t calls_;
00093
00094 boost::function<void(void)> cb_;
00095 };
00096 typedef boost::shared_ptr<FakeSubHelper> FakeSubHelperPtr;
00097
00098 TEST(SubscriptionQueue, queueSize)
00099 {
00100 SubscriptionQueue queue("blah", 1, false);
00101
00102 FakeSubHelperPtr helper(new FakeSubHelper);
00103 MessageDeserializerPtr des(new MessageDeserializer(helper, SerializedMessage(), boost::shared_ptr<M_string>()));
00104
00105 ASSERT_FALSE(queue.full());
00106
00107 queue.push(helper, des, false, VoidConstWPtr(), true);
00108
00109 ASSERT_TRUE(queue.full());
00110
00111 ASSERT_EQ(queue.call(), CallbackInterface::Success);
00112
00113 ASSERT_FALSE(queue.full());
00114
00115 queue.push(helper, des, false, VoidConstWPtr(), true);
00116
00117 ASSERT_TRUE(queue.full());
00118
00119 ASSERT_TRUE(queue.ready());
00120
00121 queue.push(helper, des, false, VoidConstWPtr(), true);
00122
00123 ASSERT_TRUE(queue.full());
00124
00125 ASSERT_EQ(queue.call(), CallbackInterface::Success);
00126 ASSERT_EQ(queue.call(), CallbackInterface::Invalid);
00127
00128 ASSERT_EQ(helper->calls_, 2);
00129 }
00130
00131 TEST(SubscriptionQueue, infiniteQueue)
00132 {
00133 SubscriptionQueue queue("blah", 0, false);
00134
00135 FakeSubHelperPtr helper(new FakeSubHelper);
00136 MessageDeserializerPtr des(new MessageDeserializer(helper, SerializedMessage(), boost::shared_ptr<M_string>()));
00137
00138 ASSERT_FALSE(queue.full());
00139
00140 queue.push(helper, des, false, VoidConstWPtr(), true);
00141 ASSERT_EQ(queue.call(), CallbackInterface::Success);
00142
00143 ASSERT_FALSE(queue.full());
00144
00145 for (int i = 0; i < 10000; ++i)
00146 {
00147 queue.push(helper, des, false, VoidConstWPtr(), true);
00148 }
00149
00150 ASSERT_FALSE(queue.full());
00151
00152 for (int i = 0; i < 10000; ++i)
00153 {
00154 ASSERT_EQ(queue.call(), CallbackInterface::Success);
00155 }
00156
00157 ASSERT_EQ(queue.call(), CallbackInterface::Invalid);
00158
00159 ASSERT_EQ(helper->calls_, 10001);
00160 }
00161
00162 TEST(SubscriptionQueue, clearCall)
00163 {
00164 SubscriptionQueue queue("blah", 1, false);
00165
00166 FakeSubHelperPtr helper(new FakeSubHelper);
00167 MessageDeserializerPtr des(new MessageDeserializer(helper, SerializedMessage(), boost::shared_ptr<M_string>()));
00168
00169 queue.push(helper, des, false, VoidConstWPtr(), true);
00170 queue.clear();
00171 ASSERT_EQ(queue.call(), CallbackInterface::Invalid);
00172 }
00173
00174 TEST(SubscriptionQueue, clearThenAddAndCall)
00175 {
00176 SubscriptionQueue queue("blah", 1, false);
00177
00178 FakeSubHelperPtr helper(new FakeSubHelper);
00179 MessageDeserializerPtr des(new MessageDeserializer(helper, SerializedMessage(), boost::shared_ptr<M_string>()));
00180
00181 queue.push(helper, des, false, VoidConstWPtr(), true);
00182 queue.clear();
00183 queue.push(helper, des, false, VoidConstWPtr(), true);
00184 ASSERT_EQ(queue.call(), CallbackInterface::Success);
00185 }
00186
00187 void clearInCallbackCallback(SubscriptionQueue& queue)
00188 {
00189 queue.clear();
00190 }
00191
00192 TEST(SubscriptionQueue, clearInCallback)
00193 {
00194 SubscriptionQueue queue("blah", 1, false);
00195
00196 FakeSubHelperPtr helper(new FakeSubHelper);
00197 MessageDeserializerPtr des(new MessageDeserializer(helper, SerializedMessage(), boost::shared_ptr<M_string>()));
00198
00199 helper->cb_ = boost::bind(clearInCallbackCallback, boost::ref(queue));
00200 queue.push(helper, des, false, VoidConstWPtr(), true);
00201 queue.call();
00202 }
00203
00204 void clearWhileThreadIsBlockingCallback(bool* done, boost::barrier* barrier)
00205 {
00206 barrier->wait();
00207 ros::WallDuration(.1).sleep();
00208 *done = true;
00209 }
00210
00211 void callThread(SubscriptionQueue& queue)
00212 {
00213 queue.call();
00214 }
00215
00216 TEST(SubscriptionQueue, clearWhileThreadIsBlocking)
00217 {
00218 SubscriptionQueue queue("blah", 1, false);
00219
00220 FakeSubHelperPtr helper(new FakeSubHelper);
00221 MessageDeserializerPtr des(new MessageDeserializer(helper, SerializedMessage(), boost::shared_ptr<M_string>()));
00222
00223 bool done = false;
00224 boost::barrier barrier(2);
00225 helper->cb_ = boost::bind(clearWhileThreadIsBlockingCallback, &done, &barrier);
00226 queue.push(helper, des, false, VoidConstWPtr(), true);
00227 boost::thread t(callThread, boost::ref(queue));
00228 barrier.wait();
00229
00230 queue.clear();
00231
00232 ASSERT_TRUE(done);
00233 }
00234
00235 void waitForBarrier(boost::barrier* bar)
00236 {
00237 bar->wait();
00238 }
00239
00240 TEST(SubscriptionQueue, concurrentCallbacks)
00241 {
00242 SubscriptionQueue queue("blah", 0, true);
00243 FakeSubHelperPtr helper(new FakeSubHelper);
00244 MessageDeserializerPtr des(new MessageDeserializer(helper, SerializedMessage(), boost::shared_ptr<M_string>()));
00245
00246 boost::barrier bar(2);
00247 helper->cb_ = boost::bind(waitForBarrier, &bar);
00248 queue.push(helper, des, false, VoidConstWPtr(), true);
00249 queue.push(helper, des, false, VoidConstWPtr(), true);
00250 boost::thread t1(callThread, boost::ref(queue));
00251 boost::thread t2(callThread, boost::ref(queue));
00252 t1.join();
00253 t2.join();
00254
00255 ASSERT_EQ(helper->calls_, 2);
00256 }
00257
00258 void waitForASecond()
00259 {
00260 ros::WallDuration(1.0).sleep();
00261 }
00262
00263 TEST(SubscriptionQueue, nonConcurrentOrdering)
00264 {
00265 SubscriptionQueue queue("blah", 0, false);
00266 FakeSubHelperPtr helper(new FakeSubHelper);
00267 MessageDeserializerPtr des(new MessageDeserializer(helper, SerializedMessage(), boost::shared_ptr<M_string>()));
00268
00269 helper->cb_ = waitForASecond;
00270 queue.push(helper, des, false, VoidConstWPtr(), true);
00271 queue.push(helper, des, false, VoidConstWPtr(), true);
00272 boost::thread t1(callThread, boost::ref(queue));
00273 boost::thread t2(callThread, boost::ref(queue));
00274 t1.join();
00275 t2.join();
00276
00277 ASSERT_EQ(helper->calls_, 1);
00278 queue.call();
00279 ASSERT_EQ(helper->calls_, 2);
00280 }
00281
00282 int main(int argc, char** argv)
00283 {
00284 testing::InitGoogleTest(&argc, argv);
00285 ros::init(argc, argv, "blah");
00286 return RUN_ALL_TESTS();
00287 }
00288
00289