00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036 #include <gtest/gtest.h>
00037 #include "ros/subscription_queue.h"
00038 #include "ros/message_deserializer.h"
00039 #include "ros/message.h"
00040 #include "ros/callback_queue_interface.h"
00041 #include "ros/subscription_callback_helper.h"
00042 #include "ros/init.h"
00043
00044 #include <boost/shared_array.hpp>
00045 #include <boost/bind.hpp>
00046 #include <boost/thread.hpp>
00047
00048 using namespace ros;
00049
00050 class FakeMessage : public Message
00051 {
00052 public:
00053 virtual const std::string __getDataType() const { return ""; }
00054 virtual const std::string __getMD5Sum() const { return ""; }
00055 virtual const std::string __getMessageDefinition() const { return ""; }
00056 virtual uint32_t serializationLength() const { return 0; }
00057 virtual uint8_t *serialize(uint8_t *write_ptr, uint32_t seq) const { return write_ptr; }
00058 virtual uint8_t *deserialize(uint8_t *read_ptr) { return read_ptr; }
00059 };
00060
00061 class FakeSubHelper : public SubscriptionCallbackHelper
00062 {
00063 public:
00064 FakeSubHelper()
00065 : calls_(0)
00066 {}
00067
00068 virtual VoidConstPtr deserialize(const SubscriptionCallbackHelperDeserializeParams&)
00069 {
00070 return VoidConstPtr(new FakeMessage);
00071 }
00072
00073 virtual std::string getMD5Sum() { return ""; }
00074 virtual std::string getDataType() { return ""; }
00075
00076 virtual void call(SubscriptionCallbackHelperCallParams& params)
00077 {
00078 {
00079 boost::mutex::scoped_lock lock(mutex_);
00080 ++calls_;
00081 }
00082
00083 if (cb_)
00084 {
00085 cb_();
00086 }
00087 }
00088
00089 virtual const std::type_info& getTypeInfo() { return typeid(FakeMessage); }
00090 virtual bool isConst() { return true; }
00091
00092 boost::mutex mutex_;
00093 int32_t calls_;
00094
00095 boost::function<void(void)> cb_;
00096 };
00097 typedef boost::shared_ptr<FakeSubHelper> FakeSubHelperPtr;
00098
00099 TEST(SubscriptionQueue, queueSize)
00100 {
00101 SubscriptionQueue queue("blah", 1, false);
00102
00103 FakeSubHelperPtr helper(new FakeSubHelper);
00104 MessageDeserializerPtr des(new MessageDeserializer(helper, SerializedMessage(), boost::shared_ptr<M_string>()));
00105
00106 ASSERT_FALSE(queue.full());
00107
00108 queue.push(helper, des, false, VoidConstWPtr(), true);
00109
00110 ASSERT_TRUE(queue.full());
00111
00112 ASSERT_EQ(queue.call(), CallbackInterface::Success);
00113
00114 ASSERT_FALSE(queue.full());
00115
00116 queue.push(helper, des, false, VoidConstWPtr(), true);
00117
00118 ASSERT_TRUE(queue.full());
00119
00120 ASSERT_TRUE(queue.ready());
00121
00122 queue.push(helper, des, false, VoidConstWPtr(), true);
00123
00124 ASSERT_TRUE(queue.full());
00125
00126 ASSERT_EQ(queue.call(), CallbackInterface::Success);
00127 ASSERT_EQ(queue.call(), CallbackInterface::Invalid);
00128
00129 ASSERT_EQ(helper->calls_, 2);
00130 }
00131
00132 TEST(SubscriptionQueue, infiniteQueue)
00133 {
00134 SubscriptionQueue queue("blah", 0, false);
00135
00136 FakeSubHelperPtr helper(new FakeSubHelper);
00137 MessageDeserializerPtr des(new MessageDeserializer(helper, SerializedMessage(), boost::shared_ptr<M_string>()));
00138
00139 ASSERT_FALSE(queue.full());
00140
00141 queue.push(helper, des, false, VoidConstWPtr(), true);
00142 ASSERT_EQ(queue.call(), CallbackInterface::Success);
00143
00144 ASSERT_FALSE(queue.full());
00145
00146 for (int i = 0; i < 10000; ++i)
00147 {
00148 queue.push(helper, des, false, VoidConstWPtr(), true);
00149 }
00150
00151 ASSERT_FALSE(queue.full());
00152
00153 for (int i = 0; i < 10000; ++i)
00154 {
00155 ASSERT_EQ(queue.call(), CallbackInterface::Success);
00156 }
00157
00158 ASSERT_EQ(queue.call(), CallbackInterface::Invalid);
00159
00160 ASSERT_EQ(helper->calls_, 10001);
00161 }
00162
00163 TEST(SubscriptionQueue, clearCall)
00164 {
00165 SubscriptionQueue queue("blah", 1, false);
00166
00167 FakeSubHelperPtr helper(new FakeSubHelper);
00168 MessageDeserializerPtr des(new MessageDeserializer(helper, SerializedMessage(), boost::shared_ptr<M_string>()));
00169
00170 queue.push(helper, des, false, VoidConstWPtr(), true);
00171 queue.clear();
00172 ASSERT_EQ(queue.call(), CallbackInterface::Invalid);
00173 }
00174
00175 TEST(SubscriptionQueue, clearThenAddAndCall)
00176 {
00177 SubscriptionQueue queue("blah", 1, false);
00178
00179 FakeSubHelperPtr helper(new FakeSubHelper);
00180 MessageDeserializerPtr des(new MessageDeserializer(helper, SerializedMessage(), boost::shared_ptr<M_string>()));
00181
00182 queue.push(helper, des, false, VoidConstWPtr(), true);
00183 queue.clear();
00184 queue.push(helper, des, false, VoidConstWPtr(), true);
00185 ASSERT_EQ(queue.call(), CallbackInterface::Success);
00186 }
00187
00188 void clearInCallbackCallback(SubscriptionQueue& queue)
00189 {
00190 queue.clear();
00191 }
00192
00193 TEST(SubscriptionQueue, clearInCallback)
00194 {
00195 SubscriptionQueue queue("blah", 1, false);
00196
00197 FakeSubHelperPtr helper(new FakeSubHelper);
00198 MessageDeserializerPtr des(new MessageDeserializer(helper, SerializedMessage(), boost::shared_ptr<M_string>()));
00199
00200 helper->cb_ = boost::bind(clearInCallbackCallback, boost::ref(queue));
00201 queue.push(helper, des, false, VoidConstWPtr(), true);
00202 queue.call();
00203 }
00204
00205 void clearWhileThreadIsBlockingCallback(bool* done, boost::barrier* barrier)
00206 {
00207 barrier->wait();
00208 ros::WallDuration(.1).sleep();
00209 *done = true;
00210 }
00211
00212 void callThread(SubscriptionQueue& queue)
00213 {
00214 queue.call();
00215 }
00216
00217 TEST(SubscriptionQueue, clearWhileThreadIsBlocking)
00218 {
00219 SubscriptionQueue queue("blah", 1, false);
00220
00221 FakeSubHelperPtr helper(new FakeSubHelper);
00222 MessageDeserializerPtr des(new MessageDeserializer(helper, SerializedMessage(), boost::shared_ptr<M_string>()));
00223
00224 bool done = false;
00225 boost::barrier barrier(2);
00226 helper->cb_ = boost::bind(clearWhileThreadIsBlockingCallback, &done, &barrier);
00227 queue.push(helper, des, false, VoidConstWPtr(), true);
00228 boost::thread t(callThread, boost::ref(queue));
00229 barrier.wait();
00230
00231 queue.clear();
00232
00233 ASSERT_TRUE(done);
00234 }
00235
00236 void waitForBarrier(boost::barrier* bar)
00237 {
00238 bar->wait();
00239 }
00240
00241 TEST(SubscriptionQueue, concurrentCallbacks)
00242 {
00243 SubscriptionQueue queue("blah", 0, true);
00244 FakeSubHelperPtr helper(new FakeSubHelper);
00245 MessageDeserializerPtr des(new MessageDeserializer(helper, SerializedMessage(), boost::shared_ptr<M_string>()));
00246
00247 boost::barrier bar(2);
00248 helper->cb_ = boost::bind(waitForBarrier, &bar);
00249 queue.push(helper, des, false, VoidConstWPtr(), true);
00250 queue.push(helper, des, false, VoidConstWPtr(), true);
00251 boost::thread t1(callThread, boost::ref(queue));
00252 boost::thread t2(callThread, boost::ref(queue));
00253 t1.join();
00254 t2.join();
00255
00256 ASSERT_EQ(helper->calls_, 2);
00257 }
00258
00259 void waitForASecond()
00260 {
00261 ros::WallDuration(1.0).sleep();
00262 }
00263
00264 TEST(SubscriptionQueue, nonConcurrentOrdering)
00265 {
00266 SubscriptionQueue queue("blah", 0, false);
00267 FakeSubHelperPtr helper(new FakeSubHelper);
00268 MessageDeserializerPtr des(new MessageDeserializer(helper, SerializedMessage(), boost::shared_ptr<M_string>()));
00269
00270 helper->cb_ = waitForASecond;
00271 queue.push(helper, des, false, VoidConstWPtr(), true);
00272 queue.push(helper, des, false, VoidConstWPtr(), true);
00273 boost::thread t1(callThread, boost::ref(queue));
00274 boost::thread t2(callThread, boost::ref(queue));
00275 t1.join();
00276 t2.join();
00277
00278 ASSERT_EQ(helper->calls_, 1);
00279 queue.call();
00280 ASSERT_EQ(helper->calls_, 2);
00281 }
00282
00283 int main(int argc, char** argv)
00284 {
00285 testing::InitGoogleTest(&argc, argv);
00286 ros::init(argc, argv, "blah");
00287 return RUN_ALL_TESTS();
00288 }
00289
00290